from django.shortcuts import render from rest_framework.viewsets import ModelViewSet from .models import * from .serializers import * from rest_framework.decorators import action from django.conf import settings from rest_framework import status from rest_framework.response import Response import zipfile import rarfile from openpyxl import Workbook, load_workbook from django.db.models import Count import time # Create your views here. class CMAViewSet(ModelViewSet): """ CMA检测能力:增删改查 """ perms_map = {'get': '*', 'post': 'cma_create', 'put': 'cma_update', 'delete': 'cma_delete'} queryset = CMA.objects.all() serializer_class = CMASerializer search_fields = ['bzbh', 'bzmc', 'sszx', 'xmmc', 'glzz', 'dlmc'] filterset_fields = ['sszx', 'type', 'glzz'] ordering_fields = ['xmxh'] ordering = 'sszx' @action(methods=['get'], detail=False,url_name='cma_group_by', perms_map = {'*':'*'}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get('group_by', None): group_by = request.query_params.get('group_by') group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i['count']: ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]}) return Response(ret) @action(methods=['post'], detail=False, url_path='import', url_name='cma_import', perms_map = {'post':'cma_import'}) def cma_import(self, request, pk=None): """ 导入能力 """ filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith('.rar'): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace('.rar',time.strftime('%Y%m%d%H%M%S')) os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() CMA.objects.filter(type='center').delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xls'): import_cma(f, os.path.join(root,f)) return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) elif fullpath.endswith('.zip'): fulldir = fullpath.replace('.zip',time.strftime('%Y%m%d%H%M%S')) os.mkdir(fulldir) os.chdir(fulldir) CMA.objects.filter(type='center').delete() with zipfile.ZipFile(fullpath,'r') as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_cma(f.encode('cp437').decode('gbk'), os.path.join(root,f)) return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) return Response(status = status.HTTP_200_OK) @action(methods=['post'], detail=False, url_path='import2', url_name='cma_import2', perms_map = {'post':'cma_import2'}) def cma_import2(self, request, pk=None): """ 导入能力2 """ filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith('.rar'): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace('.rar',time.strftime('%Y%m%d%H%M%S')) os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() # CMA.objects.filter(type='sub').delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_cma2(f, os.path.join(root,f)) return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) elif fullpath.endswith('.zip'): fulldir = fullpath.replace('.zip',time.strftime('%Y%m%d%H%M%S')) os.mkdir(fulldir) os.chdir(fulldir) # CMA.objects.filter(type='sub').delete() with zipfile.ZipFile(fullpath,'r') as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_cma2(f.encode('cp437').decode('gbk'), os.path.join(root,f)) return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) return Response(status = status.HTTP_200_OK) class QualificationViewSet(ModelViewSet): """ 资质能力:增删改查 """ perms_map = {'get': '*', 'post': 'qualificaiton_create', 'put': 'qualification_update', 'delete': 'qualification_delete'} queryset = Qualification.objects.all() serializer_class = QualificationSerializer search_fields = ['cma', 'cnas', 'sszx', 'other', 'service'] # ordering_fields = ['sszx'] ordering = 'sszx' filterset_fields = ['sszx'] @action(methods=['get'], detail=False,url_name='qualification_group_by', perms_map = {'*':'*'}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get('group_by', None): group_by = request.query_params.get('group_by') group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i['count']: ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]}) return Response(ret) class CNASViewSet(ModelViewSet): """ CNAS检测能力:增删改查 """ perms_map = {'get': '*', 'post': 'cnas_create', 'put': 'cnas_update', 'delete': 'cnas_delete'} queryset = CNAS.objects.all() serializer_class = CNASSerializer search_fields = ['bzbh', 'bzmc', 'sszx', 'xmmc', 'bztk'] ordering_fields = ['bzmc'] filterset_fields = ['sszx'] ordering = 'bzmc' @action(methods=['post'], detail=False, url_path='import', url_name='cnas_import', perms_map = {'post':'cnas_import'}) def cnas_import(self, request, pk=None): """ 导入能力 """ filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith('.rar'): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace('.rar','') os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() CNAS.objects.all().delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xls'): return Response('不支持旧xls格式', status = status.HTTP_400_BAD_REQUEST) import_cnas(f, os.path.join(root,f)) elif fullpath.endswith('.zip'): fulldir = fullpath.replace('.zip','') os.mkdir(fulldir) os.chdir(fulldir) CNAS.objects.all().delete() with zipfile.ZipFile(fullpath,'r') as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xls'): return Response('不支持旧xls格式', status = status.HTTP_400_BAD_REQUEST) import_cnas(f.encode('cp437').decode('gbk'), os.path.join(root,f)) return Response(status = status.HTTP_200_OK) def import_cma(filename, path): wb = load_workbook(path) sheet = wb.worksheets[0] datalist = [] sszx = filename.replace('.xlsx','').replace('副本14检验检测能力申请表-','') i = 4 while sheet['b'+str(i)].value: data = {} data['dlxh'] = sheet['a'+str(i)].value data['dlmc'] = sheet['b'+str(i)].value data['lbxh'] = sheet['c'+str(i)].value data['lbmc'] = sheet['d'+str(i)].value data['xmxh'] = sheet['e'+str(i)].value data['xmmc'] = sheet['f'+str(i)].value data['bzmc'] = sheet['g'+str(i)].value data['bzbh'] = sheet['h'+str(i)].value data['xzfw'] = sheet['i'+str(i)].value data['bz'] = sheet['j'+str(i)].value data['sszx'] = sszx data['type'] = 'center' # print(data) datalist.append(CMA(**data)) i = i + 1 CMA.objects.bulk_create(datalist) def import_cnas(filename, path): wb = load_workbook(path) sheet = wb.get_sheet_by_name('检测能力范围') datalist = [] sszx = filename.replace('.xlsx','').replace('检测能力范围(含能源之星)-','') i = 3 while sheet['l'+str(i)].value: data = {} if sheet['b'+str(i)].value: data['lbmc'] = sheet['b'+str(i)].value else: m = i - 1 while True: if sheet['b'+str(m)].value: data['lbmc'] = sheet['b'+str(m)].value break m = m - 1 if sheet['g'+str(i)].value: data['xmmc'] = sheet['g'+str(i)].value else: m = i - 1 while True: if sheet['g'+str(m)].value: data['xmmc'] = sheet['g'+str(m)].value break m = m - 1 data['bzmc'] = sheet['l'+str(i)].value data['bzbh'] = sheet['n'+str(i)].value data['bztk'] = sheet['p'+str(i)].value data['sszx'] = sszx # print(data) datalist.append(CNAS(**data)) i = i + 1 CNAS.objects.bulk_create(datalist) def import_cma2(filename, path): wb = load_workbook(path,data_only=True) sheet = wb.worksheets[0] datalist = [] sszx = filename.split('-')[0] if CMA.objects.filter(sszx=sszx, type='sub').exists(): CMA.objects.filter(sszx=sszx, type='sub').delete() i = 3 max_row = sheet.max_row print(max_row) while i