200 lines
7.9 KiB
Python
200 lines
7.9 KiB
Python
|
|
from rest_framework import viewsets, mixins
|
|
from rest_framework.viewsets import ViewSet
|
|
from rest_framework import status
|
|
|
|
from django.conf import settings
|
|
from rest_framework.decorators import action
|
|
from rest_framework.views import APIView
|
|
from rest_framework.viewsets import ModelViewSet
|
|
# from .models import AbilityReview, QualityCommendation, QualityActivities, Contact, ExternalAuditors
|
|
# from .serializers import AbilityReviewSerializer, QualityCommendationSerializer, QualityActivitiesSerializer,ContactSerializer, ExternalAuditorsSerializer
|
|
from utils.queryset import get_child_queryset2
|
|
from rest_framework.response import Response
|
|
from apps.system.permission import has_permission
|
|
from openpyxl import load_workbook
|
|
from django.db import transaction
|
|
from rest_framework.exceptions import ParseError
|
|
from apps.system.models import Organization
|
|
from .models import *
|
|
from .serializers import *
|
|
|
|
from datetime import datetime
|
|
import os
|
|
|
|
class AbilityReviewViewSet(mixins.CreateModelMixin,
                           mixins.ListModelMixin,
                           mixins.DestroyModelMixin,
                           mixins.UpdateModelMixin,
                           viewsets.GenericViewSet):
    """Create / list / update / delete endpoints for ``AbilityReview`` records."""

    queryset = AbilityReview.objects.all()
    serializer_class = AbilityReviewSerializer

    def get_queryset(self):
        """Return the queryset this viewset operates on.

        Hook for custom filtering. It must return a queryset: the previous
        ``pass`` body returned ``None``, which breaks every action that
        reads the queryset (list, update, destroy). Until custom filtering
        is added, defer to the class-level ``queryset``.
        """
        return super().get_queryset()
|
|
|
|
class ImpMixin:
    """Viewset mixin: department-scoped queryset plus Excel (.xlsx) import helpers.

    Must be listed before a DRF generic view/viewset in the bases so that
    ``super().get_queryset()`` resolves to the view's implementation.
    """

    def get_queryset(self):
        """Return all rows for users holding ``task2``, else only the user's department rows."""
        mydept = self.request.user.dept
        qs = super().get_queryset()
        if has_permission('task2', self.request.user):
            return qs
        # NOTE(review): assumes the model has a ``belong_dept`` field — confirm
        # for every model this mixin is combined with.
        return qs.filter(belong_dept=mydept)

    def format_date(self, ind, val):
        """Normalise a spreadsheet cell value to a ``datetime.date``.

        Args:
            ind: Row number, used only in the error message.
            val: ``datetime``, ``date``, ``'YYYY-MM-DD'`` string, or ``None``.

        Returns:
            A ``date``, or ``None`` when ``val`` is ``None``.

        Raises:
            ParseError: If ``val`` is an unparsable string or an unsupported type.
        """
        # Imported under an alias because the file-level name ``datetime`` is
        # bound to the *class* (``from datetime import datetime``); the
        # previous ``datetime.datetime`` / ``datetime.date`` lookups on that
        # class could never work as type checks.
        import datetime as _dt

        if val is None:
            return None
        # ``datetime`` is a subclass of ``date``, so it has to be tested first.
        if isinstance(val, _dt.datetime):
            return val.date()
        if isinstance(val, _dt.date):
            return val
        if isinstance(val, str):
            try:
                return _dt.datetime.strptime(val, '%Y-%m-%d').date()
            except ValueError:
                raise ParseError(f'第{ind}行, 日期时间格式错误')
        raise ParseError(f'第{ind}行, 日期时间格式错误')

    def get_enum(self, val, atuple, ind):
        """Map a display label to its stored value.

        Args:
            val: Label read from the spreadsheet.
            atuple: Iterable of ``(value, label)`` pairs.
            ind: Row identifier, used only in the error message.

        Returns:
            The first element of the first pair whose second element equals ``val``.

        Raises:
            ParseError: If no pair matches.
        """
        for i in atuple:
            if i[1] == val:
                return i[0]
        raise ParseError('第{}: 请选择固定选项值'.format(ind))

    def F(self, data, sheet, i, etype):
        """Extension hook; not implemented in this mixin."""
        raise NotImplementedError()

    def gen_imp_view(self, request, start: int, mySerializer, types=None):
        """Import rows from an uploaded ``.xlsx`` file through ``mySerializer``.

        Args:
            request: DRF request; ``request.data['file']`` holds the file path
                relative to ``settings.BASE_DIR``.
            start: Currently unused; kept for interface compatibility.
            mySerializer: Serializer class instantiated with ``many=True``.
            types: Import type selector; only ``'qt'`` (case-insensitive) is
                supported and is handled by ``self.build_qt_data``.

        Returns:
            ``201`` response on success, ``400`` with serializer errors otherwise.

        Raises:
            ParseError: Missing file, non-xlsx file, or unsupported ``types``.
        """
        if 'file' not in request.data:
            raise ParseError('请提供文件')
        path = request.data['file']

        if not str(path).endswith('.xlsx'):
            raise ParseError('请提供xlsx格式文件')

        # Reject unknown import types up front. Previously ``types=None``
        # crashed on ``.lower()`` and any other value left ``data_list``
        # unbound, both surfacing as a 500.
        if not (isinstance(types, str) and types.lower() == "qt"):
            raise ParseError('不支持的导入类型')

        # NOTE(review): ``path`` comes from the request; an absolute path or
        # one containing ``..`` escapes BASE_DIR here — consider validating.
        fullpath = os.path.join(settings.BASE_DIR, str(path))
        wb = load_workbook(fullpath, data_only=True)
        sheet = wb.active

        # Walk the spreadsheet rows and build the serializer payload.
        data_list = self.build_qt_data(sheet)

        serializer = mySerializer(data=data_list, many=True, context={'request': request})
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save()

        return Response({'uploaded': 'File uploaded successfully'}, status=status.HTTP_201_CREATED)
|
|
|
|
|
|
class QualityCommendationViewSet(ImpMixin, ModelViewSet):
    """CRUD, Excel import and department-tree listing for ``QualityCommendation``."""

    queryset = QualityCommendation.objects.all()
    serializer_class = QualityCommendationSerializer
    perms_map = {"get": "*", "post": "*", "put": "*", "delete": "*"}

    def create(self, request=None, *args, **kwargs):
        """Create a commendation whose ``awardee_company`` names an existing organization.

        Accepts the ``(request, *args, **kwargs)`` arguments DRF passes to
        ``create``; the previous zero-argument signature raised ``TypeError``
        on every POST. ``request`` defaults to ``None`` only so that existing
        zero-argument callers keep working; the data is read from
        ``self.request`` as before.

        Raises:
            ParseError: If no ``Organization`` has the given name.
        """
        org = Organization.objects.filter(
            name=self.request.data.get('awardee_company')
        ).first()
        if org is None:
            # Original author's note here: person names can be duplicated.
            raise ParseError("部门不存在")

        # Work on a copy: ``request.data`` is an immutable QueryDict for
        # form-encoded requests and cannot be assigned to in place.
        data = self.request.data.copy()
        data['department'] = org.id
        serializer = self.get_serializer(data=data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    # Import from a spreadsheet.
    @action(methods=['post'], detail=False)
    @transaction.atomic
    def imp(self, request, *args, **kwargs):
        """POST endpoint: import commendations from an uploaded ``.xlsx`` file."""
        return self.gen_imp_view(request, 5, QualityCommendationSerializer, 'qt')

    # Build the payload for the import serializer.
    def build_qt_data(self, sheet):
        """Convert worksheet rows into a list of serializer input dicts.

        Reading starts at row 2 (row 1 is assumed to be the header) and rows
        whose first cell is empty are skipped. Column 5 (``row[4]``) is the
        awardee: when it matches an ``Organization`` name it is treated as a
        company and ``department`` is set to that organization's id,
        otherwise it is treated as a person and ``department`` is ``None``.

        Args:
            sheet: An openpyxl worksheet.

        Returns:
            list[dict]: One dict per imported row.
        """
        data_list = []
        # awardee name -> Organization id (or None); avoids one query per row
        # when the same name appears repeatedly.
        dept_ids = {}
        for row in sheet.iter_rows(min_row=2, values_only=True):
            if row[0] is None:
                continue

            # Date cells arrive as date/datetime objects; empty or text cells
            # are passed through unchanged so the serializer reports the
            # validation error instead of crashing here.
            raw_date = row[6]
            if hasattr(raw_date, 'strftime'):
                awarded_date = raw_date.strftime("%Y-%m-%d")
            else:
                awarded_date = raw_date

            # Decide whether the awardee is a company or a person.
            awardee = row[4]
            if awardee not in dept_ids:
                org = Organization.objects.filter(name=awardee).first()
                # ``first()`` returns None when nothing matches; the previous
                # ``.first().id`` raised AttributeError in that case.
                dept_ids[awardee] = org.id if org is not None else None
            department_id = dept_ids[awardee]

            if department_id is not None:
                awardee_people = None
                awardee_company = awardee
            else:
                awardee_company = None
                awardee_people = awardee

            data_list.append({
                'name': row[1],
                'commendation_name': row[2],
                'Awards_level': row[3],
                'awardee_company': awardee_company,
                'awardee_people': awardee_people,
                'awarded_by': row[5],
                'awarded_date': awarded_date,
                'department': department_id,
            })
        return data_list

    # List commendations of the user's own department and its children.
    @action(detail=False, methods=['get'])
    def commentdation_info(self, *args, **kwargs):
        """GET endpoint: commendations whose department is in the user's department tree."""
        father_dept = self.request.user.dept
        child_dept = get_child_queryset2(father_dept)
        query = QualityCommendation.objects.filter(department__in=child_dept)
        serializer = QualityCommendationSerializer(query, many=True)
        return Response(serializer.data)
|
|
|
|
|
|
# Quality activities
class QualityActivitiesViewSet(ModelViewSet):
    """CRUD and department-tree listing for ``QualityActivities``."""

    queryset = QualityActivities.objects.all()
    serializer_class = QualityActivitiesSerializer

    def create(self, request, *args, **kwargs):
        """Create an activity whose ``organizational_units`` names an existing organization.

        The matching organization's id is stored as ``department``.

        Raises:
            ParseError: If no ``Organization`` has the given name (including
                when the field is missing from the request).
        """
        # ``.get`` keeps a missing field on the 400 path instead of a KeyError,
        # and a single query replaces the former exists() + first() pair.
        org = Organization.objects.filter(
            name=request.data.get('organizational_units')
        ).first()
        if org is None:
            raise ParseError("部门不存在")

        # Work on a copy: ``request.data`` is an immutable QueryDict for
        # form-encoded requests and cannot be assigned to in place.
        data = request.data.copy()
        data['department'] = org.id
        serializer = self.get_serializer(data=data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    # List activities of the user's own department and its children.
    @action(detail=False, methods=['get'])
    def activate_info(self, *args, **kwargs):
        """GET endpoint: activities whose department is in the user's department tree."""
        father_dept = self.request.user.dept
        child_dept = get_child_queryset2(father_dept)
        query = QualityActivities.objects.filter(department__in=child_dept)
        serializer = QualityActivitiesSerializer(query, many=True)
        return Response(serializer.data)
|
|
|
|
class ContactViewSet(
    mixins.CreateModelMixin,
    mixins.ListModelMixin,
    mixins.DestroyModelMixin,
    mixins.UpdateModelMixin,
    viewsets.GenericViewSet,
):
    """Create / list / update / delete endpoints for ``Contact`` records."""

    # All contacts, serialized with ContactSerializer.
    queryset = Contact.objects.all()
    serializer_class = ContactSerializer
|
|
|
|
|
|
class ExternalAuditorsViewSet(
    mixins.CreateModelMixin,
    mixins.ListModelMixin,
    mixins.DestroyModelMixin,
    mixins.UpdateModelMixin,
    viewsets.GenericViewSet,
):
    """Create / list / update / delete endpoints for ``ExternalAuditors`` records."""

    # All external auditors, serialized with ExternalAuditorsSerializer.
    queryset = ExternalAuditors.objects.all()
    serializer_class = ExternalAuditorsSerializer
|
|
|
|
|