cma_search/server/apps/information/views.py

381 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from rest_framework import status
from django.conf import settings
from rest_framework.decorators import action
from rest_framework.viewsets import ModelViewSet
from utils.queryset import get_child_queryset2
from rest_framework.response import Response
from apps.system.permission import has_permission
from openpyxl import load_workbook
from django.db import transaction
from rest_framework.exceptions import ParseError
from apps.system.models import Organization
from .models import *
from .serializers import *
from datetime import datetime
import os
class AbilityReviewViewSet(ModelViewSet):
queryset = AbilityReview.objects.all()
serializer_class = AbilityReviewSerializer
def create(self, request):
if Organization.objects.filter(name=request.data['name']).exists():
department_id = Organization.objects.filter(name=request.data['name']).first().id
request.data['department'] = department_id
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data, status = status.HTTP_201_CREATED)
else:
raise ParseError("公司名称不存在")
# 查询子以及已经本公司的评审情况
@action(detail=False, methods=['get'])
def review_info(self, request, *args, **kwargs):
father_dept = request.user.dept
child_dept = get_child_queryset2(father_dept)
query = AbilityReview.objects.filter(department__in=child_dept)
serializer = AbilityReviewSerializer(query, many=True)
data = {'count':len(serializer.data), 'results':serializer.data}
return Response(data, status = status.HTTP_200_OK)
#根据日期过滤数据
@action(detail=False, methods=['post'])
@transaction.atomic
def filter_by_date(self, request, *args, **kwargs):
father_dept = request.user.dept
child_dept = get_child_queryset2(father_dept)
start_date = request.data['startDate']
end_date = request.data['endDate']
query = AbilityReview.objects.filter(create_date__range=[start_date, end_date], department__in=child_dept)
serializer = AbilityReviewSerializer(query, many=True)
data_list = serializer.data
# 构造结构化数据
map_key_dict = {"name": "公司名称",
"qualification_name": "资质名称",
"qualification_level": "资质等级",
"judging_method": "评审方法",
"judging_type": "评审类型",
"add_param":"新增参数",
"review_date": "评审日期",
"now_count":"现有场所数量",
"add_count":"新增场所数量",
}
review_method = {0:"文审", 10:"现场"}
juge_type = {0:"初次", 10:"扩项", 20:"变更", 30:"复评",40:"迁址"}
# 遍历字典将旧key映射到新key
new_data_list = []
for i in range(len(data_list)):
new_data_dict = {map_key_dict[old_key]: old_value for old_key, old_value in data_list[i].items() if old_key in map_key_dict.keys()}
new_data_dict['评审方法'] = review_method[data_list[i]['judging_method']]
new_data_dict['评审类型'] = juge_type[data_list[i]['judging_type']]
new_data_list.append(new_data_dict)
data = {'count':len(serializer.data), 'results':new_data_list}
return Response(data, status = status.HTTP_200_OK)
class ImpMixin:
def get_queryset(self):
mydept = self.request.user.dept
qs = super().get_queryset()
if has_permission('task2', self.request.user):
return qs
return qs.filter(belong_dept=mydept)
def format_date(self, ind, val):
new_val = val
if isinstance(val, datetime.datetime):
new_val = val.date()
elif isinstance(val, datetime.date):
new_val = val
elif isinstance(val, str):
try:
new_val = datetime.datetime.strptime(val, '%Y-%m-%d').date()
except ValueError:
raise ParseError(f'{ind}行, 日期时间格式错误')
elif val is None:
pass
else:
raise ParseError(f'{ind}行, 日期时间格式错误')
return new_val
def get_enum(self, val, atuple, ind):
for i in atuple:
if i[1] == val:
return i[0]
raise ParseError('{}: 请选择固定选项值'.format(ind))
def F(self, data, sheet, i, etype):
raise NotImplementedError()
def gen_imp_view(self, request, start: int, mySerializer, types = None):
if 'file' not in request.data:
raise ParseError('请提供文件')
path = request.data['file']
if not str(path).endswith('.xlsx'):
raise ParseError('请提供xlsx格式文件')
fullpath = os.path.join(settings.BASE_DIR, str(path))
wb = load_workbook(fullpath,data_only=True)
sheet = wb.active
# 遍历Excel文件中的数据
if types.lower() == "qt":
data_list = self.build_qt_data(sheet)
else:
pass
serializer = mySerializer(data=data_list, many=True, context={'request': request})
if serializer.is_valid():
serializer.save()
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
return Response({'uploaded': 'File uploaded successfully'}, status=status.HTTP_201_CREATED)
class QualityCommendationViewSet(ModelViewSet):
queryset = QualityCommendation.objects.all()
serializer_class = QualityCommendationSerializer
def create(self, request, *args, **kwargs):
if Organization.objects.filter(name=request.data['awardee_company']).exists():
department_id = Organization.objects.filter(name=request.data['awardee_company']).first().id
request.data['department'] = department_id
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data, status = status.HTTP_201_CREATED)
# 人名存在重复
else:
raise ParseError("获奖单位不存在")
def build_qt_data(self, sheet):
data_list = []
for row in sheet.iter_rows(min_row=2, values_only=True): # 假设第一行是表头,从第二行开始读取数据
if row[0] is not None:
awarded_date = row[6].strftime("%Y-%m-%d")
# 判断获奖的是人还是公司
department_id = Organization.objects.filter(name=row[4]).first().id
if department_id:
awardee_people = None
awardee_company = row[4]
else:
awardee_company = None
awardee_people = row[4]
serializer_data = {
'name': row[1], # 第一列是名字
'commendation_name':row[2],
'Awards_level':row[3],
'awardee_company':awardee_company,
'awardee_people':awardee_people,
'awarded_by':row[5],
'awarded_date':awarded_date,
'department':department_id,
}
data_list.append(serializer_data)
return data_list
# 查询子以及已经本公司的质量表彰
@action(detail=False, methods=['get'])
def commentdation_info(self, request, *args, **kwargs):
father_dept = request.user.dept
child_dept = get_child_queryset2(father_dept)
query = QualityCommendation.objects.filter(department__in=child_dept)
serializer = QualityCommendationSerializer(query, many=True)
data = {'count':len(serializer.data), 'results':serializer.data}
return Response(data, status = status.HTTP_200_OK)
#根据日期过滤数据
@action(detail=False, methods=['post'])
@transaction.atomic
def filter_by_date(self, request, *args, **kwargs):
father_dept = request.user.dept
child_dept = get_child_queryset2(father_dept)
start_date = request.data['startDate']
end_date = request.data['endDate']
query = QualityCommendation.objects.filter(create_date__range=[start_date, end_date], department__in=child_dept)
serializer = QualityCommendationSerializer(query, many=True)
data_list = serializer.data
map_key_dict = {"name": "项目名称",
"commendation_name": "表彰名称",
"Awards_level": "获奖等级",
"awardee_company": "获奖单位",
"awardee_people": "获奖人",
"awarded_by":"颁奖单位",
"review_date": "获奖日期"
}
new_data_list = []
for i in range(len(data_list)):
new_data_list.append({map_key_dict[key] : value for key, value in data_list[i].items() if key in map_key_dict.keys()})
data = {'count':len(serializer.data), 'results':new_data_list}
return Response(data, status = status.HTTP_200_OK)
# 质量活动
class QualityActivitiesViewSet(ModelViewSet):
queryset = QualityActivities.objects.all()
serializer_class = QualityActivitiesSerializer
def create(self, request):
if Organization.objects.filter(name=request.data['orgunits']).exists():
department_id = Organization.objects.filter(name=request.data['orgunits']).first().id
request.data['department'] = department_id
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data, status = status.HTTP_201_CREATED)
else:
raise ParseError("组织单位不存在")
# 查询子以及已经本公司的质量活动
@action(detail=False, methods=['get'])
def activate_info(self, request, *args, **kwargs):
child_dept = get_child_queryset2(request.user.dept)
query = QualityActivities.objects.filter(department__in=child_dept)
serializer = QualityActivitiesSerializer(query, many=True)
data = {'count':len(serializer.data), 'results':serializer.data}
return Response(data, status = status.HTTP_200_OK)
#根据日期过滤数据
@action(detail=False, methods=['post'])
@transaction.atomic
def filter_by_date(self, request, *args, **kwargs):
father_dept = request.user.dept
child_dept = get_child_queryset2(father_dept)
start_date = request.data['startDate']
end_date = request.data['endDate']
query = QualityActivities.objects.filter(create_date__range=[start_date, end_date], department__in=child_dept)
serializer = QualityActivitiesSerializer(query, many=True)
data_list = serializer.data
map_key_dict = {"name": "活动名称",
"roles": "参与角色",
"collaborators": "合作方",
"orgunits": "组织单位",
"place": "活动地点",
"activate_time":"活动时间",
"participations": "活动参与单位数量",
"function": "活动中发挥的作用",
"earnings":"活动收益(元)"
}
role_map ={0:"组织方", 1:"活动方"}
new_data_list = []
for i in range(len(data_list)):
new_dict = {map_key_dict[key] : value for key, value in data_list[i].items() if key in map_key_dict.keys()}
new_dict['参与角色'] = role_map[data_list[i]['roles']]
new_data_list.append(new_dict)
data = {'count':len(serializer.data), 'results':new_data_list}
return Response(data, status = status.HTTP_200_OK)
class ContactViewSet(ModelViewSet):
queryset = Contact.objects.all()
serializer_class = ContactSerializer
class ExternalAuditorsViewSet(ModelViewSet):
queryset = ExternalAuditors.objects.all()
serializer_class = ExternalAuditorsSerializer
def create(self, request):
if Organization.objects.filter(name=request.data['name_company']).exists():
department_id = Organization.objects.filter(name=request.data['name_company']).first().id
request.data['department'] = department_id
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data, status = status.HTTP_201_CREATED)
else:
raise ParseError("公司名称不存在")
# 查询子以及已经本公司的质量活动
@action(detail=False, methods=['get'])
def activate_info(self, request, *args, **kwargs):
child_dept = get_child_queryset2(request.user.dept)
query = ExternalAuditors.objects.filter(department__in=child_dept)
serializer = ExternalAuditorsSerializer(query, many=True)
data = {'count':len(serializer.data), 'results':serializer.data}
return Response(data, status = status.HTTP_200_OK)
#根据日期过滤数据
@action(detail=False, methods=['post'])
@transaction.atomic
def filter_by_date(self, request, *args, **kwargs):
father_dept = request.user.dept
child_dept = get_child_queryset2(father_dept)
start_date = request.data['startDate']
end_date = request.data['endDate']
query = ExternalAuditors.objects.filter(create_date__range=[start_date, end_date], department__in=child_dept)
serializer = ExternalAuditorsSerializer(query, many=True)
data_list = serializer.data
map_key_dict = {"name_company": "公司名称",
"name": "姓名",
"certificate_expiration": "证书有效期",
"contact": "联系方式",
"judging_areas": "评审领域",
"remark":"备注",
"review_types": "评审类型"
}
role_map ={0:"CNAS", 1:"CMA", 2:"DICA"}
new_data_list = []
for i in range(len(data_list)):
new_dict = {map_key_dict[key] : value for key, value in data_list[i].items() if key in map_key_dict.keys()}
new_dict['评审类型'] = role_map[data_list[i]['review_types']]
new_data_list.append(new_dict)
data = {'count':len(serializer.data), 'results':new_data_list}
return Response(data, status = status.HTTP_200_OK)
class QualificationViewSet(ModelViewSet):
queryset = Qualification.objects.all()
serializer_class = QualificationSerializer
#重写保存方法
def create(self, request):
if Organization.objects.filter(name=request.data['company_name']).exists():
department_id = Organization.objects.filter(name=request.data['company_name']).first().id
request.data['department'] = department_id
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data, status = status.HTTP_201_CREATED)
else:
raise ParseError("公司名称不存在")
# 重写更新的方法
def partial_update(self, request, pk=None):
#获取需要更新的实列
instance = self.get_object()
# 数据比较
ignore_fields = ['create_by', 'create_date', 'update_date', 'id']
origin_dict = QualificationSerializer(instance=instance).data
diff = []
for k, v in request.data.items():
if k not in ignore_fields:
origin_value = origin_dict.get(k)
if origin_value != v:
diff.append({k:{'old':origin_value, 'new':v}})
serializers = self.get_serializer(instance, data=request.data, partial=True)
serializers.is_valid(raise_exception=True)
self.perform_update(serializers)
if diff:
AuditLog.objects.create(
action='update',
instance_id=instance.id,
change_time = datetime.now(),
change_user=request.user,
val_new=serializers.data,
difference=diff
)
return Response(serializers.data, status = status.HTTP_204_NO_CONTENT)
# 查询子以及已经本公司的资质情况
@action(detail=False, methods=['get'])
def activate_info(self, request, *args, **kwargs):
child_dept = get_child_queryset2(request.user.dept)
query = Qualification.objects.filter(department__in=child_dept)
serializer = QualificationSerializer(query, many=True)
data = {'count':len(serializer.data), 'results':serializer.data}
return Response(data, status = status.HTTP_200_OK)