418 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			418 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
from django.shortcuts import render
 | 
						||
from rest_framework.viewsets import ModelViewSet, GenericViewSet
 | 
						||
from rest_framework.mixins import ListModelMixin, DestroyModelMixin
 | 
						||
from apps.exam.exports import export_question
 | 
						||
from apps.exam.models import Question, Questioncat, PaperQuestion
 | 
						||
from apps.exam.serializers import (QuestionSerializer, QuestioncatSerializer, PaperSerializer, ExamDetailSerializer, ExamRecordDetailSerializer, ExamListSerializer,
 | 
						||
                                   ExamCreateUpdateSerializer, ExamListSerializer, ExamRecordSubmitSerializer, PaperDetailSerializer, PaperCreateUpdateSerializer, AnswerDetailOutSerializer, ExamRecordListSerializer)
 | 
						||
from rest_framework.decorators import action
 | 
						||
from rest_framework.response import Response
 | 
						||
from rest_framework.permissions import IsAuthenticated
 | 
						||
from rest_framework.exceptions import ParseError
 | 
						||
from openpyxl import Workbook, load_workbook
 | 
						||
from django.conf import settings
 | 
						||
from apps.exam.models import Paper, Exam, ExamRecord, AnswerDetail
 | 
						||
from django.utils import timezone
 | 
						||
from django.db import transaction
 | 
						||
from rest_framework.serializers import Serializer
 | 
						||
from datetime import datetime
 | 
						||
from apps.exam.filters import ExamRecordFilter, ExamFilter
 | 
						||
from datetime import timedelta
 | 
						||
from apps.system.mixins import CreateUpdateCustomMixin
 | 
						||
 | 
						||
# Create your views here.
 | 
						||
 | 
						||
 | 
						||
def enctry(s):
    """Obfuscate ``s`` by adding key code points to its characters.

    The Unicode code point of every character is added to the code point of
    the key character at the same position; each sum is written as a decimal
    number followed by ``'_'``. The key is reused cyclically, so inputs
    longer than the key are encoded in full instead of being cut off at the
    key length.

    NOTE(review): this is a reversible obfuscation with a hard-coded key,
    not cryptographic encryption.

    Args:
        s: Text to encode.

    Returns:
        The encoded string (``""`` for empty input), e.g.
        ``enctry("a") == "198_"``.
    """
    k = 'ez9z3a4m*$%srn9ve_t71yd!v+&xn9@0k(e(+l6#g1h=e5i4da'
    key_len = len(k)
    # encoded token = code point of the character + code point of the key char
    return "".join(
        str(ord(ch) + ord(k[pos % key_len])) + '_'
        for pos, ch in enumerate(s)
    )
 | 
						||
 | 
						||
 | 
						||
# Decryption
def dectry(p):
    """Reverse the ``'_'``-terminated decimal encoding produced by ``enctry``.

    ``p`` is split on ``'_'``; the code point of the key character at the
    same position is subtracted from each number and the result is turned
    back into a character. The key is reused cyclically, so encoded strings
    with more tokens than the key has characters are decoded in full.

    Args:
        p: Encoded string in which every number is followed by ``'_'``.

    Returns:
        The decoded text (``""`` for empty input).

    Raises:
        ValueError: If a token is not a decimal integer or the difference is
            not a valid code point.
    """
    k = 'ez9z3a4m*$%srn9ve_t71yd!v+&xn9@0k(e(+l6#g1h=e5i4da'
    key_len = len(k)
    # The trailing '_' leaves an empty last piece after split(); drop it.
    tokens = p.split("_")[:-1]
    # decoded char = chr(encoded number - code point of the key char)
    return "".join(
        chr(int(token) - ord(k[pos % key_len]))
        for pos, token in enumerate(tokens)
    )
 | 
						||
 | 
						||
 | 
						||
class QuestioncatViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """CRUD endpoints for question categories (``Questioncat``)."""

    # HTTP method -> permission code. Presumably read by the project's
    # permission class, with '*' meaning "no specific permission" -- confirm.
    perms_map = {
        'get': '*',
        'post': 'question',
        'put': 'question',
        'delete': 'question',
    }
    serializer_class = QuestioncatSerializer
    queryset = Questioncat.objects.all()
    # Categories can be filtered by parent and searched by name.
    search_fields = ['name']
    filterset_fields = ['parent']
 | 
						||
 | 
						||
 | 
						||
class QuestionViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """CRUD endpoints for questions, plus export, bulk enable and Excel import."""

    # HTTP method -> permission code. Presumably read by the project's
    # permission class -- confirm.
    perms_map = {'get': '*', 'post':'question', 'put':'question', 'delete':'question'}
    queryset = Question.objects.all()
    serializer_class = QuestionSerializer
    filterset_fields = ['level', 'type', 'year']
    search_fields = ['name', 'options', 'resolution']

    # NOTE(review): perms_map is a list here but a dict on the other actions
    # of this class; left unchanged -- confirm what the permission class expects.
    @action(methods=['get'], detail=False,
            url_path='export', url_name='export_question', perms_map=[{'get': '*'}], serializer_class=Serializer)
    def export_question(self, request):
        """
        Export questions

        Passes the filtered queryset to the export helper and responds with
        the path it returns.
        """
        # Inside the method body this name resolves to the module-level
        # helper imported from apps.exam.exports, not to this method.
        queryset = self.filter_queryset(self.get_queryset())
        path = export_question(queryset)
        return Response({'path': path})

    @action(methods=['post'], detail=False, url_name='enable_question', perms_map={'post': 'question'}, serializer_class=Serializer)
    def enable(self, request):
        """
        Enable questions

        Sets ``enabled=True`` on every question whose primary key is listed
        in ``ids``. Raises ``ParseError`` (HTTP 400) when ``ids`` is missing,
        empty or not a list; previously that case returned ``None``.
        """
        ids = request.data.get('ids', None)
        if not ids or not isinstance(ids, (list, tuple)):
            raise ParseError('请选择要启用的题目')
        Question.objects.filter(pk__in=ids).update(enabled=True)
        return Response(status=200)

    @staticmethod
    def _create_question(qtype, cateobj, name, options, right, resolution, year, level):
        """Build and save one ``Question`` from already-normalised import values."""
        leveldict = {'低': '低', '中': '中', '高': '高'}
        obj = Question()
        obj.type = qtype
        obj.questioncat = cateobj
        obj.name = name
        obj.options = options
        obj.right = right
        obj.resolution = resolution if resolution else ''
        obj.year = year if year else None
        # Unknown or missing difficulty falls back to the lowest level.
        obj.level = leveldict.get(level, '低')
        obj.save()
        return obj

    @action(methods=['post'], detail=False,
            url_path='import', url_name='import_question', perms_map={'post': 'question'}, serializer_class=Serializer)
    def import_question(self, request):
        """
        Import questions

        Reads the first worksheet of the .xlsx file at ``BASE_DIR + path``.
        Row 2 is the header (A/B/C are checked); data starts at row 3 and
        stops at the first row whose question cell (column C) is empty.
        Columns: A type, B category, C question, D-I options A-F, J answer,
        K resolution, L level, M year.

        Responds with the list of row numbers skipped because an identical
        question already exists. A row with a missing type, category or
        answer, or an unknown category, aborts the import with HTTP 400;
        rows saved before that point are kept. Rows with an unrecognised
        type or an invalid choice answer are silently skipped.
        """
        import os  # local import keeps this block self-contained

        xlsxpath = request.data.get('path')
        if not xlsxpath or not isinstance(xlsxpath, str):
            raise ParseError('缺少文件路径')
        # The path comes from the client: normalise it and refuse anything
        # that escapes BASE_DIR (e.g. via '..').
        base_dir = os.path.abspath(str(settings.BASE_DIR))
        fullpath = os.path.abspath(str(settings.BASE_DIR) + xlsxpath)
        if not fullpath.startswith(base_dir + os.sep):
            raise ParseError('文件路径有误')
        wb = load_workbook(fullpath)
        sheet = wb.worksheets[0]
        qlist = ['A', 'B', 'C', 'D', 'E', 'F']
        notinlist = []
        # Validate the header row.
        # NOTE(review): these errors are returned with HTTP 200, unlike the
        # per-row errors below; left unchanged in case clients rely on it.
        if sheet['a2'].value != '题目类型':
            return Response({"error": "类型列错误!"})
        if sheet['b2'].value != '分类':
            return Response({"error": "分类列错误!"})
        if sheet['c2'].value != '题目':
            return Response({"error": "题目列错误!"})
        # Load all categories once instead of querying per row.
        questioncatdict = {cat.name: cat for cat in Questioncat.objects.all()}
        # Option label -> worksheet column holding that option's text.
        option_columns = (('A', 'd'), ('B', 'e'), ('C', 'f'),
                          ('D', 'g'), ('E', 'h'), ('F', 'i'))
        row = 3
        while sheet['c' + str(row)].value:
            rownum = str(row)
            qtype = sheet['a' + rownum].value
            if not qtype:
                return Response(rownum + '行没有题目类型', status=400)
            qtype = str(qtype).replace(' ', '')
            questioncat = sheet['b' + rownum].value
            if not questioncat:
                return Response(rownum + '行没有分类', status=400)
            questioncat = str(questioncat).replace(' ', '')
            name = sheet['c' + rownum].value

            answer = {}
            for label, column in option_columns:
                cell_value = sheet[column + rownum].value
                if cell_value:
                    answer[label] = cell_value
            right = sheet['j' + rownum].value
            # Strip spaces before the emptiness check so a blank-only cell
            # is reported as a missing answer.
            right = str(right).replace(' ', '') if right else ''
            if not right:
                return Response(rownum + '行没有答案', status=400)
            resolution = sheet['k' + rownum].value
            level = sheet['l' + rownum].value
            year = sheet['m' + rownum].value
            if level:
                level = str(level).replace(' ', '')
            if questioncat not in questioncatdict:
                return Response(rownum + "行不存在分类(" + questioncat + ")!请先新建", status=400)
            cateobj = questioncatdict[questioncat]
            if qtype == '单选':
                if Question.objects.filter(type='单选', name=name, year=year, options=answer, questioncat=cateobj).exists():
                    notinlist.append(row)
                elif right in qlist:
                    self._create_question('单选', cateobj, name, answer, right, resolution, year, level)
            elif qtype == '多选':
                right = list(right.strip())
                if Question.objects.filter(type='多选', name=name, year=year, options=answer, questioncat=cateobj).exists():
                    notinlist.append(row)
                elif all(choice in qlist for choice in right):
                    self._create_question('多选', cateobj, name, answer, right, resolution, year, level)
            elif qtype == '判断':
                # Anything other than these three "true" spellings is "false".
                right = 'A' if right in ('A', '对', '正确') else 'B'
                if Question.objects.filter(type='判断', name=name, is_delete=0,  options={'A': '对', 'B': '错'}, questioncat=cateobj).exists():
                    notinlist.append(row)
                else:
                    self._create_question('判断', cateobj, name, {'A': '对', 'B': '错'}, right, resolution, year, level)
            row = row + 1
        return Response(notinlist, status=200)
 | 
						||
 | 
						||
 | 
						||
class PaperViewSet(ModelViewSet):
    """
    Create, read, update and delete exam papers.

    A paper and its ``PaperQuestion`` rows are written inside one database
    transaction, so a failure never leaves a paper without its questions.
    """
    perms_map = {'get': '*', 'post':'paper', 'put':'paper', 'delete':'paper'}
    queryset = Paper.objects.all()
    serializer_class = PaperSerializer
    ordering = ['id']
    search_fields = ('name',)

    def get_serializer_class(self):
        """Pick the detail serializer for retrieve and the write serializer for create/update."""
        if self.action in ['retrieve']:
            return PaperDetailSerializer
        elif self.action in ['create', 'update']:
            return PaperCreateUpdateSerializer
        return super().get_serializer_class()

    def create(self, request, *args, **kwargs):
        """Create a paper and its question links; responds 201 with an empty body."""
        sr = PaperCreateUpdateSerializer(data=request.data)
        sr.is_valid(raise_exception=True)
        vdata = sr.validated_data
        vdata['create_by'] = request.user
        questions_ = vdata.pop('questions_')
        # Paper and question links succeed or fail together.
        with transaction.atomic():
            paper = Paper.objects.create(**vdata)
            PaperQuestion.objects.bulk_create([
                PaperQuestion(question=item['question'], total_score=item['total_score'], paper=paper)
                for item in questions_
            ])
        return Response(status=201)

    def update(self, request, *args, **kwargs):
        """Replace a paper's fields and its full question list.

        Raises ``ParseError`` when an exam using this paper has not closed yet.
        """
        # A paper cannot be edited while an exam using it is still open or upcoming.
        now = timezone.now()
        paper = self.get_object()
        if Exam.objects.filter(close_time__gte=now, paper=paper).exists():
            raise ParseError('存在考试,不可编辑')
        sr = PaperCreateUpdateSerializer(instance=paper, data=request.data)
        sr.is_valid(raise_exception=True)
        vdata = sr.validated_data
        questions_ = vdata.pop('questions_')
        vdata['update_by'] = request.user
        q_list = [
            PaperQuestion(question=item['question'], total_score=item['total_score'], paper=paper)
            for item in questions_
        ]
        # Without the transaction, a failed bulk_create after the delete
        # would leave the paper with no questions at all.
        with transaction.atomic():
            Paper.objects.filter(id=paper.id).update(**vdata)
            PaperQuestion.objects.filter(paper=paper).delete()
            PaperQuestion.objects.bulk_create(q_list)
        return Response()
 | 
						||
 | 
						||
 | 
						||
class ExamViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """CRUD endpoints for exams, plus the ``start`` action that opens an attempt."""

    perms_map = {'get': '*', 'post':'exam', 'put':'exam', 'delete':'exam'}
    queryset = Exam.objects.all().select_related('paper', 'create_by')
    ordering = ['-id']
    search_fields = ('name',)
    serializer_class = ExamListSerializer
    filterset_class = ExamFilter

    def get_serializer_class(self):
        """Pick the write serializer for create/update and the detail serializer for retrieve."""
        if self.action in ['create', 'update']:
            return ExamCreateUpdateSerializer
        elif self.action in ['retrieve']:
            return ExamDetailSerializer
        return super().get_serializer_class()

    def destroy(self, request, *args, **kwargs):
        """Delete an exam with ``soft=False``; refused while exam records reference it."""
        instance = self.get_object()
        if ExamRecord.objects.filter(exam=instance).exists():
            raise ParseError('存在考试记录,禁止删除')
        instance.delete(soft=False)
        return Response(status=204)

    @action(methods=['post'], detail=True, perms_map=[{'post': '*'}], serializer_class=Serializer, permission_classes = [IsAuthenticated])
    def start(self, request, *args, **kwargs):
        """
        Start an exam

        Creates an exam record for the current user, copies the paper's
        questions into answer details and responds with the record id and
        the serialized questions. Raises ``ParseError`` outside the exam
        window, when the user has no attempts left, or when the exam has no
        paper.
        """
        exam = self.get_object()
        now = timezone.now()
        if now < exam.open_time or now > exam.close_time:
            raise ParseError('不在考试时间范围')
        chance_used = ExamRecord.objects.filter(
            exam=exam, create_by=request.user).count()
        # '>=' rather than '>': with '>' a user got chance + 1 attempts.
        if chance_used >= exam.chance:
            raise ParseError('考试机会已用完')
        if not exam.paper:
            raise ParseError('暂不支持')
        # Record and answer details are created together or not at all.
        with transaction.atomic():
            er = ExamRecord()
            er.type = '正式考试'
            er.name = '正式考试' + now.strftime('%Y%m%d%H%M')
            er.limit = exam.paper.limit
            er.paper = exam.paper
            er.total_score = exam.paper.total_score
            er.start_time = now
            er.is_pass = False
            er.exam = exam
            # The chance count above and the owner check on submit both
            # look the record up by create_by, so it must be recorded here.
            er.create_by = request.user
            er.save()
            pqs = PaperQuestion.objects.filter(paper=exam.paper).order_by('id')
            AnswerDetail.objects.bulk_create([
                AnswerDetail(examrecord=er, question=pq.question, total_score=pq.total_score)
                for pq in pqs
            ])
        # Re-query so the serialized details come back in id order.
        ads = AnswerDetail.objects.select_related('question').filter(examrecord=er).order_by('id')
        ret = {
            'examrecord': er.id,
            'questions_': AnswerDetailOutSerializer(instance=ads, many=True).data,
        }
        return Response(ret)
 | 
						||
 | 
						||
 | 
						||
class ExamRecordViewSet(ListModelMixin, DestroyModelMixin, GenericViewSet):
    """
    List and delete exam records; clear stale ones; list the caller's own
    records; submit and grade an attempt.
    """
    perms_map = {'get': 'examrecord', 'post': '*', 'delete':'examrecord'}
    queryset = ExamRecord.objects.select_related('create_by')
    serializer_class = ExamRecordListSerializer
    ordering_fields = ['create_time', 'score', 'took', 'update_time']
    ordering = ['-update_time']
    search_fields = ('create_by__name', 'create_by__username')
    filterset_class = ExamRecordFilter

    def get_serializer_class(self):
        """Use the detail serializer for the ``retrieve`` action.

        NOTE(review): this class mixes in no RetrieveModelMixin, so the
        branch only takes effect if a retrieve action is added -- confirm.
        """
        if self.action == 'retrieve':
            return ExamRecordDetailSerializer
        return super().get_serializer_class()

    def perform_destroy(self, instance):
        """Delete the exam record with ``soft=False`` (physical delete per the original comment)."""
        instance.delete(soft=False)

    @action(methods=['post'], detail=False, perms_map=[{'post': '*'}], serializer_class=Serializer, permission_classes = [IsAuthenticated])
    def clear(self, request, pk=None):
        """
        Clear stale exam records

        Deletes exam records created at least seven days ago that were never
        submitted.
        """
        # timezone.now must be called; subtracting from the function object
        # raised TypeError.
        now = timezone.now()
        days7_ago = now - timedelta(days=7)
        # NOTE(review): delete(soft=False) assumes the project's queryset
        # accepts a 'soft' argument -- confirm.
        ExamRecord.objects.filter(create_time__lte=days7_ago, is_submited=False).delete(soft=False)
        # Was Response(status=False), i.e. HTTP status 0.
        return Response(status=200)

    @action(methods=['get'], detail=False, permission_classes=[IsAuthenticated])
    def self(self, request, pk=None):
        '''
        Personal exam records

        Lists the current user's exam records, most recently updated first.
        '''
        queryset = ExamRecord.objects.filter(create_by=request.user).order_by('-update_time')
        page = self.paginate_queryset(queryset)
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        # Pagination disabled: return the full list.
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)

    @action(methods=['post'], detail=True, perms_map=[{'post': '*'}], serializer_class=ExamRecordSubmitSerializer, permission_classes = [IsAuthenticated])
    def submit(self, request, pk=None):
        '''
        Submit an answer sheet

        Grades the submitted answers against the record's answer details,
        matched by position in id order, then stores score, pass flag,
        duration and end time on the record. Raises ``ParseError`` when the
        caller does not own the record, the record has no exam, more than
        30 minutes have passed since the exam closed, or grading fails.
        '''
        er = self.get_object()
        now = timezone.now()
        if er.create_by != request.user:
            raise ParseError('提交人有误')
        exam = er.exam
        if not exam:
            raise ParseError('暂不支持')
        # Submissions are accepted up to 30 minutes after the exam closes.
        if now > exam.close_time + timedelta(minutes=30):
            raise ParseError('考试时间已过, 提交失败')
        serializer = ExamRecordSubmitSerializer(data = request.data)
        serializer.is_valid(raise_exception=True)
        vdata = serializer.validated_data
        questions_ = vdata['questions_']
        # Server-side grading. Iterate model instances: the previous
        # .values() dicts supported neither attribute access nor save().
        ads = AnswerDetail.objects.select_related('question').filter(examrecord=er).order_by('id')
        total_score = 0
        try:
            # Roll back partially graded answers if anything fails.
            with transaction.atomic():
                for index, ad in enumerate(ads):
                    user_answer = questions_[index]['user_answer']
                    question = ad.question
                    if question.type == '多选':
                        # Multiple choice: order of the selected options is irrelevant.
                        is_right = set(question.right) == set(user_answer)
                    else:
                        is_right = question.right == user_answer
                    ad.user_answer = user_answer
                    # Set both outcomes explicitly so nothing depends on
                    # model defaults or earlier values.
                    ad.is_right = is_right
                    ad.score = ad.total_score if is_right else 0
                    ad.save()
                    total_score = total_score + ad.score
        except Exception as e:
            raise ParseError('判卷失败, 请检查试卷:' + str(e))
        er.score = total_score
        # Passing requires strictly more than 60% of the total score.
        if er.score > 0.6*er.total_score:
            er.is_pass = True
        er.took = (now - er.create_time).total_seconds()
        er.end_time = now
        er.is_submited = True
        er.save()
        return Response()