715 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			715 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
| from django.shortcuts import render
 | ||
| from rest_framework.viewsets import ModelViewSet, GenericViewSet
 | ||
| from rest_framework.mixins import ListModelMixin, DestroyModelMixin, RetrieveModelMixin
 | ||
| from apps.exam.exports import export_question, export_record
 | ||
| from apps.exam.models import Question, Questioncat, PaperQuestion
 | ||
| from apps.exam.serializers import (QuestionSerializer, QuestioncatSerializer, PaperSerializer, ExamDetailSerializer, ExamRecordDetailSerializer, ExamListSerializer,
 | ||
|                                    ExamCreateUpdateSerializer, ExamListSerializer, ExamRecordSubmitSerializer, PaperDetailSerializer, PaperCreateUpdateSerializer, AnswerDetailOutSerializer, ExamRecordListSerializer)
 | ||
| from rest_framework.decorators import action
 | ||
| from rest_framework.response import Response
 | ||
| from rest_framework.permissions import IsAuthenticated
 | ||
| from rest_framework.exceptions import ParseError
 | ||
| from openpyxl import Workbook, load_workbook
 | ||
| from django.conf import settings
 | ||
| from apps.exam.models import Paper, Exam, ExamRecord, AnswerDetail, PaperQuestion
 | ||
| from django.utils import timezone
 | ||
| from django.db import transaction
 | ||
| from rest_framework.serializers import Serializer
 | ||
| from datetime import datetime
 | ||
| from apps.exam.filters import ExamRecordFilter, ExamFilter
 | ||
| from datetime import timedelta
 | ||
| from apps.system.mixins import CreateUpdateCustomMixin
 | ||
| from apps.edu.serializers import CertificateSerializer
 | ||
| from apps.edu.models import Certificate
 | ||
| from utils.queryset import get_child_queryset2
 | ||
| from apps.system.permission import has_permission
 | ||
| from apps.exam.parse_word import interpret_text
 | ||
| import os
 | ||
| import shutil
 | ||
| from django.db.models import Q
 | ||
| # Create your views here.
 | ||
| 
 | ||
| EXCEL_PATH = os.path.join(settings.BASE_DIR,  "media/default/question.xlsx")
 | ||
| 
 | ||
def enctry(s):
    """Encode ``s`` with a reversible additive scheme and a fixed key.

    Each character's Unicode code point is added to the code point of the
    key character at the same position; the sums are written as decimal
    numbers, each followed by ``'_'``.  The key is reused cyclically, so
    input longer than the key is encoded in full instead of being silently
    cut off at the key length.

    NOTE(review): the key is hard-coded in source, so this is obfuscation
    rather than real encryption.

    :param s: text to encode.
    :return: encoded text, e.g. ``enctry("a") == "198_"``; ``""`` for empty input.
    """
    k = 'ez9z3a4m*$%srn9ve_t71yd!v+&xn9@0k(e(+l6#g1h=e5i4da'
    key_len = len(k)
    # encoded token = code point of the character + code point of the key character
    return "".join(
        f"{ord(ch) + ord(k[idx % key_len])}_" for idx, ch in enumerate(s)
    )
 | ||
| 
 | ||
| 
 | ||
# Decoding counterpart of the additive encoder above.
def dectry(p):
    """Decode text made of ``'_'``-terminated decimal tokens.

    Each token is turned back into a character by subtracting the code
    point of the key character at the same position.  The key is reused
    cyclically, so payloads longer than the key decode in full instead of
    being silently cut off at the key length.

    :param p: encoded text; every token must be followed by ``'_'``.
    :return: the decoded text; ``""`` for empty input.
    :raises ValueError: if a token is not a decimal integer or the
        resulting code point is out of range.
    """
    k = 'ez9z3a4m*$%srn9ve_t71yd!v+&xn9@0k(e(+l6#g1h=e5i4da'
    key_len = len(k)
    # The text ends with a trailing '_', so the last split element is empty.
    tokens = p.split("_")[:-1]
    # decoded character = chr(encoded number - code point of the key character)
    return "".join(
        chr(int(tok) - ord(k[idx % key_len])) for idx, tok in enumerate(tokens)
    )
 | ||
| 
 | ||
| 
 | ||
class QuestioncatViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """Standard CRUD endpoints for question categories (``Questioncat``).

    Categories can be filtered by ``parent`` and searched by ``name``.
    """

    # HTTP method -> permission code.
    # NOTE(review): '*' presumably means "no specific permission required";
    # confirm against the project's permission class.
    perms_map = {
        'get': '*',
        'post': 'question',
        'put': 'question',
        'delete': 'question',
    }
    serializer_class = QuestioncatSerializer
    queryset = Questioncat.objects.all()
    search_fields = ['name']
    filterset_fields = ['parent']
 | ||
| 
 | ||
| 
 | ||
class QuestionViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """CRUD, bulk enable/delete, export and Excel import for ``Question``."""

    perms_map = {'get': '*', 'post': 'question', 'put': 'question', 'delete': 'question'}
    queryset = Question.objects.all()
    serializer_class = QuestionSerializer
    filterset_fields = ['level', 'type', 'year', 'questioncat']
    search_fields = ['name', 'options', 'resolution']

    # Answer letters and the spreadsheet columns (D..I) holding their option text.
    _OPTION_LETTERS = ('A', 'B', 'C', 'D', 'E', 'F')
    _OPTION_COLUMNS = ('d', 'e', 'f', 'g', 'h', 'i')
    # Difficulty levels accepted from the sheet; anything else falls back to '低'.
    _LEVELS = ('低', '中', '高')

    def destroy(self, request, *args, **kwargs):
        """
        Delete a question.

        Refuses to delete a question that is referenced by any paper.
        """
        question_id = kwargs.get('pk', None)
        # A question that is part of a paper must not be removed.
        if PaperQuestion.objects.filter(question_id=question_id).exists():
            raise ParseError('此试题存在考试中不允许删除')
        Question.objects.filter(id=question_id).delete()
        return Response("删除成功")

    @action(methods=['post'], detail=False, url_name='enable_question', perms_map={'post': 'question'}, serializer_class=Serializer)
    def enable(self, request):
        """
        Enable questions.

        Marks every question whose id is listed in ``ids`` as enabled.
        """
        ids = request.data.get('ids', None)
        if not ids:
            # Previously the view fell through and returned None, which DRF
            # rejects; report the bad request explicitly instead.
            raise ParseError('请选择要启用的题目')
        Question.objects.filter(pk__in=ids).update(enabled=True)
        return Response(status=200)

    @action(methods=['post'], detail=False, perms_map={'*': 'question_delete'})
    def deletes(self, request):
        """
        Bulk delete questions (superusers only).

        Questions referenced by a paper are kept; all others in ``ids`` are
        deleted.  If any were kept, their names are reported in an error.
        """
        ids = request.data.get('ids', [])
        if not request.user.is_superuser:
            return Response({'msg': '权限不足'}, status=401)
        blocked_names = []
        for question_id in ids:
            # exists() avoids loading the whole PaperQuestion result set.
            if PaperQuestion.objects.filter(question_id=question_id).exists():
                blocked_names.append(Question.objects.get(id=question_id).name)
            else:
                Question.objects.filter(id=question_id).delete()
        if blocked_names:
            raise ParseError(f'{blocked_names}-------存在考试中不允许删除')
        return Response(status=200)

    @action(methods=['get'], detail=False, perms_map={'get': 'export_question'})
    def export(self, request):
        """
        Export the filtered questions and return the path of the export.
        """
        queryset = self.filter_queryset(self.get_queryset())
        path = export_question(queryset)
        return Response({'path': path})

    @staticmethod
    def _resolve_upload_path(relative_path):
        """Map a client-supplied path (relative to BASE_DIR) to an absolute path inside BASE_DIR."""
        base_dir = os.path.abspath(str(settings.BASE_DIR))
        if not relative_path or not isinstance(relative_path, str):
            raise ParseError('文件路径无效')
        fullpath = os.path.abspath(base_dir + relative_path)
        # The path comes from the request body: refuse anything that escapes
        # BASE_DIR (e.g. via "..").
        if not fullpath.startswith(base_dir + os.sep):
            raise ParseError('文件路径无效')
        return fullpath

    @staticmethod
    def _strip_spaces(value):
        """Return a cell value as text with spaces removed, or ``None`` for an empty cell."""
        if value is None:
            return None
        return str(value).replace(' ', '')

    @staticmethod
    def _data_rows(sheet):
        """Yield data row numbers from row 3 until column C (question text) is empty."""
        row = 3
        while sheet['c' + str(row)].value:
            yield row
            row += 1

    def _save_question(self, user, qtype, category, name, options, right, resolution, year, level):
        """Create and save one ``Question`` from already-normalised row values."""
        obj = Question()
        obj.type = qtype
        obj.questioncat = category
        obj.name = name
        obj.options = options
        obj.right = right
        # Record the importing user, as PaperViewSet.upload_paper does.
        obj.create_by = user
        obj.resolution = resolution if resolution else ''
        obj.year = year if year else None
        obj.level = level if level in self._LEVELS else '低'
        obj.save()
        return obj

    @action(methods=['post'], detail=False,
            url_path='import', url_name='import_question', perms_map={'post': 'question'}, serializer_class=Serializer)
    def import_question(self, request):
        """
        Import questions from an uploaded Excel file.

        ``path`` in the request body is the file location relative to
        BASE_DIR.  Sheet layout: row 2 holds the headers (A type,
        B category, C question text); data starts at row 3 with options
        A-F in columns D-I, the answer in J, the resolution in K, the level
        in L and the year in M.  Returns the row numbers that were skipped
        because an identical question already exists.  Rows whose answer is
        not valid for their type, or whose type is unknown, are ignored.
        """
        fullpath = self._resolve_upload_path(request.data.get('path'))
        sheet = load_workbook(fullpath).worksheets[0]

        # Validate the header row before touching any data.
        header_checks = (
            ('a2', '题目类型', "类型列错误!"),
            ('b2', '分类', "分类列错误!"),
            ('c2', '题目', "题目列错误!"),
        )
        for cell, title, message in header_checks:
            if sheet[cell].value != title:
                return Response({"error": message})

        # One query for all categories instead of one lookup per row.
        categories = {cat.name: cat for cat in Questioncat.objects.all()}
        skipped_rows = []
        for row in self._data_rows(sheet):
            cells = {col: sheet[col + str(row)].value for col in 'abcdefghijklm'}
            qtype = self._strip_spaces(cells['a']) or ''
            questioncat = self._strip_spaces(cells['b'])
            if not questioncat:
                return Response(str(row)+'行没有分类', status=400)
            name = cells['c']
            options = {
                letter: cells[col]
                for letter, col in zip(self._OPTION_LETTERS, self._OPTION_COLUMNS)
                if cells[col]
            }
            right = self._strip_spaces(cells['j'])
            if not right:
                return Response(str(row)+'行没有答案', status=400)
            resolution = cells['k']
            level = self._strip_spaces(cells['l'])
            year = cells['m']
            category = categories.get(questioncat)
            if category is None:
                return Response(str(row)+"行不存在分类("+questioncat+")!请先新建", status=400)

            if qtype == '单选':
                duplicate = Question.objects.filter(
                    type='单选', name=name, year=year, options=options, questioncat=category).exists()
                valid = right in self._OPTION_LETTERS
            elif qtype == '多选':
                right = list(right.strip())
                duplicate = Question.objects.filter(
                    type='多选', name=name, year=year, options=options, questioncat=category).exists()
                valid = all(letter in self._OPTION_LETTERS for letter in right)
            elif qtype == '判断':
                right = 'A' if right in ('A', '对', '正确') else 'B'
                options = {'A': '对', 'B': '错'}
                duplicate = Question.objects.filter(
                    type='判断', name=name, is_deleted=0, options=options, questioncat=category).exists()
                valid = True
            else:
                # Unknown question type: the row is ignored.
                continue

            if duplicate:
                skipped_rows.append(row)
            elif valid:
                self._save_question(
                    request.user, qtype, category, name, options, right, resolution, year, level)
        return Response(skipped_rows, status=200)
 | ||
| 
 | ||
| 
 | ||
class PaperViewSet(ModelViewSet):
    """
    CRUD for exam papers, plus cloning and building a paper from a Word document.
    """
    perms_map = {'get': '*', 'post': 'paper', 'put': 'paper', 'delete': 'paper'}
    queryset = Paper.objects.all()
    serializer_class = PaperSerializer
    ordering = ['id']
    search_fields = ('name',)

    # Difficulty levels accepted from the sheet; anything else falls back to '低'.
    _LEVELS = ('低', '中', '高')
    # Answer letters and the spreadsheet columns (D..I) holding their option text.
    _OPTION_LETTERS = ('A', 'B', 'C', 'D', 'E', 'F')
    _OPTION_COLUMNS = ('d', 'e', 'f', 'g', 'h', 'i')

    def get_serializer_class(self):
        """Pick the detail serializer for retrieve and the write serializer for create/update."""
        if self.action in ['retrieve']:
            return PaperDetailSerializer
        elif self.action in ['create', 'update']:
            return PaperCreateUpdateSerializer
        return super().get_serializer_class()

    @staticmethod
    def _paper_questions(paper, questions_):
        """Build unsaved ``PaperQuestion`` rows for ``paper`` from validated question entries."""
        return [
            PaperQuestion(question=item['question'], total_score=item['total_score'], paper=paper)
            for item in questions_
        ]

    def create(self, request, *args, **kwargs):
        """Create a paper together with its categories and questions."""
        sr = PaperCreateUpdateSerializer(data=request.data)
        sr.is_valid(raise_exception=True)
        vdata = sr.validated_data
        vdata['create_by'] = request.user
        questions_ = vdata.pop('questions_')
        category = vdata.pop('category', [])
        # All-or-nothing: a failure while attaching questions must not leave
        # a half-built paper behind.
        with transaction.atomic():
            paper = Paper.objects.create(**vdata)
            paper.category.add(*category)
            PaperQuestion.objects.bulk_create(self._paper_questions(paper, questions_))
        return Response(status=201)

    def update(self, request, *args, **kwargs):
        """Replace a paper's fields, categories and questions.

        Raises ``ParseError`` when an exam that has not closed yet uses the paper.
        """
        now = timezone.now()
        paper = self.get_object()
        # A paper used by an exam that is still open must not change.
        if Exam.objects.filter(close_time__gte=now, paper=paper).exists():
            raise ParseError('存在考试,不可编辑')
        sr = PaperCreateUpdateSerializer(instance=paper, data=request.data)
        sr.is_valid(raise_exception=True)
        vdata = sr.validated_data
        questions_ = vdata.pop('questions_')
        # ``category`` is many-to-many (see create) and cannot be passed to
        # QuerySet.update(); it is applied separately below.
        category = vdata.pop('category', None)
        vdata['update_by'] = request.user
        with transaction.atomic():
            Paper.objects.filter(id=paper.id).update(**vdata)
            if category is not None:
                paper.category.set(category)
            PaperQuestion.objects.filter(paper=paper).delete()
            PaperQuestion.objects.bulk_create(self._paper_questions(paper, questions_))
        return Response()

    @action(methods=['put'], detail=True, url_path='clone', url_name='clone_paper', perms_map={'put': 'clone_paper'})
    def clone(self, request, pk=None):
        '''
        Clone a paper, including its type, categories and questions.
        '''
        paper = self.get_object()
        with transaction.atomic():
            obj = Paper()
            obj.name = '克隆卷-'+paper.name
            obj.workscope = paper.workscope
            obj.limit = paper.limit
            obj.total_score = paper.total_score
            obj.pass_score = paper.pass_score
            obj.danxuan_count = paper.danxuan_count
            obj.danxuan_score = paper.danxuan_score
            obj.duoxuan_count = paper.duoxuan_count
            obj.duoxuan_score = paper.duoxuan_score
            obj.panduan_count = paper.panduan_count
            obj.panduan_score = paper.panduan_score
            # paper_types and category drive question selection when an exam
            # starts, so a clone without them would not behave like its source.
            obj.paper_types = paper.paper_types
            obj.create_by = request.user
            obj.save()
            obj.category.set(paper.category.all())
            PaperQuestion.objects.bulk_create([
                PaperQuestion(paper=obj, question_id=pq.question_id, total_score=pq.total_score)
                for pq in PaperQuestion.objects.filter(paper=paper)
            ])
        return Response(status=200)

    @staticmethod
    def _resolve_upload_path(relative_path):
        """Map a client-supplied path (relative to BASE_DIR) to an absolute path inside BASE_DIR."""
        base_dir = os.path.abspath(str(settings.BASE_DIR))
        if not relative_path or not isinstance(relative_path, str):
            raise ParseError('文档路径无效')
        fullpath = os.path.abspath(base_dir + relative_path)
        # The path comes from the request body: refuse anything that escapes
        # BASE_DIR (e.g. via "..").
        if not fullpath.startswith(base_dir + os.sep):
            raise ParseError('文档路径无效')
        return fullpath

    @staticmethod
    def _strip_spaces(value):
        """Return a cell value as text with spaces removed, or ``None`` for an empty cell."""
        if value is None:
            return None
        return str(value).replace(' ', '')

    @staticmethod
    def _data_rows(sheet):
        """Yield data row numbers from row 3 until column C (question text) is empty."""
        row = 3
        while sheet['c' + str(row)].value:
            yield row
            row += 1

    @action(methods=['post'], detail=False, perms_map={'post': 'question'}, serializer_class=Serializer)
    def upload_paper(self, request):
        """
        Parse a Word document into questions and return them.

        ``doc_path`` (relative to BASE_DIR) is converted by ``interpret_text``
        into a copy of the question Excel template; each sheet row is then
        either matched to an existing question (same name and category) or
        stored as a new one.  Returns the serialized questions in a list.
        """
        question_type = request.data.get('question_type')
        doc_path = self._resolve_upload_path(request.data.get('doc_path'))

        # Work on a timestamped copy so the template itself is never modified.
        timenow = timezone.now().strftime('%Y%m%d%H%M%S')
        question_excel = os.path.join(os.path.dirname(EXCEL_PATH), "question_excel_"+timenow)
        os.makedirs(question_excel, exist_ok=True)
        shutil.copy(EXCEL_PATH, question_excel)
        excel_save_path = os.path.join(question_excel, os.path.basename(EXCEL_PATH))

        # Convert the Word document into the Excel copy.
        if not interpret_text(3, excel_save_path, doc_path, question_type):
            raise ParseError('excel解析失败')

        # Store the Excel rows and collect the question ids.
        sheet = load_workbook(excel_save_path).worksheets[0]
        header_checks = (
            ('a2', '题目类型', "类型列错误!"),
            ('b2', '分类', "分类列错误!"),
            ('c2', '题目', "题目列错误!"),
        )
        for cell, title, message in header_checks:
            if sheet[cell].value != title:
                return Response({"error": message})

        # One query for all categories instead of one lookup per row.
        categories = {cat.name: cat for cat in Questioncat.objects.all()}
        ids = []
        for row in self._data_rows(sheet):
            cells = {col: sheet[col + str(row)].value for col in 'abcdefghijklm'}
            qtype = self._strip_spaces(cells['a']) or ''
            questioncat = self._strip_spaces(cells['b'])
            if not questioncat:
                return Response(str(row)+'行没有分类', status=400)
            name = cells['c']
            options = {
                letter: cells[col]
                for letter, col in zip(self._OPTION_LETTERS, self._OPTION_COLUMNS)
                if cells[col]
            }
            right = self._strip_spaces(cells['j'])
            if not right:
                return Response(str(row)+'行没有答案', status=400)
            resolution = cells['k']
            level = self._strip_spaces(cells['l'])
            year = cells['m']
            category = categories.get(questioncat)
            if category is None:
                return Response(str(row)+"行不存在分类("+questioncat+")!请先新建", status=400)

            # Reuse an existing question with the same name and category.
            # first() tolerates duplicates, where get() would raise.
            existing_id = Question.objects.filter(
                name=name, is_deleted=0, questioncat=category).values_list('id', flat=True).first()
            if existing_id is not None:
                ids.append(existing_id)
                continue

            if qtype == '单选':
                if right not in self._OPTION_LETTERS:
                    # Invalid single-choice answer: the row is ignored.
                    continue
            elif qtype == '多选':
                # One letter per entry; ASCII comma separators are dropped.
                right = [letter for letter in right.strip() if letter != ',']
            elif qtype == '判断':
                right = 'A' if right in ('A', '对', '正确') else 'B'
                options = {'A': '对', 'B': '错'}
            else:
                # Unknown question type: the row is ignored.
                continue

            obj = Question()
            obj.type = qtype
            obj.questioncat = category
            obj.name = name
            obj.options = options
            obj.right = right
            obj.create_by = request.user
            obj.resolution = resolution if resolution else ''
            obj.year = year if year else None
            obj.level = level if level in self._LEVELS else '低'
            obj.save()
            ids.append(obj.id)

        # Always answer with a Response, even when nothing was collected.
        questions = Question.objects.filter(pk__in=ids)
        return Response(QuestionSerializer(questions, many=True).data, status=200)
 | ||
| 
 | ||
| 
 | ||
class ExamViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """CRUD for exams plus the ``start`` action that opens an exam attempt."""

    perms_map = {'get': '*', 'post': 'exam', 'put': 'exam', 'delete': 'exam'}
    queryset = Exam.objects.all().select_related('paper', 'create_by')
    ordering = ['-id']
    search_fields = ('name',)
    serializer_class = ExamListSerializer
    filterset_class = ExamFilter

    def get_serializer_class(self):
        """Pick the write serializer for create/update and the detail serializer for retrieve."""
        if self.action in ['create', 'update']:
            return ExamCreateUpdateSerializer
        elif self.action in ['retrieve']:
            return ExamDetailSerializer
        return super().get_serializer_class()

    def get_queryset(self):
        """Return all exams for users with the "exam" permission, else only their department's.

        NOTE(review): ``start`` also admits users listed in ``participant_user``,
        but such users are not included here unless their department matches;
        confirm whether that is intended.
        """
        qs = super().get_queryset()
        user = self.request.user
        if has_permission("exam", user):
            return qs
        return qs.filter(participant_dep=user.dept)

    def destroy(self, request, *args, **kwargs):
        """Delete an exam with ``soft=False``; refused while exam records reference it."""
        instance = self.get_object()
        if ExamRecord.objects.filter(exam=instance).exists():
            raise ParseError('存在考试记录,禁止删除')
        instance.delete(soft=False)
        return Response(status=204)

    @action(methods=['post'], detail=True, perms_map={'post': '*'}, serializer_class=Serializer, permission_classes=[IsAuthenticated])
    @transaction.atomic
    def start(self, request, *args, **kwargs):
        """
        Start an exam.

        Creates an exam record for the current user and returns its id with
        the questions of this attempt.  For '押题' papers the questions are
        the paper's fixed list; otherwise they are drawn at random, per
        question type, from the paper's categories.
        """
        # The exam is fetched directly rather than through get_object(), so
        # the department filter in get_queryset() does not apply here.
        try:
            exam = Exam.objects.get(id=kwargs['pk'])
        except (Exam.DoesNotExist, ValueError):
            raise ParseError('考试不存在')

        # The user must be listed personally or belong to a listed department.
        user = request.user
        dept = user.dept
        listed_personally = exam.participant_user.filter(id=user.id).exists()
        listed_by_dept = dept is not None and exam.participant_dep.filter(id=dept.id).exists()
        if not (listed_personally or listed_by_dept):
            raise ParseError('不在考试人员范围内')

        now = timezone.now()
        if now < exam.open_time or now > exam.close_time:
            raise ParseError('不在考试时间范围')
        chance_used = ExamRecord.objects.filter(exam=exam, create_by=user).count()
        if chance_used >= exam.chance:
            raise ParseError('考试机会已用完')
        paper = exam.paper
        if not paper:
            raise ParseError('暂不支持')

        er = ExamRecord()
        er.type = '正式考试'
        er.name = '正式考试' + datetime.now().strftime('%Y%m%d%H%M')
        er.limit = paper.limit
        er.paper = paper
        er.total_score = paper.total_score
        er.start_time = now
        er.is_pass = False
        er.exam = exam
        er.create_by = user
        er.save()

        details = []
        if paper.paper_types == '押题':
            # Fixed paper: use its questions in their stored order.
            for pq in PaperQuestion.objects.filter(paper=paper).order_by('id'):
                details.append(AnswerDetail(examrecord=er, question=pq.question, total_score=pq.total_score))
        else:
            # Random paper: draw the configured number of questions of each
            # type from the paper's categories.  Each type is fetched on its
            # own because Django refuses to combine sliced querysets with "|".
            category_ids = list(paper.category.values_list('id', flat=True))
            plan = (
                ("单选", paper.danxuan_count, paper.danxuan_score),
                ("多选", paper.duoxuan_count, paper.duoxuan_score),
                ("判断", paper.panduan_count, paper.panduan_score),
            )
            for qtype, count, score in plan:
                drawn = Question.objects.filter(
                    Q(type=qtype) & Q(questioncat__in=category_ids)).order_by('?')[:count]
                for question in drawn:
                    details.append(AnswerDetail(examrecord=er, question=question, total_score=score))
        AnswerDetail.objects.bulk_create(details)

        # Re-read the rows so the serializer sees the saved records.
        ads = AnswerDetail.objects.select_related('question').filter(examrecord=er).order_by('id')
        ret = {
            'examrecord': er.id,
            'questions_': AnswerDetailOutSerializer(instance=ads, many=True).data,
        }
        return Response(ret)
 | ||
| 
 | ||
| 
 | ||
class ExamRecordViewSet(ListModelMixin, DestroyModelMixin, RetrieveModelMixin, GenericViewSet):
    """
    List, retrieve and delete exam records; clear stale ones, export and submit answers.
    """
    perms_map = {'get': '*', 'post': '*', 'delete': 'examrecord'}
    queryset = ExamRecord.objects.select_related('create_by', 'cert_er')
    serializer_class = ExamRecordListSerializer
    ordering_fields = ['create_time', 'score', 'took', 'update_time', 'belong_dept']
    ordering = ['-update_time']
    search_fields = ('create_by__name', 'create_by__username', 'exam__name', 'belong_dept__name')
    filterset_class = ExamRecordFilter

    def get_queryset(self):
        """Scope the records to what the current user may see.

        Users with 'ctc_manager' see everything, users with 'exam_manager'
        see records of their department tree, everyone else only their own.
        """
        qs = super().get_queryset()
        user = self.request.user
        if has_permission('ctc_manager', user):
            return qs
        # Department managers only see records under their own department.
        if has_permission('exam_manager', user):
            return qs.filter(belong_dept__in=get_child_queryset2(user.dept))
        # Ordinary staff only see their own records.
        return qs.filter(create_by=user)

    def get_serializer_class(self):
        """Use the detail serializer for retrieve once the record's exam has closed."""
        if self.action == 'retrieve':
            exam = self.get_object().exam
            # A record may have no exam (submit() guards against that too);
            # fall back to the default serializer instead of crashing.
            if exam is not None and timezone.now() > exam.close_time:
                return ExamRecordDetailSerializer
        return super().get_serializer_class()

    def perform_destroy(self, instance):
        """Delete the record.

        NOTE(review): the original comment called this a physical delete, but
        delete() is called without ``soft=False`` (unlike ExamViewSet.destroy);
        confirm which behaviour is intended.
        """
        instance.delete()

    @action(methods=['post'], detail=False, perms_map={'post': '*'}, serializer_class=Serializer, permission_classes=[IsAuthenticated])
    def clear(self, request, pk=None):
        """
        Remove unsubmitted exam records created seven or more days ago.

        NOTE(review): any authenticated user can call this and it is not
        limited to the caller's own records; confirm that is intended.
        """
        # timezone.now must be called; the bare function cannot be subtracted from.
        days7_ago = timezone.now() - timedelta(days=7)
        ExamRecord.objects.filter(create_time__lte=days7_ago, is_submited=False).delete(soft=False)
        return Response(status=200)

    @action(methods=['get'], detail=False,
            url_path='export', url_name='export_record', perms_map={'*': 'export_ansrecord'}, serializer_class=Serializer)
    def export(self, request):
        """
        Export the filtered answer records and return the path of the export.
        """
        queryset = self.filter_queryset(self.get_queryset())
        path = export_record(queryset)
        return Response({'path': path})

    @action(methods=['post'], detail=True, perms_map={'post': '*'}, serializer_class=ExamRecordSubmitSerializer, permission_classes=[IsAuthenticated])
    @transaction.atomic
    def submit(self, request, pk=None):
        '''
        Submit an answer sheet.

        Grades the answers on the server, stores the score and, when the
        exam is passed and configured to issue certificates, creates one.
        Answers are matched to the record's questions by position.
        '''
        try:
            er = ExamRecord.objects.get(id=pk)
        except (ExamRecord.DoesNotExist, ValueError, TypeError) as e:
            raise ParseError(e)
        now = timezone.now()
        if er.create_by != request.user:
            raise ParseError('提交人有误')
        exam = er.exam
        paper = er.paper
        if not exam:
            raise ParseError('暂不支持')
        # Submissions are accepted up to 30 minutes after the exam closes.
        if now > exam.close_time + timedelta(minutes=30):
            raise ParseError('考试时间已过, 提交失败')
        serializer = ExamRecordSubmitSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        questions_ = serializer.validated_data['questions_']

        # Server-side grading.
        ads = AnswerDetail.objects.select_related('question').filter(examrecord=er).order_by('id')
        total_score = 0
        try:
            for index, ad in enumerate(ads):
                ad.user_answer = questions_[index]['user_answer']
                if ad.question.type == '多选':
                    # Multiple choice: order of the selected letters is irrelevant.
                    is_right = set(ad.question.right) == set(ad.user_answer)
                else:
                    is_right = ad.question.right == ad.user_answer
                # Always write both fields so a wrong answer never keeps a
                # stale "right" mark or score.
                ad.is_right = is_right
                ad.score = ad.total_score if is_right else 0
                ad.save()
                total_score = total_score + ad.score
        except Exception as e:
            raise ParseError('判卷失败, 请检查试卷:' + str(e)) from e
        er.score = total_score

        if er.score >= paper.pass_score:
            er.is_pass = True
            # Issue a certificate automatically when the exam is configured for it.
            if exam.certificate:
                now_data = datetime.now()
                current_date = now_data.strftime('%Y-%m-%d')
                cer_number = now_data.strftime('%Y%m%d')
                # Certificate with the highest number; None when the table is
                # empty, where latest() would raise DoesNotExist.
                cer = Certificate.objects.order_by('证书编号').last()
                if cer:
                    # Numbers look like 'CTCZL<n>'; continue the sequence.
                    cer_number = 'CTCZL' + str(int(cer.证书编号[5:]) + 1)
                data_dict = {
                    '姓名': request.user.name,
                    '用户ID': request.user.id,
                    '证书编号': cer_number,
                    '证书方案': '202312',
                    '单位名称': request.user.dept.name,
                    '所属单位': '国检测试控股集团'+request.user.dept.name,
                    '发证日期': current_date,
                    '培训日期': current_date,
                    '培训结束日期': current_date,
                    '课程列表': list(exam.course_name.values_list('id', flat=True)),
                    'examrecord': er.id,
                }
                cert_serializer = CertificateSerializer(data=data_dict)
                cert_serializer.is_valid(raise_exception=True)
                cert_serializer.save()

        er.took = (now - er.create_time).total_seconds()
        er.end_time = now
        er.belong_dept = request.user.dept
        er.is_submited = True
        er.save()
        return Response(ExamRecordListSerializer(instance=er).data)