cma_search/server/apps/information/views.py

218 lines
9.1 KiB
Python

from rest_framework import viewsets, mixins
from rest_framework.viewsets import ViewSet
from rest_framework import status
from django.conf import settings
from rest_framework.decorators import action
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from utils.queryset import get_child_queryset2
from rest_framework.response import Response
from apps.system.permission import has_permission
from openpyxl import load_workbook
from django.db import transaction
from rest_framework.exceptions import ParseError
from apps.system.models import Organization
from .models import *
from .serializers import *
from datetime import datetime
import os
class AbilityReviewViewSet(ModelViewSet):
queryset = AbilityReview.objects.all()
serializer_class = AbilityReviewSerializer
# 查询子以及已经本公司的评审情况
@action(detail=False, methods=['get'])
def review_info(self, request, *args, **kwargs):
father_dept = request.user.dept
child_dept = get_child_queryset2(father_dept)
query = AbilityReview.objects.filter(department__in=child_dept)
serializer = AbilityReviewSerializer(query, many=True)
data = {'count':len(serializer.data), 'results':serializer.data}
return Response(data, status = status.HTTP_200_OK)
def create(self, request):
if Organization.objects.filter(name=request.data['name']).exists():
department_id = Organization.objects.filter(name=request.data['name']).first().id
request.data['department'] = department_id
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data, status = status.HTTP_201_CREATED)
else:
raise ParseError("公司名称不存在")
class ImpMixin:
def get_queryset(self):
mydept = self.request.user.dept
qs = super().get_queryset()
if has_permission('task2', self.request.user):
return qs
return qs.filter(belong_dept=mydept)
def format_date(self, ind, val):
new_val = val
if isinstance(val, datetime.datetime):
new_val = val.date()
elif isinstance(val, datetime.date):
new_val = val
elif isinstance(val, str):
try:
new_val = datetime.datetime.strptime(val, '%Y-%m-%d').date()
except ValueError:
raise ParseError(f'{ind}行, 日期时间格式错误')
elif val is None:
pass
else:
raise ParseError(f'{ind}行, 日期时间格式错误')
return new_val
def get_enum(self, val, atuple, ind):
for i in atuple:
if i[1] == val:
return i[0]
raise ParseError('{}: 请选择固定选项值'.format(ind))
def F(self, data, sheet, i, etype):
raise NotImplementedError()
def gen_imp_view(self, request, start: int, mySerializer, types = None):
if 'file' not in request.data:
raise ParseError('请提供文件')
path = request.data['file']
if not str(path).endswith('.xlsx'):
raise ParseError('请提供xlsx格式文件')
fullpath = os.path.join(settings.BASE_DIR, str(path))
wb = load_workbook(fullpath,data_only=True)
sheet = wb.active
# 遍历Excel文件中的数据
if types.lower() == "qt":
data_list = self.build_qt_data(sheet)
else:
pass
serializer = mySerializer(data=data_list, many=True, context={'request': request})
if serializer.is_valid():
serializer.save()
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
return Response({'uploaded': 'File uploaded successfully'}, status=status.HTTP_201_CREATED)
class QualityCommendationViewSet(ModelViewSet):
queryset = QualityCommendation.objects.all()
serializer_class = QualityCommendationSerializer
def create(self, request, *args, **kwargs):
if Organization.objects.filter(name=request.data['awardee_company']).exists():
department_id = Organization.objects.filter(name=request.data['awardee_company']).first().id
request.data['department'] = department_id
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data, status = status.HTTP_201_CREATED)
# 人名存在重复
else:
raise ParseError("获奖单位不存在")
# # 导入表格
# @action(methods=['post'], detail=False)
# @transaction.atomic
# def imp(self, request, *args, **kwargs):
# return self.gen_imp_view(request, 5, QualityCommendationSerializer, 'qt')
#构造导入的数据格式
def build_qt_data(self, sheet):
data_list = []
for row in sheet.iter_rows(min_row=2, values_only=True): # 假设第一行是表头,从第二行开始读取数据
if row[0] is not None:
awarded_date = row[6].strftime("%Y-%m-%d")
# 判断获奖的是人还是公司
department_id = Organization.objects.filter(name=row[4]).first().id
if department_id:
awardee_people = None
awardee_company = row[4]
else:
awardee_company = None
awardee_people = row[4]
serializer_data = {
'name': row[1], # 第一列是名字
'commendation_name':row[2],
'Awards_level':row[3],
'awardee_company':awardee_company,
'awardee_people':awardee_people,
'awarded_by':row[5],
'awarded_date':awarded_date,
'department':department_id,
}
data_list.append(serializer_data)
return data_list
# 查询子以及已经本公司的质量表彰
@action(detail=False, methods=['get'])
def commentdation_info(self, request, *args, **kwargs):
father_dept = request.user.dept
child_dept = get_child_queryset2(father_dept)
query = QualityCommendation.objects.filter(department__in=child_dept)
serializer = QualityCommendationSerializer(query, many=True)
data = {'count':len(serializer.data), 'results':serializer.data}
return Response(data, status = status.HTTP_200_OK)
# 质量活动
class QualityActivitiesViewSet(ModelViewSet):
queryset = QualityActivities.objects.all()
serializer_class = QualityActivitiesSerializer
def create(self, request):
if Organization.objects.filter(name=request.data['orgunits']).exists():
department_id = Organization.objects.filter(name=request.data['orgunits']).first().id
request.data['department'] = department_id
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data, status = status.HTTP_201_CREATED)
else:
raise ParseError("组织单位不存在")
# 查询子以及已经本公司的质量活动
@action(detail=False, methods=['get'])
def activate_info(self, request, *args, **kwargs):
child_dept = get_child_queryset2(request.user.dept)
query = QualityActivities.objects.filter(department__in=child_dept)
serializer = QualityActivitiesSerializer(query, many=True)
data = {'count':len(serializer.data), 'results':serializer.data}
return Response(data, status = status.HTTP_200_OK)
class ContactViewSet(ModelViewSet):
queryset = Contact.objects.all()
serializer_class = ContactSerializer
class ExternalAuditorsViewSet(ModelViewSet):
queryset = ExternalAuditors.objects.all()
serializer_class = ExternalAuditorsSerializer
def create(self, request):
if Organization.objects.filter(name=request.data['name_company']).exists():
department_id = Organization.objects.filter(name=request.data['name_company']).first().id
request.data['department'] = department_id
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data, status = status.HTTP_201_CREATED)
else:
raise ParseError("组织单位不存在")
# 查询子以及已经本公司的质量活动
@action(detail=False, methods=['get'])
def activate_info(self, request, *args, **kwargs):
child_dept = get_child_queryset2(request.user.dept)
query = ExternalAuditors.objects.filter(department__in=child_dept)
serializer = ExternalAuditorsSerializer(query, many=True)
data = {'count':len(serializer.data), 'results':serializer.data}
return Response(data, status = status.HTTP_200_OK)