644 lines
29 KiB
Python
644 lines
29 KiB
Python
from crm.models import Candidate
|
|
from datetime import datetime, timedelta
|
|
|
|
from django.db.models import Avg
|
|
from django.db.models.query import QuerySet
|
|
from django.utils.translation import get_language_from_request
|
|
from django_filters.rest_framework import DjangoFilterBackend
|
|
from openpyxl import Workbook, load_workbook
|
|
from rest_framework import status
|
|
from rest_framework.decorators import action
|
|
from rest_framework.filters import OrderingFilter, SearchFilter
|
|
from rest_framework.generics import GenericAPIView
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from rest_framework.response import Response
|
|
from rest_framework.serializers import Serializer
|
|
from rest_framework.views import APIView
|
|
from rest_framework.viewsets import ModelViewSet
|
|
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
|
|
|
|
from question.models import Question
|
|
from question.serializers import QuestionSerializer
|
|
from server import settings
|
|
from utils.custom import CommonPagination
|
|
from utils.mixins import OptimizationMixin
|
|
|
|
from .exports import export_test, exportw_test
|
|
from .models import AnswerDetail, Banner, ExamTest, Exam
|
|
from .models_paper import Paper, PaperQuestions, TestRule, WorkScope
|
|
from .serializers import (
|
|
AnswerDetailCreateSerializer, AnswerDetailSerializer, BannerSerializer, ExamTestExamListSerializer,
|
|
ExamTestListSerializer, MoniTestSerializer, PaperDetailSerializer,
|
|
PaperQuestionsCreateSerializer, PaperSerializer, TestRuleSerializer,
|
|
WorkScopeSerializer, ExamCreateUpdateSerializer, ExamListSerializer, ExamTestDetailSerializer)
|
|
from django.utils import timezone
|
|
from django.db.models import Q
|
|
from utils.pagination import PageOrNot
|
|
# Create your views here.
|
|
|
|
class ExamViewSet(ModelViewSet):
    """
    Formal exams: create / read / update / delete.

    Also exposes two candidate-facing actions:
    ``attend`` (check an exam code and the remaining attempts) and
    ``init`` (generate the exam paper for one exam).
    """
    perms_map = [
        {'get': 'exam_view'}, {'post': 'exam_create'},
        {'put': 'exam_update'}, {'delete': 'exam_delete'}]
    pagination_class = CommonPagination
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    ordering = ['-id']
    search_fields = ['name']

    def get_queryset(self):
        """
        Return the exams visible to the requesting user.

        Superusers see everything. Other users are scoped by the first
        matching role name; users without a matching role (or without the
        attribute the role scopes on) get an empty queryset.
        """
        queryset = Exam.objects.all()
        serializer_class = self.get_serializer_class()
        if hasattr(serializer_class, 'setup_eager_loading'):
            # Performance optimisation supplied by the serializer.
            queryset = serializer_class.setup_eager_loading(queryset)

        user = self.request.user
        if user.is_superuser:
            return queryset

        roles = set(user.roles.values_list('name', flat=True))
        if '普通管理' in roles:
            return queryset.filter(create_admin=user)
        if '省管理' in roles:
            if user.pname:
                return queryset.filter(
                    examtest_exam__consumer__company__geo__pname=user.pname)
            # A QuerySet has no ``.objects`` attribute; ``.none()`` is called
            # on the queryset itself.
            return queryset.none()
        if '本单位管理' in roles:
            if user.bcompany:
                return queryset.filter(
                    examtest_exam__consumer__company=user.bcompany)
            return queryset.none()
        return queryset.none()

    def get_serializer_class(self):
        """Use the write serializer for create/update, the list serializer otherwise."""
        if self.action in ['create', 'update']:
            return ExamCreateUpdateSerializer
        return ExamListSerializer

    def create(self, request, *args, **kwargs):
        """Create an exam owned by the requesting user and assign its code (pk + 10000)."""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        instance = serializer.save(create_admin=request.user)
        # The code depends on the primary key, so it needs a second save.
        instance.code = instance.pk + 10000
        instance.save()
        return Response({"code": instance.code, "id": instance.pk},
                        status=status.HTTP_201_CREATED)

    def destroy(self, request, *args, **kwargs):
        """Delete an exam unless exam records already reference it."""
        instance = self.get_object()
        if ExamTest.objects.filter(exam=instance).exists():
            return Response({'error': '存在考试记录,禁止删除'})
        instance.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

    @action(methods=['post'], detail = False,perms_map=[{'post':'exam_attend'}])
    def attend(self, request, *args, **kwargs):
        """
        Join an exam by code.

        Optionally stores ``deptname`` on the user, then looks up the exam
        that is currently open for ``code``. Returns ``{'exam': pk}`` when
        the user still has attempts left, otherwise an ``error`` payload.
        """
        deptname = request.data.get('deptname', None)
        if deptname:
            request.user.deptname = deptname
            request.user.save()

        code = request.data.get('code', None)
        if not code:
            return Response({'error': '操作失败'})

        now = timezone.now()
        try:
            exam = Exam.objects.get(code=code, opentime__lt=now, closetime__gt=now)
        except (Exam.DoesNotExist, Exam.MultipleObjectsReturned,
                ValueError, TypeError):
            # Only lookup failures mean "no such exam"; unrelated errors are
            # no longer reported under this message.
            return Response({'error': '考试编号不存在'})

        attempts = ExamTest.objects.filter(exam=exam, consumer=request.user).count()
        if attempts < exam.chance:  # attempts remain, the user may sit the exam
            return Response({'exam': exam.pk})
        return Response({'error': '考试机会不足'})

    @action(methods=['post'], detail = True,perms_map=[{'post':'exam_attend'}])
    def init(self, request, *args, **kwargs):
        """
        Generate the exam paper for the exam identified by ``pk``.

        Only the two "Ⅲ类" work scopes are supported; any other case
        (including an unknown ``pk``) returns the generic error payload.
        The exam is fetched directly from ``Exam.objects`` rather than
        through ``get_queryset``.
        """
        failure = {'error': '生成试卷失败'}
        try:
            obj = Exam.objects.get(pk=kwargs['pk'])
        except (Exam.DoesNotExist, ValueError):
            return Response(failure)

        workscope = obj.workscope
        if workscope is None or workscope.name not in ['医学Ⅲ类', '非医学Ⅲ类']:
            return Response(failure)

        ret = {
            'name': obj.name,
            'consumer_detail': request.data,
            'type': '正式考试',  # formal exam
            'exam': kwargs['pk'],
            'exam_': ExamListSerializer(instance=obj).data,
            'workscope': workscope.id,
            'limit': 45,
            'total_score': 120,
            'pass_score': 90,
            'danxuan_count': 40,
            'danxuan_score': 2,
            'duoxuan_count': 10,
            'duoxuan_score': 4,
        }

        question_queryset = Question.objects.none()
        questioncats = list(workscope.questioncat.order_by('type', 'create_time'))
        if len(questioncats) == 3:
            pool = Question.objects.filter(is_delete=0, enabled=True)
            # (category index, question type, number of questions to draw)
            plan = (
                (0, '单选', 12), (1, '单选', 12), (2, '单选', 16),
                (0, '多选', 3), (1, '多选', 3), (2, '多选', 4),
            )
            for index, question_type, count in plan:
                drawn = pool.filter(
                    questioncat=questioncats[index], type=question_type,
                ).order_by('?')[:count]
                question_queryset = question_queryset | drawn

        questions = QuestionSerializer(
            instance=question_queryset.order_by('type'), many=True).data
        for item in questions:
            if item['type'] == '单选':
                item['total_score'] = 2
            elif item['type'] == '多选':
                item['total_score'] = 4
        ret['questions'] = questions
        return Response(ret)
|
|
|
|
|
|
|
|
class AnswerDetailView(APIView):
    """
    List answer details, optionally restricted to one exam record.

    Authentication and permission classes are both empty for this view.
    NOTE(review): without the ``examtest`` query parameter every
    AnswerDetail row is serialized - confirm this is intended.
    """
    authentication_classes = []
    permission_classes = []

    def get(self, request, *args, **kwargs):
        """Return the serialized answer details, filtered by ``?examtest=`` when given."""
        examtest_id = request.query_params.get('examtest', None)
        details = AnswerDetail.objects.all()
        if examtest_id:
            details = details.filter(examtest=examtest_id).order_by('question__type')
        details = AnswerDetailSerializer.setup_eager_loading(details)
        payload = AnswerDetailSerializer(instance=details, many=True).data
        return Response(payload)
|
|
|
|
|
|
class WorkScopeViewSet(ModelViewSet):
    """
    Work scopes (job categories): create / read / update / delete.

    Also generates self-service mock exams (``monitest``, ``monitest2``)
    and lets a user pick a work scope (``chose``).
    """
    perms_map = [
        {'get': '*'}, {'post': 'workscope_create'},
        {'put': 'workscope_update'}, {'delete': 'workscope_delete'}]
    pagination_class = None
    queryset = WorkScope.objects.filter(is_delete=0).all().order_by("id")
    serializer_class = WorkScopeSerializer
    ordering_fields = ('id',)
    ordering = ['-can_exam', 'sortnum']
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = ['subject', 'can_exam', 'is_public']
    search_fields = ('name',)

    # Rule attributes copied verbatim into a generated mock exam.
    _RULE_FIELDS = (
        'limit', 'total_score', 'pass_score',
        'danxuan_count', 'danxuan_score',
        'duoxuan_count', 'duoxuan_score',
        'panduan_count', 'panduan_score',
    )

    def get_queryset(self):
        """Return a fresh copy of the class queryset with the serializer's eager loading applied."""
        # ``.all()`` clones the class-level queryset so no result cache is
        # shared between requests.
        queryset = self.queryset.all()
        return self.get_serializer_class().setup_eager_loading(queryset)

    @classmethod
    def _rule_summary(cls, rule):
        """Return the rule attributes listed in ``_RULE_FIELDS`` as a dict."""
        return {field: getattr(rule, field) for field in cls._RULE_FIELDS}

    @staticmethod
    def _draw_pks(pool, question_type, count):
        """Return up to ``count`` random primary keys of ``question_type`` questions in ``pool``."""
        drawn = pool.filter(type=question_type).order_by('?')
        return list(drawn.values_list('pk', flat=True)[:count])

    @classmethod
    def _draw_pks_by_rule(cls, pool, counts):
        """Draw primary keys per question type using the ``*_count`` entries of ``counts``."""
        pks = []
        for question_type, key in (('单选', 'danxuan_count'),
                                   ('多选', 'duoxuan_count'),
                                   ('判断', 'panduan_count')):
            if counts[key]:  # a falsy count means "none of this type"
                pks += cls._draw_pks(pool, question_type, counts[key])
        return pks

    @staticmethod
    def _scored_questions(pks, scores):
        """
        Serialize the questions with the given primary keys, ordered by type,
        and attach ``total_score`` from the ``*_score`` entries of ``scores``.
        """
        # Querying by primary key keeps the final queryset unsliced, so
        # ``order_by`` is always allowed - even when only one type was drawn.
        ordered = Question.objects.filter(pk__in=pks).order_by('type')
        questions = QuestionSerializer(instance=ordered, many=True).data
        for item in questions:
            if item['type'] == '单选':
                item['total_score'] = scores['danxuan_score']
            elif item['type'] == '多选':
                item['total_score'] = scores['duoxuan_score']
            else:
                item['total_score'] = scores['panduan_score']
        return questions

    @action(methods=['get'], detail=True,url_path='monitest', url_name='gen_monitest', perms_map=[{'get':'gen_monitest'}])
    def monitest(self, request, pk=None):
        '''
        Generate a self-service mock exam for this work scope.

        Work scopes with exactly three question categories (after excluding
        '辐射小知识更新中') use a randomised split across the categories; two
        named work scopes use fixed splits; everything else follows the
        counts of the work scope's rule.
        '''
        import random

        workscope = self.get_object()
        ret = {
            'name': '自助模考' + datetime.now().strftime('%Y%m%d%H%M'),
            'type': '自助模考',  # self-service mock exam
            'workscope': workscope.id,
        }
        ret.update(self._rule_summary(workscope.rule))

        questioncats = list(
            workscope.questioncat.exclude(name='辐射小知识更新中')
            .order_by('type', 'create_time'))

        # Each plan entry: (category index, question type, number to draw).
        if len(questioncats) == 3:
            b2 = random.randint(1, 8)
            b3 = random.randint(1, 9 - b2)
            b1 = 10 - b2 - b3
            plan = [
                (0, '单选', 2 * b2 + 2 * b3 - 2),
                (1, '单选', 18 - 2 * b2),
                (2, '单选', 24 - 2 * b3),
                (0, '多选', b1),
                (1, '多选', b2),
                (2, '多选', b3),
            ]
        elif workscope.name == '辐射安全管理':
            # Paper rule for '辐射安全管理'.
            plan = [(0, '单选', 16), (1, '单选', 24), (0, '多选', 4), (1, '多选', 6)]
        elif workscope.name == '科研、生产及其他':
            # Paper rule for '科研、生产及其他'.
            plan = [(0, '单选', 24), (1, '单选', 16), (0, '多选', 6), (1, '多选', 4)]
        else:
            plan = None

        if plan is not None:
            pool = Question.objects.filter(is_delete=0, enabled=True)
            pks = []
            for index, question_type, count in plan:
                pks += self._draw_pks(
                    pool.filter(questioncat=questioncats[index]),
                    question_type, count)
        else:
            pool = Question.objects.filter(
                is_delete=0, questioncat__in=workscope.questioncat.all(),
                enabled=True)
            pks = self._draw_pks_by_rule(pool, ret)

        ret['questions'] = self._scored_questions(pks, ret)
        return Response(ret)

    @action(methods=['get'], detail=False,url_path='monitest2', url_name='gen_monitest2', perms_map=[{'get':'gen_monitest'}])
    def monitest2(self, request, pk=None):
        '''
        Generate a trial-version self-service mock exam.

        Uses the first TestRule and draws from all enabled questions, then
        decrements the user's ``remain_count`` and reports the new value.
        NOTE(review): no check is made here that a TestRule exists or that
        ``remain_count`` is still positive - confirm callers guarantee both.
        '''
        rule = TestRule.objects.first()
        ret = {
            'name': '自助模考' + datetime.now().strftime('%Y%m%d%H%M'),
            'type': '自助模考',  # self-service mock exam
        }
        ret.update(self._rule_summary(rule))

        pool = Question.objects.filter(is_delete=0, enabled=True)
        pks = self._draw_pks_by_rule(pool, ret)
        ret['questions'] = self._scored_questions(pks, ret)

        user = request.user
        user.remain_count = user.remain_count - 1
        user.save()
        ret['remain_count'] = user.remain_count
        return Response(ret)

    @action(methods=['get'], detail=True, perms_map=[{'get':'*'}])
    def chose(self, request, *args, **kwargs):
        """
        Let the user select this work scope.

        When the work scope is open for exams, the user's current work scope
        is first recorded as a Candidate (best effort) and then replaced.
        Otherwise the selection only succeeds if the user already has a
        Candidate for this work scope.
        """
        obj = self.get_object()
        selected = {'workscope': obj.pk, 'workscope_name': obj.name}

        if obj.can_exam:
            try:
                Candidate.objects.get_or_create(
                    consumer=request.user, workscope=request.user.workscope,
                    defaults={'consumer': request.user,
                              'workscope': request.user.workscope})
            except Exception:
                # Recording the previous work scope is best effort and must
                # not block the switch itself.
                pass
            request.user.workscope = obj
            request.user.save()
            return Response(selected)

        if Candidate.objects.filter(consumer=request.user, workscope=obj).exists():
            return Response(selected)
        return Response({'error': '该类别不可选择,请咨询课程顾问'})
|
|
|
|
class BannerViewSet(ModelViewSet):
    """
    Carousel banners: create / read / update / delete.

    GET requests skip authentication and permission checks; every other
    method uses the classes configured on the view.
    """
    perms_map = (
        {'get': 'banner_list'}, {'post': 'banner_create'},
        {'put': 'banner_update'}, {'delete': 'banner_delete'})
    pagination_class = None
    queryset = Banner.objects.filter(is_delete=0).all().order_by("sort")
    serializer_class = BannerSerializer
    ordering_fields = ('id', 'sort')
    ordering = ['sort']

    def get_authenticators(self):
        """Instantiate the authenticators; none at all for GET requests."""
        is_get = self.request.method == 'GET'
        if is_get:
            # Cleared on this view instance only.
            self.authentication_classes = []
        authenticators = []
        for authenticator_class in self.authentication_classes:
            authenticators.append(authenticator_class())
        return authenticators

    def get_permissions(self):
        """Instantiate the permissions; none at all for GET requests."""
        is_get = self.request.method == 'GET'
        if is_get:
            self.permission_classes = []
        permissions = []
        for permission_class in self.permission_classes:
            permissions.append(permission_class())
        return permissions
|
|
|
|
class TestRuleViewSet(ModelViewSet):
    """
    Mock-exam rules: create / read / update / delete.

    GET requests skip authentication and permission checks; every other
    method uses the classes configured on the view.
    """
    perms_map = (
        {'get': 'testrule_list'}, {'post': 'testrule_create'},
        {'put': 'testrule_update'}, {'delete': 'testrule_delete'})
    pagination_class = None
    queryset = TestRule.objects.filter(is_delete=0).all().order_by("id")
    serializer_class = TestRuleSerializer
    ordering_fields = ('id',)
    ordering = ['id']
    search_fields = ('^name',)

    def get_authenticators(self):
        """Instantiate the authenticators; none at all for GET requests."""
        is_get = self.request.method == 'GET'
        if is_get:
            # Cleared on this view instance only.
            self.authentication_classes = []
        authenticators = []
        for authenticator_class in self.authentication_classes:
            authenticators.append(authenticator_class())
        return authenticators

    def get_permissions(self):
        """Instantiate the permissions; none at all for GET requests."""
        is_get = self.request.method == 'GET'
        if is_get:
            self.permission_classes = []
        permissions = []
        for permission_class in self.permission_classes:
            permissions.append(permission_class())
        return permissions
|
|
|
|
class ExamTestViewSet(PageOrNot, ModelViewSet):
    """
    Exam records: list and detail.

    Also provides the user's own history and statistics, submission of a
    finished exam, Excel/Word export, and certificate issuing.
    """
    perms_map = [{'get': 'examtest_view'},{'post': '*'}, {'delete':'examtest_delete'}]
    pagination_class = CommonPagination
    queryset = ExamTest.objects.filter(is_delete=0).all()
    serializer_class = ExamTestExamListSerializer
    ordering_fields = ('id','create_time','took','score')
    ordering = ['-create_time']
    search_fields = ('consumer__name', 'consumer__company__name')
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = ['type','is_pass', 'exam']

    def filter_queryset(self, queryset):
        """Apply the configured filter backends, then the optional ``start`` / ``end`` bounds on ``start_time``."""
        for backend in list(self.filter_backends):
            queryset = backend().filter_queryset(self.request, queryset, self)
        params = self.request.query_params
        if params.get('start'):
            queryset = queryset.filter(start_time__gte=params['start'])
        if params.get('end'):
            queryset = queryset.filter(start_time__lte=params['end'])
        return queryset

    def get_queryset(self):
        """
        Return the exam records visible to the requesting user.

        Superusers see everything. Other users are scoped by the first
        matching role name; users without a matching role (or without the
        attribute the role scopes on) get an empty queryset.
        """
        # ``.all()`` clones the class-level queryset so no result cache is
        # shared between requests.
        queryset = self.queryset.all()
        serializer_class = self.get_serializer_class()
        if hasattr(serializer_class, 'setup_eager_loading'):
            queryset = serializer_class.setup_eager_loading(queryset)

        user = self.request.user
        if user.is_superuser:
            return queryset

        roles = set(user.roles.values_list('name', flat=True))
        if '普通管理' in roles:
            return queryset.filter(
                Q(consumer__create_admin=user) | Q(exam__create_admin=user))
        if '省管理' in roles:
            if user.pname:
                return queryset.filter(consumer__company__geo__pname=user.pname)
            # A QuerySet has no ``.objects`` attribute; ``.none()`` is called
            # on the queryset itself.
            return queryset.none()
        if '本单位管理' in roles:
            if user.bcompany:
                return queryset.filter(consumer__company=user.bcompany)
            return queryset.none()
        return queryset.none()

    @action(methods=['get'], detail=False,url_path='self', url_name='selftest', perms_map = [{'*':'my_examtest'}])
    def selftest(self, request, pk=None):
        '''
        Paginated list of the requesting user's own exam records, newest first.
        '''
        own_records = ExamTest.objects.filter(consumer=request.user).order_by('-create_time')
        paginator = CommonPagination()
        page = paginator.paginate_queryset(queryset=own_records, request=request, view=self)
        serializer = ExamTestListSerializer(instance=page, many=True)
        return paginator.get_paginated_response(serializer.data)

    @action(methods=['get'], detail=False,url_path='fx', url_name='selffx', perms_map = [{'*':'my_examtest'}])
    def selffx(self, request, pk=None):
        '''
        Statistics over the requesting user's exam records:
        number of records, rounded average score and rounded pass rate (%).
        '''
        own_records = ExamTest.objects.filter(consumer=request.user)
        total = own_records.count()
        avg_score = 0
        pass_rate = 0
        if total:
            average = own_records.aggregate(avg=Avg('score'))['avg']
            # The aggregate is None when every score is NULL.
            avg_score = round(average) if average is not None else 0
            passed = own_records.filter(is_pass=True).count()
            pass_rate = round((passed / total) * 100)
        return Response({'total': total, 'avg_score': avg_score, 'pass_rate': pass_rate})

    def create(self, request, *args, **kwargs):
        """
        Store a finished exam together with its per-question answers.

        The exam record is saved first; the request's ``questions`` list is
        then turned into answer details. Validation problems are returned
        as the response body.
        NOTE(review): the exam record stays saved when the answer details
        are missing or invalid - confirm whether it should be rolled back.
        """
        serializer = MoniTestSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors)
        instance = serializer.save(consumer=request.user)

        if 'questions' not in request.data:
            return Response({'error': '答题记录不存在'})

        answers = []
        for item in request.data['questions']:
            answer = {
                'question': item['id'],
                'examtest': instance.id,
                'score': item['score'],
            }
            if 'user_answer' in item:
                answer['user_answer'] = item['user_answer']
            # Copied whenever supplied, so unanswered questions without the
            # key do not abort the whole submission.
            if 'is_right' in item:
                answer['is_right'] = item['is_right']
            answers.append(answer)

        serializer_detail = AnswerDetailCreateSerializer(data=answers, many=True)
        if not serializer_detail.is_valid():
            return Response(serializer_detail.errors)
        serializer_detail.save()

        # Link the formal exam, if one was given.
        if request.data.get('exam', None):
            instance.exam = Exam.objects.get(pk=request.data['exam'])
            instance.save()
        return Response(MoniTestSerializer(instance).data, status=status.HTTP_200_OK)

    @action(methods=['get'], detail=False,
            url_path='export', url_name='export_test', perms_map=[{'*':'export_test'}])
    def export(self, request):
        """Export the filtered exam records; refuses when more than 1000 rows match."""
        queryset = self.filter_queryset(self.get_queryset())
        queryset = ExamTestListSerializer.setup_eager_loading(queryset)  # performance optimisation
        if queryset.count() > 1000:
            return Response({'error': '数据量超过1000,请筛选后导出!'})
        serializer = ExamTestListSerializer(instance=queryset, many=True)
        return Response({'path': export_test(serializer.data)})

    @action(methods=['post'], detail = True ,perms_map=[{'post':'export_test'}])
    def exportw(self, request, *args, **kwargs):
        """Export one exam record via ``exportw_test``; a truthy ``anew`` asks for regeneration."""
        obj = self.get_object()
        regenerate = bool('anew' in request.data and request.data['anew'])
        return Response({'path': exportw_test(obj, regenerate)})

    @action(methods=['post'], detail=True, perms_map = [{'*':'candidate_issue'}])
    def issue(self, request, pk=None):
        '''
        Issue a certificate for this exam record.

        Reuses the consumer's unnumbered Candidate for the same work scope
        when one exists, otherwise creates one, then fills in the
        certificate data and assigns the next number.
        NOTE(review): the exam's pass status is not checked here, and the
        number is derived from a plain count, so concurrent requests could
        receive the same number - confirm both are acceptable.
        '''
        obj = self.get_object()
        if hasattr(obj, 'candidate') and obj.candidate:
            return Response({'error': '证书已存在'})

        unnumbered = Candidate.objects.filter(
            consumer=obj.consumer, workscope=obj.workscope, number__isnull=True)
        candidate = unnumbered.first()
        if candidate is None:
            candidate = Candidate.objects.create(
                consumer=obj.consumer, workscope=obj.workscope)

        issued_count = Candidate.objects.exclude(number__isnull=True).count()
        now = timezone.now()
        detail = obj.consumer_detail

        candidate.examtest = obj
        candidate.issue_date = now
        candidate.start_date = now
        candidate.end_date = now + timedelta(days=3*365)
        candidate.workscope_name = obj.workscope.name
        candidate.consumer_name = detail['name']
        candidate.ID_number = detail['ID_number']
        candidate.company_name = detail['company_name']
        candidate.deptname = detail['deptname']
        # 'SL' + two-digit year + six-digit running number.
        candidate.number = 'SL' + str(now.year)[-2:] + str(issued_count + 1).zfill(6)
        candidate.save()
        return Response({"id": candidate.pk, "number": candidate.number})
|
|
|
|
class PaperViewSet(ModelViewSet):
    """
    Predicted-question papers: create / read / update / delete.

    Also generates a mock exam from a paper (``monitest``) and clones a
    paper together with its questions (``clone``).
    """
    perms_map = [
        {'get': 'paper_view'}, {'post': 'paper_create'},
        {'put': 'paper_update'}, {'delete': 'paper_delete'}]
    queryset = Paper.objects.filter(is_delete=0).all().order_by("id")
    pagination_class = CommonPagination
    serializer_class = PaperSerializer
    ordering_fields = ('id',)
    ordering = ['id']
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = ['workscope']
    search_fields = ('name',)

    def get_serializer_class(self):
        """Use the summary serializer for lists and the detail serializer otherwise."""
        if self.action == 'list':
            return PaperSerializer
        return PaperDetailSerializer

    @staticmethod
    def _question_rows(paper_id, items):
        """Turn the request's ``questions`` entries into PaperQuestions payloads for ``paper_id``."""
        return [
            {
                'question': item['id'],
                'paper': paper_id,
                'total_score': item['total_score'],
            }
            for item in items
        ]

    def create(self, request, *args, **kwargs):
        """
        Create a paper and attach the questions listed in ``questions``.

        NOTE(review): the paper stays saved when ``questions`` is missing
        or invalid - confirm whether it should be rolled back.
        """
        data = request.data
        serializer = self.get_serializer(data=data)
        serializer.is_valid(raise_exception=True)
        instance = serializer.save()

        if 'questions' not in data:
            return Response({'error': '不存在题库'})

        serializer_detail = PaperQuestionsCreateSerializer(
            data=self._question_rows(instance.id, data['questions']), many=True)
        if not serializer_detail.is_valid():
            return Response(serializer_detail.errors)
        serializer_detail.save()
        return Response(status=status.HTTP_200_OK)

    def update(self, request, *args, **kwargs):
        """
        Update a paper and replace its question list with ``questions``.

        Honours the ``partial`` flag passed by ``partial_update`` (PATCH).
        The old questions are deleted and the new ones saved in a single
        transaction.
        """
        from django.db import transaction

        partial = kwargs.pop('partial', False)
        data = request.data
        instance = self.get_object()
        serializer = self.get_serializer(instance, data=data, partial=partial)
        serializer.is_valid(raise_exception=True)
        self.perform_update(serializer)

        if 'questions' not in data:
            return Response({'error': '不存在题库'})

        serializer_detail = PaperQuestionsCreateSerializer(
            data=self._question_rows(instance.id, data['questions']), many=True)
        if not serializer_detail.is_valid():
            return Response(serializer_detail.errors)

        # A failed save must not leave the paper without any questions.
        with transaction.atomic():
            PaperQuestions.objects.filter(paper=instance).delete()
            serializer_detail.save()
        return Response(status=status.HTTP_200_OK)

    @action(methods=['get'], detail=True, url_path='monitest', url_name='gen_monitest',
            authentication_classes=[],permission_classes=[],
            perms_map=[{'get':'gen_monitest'}])
    def monitest(self, request, pk=None):
        '''
        Generate a mock exam from this paper.

        Returns the paper's detail serialization with a timestamped name,
        the mock-exam type and the paper id added.
        '''
        paper = self.get_object()
        ret = PaperDetailSerializer(instance=paper).data
        ret['name'] = '押卷模考' + datetime.now().strftime('%Y%m%d%H%M')
        ret['type'] = '押卷模考'
        ret['paper'] = paper.id
        return Response(ret)

    @action(methods=['put'], detail=True, url_path='clone', url_name='clone_paper',
            perms_map=[{'put':'clone_paper'}])
    def clone(self, request, pk=None):
        '''
        Clone this paper: copy its settings under a prefixed name and
        duplicate every attached question with its score.
        '''
        paper = self.get_object()
        copied_fields = (
            'workscope', 'limit', 'total_score', 'pass_score',
            'danxuan_count', 'danxuan_score',
            'duoxuan_count', 'duoxuan_score',
            'panduan_count', 'panduan_score',
        )
        duplicate = Paper()
        duplicate.name = '克隆卷-' + paper.name
        for field in copied_fields:
            setattr(duplicate, field, getattr(paper, field))
        duplicate.save()

        for source in PaperQuestions.objects.filter(paper=paper):
            link = PaperQuestions()
            link.paper = duplicate
            link.question = source.question
            link.total_score = source.total_score
            link.save()
        return Response(status=status.HTTP_200_OK)