from crm.models import Candidate from datetime import datetime, timedelta from django.db.models import Avg from django.db.models.query import QuerySet from django.utils.translation import get_language_from_request from django_filters.rest_framework import DjangoFilterBackend from openpyxl import Workbook, load_workbook from rest_framework import status from rest_framework.decorators import action from rest_framework.filters import OrderingFilter, SearchFilter from rest_framework.generics import GenericAPIView from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework.serializers import Serializer from rest_framework.views import APIView from rest_framework.viewsets import ModelViewSet from rest_framework_jwt.authentication import JSONWebTokenAuthentication from rest_framework import serializers from question.models import Question from question.serializers import QuestionSerializer from server import settings from utils.custom import CommonPagination from utils.mixins import OptimizationMixin from .exports import export_test, exportw_test from .models import AnswerDetail, Banner, ExamTest, Exam from .models_paper import Paper, PaperQuestions, TestRule, WorkScope from .serializers import ( AnswerDetailCreateSerializer, AnswerDetailSerializer, BannerSerializer, ExamTestExamListSerializer, ExamTestListSerializer, MoniTestSerializer, PaperDetailSerializer, PaperQuestionsCreateSerializer, PaperSerializer, TestRuleSerializer, WorkScopeSerializer, ExamCreateUpdateSerializer, ExamListSerializer, ExamTestDetailSerializer) from django.utils import timezone from django.db.models import Q from utils.pagination import PageOrNot from rest_framework.exceptions import ParseError from django.db import transaction from examtest.exports import gen_candidate # Create your views here. class ExamViewSet(ModelViewSet): """ 正式考试增删改查 """ perms_map = [ {'get': 'exam_view'}, {'post': 'exam_create'}, {'put': 'exam_update'}, {'delete': 'exam_delete'}] pagination_class = CommonPagination filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter] ordering = ['-id'] search_fields = ['name'] def get_queryset(self): queryset = Exam.objects.all() if hasattr(self.get_serializer_class(), 'setup_eager_loading'): queryset = self.get_serializer_class().setup_eager_loading(queryset) # 性能优化 if self.request.user.is_superuser: return queryset roles = self.request.user.roles.values_list('name', flat=True) if '普通管理' in roles: queryset = queryset.filter(create_admin = self.request.user) elif '省管理' in roles: if self.request.user.pname: queryset = queryset.filter(examtest_exam__consumer__company__geo__pname = self.request.user.pname) else: return queryset.objects.none() elif '本单位管理' in roles: if self.request.user.bcompany: queryset = queryset.filter(examtest_exam__consumer__company = self.request.user.bcompany) else: return queryset.objects.none() else: return queryset.none() return queryset def get_serializer_class(self): if self.action in ['create', 'update']: return ExamCreateUpdateSerializer return ExamListSerializer def create(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) instance = serializer.save(create_admin=request.user) instance.code = instance.pk + 10000 instance.save() return Response({"code":instance.code, "id":instance.pk}, status=status.HTTP_201_CREATED) def destroy(self, request, *args, **kwargs): instance = self.get_object() if ExamTest.objects.filter(exam=instance).exists(): return Response({'error':'存在考试记录,禁止删除'}) instance.delete() return Response(status=status.HTTP_204_NO_CONTENT) @action(methods=['post'], detail = False,perms_map=[{'post':'exam_attend'}]) def attend(self, request, *args, **kwargs): """ 参加考试 """ if request.data.get('deptname', None): request.user.deptname = request.data['deptname'] request.user.save() if request.data.get('code', None): code = request.data.get('code') now = timezone.now() try: exam = Exam.objects.get(code=code, opentime__lt=now, closetime__gt=now) tests = ExamTest.objects.filter(exam=exam, consumer=request.user) if tests.count() < exam.chance: # 还有考试机会就可以接着考 return Response({'exam':exam.pk}) return Response({'error':'考试机会不足'}) except: return Response({'error':'考试编号不存在'}) return Response({'error':'操作失败'}) @action(methods=['post'], detail = True,perms_map=[{'post':'exam_attend'}]) def init(self, request, *args, **kwargs): """ 生成考试卷 """ obj = Exam.objects.get(pk=kwargs['pk']) workscope = obj.workscope ret = {} ret['name'] = obj.name ret['consumer_detail'] = request.data ret['type'] = '正式考试' # 正式考试 ret['exam'] = kwargs['pk'] ret['exam_'] = ExamListSerializer(instance=obj).data ret['workscope'] = workscope.id if obj.paper: serializer = PaperDetailSerializer(instance=obj.paper) ret['paper'] = obj.paper.id retx = serializer.data retx.update(ret) return Response(retx) elif workscope.name in ['医学Ⅲ类', '非医学Ⅲ类']: ret['limit'] = 45 ret['total_score'] = 120 ret['pass_score'] = 90 ret['danxuan_count'] = 40 ret['danxuan_score'] = 2 ret['duoxuan_count'] = 10 ret['duoxuan_score'] = 4 question_queryset = Question.objects.none() questioncats = workscope.questioncat.order_by('type', 'create_time') if questioncats.count() == 3: queryset = Question.objects.filter(is_delete=0, enabled=True) a1_set = queryset.filter(questioncat=questioncats[0], type='单选').order_by('?')[:12] a2_set = queryset.filter(questioncat=questioncats[1], type='单选').order_by('?')[:12] a3_set = queryset.filter(questioncat=questioncats[2], type='单选').order_by('?')[:16] b1_set = queryset.filter(questioncat=questioncats[0], type='多选').order_by('?')[:3] b2_set = queryset.filter(questioncat=questioncats[1], type='多选').order_by('?')[:3] b3_set = queryset.filter(questioncat=questioncats[2], type='多选').order_by('?')[:4] question_queryset = question_queryset|a1_set|a2_set|a3_set|b1_set|b2_set|b3_set questions = QuestionSerializer(instance=question_queryset.order_by('type'),many=True).data for i in questions: if i['type'] == '单选': i['total_score'] = 2 elif i['type'] == '多选': i['total_score'] = 4 ret['questions'] = questions return Response(ret) elif workscope.name in ['放射工作人员(上岗)', '放射工作人员(在岗)']: ret['limit'] = workscope.rule.limit ret['total_score'] = workscope.rule.total_score ret['pass_score'] = workscope.rule.pass_score ret['danxuan_count'] = workscope.rule.danxuan_count ret['danxuan_score'] = workscope.rule.danxuan_score ret['duoxuan_count'] = workscope.rule.duoxuan_count ret['duoxuan_score'] = workscope.rule.duoxuan_score ret['panduan_count'] = workscope.rule.panduan_count ret['panduan_score'] = workscope.rule.panduan_score question_queryset = Question.objects.none() queryset = Question.objects.filter(is_delete=0,questioncat__in = workscope.questioncat.all(), enabled=True) if ret['danxuan_count']: danxuan = queryset.filter(type='单选').order_by('?')[:ret['danxuan_count']] question_queryset = question_queryset | danxuan if ret['duoxuan_count']: duoxuan = queryset.filter(type='多选').order_by('?')[:ret['duoxuan_count']] question_queryset = question_queryset | duoxuan if ret['panduan_count']: panduan = queryset.filter(type='判断').order_by('?')[:ret['panduan_count']] question_queryset = question_queryset | panduan questions = QuestionSerializer(instance=question_queryset.order_by('type'),many=True).data for i in questions: if i['type'] == '单选': i['total_score'] = ret['danxuan_score'] elif i['type'] == '多选': i['total_score'] = ret['duoxuan_score'] else: i['total_score'] = ret['panduan_score'] ret['questions'] = questions return Response(ret) return Response({'error':'生成试卷失败'}) @action(methods=['post'], detail = True ,perms_map=[{'post':'exam_update'}]) def upimgs(self, request, *args, **kwargs): obj = self.get_object() obj.qdimgs = request.data.get('qdimgs', []) obj.xcimgs = request.data.get('xcimgs', []) obj.save() return Response() class AnswerDetailView(APIView): authentication_classes = [] permission_classes = [] def get(self, request, *args, **kwargs): queryset = AnswerDetail.objects.all() if request.query_params.get('examtest', None): queryset = queryset.filter(examtest=request.query_params.get('examtest')).order_by('question__type') queryset = AnswerDetailSerializer.setup_eager_loading(queryset) serializer = AnswerDetailSerializer(instance=queryset,many=True) return Response(serializer.data) class WorkScopeViewSet(ModelViewSet): """ 工作类别:增删改查 """ perms_map = [ {'get': '*'}, {'post': 'workscope_create'}, {'put': 'workscope_update'}, {'delete': 'workscope_delete'}] pagination_class = None queryset = WorkScope.objects.filter(is_delete=0).all().order_by("id") serializer_class = WorkScopeSerializer ordering_fields = ('id',) ordering = ['-can_exam', 'sortnum'] filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter] filterset_fields = ['subject', 'can_exam', 'is_public'] search_fields = ('name',) def get_queryset(self): queryset = self.queryset queryset = self.get_serializer_class().setup_eager_loading(queryset) return queryset @action(methods=['get'], detail=True,url_path='monitest', url_name='gen_monitest', perms_map=[{'get':'gen_monitest'}]) def monitest(self, request, pk=None): ''' 生成自助模拟考试 ''' ret = {} workscope = self.get_object() ret['name'] = '自助模考' + datetime.now().strftime('%Y%m%d%H%M') ret['type'] = '自助模考' # 自助模拟考试 ret['workscope'] = workscope.id ret['limit'] = workscope.rule.limit ret['total_score'] = workscope.rule.total_score ret['pass_score'] = workscope.rule.pass_score ret['danxuan_count'] = workscope.rule.danxuan_count ret['danxuan_score'] = workscope.rule.danxuan_score ret['duoxuan_count'] = workscope.rule.duoxuan_count ret['duoxuan_score'] = workscope.rule.duoxuan_score ret['panduan_count'] = workscope.rule.panduan_count ret['panduan_score'] = workscope.rule.panduan_score question_queryset = Question.objects.none() questioncats = workscope.questioncat.exclude(name='辐射小知识更新中').order_by('type', 'create_time') if questioncats.count() == 3 and questioncats.filter(name='电离辐射法律法规').exists(): import random b2 = random.randint(1,8) b3 = random.randint(1,9-b2) b1 = 10 - b2 -b3 a1 = 2*b2 + 2*b3 - 2 a2 = 18 - 2*b2 a3 = 24 - 2*b3 queryset = Question.objects.filter(is_delete=0, enabled=True) a1_set = queryset.filter(questioncat=questioncats[0], type='单选').order_by('?')[:a1] a2_set = queryset.filter(questioncat=questioncats[1], type='单选').order_by('?')[:a2] a3_set = queryset.filter(questioncat=questioncats[2], type='单选').order_by('?')[:a3] b1_set = queryset.filter(questioncat=questioncats[0], type='多选').order_by('?')[:b1] b2_set = queryset.filter(questioncat=questioncats[1], type='多选').order_by('?')[:b2] b3_set = queryset.filter(questioncat=questioncats[2], type='多选').order_by('?')[:b3] question_queryset = question_queryset|a1_set|a2_set|a3_set|b1_set|b2_set|b3_set elif workscope.name == '辐射安全管理': # 辐射安全管理出卷规则 queryset = Question.objects.filter(is_delete=0, enabled=True) a1_set = queryset.filter(questioncat=questioncats[0], type='单选').order_by('?')[:16] a2_set = queryset.filter(questioncat=questioncats[1], type='单选').order_by('?')[:24] b1_set = queryset.filter(questioncat=questioncats[0], type='多选').order_by('?')[:4] b2_set = queryset.filter(questioncat=questioncats[1], type='多选').order_by('?')[:6] question_queryset = question_queryset|a1_set|a2_set|b1_set|b2_set elif workscope.name == '科研、生产及其他': # 科研、生产及其他出卷规则 queryset = Question.objects.filter(is_delete=0, enabled=True) a1_set = queryset.filter(questioncat=questioncats[0], type='单选').order_by('?')[:24] a2_set = queryset.filter(questioncat=questioncats[1], type='单选').order_by('?')[:16] b1_set = queryset.filter(questioncat=questioncats[0], type='多选').order_by('?')[:6] b2_set = queryset.filter(questioncat=questioncats[1], type='多选').order_by('?')[:4] question_queryset = question_queryset|a1_set|a2_set|b1_set|b2_set else: queryset = Question.objects.filter(is_delete=0,questioncat__in = workscope.questioncat.all(), enabled=True) if ret['danxuan_count']: danxuan = queryset.filter(type='单选').order_by('?')[:ret['danxuan_count']] question_queryset = question_queryset | danxuan if ret['duoxuan_count']: duoxuan = queryset.filter(type='多选').order_by('?')[:ret['duoxuan_count']] question_queryset = question_queryset | duoxuan if ret['panduan_count']: panduan = queryset.filter(type='判断').order_by('?')[:ret['panduan_count']] question_queryset = question_queryset | panduan questions = QuestionSerializer(instance=question_queryset.order_by('type'),many=True).data for i in questions: if i['type'] == '单选': i['total_score'] = ret['danxuan_score'] elif i['type'] == '多选': i['total_score'] = ret['duoxuan_score'] else: i['total_score'] = ret['panduan_score'] ret['questions'] = questions return Response(ret) @action(methods=['get'], detail=False,url_path='monitest2', url_name='gen_monitest2', perms_map=[{'get':'gen_monitest'}]) def monitest2(self, request, pk=None): ''' 生成体验版自助模拟考试 ''' ret = {} rule = TestRule.objects.first() ret['name'] = '自助模考' + datetime.now().strftime('%Y%m%d%H%M') ret['type'] = '自助模考' # 自助模拟考试 ret['limit'] = rule.limit ret['total_score'] = rule.total_score ret['pass_score'] = rule.pass_score ret['danxuan_count'] = rule.danxuan_count ret['danxuan_score'] = rule.danxuan_score ret['duoxuan_count'] = rule.duoxuan_count ret['duoxuan_score'] = rule.duoxuan_score ret['panduan_count'] = rule.panduan_count ret['panduan_score'] = rule.panduan_score question_queryset = Question.objects.none() queryset = Question.objects.filter(is_delete=0, enabled=True) if ret['danxuan_count']: danxuan = queryset.filter(type='单选').order_by('?')[:ret['danxuan_count']] question_queryset = question_queryset | danxuan if ret['duoxuan_count']: duoxuan = queryset.filter(type='多选').order_by('?')[:ret['duoxuan_count']] question_queryset = question_queryset | duoxuan if ret['panduan_count']: panduan = queryset.filter(type='判断').order_by('?')[:ret['panduan_count']] question_queryset = question_queryset | panduan questions = QuestionSerializer(instance=question_queryset.order_by('type'),many=True).data for i in questions: if i['type'] == '单选': i['total_score'] = ret['danxuan_score'] elif i['type'] == '多选': i['total_score'] = ret['duoxuan_score'] else: i['total_score'] = ret['panduan_score'] ret['questions'] = questions user = request.user user.remain_count = user.remain_count -1 request.user.save() ret['remain_count'] = user.remain_count return Response(ret) # @action(methods=['get'], detail=True, perms_map=[{'get':'*'}]) # def examtest(self, request, pk=None): # ''' # 生成正式考试 # ''' # ret = {} # workscope = self.get_object() # ret['name'] = '正式考试' + datetime.now().strftime('%Y%m%d%H%M') # ret['type'] = '正式考试' # 自助模拟考试 # ret['workscope'] = workscope.id # ret['limit'] = workscope.rule.limit # ret['total_score'] = workscope.rule.total_score # ret['pass_score'] = workscope.rule.pass_score # ret['danxuan_count'] = workscope.rule.danxuan_count # ret['danxuan_score'] = workscope.rule.danxuan_score # ret['duoxuan_count'] = workscope.rule.duoxuan_count # ret['duoxuan_score'] = workscope.rule.duoxuan_score # ret['panduan_count'] = workscope.rule.panduan_count # ret['panduan_score'] = workscope.rule.panduan_score # question_queryset = Question.objects.none() # queryset = Question.objects.filter(is_delete=0,questioncat__in = workscope.questioncat.all(), enabled=True) # if ret['danxuan_count']: # danxuan = queryset.filter(type='单选').order_by('?')[:ret['danxuan_count']] # question_queryset = question_queryset | danxuan # if ret['duoxuan_count']: # duoxuan = queryset.filter(type='多选').order_by('?')[:ret['duoxuan_count']] # question_queryset = question_queryset | duoxuan # if ret['panduan_count']: # panduan = queryset.filter(type='判断').order_by('?')[:ret['panduan_count']] # question_queryset = question_queryset | panduan # questions = QuestionSerializer(instance=question_queryset.order_by('type'),many=True).data # for i in questions: # if i['type'] == '单选': # i['total_score'] = ret['danxuan_score'] # elif i['type'] == '多选': # i['total_score'] = ret['duoxuan_score'] # else: # i['total_score'] = ret['panduan_score'] # ret['questions'] = questions # return Response(ret) @action(methods=['get'], detail=True, perms_map=[{'get':'*'}]) def chose(self, request, *args, **kwargs): """ 用户自己选择工作类别 """ obj = self.get_object() if obj.can_exam: candidates = Candidate.objects.filter(consumer=request.user, workscope=request.user.workscope) if candidates.exists(): pass else: Candidate.objects.create(consumer=request.user, workscope=request.user.workscope) request.user.workscope = obj request.user.save() return Response({'workscope':obj.pk, 'workscope_name':obj.name}) if Candidate.objects.filter(consumer=request.user, workscope=obj).exists(): return Response({'workscope':obj.pk, 'workscope_name':obj.name}) return Response({'error':'该类别不可选择,请咨询课程顾问'}) class BannerViewSet(ModelViewSet): """ 轮播图:增删改查 """ perms_map = ( {'get': 'banner_list'}, {'post': 'banner_create'}, {'put': 'banner_update'}, {'delete': 'banner_delete'}) pagination_class = None queryset = Banner.objects.filter(is_delete=0).all().order_by("sort") serializer_class = BannerSerializer ordering_fields = ('id', 'sort') ordering = ['sort'] def get_authenticators(self): if self.request.method == 'GET': self.authentication_classes = [] return [auth() for auth in self.authentication_classes] def get_permissions(self): if self.request.method == 'GET': self.permission_classes = [] return [permission() for permission in self.permission_classes] class TestRuleViewSet(ModelViewSet): """ 模考规则:增删改查 """ perms_map = ( {'get': 'testrule_list'}, {'post': 'testrule_create'}, {'put': 'testrule_update'}, {'delete': 'testrule_delete'}) pagination_class = None queryset = TestRule.objects.filter(is_delete=0).all().order_by("id") serializer_class = TestRuleSerializer ordering_fields = ('id',) ordering = ['id'] search_fields = ('^name',) def get_authenticators(self): if self.request.method == 'GET': self.authentication_classes = [] return [auth() for auth in self.authentication_classes] def get_permissions(self): if self.request.method == 'GET': self.permission_classes = [] return [permission() for permission in self.permission_classes] class ExamTestViewSet(PageOrNot, ModelViewSet): """ 考试记录列表和详情 """ perms_map = [{'get': 'examtest_view'},{'post': '*'}, {'delete':'examtest_delete'}] pagination_class = CommonPagination queryset = ExamTest.objects.filter(is_delete=0).all() serializer_class = ExamTestExamListSerializer ordering_fields = ('id','create_time','took','score') ordering = ['-create_time', 'is_pass', '-score'] search_fields = ('consumer__name', 'consumer__company__name') filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter] filterset_fields = ['type','is_pass', 'exam'] def get_serializer_class(self): if self.request: if self.request.query_params.get('exam', None) or self.request.query_params.get('type') == '正式考试': return ExamTestExamListSerializer if self.action == 'retrieve': return ExamTestDetailSerializer return ExamTestListSerializer def filter_queryset(self, queryset): for backend in list(self.filter_backends): queryset = backend().filter_queryset(self.request, queryset, self) if self.request.query_params.get('start'): queryset = queryset.filter(start_time__gte=self.request.query_params['start'] ) if self.request.query_params.get('end'): queryset = queryset.filter(start_time__lte=self.request.query_params['end']) return queryset def get_queryset(self): queryset = self.queryset if hasattr(self.get_serializer_class(), 'setup_eager_loading'): queryset = self.get_serializer_class().setup_eager_loading(queryset) if self.request.user.is_superuser: return queryset roles = self.request.user.roles.values_list('name', flat=True) if '普通管理' in roles: queryset = queryset.filter(Q(consumer__create_admin = self.request.user)|Q(exam__create_admin=self.request.user)) elif '省管理' in roles: if self.request.user.pname: queryset = queryset.filter(consumer__company__geo__pname = self.request.user.pname) else: return queryset.objects.none() elif '本单位管理' in roles: if self.request.user.bcompany: queryset = queryset.filter(consumer__company = self.request.user.bcompany) else: return queryset.objects.none() else: return queryset.none() return queryset @action(methods=['get'], detail=False,url_path='self', url_name='selftest', perms_map = [{'*':'my_examtest'}]) def selftest(self, request, pk=None): ''' 个人考试记录 ''' queryset = self.filter_queryset(ExamTest.objects.filter(consumer=request.user)).order_by('-create_time') pg = CommonPagination() p = pg.paginate_queryset(queryset=queryset,request=request,view=self) serializer = ExamTestListSerializer(instance=p,many=True) return pg.get_paginated_response(serializer.data) @action(methods=['get'], detail=False,url_path='fx', url_name='selffx', perms_map = [{'*':'my_examtest'}]) def selffx(self, request, pk=None): ''' 个人考试分析 ''' queryset = ExamTest.objects.filter(consumer=request.user) ret = {} ret['total'] = queryset.count() ret['avg_score'] = round(queryset.aggregate(avg=Avg('score'))['avg']) if ret['total'] else 0 ret['pass_rate'] = round(((queryset.filter(is_pass=True).count())/ret['total'])*100) if ret['total'] else 0 return Response(ret) def create(self, request, *args, **kwargs): serializer = MoniTestSerializer(data = request.data) if serializer.is_valid(): instance = serializer.save(consumer = request.user) if 'questions' in request.data: questions = [] for i in request.data['questions']: question = {} question['question'] = i['id'] question['examtest'] = instance.id question['score'] = i['score'] if 'user_answer' in i: question['user_answer'] = i['user_answer'] question['is_right'] = i['is_right'] questions.append(question) serializer_detail = AnswerDetailCreateSerializer(data=questions, many=True) if serializer_detail.is_valid(): serializer_detail.save() # 关联正式考试如有 if request.data.get('exam', None): instance.exam = Exam.objects.get(pk=request.data['exam']) instance.save() return Response(MoniTestSerializer(instance).data,status=status.HTTP_200_OK) else: return Response(serializer_detail.errors) else: return Response({'error':'答题记录不存在'}) else: return Response(serializer.errors) @action(methods=['get'], detail=False, url_path='export', url_name='export_test', perms_map=[{'*':'export_test'}]) def export(self, request): queryset = self.filter_queryset(self.get_queryset()) queryset = ExamTestListSerializer.setup_eager_loading(queryset) # 性能优化 if queryset.count()>1000: return Response({'error':'数据量超过1000,请筛选后导出!'}) serializer = ExamTestListSerializer(instance=queryset, many=True) path = export_test(serializer.data) return Response({'path': path}) @action(methods=['post'], detail = True ,perms_map=[{'post':'export_test'}]) def exportw(self, request, *args, **kwargs): obj = self.get_object() if 'anew' in request.data and request.data['anew']: # 是否需要重新生成 path = exportw_test(obj, True) else: path = exportw_test(obj, False) return Response({'path': path}) @action(methods=['post'], detail=True, perms_map = [{'*':'candidate_issue'}], serializer_class=serializers.Serializer) @transaction.atomic def issue(self, request, pk=None): ''' 颁发证书 ''' obj = self.get_object() workscope = obj.workscope candidate = obj.candidate if hasattr(obj, 'candidate') else None if not obj.is_pass: return Response({'error':'考试未通过'}) if candidate: return Response({'error':'证书已存在'}) candidates = Candidate.objects.filter(consumer=obj.consumer, workscope=workscope, number__isnull=True) if candidates.exists(): candidate = candidates[0] else: candidate = Candidate.objects.create(consumer=obj.consumer, workscope=workscope) candidate.examtest = obj now = timezone.now() candidate.issue_date = now candidate.start_date = now candidate.end_date = now + timedelta(days=workscope.issue_year*365) # 有效期 candidate.examtest_date = obj.start_time.date() candidate.workscope_name = workscope.name candidate.consumer_name = obj.consumer_detail['name'] candidate.ID_number = obj.consumer_detail['ID_number'] candidate.company_name = obj.consumer_detail['company_name'] candidate.deptname = obj.consumer_detail['deptname'] candidate.train_name = obj.exam.train_name candidate.train_start_date = obj.exam.train_start_date candidate.train_end_date = obj.exam.train_end_date candidate.save() if workscope.name in ['放射工作人员(上岗)', '放射工作人员(在岗)']: # 如果是放射工作人员处理方式 count = Candidate.objects.filter(create_admin=request.user, issue_date__year=now.year, issue_date__month=now.month).count() candidate.number='HNHK'+ str(now.year) + str(now.month).zfill(2) + str(count+1).zfill(4) else: candidate.number='SL'+ str(now.year)[-2:] + str(candidate.pk).zfill(6) candidate.create_admin = request.user candidate.save() path = None # 生成word证书 # path = None # if workscope.name in ['放射工作人员(上岗)', '放射工作人员(在岗)']: # 如果是放射工作人员处理方式 # path = gen_candidate(candidate) # candidate.path = path # candidate.save() return Response({"id":candidate.pk, "number":candidate.number, "path":path}) class PaperViewSet(PageOrNot, ModelViewSet): """ 押题卷增删改查 """ perms_map = [ {'get': 'paper_view'}, {'post': 'paper_create'}, {'put': 'paper_update'}, {'delete': 'paper_delete'}] queryset = Paper.objects.filter(is_delete=0).all().order_by("id") pagination_class = CommonPagination serializer_class = PaperSerializer ordering_fields = ('id',) ordering = ['id'] filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter] filterset_fields = ['workscope'] search_fields = ('name',) def get_serializer_class(self): if self.action=='list': return PaperSerializer else: return PaperDetailSerializer def create(self, request, *args, **kwargs): data = request.data serializer = self.get_serializer(data=data) serializer.is_valid(raise_exception=True) instance = serializer.save() if 'questions' in data: questions = [] for i in data['questions']: question = {} question['question'] = i['id'] question['paper'] = instance.id question['total_score'] = i['total_score'] questions.append(question) serializer_detail = PaperQuestionsCreateSerializer(data=questions, many=True) if serializer_detail.is_valid(): serializer_detail.save() return Response(status=status.HTTP_200_OK) else: return Response(serializer_detail.errors) else: return Response({'error':'不存在题库'}) def update(self, request, *args, **kwargs): data = request.data instance = self.get_object() serializer = self.get_serializer(instance, data=data) serializer.is_valid(raise_exception=True) self.perform_update(serializer) if 'questions' in data: questions = [] for i in data['questions']: question = {} question['question'] = i['id'] question['paper'] = instance.id question['total_score'] = i['total_score'] questions.append(question) serializer_detail = PaperQuestionsCreateSerializer(data=questions, many=True) if serializer_detail.is_valid(): PaperQuestions.objects.filter(paper=instance).delete() serializer_detail.save() return Response(status=status.HTTP_200_OK) else: return Response(serializer_detail.errors) else: return Response({'error':'不存在题库'}) @action(methods=['get'], detail=True, url_path='monitest', url_name='gen_monitest', authentication_classes=[],permission_classes=[], perms_map=[{'get':'gen_monitest'}]) def monitest(self, request, pk=None): ''' 生成押卷模拟考试 ''' ret = {} paper = self.get_object() serializer = PaperDetailSerializer(instance=paper) ret = serializer.data ret['name'] = '押卷模考' + datetime.now().strftime('%Y%m%d%H%M') ret['type'] = '押卷模考' ret['paper'] = paper.id return Response(ret) @action(methods=['put'], detail=True, url_path='clone', url_name='clone_paper', perms_map=[{'put':'clone_paper'}]) def clone(self, request, pk=None): ''' 克隆试卷 ''' paper = self.get_object() obj = Paper() obj.name = '克隆卷-'+paper.name obj.workscope = paper.workscope obj.limit = paper.limit obj.total_score = paper.total_score obj.pass_score = paper.pass_score obj.danxuan_count = paper.danxuan_count obj.danxuan_score = paper.danxuan_score obj.duoxuan_count = paper.duoxuan_count obj.duoxuan_score = paper.duoxuan_score obj.panduan_count = paper.panduan_count obj.panduan_score = paper.panduan_score obj.save() for i in PaperQuestions.objects.filter(paper=paper): o = PaperQuestions() o.paper = obj o.question = i.question o.total_score = i.total_score o.save() return Response(status=status.HTTP_200_OK)