cma_search/server/apps/ability/views.py

588 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.shortcuts import render
from rest_framework.viewsets import ModelViewSet
from .models import *
from .serializers import *
from rest_framework.decorators import action
from django.conf import settings
from rest_framework import status
from rest_framework.response import Response
import zipfile
import rarfile
from apps.system.models import Organization
from openpyxl import Workbook, load_workbook
from django.db.models import Count
from utils.pagination import PageOrNot
# Create your views here.
import json
class RecordMixin():
def perform_authentication(self, request):
"""
Perform authentication on the incoming request.
Note that if you override this and simply 'pass', then authentication
will instead be performed lazily, the first time either
`request.user` or `request.auth` is accessed.
"""
user = request.user
if request.method == 'GET':
print(json.dumps(request.query_params), request.method, request.path)
class CMAViewSet(RecordMixin, ModelViewSet):
"""
CMA检测能力增删改查
"""
perms_map = {'get': '*', 'post': 'cma_create',
'put': 'cma_update', 'delete': 'cma_delete'}
queryset = CMA.objects.all()
serializer_class = CMASerializer
search_fields = ['bzbh', 'bzmc', 'sszx', 'xmmc', 'glzz', 'dlmc']
filterset_fields = ['sszx', 'type', 'glzz']
ordering_fields = ['sszx', 'dlxh', 'lbxh', 'xmxh']
ordering = ['create_time']
@action(methods=['get'], detail=False,url_name='cma_group_by', perms_map = {'*':'*'})
def group(self, request, pk=None):
"""
聚合查询列
"""
queryset = self.filter_queryset(self.get_queryset())
ret = []
if request.query_params.get('group_by', None):
group_by = request.query_params.get('group_by')
group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by))
for i in group_by_data:
if i[group_by] and i['count']:
ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]})
return Response(ret)
@action(methods=['post'], detail=False, url_path='deletes', url_name='cma_deletes', perms_map = {'post':'cma_deletes'})
def deletes(self, request):
array = request.data['ids']
CMA.objects.filter(pk__in=array).delete()
return Response(status = status.HTTP_200_OK)
@action(methods=['post'], detail=False, url_path='import', url_name='cma_import', perms_map = {'post':'cma_import'})
def cma_import(self, request, pk=None):
"""
导入能力
"""
filepath = request.data['path']
fullpath = settings.BASE_DIR + filepath
import os
if fullpath.endswith('.rar'):
rar = rarfile.RarFile(fullpath)
fulldir = fullpath.replace('.rar','')
os.mkdir(fulldir)
os.chdir(fulldir)
rar.extractall()
rar.close()
CMA.objects.filter(type='center').delete()
for root, dirs, files in os.walk(fulldir):
for f in files:
if f.endswith('.xlsx'):
import_cma(f, os.path.join(root,f))
else:
return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST)
elif fullpath.endswith('.zip'):
fulldir = fullpath.replace('.zip','')
os.mkdir(fulldir)
os.chdir(fulldir)
CMA.objects.filter(type='center').delete()
with zipfile.ZipFile(fullpath,'r') as zzz:
zzz.extractall(fulldir)
for root, dirs, files in os.walk(fulldir):
for f in files:
if f.endswith('.xlsx'):
import_cma(f.encode('cp437').decode('gbk'), os.path.join(root,f))
else:
return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST)
return Response(status = status.HTTP_200_OK)
@action(methods=['post'], detail=False, url_path='import2', url_name='cma_import2', perms_map = {'post':'cma_import2'})
def cma_import2(self, request, pk=None):
"""
导入能力2
"""
filepath = request.data['path']
fullpath = settings.BASE_DIR + filepath
import os
if fullpath.endswith('.rar'):
rar = rarfile.RarFile(fullpath)
fulldir = fullpath.replace('.rar','')
os.mkdir(fulldir)
os.chdir(fulldir)
rar.extractall()
rar.close()
# CMA.objects.filter(type='sub').delete()
for root, dirs, files in os.walk(fulldir):
for f in files:
if f.endswith('.xlsx'):
import_cma2(f, os.path.join(root,f))
else:
return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST)
elif fullpath.endswith('.zip'):
fulldir = fullpath.replace('.zip','')
os.mkdir(fulldir)
os.chdir(fulldir)
# CMA.objects.filter(type='sub').delete()
with zipfile.ZipFile(fullpath,'r') as zzz:
zzz.extractall(fulldir)
for root, dirs, files in os.walk(fulldir):
for f in files:
if f.endswith('.xlsx'):
import_cma2(f.encode('cp437').decode('gbk'), os.path.join(root,f))
else:
return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST)
return Response(status = status.HTTP_200_OK)
from django.db.models.query import QuerySet
class QualificationViewSet(ModelViewSet):
"""
资质能力:增删改查
"""
perms_map = {'get': '*', 'post': 'qualificaiton_create',
'put': 'qualification_update', 'delete': 'qualification_delete'}
queryset = Qualification.objects.all()
serializer_class = QualificationSerializer
search_fields = ['cma', 'cnas', 'sszx', 'other', 'service']
ordering_fields = ['sszx']
ordering = ['sszx']
filterset_fields = ['sszx']
def get_queryset(self):
assert self.queryset is not None, (
"'%s' should either include a `queryset` attribute, "
"or override the `get_queryset()` method."
% self.__class__.__name__
)
queryset = self.queryset
if isinstance(queryset, QuerySet):
# Ensure queryset is re-evaluated on each request.
queryset = queryset.all()
if hasattr(self.get_serializer_class(), 'setup_eager_loading'):
queryset = self.get_serializer_class().setup_eager_loading(queryset)
return queryset
@action(methods=['get'], detail=False,url_name='qualification_group_by', perms_map = {'*':'*'})
def group(self, request, pk=None):
"""
聚合查询列
"""
queryset = self.filter_queryset(self.get_queryset())
ret = []
if request.query_params.get('group_by', None):
group_by = request.query_params.get('group_by')
group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by))
for i in group_by_data:
if i[group_by] and i['count']:
ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]})
return Response(ret)
@action(methods=['post'], detail=False, url_path='import', url_name='qualification_import', perms_map = {'post':'qualification_import'})
def qualification_import(self, request, pk=None):
filepath = request.data['path']
fullpath = settings.BASE_DIR + filepath
if fullpath.endswith('.xlsx'):
import_qualification(fullpath)
return Response(status = status.HTTP_200_OK)
else:
return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST)
class QualificationotherViewSet(PageOrNot,ModelViewSet):
"""
资质能力:增删改查
"""
perms_map = {'get': '*', 'post': 'qualificationother_create',
'put': 'qualificationother_update', 'delete': 'qualificationother_delete'}
queryset = Qualificationother.objects.all()
serializer_class = QualificationotherSerializer
search_fields = ['qualification__cma', 'name','description','qualification__cnas', 'qualification__ssbm__id', 'qualification__service']
filterset_fields = ['qualification__ssbm__name']
ordering_fields = ['qualification__ssbm__name']
ordering = ['create_time']
@action(methods=['post'], detail=False, url_path='deletes', url_name='qualificationother_deletes', perms_map = {'post':'qualificationother_deletes'})
def deletes(self, request):
array = request.data['ids']
Qualificationother.objects.filter(pk__in=array).delete()
return Response(status = status.HTTP_200_OK)
@action(methods=['get'], detail=False,url_name='qualification_group_by', perms_map = {'*':'*'})
def group(self, request, pk=None):
"""
聚合查询列
"""
queryset = self.filter_queryset(self.get_queryset())
ret = []
if request.query_params.get('group_by', None):
group_by = request.query_params.get('group_by')
group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by('qualification__ssbm__sort', group_by))
for i in group_by_data:
if i[group_by] and i['count']:
ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]})
return Response(ret)
class InspectionViewSet(ModelViewSet):
"""
CNAS检测能力增删改查
"""
perms_map = {'get': '*', 'post': 'inspection_create',
'put': 'inspection_update', 'delete': 'inspection_delete'}
queryset = Inspection.objects.all()
serializer_class = InspectionSerializer
search_fields = ['dlmc', 'jydx', 'dxxh','jyxmmc','jybz','sszx','sm']
ordering_fields = ['dlmc']
ordering = ['create_time']
filterset_fields = ['sszx']
@action(methods=['post'], detail=False, url_path='deletes', url_name='inspection_deletes', perms_map = {'post':'inspection_deletes'})
def deletes(self, request):
array = request.data['ids']
Inspection.objects.filter(pk__in=array).delete()
return Response(status = status.HTTP_200_OK)
@action(methods=['get'], detail=False,url_name='inspection_group_by', perms_map = {'*':'*'})
def group(self, request, pk=None):
"""
聚合查询列
"""
queryset = self.filter_queryset(self.get_queryset())
ret = []
if request.query_params.get('group_by', None):
group_by = request.query_params.get('group_by')
group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by))
for i in group_by_data:
if i[group_by] and i['count']:
ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]})
return Response(ret)
@action(methods=['post'], detail=False, url_path='inspection2', url_name='inspection_inspection2', perms_map = {'post':'inspection_inspection2'})
def inspection_import2(self, request, pk=None):
"""
导入能力2
"""
filepath = request.data['path']
fullpath = settings.BASE_DIR + filepath
import os
if fullpath.endswith('.rar'):
rar = rarfile.RarFile(fullpath)
fulldir = fullpath.replace('.rar','')
os.mkdir(fulldir)
os.chdir(fulldir)
rar.extractall()
rar.close()
# CMA.objects.filter(type='sub').delete()
for root, dirs, files in os.walk(fulldir):
for f in files:
if f.endswith('.xlsx'):
import_inspection(f, os.path.join(root,f))
else:
return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST)
elif fullpath.endswith('.zip'):
fulldir = fullpath.replace('.zip','')
os.mkdir(fulldir)
os.chdir(fulldir)
# CMA.objects.filter(type='sub').delete()
with zipfile.ZipFile(fullpath,'r') as zzz:
zzz.extractall(fulldir)
for root, dirs, files in os.walk(fulldir):
for f in files:
if f.endswith('.xlsx'):
import_inspection(f.encode('cp437').decode('gbk'), os.path.join(root,f))
else:
return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST)
return Response(status = status.HTTP_200_OK)
class CNASViewSet(ModelViewSet):
"""
CNAS检测能力增删改查
"""
perms_map = {'get': '*', 'post': 'cnas_create',
'put': 'cnas_update', 'delete': 'cnas_delete'}
queryset = CNAS.objects.all()
serializer_class = CNASSerializer
search_fields = ['bzbh', 'bzmc', 'sszx', 'xmmc', 'bztk']
ordering_fields = ['bzmc']
filterset_fields = ['sszx']
ordering = ['create_time']
@action(methods=['get'], detail=False,url_name='cnas_group_by', perms_map = {'*':'*'})
def group(self, request, pk=None):
"""
聚合查询列
"""
queryset = self.filter_queryset(self.get_queryset())
ret = []
if request.query_params.get('group_by', None):
group_by = request.query_params.get('group_by')
group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by))
for i in group_by_data:
if i[group_by] and i['count']:
ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]})
return Response(ret)
@action(methods=['post'], detail=False, url_path='import', url_name='cnas_import', perms_map = {'post':'cnas_import'})
def cnas_import(self, request, pk=None):
"""
导入能力
"""
filepath = request.data['path']
fullpath = settings.BASE_DIR + filepath
import os
if fullpath.endswith('.rar'):
rar = rarfile.RarFile(fullpath)
fulldir = fullpath.replace('.rar','')
os.mkdir(fulldir)
os.chdir(fulldir)
rar.extractall()
rar.close()
CNAS.objects.all().delete()
for root, dirs, files in os.walk(fulldir):
for f in files:
if f.endswith('.xls'):
return Response('不支持旧xls格式', status = status.HTTP_400_BAD_REQUEST)
import_cnas(f, os.path.join(root,f))
elif fullpath.endswith('.zip'):
fulldir = fullpath.replace('.zip','')
os.mkdir(fulldir)
os.chdir(fulldir)
CNAS.objects.all().delete()
with zipfile.ZipFile(fullpath,'r') as zzz:
print(zzz)
zzz.extractall(fulldir)
for root, dirs, files in os.walk(fulldir):
for f in files:
import_cnas(f.encode('cp437').decode('gbk'), os.path.join(root,f))
return Response(status = status.HTTP_200_OK)
def import_qualification(path):
wb = load_workbook(path)
sheet = wb.worksheets[0]
i = 3
max_row = sheet.max_row
obj = {}
Qualificationother.objects.all().delete()
Qualification.objects.all().delete()
while i<max_row+1:
sszx = sheet['b'+str(i)].value
cma = sheet['c'+str(i)].value
cnas = sheet['d'+str(i)].value
service = sheet['g'+str(i)].value
name = sheet['e'+str(i)].value
description = sheet['f'+str(i)].value
if sszx:
if Organization.objects.filter(name=sszx).exists():
ssbm=Organization.objects.get(name=sszx)
if Qualification.objects.filter(ssbm=ssbm).exists():
qualification = Qualification.objects.get(ssbm=ssbm)
Qualificationother.objects.create(qualification=qualification, name=name, description=description)
else:
qualification = Qualification.objects.create(ssbm=ssbm)
qualification.cma = cma
qualification.cnas = cnas
qualification.service = service
qualification.save()
obj = qualification
Qualificationother.objects.create(qualification=qualification, name=name, description=description)
else:
Qualificationother.objects.create(qualification=obj, name=name, description=description)
i = i + 1
def import_cma(filename, path):
wb = load_workbook(path)
sheet = wb.worksheets[0]
datalist = []
sszx = filename.replace('.xlsx','').replace('副本14检验检测能力申请表-','')
i = 4
while sheet['b'+str(i)].value:
data = {}
data['dlxh'] = sheet['a'+str(i)].value
data['dlmc'] = sheet['b'+str(i)].value
data['lbxh'] = sheet['c'+str(i)].value
data['lbmc'] = sheet['d'+str(i)].value
data['xmxh'] = sheet['e'+str(i)].value
data['xmmc'] = sheet['f'+str(i)].value
data['bzmc'] = sheet['g'+str(i)].value
data['bzbh'] = sheet['h'+str(i)].value
data['xzfw'] = sheet['i'+str(i)].value
data['bz'] = sheet['j'+str(i)].value
data['sszx'] = sszx
data['type'] = 'center'
datalist.append(CMA(**data))
i = i + 1
CMA.objects.bulk_create(datalist)
import xlrd
def import_cnas(filename, path):
sheet = xlrd.open_workbook(path).sheet_by_name('检测能力范围')
datalist = []
sszx = filename.replace('.xlsx','').replace('.xls', '').replace('检测能力范围(含能源之星)-','')
i = 2
lbmc = ''
xmmc = ''
while i < sheet.nrows:
data = {}
data_list = sheet.row_values(i)
if data_list[1]:
lbmc = data_list[1]
if data_list[6]:
xmmc = data_list[6]
data['lbmc'] = lbmc
data['xmmc'] = xmmc
data['bzmc'] = data_list[11]
data['bzbh'] = data_list[13]
data['bztk'] = data_list[15]
data['sszx'] = sszx
datalist.append(CNAS(**data))
i = i + 1
print(i,data)
CNAS.objects.bulk_create(datalist)
def import_cma2(filename, path):
wb = load_workbook(path,data_only=True)
sheet = wb.worksheets[0]
datalist = []
sszx = filename.split('-')[0]
if CMA.objects.filter(sszx=sszx, type='sub').exists():
CMA.objects.filter(sszx=sszx, type='sub').delete()
i = 3
max_row = sheet.max_row
defaultv = {}
while i<max_row+1:
data = {}
if sheet['a'+str(i)].value:
data['dlxh'] = sheet['a'+str(i)].value
defaultv['dlxh'] = data['dlxh']
else:
data['dlxh'] = defaultv['dlxh']
if sheet['b'+str(i)].value:
data['dlmc'] = sheet['b'+str(i)].value
defaultv['dlmc'] = data['dlmc']
else:
data['dlmc'] = defaultv['dlmc']
if sheet['c'+str(i)].value:
data['lbxh'] = sheet['c'+str(i)].value
defaultv['lbxh'] = data['lbxh']
else:
data['lbxh'] = defaultv['lbxh']
if sheet['d'+str(i)].value:
data['lbmc'] = sheet['d'+str(i)].value
defaultv['lbmc'] = data['lbmc']
else:
data['lbmc'] = defaultv['lbmc']
if sheet['e'+str(i)].value:
data['xmxh'] = sheet['e'+str(i)].value
defaultv['xmxh'] = data['xmxh']
else:
data['xmxh'] = defaultv['xmxh']
if sheet['f'+str(i)].value:
data['xmmc'] = sheet['f'+str(i)].value
defaultv['xmmc'] = data['xmmc']
else:
data['xmmc'] = defaultv['xmmc']
if sheet['g'+str(i)].value:
data['bzmc'] = sheet['g'+str(i)].value
defaultv['bzmc'] = data['bzmc']
else:
data['bzmc'] = defaultv['bzmc']
if sheet['h'+str(i)].value:
data['bzbh'] = sheet['h'+str(i)].value
defaultv['bzbh'] = data['bzbh']
else:
data['bzbh'] = defaultv['bzbh']
data['xzfw'] = sheet['i'+str(i)].value if (sheet['i'+str(i)].value and sheet['i'+str(i)].value !='') else None
data['bz'] = sheet['j'+str(i)].value if (sheet['j'+str(i)].value and sheet['j'+str(i)].value !='') else None
data['glzz'] = sheet['k'+str(i)].value if (sheet['k'+str(i)].value and sheet['k'+str(i)].value !='') else None
data['sszx'] = sszx
data['type'] = 'sub'
datalist.append(CMA(**data))
i = i + 1
CMA.objects.bulk_create(datalist)
def import_inspection(filename, path):
wb = load_workbook(path,data_only=True)
sheet = wb.worksheets[0]
datalist = []
sszx = filename.split('-')[0]
if Inspection.objects.filter(sszx=sszx).exists():
Inspection.objects.filter(sszx=sszx).delete()
i = 3
max_row = sheet.max_row
defaultv = {}
while i<max_row+1:
data = {}
if sheet['a'+str(i)].value:
data['dlxh'] = sheet['a'+str(i)].value
defaultv['dlxh'] = data['dlxh']
else:
data['dlxh'] = defaultv['dlxh']
if sheet['b'+str(i)].value:
data['dlmc'] = sheet['b'+str(i)].value
defaultv['dlmc'] = data['dlmc']
else:
data['dlmc'] = defaultv['dlmc']
if sheet['c'+str(i)].value:
data['dxxh'] = sheet['c'+str(i)].value
defaultv['dxxh'] = data['dxxh']
else:
data['dxxh'] = defaultv['dxxh']
if sheet['d'+str(i)].value:
data['jydx'] = sheet['d'+str(i)].value
defaultv['jydx'] = data['jydx']
else:
data['jydx'] = defaultv['jydx']
if sheet['e'+str(i)].value:
data['jyxmxh'] = sheet['e'+str(i)].value
elif sheet['e3'].value: # 该表存在项目序号
m = i - 1
while True:
if sheet['e'+str(m)].value:
data['jyxmxh'] = sheet['e'+str(m)].value
break
m = m - 1
else: #该表没有项目序号,自己定
pass
if sheet['f'+str(i)].value:
data['jyxmmc'] = sheet['f'+str(i)].value
else:
m = i - 1
while True:
if sheet['f'+str(m)].value:
data['jyxmmc'] = sheet['f'+str(m)].value
break
m = m - 1
if sheet['g'+str(i)].value:
data['jybz'] = sheet['g'+str(i)].value
elif sheet['g3'].value:
m = i - 1
while True:
if sheet['g'+str(m)].value:
data['jybz'] = sheet['g'+str(m)].value
break
m = m - 1
if sheet['h'+str(i)].value:
data['sm'] = sheet['h'+str(i)].value
elif sheet['h3'].value:
m = i - 1
while True:
if sheet['h'+str(m)].value:
data['sm'] = sheet['h'+str(m)].value
break
m = m - 1
if sheet['i'+str(i)].value:
data['sxrq'] = sheet['i'+str(i)].value
elif sheet['i3'].value:
m = i - 1
while True:
if sheet['i'+str(m)].value:
data['sxrq'] = sheet['i'+str(m)].value
break
m = m - 1
data['sszx'] = sszx
datalist.append(Inspection(**data))
i = i + 1
Inspection.objects.bulk_create(datalist)