"""REST views for quality-management records.

Covers ability reviews, commendations, quality activities, contacts,
external auditors, qualifications (with an audit log) and PDF-to-Excel
parsing.
"""
import logging

from rest_framework import status
from django.conf import settings
from rest_framework.decorators import action
from rest_framework.viewsets import ModelViewSet
from apps.system.permission_data import RbacFilterSet
from utils.queryset import get_child_queryset2
from rest_framework.response import Response
from apps.system.permission import has_permission
from openpyxl import load_workbook
from django.db import transaction
from rest_framework.exceptions import ParseError
from apps.system.models import Organization
from .models import *
from .serializers import *
from utils.pdf2txt import run
# Kept after the star imports so these names always refer to the stdlib
# classes, whatever the star imports happen to export.
from datetime import date, datetime
import os
import traceback
import shutil

logger = logging.getLogger(__name__)

# EXCEL_PATH = os.path.join(settings.BASE_DIR, 'excel')
EXCEL_PATH = "C:/code/pdf_exc/检验检测服务业统计数据上报任务-空表.xlsx"


def _date_range_from_request(request):
    """Return ``(startDate, endDate)`` from the request body.

    Raises:
        ParseError: if either key is missing, so the client gets a 400
            instead of an unhandled ``KeyError``.
    """
    try:
        return request.data['startDate'], request.data['endDate']
    except KeyError as exc:
        raise ParseError(f'缺少参数: {exc.args[0]}') from exc


def _relabel_rows(rows, key_map, value_maps=None):
    """Rename the keys of serialized rows to their display labels.

    Args:
        rows: iterable of dicts (serializer output).
        key_map: field name -> display label; fields not listed are dropped.
        value_maps: optional field name -> {stored value: display text}.
            Values missing from a mapping are passed through unchanged.

    Returns:
        A list of dicts keyed by display label.
    """
    value_maps = value_maps or {}
    relabelled = []
    for row in rows:
        labelled = {}
        for key, value in row.items():
            if key not in key_map:
                continue
            choices = value_maps.get(key)
            if choices is not None:
                value = choices.get(value, value)
            labelled[key_map[key]] = value
        relabelled.append(labelled)
    return relabelled


def _date_filtered_report(request, model, serializer_cls, key_map, value_maps=None):
    """Build the ``filter_by_date`` response shared by several viewsets.

    Selects rows of ``model`` whose ``create_date`` lies in the requested
    range and whose ``department`` is in the user's department subtree, then
    returns them with display labels as ``{'count': ..., 'results': [...]}``.
    """
    # NOTE(review): the lookup uses `department`, while the rest of this
    # module filters on `belong_dept` - confirm the models define both.
    start_date, end_date = _date_range_from_request(request)
    child_dept = get_child_queryset2(request.user.dept)
    query = model.objects.filter(
        create_date__range=[start_date, end_date],
        department__in=child_dept,
    )
    rows = serializer_cls(query, many=True).data
    payload = {
        'count': len(rows),
        'results': _relabel_rows(rows, key_map, value_maps),
    }
    return Response(payload, status=status.HTTP_200_OK)


def _require_id(model, name, ind, label):
    """Return the id of the ``model`` row named ``name``.

    Raises:
        ParseError: naming spreadsheet row ``ind`` when no such row exists
            (previously an ``AttributeError`` on ``None``).
    """
    obj = model.objects.filter(name=name).first()
    if obj is None:
        raise ParseError(f'第{ind}行, {label}"{name}"不存在')
    return obj.id


class ImpMixin:
    """Shared helpers for viewsets that import records from an xlsx file.

    Subclasses implement ``build_data(sheet, start)`` returning a list of
    dicts that is fed to their serializer.
    """

    def get_queryset(self):
        """Restrict rows to the user's department unless they hold 'task2'."""
        qs = super().get_queryset()
        if has_permission('task2', self.request.user):
            return qs
        return qs.filter(belong_dept=self.request.user.dept)

    def format_date(self, ind, val):
        """Normalize a spreadsheet cell to a ``date`` (or ``None``).

        Args:
            ind: spreadsheet row number, used in error messages.
            val: ``datetime``, ``date``, ``'YYYY-MM-DD'`` string, or ``None``.

        Raises:
            ParseError: for unparseable strings or unsupported types.
        """
        if val is None:
            return None
        # datetime is a subclass of date, so it has to be tested first.
        if isinstance(val, datetime):
            return val.date()
        if isinstance(val, date):
            return val
        if isinstance(val, str):
            try:
                return datetime.strptime(val, '%Y-%m-%d').date()
            except ValueError:
                raise ParseError(f'第{ind}行, 日期时间格式错误') from None
        raise ParseError(f'第{ind}行, 日期时间格式错误')

    def _date_cell(self, ind, val):
        """Return a cell as a ``'YYYY-MM-DD'`` string, or ``None`` if empty."""
        parsed = self.format_date(ind, val)
        return None if parsed is None else parsed.strftime("%Y-%m-%d")

    def get_enum(self, val, atuple, ind):
        """Map a display value to its stored value via ``(stored, display)`` pairs.

        Raises:
            ParseError: if ``val`` matches no pair.
        """
        for stored, display in ((item[0], item[1]) for item in atuple):
            if display == val:
                return stored
        raise ParseError('第{}: 请选择固定选项值'.format(ind))

    def F(self, data, sheet, i, etype):
        raise NotImplementedError()

    def gen_imp_view(self, request, start: int, mySerializer):
        """Import rows from the xlsx file named in ``request.data['file']``.

        Args:
            request: DRF request; ``file`` is a path relative to BASE_DIR.
            start: first spreadsheet row (1-based) that contains data.
            mySerializer: serializer class used to validate and save rows.

        Returns:
            201 on success, 400 with the serializer errors otherwise.

        Raises:
            ParseError: if no file is given or it is not an ``.xlsx`` file.
        """
        if 'file' not in request.data:
            raise ParseError('请提供文件')
        path = request.data['file']
        logger.debug("importing xlsx file %s", path)
        if not str(path).endswith('.xlsx'):
            raise ParseError('请提供xlsx格式文件')
        # NOTE(review): `path` is client-supplied and joined without
        # normalization - confirm it is restricted to the upload directory.
        fullpath = os.path.join(settings.BASE_DIR, str(path))
        wb = load_workbook(fullpath, data_only=True)
        sheet = wb.active
        data_list = self.build_data(sheet, start)
        serializer = mySerializer(data=data_list, many=True, context={'request': request})
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save(create_by=request.user, belong_dept=request.user.dept)
        return Response({'uploaded': 'File uploaded successfully'}, status=status.HTTP_201_CREATED)


class CreateUpdateCustomMixin:
    """Fill in creator, updater and owning department when saving."""

    def perform_create(self, serializer):
        """Save with ``create_by`` and, if the model has it, ``belong_dept``."""
        if hasattr(self.queryset.model, 'belong_dept'):
            # An explicitly submitted department wins over the user's own.
            belong_dept = serializer.validated_data.get('belong_dept', self.request.user.dept)
            serializer.save(create_by=self.request.user, belong_dept=belong_dept)
        else:
            serializer.save(create_by=self.request.user)

    def perform_update(self, serializer):
        """Save with ``update_by`` set to the current user."""
        serializer.save(update_by=self.request.user)


class AbilityReviewViewSet(ImpMixin, RbacFilterSet, CreateUpdateCustomMixin, ModelViewSet):
    """CRUD, date-filtered export and xlsx import for ability reviews."""

    perms_map = {'get': '*', 'post': 'abilityreview',
                 'put': 'abilityreview', 'delete': 'abilityreview'}
    queryset = AbilityReview.objects.select_related('quali').all()
    serializer_class = AbilityReviewSerializer
    filterset_fields = ['belong_dept', 'judging_method', 'judging_type']
    search_fields = ['name']

    # Filter records by date.
    @action(detail=False, methods=['post'])
    @transaction.atomic
    def filter_by_date(self, request, *args, **kwargs):
        """Return reviews created in a date range, with display labels."""
        map_key_dict = {"name": "公司名称",
                        "qualification_name": "资质名称",
                        "qualification_level": "资质等级",
                        "judging_method": "评审方法",
                        "judging_type": "评审类型",
                        "add_param": "新增参数",
                        "review_date": "评审日期",
                        "now_count": "现有场所数量",
                        "add_count": "新增场所数量",
                        }
        review_method = {0: "文审", 10: "现场"}
        juge_type = {0: "初次", 10: "扩项", 20: "变更", 30: "复评", 40: "迁址"}
        return _date_filtered_report(
            request, AbilityReview, AbilityReviewSerializer, map_key_dict,
            {'judging_method': review_method, 'judging_type': juge_type},
        )

    def build_data(self, sheet, start):
        """Convert spreadsheet rows (from row ``start``) to serializer input."""
        judging_method_dict = {"文审": 0, "现场": 10}
        judging_type_dict = {"初次": 0, "扩项": 10, "变更": 20, "复评": 30, "迁址": 40}
        data_list = []
        for ind, row in enumerate(sheet.iter_rows(min_row=start, values_only=True), start=start):
            # Rows with an empty first column are skipped.
            if row[0] is None:
                continue
            data_list.append({
                'name': row[1],
                'quali': _require_id(Qualification, row[2], ind, '资质'),
                'judging_method': judging_method_dict.get(row[3]),
                'judging_type': judging_type_dict.get(row[4]),
                'add_param': row[5],
                'review_date': self._date_cell(ind, row[6]),
                'now_count': row[7],
                'add_count': row[8],
            })
        return data_list

    @action(detail=False, methods=['post'])
    @transaction.atomic
    def imp(self, request, *args, **kwargs):
        """Import ability reviews from an xlsx file."""
        return self.gen_imp_view(request, 2, AbilityReviewSerializer)


class QualityCommendationViewSet(ImpMixin, RbacFilterSet, CreateUpdateCustomMixin, ModelViewSet):
    """CRUD, date-filtered export and xlsx import for quality commendations."""

    queryset = QualityCommendation.objects.all()
    serializer_class = QualityCommendationSerializer

    def build_data(self, sheet, start):
        """Convert spreadsheet rows (from row ``start``) to serializer input."""
        data_list = []
        for ind, row in enumerate(sheet.iter_rows(min_row=start, values_only=True), start=start):
            if row[0] is None:
                continue
            # The awardee is a company when it matches a known organization,
            # otherwise it is treated as a person.
            org = Organization.objects.filter(name=row[4]).first()
            if org is not None:
                department_id = org.id
                awardee_company, awardee_people = row[4], None
            else:
                department_id = None
                awardee_company, awardee_people = None, row[4]
            data_list.append({
                'name': row[1],
                'commendation_name': row[2],
                'Awards_level': row[3],
                'awardee_company': awardee_company,
                'awardee_people': awardee_people,
                'awarded_by': row[5],
                'awarded_date': self._date_cell(ind, row[6]),
                'belong_dept': department_id,
            })
        return data_list

    @action(detail=False, methods=['post'])
    @transaction.atomic
    def imp(self, request, *args, **kwargs):
        """Import commendations from an xlsx file."""
        return self.gen_imp_view(request, 2, QualityCommendationSerializer)

    # Filter records by date.
    @action(detail=False, methods=['post'])
    @transaction.atomic
    def filter_by_date(self, request, *args, **kwargs):
        """Return commendations created in a date range, with display labels."""
        map_key_dict = {"name": "项目名称",
                        "commendation_name": "表彰名称",
                        "Awards_level": "获奖等级",
                        "awardee_company": "获奖单位",
                        "awardee_people": "获奖人",
                        "awarded_by": "颁奖单位",
                        # Was "review_date"; the field imported by build_data
                        # for this serializer is "awarded_date".
                        "awarded_date": "获奖日期",
                        }
        return _date_filtered_report(
            request, QualityCommendation, QualityCommendationSerializer, map_key_dict,
        )


# Quality activities
class QualityActivitiesViewSet(ImpMixin, RbacFilterSet, CreateUpdateCustomMixin, ModelViewSet):
    """CRUD, date-filtered export and xlsx import for quality activities."""

    queryset = QualityActivities.objects.all()
    serializer_class = QualityActivitiesSerializer

    # Filter records by date.
    @action(detail=False, methods=['post'])
    @transaction.atomic
    def filter_by_date(self, request, *args, **kwargs):
        """Return activities created in a date range, with display labels."""
        map_key_dict = {"name": "活动名称",
                        "roles": "参与角色",
                        "collaborators": "合作方",
                        "orgunits": "组织单位",
                        "place": "活动地点",
                        "activate_time": "活动时间",
                        "participations": "活动参与单位数量",
                        "function": "活动中发挥的作用",
                        "earnings": "活动收益(元)",
                        }
        role_map = {0: "组织方", 1: "参与方"}
        return _date_filtered_report(
            request, QualityActivities, QualityActivitiesSerializer, map_key_dict,
            {'roles': role_map},
        )

    def build_data(self, sheet, start):
        """Convert spreadsheet rows (from row ``start``) to serializer input."""
        role_dict = {"组织方": 0, "参与方": 1}
        data_list = []
        for ind, row in enumerate(sheet.iter_rows(min_row=start, values_only=True), start=start):
            if row[0] is None:
                continue
            data_list.append({
                'name': row[1],
                'roles': role_dict.get(row[2]),
                'collaborators': row[3],
                'orgunits': row[4],
                'place': row[5],
                'activate_time': self._date_cell(ind, row[6]),
                'participations': row[7],
                'function': row[8],
                'earnings': row[9],
                'belong_dept': _require_id(Organization, row[4], ind, '组织单位'),
            })
        return data_list

    @action(detail=False, methods=['post'])
    @transaction.atomic
    def imp(self, request, *args, **kwargs):
        """Import quality activities from an xlsx file."""
        return self.gen_imp_view(request, 2, QualityActivitiesSerializer)


class ContactViewSet(ImpMixin, CreateUpdateCustomMixin, ModelViewSet):
    """CRUD and xlsx import for contacts."""

    queryset = Contact.objects.all()
    serializer_class = ContactSerializer

    def get_queryset(self):
        """For non-GET requests, limit rows to the user's department subtree."""
        qs = super().get_queryset()
        if self.request.method == 'GET':
            return qs
        return qs.filter(belong_dept__in=get_child_queryset2(self.request.user.dept))

    def build_data(self, sheet, start):
        """Convert spreadsheet rows (from row ``start``) to serializer input."""
        # Field names in spreadsheet column order, starting at column index 1.
        columns = ('name', 'address', 'header', 'tel', 'email',
                   'head_technology', 'tel_technology', 'email_technology',
                   'head_quality', 'tel_quality', 'email_quality')
        data_list = []
        for row in sheet.iter_rows(min_row=start, values_only=True):
            if row[0] is None:
                continue
            data_list.append({field: row[pos] for pos, field in enumerate(columns, start=1)})
        return data_list

    @action(detail=False, methods=['post'])
    @transaction.atomic
    def imp(self, request, *args, **kwargs):
        """Import contacts from an xlsx file."""
        return self.gen_imp_view(request, 2, ContactSerializer)


class ParsePdfViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """CRUD for parse records plus the PDF-to-Excel parsing action."""

    queryset = Parsepdf.objects.all()
    serializer_class = ParsepdfSerializer
    filterset_fields = ['belong_dept', 'annual']

    # Parse a PDF into Excel.
    @action(detail=False, methods=['post'])
    @transaction.atomic
    def parse_pdf(self, request, *args, **kwargs):
        """Parse ``pdf_file`` into a copy of the Excel template and record it.

        Reads ``pdf_file`` and ``year`` from the request body. Any failure
        while preparing or parsing yields a 400 response.
        """
        try:
            pdf_file = request.data['pdf_file']
            annual = request.data['year']
            # Copy the blank template next to itself and parse into the copy.
            # NOTE(review): the output path is the same for every request, so
            # a later parse overwrites an earlier one - confirm intended.
            media_excel = os.path.join(os.path.dirname(EXCEL_PATH), "media_excel")
            os.makedirs(media_excel, exist_ok=True)
            shutil.copy(EXCEL_PATH, media_excel)
            save_path = os.path.join(media_excel, os.path.basename(EXCEL_PATH))
            run(pdf_file, save_path)
        except Exception:
            # Best-effort boundary: log the traceback, report a generic error.
            logger.exception("failed to parse pdf into excel")
            return Response({"message": "解析失败"}, status=status.HTTP_400_BAD_REQUEST)
        Parsepdf.objects.create(
            pdf_path=pdf_file,
            excel_path=EXCEL_PATH,
            parse_excel=save_path,
            belong_dept=request.user.dept,
            create_by=request.user,
            create_time=datetime.now(),
            annual=annual,
            excel_name=os.path.basename(EXCEL_PATH),
            pdf_name=os.path.basename(pdf_file),
        )
        return Response({"message": "解析成功", "url": save_path}, status=status.HTTP_200_OK)


class ExternalAuditorsViewSet(ImpMixin, RbacFilterSet, CreateUpdateCustomMixin, ModelViewSet):
    """CRUD, date-filtered export and xlsx import for external auditors."""

    queryset = ExternalAuditors.objects.all()
    serializer_class = ExternalAuditorsSerializer

    # Filter records by date.
    @action(detail=False, methods=['post'])
    @transaction.atomic
    def filter_by_date(self, request, *args, **kwargs):
        """Return auditors created in a date range, with display labels."""
        map_key_dict = {"name_company": "公司名称",
                        "name": "姓名",
                        "certificate_expiration": "证书有效期",
                        "contact": "联系方式",
                        "judging_areas": "评审领域",
                        "remark": "备注",
                        "review_types": "评审类型",
                        }
        return _date_filtered_report(
            request, ExternalAuditors, ExternalAuditorsSerializer, map_key_dict,
        )

    def build_data(self, sheet, start):
        """Convert spreadsheet rows (from row ``start``) to serializer input."""
        data_list = []
        for ind, row in enumerate(sheet.iter_rows(min_row=start, values_only=True), start=start):
            if row[0] is None:
                continue
            data_list.append({
                'name_company': row[1],
                'name': row[2],
                'review_types': row[3],
                'certificate_expiration': self._date_cell(ind, row[4]),
                'contact': row[5],
                'judging_areas': row[6],
                'remark': row[7],
                'belong_dept': _require_id(Organization, row[1], ind, '公司'),
            })
        return data_list

    @action(detail=False, methods=['post'])
    @transaction.atomic
    def imp(self, request, *args, **kwargs):
        """Import external auditors from an xlsx file."""
        return self.gen_imp_view(request, 2, ExternalAuditorsSerializer)


class QualificationViewSet(ModelViewSet):
    """Plain CRUD for qualifications."""

    queryset = Qualification.objects.all()
    serializer_class = QualificationSerializer
    perms_map = {'get': '*', 'post': 'qa_create',
                 'put': 'qa_update', 'delete': 'qa_delete'}


class AuditLogViewSet(RbacFilterSet, CreateUpdateCustomMixin, ModelViewSet):
    """CRUD for audit-log entries, filterable by ``instance_id``."""

    queryset = AuditLog.objects.select_related('instance').all()
    serializer_class = AuditLogSerializer
    filterset_fields = ['instance_id']


class QualiChangeViewSet(RbacFilterSet, CreateUpdateCustomMixin, ModelViewSet):
    """Qualification CRUD whose PATCH also writes an audit-log entry."""

    queryset = Qualification.objects.all()
    serializer_class = QualificationSerializer
    perms_map = {'get': '*', 'post': 'qchange_create',
                 'put': 'qchange_update', 'patch': 'qchange_update',
                 'delete': 'qchange_delete'}

    # Override of the partial-update handler.
    def partial_update(self, request, pk=None, *args, **kwargs):
        """Apply a partial update and log the fields that changed.

        Each submitted field (except bookkeeping fields) is compared with its
        current serialized value; if any differ, one ``AuditLog`` row is
        created holding the new data and the list of differences.
        """
        instance = self.get_object()

        ignore_fields = ['create_by', 'create_date', 'update_date', 'id']
        # Snapshot taken before saving so the old values are still available.
        origin_dict = QualificationSerializer(instance=instance).data
        diff = []
        for field, new_value in request.data.items():
            if field in ignore_fields:
                continue
            old_value = origin_dict.get(field)
            if old_value != new_value:
                diff.append({'old': old_value, 'new': new_value, 'name': field})

        serializers = self.get_serializer(instance, data=request.data, partial=True)
        serializers.is_valid(raise_exception=True)
        # Save and audit entry are written together or not at all.
        with transaction.atomic():
            self.perform_update(serializers)
            if diff:
                AuditLog.objects.create(
                    action='update',
                    instance=instance,
                    change_time=datetime.now(),
                    change_user=request.user,
                    val_new=serializers.data,
                    difference=diff,
                )
        # NOTE(review): 204 normally carries no body; kept because clients may
        # depend on this status - consider 200.
        return Response(serializers.data, status=status.HTTP_204_NO_CONTENT)