215 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			215 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Python
		
	
	
	
from apps.utils.viewsets import CustomModelViewSet, CustomGenericViewSet
 | 
						|
from apps.utils.mixins import ListModelMixin, DestroyModelMixin
 | 
						|
from rest_framework.mixins import RetrieveModelMixin
 | 
						|
from rest_framework.exceptions import ParseError
 | 
						|
from rest_framework.decorators import action
 | 
						|
from rest_framework.serializers import Serializer
 | 
						|
from django.db import transaction
 | 
						|
from .models import Questioncat, Question, Paper, PaperQuestion, Exam, ExamRecord, AnswerDetail, Train
 | 
						|
from .serializers import (QuestioncatSerializer, QuestionSerializer, ExamSerializer, 
 | 
						|
                          ExamRecordInitSerizlier, ExamRecordSerializer, ExamRecordDetailSerializer, ExamRecordSubmitSerializer,
 | 
						|
                          PaperSerializer, PaperListSerializer, PaperPatchSerializer, ExamDetailSerializer, TrainingSerializer)
 | 
						|
from django.utils import timezone
 | 
						|
from rest_framework.response import Response
 | 
						|
from rest_framework.permissions import IsAuthenticated
 | 
						|
from drf_yasg.utils import swagger_auto_schema
 | 
						|
from apps.utils.permission import has_perm
 | 
						|
from .filters import ExamFilter, ExamRecordFilter
 | 
						|
from apps.system.models import User
 | 
						|
from datetime import datetime
 | 
						|
from apps.utils.snowflake import idWorker
 | 
						|
 | 
						|
# Create your views here.
 | 
						|
class QuestioncatViewSet(CustomModelViewSet):
    """View set exposing ``Questioncat`` (question category) records.

    All behaviour comes from ``CustomModelViewSet``; this class only binds
    the queryset and the serializer.
    """

    serializer_class = QuestioncatSerializer
    queryset = Questioncat.objects.all()
 | 
						|
 | 
						|
 | 
						|
class QuestionViewSet(CustomModelViewSet):
    """View set exposing ``Question`` records.

    A question can no longer be edited once at least one ``AnswerDetail``
    row references it.
    """

    serializer_class = QuestionSerializer
    queryset = Question.objects.all()
    filterset_fields = ["questioncat", "type", "enabled"]
    select_related_fields = ["questioncat"]

    def update(self, request, *args, **kwargs):
        """Update a question unless it has already been answered.

        Raises:
            ParseError: if any ``AnswerDetail`` points at this question.
        """
        question: Question = self.get_object()
        already_answered = AnswerDetail.objects.filter(question=question).exists()
        if not already_answered:
            return super().update(request, *args, **kwargs)
        raise ParseError("存在答题,该题目不可编辑")
 | 
						|
            
 | 
						|
 | 
						|
class PaperViewSet(CustomModelViewSet):
    """View set exposing ``Paper`` (exam paper) records.

    Separate serializers are configured for the list and partial-update
    actions. A paper can no longer be edited once an ``Exam`` uses it.
    """

    queryset = Paper.objects.all()
    serializer_class = PaperSerializer
    partial_update_serializer_class = PaperPatchSerializer
    list_serializer_class = PaperListSerializer

    def update(self, request, *args, **kwargs):
        """Update a paper unless an exam already references it.

        Raises:
            ParseError: if any ``Exam`` is bound to this paper.
        """
        paper: Paper = self.get_object()
        if Exam.objects.filter(paper=paper).exists():
            raise ParseError("存在考试,该试卷不可编辑")
        return super().update(request, *args, **kwargs)
 | 
						|
 | 
						|
class ExamViewSet(CustomModelViewSet):
    """View set exposing ``Exam`` records plus the ``attend`` action.

    Users holding the ``exam.view`` permission see every exam. Everyone
    else sees only public exams and exams assigned to them personally or
    to their department.
    """

    queryset = Exam.objects.all()
    select_related_fields = ["paper"]
    serializer_class = ExamSerializer
    filterset_class = ExamFilter
    retrieve_serializer_class = ExamDetailSerializer

    def add_info_for_list(self, data):
        """Add a boolean ``can_attend`` flag to every item of ``data``.

        The flag is true when the current time lies within the item's
        ``open_time`` .. ``close_time`` window (both bounds inclusive).

        Args:
            data: iterable of dict-like items carrying ``open_time`` and
                ``close_time``.

        Returns:
            The same ``data`` object, mutated in place.
        """
        # NOTE(review): this is a string comparison against naive local time.
        # It assumes open_time/close_time are serialized as
        # "%Y-%m-%d %H:%M:%S" strings in the same time zone -- confirm.
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        for item in data:
            item["can_attend"] = item["open_time"] <= now <= item["close_time"]
        return data

    def get_queryset(self):
        """Return the exams the requesting user is allowed to see.

        Users with ``exam.view`` get the unrestricted queryset. Other users
        get public exams, exams listing them in ``p_users`` and exams
        listing their department in ``p_depts``.
        """
        qs = super().get_queryset()
        user: User = self.request.user
        if has_perm(user, ["exam.view"]):
            return qs
        # Every branch is built from the unrestricted ``qs``. OR-ing filters
        # of an already ``is_public=True`` queryset could never add private
        # exams, which made the assignment filters a no-op.
        visible = qs.filter(is_public=True)
        if user.is_authenticated:
            visible = visible | qs.filter(p_users=user)
            dept = user.belong_dept
            if dept:
                visible = visible | qs.filter(p_depts=dept)
        # The joins on p_users / p_depts can yield one row per related
        # object, so collapse duplicates.
        return visible.distinct()

    def destroy(self, request, *args, **kwargs):
        """Delete an exam unless exam records already exist for it.

        Raises:
            ParseError: if any ``ExamRecord`` references this exam.
        """
        instance = self.get_object()
        if ExamRecord.objects.filter(exam=instance).exists():
            raise ParseError('存在考试记录,禁止删除')
        return super().destroy(request, *args, **kwargs)

    @swagger_auto_schema(request_body=Serializer, responses={200: ExamRecordInitSerizlier})
    @action(methods=['post'], detail=True, perms_map={'post': '*'}, serializer_class=Serializer)
    def attend(self, request, *args, **kwargs):
        """
        Attend an exam

        Starts a new attempt and returns the exam record with its questions.
        The response also carries ``chance_used``, the number of attempts the
        user had already made before this one.

        Raises:
            ParseError: outside the exam's open/close window, when no
                attempts are left, or when the exam has no paper.
        """
        exam: Exam = self.get_object()
        now = timezone.now()
        if now < exam.open_time or now > exam.close_time:
            raise ParseError('不在考试时间范围')
        tests = ExamRecord.objects.filter(exam=exam, create_by=request.user)
        chance_used = tests.count()
        # ``>=``: once ``chance`` attempts exist no further attempt may start.
        # The previous ``>`` allowed one attempt too many.
        if chance_used >= exam.chance:
            raise ParseError('考试机会已用完')
        if not exam.paper:
            raise ParseError('暂不支持')
        with transaction.atomic():
            # Only the newest attempt is flagged as the last one.
            tests.update(is_last=False)
            er = ExamRecord()
            er.start_time = now
            er.is_pass = False
            er.is_submited = False
            er.exam = exam
            er.create_by = request.user
            er.is_last = True
            er.save()
            pqs = PaperQuestion.objects.filter(paper=exam.paper).order_by('sort', 'id')
            # One empty answer row per paper question, created in paper order.
            details = [
                AnswerDetail(
                    id=idWorker.get_id(),
                    examrecord=er,
                    question=pq.question,
                    total_score=pq.total_score,
                )
                for pq in pqs
            ]
            AnswerDetail.objects.bulk_create(details)
        sr = ExamRecordInitSerizlier(er)
        res_data = sr.data
        res_data.update({"chance_used": chance_used})
        # Return the mutated dict: each ``sr.data`` access builds a fresh
        # ReturnDict, so returning ``sr.data`` again would drop ``chance_used``.
        return Response(res_data, status=201)
 | 
						|
 | 
						|
 | 
						|
class ExamRecordViewSet(ListModelMixin, DestroyModelMixin, RetrieveModelMixin, CustomGenericViewSet):
    """
    Exam records
    """
    queryset = ExamRecord.objects.all()
    perms_map = {"get": "*", "delete": "examrecord.delete"}
    filterset_class = ExamRecordFilter
    search_fields = ('create_by__name', 'create_by__username', 'exam__name')
    list_serializer_class = ExamRecordSerializer
    retrieve_serializer_class = ExamRecordDetailSerializer

    def get_queryset(self):
        """Return all records for ``examrecord.view`` holders, else only the user's own."""
        records = super().get_queryset()
        requester = self.request.user
        if not has_perm(requester, ["examrecord.view"]):
            records = records.filter(create_by=self.request.user)
        return records

    @swagger_auto_schema(request_body=ExamRecordSubmitSerializer, responses={200: ExamRecordSerializer})
    @action(methods=['post'], detail=True, perms_map={'post': '*'}, serializer_class=ExamRecordSubmitSerializer, permission_classes = [IsAuthenticated])
    @transaction.atomic
    def submit(self, request, pk=None):
        '''
        Submit an answer sheet

        Grades the submitted answers on the server and stores the result.
        '''
        record: ExamRecord = self.get_object()
        submitted_at = timezone.now()
        if record.is_submited:
            raise ParseError('该考试记录已提交')
        exam: Exam = record.exam
        if not exam:
            raise ParseError('暂不支持')
        took = (submitted_at - record.start_time).total_seconds()
        # ``limit`` is multiplied by 60 before being compared with seconds.
        if took > exam.paper.limit * 60:
            raise ParseError('答题时间超时,提交失败')
        submit_serializer = ExamRecordSubmitSerializer(data=request.data)
        submit_serializer.is_valid(raise_exception=True)
        answers = submit_serializer.validated_data['detail']
        # Server-side grading: submitted answers are matched to the stored
        # answer rows by position, with the rows ordered by id.
        answer_rows = AnswerDetail.objects.select_related('question').filter(examrecord=record).order_by('id')
        total_score = 0
        try:
            for position, row in enumerate(answer_rows):
                row.user_answer = answers[position]['user_answer']
                if row.question.type == '多选':
                    # Multiple choice: the order of selected options is irrelevant.
                    answered_correctly = set(row.question.right) == set(row.user_answer)
                else:
                    answered_correctly = row.question.right == row.user_answer
                if answered_correctly:
                    row.is_right = True
                    row.score = row.total_score
                row.save()
                total_score += row.score
        except Exception as e:
            # Any grading failure is reported to the client as a ParseError.
            raise ParseError('判卷失败, 请检查试卷:' + str(e))
        record.score = total_score
        record.took = took
        record.end_time = submitted_at
        record.is_submited = True
        record.save()
        return Response(ExamRecordSerializer(record).data)
 | 
						|
    
 | 
						|
 | 
						|
class TrainRecordViewSet(CustomModelViewSet):
    '''
    Training records
    '''
    queryset = Train.objects.all()
    serializer_class = TrainingSerializer
    perms_map = {"get": "*", "delete": "train.delete","post": "train.create","put": "train.update"}
    search_fields = ('create_by__name', 'create_by__username', 'name')

    def get_queryset(self):
        """Return all records for ``train.view`` holders, else only the user's own."""
        records = super().get_queryset()
        requester = self.request.user
        if not has_perm(requester, ["train.view"]):
            records = records.filter(create_by=self.request.user)
        return records
 | 
						|
 | 
						|
    
 |