from django.shortcuts import render from rest_framework.viewsets import ModelViewSet, GenericViewSet from rest_framework.mixins import ListModelMixin, DestroyModelMixin, RetrieveModelMixin from apps.exam.exports import export_question, export_record from apps.exam.models import Question, Questioncat, PaperQuestion from apps.exam.serializers import (QuestionSerializer, QuestioncatSerializer, PaperSerializer, ExamDetailSerializer, ExamRecordDetailSerializer, ExamListSerializer, ExamCreateUpdateSerializer, ExamListSerializer, ExamRecordSubmitSerializer, PaperDetailSerializer, PaperCreateUpdateSerializer, AnswerDetailOutSerializer, ExamRecordListSerializer) from rest_framework.decorators import action from rest_framework.response import Response from rest_framework.permissions import IsAuthenticated from rest_framework.exceptions import ParseError from openpyxl import Workbook, load_workbook from django.conf import settings from apps.exam.models import Paper, Exam, ExamRecord, AnswerDetail from django.utils import timezone from django.db import transaction from rest_framework.serializers import Serializer from datetime import datetime from apps.exam.filters import ExamRecordFilter, ExamFilter from datetime import timedelta from apps.system.mixins import CreateUpdateCustomMixin from apps.edu.serializers import CertificateSerializer from utils.queryset import get_child_queryset2 from apps.system.permission import has_permission from apps.exam.parse_word import interpret_text import os import shutil from django.db.models import Q # Create your views here. EXCEL_PATH = os.path.join(settings.BASE_DIR, "media/default/question.xlsx") def enctry(s): k = 'ez9z3a4m*$%srn9ve_t71yd!v+&xn9@0k(e(+l6#g1h=e5i4da' encry_str = "" for i, j in zip(s, k): # i为字符,j为秘钥字符 temp = str(ord(i)+ord(j))+'_' # 加密字符 = 字符的Unicode码 + 秘钥的Unicode码 encry_str = encry_str + temp return encry_str # 解密 def dectry(p): k = 'ez9z3a4m*$%srn9ve_t71yd!v+&xn9@0k(e(+l6#g1h=e5i4da' dec_str = "" for i, j in zip(p.split("_")[:-1], k): # i 为加密字符,j为秘钥字符 # 解密字符 = (加密Unicode码字符 - 秘钥字符的Unicode码)的单字节字符 temp = chr(int(i) - ord(j)) dec_str = dec_str+temp return dec_str class QuestioncatViewSet(CreateUpdateCustomMixin, ModelViewSet): perms_map = {'get': '*', 'post':'question', 'put':'question', 'delete':'question'} queryset = Questioncat.objects.all() serializer_class = QuestioncatSerializer filterset_fields = ['parent'] search_fields = ['name'] class QuestionViewSet(CreateUpdateCustomMixin, ModelViewSet): perms_map = {'get': '*', 'post':'question', 'put':'question', 'delete':'question'} queryset = Question.objects.all() serializer_class = QuestionSerializer filterset_fields = ['level', 'type', 'year', 'questioncat'] search_fields = ['name', 'options', 'resolution'] @action(methods=['post'], detail=False, url_name='enable_question', perms_map={'post': 'question'}, serializer_class=Serializer) def enable(self, request): """ 启用题目 启用题目 """ ids = request.data.get('ids', None) if ids: Question.objects.filter(pk__in=ids).update(enabled=True) return Response(status=200) @action(methods=['post'], detail=False, perms_map={'*':'question_delete'}) def deletes(self, request): """ 批量删除 """ ids = request.data.get('ids', []) if request.user.is_superuser: Question.objects.filter(id__in=ids).update(is_deleted=True) return Response() return Response({'error':'权限不足'}) @action(methods=['get'], detail=False, perms_map={'get':'export_question'}) def export(self, request): """ 导出题目 """ queryset = self.filter_queryset(self.get_queryset()) path = export_question(queryset) return Response({'path': path}) @action(methods=['post'], detail=False, url_path='import', url_name='import_question', perms_map={'post': 'question'}, serializer_class=Serializer) def import_question(self, request): """ 导入题目 导入题目 """ xlsxpath = request.data['path'] fullpath = settings.BASE_DIR + xlsxpath wb = load_workbook(fullpath) sheet = wb.worksheets[0] qlist = ['A', 'B', 'C', 'D', 'E', 'F'] leveldict = {'低': '低', '中': '中', '高': '高'} notinlist = [] # 验证文件内容 if sheet['a2'].value != '题目类型': return Response({"error": "类型列错误!"}) if sheet['b2'].value != '分类': return Response({"error": "分类列错误!"}) if sheet['c2'].value != '题目': return Response({"error": "题目列错误!"}) questioncatdict = {} questioncats = Questioncat.objects.all() for i in questioncats: questioncatdict[i.name] = i.id i = 3 while sheet['c'+str(i)].value: type = sheet['a'+str(i)].value.replace(' ', '') questioncat = sheet['b'+str(i)].value if questioncat: questioncat = questioncat.replace(' ', '') else: return Response(str(i)+'行没有分类', status=400) name = sheet['c'+str(i)].value answer = {} if sheet['d'+str(i)].value: answer['A'] = sheet['d'+str(i)].value if sheet['e'+str(i)].value: answer['B'] = sheet['e'+str(i)].value if sheet['f'+str(i)].value: answer['C'] = sheet['f'+str(i)].value if sheet['g'+str(i)].value: answer['D'] = sheet['g'+str(i)].value if sheet['h'+str(i)].value: answer['E'] = sheet['h'+str(i)].value if sheet['i'+str(i)].value: answer['F'] = sheet['i'+str(i)].value right = sheet['j'+str(i)].value if right: right = right.replace(' ', '') else: return Response(str(i)+'行没有答案', status=400) resolution = sheet['k'+str(i)].value level = sheet['l'+str(i)].value year = sheet['m' + str(i)].value if level: level = level.replace(' ', '') cateobj = None if questioncat not in questioncatdict: return Response(str(i)+"行不存在分类("+questioncat+")!请先新建", status=400) else: cateobj = Questioncat.objects.get( id=questioncatdict[questioncat]) if type == '单选': if Question.objects.filter(type='单选', name=name, year=year, options=answer, questioncat=cateobj).exists(): notinlist.append(i) else: if right in ['A', 'B', 'C', 'D', 'E', 'F']: obj = Question() obj.type = '单选' if cateobj: obj.questioncat = cateobj obj.name = name obj.options = answer obj.right = right obj.resolution = resolution if resolution else '' obj.year = year if year else None if level in leveldict: obj.level = leveldict[level] else: obj.level = '低' obj.save() elif type == '多选': right = list(right.strip()) if Question.objects.filter(type='多选', name=name, year=year, options=answer, questioncat=cateobj).exists(): notinlist.append(i) else: if [False for c in right if c not in qlist]: pass else: obj = Question() obj.type = '多选' obj.questioncat = cateobj obj.name = name obj.options = answer obj.right = right obj.resolution = resolution if resolution else '' obj.year = year if year else None if level in leveldict: obj.level = leveldict[level] else: obj.level = '低' obj.save() elif type == '判断': if right == 'A' or right == '对' or right == '正确': right = 'A' else: right = 'B' if Question.objects.filter(type='判断', name=name, is_deleted=0, options={'A': '对', 'B': '错'}, questioncat=cateobj).exists(): notinlist.append(i) else: obj = Question() obj.type = '判断' obj.questioncat = cateobj obj.name = name obj.options = {'A': '对', 'B': '错'} obj.right = right obj.resolution = resolution if resolution else '' obj.year = year if year else None if level in leveldict: obj.level = leveldict[level] else: obj.level = '低' obj.save() i = i + 1 return Response(notinlist, status=200) class PaperViewSet(ModelViewSet): """ 试卷增删改查 """ perms_map = {'get': '*', 'post':'paper', 'put':'paper', 'delete':'paper'} queryset = Paper.objects.all() serializer_class = PaperSerializer ordering = ['id'] search_fields = ('name',) def get_serializer_class(self): if self.action in ['retrieve']: return PaperDetailSerializer elif self.action in ['create', 'update']: return PaperCreateUpdateSerializer return super().get_serializer_class() def create(self, request, *args, **kwargs): sr = PaperCreateUpdateSerializer(data=request.data) sr.is_valid(raise_exception=True) vdata = sr.validated_data vdata['create_by'] = request.user questions_ = vdata.pop('questions_') paper = Paper.objects.create(**vdata) q_list = [] for i in questions_: q_list.append(PaperQuestion(question=i['question'], total_score=i['total_score'], paper=paper)) PaperQuestion.objects.bulk_create(q_list) return Response(status=201) def update(self, request, *args, **kwargs): # 有考试在执行,不可更新 now = timezone.now() paper = self.get_object() if Exam.objects.filter(close_time__gte=now, paper=paper).exists(): raise ParseError('存在考试,不可编辑') sr = PaperCreateUpdateSerializer(instance=paper, data=request.data) sr.is_valid(raise_exception=True) vdata = sr.validated_data questions_ = vdata.pop('questions_') vdata['update_by'] = request.user Paper.objects.filter(id=paper.id).update(**vdata) q_list = [] for i in questions_: q_list.append(PaperQuestion(question=i['question'], total_score=i['total_score'], paper=paper)) PaperQuestion.objects.filter(paper=paper).delete() PaperQuestion.objects.bulk_create(q_list) return Response() @action(methods=['put'], detail=True, url_path='clone', url_name='clone_paper', perms_map={'put':'clone_paper'}) def clone(self, request, pk=None): ''' 克隆试卷 ''' paper = self.get_object() obj = Paper() obj.name = '克隆卷-'+paper.name obj.workscope = paper.workscope obj.limit = paper.limit obj.total_score = paper.total_score obj.pass_score = paper.pass_score obj.danxuan_count = paper.danxuan_count obj.danxuan_score = paper.danxuan_score obj.duoxuan_count = paper.duoxuan_count obj.duoxuan_score = paper.duoxuan_score obj.panduan_count = paper.panduan_count obj.panduan_score = paper.panduan_score obj.save() for i in PaperQuestion.objects.filter(paper=paper): o = PaperQuestion() o.paper = obj o.question = i.question o.total_score = i.total_score o.save() return Response(status=200) @action(methods=['post'], detail=False, perms_map={'post': 'question'}, serializer_class=Serializer) def upload_paper(self, request): doc_path = request.data.get('doc_path') question_type = request.data.get('question_type') excel_path = settings.BASE_DIR + "/media/default/question.xlsx" doc_path = settings.BASE_DIR + doc_path timenow = timezone.now().strftime('%Y%m%d%H%M%S') question_excel_name = "question_excel_"+timenow question_excel = os.path.join(os.path.dirname(excel_path), question_excel_name) if not os.path.exists(question_excel): os.makedirs(question_excel) shutil.copy(excel_path, question_excel) excel_save_path = os.path.join(question_excel, os.path.basename(EXCEL_PATH)) # 解析word文档到excel res = interpret_text(3, excel_save_path, doc_path, question_type) ids = [] if res: # 将excel 入库并返回 queston_id wb = load_workbook(excel_save_path) sheet = wb.worksheets[0] leveldict = {'低': '低', '中': '中', '高': '高'} # 验证文件内容 if sheet['a2'].value != '题目类型': return Response({"error": "类型列错误!"}) if sheet['b2'].value != '分类': return Response({"error": "分类列错误!"}) if sheet['c2'].value != '题目': return Response({"error": "题目列错误!"}) questioncatdict = {} questioncats = Questioncat.objects.all() for i in questioncats: questioncatdict[i.name] = i.id i = 3 while sheet['c'+str(i)].value: type = sheet['a'+str(i)].value.replace(' ', '') questioncat = sheet['b'+str(i)].value if questioncat: questioncat = questioncat.replace(' ', '') else: return Response(str(i)+'行没有分类', status=400) name = sheet['c'+str(i)].value answer = {} if sheet['d'+str(i)].value: answer['A'] = sheet['d'+str(i)].value if sheet['e'+str(i)].value: answer['B'] = sheet['e'+str(i)].value if sheet['f'+str(i)].value: answer['C'] = sheet['f'+str(i)].value if sheet['g'+str(i)].value: answer['D'] = sheet['g'+str(i)].value if sheet['h'+str(i)].value: answer['E'] = sheet['h'+str(i)].value if sheet['i'+str(i)].value: answer['F'] = sheet['i'+str(i)].value right = sheet['j'+str(i)].value if right: right = right.replace(' ', '') else: return Response(str(i)+'行没有答案', status=400) resolution = sheet['k'+str(i)].value level = sheet['l'+str(i)].value year = sheet['m' + str(i)].value if level: level = level.replace(' ', '') cateobj = None if questioncat not in questioncatdict: return Response(str(i)+"行不存在分类("+questioncat+")!请先新建", status=400) else: cateobj = Questioncat.objects.get( id=questioncatdict[questioncat]) if Question.objects.filter(name=name, is_deleted=0, questioncat=cateobj).exists(): ids.append(Question.objects.get(name=name, is_deleted=0, questioncat=cateobj).id) i = i + 1 continue if type == '单选': if right in ['A', 'B', 'C', 'D', 'E', 'F']: obj = Question() obj.type = '单选' if cateobj: obj.questioncat = cateobj obj.name = name obj.options = answer obj.right = right obj.create_by = request.user obj.resolution = resolution if resolution else '' obj.year = year if year else None if level in leveldict: obj.level = leveldict[level] else: obj.level = '低' obj.save() ids.append(obj.id) elif type == '多选': obj = Question() rights = list(right.strip()) right = [x for x in rights if x != ','] obj.type = '多选' obj.questioncat = cateobj obj.name = name obj.options = answer obj.right = right obj.create_by = request.user obj.resolution = resolution if resolution else '' obj.year = year if year else None if level in leveldict: obj.level = leveldict[level] else: obj.level = '低' obj.save() ids.append(obj.id) elif type == '判断': if right == 'A' or right == '对' or right == '正确': right = 'A' else: right = 'B' obj = Question() obj.type = '判断' obj.questioncat = cateobj obj.name = name obj.options = {'A': '对', 'B': '错'} obj.right = right obj.create_by = request.user obj.resolution = resolution if resolution else '' obj.year = year if year else None if level in leveldict: obj.level = leveldict[level] else: obj.level = '低' obj.save() ids.append(obj.id) i = i + 1 else: raise ParseError('excel解析失败') if ids: questions = Question.objects.filter(pk__in=ids) Serializer = QuestionSerializer(questions, many=True) Serializer.data return Response(Serializer.data, status=200) class ExamViewSet(CreateUpdateCustomMixin, ModelViewSet): perms_map = {'get': '*', 'post':'exam', 'put':'exam', 'delete':'exam'} queryset = Exam.objects.all().select_related('paper', 'create_by') ordering = ['-id'] search_fields = ('name',) serializer_class = ExamListSerializer filterset_class = ExamFilter def get_serializer_class(self): if self.action in ['create', 'update']: return ExamCreateUpdateSerializer elif self.action in ['retrieve']: return ExamDetailSerializer return super().get_serializer_class() def get_queryset(self): qs = super().get_queryset() user = self.request.user if has_permission("exam", user): return qs return qs.filter(Q(participant_user=user)|Q(participant_dep=user.dept)|Q(is_open=True)) def destroy(self, request, *args, **kwargs): instance = self.get_object() if ExamRecord.objects.filter(exam=instance).exists(): raise ParseError('存在考试记录,禁止删除') instance.delete(soft=False) return Response(status=204) @action(methods=['post'], detail=True, perms_map={'post': '*'}, serializer_class=Serializer, permission_classes = [IsAuthenticated]) @transaction.atomic def start(self, request, *args, **kwargs): """ 开始考试 开始考试具体题目信息 """ exam = self.get_object() # 查询本次考试对应哪些人 participants = exam.participant_user.all() participants_ids = [i.id for i in participants] dep = exam.participant_dep.all() dep_ids = [i.id for i in dep] if request.user.id in participants_ids or request.user.dept.id in dep_ids: pass else: raise ParseError('不在考试人员范围内') now = timezone.now() if now < exam.open_time or now > exam.close_time: raise ParseError('不在考试时间范围') tests = ExamRecord.objects.filter( exam=exam, create_by=request.user) chance_used = tests.count() if chance_used > exam.chance: raise ParseError('考试机会已用完') if exam.paper: er = ExamRecord() er.type = '正式考试' er.name = '正式考试' + datetime.now().strftime('%Y%m%d%H%M') er.limit = exam.paper.limit er.paper = exam.paper er.total_score = exam.paper.total_score er.start_time = now er.is_pass = False er.exam = exam er.create_by = request.user er.save() ret = {} ret['examrecord'] = er.id pqs = PaperQuestion.objects.filter(paper=exam.paper).order_by('id') details = [] for i in pqs: details.append(AnswerDetail(examrecord=er, question=i.question, total_score=i.total_score)) AnswerDetail.objects.bulk_create(details) ads = AnswerDetail.objects.select_related('question').filter(examrecord=er).order_by('id') ret['questions_'] = AnswerDetailOutSerializer(instance=ads, many=True).data return Response(ret) raise ParseError('暂不支持') class ExamRecordViewSet(ListModelMixin, DestroyModelMixin, RetrieveModelMixin, GenericViewSet): """ 考试记录列表和详情 """ perms_map = {'get': '*', 'post': '*', 'delete':'examrecord'} queryset = ExamRecord.objects.select_related('create_by', 'cert_er') serializer_class = ExamRecordListSerializer ordering_fields = ['create_time', 'score', 'took', 'update_time', 'belong_dept'] ordering = ['-update_time'] search_fields = ('create_by__name', 'create_by__username', 'exam__name', 'belong_dept__name') filterset_class = ExamRecordFilter def get_queryset(self): qs = super().get_queryset() # if self.request.method == 'GET': # return qs # else: # return qs.filter(belong_dept__in=get_child_queryset2(self.request.user.dept)) if has_permission('ctc_manager', self.request.user): return qs # 如果是部门管理员,只能看到自己部门下的考试记录 elif has_permission('exam_manager', self.request.user): return qs.filter(belong_dept__in=get_child_queryset2(self.request.user.dept)) # 如果是普通员工,只能看到自己考试记录 else: return qs.filter(create_by=self.request.user) def get_serializer_class(self): if self.action == 'retrieve': now = timezone.now() if now > self.get_object().exam.close_time: return ExamRecordDetailSerializer return super().get_serializer_class() def perform_destroy(self, instance): # 考试记录物理删除 instance.delete(soft=False) @action(methods=['post'], detail=False, perms_map={'post': '*'}, serializer_class=Serializer, permission_classes = [IsAuthenticated]) def clear(self, request, pk=None): """ 清除七日前未提交的考试记录 """ now = timezone.now days7_ago = now - timedelta(days=7) ExamRecord.objects.filter(create_time__lte=days7_ago, is_submited=False).delete(soft=False) return Response(status=False) @action(methods=['get'], detail=False, url_path='export', url_name='export_record', perms_map={'*': 'export_ansrecord'}, serializer_class=Serializer) def export(self, request): """ 导出答题记录 """ queryset = self.filter_queryset(self.get_queryset()) path = export_record(queryset) return Response({'path': path}) @action(methods=['post'], detail=True, perms_map={'post': '*'}, serializer_class=ExamRecordSubmitSerializer, permission_classes = [IsAuthenticated]) @transaction.atomic def submit(self, request, pk=None): ''' 提交答卷 提交答卷 ''' try: er = ExamRecord.objects.get(id=pk) except Exception as e: raise ParseError(e) now = timezone.now() if er.create_by != request.user: raise ParseError('提交人有误') exam = er.exam paper = er.paper if not exam: raise ParseError('暂不支持') if now > exam.close_time + timedelta(minutes=30): raise ParseError('考试时间已过, 提交失败') serializer = ExamRecordSubmitSerializer(data = request.data) serializer.is_valid(raise_exception=True) vdata = serializer.validated_data questions_ = vdata['questions_'] # 后端判卷 ads = AnswerDetail.objects.select_related('question').filter(examrecord=er).order_by('id') total_score = 0 try: for index, ad in enumerate(ads): ad.user_answer = questions_[index]['user_answer'] if ad.question.type == '多选': if set(ad.question.right) == set(ad.user_answer): ad.is_right = True ad.score = ad.total_score else: if ad.question.right == ad.user_answer: ad.is_right = True ad.score = ad.total_score ad.save() total_score = total_score + ad.score except Exception as e: raise ParseError('判卷失败, 请检查试卷:' + str(e)) er.score = total_score # if er.score > 0.6*er.total_score: if er.score >= paper.pass_score: er.is_pass = True # 如果是自动发证 if exam.certificate: now_data = datetime.now() course = exam.course_name.all() courese_ids = [i.id for i in course] current_date = now_data.strftime('%Y-%m-%d') cer_number = now_data.strftime('%Y%m%d') data_dict = { '姓名': request.user.name, '用户ID': request.user.id, '证书编号': 'CTCZL'+ cer_number, '证书方案': '202312', '单位名称': request.user.dept.name, '所属单位': '国检测试控股集团'+request.user.dept.name, '发证日期': current_date, '培训日期':current_date, '培训结束日期': current_date, '课程列表': courese_ids, 'examrecord': er.id, } serializer = CertificateSerializer(data=data_dict) serializer.is_valid(raise_exception=True) serializer.save() er.took = (now - er.create_time).total_seconds() er.end_time = now er.belong_dept=request.user.dept er.is_submited = True er.save() return Response(ExamRecordListSerializer(instance=er).data)