cma_search/server/apps/ability/views.py

343 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.shortcuts import render
from rest_framework.viewsets import ModelViewSet
from .models import *
from .serializers import *
from rest_framework.decorators import action
from django.conf import settings
from rest_framework import status
from rest_framework.response import Response
import zipfile
import rarfile
from openpyxl import Workbook, load_workbook
from django.db.models import Count
# Create your views here.
class CMAViewSet(ModelViewSet):
    """
    CRUD endpoints for CMA testing-capability records, plus a grouping
    action and two archive-import actions.
    """
    perms_map = {'get': '*', 'post': 'cma_create',
                 'put': 'cma_update', 'delete': 'cma_delete'}
    queryset = CMA.objects.all()
    serializer_class = CMASerializer
    search_fields = ['bzbh', 'bzmc', 'sszx', 'xmmc', 'glzz']
    filterset_fields = ['sszx', 'type', 'glzz']
    ordering_fields = ['xmxh']
    ordering = 'sszx'

    @action(methods=['get'], detail=False, url_name='cma_group_by', perms_map={'*': '*'})
    def group(self, request, pk=None):
        """
        Count the filtered records per distinct value of one column.

        The column is taken from the ``group_by`` query parameter. Returns a
        list of ``{'text': '<value>(<count>)', 'value': <value>}`` items,
        skipping empty values; an empty list when ``group_by`` is absent, and
        HTTP 400 when it does not name a field of the model.
        """
        from django.core.exceptions import FieldDoesNotExist

        queryset = self.filter_queryset(self.get_queryset())
        ret = []
        group_by = request.query_params.get('group_by', None)
        if group_by:
            # group_by comes straight from the client: only accept a field that
            # lives on this model, so "relation__column" lookups cannot be used
            # to read other tables and unknown names do not surface as a 500.
            try:
                queryset.model._meta.get_field(group_by)
            except FieldDoesNotExist:
                return Response('group_by字段无效', status=status.HTTP_400_BAD_REQUEST)
            rows = queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)
            for row in rows:
                value = row[group_by]
                if value and row['count']:
                    # str() keeps non-text columns from raising on concatenation.
                    ret.append({'text': str(value) + '(' + str(row['count']) + ')', 'value': value})
        return Response(ret)

    @action(methods=['post'], detail=False, url_path='import', url_name='cma_import', perms_map={'post': 'cma_import'})
    def cma_import(self, request, pk=None):
        """
        Import capabilities from an uploaded archive.

        Replaces every ``type='center'`` record with the contents of the
        workbooks in the archive named by ``request.data['path']``.
        """
        return self._import_archive(
            request, import_cma,
            purge=lambda: CMA.objects.filter(type='center').delete())

    @action(methods=['post'], detail=False, url_path='import2', url_name='cma_import2', perms_map={'post': 'cma_import2'})
    def cma_import2(self, request, pk=None):
        """
        Import capabilities (variant 2) from an uploaded archive.

        Nothing is purged here; each workbook is handed to ``import_cma2``.
        """
        return self._import_archive(request, import_cma2)

    @staticmethod
    def _zip_member_name(name):
        """
        Undo the cp437 decoding of a zip member name and re-read the bytes as GBK.

        Names that cannot be round-tripped (for example names that are already
        proper Unicode) are returned unchanged instead of raising.
        """
        try:
            return name.encode('cp437').decode('gbk')
        except (UnicodeEncodeError, UnicodeDecodeError):
            return name

    def _import_archive(self, request, importer, purge=None):
        """
        Extract a .rar/.zip archive and feed every contained file to ``importer``.

        :param request: DRF request; ``request.data['path']`` is appended to
            ``settings.BASE_DIR`` to locate the archive.
        :param importer: callable ``(filename, path)`` invoked once per file.
        :param purge: optional zero-argument callable run before the first
            import to remove the records being replaced.
        :return: HTTP 200 on success; HTTP 400 when the path is not a
            .rar/.zip file or the archive contains a legacy ``.xls`` file.
        """
        import os
        from django.db import transaction

        # NOTE(review): 'path' is client-supplied and joined to BASE_DIR without
        # a containment check - confirm the upload flow guarantees a safe value.
        filepath = request.data['path']
        fullpath = settings.BASE_DIR + filepath
        if fullpath.endswith('.rar'):
            is_zip = False
        elif fullpath.endswith('.zip'):
            is_zip = True
        else:
            # Previously fell through and reported success without importing.
            return Response('仅支持rar或zip压缩包', status=status.HTTP_400_BAD_REQUEST)

        # Strip only the trailing extension and tolerate a re-import of the
        # same archive (the directory may already exist).
        fulldir = fullpath[:-4]
        os.makedirs(fulldir, exist_ok=True)
        # Extract into an explicit target instead of os.chdir(), which would
        # change the working directory of the whole server process.
        if is_zip:
            with zipfile.ZipFile(fullpath, 'r') as archive:
                archive.extractall(fulldir)
        else:
            with rarfile.RarFile(fullpath) as archive:
                archive.extractall(fulldir)

        # Validate the whole archive before touching the database so that a
        # rejected upload leaves the existing records intact.
        entries = []
        for root, dirs, files in os.walk(fulldir):
            for f in files:
                if f.endswith('.xls'):
                    return Response('不支持旧xls格式', status=status.HTTP_400_BAD_REQUEST)
                display_name = self._zip_member_name(f) if is_zip else f
                entries.append((display_name, os.path.join(root, f)))

        # One transaction: a workbook that fails to parse rolls back the purge.
        with transaction.atomic():
            if purge is not None:
                purge()
            for display_name, path in entries:
                importer(display_name, path)
        return Response(status=status.HTTP_200_OK)
class QualificationViewSet(ModelViewSet):
    """
    CRUD endpoints for qualification records, plus a grouping action.
    """
    # NOTE(review): 'qualificaiton_create' looks like a misspelling of
    # 'qualification_create'; left as-is because stored permission codes may
    # use this exact string - confirm before renaming.
    perms_map = {'get': '*', 'post': 'qualificaiton_create',
                 'put': 'qualification_update', 'delete': 'qualification_delete'}
    queryset = Qualification.objects.all()
    serializer_class = QualificationSerializer
    search_fields = ['cma', 'cnas', 'sszx', 'other', 'service']
    # ordering_fields = ['sszx']
    ordering = 'sszx'
    filterset_fields = ['sszx']

    @action(methods=['get'], detail=False, url_name='qualification_group_by', perms_map={'*': '*'})
    def group(self, request, pk=None):
        """
        Count the filtered records per distinct value of one column.

        The column is taken from the ``group_by`` query parameter. Returns a
        list of ``{'text': '<value>(<count>)', 'value': <value>}`` items,
        skipping empty values; an empty list when ``group_by`` is absent, and
        HTTP 400 when it does not name a field of the model.
        """
        from django.core.exceptions import FieldDoesNotExist

        queryset = self.filter_queryset(self.get_queryset())
        ret = []
        group_by = request.query_params.get('group_by', None)
        if group_by:
            # group_by comes straight from the client: only accept a field that
            # lives on this model, so "relation__column" lookups cannot be used
            # to read other tables and unknown names do not surface as a 500.
            try:
                queryset.model._meta.get_field(group_by)
            except FieldDoesNotExist:
                return Response('group_by字段无效', status=status.HTTP_400_BAD_REQUEST)
            rows = queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)
            for row in rows:
                value = row[group_by]
                if value and row['count']:
                    # str() keeps non-text columns from raising on concatenation.
                    ret.append({'text': str(value) + '(' + str(row['count']) + ')', 'value': value})
        return Response(ret)
class CNASViewSet(ModelViewSet):
    """
    CRUD endpoints for CNAS testing-capability records, plus an
    archive-import action.
    """
    perms_map = {'get': '*', 'post': 'cnas_create',
                 'put': 'cnas_update', 'delete': 'cnas_delete'}
    queryset = CNAS.objects.all()
    serializer_class = CNASSerializer
    search_fields = ['bzbh', 'bzmc', 'sszx', 'xmmc', 'bztk']
    ordering_fields = ['bzmc']
    filterset_fields = ['sszx']
    ordering = 'bzmc'

    @action(methods=['post'], detail=False, url_path='import', url_name='cnas_import', perms_map={'post': 'cnas_import'})
    def cnas_import(self, request, pk=None):
        """
        Import capabilities from an uploaded archive.

        ``request.data['path']`` is appended to ``settings.BASE_DIR`` to locate
        a .rar or .zip archive. All existing CNAS records are replaced by the
        contents of the workbooks in the archive.

        Returns HTTP 200 on success, or HTTP 400 when the path is not a
        .rar/.zip file or the archive contains a legacy ``.xls`` file.
        """
        import os
        from django.db import transaction

        # NOTE(review): 'path' is client-supplied and joined to BASE_DIR without
        # a containment check - confirm the upload flow guarantees a safe value.
        filepath = request.data['path']
        fullpath = settings.BASE_DIR + filepath
        if fullpath.endswith('.rar'):
            is_zip = False
        elif fullpath.endswith('.zip'):
            is_zip = True
        else:
            # Previously fell through and reported success without importing.
            return Response('仅支持rar或zip压缩包', status=status.HTTP_400_BAD_REQUEST)

        # Strip only the trailing extension and tolerate a re-import of the
        # same archive (the directory may already exist).
        fulldir = fullpath[:-4]
        os.makedirs(fulldir, exist_ok=True)
        # Extract into an explicit target instead of os.chdir(), which would
        # change the working directory of the whole server process.
        if is_zip:
            with zipfile.ZipFile(fullpath, 'r') as archive:
                archive.extractall(fulldir)
        else:
            with rarfile.RarFile(fullpath) as archive:
                archive.extractall(fulldir)

        # Validate the whole archive before touching the database so that a
        # rejected upload leaves the existing records intact.
        entries = []
        for root, dirs, files in os.walk(fulldir):
            for f in files:
                if f.endswith('.xls'):
                    return Response('不支持旧xls格式', status=status.HTTP_400_BAD_REQUEST)
                display_name = f
                if is_zip:
                    # Undo the cp437 decoding of the member name and re-read
                    # the bytes as GBK; keep the name as-is when that is not
                    # possible (e.g. it is already proper Unicode).
                    try:
                        display_name = f.encode('cp437').decode('gbk')
                    except (UnicodeEncodeError, UnicodeDecodeError):
                        display_name = f
                entries.append((display_name, os.path.join(root, f)))

        # One transaction: a workbook that fails to parse rolls back the purge.
        with transaction.atomic():
            CNAS.objects.all().delete()
            for display_name, path in entries:
                import_cnas(display_name, path)
        return Response(status=status.HTTP_200_OK)
def import_cma(filename, path):
    """
    Load 'center' CMA records from the first worksheet of a workbook.

    :param filename: workbook file name; with the '.xlsx' suffix and the
        template prefix removed it becomes the ``sszx`` value of every record.
    :param path: filesystem path of the workbook to read.

    Rows are read from row 4 downward until column B is empty, then all
    records are inserted with a single ``bulk_create``.
    """
    sheet = load_workbook(path).worksheets[0]
    sszx = filename.replace('.xlsx', '').replace('副本14检验检测能力申请表-', '')
    # Spreadsheet column letter -> CMA model field, in reading order.
    column_fields = (
        ('a', 'dlxh'),
        ('b', 'dlmc'),
        ('c', 'lbxh'),
        ('d', 'lbmc'),
        ('e', 'xmxh'),
        ('f', 'xmmc'),
        ('g', 'bzmc'),
        ('h', 'bzbh'),
        ('i', 'xzfw'),
        ('j', 'bz'),
    )
    records = []
    row = 4
    while sheet['b' + str(row)].value:
        fields = {}
        for letter, field in column_fields:
            fields[field] = sheet[letter + str(row)].value
        fields['sszx'] = sszx
        fields['type'] = 'center'
        records.append(CMA(**fields))
        row += 1
    CMA.objects.bulk_create(records)
def import_cnas(filename, path):
    """
    Load CNAS records from the '检测能力范围' worksheet of a workbook.

    :param filename: workbook file name; with the '.xlsx' suffix and the
        template prefix removed it becomes the ``sszx`` value of every record.
    :param path: filesystem path of the workbook to read.
    :raises KeyError: if the workbook has no worksheet with that title.

    Rows are read from row 3 downward until column L is empty. An empty cell
    in column B (``lbmc``) or G (``xmmc``) takes the nearest non-empty value
    above it in the same column; if there is none, the field is left unset.
    All records are inserted with a single ``bulk_create``.
    """
    wb = load_workbook(path)
    # Subscript access replaces the deprecated Workbook.get_sheet_by_name().
    sheet = wb['检测能力范围']
    sszx = filename.replace('.xlsx', '').replace('检测能力范围(含能源之星)-', '')
    first_row = 3

    def value_above(column, row):
        """Return the nearest truthy value above ``row`` in ``column``, or None."""
        for r in range(row - 1, 0, -1):
            found = sheet[column + str(r)].value
            if found:
                return found
        return None

    inherited_columns = (('b', 'lbmc'), ('g', 'xmmc'))
    # Remember the last value seen per column instead of rescanning upward for
    # every empty cell; seeding from the rows above the data keeps the same
    # result as the old scan, without running off the top of the sheet
    # (row 0 is not a valid coordinate) when nothing is found.
    carried = {column: value_above(column, first_row) for column, _ in inherited_columns}

    datalist = []
    i = first_row
    while sheet['l' + str(i)].value:
        data = {}
        for column, field in inherited_columns:
            value = sheet[column + str(i)].value
            if value:
                carried[column] = value
            if carried[column]:
                data[field] = carried[column]
        data['bzmc'] = sheet['l' + str(i)].value
        data['bzbh'] = sheet['n' + str(i)].value
        data['bztk'] = sheet['q' + str(i)].value
        data['sszx'] = sszx
        datalist.append(CNAS(**data))
        i = i + 1
    CNAS.objects.bulk_create(datalist)
def import_cma2(filename, path):
    """
    Replace the 'sub' CMA records of one ``sszx`` with the rows of a workbook.

    :param filename: workbook file name; the text before the first '-' is
        used as the ``sszx`` value of every record.
    :param path: filesystem path of the workbook (read with cached formula
        results, ``data_only=True``).

    Every row from row 3 through the sheet's last row becomes one record.
    Empty cells in columns A-H take the nearest non-empty value above them,
    subject to per-column rules:

    * A, B, E, H inherit only when the column has a value in row 3;
    * C, D, G always inherit;
    * F inherits unless ``sszx`` is '枣庄公司'.

    A field with neither its own nor an inherited value is left unset.
    Columns I, J, K are copied as-is, with empty values stored as ``None``.
    Existing records are deleted and the new ones inserted in one transaction,
    after the sheet has been fully parsed.
    """
    from django.db import transaction

    wb = load_workbook(path, data_only=True)
    sheet = wb.worksheets[0]
    sszx = filename.split('-')[0]
    first_row = 3
    # Capture the row count before reading any cell: accessing a missing cell
    # makes openpyxl create it, which could otherwise enlarge max_row.
    max_row = sheet.max_row

    def cell_value(column, row):
        """Return the value of the cell at ``column`` / ``row``."""
        return sheet[column + str(row)].value

    def value_above(column, row):
        """Return the nearest truthy value above ``row`` in ``column``, or None."""
        for r in range(row - 1, 0, -1):
            found = cell_value(column, r)
            if found:
                return found
        return None

    # (column letter, model field, whether an empty cell inherits from above)
    inherited_columns = (
        ('a', 'dlxh', bool(cell_value('a', first_row))),
        ('b', 'dlmc', bool(cell_value('b', first_row))),
        ('c', 'lbxh', True),
        ('d', 'lbmc', True),
        # Sheets without a value in E3 carry no item numbers; xmxh stays unset.
        ('e', 'xmxh', bool(cell_value('e', first_row))),
        ('f', 'xmmc', sszx != '枣庄公司'),
        ('g', 'bzmc', True),
        ('h', 'bzbh', bool(cell_value('h', first_row))),
    )
    # Remember the last value seen per column instead of rescanning upward for
    # every empty cell; seeding from the rows above the data keeps the same
    # result as the old scan, without running off the top of the sheet
    # (row 0 is not a valid coordinate) when nothing is found.
    carried = {
        column: value_above(column, first_row) if inherits else None
        for column, _, inherits in inherited_columns
    }

    datalist = []
    for i in range(first_row, max_row + 1):
        data = {}
        for column, field, inherits in inherited_columns:
            value = cell_value(column, i)
            if value:
                carried[column] = value
                data[field] = value
            elif inherits and carried[column]:
                data[field] = carried[column]
        data['xzfw'] = cell_value('i', i) or None
        data['bz'] = cell_value('j', i) or None
        data['glzz'] = cell_value('k', i) or None
        data['sszx'] = sszx
        data['type'] = 'sub'
        datalist.append(CMA(**data))

    # Delete only after parsing succeeded, and atomically with the insert, so a
    # bad workbook can no longer wipe the existing records.
    with transaction.atomic():
        CMA.objects.filter(sszx=sszx, type='sub').delete()
        CMA.objects.bulk_create(datalist)