from rest_framework import viewsets, mixins
from rest_framework.viewsets import ViewSet
from rest_framework import status
from django.conf import settings
from rest_framework.decorators import action
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from utils.queryset import get_child_queryset2
from rest_framework.response import Response
from apps.system.permission import has_permission
from openpyxl import load_workbook
from django.db import transaction
from rest_framework.exceptions import ParseError
from apps.system.models import Organization
from .models import *
from .serializers import *
# Deliberately kept after the star imports so that ``date``/``datetime`` here
# always refer to the stdlib classes, whatever the star imports re-export.
from datetime import date, datetime
import os


def _visible_departments(request):
    """Return ``get_child_queryset2`` applied to the requesting user's department.

    NOTE(review): presumably the user's own department plus its descendants;
    confirm against ``utils.queryset.get_child_queryset2``.
    """
    return get_child_queryset2(request.user.dept)


def _listing(results):
    """Wrap ``results`` in the ``{'count', 'results'}`` envelope with HTTP 200."""
    return Response(
        {'count': len(results), 'results': results},
        status=status.HTTP_200_OK,
    )


def _requested_date_range(request):
    """Read ``startDate``/``endDate`` from the request body.

    Raises:
        ParseError: if either bound is missing or empty, so the client gets a
            400 instead of an unhandled ``KeyError``.
    """
    start_date = request.data.get('startDate')
    end_date = request.data.get('endDate')
    if not start_date or not end_date:
        raise ParseError('请提供开始日期和结束日期')
    return start_date, end_date


def _display(choices, raw):
    """Translate a stored choice code to its label, falling back to the raw value."""
    try:
        return choices.get(raw, raw)
    except TypeError:
        # Unhashable value (e.g. a list): nothing to look up, pass it through.
        return raw


def _relabel(record, labels, choices=None):
    """Rename the keys of one serialized record to their display labels.

    Args:
        record: mapping of serializer field name -> value.
        labels: mapping of field name -> display label; fields that are not
            listed are dropped from the result.
        choices: optional mapping of field name -> {code: label}; for these
            fields the value is replaced by its label (unknown codes are kept
            unchanged rather than raising).

    Returns:
        A new dict keyed by display label, in the record's field order.
    """
    renamed = {labels[key]: value for key, value in record.items() if key in labels}
    for field, code_labels in (choices or {}).items():
        renamed[labels[field]] = _display(code_labels, record.get(field))
    return renamed


def _filter_by_created_range(request, model, serializer_class, labels, choices=None):
    """Shared body of the ``filter_by_date`` actions.

    Selects ``model`` rows whose ``create_date`` lies in the requested range
    and whose ``department`` is visible to the user, serializes them and
    returns them with display-label keys.
    """
    start_date, end_date = _requested_date_range(request)
    rows = model.objects.filter(
        create_date__range=[start_date, end_date],
        department__in=_visible_departments(request),
    )
    records = serializer_class(rows, many=True).data
    return _listing([_relabel(record, labels, choices) for record in records])


def _create_for_organization(viewset, request, name_field, missing_message):
    """Shared body of the ``create`` overrides.

    Looks up the ``Organization`` whose name equals ``request.data[name_field]``,
    stores its id under ``request.data['department']`` and saves the record
    through the viewset's serializer.

    Raises:
        ParseError: with ``missing_message`` when the name is absent or no
            organization carries it.
    """
    organization_name = request.data.get(name_field)
    # A single query replaces the former exists() + first() pair.
    organization = (
        Organization.objects.filter(name=organization_name).first()
        if organization_name
        else None
    )
    if organization is None:
        raise ParseError(missing_message)
    # NOTE(review): mutating request.data works for JSON bodies; a form-encoded
    # body is an immutable QueryDict and would raise here - confirm the
    # clients only send JSON.
    request.data['department'] = organization.id
    serializer = viewset.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    serializer.save()
    return Response(serializer.data, status=status.HTTP_201_CREATED)


class AbilityReviewViewSet(ModelViewSet):
    """CRUD and reporting endpoints for ``AbilityReview`` records."""

    queryset = AbilityReview.objects.all()
    serializer_class = AbilityReviewSerializer

    # Serializer field -> display label used by ``filter_by_date``.
    EXPORT_LABELS = {
        "name": "公司名称",
        "qualification_name": "资质名称",
        "qualification_level": "资质等级",
        "judging_method": "评审方法",
        "judging_type": "评审类型",
        "add_param": "新增参数",
        "review_date": "评审日期",
        "now_count": "现有场所数量",
        "add_count": "新增场所数量",
    }
    # Choice codes -> display text for the coded fields.
    EXPORT_CHOICES = {
        "judging_method": {0: "文审", 10: "现场"},
        "judging_type": {0: "初次", 10: "扩项", 20: "变更", 30: "复评", 40: "迁址"},
    }

    def create(self, request, *args, **kwargs):
        """Create a review, resolving ``department`` from the company ``name``."""
        return _create_for_organization(self, request, 'name', "公司名称不存在")

    # List the reviews of the user's own company and its child departments.
    @action(detail=False, methods=['get'])
    def review_info(self, request, *args, **kwargs):
        """Return all reviews whose department is visible to the user."""
        query = AbilityReview.objects.filter(
            department__in=_visible_departments(request))
        return _listing(AbilityReviewSerializer(query, many=True).data)

    # Filter the data by date.
    @action(detail=False, methods=['post'])
    def filter_by_date(self, request, *args, **kwargs):
        """Return visible reviews created within ``startDate``..``endDate``.

        Results use display labels as keys and display text for the coded
        ``judging_method``/``judging_type`` fields.
        """
        return _filter_by_created_range(
            request,
            AbilityReview,
            AbilityReviewSerializer,
            self.EXPORT_LABELS,
            self.EXPORT_CHOICES,
        )


class ImpMixin:
    """Mixin with department-scoped querysets and xlsx-import helpers."""

    def get_queryset(self):
        """Return every row for users holding ``task2``, else only their department's."""
        mydept = self.request.user.dept
        qs = super().get_queryset()
        if has_permission('task2', self.request.user):
            return qs
        return qs.filter(belong_dept=mydept)

    def format_date(self, ind, val):
        """Normalize a spreadsheet cell to a ``date`` (``None`` passes through).

        Args:
            ind: row number, used only in the error message.
            val: ``datetime``, ``date``, ``'YYYY-MM-DD'`` string or ``None``.

        Raises:
            ParseError: for unparsable strings and any other type.
        """
        if val is None:
            return None
        # datetime must be tested first: it is a subclass of date.
        if isinstance(val, datetime):
            return val.date()
        if isinstance(val, date):
            return val
        if isinstance(val, str):
            try:
                return datetime.strptime(val, '%Y-%m-%d').date()
            except ValueError as exc:
                raise ParseError(f'第{ind}行, 日期时间格式错误') from exc
        raise ParseError(f'第{ind}行, 日期时间格式错误')

    def get_enum(self, val, atuple, ind):
        """Return the code whose label equals ``val`` in ``atuple``.

        ``atuple`` is an iterable of ``(code, label)`` pairs; ``ind`` is only
        used in the error message.

        Raises:
            ParseError: if no pair carries the label ``val``.
        """
        for code, label, *_ in atuple:
            if label == val:
                return code
        raise ParseError('第{}: 请选择固定选项值'.format(ind))

    def F(self, data, sheet, i, etype):
        """Hook without a default implementation; always raises here."""
        raise NotImplementedError()

    def gen_imp_view(self, request, start: int, mySerializer, types=None):
        """Import rows from an uploaded ``.xlsx`` file through ``mySerializer``.

        Args:
            request: DRF request whose ``data['file']`` holds the file path.
            start: currently unused; kept for interface compatibility.
            mySerializer: serializer class instantiated with ``many=True``.
            types: import kind; only ``'qt'`` (case-insensitive) is supported
                and is delegated to ``self.build_qt_data``, which the host
                class must provide.

        Returns:
            201 on success, 400 with the serializer errors on invalid rows.

        Raises:
            ParseError: missing file, wrong extension or unsupported ``types``.
        """
        if 'file' not in request.data:
            raise ParseError('请提供文件')
        path = str(request.data['file'])
        if not path.endswith('.xlsx'):
            raise ParseError('请提供xlsx格式文件')
        # Previously types=None crashed on .lower() and any other value left
        # data_list undefined; reject both explicitly.
        if str(types or '').lower() != 'qt':
            raise ParseError('不支持的导入类型')
        # NOTE(review): ``path`` is client-supplied and os.path.join discards
        # BASE_DIR when it is absolute - confirm it is validated upstream.
        fullpath = os.path.join(settings.BASE_DIR, path)
        wb = load_workbook(fullpath, data_only=True)
        try:
            # Walk the rows of the active sheet.
            data_list = self.build_qt_data(wb.active)
        finally:
            wb.close()
        serializer = mySerializer(
            data=data_list, many=True, context={'request': request})
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()
        return Response(
            {'uploaded': 'File uploaded successfully'},
            status=status.HTTP_201_CREATED,
        )


class QualityCommendationViewSet(ModelViewSet):
    """CRUD and reporting endpoints for ``QualityCommendation`` records."""

    queryset = QualityCommendation.objects.all()
    serializer_class = QualityCommendationSerializer

    # Serializer field -> display label used by ``filter_by_date``.
    EXPORT_LABELS = {
        "name": "项目名称",
        "commendation_name": "表彰名称",
        "Awards_level": "获奖等级",
        "awardee_company": "获奖单位",
        "awardee_people": "获奖人",
        "awarded_by": "颁奖单位",
        # NOTE(review): build_qt_data feeds the serializer ``awarded_date``, so
        # the former ``review_date``-only mapping looked like a copy-paste slip.
        # Both are mapped until the serializer's field name is confirmed.
        "awarded_date": "获奖日期",
        "review_date": "获奖日期",
    }

    def create(self, request, *args, **kwargs):
        """Create a commendation, resolving ``department`` from ``awardee_company``.

        The lookup is by company only; the original author noted that person
        names can be duplicated.
        """
        return _create_for_organization(
            self, request, 'awardee_company', "获奖单位不存在")

    # # Import spreadsheet
    # @action(methods=['post'], detail=False)
    # @transaction.atomic
    # def imp(self, request, *args, **kwargs):
    #     return self.gen_imp_view(request, 5, QualityCommendationSerializer, 'qt')

    # Build the payload for the import.
    def build_qt_data(self, sheet):
        """Turn worksheet rows into dicts for ``QualityCommendationSerializer``.

        Row 1 is treated as the header. Rows with an empty first cell are
        skipped. Column index 4 holds the awardee: when it names an existing
        ``Organization`` it is stored as the awardee company, otherwise as the
        awardee person with no department.
        """
        department_ids = {}  # awardee name -> Organization id (or None)
        data_list = []
        for row in sheet.iter_rows(min_row=2, values_only=True):
            if row[0] is None:
                continue
            awarded_date = row[6]
            # Only real date cells are formatted; empty or text cells pass
            # through so the serializer reports them instead of a crash here.
            if isinstance(awarded_date, date):
                awarded_date = awarded_date.strftime("%Y-%m-%d")
            awardee = row[4]
            if awardee not in department_ids:
                organization = (
                    Organization.objects.filter(name=awardee).first()
                    if awardee is not None
                    else None
                )
                # first() returns None for unknown names; the old ``.id``
                # access crashed there and made the person branch unreachable.
                department_ids[awardee] = organization.id if organization else None
            department_id = department_ids[awardee]
            # Decide whether the awardee is a company or a person.
            if department_id:
                awardee_company, awardee_people = awardee, None
            else:
                awardee_company, awardee_people = None, awardee
            data_list.append({
                'name': row[1],
                'commendation_name': row[2],
                'Awards_level': row[3],
                'awardee_company': awardee_company,
                'awardee_people': awardee_people,
                'awarded_by': row[5],
                'awarded_date': awarded_date,
                'department': department_id,
            })
        return data_list

    # List the commendations of the user's own company and its child departments.
    @action(detail=False, methods=['get'])
    def commentdation_info(self, request, *args, **kwargs):
        """Return all commendations whose department is visible to the user."""
        query = QualityCommendation.objects.filter(
            department__in=_visible_departments(request))
        return _listing(QualityCommendationSerializer(query, many=True).data)

    # Filter the data by date.
    @action(detail=False, methods=['post'])
    def filter_by_date(self, request, *args, **kwargs):
        """Return visible commendations created within ``startDate``..``endDate``."""
        return _filter_by_created_range(
            request,
            QualityCommendation,
            QualityCommendationSerializer,
            self.EXPORT_LABELS,
        )


# Quality activities
class QualityActivitiesViewSet(ModelViewSet):
    """CRUD and reporting endpoints for ``QualityActivities`` records."""

    queryset = QualityActivities.objects.all()
    serializer_class = QualityActivitiesSerializer

    # Serializer field -> display label used by ``filter_by_date``.
    EXPORT_LABELS = {
        "name": "活动名称",
        "roles": "参与角色",
        "collaborators": "合作方",
        "orgunits": "组织单位",
        "place": "活动地点",
        "activate_time": "活动时间",
        "participations": "活动参与单位数量",
        "function": "活动中发挥的作用",
        "earnings": "活动收益(元)",
    }
    # Choice codes -> display text for the coded fields.
    EXPORT_CHOICES = {"roles": {0: "组织方", 1: "活动方"}}

    def create(self, request, *args, **kwargs):
        """Create an activity, resolving ``department`` from ``orgunits``."""
        return _create_for_organization(self, request, 'orgunits', "组织单位不存在")

    # List the activities of the user's own company and its child departments.
    @action(detail=False, methods=['get'])
    def activate_info(self, request, *args, **kwargs):
        """Return all activities whose department is visible to the user."""
        query = QualityActivities.objects.filter(
            department__in=_visible_departments(request))
        return _listing(QualityActivitiesSerializer(query, many=True).data)

    # Filter the data by date.
    @action(detail=False, methods=['post'])
    def filter_by_date(self, request, *args, **kwargs):
        """Return visible activities created within ``startDate``..``endDate``."""
        return _filter_by_created_range(
            request,
            QualityActivities,
            QualityActivitiesSerializer,
            self.EXPORT_LABELS,
            self.EXPORT_CHOICES,
        )


class ContactViewSet(ModelViewSet):
    """Plain CRUD endpoints for ``Contact`` records."""

    queryset = Contact.objects.all()
    serializer_class = ContactSerializer


class ExternalAuditorsViewSet(ModelViewSet):
    """CRUD and reporting endpoints for ``ExternalAuditors`` records."""

    queryset = ExternalAuditors.objects.all()
    serializer_class = ExternalAuditorsSerializer

    # Serializer field -> display label used by ``filter_by_date``.
    EXPORT_LABELS = {
        "name_company": "公司名称",
        "name": "姓名",
        "certificate_expiration": "证书有效期",
        "contact": "联系方式",
        "judging_areas": "评审领域",
        "remark": "备注",
        "review_types": "评审类型",
    }
    # Choice codes -> display text for the coded fields.
    EXPORT_CHOICES = {"review_types": {0: "CNAS", 1: "CMA", 2: "DICA"}}

    def create(self, request, *args, **kwargs):
        """Create an auditor, resolving ``department`` from ``name_company``."""
        return _create_for_organization(
            self, request, 'name_company', "组织单位不存在")

    # List the external auditors of the user's own company and its child departments.
    @action(detail=False, methods=['get'])
    def activate_info(self, request, *args, **kwargs):
        """Return all external auditors whose department is visible to the user."""
        query = ExternalAuditors.objects.filter(
            department__in=_visible_departments(request))
        return _listing(ExternalAuditorsSerializer(query, many=True).data)

    # Filter the data by date.
    @action(detail=False, methods=['post'])
    def filter_by_date(self, request, *args, **kwargs):
        """Return visible external auditors created within ``startDate``..``endDate``."""
        return _filter_by_created_range(
            request,
            ExternalAuditors,
            ExternalAuditorsSerializer,
            self.EXPORT_LABELS,
            self.EXPORT_CHOICES,
        )