216 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			216 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Python
		
	
	
	
| from apps.utils.viewsets import CustomModelViewSet, CustomGenericViewSet
 | |
| from apps.utils.mixins import ListModelMixin, DestroyModelMixin
 | |
| from rest_framework.mixins import RetrieveModelMixin
 | |
| from rest_framework.exceptions import ParseError
 | |
| from rest_framework.decorators import action
 | |
| from rest_framework.serializers import Serializer
 | |
| from django.db import transaction
 | |
| from .models import Questioncat, Question, Paper, PaperQuestion, Exam, ExamRecord, AnswerDetail, Train
 | |
| from .serializers import (QuestioncatSerializer, QuestionSerializer, ExamSerializer, 
 | |
|                           ExamRecordInitSerizlier, ExamRecordSerializer, ExamRecordDetailSerializer, ExamRecordSubmitSerializer,
 | |
|                           PaperSerializer, PaperListSerializer, PaperPatchSerializer, ExamDetailSerializer, TrainingSerializer)
 | |
| from django.utils import timezone
 | |
| from rest_framework.response import Response
 | |
| from rest_framework.permissions import IsAuthenticated
 | |
| from drf_yasg.utils import swagger_auto_schema
 | |
| from apps.utils.permission import has_perm
 | |
| from .filters import ExamFilter, ExamRecordFilter
 | |
| from apps.system.models import User
 | |
| from datetime import datetime
 | |
| from apps.utils.snowflake import idWorker
 | |
| 
 | |
| # Create your views here.
 | |
| class QuestioncatViewSet(CustomModelViewSet):
 | |
|     queryset = Questioncat.objects.all()
 | |
|     serializer_class = QuestioncatSerializer
 | |
| 
 | |
| 
 | |
| class QuestionViewSet(CustomModelViewSet):
 | |
|     queryset = Question.objects.all()
 | |
|     serializer_class = QuestionSerializer
 | |
|     select_related_fields = ["questioncat"]
 | |
|     filterset_fields = ["questioncat", "type", "enabled"]
 | |
| 
 | |
|     def update(self, request, *args, **kwargs):
 | |
|         obj: Question = self.get_object()
 | |
|         if AnswerDetail.objects.filter(question=obj).exists():
 | |
|             raise ParseError("存在答题,该题目不可编辑")
 | |
|         return super().update(request, *args, **kwargs)
 | |
|             
 | |
| 
 | |
| class PaperViewSet(CustomModelViewSet):
 | |
|     queryset = Paper.objects.all()
 | |
|     serializer_class = PaperSerializer
 | |
|     list_serializer_class = PaperListSerializer
 | |
|     partial_update_serializer_class = PaperPatchSerializer
 | |
| 
 | |
|     def update(self, request, *args, **kwargs):
 | |
|         obj: Paper = self.get_object()
 | |
|         qs = Exam.objects.filter(paper=obj)
 | |
|         if qs.exists():
 | |
|             raise ParseError("存在考试,该试卷不可编辑")
 | |
|         return super().update(request, *args, **kwargs)
 | |
| 
 | |
| class ExamViewSet(CustomModelViewSet):
 | |
|     queryset = Exam.objects.all()
 | |
|     select_related_fields = ["paper"]
 | |
|     serializer_class = ExamSerializer
 | |
|     filterset_class = ExamFilter
 | |
|     retrieve_serializer_class = ExamDetailSerializer
 | |
| 
 | |
|     def add_info_for_list(self, data):
 | |
|         """
 | |
|         添加是否可参加字段
 | |
|         """
 | |
|         now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
 | |
|         for item in data:
 | |
|             item["can_attend"] = False
 | |
|             if item["open_time"] <= now <= item["close_time"]:
 | |
|                 item["can_attend"] = True
 | |
|         return data
 | |
| 
 | |
|     def get_queryset(self):
 | |
|         qs = super().get_queryset()
 | |
|         if has_perm(self.request.user, ["exam.view"]):
 | |
|             return qs
 | |
|         user:User = self.request.user
 | |
|         dept = None
 | |
|         if user.is_authenticated:
 | |
|             dept = user.belong_dept
 | |
|         qs = qs.filter(is_public=True)
 | |
|         qs = qs|qs.filter(p_users=user)
 | |
|         if dept:
 | |
|             qs = qs|qs.filter(p_depts=dept)
 | |
|         return qs
 | |
| 
 | |
|     def destroy(self, request, *args, **kwargs):
 | |
|         instance = self.get_object()
 | |
|         if ExamRecord.objects.filter(exam=instance).exists():
 | |
|             raise ParseError('存在考试记录,禁止删除')
 | |
|         return super().destroy(request, *args, **kwargs)
 | |
|     
 | |
|     @swagger_auto_schema(request_body=Serializer, responses={200: ExamRecordInitSerizlier})
 | |
|     @action(methods=['post'], detail=True, perms_map={'post': '*'}, serializer_class=Serializer)
 | |
|     def attend(self, request, *args, **kwargs):
 | |
|         """
 | |
|         参加考试
 | |
| 
 | |
|         返回考试具体题目信息
 | |
|         """
 | |
|         exam: Exam = self.get_object()
 | |
|         now = timezone.now()
 | |
|         if now < exam.open_time or now > exam.close_time:
 | |
|             raise ParseError('不在考试时间范围')
 | |
|         tests = ExamRecord.objects.filter(
 | |
|                     exam=exam, create_by=request.user)
 | |
|         chance_used = tests.count()
 | |
|         if chance_used > exam.chance:
 | |
|             raise ParseError('考试机会已用完')
 | |
|         if exam.paper:
 | |
|             with transaction.atomic():
 | |
|                 tests.update(is_last=False)
 | |
|                 er = ExamRecord()
 | |
|                 er.start_time = now
 | |
|                 er.is_pass = False
 | |
|                 er.is_submited = False
 | |
|                 er.exam = exam
 | |
|                 er.create_by = request.user
 | |
|                 er.is_last = True
 | |
|                 er.save()
 | |
|                 pqs = PaperQuestion.objects.filter(paper=exam.paper).order_by('sort', 'id')
 | |
|                 details = []
 | |
|                 for i in pqs:
 | |
|                     details.append(AnswerDetail(id=idWorker.get_id(), examrecord=er, question=i.question, total_score=i.total_score))
 | |
|                 AnswerDetail.objects.bulk_create(details)
 | |
|             sr = ExamRecordInitSerizlier(er)
 | |
|             res_data = sr.data
 | |
|             print(res_data)
 | |
|             res_data.update({"chance_used": chance_used})
 | |
|             return Response(sr.data, status=201)
 | |
|         raise ParseError('暂不支持')
 | |
| 
 | |
| 
 | |
| class ExamRecordViewSet(ListModelMixin, DestroyModelMixin, RetrieveModelMixin, CustomGenericViewSet):
 | |
|     """
 | |
|     考试记录
 | |
|     """
 | |
|     perms_map = {"get": "*", "delete": "examrecord.delete"}
 | |
|     queryset = ExamRecord.objects.all()
 | |
|     list_serializer_class = ExamRecordSerializer
 | |
|     retrieve_serializer_class = ExamRecordDetailSerializer
 | |
|     search_fields = ('create_by__name', 'create_by__username', 'exam__name')
 | |
|     filterset_class = ExamRecordFilter
 | |
| 
 | |
|     def get_queryset(self):
 | |
|         qs = super().get_queryset()
 | |
|         if has_perm(self.request.user, ["examrecord.view"]):
 | |
|             return qs
 | |
|         return qs.filter(create_by=self.request.user)
 | |
| 
 | |
|     
 | |
|     @swagger_auto_schema(request_body=ExamRecordSubmitSerializer, responses={200: ExamRecordSerializer})
 | |
|     @action(methods=['post'], detail=True, perms_map={'post': '*'}, serializer_class=ExamRecordSubmitSerializer, permission_classes = [IsAuthenticated])
 | |
|     @transaction.atomic
 | |
|     def submit(self, request, pk=None):
 | |
|         '''
 | |
|         提交答卷
 | |
| 
 | |
|         提交答卷
 | |
|         '''
 | |
|         er: ExamRecord = self.get_object()
 | |
|         now = timezone.now()
 | |
|         if er.is_submited:
 | |
|             raise ParseError('该考试记录已提交')
 | |
|         exam:Exam = er.exam
 | |
|         if not exam:
 | |
|             raise ParseError('暂不支持')
 | |
|         took = (now - er.start_time).total_seconds()
 | |
|         if took > exam.paper.limit * 60:
 | |
|             raise ParseError('答题时间超时,提交失败')
 | |
|         serializer = ExamRecordSubmitSerializer(data = request.data)
 | |
|         serializer.is_valid(raise_exception=True)
 | |
|         vdata = serializer.validated_data
 | |
|         detail = vdata['detail']
 | |
|         # 后端判卷
 | |
|         ads = AnswerDetail.objects.select_related('question').filter(examrecord=er).order_by('id')
 | |
|         total_score = 0
 | |
|         try:
 | |
|             for index, ad in enumerate(ads):
 | |
|                 ad.user_answer = detail[index]['user_answer']
 | |
|                 if ad.question.type == '多选':
 | |
|                     if set(ad.question.right) == set(ad.user_answer):
 | |
|                         ad.is_right = True
 | |
|                         ad.score = ad.total_score
 | |
|                 else:
 | |
|                     if ad.question.right == ad.user_answer:
 | |
|                         ad.is_right = True
 | |
|                         ad.score = ad.total_score
 | |
|                 ad.save()
 | |
|                 total_score = total_score + ad.score
 | |
|         except Exception as e:
 | |
|             raise ParseError('判卷失败, 请检查试卷:' + str(e))
 | |
|         er.score = total_score
 | |
|         er.took = took
 | |
|         er.end_time = now
 | |
|         er.is_submited = True
 | |
|         er.save()
 | |
|         return Response(ExamRecordSerializer(er).data)
 | |
|     
 | |
| 
 | |
| class TrainRecordViewSet(CustomModelViewSet):
 | |
|     '''
 | |
|     培训记录
 | |
|     '''
 | |
|     perms_map = {"get": "*", "delete": "train.delete","post": "train.create","put": "train.update"}
 | |
|     queryset = Train.objects.all()
 | |
|     serializer_class = TrainingSerializer
 | |
|     search_fields = ('create_by__name', 'create_by__username', 'name')
 | |
| 
 | |
|     def get_queryset(self):
 | |
|         qs = super().get_queryset()
 | |
|         if has_perm(self.request.user, ["train.view"]):
 | |
|             return qs
 | |
|         return qs.filter(create_by=self.request.user)
 | |
| 
 | |
|     
 |