examtest/test_server/examtest/views.py

480 lines
21 KiB
Python

from datetime import datetime
from django.db.models import Avg
from django.db.models.query import QuerySet
from django_filters.rest_framework import DjangoFilterBackend
from openpyxl import Workbook, load_workbook
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.generics import GenericAPIView
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
from question.models import Question
from question.serializers import QuestionSerializer
from server import settings
from utils.custom import CommonPagination
from utils.mixins import OptimizationMixin
from .exports import export_test
from .models import AnswerDetail, Banner, ExamTest, Exam
from .models_paper import Paper, PaperQuestions, TestRule, WorkScope
from .serializers import (
AnswerDetailCreateSerializer, AnswerDetailSerializer, BannerSerializer,
ExamTestListSerializer, MoniTestSerializer, PaperDetailSerializer,
PaperQuestionsCreateSerializer, PaperSerializer, TestRuleSerializer,
WorkScopeSerializer, ExamCreateUpdateSerializer, ExamListSerializer)
# Create your views here.
class ExamViewSet(ModelViewSet):
"""
正式考试增删改查
"""
perms_map = [
{'get': 'exam_view'}, {'post': 'exam_create'},
{'put': 'exam_update'}, {'delete': 'exam_delete'}]
pagination_class = CommonPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
ordering = ['-id']
search_fields = ['name']
def get_queryset(self):
queryset = Exam.objects.all()
if not self.request.user.is_superuser:
queryset = queryset.filter(create_admin = self.request.user)
if hasattr(self.get_serializer_class(), 'setup_eager_loading'):
queryset = self.get_serializer_class().setup_eager_loading(queryset) # 性能优化
return queryset
def get_serializer_class(self):
if self.action in ['create', 'update']:
return ExamCreateUpdateSerializer
return ExamListSerializer
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
instance = serializer.save(create_admin=request.user)
instance.code = instance.pk + 10000
instance.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
def destroy(self, request, *args, **kwargs):
instance = self.get_object()
if ExamTest.objects.filter(exam=instance).exists():
return Response({'error':'存在考试记录,禁止删除'})
instance.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
class AnswerDetailView(APIView):
authentication_classes = []
permission_classes = []
def get(self, request, *args, **kwargs):
queryset = AnswerDetail.objects.all()
if request.query_params.get('examtest', None):
queryset = queryset.filter(examtest=request.query_params.get('examtest')).order_by('question__type')
queryset = AnswerDetailSerializer.setup_eager_loading(queryset)
serializer = AnswerDetailSerializer(instance=queryset,many=True)
return Response(serializer.data)
class WorkScopeViewSet(ModelViewSet):
"""
工作类别:增删改查
"""
perms_map = [
{'get': 'workscope_list'}, {'post': 'workscope_create'},
{'put': 'workscope_update'}, {'delete': 'workscope_delete'}]
pagination_class = None
queryset = WorkScope.objects.filter(is_delete=0).all().order_by("id")
serializer_class = WorkScopeSerializer
ordering_fields = ('id',)
ordering = ['id']
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['subject', 'can_exam']
search_fields = ('name',)
def get_queryset(self):
queryset = self.queryset
queryset = self.get_serializer_class().setup_eager_loading(queryset)
return queryset
@action(methods=['get'], detail=True,url_path='monitest', url_name='gen_monitest', perms_map=[{'get':'gen_monitest'}])
def monitest(self, request, pk=None):
'''
生成自助模拟考试
'''
ret = {}
workscope = self.get_object()
ret['name'] = '自助模考' + datetime.now().strftime('%Y%m%d%H%M')
ret['type'] = '自助模考' # 自助模拟考试
ret['workscope'] = workscope.id
ret['limit'] = workscope.rule.limit
ret['total_score'] = workscope.rule.total_score
ret['pass_score'] = workscope.rule.pass_score
ret['danxuan_count'] = workscope.rule.danxuan_count
ret['danxuan_score'] = workscope.rule.danxuan_score
ret['duoxuan_count'] = workscope.rule.duoxuan_count
ret['duoxuan_score'] = workscope.rule.duoxuan_score
ret['panduan_count'] = workscope.rule.panduan_count
ret['panduan_score'] = workscope.rule.panduan_score
question_queryset = Question.objects.none()
questioncats = workscope.questioncat.exclude(name='辐射小知识更新中').order_by('type', 'create_time')
if questioncats.count() == 3:
import random
b2 = random.randint(1,8)
b3 = random.randint(1,9-b2)
b1 = 10 - b2 -b3
a1 = 2*b2 + 2*b3 - 2
a2 = 18 - 2*b2
a3 = 24 - 2*b3
queryset = Question.objects.filter(is_delete=0)
a1_set = queryset.filter(questioncat=questioncats[0], type='单选').order_by('?')[:a1]
a2_set = queryset.filter(questioncat=questioncats[1], type='单选').order_by('?')[:a2]
a3_set = queryset.filter(questioncat=questioncats[2], type='单选').order_by('?')[:a3]
b1_set = queryset.filter(questioncat=questioncats[0], type='多选').order_by('?')[:b1]
b2_set = queryset.filter(questioncat=questioncats[1], type='多选').order_by('?')[:b2]
b3_set = queryset.filter(questioncat=questioncats[2], type='多选').order_by('?')[:b3]
question_queryset = question_queryset|a1_set|a2_set|a3_set|b1_set|b2_set|b3_set
elif workscope.name == '辐射安全管理':
# 辐射安全管理出卷规则
queryset = Question.objects.filter(is_delete=0)
a1_set = queryset.filter(questioncat=questioncats[0], type='单选').order_by('?')[:16]
a2_set = queryset.filter(questioncat=questioncats[1], type='单选').order_by('?')[:24]
b1_set = queryset.filter(questioncat=questioncats[0], type='多选').order_by('?')[:4]
b2_set = queryset.filter(questioncat=questioncats[1], type='多选').order_by('?')[:6]
question_queryset = question_queryset|a1_set|a2_set|b1_set|b2_set
elif workscope.name == '科研、生产及其他':
# 科研、生产及其他出卷规则
queryset = Question.objects.filter(is_delete=0)
a1_set = queryset.filter(questioncat=questioncats[0], type='单选').order_by('?')[:24]
a2_set = queryset.filter(questioncat=questioncats[1], type='单选').order_by('?')[:16]
b1_set = queryset.filter(questioncat=questioncats[0], type='多选').order_by('?')[:6]
b2_set = queryset.filter(questioncat=questioncats[1], type='多选').order_by('?')[:4]
question_queryset = question_queryset|a1_set|a2_set|b1_set|b2_set
else:
queryset = Question.objects.filter(is_delete=0,questioncat__in = workscope.questioncat.all())
if ret['danxuan_count']:
danxuan = queryset.filter(type='单选').order_by('?')[:ret['danxuan_count']]
question_queryset = question_queryset | danxuan
if ret['duoxuan_count']:
duoxuan = queryset.filter(type='多选').order_by('?')[:ret['duoxuan_count']]
question_queryset = question_queryset | duoxuan
if ret['panduan_count']:
panduan = queryset.filter(type='判断').order_by('?')[:ret['panduan_count']]
question_queryset = question_queryset | panduan
questions = QuestionSerializer(instance=question_queryset.order_by('type'),many=True).data
for i in questions:
if i['type'] == '单选':
i['total_score'] = ret['danxuan_score']
elif i['type'] == '多选':
i['total_score'] = ret['duoxuan_score']
else:
i['total_score'] = ret['panduan_score']
ret['questions'] = questions
return Response(ret)
@action(methods=['get'], detail=False,url_path='monitest2', url_name='gen_monitest2', perms_map=[{'get':'gen_monitest'}])
def monitest2(self, request, pk=None):
'''
生成体验版自助模拟考试
'''
ret = {}
rule = TestRule.objects.first()
ret['name'] = '自助模考' + datetime.now().strftime('%Y%m%d%H%M')
ret['type'] = '自助模考' # 自助模拟考试
ret['limit'] = rule.limit
ret['total_score'] = rule.total_score
ret['pass_score'] = rule.pass_score
ret['danxuan_count'] = rule.danxuan_count
ret['danxuan_score'] = rule.danxuan_score
ret['duoxuan_count'] = rule.duoxuan_count
ret['duoxuan_score'] = rule.duoxuan_score
ret['panduan_count'] = rule.panduan_count
ret['panduan_score'] = rule.panduan_score
question_queryset = Question.objects.none()
queryset = Question.objects.filter(is_delete=0)
if ret['danxuan_count']:
danxuan = queryset.filter(type='单选').order_by('?')[:ret['danxuan_count']]
question_queryset = question_queryset | danxuan
if ret['duoxuan_count']:
duoxuan = queryset.filter(type='多选').order_by('?')[:ret['duoxuan_count']]
question_queryset = question_queryset | duoxuan
if ret['panduan_count']:
panduan = queryset.filter(type='判断').order_by('?')[:ret['panduan_count']]
question_queryset = question_queryset | panduan
questions = QuestionSerializer(instance=question_queryset.order_by('type'),many=True).data
for i in questions:
if i['type'] == '单选':
i['total_score'] = ret['danxuan_score']
elif i['type'] == '多选':
i['total_score'] = ret['duoxuan_score']
else:
i['total_score'] = ret['panduan_score']
ret['questions'] = questions
user = request.user
user.remain_count = user.remain_count -1
request.user.save()
ret['remain_count'] = user.remain_count
return Response(ret)
class BannerViewSet(ModelViewSet):
"""
轮播图:增删改查
"""
perms_map = (
{'get': 'banner_list'}, {'post': 'banner_create'},
{'put': 'banner_update'}, {'delete': 'banner_delete'})
pagination_class = None
queryset = Banner.objects.filter(is_delete=0).all().order_by("sort")
serializer_class = BannerSerializer
ordering_fields = ('id', 'sort')
ordering = ['sort']
def get_authenticators(self):
if self.request.method == 'GET':
self.authentication_classes = []
return [auth() for auth in self.authentication_classes]
def get_permissions(self):
if self.request.method == 'GET':
self.permission_classes = []
return [permission() for permission in self.permission_classes]
class TestRuleViewSet(ModelViewSet):
"""
模考规则:增删改查
"""
perms_map = (
{'get': 'testrule_list'}, {'post': 'testrule_create'},
{'put': 'testrule_update'}, {'delete': 'testrule_delete'})
pagination_class = None
queryset = TestRule.objects.filter(is_delete=0).all().order_by("id")
serializer_class = TestRuleSerializer
ordering_fields = ('id',)
ordering = ['id']
search_fields = ('^name',)
def get_authenticators(self):
if self.request.method == 'GET':
self.authentication_classes = []
return [auth() for auth in self.authentication_classes]
def get_permissions(self):
if self.request.method == 'GET':
self.permission_classes = []
return [permission() for permission in self.permission_classes]
class ExamTestViewSet(ModelViewSet):
"""
考试记录列表和详情
"""
perms_map = [{'get': 'examtest_view'},{'post': '*'}]
pagination_class = CommonPagination
queryset = ExamTest.objects.filter(is_delete=0).all()
serializer_class = ExamTestListSerializer
ordering_fields = ('id','create_time','took','score')
ordering = ['-create_time']
search_fields = ('consumer__name', 'consumer__company__name')
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['type','is_pass']
def get_queryset(self):
assert self.queryset is not None, (
"'%s' should either include a `queryset` attribute, "
"or override the `get_queryset()` method."
% self.__class__.__name__
)
queryset = self.queryset
if isinstance(queryset, QuerySet):
# Ensure queryset is re-evaluated on each request.
queryset = queryset.all()
if self.request.query_params.get('start'):
queryset = queryset.filter(start_time__gte=self.request.query_params['start'] )
if self.request.query_params.get('end'):
queryset = queryset.filter(start_time__lte=self.request.query_params['end'])
if not self.request.user.is_superuser:
queryset = queryset.filter(consumer__create_admin = self.request.user)
return queryset
@action(methods=['get'], detail=False,url_path='self', url_name='selftest', perms_map = [{'*':'my_examtest'}])
def selftest(self, request, pk=None):
'''
个人考试记录
'''
queryset = ExamTest.objects.filter(consumer=request.user).order_by('-create_time')
pg = CommonPagination()
p = pg.paginate_queryset(queryset=queryset,request=request,view=self)
serializer = ExamTestListSerializer(instance=p,many=True)
return pg.get_paginated_response(serializer.data)
@action(methods=['get'], detail=False,url_path='fx', url_name='selffx', perms_map = [{'*':'my_examtest'}])
def selffx(self, request, pk=None):
'''
个人考试分析
'''
queryset = ExamTest.objects.filter(consumer=request.user)
ret = {}
ret['total'] = queryset.count()
ret['avg_score'] = round(queryset.aggregate(avg=Avg('score'))['avg']) if ret['total'] else 0
ret['pass_rate'] = round(((queryset.filter(is_pass=True).count())/ret['total'])*100) if ret['total'] else 0
return Response(ret)
def create(self, request, *args, **kwargs):
serializer = MoniTestSerializer(data = request.data)
if serializer.is_valid():
instance = serializer.save(consumer = request.user)
if 'questions' in request.data:
questions = []
for i in request.data['questions']:
question = {}
question['question'] = i['id']
question['examtest'] = instance.id
question['score'] = i['score']
if 'user_answer' in i:
question['user_answer'] = i['user_answer']
question['is_right'] = i['is_right']
questions.append(question)
serializer_detail = AnswerDetailCreateSerializer(data=questions, many=True)
if serializer_detail.is_valid():
serializer_detail.save()
return Response(MoniTestSerializer(instance).data,status=status.HTTP_200_OK)
else:
return Response(serializer_detail.errors)
else:
return Response({'error':'答题记录不存在'})
else:
return Response(serializer.errors)
@action(methods=['get'], detail=False,
url_path='export', url_name='export_test', perms_map=[{'*':'export_test'}])
def export(self, request):
queryset = self.filter_queryset(self.get_queryset())
queryset = ExamTestListSerializer.setup_eager_loading(queryset) # 性能优化
if queryset.count()>1000:
return Response({'error':'数据量超过1000,请筛选后导出!'})
serializer = ExamTestListSerializer(instance=queryset, many=True)
path = export_test(serializer.data)
return Response({'path': path})
class PaperViewSet(ModelViewSet):
"""
押题卷增删改查
"""
perms_map = [
{'get': 'paper_view'}, {'post': 'paper_create'},
{'put': 'paper_update'}, {'delete': 'paper_delete'}]
queryset = Paper.objects.filter(is_delete=0).all().order_by("id")
pagination_class = CommonPagination
serializer_class = PaperSerializer
ordering_fields = ('id',)
ordering = ['id']
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['workscope']
search_fields = ('name',)
def get_serializer_class(self):
if self.action=='list':
return PaperSerializer
else:
return PaperDetailSerializer
def create(self, request, *args, **kwargs):
data = request.data
serializer = self.get_serializer(data=data)
serializer.is_valid(raise_exception=True)
instance = serializer.save()
if 'questions' in data:
questions = []
for i in data['questions']:
question = {}
question['question'] = i['id']
question['paper'] = instance.id
question['total_score'] = i['total_score']
questions.append(question)
serializer_detail = PaperQuestionsCreateSerializer(data=questions, many=True)
if serializer_detail.is_valid():
serializer_detail.save()
return Response(status=status.HTTP_200_OK)
else:
return Response(serializer_detail.errors)
else:
return Response({'error':'不存在题库'})
def update(self, request, *args, **kwargs):
data = request.data
instance = self.get_object()
serializer = self.get_serializer(instance, data=data)
serializer.is_valid(raise_exception=True)
self.perform_update(serializer)
if 'questions' in data:
questions = []
for i in data['questions']:
question = {}
question['question'] = i['id']
question['paper'] = instance.id
question['total_score'] = i['total_score']
questions.append(question)
serializer_detail = PaperQuestionsCreateSerializer(data=questions, many=True)
if serializer_detail.is_valid():
PaperQuestions.objects.filter(paper=instance).delete()
serializer_detail.save()
return Response(status=status.HTTP_200_OK)
else:
return Response(serializer_detail.errors)
else:
return Response({'error':'不存在题库'})
@action(methods=['get'], detail=True, url_path='monitest', url_name='gen_monitest',
authentication_classes=[],permission_classes=[],
perms_map=[{'get':'gen_monitest'}])
def monitest(self, request, pk=None):
'''
生成押卷模拟考试
'''
ret = {}
paper = self.get_object()
serializer = PaperDetailSerializer(instance=paper)
ret = serializer.data
ret['name'] = '押卷模考' + datetime.now().strftime('%Y%m%d%H%M')
ret['type'] = '押卷模考'
ret['paper'] = paper.id
return Response(ret)
@action(methods=['put'], detail=True, url_path='clone', url_name='clone_paper',
perms_map=[{'put':'clone_paper'}])
def clone(self, request, pk=None):
'''
克隆试卷
'''
paper = self.get_object()
obj = Paper()
obj.name = '克隆卷-'+paper.name
obj.workscope = paper.workscope
obj.limit = paper.limit
obj.total_score = paper.total_score
obj.pass_score = paper.pass_score
obj.danxuan_count = paper.danxuan_count
obj.danxuan_score = paper.danxuan_score
obj.duoxuan_count = paper.duoxuan_count
obj.duoxuan_score = paper.duoxuan_score
obj.panduan_count = paper.panduan_count
obj.panduan_score = paper.panduan_score
obj.save()
for i in PaperQuestions.objects.filter(paper=paper):
o = PaperQuestions()
o.paper = obj
o.question = i.question
o.total_score = i.total_score
o.save()
return Response(status=status.HTTP_200_OK)