from rest_framework import viewsets, mixins from rest_framework.viewsets import ViewSet from rest_framework import status from django.conf import settings from rest_framework.decorators import action from rest_framework.views import APIView from rest_framework.viewsets import ModelViewSet # from .models import AbilityReview, QualityCommendation, QualityActivities, Contact, ExternalAuditors # from .serializers import AbilityReviewSerializer, QualityCommendationSerializer, QualityActivitiesSerializer,ContactSerializer, ExternalAuditorsSerializer from utils.queryset import get_child_queryset2 from rest_framework.response import Response from apps.system.permission import has_permission from openpyxl import load_workbook from django.db import transaction from rest_framework.exceptions import ParseError from apps.system.models import Organization from .models import * from .serializers import * from datetime import datetime import os class AbilityReviewViewSet( mixins.CreateModelMixin, mixins.ListModelMixin, mixins.DestroyModelMixin, mixins.UpdateModelMixin, viewsets.GenericViewSet): queryset = AbilityReview.objects.all() serializer_class = AbilityReviewSerializer #自定义查询 def get_queryset(self): pass class ImpMixin: def get_queryset(self): mydept = self.request.user.dept qs = super().get_queryset() if has_permission('task2', self.request.user): return qs return qs.filter(belong_dept=mydept) def format_date(self, ind, val): new_val = val if isinstance(val, datetime.datetime): new_val = val.date() elif isinstance(val, datetime.date): new_val = val elif isinstance(val, str): try: new_val = datetime.datetime.strptime(val, '%Y-%m-%d').date() except ValueError: raise ParseError(f'第{ind}行, 日期时间格式错误') elif val is None: pass else: raise ParseError(f'第{ind}行, 日期时间格式错误') return new_val def get_enum(self, val, atuple, ind): for i in atuple: if i[1] == val: return i[0] raise ParseError('第{}: 请选择固定选项值'.format(ind)) def F(self, data, sheet, i, etype): raise NotImplementedError() def gen_imp_view(self, request, start: int, mySerializer, types = None): if 'file' not in request.data: raise ParseError('请提供文件') path = request.data['file'] if not str(path).endswith('.xlsx'): raise ParseError('请提供xlsx格式文件') fullpath = os.path.join(settings.BASE_DIR, str(path)) wb = load_workbook(fullpath,data_only=True) sheet = wb.active # 遍历Excel文件中的数据 if types.lower() == "qt": data_list = self.build_qt_data(sheet) else: pass serializer = mySerializer(data=data_list, many=True, context={'request': request}) if serializer.is_valid(): serializer.save() else: return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) return Response({'uploaded': 'File uploaded successfully'}, status=status.HTTP_201_CREATED) class QualityCommendationViewSet(ImpMixin, ModelViewSet): queryset = QualityCommendation.objects.all() serializer_class = QualityCommendationSerializer perms_map = {"get": "*", "post": "*", "put": "*", "delete": "*"} def create(self): if Organization.objects.filter(name=self.request.data['awardee_company']).exists(): department_id = Organization.objects.filter(name=self.request.data['awardee_company']).first().id self.request.data['department'] = department_id serializer = self.get_serializer(data=self.request.data) if serializer.is_valid(raise_exception=True): serializer.save() return Response(serializer.data, status = status.HTTP_201_CREATED) # 人名存在重复 else: raise ParseError("部门不存在") # 导入表格 @action(methods=['post'], detail=False) @transaction.atomic def imp(self, request, *args, **kwargs): return self.gen_imp_view(request, 5, QualityCommendationSerializer, 'qt') #构造导入的数据格式 def build_qt_data(self, sheet): data_list = [] for row in sheet.iter_rows(min_row=2, values_only=True): # 假设第一行是表头,从第二行开始读取数据 if row[0] is not None: awarded_date = row[6].strftime("%Y-%m-%d") # 判断获奖的是人还是公司 department_id = Organization.objects.filter(name=row[4]).first().id if department_id: awardee_people = None awardee_company = row[4] else: awardee_company = None awardee_people = row[4] serializer_data = { 'name': row[1], # 第一列是名字 'commendation_name':row[2], 'Awards_level':row[3], 'awardee_company':awardee_company, 'awardee_people':awardee_people, 'awarded_by':row[5], 'awarded_date':awarded_date, 'department':department_id, } data_list.append(serializer_data) return data_list # 查询子以及已经本公司的质量表彰 @action(detail=False, methods=['get']) def commentdation_info(self, request, *args, **kwargs): father_dept = request.user.dept child_dept = get_child_queryset2(father_dept) query = QualityCommendation.objects.filter(department__in=child_dept) serializer = QualityCommendationSerializer(query, many=True) data = {'count':len(serializer.data), 'results':serializer.data} return Response(data, status = status.HTTP_200_OK) # 质量活动 class QualityActivitiesViewSet(ModelViewSet): queryset = QualityActivities.objects.all() serializer_class = QualityActivitiesSerializer def create(self, request): if Organization.objects.filter(name=self.request.data['orgunits']).exists(): department_id = Organization.objects.filter(name=self.request.data['orgunits']).first().id self.request.data['department'] = department_id serializer = self.get_serializer(data=self.request.data) if serializer.is_valid(raise_exception=True): serializer.save() return Response(serializer.data, status = status.HTTP_201_CREATED) else: raise ParseError("组织单位不存在") # 查询子以及已经本公司的质量活动 @action(detail=False, methods=['get']) def activate_info(self, request, *args, **kwargs): child_dept = get_child_queryset2(request.user.dept) query = QualityActivities.objects.filter(department__in=child_dept) serializer = QualityActivitiesSerializer(query, many=True) data = {'count':len(serializer.data), 'results':serializer.data} return Response(data, status = status.HTTP_200_OK) class ContactViewSet(mixins.CreateModelMixin, mixins.ListModelMixin, mixins.DestroyModelMixin, mixins.UpdateModelMixin, viewsets.GenericViewSet): queryset = Contact.objects.all() serializer_class = ContactSerializer class ExternalAuditorsViewSet(mixins.CreateModelMixin, mixins.ListModelMixin, mixins.DestroyModelMixin, mixins.UpdateModelMixin, viewsets.GenericViewSet): queryset = ExternalAuditors.objects.all() serializer_class = ExternalAuditorsSerializer