cma_search/server/apps/exam/views.py

272 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.shortcuts import render
from rest_framework.viewsets import ModelViewSet
from apps.exam.exports import export_question
from apps.exam.models import Question, Questioncat, PaperQuestion
from apps.exam.serializers import (QuestionSerializer, QuestioncatSerializer, PaperSerializer,
ExamCreateUpdateSerializer, ExamListSerializer, ExamAttendSerializer, PaperDetailSerializer, PaperCreateUpdateSerializer)
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from rest_framework.exceptions import ParseError
from openpyxl import Workbook, load_workbook
from django.conf import settings
from apps.exam.models import Paper, Exam, ExamRecord, AnswerDetail
from django.utils import timezone
from django.db import transaction
# Create your views here.
def enctry(s):
k = 'ez9z3a4m*$%srn9ve_t71yd!v+&xn9@0k(e(+l6#g1h=e5i4da'
encry_str = ""
for i, j in zip(s, k):
# i为字符j为秘钥字符
temp = str(ord(i)+ord(j))+'_' # 加密字符 = 字符的Unicode码 + 秘钥的Unicode码
encry_str = encry_str + temp
return encry_str
# 解密
def dectry(p):
k = 'ez9z3a4m*$%srn9ve_t71yd!v+&xn9@0k(e(+l6#g1h=e5i4da'
dec_str = ""
for i, j in zip(p.split("_")[:-1], k):
# i 为加密字符j为秘钥字符
# 解密字符 = (加密Unicode码字符 - 秘钥字符的Unicode码)的单字节字符
temp = chr(int(i) - ord(j))
dec_str = dec_str+temp
return dec_str
class QuestioncatViewSet(ModelViewSet):
perms_map = {'*': '*'}
queryset = Questioncat.objects.all()
serializer_class = QuestioncatSerializer
filterset_fields = ['parent']
search_fields = ['name']
class QuestionViewSet(ModelViewSet):
perms_map = {'*': '*'}
queryset = Question.objects.all()
serializer_class = QuestionSerializer
filterset_fields = ['level', 'type', 'year']
search_fields = ['name', 'options', 'resolution']
@action(methods=['get'], detail=False,
url_path='export', url_name='export_question', perms_map=[{'get': '*'}])
def export_question(self, request):
queryset = self.filter_queryset(self.get_queryset())
path = export_question(queryset)
return Response({'path': path})
@action(methods=['post'], detail=False, url_name='enable_question', permission_classes=[IsAuthenticated])
def enable(self, request):
ids = request.data.get('ids', None)
if ids:
Question.objects.filter(pk__in=ids).update(enabled=True)
return Response(status=200)
@action(methods=['post'], detail=False,
url_path='import', url_name='import_question', perms_map=[{'post': '*'}])
def import_question(self, request):
"""
导入题目
"""
xlsxpath = request.data['path']
fullpath = settings.BASE_DIR + xlsxpath
wb = load_workbook(fullpath)
sheet = wb.worksheets[0]
qlist = ['A', 'B', 'C', 'D', 'E', 'F']
leveldict = {'': '', '': '', '': ''}
notinlist = []
# 验证文件内容
if sheet['a2'].value != '题目类型':
return Response({"error": "类型列错误!"})
if sheet['b2'].value != '分类':
return Response({"error": "分类列错误!"})
if sheet['c2'].value != '题目':
return Response({"error": "题目列错误!"})
questioncatdict = {}
questioncats = Questioncat.objects.all()
for i in questioncats:
questioncatdict[i.name] = i.id
i = 3
while sheet['c'+str(i)].value:
type = sheet['a'+str(i)].value.replace(' ', '')
questioncat = sheet['b'+str(i)].value
if questioncat:
questioncat = questioncat.replace(' ', '')
else:
return Response(str(i)+'行没有分类', status=400)
name = sheet['c'+str(i)].value
answer = {}
if sheet['d'+str(i)].value:
answer['A'] = sheet['d'+str(i)].value
if sheet['e'+str(i)].value:
answer['B'] = sheet['e'+str(i)].value
if sheet['f'+str(i)].value:
answer['C'] = sheet['f'+str(i)].value
if sheet['g'+str(i)].value:
answer['D'] = sheet['g'+str(i)].value
if sheet['h'+str(i)].value:
answer['E'] = sheet['h'+str(i)].value
if sheet['i'+str(i)].value:
answer['F'] = sheet['i'+str(i)].value
right = sheet['j'+str(i)].value
if right:
right = right.replace(' ', '')
else:
return Response(str(i)+'行没有答案', status=400)
resolution = sheet['k'+str(i)].value
level = sheet['l'+str(i)].value
year = sheet['m' + str(i)].value
if level:
level = level.replace(' ', '')
cateobj = None
if questioncat not in questioncatdict:
return Response(str(i)+"行不存在分类("+questioncat+")!请先新建", status=400)
else:
cateobj = Questioncat.objects.get(
id=questioncatdict[questioncat])
if type == '单选':
if Question.objects.filter(type='单选', name=name, year=year, options=answer, questioncat=cateobj).exists():
notinlist.append(i)
else:
if right in ['A', 'B', 'C', 'D', 'E', 'F']:
obj = Question()
obj.type = '单选'
if cateobj:
obj.questioncat = cateobj
obj.name = name
obj.options = answer
obj.right = right
obj.resolution = resolution if resolution else ''
obj.year = year if year else None
if level in leveldict:
obj.level = leveldict[level]
else:
obj.level = ''
obj.save()
elif type == '多选':
right = list(right.strip())
if Question.objects.filter(type='多选', name=name, year=year, options=answer, questioncat=cateobj).exists():
notinlist.append(i)
else:
if [False for c in right if c not in qlist]:
pass
else:
obj = Question()
obj.type = '多选'
obj.questioncat = cateobj
obj.name = name
obj.options = answer
obj.right = right
obj.resolution = resolution if resolution else ''
obj.year = year if year else None
if level in leveldict:
obj.level = leveldict[level]
else:
obj.level = ''
obj.save()
elif type == '判断':
if right == 'A' or right == '' or right == '正确':
right = 'A'
else:
right = 'B'
if Question.objects.filter(type='判断', name=name, is_delete=0, options={'A': '', 'B': ''}, questioncat=cateobj).exists():
notinlist.append(i)
else:
obj = Question()
obj.type = '判断'
obj.questioncat = cateobj
obj.name = name
obj.options = {'A': '', 'B': ''}
obj.right = right
obj.resolution = resolution if resolution else ''
obj.year = year if year else None
if level in leveldict:
obj.level = leveldict[level]
else:
obj.level = ''
obj.save()
i = i + 1
return Response(notinlist, status=200)
class PaperViewSet(ModelViewSet):
"""
试卷增删改查
"""
perms_map = {'*': '*'}
queryset = Paper.objects.all()
serializer_class = PaperSerializer
ordering = ['id']
search_fields = ('name',)
def get_serializer_class(self):
if self.action in ['retrieve']:
return PaperDetailSerializer
elif self.action in ['create', 'update']:
return PaperCreateUpdateSerializer
return PaperSerializer
def create(self, request, *args, **kwargs):
sr = PaperCreateUpdateSerializer(data=request.data)
sr.is_valid(raise_exception=True)
vdata = sr.validated_data
questions_ = vdata.pop('questions_')
paper = Paper.objects.create(**vdata)
q_list = []
for i in questions_:
question = Question.objects.get(id=i['id'])
q_list.append(PaperQuestion(question=question, total_score=i['total_score'], paper=paper))
PaperQuestion.objects.bulk_create(q_list)
return Response(status=201)
def update(self, request, *args, **kwargs):
return super().update(request, *args, **kwargs)
class ExamViewSet(ModelViewSet):
perms_map = {'*': '*'}
queryset = Exam.objects.all()
ordering = ['-id']
search_fields = ('name',)
def get_serializer_class(self):
if self.action in ['create', 'update']:
return ExamCreateUpdateSerializer
return ExamListSerializer
def destroy(self, request, *args, **kwargs):
instance = self.get_object()
if ExamRecord.objects.filter(exam=instance).exists():
raise ParseError('存在考试记录,禁止删除')
instance.delete()
return Response(status=204)
@action(methods=['post'], detail=False, perms_map=[{'post': '*'}], serializer_class=ExamAttendSerializer)
def attend(self, request, *args, **kwargs):
"""
参加考试
参加考试
"""
sr = ExamAttendSerializer(data=request.data)
sr.is_valid(raise_exception=True)
vdata = sr.validated_data
code = vdata['code']
now = timezone.now()
exam = Exam.objects.filter(
code=code, open_time__lt=now, close_time__gt=now).first()
if exam:
tests = ExamRecord.objects.filter(
exam=exam, create_by=request.user)
if tests.count() < exam.chance: # 还有考试机会就可以接着考
return Response(ExamListSerializer(instance=exam).data)
raise ParseError('考试机会已用完')
raise ParseError('有效考试不存在')