# examtest/test_server/analyse/views.py
#
# 196 lines
# 9.7 KiB
# Python

import logging
import pickle

import requests
from django.db.models import Count
from django.db.models.functions import ExtractMonth, ExtractWeek, ExtractYear
from django.shortcuts import render
from lxml import etree
from requests.api import request
from rest_framework.response import Response
from rest_framework.views import APIView

from crm.models import Consumer, Company
from examtest.models import ExamTest
from question.models import Question
from rbac.models import UserProfile
# Create your views here.
class BasicCount(APIView):
    """
    Return the basic headline counters.

    Query parameters:
        datestart (optional): when present and non-empty, every counter only
            includes rows whose ``create_time`` is greater than or equal to it.

    Response body: a dict with the integer counters ``consumer1_count``
    (non-deleted consumers), ``consumer2_count`` (non-deleted consumers that
    have a ``create_admin``), ``test_count`` (non-deleted exam tests) and
    ``question_count`` (non-deleted questions).
    """

    def get(self, request, format=None):
        datestart = request.query_params.get('datestart')

        def count_since(queryset):
            # Apply the optional lower bound once instead of repeating the
            # same conditional expression for every counter.
            if datestart:
                queryset = queryset.filter(create_time__gte=datestart)
            return queryset.count()

        ret = {}
        ret['consumer1_count'] = count_since(
            Consumer.objects.filter(is_delete=False))
        ret['consumer2_count'] = count_since(
            Consumer.objects.filter(is_delete=False).exclude(create_admin=None))
        ret['test_count'] = count_since(
            ExamTest.objects.filter(is_delete=False))
        # Previously this counted ExamTest a second time (copy-paste of the
        # line above), so question_count always equalled test_count.
        # NOTE(review): assumes Question exposes ``is_delete`` and
        # ``create_time`` like Consumer and ExamTest - confirm against
        # question.models.
        ret['question_count'] = count_since(
            Question.objects.filter(is_delete=False))
        return Response(ret)
class Admindata1(APIView):
    """
    Bar-chart data: number of consumers entered by each administrator.

    Optional ``datestart`` / ``dateend`` query parameters bound
    ``create_time`` from below / above (both inclusive).
    """

    def get(self, request, format=None):
        params = request.query_params
        consumers = Consumer.objects.exclude(create_admin__isnull=True)
        if params.get('datestart', None):
            consumers = consumers.filter(create_time__gte=params.get('datestart'))
        if params.get('dateend', None):
            consumers = consumers.filter(create_time__lte=params.get('dateend'))

        # One row per administrator username, largest total first.
        per_admin = list(
            consumers.values('create_admin__username')
            .annotate(total=Count('create_admin__username'))
            .order_by('-total')
        )

        return Response({
            'x': {
                'name': '管理员',
                'data': [row['create_admin__username'] for row in per_admin],
            },
            's': [{
                'name': '学员数',
                'data': [row['total'] for row in per_admin],
            }],
            't': '各管理员录入学员数',
        })
from examtest.models import ExamTest
class Monitest(APIView):
    """
    Time-distribution chart data for mock exams, grouped by year and month.

    Optional ``datestart`` / ``dateend`` query parameters bound
    ``create_time`` from below / above (both inclusive). Exam tests whose
    consumer has no ``create_admin`` are left out.
    """

    def get(self, request):
        params = request.query_params
        exams = ExamTest.objects.exclude(consumer__create_admin__isnull=True)
        if params.get('datestart', None):
            exams = exams.filter(create_time__gte=params.get('datestart'))
        if params.get('dateend', None):
            exams = exams.filter(create_time__lte=params.get('dateend'))

        # One row per (year, month), in chronological order.
        monthly = (
            exams.annotate(
                create_month=ExtractMonth('create_time'),
                create_year=ExtractYear('create_time'),
            )
            .values('create_month', 'create_year')
            .order_by('create_year', 'create_month')
            .annotate(total=Count('create_month'))
        )

        labels = []
        totals = []
        for bucket in monthly:
            # The label is year and month joined with empty separators,
            # exactly as before (e.g. 2021 and 3 give "20213").
            labels.append(
                str(bucket['create_year']) + '' + str(bucket['create_month']) + ''
            )
            totals.append(bucket['total'])

        # NOTE: an earlier, disabled variant of this payload carried two
        # series (self-service vs. predicted-paper mock exams); only the
        # single combined series is produced.
        return Response({
            'x': {'name': '月份', 'data': labels},
            's': [{'name': '模考数', 'data': totals}],
            't': '趋势图',
            'l': ['模考数'],
        })
# Maps a full province-level region name to the short name that is emitted
# in the distribution payloads built by the views in this module.
nameMap = {
    "南海诸岛": "南海诸岛",
    "北京市": "北京",
    "天津市": "天津",
    "上海市": "上海",
    "重庆市": "重庆",
    "河北省": "河北",
    "河南省": "河南",
    "云南省": "云南",
    "辽宁省": "辽宁",
    "黑龙江省": "黑龙江",
    "湖南省": "湖南",
    "安徽省": "安徽",
    "山东省": "山东",
    "新疆维吾尔自治区": "新疆",
    "江苏省": "江苏",
    "浙江省": "浙江",
    "江西省": "江西",
    "湖北省": "湖北",
    "广西壮族自治区": "广西",
    "甘肃省": "甘肃",
    "山西省": "山西",
    "内蒙古自治区": "内蒙古",
    "陕西省": "陕西",
    "吉林省": "吉林",
    "福建省": "福建",
    "贵州省": "贵州",
    "广东省": "广东",
    "青海省": "青海",
    "西藏自治区": "西藏",
    "四川省": "四川",
    "宁夏回族自治区": "宁夏",
    "海南省": "海南",
    "台湾": "台湾",
    "香港": "香港",
    "澳门": "澳门",
}
class Companydis(APIView):
    """
    Distribution of companies by province-level region.

    Optional ``datestart`` / ``dateend`` query parameters bound
    ``create_time`` from below / above (both inclusive). Regions whose name
    is not a key of ``nameMap`` are left out of the series.
    """

    def get(self, request, format=None):
        params = request.query_params
        companies = Company.objects.exclude(geo__pname__isnull=True)
        if params.get('datestart', None):
            companies = companies.filter(create_time__gte=params.get('datestart'))
        if params.get('dateend', None):
            companies = companies.filter(create_time__lte=params.get('dateend'))

        per_region = companies.values('geo__pname').annotate(total=Count('geo__pname'))
        series = [
            {'name': nameMap[row['geo__pname']], 'value': row['total']}
            for row in per_region
            if row['geo__pname'] in nameMap
        ]
        return Response({'s': series, 't': '单位分布', 'max': 800, 'min': 0})
class Consumerdis(APIView):
    """
    Distribution of consumers by the province-level region of their company.

    Optional ``datestart`` / ``dateend`` query parameters bound
    ``create_time`` from below / above (both inclusive). Regions whose name
    is not a key of ``nameMap`` are left out of the series.
    """

    def get(self, request, format=None):
        params = request.query_params
        consumers = Consumer.objects.exclude(company__geo__pname__isnull=True)
        if params.get('datestart', None):
            consumers = consumers.filter(create_time__gte=params.get('datestart'))
        if params.get('dateend', None):
            consumers = consumers.filter(create_time__lte=params.get('dateend'))

        per_region = consumers.values('company__geo__pname').annotate(
            total=Count('company__geo__pname')
        )
        series = [
            {'name': nameMap[row['company__geo__pname']], 'value': row['total']}
            for row in per_region
            if row['company__geo__pname'] in nameMap
        ]
        return Response({'s': series, 't': '学员分布', 'max': 3000, 'min': 0})
class Quota(APIView):
    """
    Return the exam quota data stored in ``quota.dat``.
    """

    def get(self, request, format=None):
        # The path is relative, so it is resolved against the process's
        # current working directory.
        # NOTE(review): pickle.load executes arbitrary code embedded in the
        # file - quota.dat must only ever be written by trusted code.
        with open('quota.dat', 'rb') as quota_file:
            quota = pickle.load(quota_file)
        return Response(quota)
class SearchCandidates(APIView):
    """
    Batch lookup of candidates on the external certificate site.

    POST body fields:
        names: candidate names, one per line.
        numbers: identity numbers, one per line, in the same order as
            ``names``.

    Response body:
        * ``{'error': ...}`` when more than 60 names are submitted or when
          the two columns do not contain the same number of entries;
        * otherwise a list with one dict per result table found on the
          remote pages, with the keys ``name``, ``ID_number``,
          ``report_number``, ``issue_date`` and ``url``.

    The lookup is best-effort: a request that fails, or a result table that
    lacks one of the expected cells, is logged and skipped instead of
    failing the whole batch.

    Raises:
        KeyError: if ``names`` or ``numbers`` is missing from the request
            body (unchanged from the previous behaviour).
    """
    perms_map = [
        {'post': 'candidate_search'}]

    def post(self, request, *args, **kwargs):
        names_l = self._clean_lines(request.data['names'])
        numbers_l = self._clean_lines(request.data['numbers'])

        if len(names_l) > 60:
            return Response({'error': '单次批量查询不允许超过60'})
        if len(names_l) != len(numbers_l):
            return Response({'error': '姓名列和身份证列不匹配!'})

        candidates = []
        for name, number in zip(names_l, numbers_l):
            candidates.extend(self._lookup(name, number))
        return Response(candidates)

    @staticmethod
    def _clean_lines(text):
        """Split ``text`` into lines, remove spaces and padding, drop blanks."""
        # splitlines() + strip() also copes with CRLF input; blank lines are
        # filtered *after* cleaning so whitespace-only lines no longer count
        # as entries and skew the name/number pairing.
        cleaned = (line.replace(' ', '').strip() for line in text.splitlines())
        return [line for line in cleaned if line]

    def _lookup(self, name, number):
        """Query the remote site for one person and return the parsed rows."""
        log = logging.getLogger(__name__)
        payload = {
            'IndexModel[name]': name,
            'IndexModel[identityNumber]': number,
            'IndexModel[certNumber]': '',
            'IndexModel[candidateNumber]': '',
        }
        try:
            r = requests.post('https://fushe.chinansc.cn/open/candidate-list',
                              data=payload, timeout=5)
            try:
                r.raise_for_status()
                page = r.text
            finally:
                # Release the connection even when the status check raises.
                r.close()
            html = etree.HTML(page)
        except (requests.RequestException, etree.LxmlError, ValueError) as exc:
            # Network/HTTP failure or an unparsable page: skip this person.
            # The payload is deliberately not logged (it holds ID numbers).
            log.warning('candidate lookup failed: %s', exc)
            return []
        if html is None:
            # etree.HTML returned None (no parsable document); nothing to extract.
            return []

        found = []
        for table in html.xpath("//table[@class='cert-table']"):
            url = table.xpath('tr/td/a/@href')
            name_cell = table.xpath('tr[1]/td[1]/text()')
            ID_number = table.xpath('tr[1]/td[2]/text()')
            report_number = table.xpath('tr[2]/td[1]/text()')
            issue_date = table.xpath('tr[2]/td[2]/text()')
            try:
                found.append({
                    'name': name_cell[0],
                    'ID_number': ID_number[0],
                    'report_number': report_number[0],
                    'issue_date': issue_date[0],
                    'url': 'https://fushe.chinansc.cn' + url[0],
                })
            except IndexError:
                # An expected cell is missing: skip only this table rather
                # than discarding every remaining table for this person.
                log.warning('candidate result table is missing expected cells')
        return found