cma_search/server/apps/ability/views.py

138 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.shortcuts import render
from rest_framework.viewsets import ModelViewSet
from .models import CMA, CNAS
from .serializers import CMASerializer, CNASSerializer
from rest_framework.decorators import action
from django.conf import settings
from rest_framework import status
from rest_framework.response import Response
import zipfile
import rarfile
from openpyxl import Workbook, load_workbook
from django.db.models import Count
# Create your views here.
class CMAViewSet(ModelViewSet):
    """
    CRUD endpoints for CMA testing capabilities.

    In addition to the standard ModelViewSet actions this view set exposes
    ``group`` (row counts per distinct value of one column) and ``import``
    (replace every CMA row with the contents of an uploaded .rar archive).
    """
    perms_map = {'get': '*', 'post': 'cma_create',
                 'put': 'cma_update', 'delete': 'cma_delete'}
    queryset = CMA.objects.all()
    serializer_class = CMASerializer
    search_fields = ['bzbh', 'bzmc', 'sszx', 'xmmc']
    filterset_fields = ['sszx']
    ordering_fields = ['xmxh']
    ordering = 'xmxh'

    @action(methods=['get'], detail=False, url_name='cma_group_by', perms_map={'*': '*'})
    def group(self, request, pk=None):
        """
        Count rows per distinct value of the column named by ``group_by``.

        The filtered queryset is grouped by the ``group_by`` query parameter.
        The response is a list of ``{'text': '<value>(<count>)', 'value':
        <value>}`` items ordered by value, or an empty list when the parameter
        is missing or empty.
        """
        queryset = self.filter_queryset(self.get_queryset())
        ret = []
        group_by = request.query_params.get('group_by', None)
        if group_by:
            rows = queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)
            for row in rows:
                value = row[group_by]
                # Format through str.format so non-string or NULL column
                # values no longer raise TypeError on concatenation.
                ret.append({'text': '{}({})'.format(value, row['count']), 'value': value})
        return Response(ret)

    @action(methods=['post'], detail=False, url_path='import', url_name='cma_import', perms_map={'post': 'cma_import'})
    def cma_import(self, request, pk=None):
        """
        Replace all CMA rows with the workbooks contained in a .rar archive.

        ``request.data['path']`` is appended to ``settings.BASE_DIR`` to locate
        the archive. The archive is extracted into a directory named after it
        (the path without its ``.rar`` suffix) and every extracted file is
        handed to ``import_cma``. Responds with HTTP 200.

        Raises:
            KeyError: if ``path`` is missing from the request data.
            FileExistsError: if the extraction directory already exists.
        """
        import os
        from django.db import transaction

        filepath = request.data['path']
        # NOTE(review): the path comes from the client and is concatenated
        # without normalisation; confirm it is restricted to the upload area.
        fullpath = settings.BASE_DIR + filepath
        # NOTE(review): a path that is not a .rar archive is accepted as a
        # no-op with HTTP 200 — confirm whether a 400 would be preferable.
        if fullpath.endswith('.rar'):
            # Strip only the trailing suffix; str.replace would also remove
            # any '.rar' that appears earlier in the path.
            fulldir = fullpath[:-len('.rar')]
            os.mkdir(fulldir)
            # Extract to an explicit target instead of os.chdir(), which would
            # change the working directory of the whole server process.
            with rarfile.RarFile(fullpath) as rar:
                rar.extractall(path=fulldir)
            # Delete and re-import in one transaction so that a failing
            # workbook does not leave the table empty or half-filled.
            with transaction.atomic():
                CMA.objects.all().delete()
                for root, _dirs, files in os.walk(fulldir):
                    for f in files:
                        import_cma(f, os.path.join(root, f))
        return Response(status=status.HTTP_200_OK)
class CNASViewSet(ModelViewSet):
    """
    CRUD endpoints for CNAS testing capabilities.

    In addition to the standard ModelViewSet actions this view set exposes
    ``import`` (replace every CNAS row with the contents of an uploaded .rar
    archive).
    """
    perms_map = {'get': '*', 'post': 'cnas_create',
                 'put': 'cnas_update', 'delete': 'cnas_delete'}
    queryset = CNAS.objects.all()
    serializer_class = CNASSerializer
    search_fields = ['bzbh', 'bzmc', 'sszx', 'xmmc']
    ordering_fields = ['bzmc']
    ordering = 'bzmc'

    @action(methods=['post'], detail=False, url_path='import', url_name='cnas_import', perms_map={'post': 'cnas_import'})
    def cnas_import(self, request, pk=None):
        """
        Replace all CNAS rows with the workbooks contained in a .rar archive.

        ``request.data['path']`` is appended to ``settings.BASE_DIR`` to locate
        the archive. The archive is extracted into a directory named after it
        (the path without its ``.rar`` suffix). If any extracted file ends in
        ``.xls`` the request is rejected with HTTP 400 and the table is left
        untouched; otherwise every extracted file is handed to ``import_cnas``
        and HTTP 200 is returned.

        Raises:
            KeyError: if ``path`` is missing from the request data.
            FileExistsError: if the extraction directory already exists.
        """
        import os
        from django.db import transaction

        filepath = request.data['path']
        # NOTE(review): the path comes from the client and is concatenated
        # without normalisation; confirm it is restricted to the upload area.
        fullpath = settings.BASE_DIR + filepath
        # NOTE(review): a path that is not a .rar archive is accepted as a
        # no-op with HTTP 200 — confirm whether a 400 would be preferable.
        if fullpath.endswith('.rar'):
            # Strip only the trailing suffix; str.replace would also remove
            # any '.rar' that appears earlier in the path.
            fulldir = fullpath[:-len('.rar')]
            os.mkdir(fulldir)
            # Extract to an explicit target instead of os.chdir(), which would
            # change the working directory of the whole server process.
            with rarfile.RarFile(fullpath) as rar:
                rar.extractall(path=fulldir)
            extracted = [
                (name, os.path.join(root, name))
                for root, _dirs, files in os.walk(fulldir)
                for name in files
            ]
            # Validate before touching the database: rejecting an .xls file
            # halfway through the walk used to leave the table wiped and only
            # partially repopulated.
            if any(name.endswith('.xls') for name, _path in extracted):
                return Response('不支持旧xls格式', status=status.HTTP_400_BAD_REQUEST)
            # Delete and re-import in one transaction so that a failing
            # workbook does not leave the table empty or half-filled.
            with transaction.atomic():
                CNAS.objects.all().delete()
                for name, path in extracted:
                    import_cnas(name, path)
        return Response(status=status.HTTP_200_OK)
def import_cma(filename, path):
    """
    Load CMA rows from the first worksheet of a workbook and bulk-insert them.

    Rows are read from row 4 downwards until column B is empty. Columns A-J
    are copied into the CMA fields listed in ``column_fields``; ``sszx`` is
    derived from the file name.

    Args:
        filename: base name of the workbook, used to derive ``sszx``.
        path: full path of the workbook to open.
    """
    # Column letter -> CMA field name, in the order the keys are inserted.
    column_fields = (
        ('a', 'dlxh'),
        ('b', 'dlmc'),
        ('c', 'lbxh'),
        ('d', 'lbmc'),
        ('e', 'xmxh'),
        ('f', 'xmmc'),
        ('g', 'bzmc'),
        ('h', 'bzbh'),
        ('i', 'xzfw'),
        ('j', 'bz'),
    )
    worksheet = load_workbook(path).worksheets[0]
    # What remains of the file name after removing the extension and the
    # fixed template prefix is stored in ``sszx`` for every row.
    owner = filename.replace('.xlsx', '').replace('副本14检验检测能力申请表-', '')
    records = []
    row = 4
    while True:
        # An empty cell in column B marks the end of the data.
        if not worksheet['b' + str(row)].value:
            break
        data = {
            field: worksheet[column + str(row)].value
            for column, field in column_fields
        }
        data['sszx'] = owner
        print(data)
        records.append(CMA(**data))
        row += 1
    CMA.objects.bulk_create(records)
def import_cnas(filename, path):
    """
    Load CNAS rows from the '检测能力范围' worksheet and bulk-insert them.

    Rows are read from row 3 downwards until column L is empty. Columns B, G,
    L and N are copied into ``lbmc``, ``xmmc``, ``bzmc`` and ``bzbh``;
    ``sszx`` is derived from the file name.

    Args:
        filename: base name of the workbook, used to derive ``sszx``.
        path: full path of the workbook to open.

    Raises:
        KeyError: if the workbook has no worksheet named '检测能力范围'.
    """
    wb = load_workbook(path)
    # Index the workbook by sheet name; Workbook.get_sheet_by_name() is
    # deprecated in openpyxl in favour of this form.
    sheet = wb['检测能力范围']
    datalist = []
    # What remains of the file name after removing the extension and the
    # fixed template prefix is stored in ``sszx`` for every row.
    # NOTE(review): the prefix must match the real file names character for
    # character (including the width of the parentheses) — confirm.
    sszx = filename.replace('.xlsx', '').replace('检测能力范围(含能源之星)-', '')
    i = 3
    # An empty cell in column L marks the end of the data.
    while sheet['l' + str(i)].value:
        data = {}
        data['lbmc'] = sheet['b' + str(i)].value
        data['xmmc'] = sheet['g' + str(i)].value
        data['bzmc'] = sheet['l' + str(i)].value
        data['bzbh'] = sheet['n' + str(i)].value
        data['sszx'] = sszx
        print(data)
        datalist.append(CNAS(**data))
        i = i + 1
    CNAS.objects.bulk_create(datalist)