# examtest/test_server/question/views.py
#
# 278 lines
# 11 KiB
# Python

from rest_framework.filters import SearchFilter, OrderingFilter
from rest_framework.permissions import IsAuthenticated
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from rest_framework.generics import GenericAPIView
from rest_framework.response import Response
from rest_framework.decorators import action
from rest_framework import status
from django_filters.rest_framework import DjangoFilterBackend
from openpyxl import Workbook, load_workbook
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
from utils.custom import CommonPagination
from rbac.permission import RbacPermission
from .models import Questioncat, Question
from .serializers import QuestioncatSerializer, QuestionSerializer, SubjectSerializer
from server import settings
from crm.authentication import ConsumerTokenAuthentication
from crm.models import PaySubject
class SubjectViewSet(ModelViewSet):
    """
    Subject categories: create, retrieve, update, delete.

    Serves ``Questioncat`` rows flagged ``is_subject=True`` that are not
    soft-deleted, ordered by ``id`` and without pagination. GET requests
    skip both authentication and permission checks (see
    ``get_authenticators`` / ``get_permissions``).
    """
    perms_map = (
        {'*': 'admin'},
        {'*': 'Subject_all'},
        {'get': 'Subject_list'},
        {'post': 'Subject_create'},
        {'put': 'Subject_update'},
        {'delete': 'Subject_delete'},
    )
    queryset = Questioncat.objects.filter(is_subject=True, is_delete=0).order_by("id")
    serializer_class = SubjectSerializer
    pagination_class = None
    ordering_fields = ('id',)
    ordering = ['id']
    search_fields = ('^name',)

    def destroy(self, request, *args, **kwargs):
        """Soft delete: flag the object as deleted instead of removing the row."""
        subject = self.get_object()
        subject.is_delete = True
        subject.save()
        return Response(status=status.HTTP_204_NO_CONTENT)

    def get_authenticators(self):
        """
        GET requests skip login authentication.

        For GET the instance's ``authentication_classes`` is cleared, so no
        authenticator runs; every other method uses the configured classes.
        """
        if self.request.method == 'GET':
            self.authentication_classes = []
            return []
        return [authenticator() for authenticator in self.authentication_classes]

    def get_permissions(self):
        """
        GET requests skip permission checks.

        For GET the instance's ``permission_classes`` is cleared; every other
        method uses the configured classes.
        """
        if self.request.method == 'GET':
            self.permission_classes = []
            return []
        return [checker() for checker in self.permission_classes]
class QuestioncatViewSet(ModelViewSet):
    """
    Question-bank categories: create, retrieve, update, delete.

    Serves ``Questioncat`` rows that are neither soft-deleted nor flagged as
    subjects. Supports filtering by ``pid``, prefix search on ``name`` and
    ordering by ``id``; list results are paginated with ``CommonPagination``.
    """
    perms_map = (
        {'*': 'admin'},
        {'*': 'questioncat_all'},
        {'get': 'questioncat_list'},
        {'post': 'questioncat_create'},
        {'put': 'questioncat_update'},
        {'delete': 'questioncat_delete'},
    )
    queryset = Questioncat.objects.filter(is_delete=0, is_subject=False)
    serializer_class = QuestioncatSerializer
    pagination_class = CommonPagination
    ordering_fields = ['id']
    ordering = ['id']
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = ['pid']
    search_fields = ['^name']

    def destroy(self, request, *args, **kwargs):
        """Soft delete: flag the category as deleted instead of removing the row."""
        category = self.get_object()
        category.is_delete = True
        category.save()
        return Response(status=status.HTTP_204_NO_CONTENT)

    @action(methods=['get'], detail=False, permission_classes=[IsAuthenticated],
            url_path='all', url_name='all_questioncat')
    def all(self, request):
        """
        Return every non-deleted category (subjects included), unpaginated.

        An optional ``pid`` query parameter restricts the result to rows with
        that ``pid``.
        """
        categories = Questioncat.objects.filter(is_delete=0)
        parent_id = request.query_params.get('pid', None)
        if parent_id:
            categories = categories.filter(pid=parent_id)
        return Response(QuestioncatSerializer(instance=categories, many=True).data)

    @action(methods=['get'], detail=False, authentication_classes=[], permission_classes=[],
            url_path='subject', url_name='questioncat_subject', pagination_class=None)
    def subject(self, request):
        """
        Return all non-deleted categories under one subject.

        The subject is given by the ``id`` query parameter and matched against
        ``pid``. No authentication or permission check is applied.
        """
        subject_id = request.query_params.get('id')
        children = Questioncat.objects.filter(is_delete=0, pid=subject_id)
        return Response(QuestioncatSerializer(instance=children, many=True).data)
class QuestionViewSet(ModelViewSet):
    """
    Questions: create, retrieve, update, delete.

    Works on ``Question`` rows that are not soft-deleted. Supports filtering
    by ``questioncat`` / ``level`` / ``type``, prefix search on ``name`` and
    ordering by ``id``. Extra actions: ``count`` (totals per question type)
    and ``import`` (bulk import from an .xlsx workbook).
    """
    perms_map = (
        {'*': 'admin'}, {'*': 'question_all'}, {'get': 'question_list'}, {'post': 'question_create'},
        {'put': 'question_update'}, {'delete': 'question_delete'})
    queryset = Question.objects.filter(is_delete=0).all()
    serializer_class = QuestionSerializer
    pagination_class = CommonPagination
    ordering_fields = ['id']
    ordering = ['id']
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = ['questioncat', 'level', 'type']
    search_fields = ['^name']

    # Option letters and the spreadsheet columns (D..I) that hold their text.
    _OPTION_LETTERS = ('A', 'B', 'C', 'D', 'E', 'F')
    _OPTION_COLUMNS = ('d', 'e', 'f', 'g', 'h', 'i')

    def destroy(self, request, *args, **kwargs):
        """Soft delete: flag the question as deleted instead of removing the row."""
        instance = self.get_object()
        instance.is_delete = True
        instance.save()
        return Response(status=status.HTTP_204_NO_CONTENT)

    @action(methods=['post'], detail=False, permission_classes=[],
            url_path='count', url_name='question_count')
    def count(self, request):
        """
        Count non-deleted questions per type.

        The optional ``ids`` entry of the request body restricts the count to
        questions whose ``questioncat`` is in that collection.

        Returns a mapping with keys ``danxuan`` (single choice), ``duoxuan``
        (multiple choice) and ``panduan`` (true/false).
        """
        queryset = self.queryset
        category_ids = request.data.get('ids', None)
        if category_ids:
            queryset = queryset.filter(questioncat__in=category_ids)
        return Response({
            'danxuan': queryset.filter(type='单选').count(),
            'duoxuan': queryset.filter(type='多选').count(),
            'panduan': queryset.filter(type='判断').count(),
        })

    @staticmethod
    def _cell_text(sheet, column, row):
        """Return a cell's value as text with all spaces removed ('' for a blank cell)."""
        value = sheet[column + str(row)].value
        if value is None:
            return ''
        # str() guards against numeric cells, which have no .replace().
        return str(value).replace(' ', '')

    @staticmethod
    def _save_question(qtype, category, name, options, right, resolution, level):
        """Create and save one Question from already-validated row values."""
        obj = Question()
        obj.type = qtype
        obj.questioncat = category
        obj.name = name
        obj.options = options
        obj.right = right
        obj.resolution = resolution if resolution else ''
        obj.level = level
        obj.save()

    @action(methods=['post'], detail=False,
            url_path='import', url_name='import_question')
    def import_question(self, request):
        """
        Import questions from the first worksheet of an .xlsx file.

        The request body must contain ``path``; it is appended to
        ``settings.BASE_DIR`` to locate the workbook.

        Expected sheet layout: row 2 is the header (A='题目类型', B='分类',
        C='题目'); data starts at row 3 and ends at the first row whose
        column C is empty. Columns D..I hold the text of options A..F,
        J the right answer, K the resolution, L the level.

        Per row:
          * an unknown category aborts the import with an ``error`` response
            (rows already processed stay saved);
          * a row whose (type, name, right) matches an existing non-deleted
            question is not saved and its row number is reported;
          * single-choice rows need ``right`` to be one letter A-F,
            multiple-choice rows need one or more letters A-F; rows failing
            that, and rows of an unrecognised type, are skipped silently.

        Returns an empty 200 response when nothing was a duplicate, otherwise
        a body ``{"code": 206, "data": [row numbers], "msg": ...}``.
        """
        # NOTE(review): 'path' comes from the client and is joined to BASE_DIR
        # unchecked -- confirm it can only point at previously uploaded files.
        xlsxpath = request.data['path']
        fullpath = settings.BASE_DIR + xlsxpath
        wb = load_workbook(fullpath)
        sheet = wb.worksheets[0]
        # NOTE(review): all keys/values here are empty strings, so every
        # imported level ends up ''. This looks like lost characters -- confirm
        # the intended level names.
        leveldict = {'':'','':'','':''}
        notinlist = []

        # Validate the header row before touching the database.
        if sheet['a2'].value != '题目类型':
            return Response({"error":"类型列错误!"})
        if sheet['b2'].value != '分类':
            return Response({"error":"分类列错误!"})
        if sheet['c2'].value != '题目':
            return Response({"error":"题目列错误!"})

        # Load the categories once; keyed by name so rows need no extra query.
        categories = {}
        for category in Questioncat.objects.filter(is_delete=0):
            categories[category.name] = category

        row = 3
        while sheet['c' + str(row)].value:
            qtype = self._cell_text(sheet, 'a', row)
            questioncat = self._cell_text(sheet, 'b', row)
            name = sheet['c' + str(row)].value
            options = {}
            for letter, column in zip(self._OPTION_LETTERS, self._OPTION_COLUMNS):
                option_text = sheet[column + str(row)].value
                if option_text:
                    options[letter] = option_text
            right = self._cell_text(sheet, 'j', row)
            resolution = sheet['k' + str(row)].value
            level = leveldict.get(self._cell_text(sheet, 'l', row), '')

            if questioncat not in categories:
                return Response({"error":"不存在分类("+questioncat+")!请先新建"})

            importable = True
            if qtype == '单选':
                importable = right in self._OPTION_LETTERS
            elif qtype == '多选':
                right = list(right)
                # At least one answer letter, and every letter must be A-F.
                importable = bool(right) and all(
                    letter in self._OPTION_LETTERS for letter in right)
            elif qtype == '判断':
                # NOTE(review): the '' literal (and the empty option texts
                # below) look like lost characters; as written a blank answer
                # cell counts as "A" -- confirm.
                right = 'A' if right in ('A', '', '正确') else 'B'
                options = {'A':'','B':''}
            else:
                qtype = None  # unrecognised type: skip the row

            if qtype is not None:
                duplicate = Question.objects.filter(
                    type=qtype, name=name, right=right, is_delete=0).exists()
                if duplicate:
                    notinlist.append(row)
                elif importable:
                    self._save_question(qtype, categories[questioncat], name,
                                        options, right, resolution, level)
            row += 1

        if notinlist:
            # Must be a Response: DRF rejects a bare dict returned from a view.
            return Response({"code":206,"data":notinlist,"msg":"导入部分成功"})
        return Response(status=status.HTTP_200_OK)
class ExerciseView(APIView):
    """
    Practice endpoint: serve the next batch of questions from one category.

    Authenticates with ``ConsumerTokenAuthentication``; no permission classes
    are applied.
    """
    authentication_classes = [ConsumerTokenAuthentication]
    permission_classes = []
    # Maximum number of questions returned per request.
    batch_size = 10

    def post(self, request):
        """
        Return up to ``batch_size`` questions of a category, lowest id first.

        Request body:
          * ``questioncat`` (required): category to draw questions from.
          * ``ydtms`` (optional): question ids to exclude -- presumably the
            ones the user has already answered; verify against the client.

        Response body: ``total`` is the number of matching questions before
        the batch limit; ``results`` is the serialized batch, each item with
        an added ``is_collect`` flag telling whether the question is in the
        current user's ``collects``.

        Responds with HTTP 400 when ``questioncat`` is missing or empty.
        """
        questioncat = request.data.get('questioncat')
        if questioncat in (None, ''):
            return Response({"error": "缺少参数questioncat!"},
                            status=status.HTTP_400_BAD_REQUEST)

        queryset = Question.objects.filter(is_delete=0, questioncat=questioncat)
        excluded_ids = request.data.get('ydtms')
        if excluded_ids:
            queryset = queryset.exclude(id__in=excluded_ids)
        total = queryset.count()
        queryset = queryset.order_by('id')[0:self.batch_size]

        results = QuestionSerializer(instance=queryset, many=True).data
        # Ids of the questions collected by the current user, materialised
        # once so each membership test below is a set lookup.
        collected_ids = set(request.user.collects.all().values_list('id', flat=True))
        for item in results:
            item['is_collect'] = item['id'] in collected_ids
        return Response({'total': total, 'results': results})