741 lines
32 KiB
Python
741 lines
32 KiB
Python
from django.shortcuts import render
|
||
from rest_framework.viewsets import ModelViewSet, GenericViewSet
|
||
from rest_framework.mixins import ListModelMixin, DestroyModelMixin, RetrieveModelMixin
|
||
from apps.exam.exports import export_question, export_record
|
||
from apps.exam.models import Question, Questioncat, PaperQuestion
|
||
from apps.exam.serializers import (QuestionSerializer, QuestioncatSerializer, PaperSerializer, ExamDetailSerializer, ExamRecordDetailSerializer, ExamListSerializer,
|
||
ExamCreateUpdateSerializer, ExamListSerializer, ExamRecordSubmitSerializer, PaperDetailSerializer, PaperCreateUpdateSerializer, AnswerDetailOutSerializer, ExamRecordListSerializer)
|
||
from rest_framework.decorators import action
|
||
from rest_framework.response import Response
|
||
from rest_framework.permissions import IsAuthenticated
|
||
from rest_framework.exceptions import ParseError
|
||
from openpyxl import Workbook, load_workbook
|
||
from django.conf import settings
|
||
from apps.exam.models import Paper, Exam, ExamRecord, AnswerDetail, PaperQuestion
|
||
from django.utils import timezone
|
||
from django.db import transaction
|
||
from rest_framework.serializers import Serializer
|
||
from datetime import datetime
|
||
from apps.exam.filters import ExamRecordFilter, ExamFilter
|
||
from datetime import timedelta
|
||
from apps.system.mixins import CreateUpdateCustomMixin
|
||
from apps.edu.serializers import CertificateSerializer
|
||
from apps.edu.models import Certificate
|
||
from utils.queryset import get_child_queryset2
|
||
from apps.system.permission import has_permission
|
||
from apps.exam.parse_word import interpret_text
|
||
import os
|
||
import shutil
|
||
from django.db.models import Q
|
||
from django.core.cache import cache
|
||
import uuid
|
||
# Create your views here.
|
||
|
||
EXCEL_PATH = os.path.join(settings.BASE_DIR, "media/default/question.xlsx")
|
||
|
||
def enctry(s):
    """Obfuscate *s* by adding key code points to its characters.

    Each character's Unicode code point is added to the code point of the
    key character at the same position; the sums are emitted as decimal
    numbers, each followed by ``_``.

    The key is repeated when *s* is longer than the key, so long inputs are
    no longer silently truncated to the key length. Output for inputs that
    fit inside the key is unchanged.

    NOTE(review): this is reversible obfuscation with a key hard-coded in
    source, not encryption -- do not rely on it to protect secrets.

    :param s: text to encode.
    :return: the encoded string (``""`` for empty input).
    """
    from itertools import cycle

    k = 'ez9z3a4m*$%srn9ve_t71yd!v+&xn9@0k(e(+l6#g1h=e5i4da'
    # cycle() repeats the key so zip() stops at the end of s, not of k.
    return "".join(f"{ord(ch) + ord(key_ch)}_" for ch, key_ch in zip(s, cycle(k)))
|
||
|
||
|
||
# 解密
|
||
def dectry(p):
    """Reverse :func:`enctry`: turn ``"<n>_<n>_..."`` back into text.

    Every ``_``-terminated decimal number has the code point of the key
    character at the same position subtracted from it, and the result is
    converted back to a character. The key is repeated for inputs longer
    than the key, so long payloads are no longer truncated.

    :param p: encoded string as produced by ``enctry`` (each number is
        followed by ``_``; anything after the last ``_`` is ignored).
    :return: the decoded text (``""`` for empty input).
    :raises ValueError: if a token is not a decimal number or decodes to an
        invalid code point.
    """
    from itertools import cycle

    k = 'ez9z3a4m*$%srn9ve_t71yd!v+&xn9@0k(e(+l6#g1h=e5i4da'
    # The encoded form ends with "_", so the last split element is dropped.
    tokens = p.split("_")[:-1]
    return "".join(chr(int(token) - ord(key_ch)) for token, key_ch in zip(tokens, cycle(k)))
|
||
|
||
|
||
class QuestioncatViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """CRUD endpoints for question categories."""

    # Reading is open to everyone; every write needs the 'question' permission.
    perms_map = {
        'get': '*',
        'post': 'question',
        'put': 'question',
        'delete': 'question',
    }
    serializer_class = QuestioncatSerializer
    queryset = Questioncat.objects.all()
    # Categories can be filtered by parent and searched by name.
    search_fields = ['name']
    filterset_fields = ['parent']
|
||
|
||
|
||
class QuestionViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """Question CRUD plus bulk enable/delete, export and Excel import."""

    perms_map = {'get': '*', 'post': 'question', 'put': 'question', 'delete': 'question'}
    queryset = Question.objects.all()
    serializer_class = QuestionSerializer
    filterset_fields = ['level', 'type', 'year', 'questioncat']
    search_fields = ['name', 'options', 'resolution']

    # Answer letters paired with the spreadsheet columns holding the option text.
    _OPTION_COLUMNS = (('A', 'd'), ('B', 'e'), ('C', 'f'), ('D', 'g'), ('E', 'h'), ('F', 'i'))
    _OPTION_LETTERS = ('A', 'B', 'C', 'D', 'E', 'F')
    # Accepted difficulty levels; anything else falls back to the lowest one.
    _LEVELS = ('低', '中', '高')

    def destroy(self, request, *args, **kwargs):
        """
        Delete a question.

        A question that is referenced by any paper cannot be deleted.
        """
        question_id = kwargs.get('pk', None)
        if PaperQuestion.objects.filter(question_id=question_id).exists():
            raise ParseError('此试题存在考试中不允许删除')
        Question.objects.filter(id=question_id).delete()
        return Response("删除成功")

    @action(methods=['post'], detail=False, url_name='enable_question', perms_map={'post': 'question'}, serializer_class=Serializer)
    def enable(self, request):
        """
        Enable questions

        Mark every question whose id is listed in ``ids`` as enabled.
        """
        ids = request.data.get('ids', None)
        if ids:
            Question.objects.filter(pk__in=ids).update(enabled=True)
        return Response(status=200)

    @action(methods=['post'], detail=False, perms_map={'*': 'question_delete'})
    def deletes(self, request):
        """
        Bulk delete

        Superusers only. Questions used by a paper are kept and reported in
        the error message; all other listed questions are deleted.
        """
        if not request.user.is_superuser:
            # NOTE(review): 403 would describe "authenticated but not allowed"
            # better than 401; kept as-is so existing clients are unaffected.
            return Response({'msg': '权限不足'}, status=401)

        ids = request.data.get('ids', [])
        question_list = []
        for question_id in ids:
            # exists() avoids loading the PaperQuestion rows just to test for presence.
            if PaperQuestion.objects.filter(question_id=question_id).exists():
                question_list.append(Question.objects.get(id=question_id).name)
            else:
                Question.objects.filter(id=question_id).delete()
        if question_list:
            raise ParseError(f'{question_list}-------存在考试中不允许删除')
        return Response(status=200)

    @action(methods=['get'], detail=False, perms_map={'get': 'export_question'})
    def export(self, request):
        """
        Export questions

        Export the filtered question list and return the generated file path.
        """
        queryset = self.filter_queryset(self.get_queryset())
        path = export_question(queryset)
        return Response({'path': path})

    @staticmethod
    def _cell_text(value):
        """Return a cell value as text with spaces removed ('' for an empty cell)."""
        if not value:
            return ''
        return str(value).replace(' ', '')

    def _read_options(self, sheet, row):
        """Collect the non-empty option texts of *row* keyed by answer letter."""
        options = {}
        for letter, column in self._OPTION_COLUMNS:
            text = sheet[f'{column}{row}'].value
            if text:
                options[letter] = text
        return options

    def _create_question(self, qtype, category, name, options, right, resolution, year, level):
        """Persist one imported question and return it."""
        obj = Question()
        obj.type = qtype
        obj.questioncat = category
        obj.name = name
        obj.options = options
        obj.right = right
        # Record the importing user, as upload_paper does for its questions.
        obj.create_by = self.request.user
        obj.resolution = resolution if resolution else ''
        obj.year = year if year else None
        obj.level = level if level in self._LEVELS else '低'
        obj.save()
        return obj

    @action(methods=['post'], detail=False,
            url_path='import', url_name='import_question', perms_map={'post': 'question'}, serializer_class=Serializer)
    def import_question(self, request):
        """
        Import questions

        Read questions from the uploaded workbook at ``path`` (relative to
        the project base directory). Row 2 holds the column titles and data
        starts at row 3; reading stops at the first row with an empty
        question column. Returns the row numbers skipped as duplicates.

        Rows saved before a 400 response is returned stay saved.
        """
        xlsxpath = request.data.get('path')
        if not xlsxpath:
            raise ParseError('缺少导入文件路径')
        # The path comes from the client: resolve it and refuse anything
        # that escapes the project directory (e.g. via "..").
        base_dir = os.path.abspath(str(settings.BASE_DIR))
        fullpath = os.path.abspath(base_dir + str(xlsxpath))
        if os.path.commonpath([base_dir, fullpath]) != base_dir:
            raise ParseError('导入文件路径不合法')

        sheet = load_workbook(fullpath).worksheets[0]

        # Validate the header row before touching the database.
        header_checks = (
            ('a2', '题目类型', "类型列错误!"),
            ('b2', '分类', "分类列错误!"),
            ('c2', '题目', "题目列错误!"),
        )
        for cell, title, message in header_checks:
            if sheet[cell].value != title:
                return Response({"error": message})

        # One query for all categories instead of one lookup per row.
        categories = {cat.name: cat for cat in Questioncat.objects.all()}
        notinlist = []

        row = 3
        while sheet[f'c{row}'].value:
            qtype = self._cell_text(sheet[f'a{row}'].value)
            catname = self._cell_text(sheet[f'b{row}'].value)
            if not catname:
                return Response(f'{row}行没有分类', status=400)
            name = sheet[f'c{row}'].value
            options = self._read_options(sheet, row)
            right = self._cell_text(sheet[f'j{row}'].value)
            if not right:
                return Response(f'{row}行没有答案', status=400)
            resolution = sheet[f'k{row}'].value
            level = self._cell_text(sheet[f'l{row}'].value)
            year = sheet[f'm{row}'].value

            cateobj = categories.get(catname)
            if cateobj is None:
                return Response(f'{row}行不存在分类({catname})!请先新建', status=400)

            if qtype == '单选':
                if Question.objects.filter(type='单选', name=name, year=year, options=options, questioncat=cateobj).exists():
                    notinlist.append(row)
                elif right in self._OPTION_LETTERS:
                    self._create_question('单选', cateobj, name, options, right, resolution, year, level)
            elif qtype == '多选':
                # Accept "AB" as well as "A,B" / "A,B", like upload_paper does.
                answers = [c for c in right.strip() if c not in (',', ',')]
                if Question.objects.filter(type='多选', name=name, year=year, options=options, questioncat=cateobj).exists():
                    notinlist.append(row)
                elif answers and all(c in self._OPTION_LETTERS for c in answers):
                    self._create_question('多选', cateobj, name, options, answers, resolution, year, level)
            elif qtype == '判断':
                judge_options = {'A': '对', 'B': '错'}
                right = 'A' if right in ('A', '对', '正确') else 'B'
                if Question.objects.filter(type='判断', name=name, is_deleted=0, options=judge_options, questioncat=cateobj).exists():
                    notinlist.append(row)
                else:
                    self._create_question('判断', cateobj, name, judge_options, right, resolution, year, level)
            row += 1
        return Response(notinlist, status=200)
|
||
|
||
|
||
class PaperViewSet(ModelViewSet):
    """
    Paper CRUD, cloning, and question extraction from an uploaded Word file.
    """
    perms_map = {'get': '*', 'post': 'paper', 'put': 'paper', 'delete': 'paper'}
    queryset = Paper.objects.all()
    serializer_class = PaperSerializer
    ordering = ['id']
    search_fields = ('name',)

    # Answer letters paired with the spreadsheet columns holding the option text.
    _OPTION_COLUMNS = (('A', 'd'), ('B', 'e'), ('C', 'f'), ('D', 'g'), ('E', 'h'), ('F', 'i'))
    _OPTION_LETTERS = ('A', 'B', 'C', 'D', 'E', 'F')
    # Accepted difficulty levels; anything else falls back to the lowest one.
    _LEVELS = ('低', '中', '高')

    def get_serializer_class(self):
        """Pick the detail serializer for reads and the write serializer for create/update."""
        if self.action in ['retrieve']:
            return PaperDetailSerializer
        elif self.action in ['create', 'update']:
            return PaperCreateUpdateSerializer
        return super().get_serializer_class()

    def create(self, request, *args, **kwargs):
        """Create a paper together with its categories and fixed questions."""
        sr = PaperCreateUpdateSerializer(data=request.data)
        sr.is_valid(raise_exception=True)
        vdata = sr.validated_data
        vdata['create_by'] = request.user
        questions_ = vdata.pop('questions_')
        category = vdata.pop('category')
        # All-or-nothing: never leave a paper without its questions.
        with transaction.atomic():
            paper = Paper.objects.create(**vdata)
            paper.category.add(*category)
            PaperQuestion.objects.bulk_create([
                PaperQuestion(question=item['question'], total_score=item['total_score'], paper=paper)
                for item in questions_
            ])
        return Response(status=201)

    def update(self, request, *args, **kwargs):
        """Replace a paper's fields, categories and questions.

        Refused while an exam using this paper has not closed yet.
        """
        now = timezone.now()
        paper = self.get_object()
        if Exam.objects.filter(close_time__gte=now, paper=paper).exists():
            raise ParseError('存在考试,不可编辑')
        sr = PaperCreateUpdateSerializer(instance=paper, data=request.data)
        sr.is_valid(raise_exception=True)
        vdata = sr.validated_data
        questions_ = vdata.pop('questions_')
        # 'category' is a many-to-many relation (see create) and cannot be
        # passed to QuerySet.update(); apply it separately.
        category = vdata.pop('category', None)
        vdata['update_by'] = request.user
        # Delete-then-recreate must be atomic or a failure loses the questions.
        with transaction.atomic():
            Paper.objects.filter(id=paper.id).update(**vdata)
            if category is not None:
                paper.category.set(category)
            PaperQuestion.objects.filter(paper=paper).delete()
            PaperQuestion.objects.bulk_create([
                PaperQuestion(question=item['question'], total_score=item['total_score'], paper=paper)
                for item in questions_
            ])
        return Response()

    @action(methods=['put'], detail=True, url_path='clone', url_name='clone_paper', perms_map={'put': 'clone_paper'})
    def clone(self, request, pk=None):
        """
        Clone a paper

        Copy the paper's settings, categories and questions into a new
        paper whose name is prefixed with '克隆卷-'.
        """
        paper = self.get_object()
        with transaction.atomic():
            obj = Paper()
            obj.name = '克隆卷-' + paper.name
            obj.workscope = paper.workscope
            obj.limit = paper.limit
            obj.total_score = paper.total_score
            obj.pass_score = paper.pass_score
            obj.danxuan_count = paper.danxuan_count
            obj.danxuan_score = paper.danxuan_score
            obj.duoxuan_count = paper.duoxuan_count
            obj.duoxuan_score = paper.duoxuan_score
            obj.panduan_count = paper.panduan_count
            obj.panduan_score = paper.panduan_score
            # Exam start reads paper_types and category to decide how questions
            # are drawn, so a clone must carry them too.
            obj.paper_types = paper.paper_types
            obj.save()
            obj.category.set(paper.category.all())
            PaperQuestion.objects.bulk_create([
                PaperQuestion(paper=obj, question=pq.question, total_score=pq.total_score)
                for pq in PaperQuestion.objects.filter(paper=paper)
            ])
        return Response(status=200)

    @staticmethod
    def _cell_text(value):
        """Return a cell value as text with spaces removed ('' for an empty cell)."""
        if not value:
            return ''
        return str(value).replace(' ', '')

    def _read_options(self, sheet, row):
        """Collect the non-empty option texts of *row* keyed by answer letter."""
        options = {}
        for letter, column in self._OPTION_COLUMNS:
            text = sheet[f'{column}{row}'].value
            if text:
                options[letter] = text
        return options

    def _create_question(self, qtype, category, name, options, right, resolution, year, level):
        """Persist one parsed question owned by the requesting user and return it."""
        obj = Question()
        obj.type = qtype
        obj.questioncat = category
        obj.name = name
        obj.options = options
        obj.right = right
        obj.create_by = self.request.user
        obj.resolution = resolution if resolution else ''
        obj.year = year if year else None
        obj.level = level if level in self._LEVELS else '低'
        obj.save()
        return obj

    @action(methods=['post'], detail=False, perms_map={'post': 'question'}, serializer_class=Serializer)
    def upload_paper(self, request):
        """
        Build questions from a Word document

        Convert the document at ``doc_path`` into a copy of the question
        template workbook, store any questions not yet in the database and
        return the serialized questions (existing and new) found in it.
        """
        doc_path = request.data.get('doc_path')
        question_type = request.data.get('question_type')
        if not doc_path:
            raise ParseError('缺少文档路径')
        # The path comes from the client: resolve it and refuse anything
        # that escapes the project directory (e.g. via "..").
        base_dir = os.path.abspath(str(settings.BASE_DIR))
        doc_path = os.path.abspath(base_dir + str(doc_path))
        if os.path.commonpath([base_dir, doc_path]) != base_dir:
            raise ParseError('文档路径不合法')

        excel_path = base_dir + "/media/default/question.xlsx"
        timenow = timezone.now().strftime('%Y%m%d%H%M%S')
        # Work on a per-upload copy of the template, never on the template itself.
        question_excel = os.path.join(os.path.dirname(excel_path), "question_excel_" + timenow)
        os.makedirs(question_excel, exist_ok=True)
        shutil.copy(excel_path, question_excel)
        excel_save_path = os.path.join(question_excel, os.path.basename(excel_path))

        # Parse the Word document into the workbook copy.
        if not interpret_text(3, excel_save_path, doc_path, question_type):
            raise ParseError('excel解析失败')

        sheet = load_workbook(excel_save_path).worksheets[0]
        header_checks = (
            ('a2', '题目类型', "类型列错误!"),
            ('b2', '分类', "分类列错误!"),
            ('c2', '题目', "题目列错误!"),
        )
        for cell, title, message in header_checks:
            if sheet[cell].value != title:
                return Response({"error": message})

        # One query for all categories instead of one lookup per row.
        categories = {cat.name: cat for cat in Questioncat.objects.all()}
        ids = []

        row = 3
        while sheet[f'c{row}'].value:
            qtype = self._cell_text(sheet[f'a{row}'].value)
            catname = self._cell_text(sheet[f'b{row}'].value)
            if not catname:
                return Response(f'{row}行没有分类', status=400)
            name = sheet[f'c{row}'].value
            options = self._read_options(sheet, row)
            right = self._cell_text(sheet[f'j{row}'].value)
            if not right:
                return Response(f'{row}行没有答案', status=400)
            resolution = sheet[f'k{row}'].value
            level = self._cell_text(sheet[f'l{row}'].value)
            year = sheet[f'm{row}'].value

            cateobj = categories.get(catname)
            if cateobj is None:
                return Response(f'{row}行不存在分类({catname})!请先新建', status=400)

            # Reuse a question that already exists; first() tolerates
            # several matches where get() would raise.
            existing = Question.objects.filter(name=name, is_deleted=0, questioncat=cateobj).first()
            if existing is not None:
                ids.append(existing.id)
                row += 1
                continue

            if qtype == '单选':
                if right in self._OPTION_LETTERS:
                    obj = self._create_question('单选', cateobj, name, options, right, resolution, year, level)
                    ids.append(obj.id)
            elif qtype == '多选':
                answers = [c for c in right.strip() if c != ',']
                obj = self._create_question('多选', cateobj, name, options, answers, resolution, year, level)
                ids.append(obj.id)
            elif qtype == '判断':
                right = 'A' if right in ('A', '对', '正确') else 'B'
                obj = self._create_question('判断', cateobj, name, {'A': '对', 'B': '错'}, right, resolution, year, level)
                ids.append(obj.id)
            row += 1

        # Always answer with a Response, even when nothing was parsed; a view
        # that returns None is rejected by DRF.
        questions = Question.objects.filter(pk__in=ids)
        return Response(QuestionSerializer(questions, many=True).data, status=200)
|
||
|
||
|
||
class ExamViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """Exam CRUD plus the endpoint a participant calls to start an attempt."""

    perms_map = {'get': '*', 'post': 'exam', 'put': 'exam', 'delete': 'exam'}
    queryset = Exam.objects.all().select_related('paper', 'create_by')
    ordering = ['-id']
    search_fields = ('name',)
    serializer_class = ExamListSerializer
    filterset_class = ExamFilter

    def get_serializer_class(self):
        """Pick the write serializer for create/update and the detail one for retrieve."""
        if self.action in ['create', 'update']:
            return ExamCreateUpdateSerializer
        elif self.action in ['retrieve']:
            return ExamDetailSerializer
        return super().get_serializer_class()

    def get_queryset(self):
        """Users with the 'exam' permission see every exam; others only their department's."""
        qs = super().get_queryset()
        user = self.request.user
        if has_permission("exam", user):
            return qs
        return qs.filter(participant_dep=user.dept)

    def destroy(self, request, *args, **kwargs):
        """Delete an exam, refusing when exam records already reference it."""
        instance = self.get_object()
        if ExamRecord.objects.filter(exam=instance).exists():
            raise ParseError('存在考试记录,禁止删除')
        instance.delete(soft=False)
        return Response(status=204)

    @action(methods=['post'], detail=True, perms_map={'post': '*'}, serializer_class=Serializer, permission_classes=[IsAuthenticated])
    @transaction.atomic
    def start(self, request, *args, **kwargs):
        """
        Start an exam

        Create an exam record for the current user and return its id and
        the questions of this attempt. Any earlier unsubmitted record of
        the same user for this exam is discarded first.
        """
        # Looked up directly rather than via get_object(), so the
        # department filter in get_queryset is not applied here.
        try:
            exam = Exam.objects.get(id=kwargs['pk'])
        except (Exam.DoesNotExist, ValueError):
            raise ParseError('考试不存在')

        # The user must be listed personally or through their department.
        user = request.user
        dept = getattr(user, 'dept', None)
        listed = exam.participant_user.filter(id=user.id).exists()
        if not listed and dept is not None:
            listed = exam.participant_dep.filter(id=dept.id).exists()
        if not listed:
            raise ParseError('不在考试人员范围内')

        now = timezone.now()
        if now < exam.open_time or now > exam.close_time:
            raise ParseError('不在考试时间范围')

        # Abandoned attempts do not count against the chance limit.
        ExamRecord.objects.filter(exam=exam, create_by=user, is_submited=False).delete()
        chance_used = ExamRecord.objects.filter(exam=exam, create_by=user).count()
        if chance_used >= exam.chance:
            raise ParseError('考试机会已用完')

        paper = exam.paper
        if not paper:
            raise ParseError('暂不支持')

        er = ExamRecord()
        er.type = '正式考试'
        er.name = '正式考试' + datetime.now().strftime('%Y%m%d%H%M')
        er.limit = paper.limit
        er.paper = paper
        er.total_score = paper.total_score
        er.start_time = now
        er.is_pass = False
        er.exam = exam
        er.create_by = user
        er.save()

        details = []
        if paper.paper_types == '押题':
            # Fixed paper: use exactly the questions attached to it.
            for pq in PaperQuestion.objects.filter(paper=paper).order_by('id'):
                details.append(AnswerDetail(examrecord=er, question=pq.question, total_score=pq.total_score))
        else:
            # Random paper: draw the configured number of questions of each
            # type from the paper's categories. Each type is drawn on its own
            # because Django cannot combine sliced querysets with "|".
            category_ids = list(paper.category.values_list('id', flat=True))
            draw_plan = (
                ('单选', paper.danxuan_count, paper.danxuan_score),
                ('多选', paper.duoxuan_count, paper.duoxuan_score),
                ('判断', paper.panduan_count, paper.panduan_score),
            )
            for qtype, count, score in draw_plan:
                picked = Question.objects.filter(type=qtype, questioncat__in=category_ids).order_by('?')[:count]
                details.extend(
                    AnswerDetail(examrecord=er, question=question, total_score=score)
                    for question in picked
                )
        AnswerDetail.objects.bulk_create(details)

        ads = AnswerDetail.objects.select_related('question').filter(examrecord=er).order_by('id')
        ret = {
            'examrecord': er.id,
            'questions_': AnswerDetailOutSerializer(instance=ads, many=True).data,
        }
        return Response(ret)
|
||
|
||
|
||
class ExamRecordViewSet(ListModelMixin, DestroyModelMixin, RetrieveModelMixin, GenericViewSet):
    """
    Exam record list/detail, cleanup, export and answer submission.
    """
    perms_map = {'get': '*', 'post': '*', 'delete': 'examrecord'}
    queryset = ExamRecord.objects.select_related('create_by', 'cert_er')
    serializer_class = ExamRecordListSerializer
    ordering_fields = ['create_time', 'score', 'took', 'update_time', 'belong_dept']
    ordering = ['-update_time']
    search_fields = ('create_by__name', 'create_by__username', 'exam__name', 'belong_dept__name')
    filterset_class = ExamRecordFilter

    def get_queryset(self):
        """Scope the records to what the current user may see."""
        qs = super().get_queryset()
        user = self.request.user
        if has_permission('ctc_manager', user):
            return qs
        # Department managers only see records of their own department tree.
        if has_permission('exam_manager', user):
            return qs.filter(belong_dept__in=get_child_queryset2(user.dept))
        # Everyone else only sees their own records.
        return qs.filter(create_by=user)

    def get_serializer_class(self):
        """Expose the detailed serializer only once the exam has closed."""
        if self.action == 'retrieve':
            exam = self.get_object().exam
            # A record may have no exam (submit guards against that too).
            if exam is not None and timezone.now() > exam.close_time:
                return ExamRecordDetailSerializer
        return super().get_serializer_class()

    def perform_destroy(self, instance):
        # NOTE(review): the original comment calls this a physical delete, but
        # other call sites pass soft=False for that -- confirm which is intended.
        instance.delete()

    @action(methods=['post'], detail=False, perms_map={'post': '*'}, serializer_class=Serializer, permission_classes=[IsAuthenticated])
    def clear(self, request, pk=None):
        """
        Remove unsubmitted exam records created more than seven days ago
        """
        # timezone.now must be called; subtracting from the function object raises TypeError.
        days7_ago = timezone.now() - timedelta(days=7)
        ExamRecord.objects.filter(create_time__lte=days7_ago, is_submited=False).delete(soft=False)
        # status=False is not a valid HTTP status code.
        return Response(status=200)

    @action(methods=['get'], detail=False,
            url_path='export', url_name='export_record', perms_map={'*': 'export_ansrecord'}, serializer_class=Serializer)
    def export(self, request):
        """
        Export answer records

        Export the filtered records and return the generated file path.
        """
        queryset = self.filter_queryset(self.get_queryset())
        path = export_record(queryset)
        return Response({'path': path})

    def _issue_certificate(self, request, exam, er):
        """Create the certificate for a passed exam record under the 'certificate' lock."""
        today = datetime.now()
        current_date = today.strftime('%Y-%m-%d')
        course_ids = [course.id for course in exam.course_name.all()]

        iden = acquire_lock('certificate')
        # acquire_lock signals failure with a falsy value, not specifically False.
        if not iden:
            raise ParseError("系统忙, 请稍后再试")
        try:
            # Default number when no certificate exists yet.
            cer_number = today.strftime('%Y%m%d')
            # first() returns None on an empty table, where latest() would raise.
            last = Certificate.objects.order_by('-证书编号').first()
            if last is not None:
                # Number = 5-character prefix + counter; keep the counter's
                # width so leading zeros survive the increment.
                # NOTE(review): ordering is by the text field, so it matches
                # numeric order only while all counters have the same width.
                suffix = last.证书编号[5:]
                cer_number = 'CTCZL' + str(int(suffix) + 1).zfill(len(suffix))
            data_dict = {
                '姓名': request.user.name,
                '用户ID': request.user.id,
                '证书编号': cer_number,
                '证书方案': '202312',
                '单位名称': request.user.dept.full_name,
                '所属单位': '国检测试控股集团' + request.user.dept.name,
                '发证日期': current_date,
                '培训日期': current_date,
                '培训结束日期': current_date,
                '课程列表': course_ids,
                'examrecord': er.id,
            }
            serializer = CertificateSerializer(data=data_dict)
            serializer.is_valid(raise_exception=True)
            serializer.save()
        except Exception as exc:
            # Report the real cause; traceback.print_exc() returns None, which
            # produced an empty, generic error before.
            raise ParseError(f'证书生成失败: {exc}') from exc
        finally:
            # NOTE(review): the lock is released before the surrounding
            # transaction commits -- confirm this is acceptable.
            release_lock('certificate', iden)

    @action(methods=['post'], detail=True, perms_map={'post': '*'}, serializer_class=ExamRecordSubmitSerializer, permission_classes=[IsAuthenticated])
    @transaction.atomic
    def submit(self, request, pk=None):
        """
        Submit an answer sheet

        Grade the submitted answers on the server, store the score, issue a
        certificate when the exam is configured for it, and return the
        updated exam record. Only the record's owner may submit, and only
        until 30 minutes after the exam closes.
        """
        try:
            er = ExamRecord.objects.get(id=pk)
        except (ExamRecord.DoesNotExist, ValueError, TypeError) as e:
            raise ParseError(str(e))
        now = timezone.now()
        if er.create_by != request.user:
            raise ParseError('提交人有误')
        exam = er.exam
        paper = er.paper
        if not exam:
            raise ParseError('暂不支持')
        if now > exam.close_time + timedelta(minutes=30):
            raise ParseError('考试时间已过, 提交失败')

        serializer = ExamRecordSubmitSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        questions_ = serializer.validated_data['questions_']

        # Grade on the server; answers are matched to questions by position.
        ads = AnswerDetail.objects.select_related('question').filter(examrecord=er).order_by('id')
        total_score = 0
        try:
            for index, ad in enumerate(ads):
                ad.user_answer = questions_[index]['user_answer']
                if ad.question.type == '多选':
                    # Multi-choice answers are correct regardless of order.
                    correct = set(ad.question.right) == set(ad.user_answer)
                else:
                    correct = ad.question.right == ad.user_answer
                # Always write both outcomes so a resubmission cannot keep a
                # stale "right" mark or score from an earlier attempt.
                ad.is_right = correct
                ad.score = ad.total_score if correct else 0
                ad.save()
                total_score = total_score + ad.score
        except Exception as e:
            raise ParseError('判卷失败, 请检查试卷:' + str(e)) from e

        er.score = total_score
        if er.score >= paper.pass_score:
            er.is_pass = True
            # Issue the certificate automatically when the exam asks for it.
            if exam.certificate:
                self._issue_certificate(request, exam, er)

        er.took = (now - er.create_time).total_seconds()
        er.end_time = now
        er.belong_dept = request.user.dept
        er.is_submited = True
        er.save()
        return Response(ExamRecordListSerializer(instance=er).data)
|
||
|
||
|
||
def acquire_lock(lock_name, timeout=60):
    """Try to take the cache-based lock *lock_name* without blocking.

    Uses ``cache.add``, which stores the value only when the key is absent
    and reports whether it did, so a holder is never overwritten by another
    caller. (``cache.set`` overwrites unconditionally.)

    :param lock_name: cache key used as the lock.
    :param timeout: seconds after which the lock expires on its own.
    :return: a unique identifier to pass to ``release_lock`` when the lock
        was taken, otherwise ``None``.
    """
    identifier = str(uuid.uuid4())
    acquired = cache.add(lock_name, identifier, timeout=timeout)
    return identifier if acquired else None
|
||
|
||
def release_lock(lock_name, identifier):
    """Release the cache lock *lock_name* if *identifier* still owns it.

    :param lock_name: cache key used as the lock.
    :param identifier: value returned by ``acquire_lock`` for this holder.
    :return: ``True`` once the key has been deleted.
    :raises ParseError: when the cached value differs from *identifier*
        (including when the key is no longer present).
    """
    current_owner = cache.get(lock_name)
    if current_owner != identifier:
        raise ParseError('Lock identifier does not match')
    cache.delete(lock_name)
    return True