cma_search/server/apps/exam/views.py

457 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.shortcuts import render
from rest_framework.viewsets import ModelViewSet, GenericViewSet
from rest_framework.mixins import ListModelMixin, DestroyModelMixin, RetrieveModelMixin
from apps.exam.exports import export_question
from apps.exam.models import Question, Questioncat, PaperQuestion
from apps.exam.serializers import (QuestionSerializer, QuestioncatSerializer, PaperSerializer, ExamDetailSerializer, ExamRecordDetailSerializer, ExamListSerializer,
ExamCreateUpdateSerializer, ExamListSerializer, ExamRecordSubmitSerializer, PaperDetailSerializer, PaperCreateUpdateSerializer, AnswerDetailOutSerializer, ExamRecordListSerializer)
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from rest_framework.exceptions import ParseError
from openpyxl import Workbook, load_workbook
from django.conf import settings
from apps.exam.models import Paper, Exam, ExamRecord, AnswerDetail
from django.utils import timezone
from django.db import transaction
from rest_framework.serializers import Serializer
from datetime import datetime
from apps.exam.filters import ExamRecordFilter, ExamFilter
from datetime import timedelta
from apps.system.mixins import CreateUpdateCustomMixin
from apps.edu.serializers import CertificateSerializer
from datetime import datetime
# Create your views here.
# Fixed key shared by enctry() and dectry(); both functions must use the
# same value or previously produced strings can no longer be decoded.
_CIPHER_KEY = 'ez9z3a4m*$%srn9ve_t71yd!v+&xn9@0k(e(+l6#g1h=e5i4da'


def enctry(s):
    """Obfuscate ``s`` with an additive character cipher.

    Every character's code point is added to the code point of the key
    character at the same position; each sum is written as a decimal
    number followed by ``'_'``.  The key is reused cyclically, so input
    longer than the key is encoded in full instead of being cut off at
    the key length.

    NOTE: this is reversible obfuscation with a fixed key, not
    cryptographic protection.

    :param s: text to encode
    :return: encoded text such as ``'198_223_'`` (empty for empty input)
    """
    key_len = len(_CIPHER_KEY)
    return "".join(
        str(ord(char) + ord(_CIPHER_KEY[pos % key_len])) + '_'
        for pos, char in enumerate(s)
    )


def dectry(p):
    """Reverse :func:`enctry`.

    ``p`` is split on ``'_'``; the piece after the final separator is
    discarded (it is empty for well-formed input), and the key character's
    code point is subtracted from each remaining number.

    :param p: text produced by :func:`enctry`
    :return: the decoded text
    :raises ValueError: if a piece is not a decimal number or the result
        is not a valid code point
    """
    key_len = len(_CIPHER_KEY)
    pieces = p.split("_")[:-1]
    return "".join(
        chr(int(piece) - ord(_CIPHER_KEY[pos % key_len]))
        for pos, piece in enumerate(pieces)
    )
class QuestioncatViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """Model viewset exposing ``Questioncat`` (question category) records."""

    # Reads are open to everyone ('*'); writes require the 'question' permission.
    perms_map = {
        'get': '*',
        'post': 'question',
        'put': 'question',
        'delete': 'question',
    }
    queryset = Questioncat.objects.all()
    serializer_class = QuestioncatSerializer
    # Categories can be filtered by parent and searched by name.
    filterset_fields = ['parent']
    search_fields = ['name']
class QuestionViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """Question bank viewset: model CRUD plus export, bulk enable and Excel import."""

    perms_map = {'get': '*', 'post': 'question', 'put': 'question', 'delete': 'question'}
    queryset = Question.objects.all()
    serializer_class = QuestionSerializer
    filterset_fields = ['level', 'type', 'year']
    search_fields = ['name', 'options', 'resolution']

    @action(methods=['get'], detail=False,
            url_path='export', url_name='export_question', perms_map=[{'get': '*'}], serializer_class=Serializer)
    def export_question(self, request):
        """
        Export questions

        Passes the filtered queryset to ``apps.exam.exports.export_question``
        and returns whatever path that helper produces as ``{'path': ...}``.
        """
        queryset = self.filter_queryset(self.get_queryset())
        # Inside the method body this name resolves to the module-level
        # import, not to this method.
        path = export_question(queryset)
        return Response({'path': path})

    @action(methods=['post'], detail=False, url_name='enable_question', perms_map={'post': 'question'}, serializer_class=Serializer)
    def enable(self, request):
        """
        Enable questions

        Sets ``enabled=True`` on every question whose primary key is listed
        in the request's ``ids``. A missing or empty ``ids`` is a no-op.
        """
        ids = request.data.get('ids', None)
        if ids:
            Question.objects.filter(pk__in=ids).update(enabled=True)
        return Response(status=200)

    @staticmethod
    def _save_question(qtype, cateobj, name, options, right, resolution, year, level):
        """Create and save one ``Question`` from already-normalised cell values."""
        obj = Question()
        obj.type = qtype
        obj.questioncat = cateobj
        obj.name = name
        obj.options = options
        obj.right = right
        obj.resolution = resolution or ''
        obj.year = year or None
        obj.level = level
        obj.save()

    @action(methods=['post'], detail=False,
            url_path='import', url_name='import_question', perms_map={'post': 'question'}, serializer_class=Serializer)
    def import_question(self, request):
        """
        Import questions

        Reads the first worksheet of the workbook at ``request.data['path']``.
        Row 2 must carry the expected headers in columns A-C; data starts at
        row 3 and continues while column C (question text) is non-empty.
        Columns: A type, B category, C question, D-I options A-F,
        J right answer, K resolution, L level, M year.

        Returns the list of row numbers skipped because an identical
        question already exists, or an error response for the first
        invalid row. Rows saved before an invalid row are not rolled back.
        """
        xlsxpath = request.data['path']
        # NOTE(review): the path comes from the request and is concatenated
        # onto BASE_DIR without validation - confirm that callers can only
        # pass paths of previously uploaded files.
        fullpath = settings.BASE_DIR + xlsxpath
        wb = load_workbook(fullpath)
        sheet = wb.worksheets[0]
        qlist = ['A', 'B', 'C', 'D', 'E', 'F']
        # Worksheet columns holding option A..F, in the same order as qlist.
        option_columns = ['d', 'e', 'f', 'g', 'h', 'i']
        # NOTE(review): keys and values appear empty in this view of the
        # file; the original characters may have been lost - verify against
        # the repository before relying on the level mapping.
        leveldict = {'': '', '': '', '': ''}
        notinlist = []
        # Validate the header row.
        # NOTE(review): these header errors are returned with HTTP 200,
        # unlike the per-row errors below which use 400.
        if sheet['a2'].value != '题目类型':
            return Response({"error": "类型列错误!"})
        if sheet['b2'].value != '分类':
            return Response({"error": "分类列错误!"})
        if sheet['c2'].value != '题目':
            return Response({"error": "题目列错误!"})
        # Load every category once (name -> object) instead of issuing one
        # query per worksheet row.
        questioncatdict = {cat.name: cat for cat in Questioncat.objects.all()}
        i = 3
        while sheet['c' + str(i)].value:
            row = str(i)
            qtype = sheet['a' + row].value
            if qtype is None:
                # An empty type cell used to crash on ``None.replace``.
                return Response(row + '行没有题目类型', status=400)
            qtype = qtype.replace(' ', '')
            questioncat = sheet['b' + row].value
            if questioncat:
                questioncat = questioncat.replace(' ', '')
            else:
                return Response(row + '行没有分类', status=400)
            name = sheet['c' + row].value
            # Collect only the options that are actually filled in.
            answer = {}
            for letter, column in zip(qlist, option_columns):
                option_text = sheet[column + row].value
                if option_text:
                    answer[letter] = option_text
            right = sheet['j' + row].value
            if right:
                right = right.replace(' ', '')
            else:
                return Response(row + '行没有答案', status=400)
            resolution = sheet['k' + row].value
            level = sheet['l' + row].value
            year = sheet['m' + row].value
            if level:
                level = level.replace(' ', '')
            if questioncat not in questioncatdict:
                return Response(row + "行不存在分类(" + questioncat + ")!请先新建", status=400)
            cateobj = questioncatdict[questioncat]
            # Unknown or missing levels are stored as an empty string.
            level_value = leveldict.get(level, '')
            if qtype == '单选':
                if Question.objects.filter(type='单选', name=name, year=year, options=answer, questioncat=cateobj).exists():
                    notinlist.append(i)
                elif right in qlist:
                    # Rows whose answer is not a single letter A-F are
                    # skipped without being reported.
                    self._save_question('单选', cateobj, name, answer, right, resolution, year, level_value)
            elif qtype == '多选':
                right = list(right.strip())
                if Question.objects.filter(type='多选', name=name, year=year, options=answer, questioncat=cateobj).exists():
                    notinlist.append(i)
                elif all(letter in qlist for letter in right):
                    # Rows containing any letter outside A-F are skipped
                    # without being reported.
                    self._save_question('多选', cateobj, name, answer, right, resolution, year, level_value)
            elif qtype == '判断':
                # NOTE(review): the empty-string literals in this branch may
                # be characters lost from this view - verify.
                if right in ('A', '', '正确'):
                    right = 'A'
                else:
                    right = 'B'
                if Question.objects.filter(type='判断', name=name, is_delete=0, options={'A': '', 'B': ''}, questioncat=cateobj).exists():
                    notinlist.append(i)
                else:
                    self._save_question('判断', cateobj, name, {'A': '', 'B': ''}, right, resolution, year, level_value)
            # Any other type value is ignored.
            i = i + 1
        return Response(notinlist, status=200)
class PaperViewSet(ModelViewSet):
    """
    Paper create / retrieve / update / delete
    """
    perms_map = {'get': '*', 'post': 'paper', 'put': 'paper', 'delete': 'paper'}
    queryset = Paper.objects.all()
    serializer_class = PaperSerializer
    ordering = ['id']
    search_fields = ('name',)

    def get_serializer_class(self):
        """Use the detail serializer for retrieve and the write serializer for create/update."""
        if self.action in ['retrieve']:
            return PaperDetailSerializer
        elif self.action in ['create', 'update']:
            return PaperCreateUpdateSerializer
        return super().get_serializer_class()

    @transaction.atomic
    def create(self, request, *args, **kwargs):
        """Create a paper together with its question rows.

        Runs in one transaction so a failure while inserting the questions
        does not leave a paper without questions behind.
        Responds with an empty 201.
        """
        sr = PaperCreateUpdateSerializer(data=request.data)
        sr.is_valid(raise_exception=True)
        vdata = sr.validated_data
        vdata['create_by'] = request.user
        # 'questions_' is not passed to Paper.objects.create(); it is
        # stored as PaperQuestion rows.
        questions_ = vdata.pop('questions_')
        paper = Paper.objects.create(**vdata)
        q_list = [
            PaperQuestion(question=item['question'], total_score=item['total_score'], paper=paper)
            for item in questions_
        ]
        PaperQuestion.objects.bulk_create(q_list)
        return Response(status=201)

    @transaction.atomic
    def update(self, request, *args, **kwargs):
        """Replace a paper's fields and its full question list.

        Runs in one transaction so the old questions are only removed if
        the new ones are written successfully.

        :raises ParseError: if an exam using this paper has not closed yet
        """
        # A paper with an exam that has not closed yet cannot be edited.
        now = timezone.now()
        paper = self.get_object()
        if Exam.objects.filter(close_time__gte=now, paper=paper).exists():
            raise ParseError('存在考试,不可编辑')
        sr = PaperCreateUpdateSerializer(instance=paper, data=request.data)
        sr.is_valid(raise_exception=True)
        vdata = sr.validated_data
        questions_ = vdata.pop('questions_')
        vdata['update_by'] = request.user
        Paper.objects.filter(id=paper.id).update(**vdata)
        q_list = [
            PaperQuestion(question=item['question'], total_score=item['total_score'], paper=paper)
            for item in questions_
        ]
        # Drop the previous question rows, then insert the new set.
        PaperQuestion.objects.filter(paper=paper).delete()
        PaperQuestion.objects.bulk_create(q_list)
        return Response()
class ExamViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """Exam viewset: model CRUD plus the ``start`` action that opens an attempt."""

    perms_map = {'get': '*', 'post': 'exam', 'put': 'exam', 'delete': 'exam'}
    queryset = Exam.objects.all().select_related('paper', 'create_by')
    ordering = ['-id']
    search_fields = ('name',)
    serializer_class = ExamListSerializer
    filterset_class = ExamFilter

    def get_serializer_class(self):
        """Use the write serializer for create/update and the detail serializer for retrieve."""
        if self.action in ['create', 'update']:
            return ExamCreateUpdateSerializer
        elif self.action in ['retrieve']:
            return ExamDetailSerializer
        return super().get_serializer_class()

    def destroy(self, request, *args, **kwargs):
        """Delete an exam with ``soft=False``, refusing when exam records reference it.

        :raises ParseError: if any ``ExamRecord`` exists for this exam
        """
        instance = self.get_object()
        if ExamRecord.objects.filter(exam=instance).exists():
            raise ParseError('存在考试记录,禁止删除')
        instance.delete(soft=False)
        return Response(status=204)

    @action(methods=['post'], detail=True, perms_map=[{'post': '*'}], serializer_class=Serializer, permission_classes=[IsAuthenticated])
    @transaction.atomic
    def start(self, request, *args, **kwargs):
        """
        Start exam

        Creates an exam record and one answer row per paper question for
        the current user, and returns the record id with the questions.

        :raises ParseError: if the user is not a participant, the exam is
            not open, no chances are left, or the exam has no paper
        """
        exam = self.get_object()
        # Work out who is allowed to sit this exam: users listed directly
        # or members of a listed department.
        participants_ids = [user.id for user in exam.participant_user.all()]
        dep_ids = [dep.id for dep in exam.participant_dep.all()]
        user_dept = request.user.dept
        # Compare the department's id, not the department object itself,
        # with the id list; a user without a department never matches.
        in_participant_dep = user_dept is not None and user_dept.id in dep_ids
        if request.user.id not in participants_ids and not in_participant_dep:
            raise ParseError('不在考试人员范围内')
        now = timezone.now()
        if now < exam.open_time or now > exam.close_time:
            raise ParseError('不在考试时间范围')
        tests = ExamRecord.objects.filter(
            exam=exam, create_by=request.user)
        chance_used = tests.count()
        # Every existing record consumed one chance, so the user is out of
        # chances as soon as the count reaches the limit.
        if chance_used >= exam.chance:
            raise ParseError('考试机会已用完')
        if exam.paper:
            er = ExamRecord()
            er.type = '正式考试'
            er.name = '正式考试' + datetime.now().strftime('%Y%m%d%H%M')
            er.limit = exam.paper.limit
            er.paper = exam.paper
            er.total_score = exam.paper.total_score
            er.start_time = now
            er.is_pass = False
            er.exam = exam
            er.create_by = request.user
            er.save()
            ret = {}
            ret['examrecord'] = er.id
            pqs = PaperQuestion.objects.filter(paper=exam.paper).order_by('id')
            details = [
                AnswerDetail(examrecord=er, question=pq.question, total_score=pq.total_score)
                for pq in pqs
            ]
            AnswerDetail.objects.bulk_create(details)
            # Re-read the rows in id order; submit() pairs answers with
            # these rows by position in the same ordering.
            ads = AnswerDetail.objects.select_related('question').filter(examrecord=er).order_by('id')
            ret['questions_'] = AnswerDetailOutSerializer(instance=ads, many=True).data
            return Response(ret)
        raise ParseError('暂不支持')
class ExamRecordViewSet(ListModelMixin, DestroyModelMixin, RetrieveModelMixin, GenericViewSet):
    """
    Exam record list and detail
    """
    perms_map = {'get': '*', 'post': '*', 'delete': 'examrecord'}
    queryset = ExamRecord.objects.select_related('create_by')
    serializer_class = ExamRecordListSerializer
    ordering_fields = ['create_time', 'score', 'took', 'update_time']
    ordering = ['-update_time']
    search_fields = ('create_by__name', 'create_by__username')
    filterset_class = ExamRecordFilter

    def get_serializer_class(self):
        """Use the detail serializer for retrieve, the list serializer otherwise."""
        if self.action == 'retrieve':
            return ExamRecordDetailSerializer
        return super().get_serializer_class()

    def perform_destroy(self, instance):  # exam records are deleted with soft=False
        instance.delete(soft=False)

    @action(methods=['post'], detail=False, perms_map=[{'post': '*'}], serializer_class=Serializer, permission_classes=[IsAuthenticated])
    def clear(self, request, pk=None):
        """
        Clear stale exam records

        Deletes records created at least seven days ago that were never
        submitted.
        """
        # timezone.now must be called; subtracting a timedelta from the
        # function object itself raises TypeError.
        now = timezone.now()
        days7_ago = now - timedelta(days=7)
        ExamRecord.objects.filter(create_time__lte=days7_ago, is_submited=False).delete(soft=False)
        # ``status=False`` is not a valid HTTP status code; report success.
        return Response(status=200)

    @action(methods=['get'], detail=False, permission_classes=[IsAuthenticated])
    def self(self, request, pk=None):
        '''
        Personal exam records

        Lists the current user's exam records, newest update first.
        '''
        queryset = ExamRecord.objects.filter(create_by=request.user).order_by('-update_time')
        page = self.paginate_queryset(queryset)
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        # paginate_queryset() returns None when pagination is disabled;
        # fall back to a plain list response in that case.
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)

    @action(methods=['post'], detail=True, perms_map=[{'post': '*'}], serializer_class=ExamRecordSubmitSerializer, permission_classes=[IsAuthenticated])
    @transaction.atomic
    def submit(self, request, pk=None):
        '''
        Submit answer sheet

        Grades the submitted answers on the server, stores the score and,
        when the exam is flagged for certificates and the user passed,
        creates a certificate.

        :raises ParseError: if the record belongs to another user, has no
            exam, is submitted more than 30 minutes after the exam closed,
            or grading fails
        '''
        er = self.get_object()
        now = timezone.now()
        if er.create_by != request.user:
            raise ParseError('提交人有误')
        exam = er.exam
        if not exam:
            raise ParseError('暂不支持')
        # Submissions are accepted up to 30 minutes after the exam closes.
        if now > exam.close_time + timedelta(minutes=30):
            raise ParseError('考试时间已过, 提交失败')
        # NOTE(review): nothing here rejects a record that is already
        # submitted, so a repeat call re-grades it and may issue another
        # certificate - confirm whether that is intended.
        serializer = ExamRecordSubmitSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        vdata = serializer.validated_data
        questions_ = vdata['questions_']
        # Server-side grading: submitted answers are matched to the stored
        # answer rows by position (rows ordered by id).
        ads = AnswerDetail.objects.select_related('question').filter(examrecord=er).order_by('id')
        total_score = 0
        try:
            for index, ad in enumerate(ads):
                ad.user_answer = questions_[index]['user_answer']
                if ad.question.type == '多选':
                    # Multiple choice: the order of selected letters is irrelevant.
                    if set(ad.question.right) == set(ad.user_answer):
                        ad.is_right = True
                        ad.score = ad.total_score
                else:
                    if ad.question.right == ad.user_answer:
                        ad.is_right = True
                        ad.score = ad.total_score
                ad.save()
                # NOTE(review): for a wrong answer ad.score is left as
                # loaded - presumably the model default is 0; verify.
                total_score = total_score + ad.score
        except Exception as e:
            raise ParseError('判卷失败, 请检查试卷:' + str(e))
        er.score = total_score
        # Passing requires strictly more than 60% of the total score.
        if er.score > 0.6 * er.total_score:
            er.is_pass = True
            # Issue a certificate when the exam is flagged for it.
            if exam.certificate:
                now_data = datetime.now()
                course = exam.course_name.all()
                courese_ids = [item.id for item in course]
                current_date = now_data.strftime('%Y-%m-%d')
                data_dict = {
                    '姓名': request.user.name,
                    '证书编号': 'CTCZL' + current_date,
                    '单位名称': request.user.dept.name,
                    '所属单位': '国检测试控股集团' + request.user.dept.name,
                    '发证日期': current_date,
                    '课程列表': courese_ids,
                }
                cert_serializer = CertificateSerializer(data=data_dict)
                cert_serializer.is_valid(raise_exception=True)
                cert_serializer.save()
        er.took = (now - er.create_time).total_seconds()
        er.end_time = now
        er.is_submited = True
        er.save()
        return Response(ExamRecordListSerializer(instance=er).data)