from django.shortcuts import render from rest_framework.mixins import ListModelMixin from rest_framework.permissions import IsAuthenticated from rest_framework.viewsets import GenericViewSet, ModelViewSet from .models import * from .serializers import * from rest_framework.decorators import action from django.conf import settings from rest_framework import status from rest_framework.response import Response import zipfile import rarfile from apps.system.models import Organization from openpyxl import Workbook, load_workbook from django.db.models import Count from utils.pagination import PageOrNot from rest_framework.views import APIView # Create your views here. import json class RecordMixin(): def perform_authentication(self, request): """ Perform authentication on the incoming request. Note that if you override this and simply 'pass', then authentication will instead be performed lazily, the first time either `request.user` or `request.auth` is accessed. """ user = request.user if request.method == 'GET' and user.username != 'admin': QueryRecord.objects.create(user=user,path=request.path,ip=request.META.get('HTTP_X_FORWARDED_FOR'),method=\ request.method,search=request.query_params.get('search',None),query=request.query_params) class QueryRecordListViewSet(ListModelMixin, GenericViewSet): perms_map = {'get': 'queryrecord_view'} queryset = QueryRecord.objects.all() serializer_class = QueryRecordSerializer search_fields = ['user__name', 'user__dept__name'] ordering = ['-create_time'] class CMAViewSet(RecordMixin, ModelViewSet): """ CMA检测能力:增删改查 """ perms_map = {'get': 'cma_view', 'post': 'cma_create', 'put': 'cma_update', 'delete': 'cma_delete'} queryset = CMA.objects.all() serializer_class = CMASerializer search_fields = ['bzbh', 'bzmc', 'sszx', 'xmmc', 'glzz', 'dlmc'] filterset_fields = ['sszx', 'type', 'glzz'] ordering_fields = ['sszx', 'dlxh', 'lbxh', 'xmxh'] ordering = ['create_time', 'xmxh'] @action(methods=['get'], detail=False,url_name='cma_group_by', perms_map = {'*':'*'}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] ret2 = {} if request.query_params.get('group_by', None): group_by = request.query_params.get('group_by') group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i['count']: ret2[i[group_by]] = "A" ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]}) print(ret2) return Response(ret) @action(methods=['post'], detail=False, url_path='deletes', url_name='cma_deletes', perms_map = {'post':'cma_deletes'}) def deletes(self, request): array = request.data['ids'] CMA.objects.filter(pk__in=array).delete() return Response(status = status.HTTP_200_OK) @action(methods=['post'], detail=False, url_path='import', url_name='cma_import', perms_map = {'post':'cma_import'}) def cma_import(self, request, pk=None): """ 导入能力 """ filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith('.rar'): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace('.rar','') os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() CMA.objects.filter(type='center').delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_cma(f, os.path.join(root,f)) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) elif fullpath.endswith('.zip'): fulldir = fullpath.replace('.zip','') os.mkdir(fulldir) os.chdir(fulldir) CMA.objects.filter(type='center').delete() with zipfile.ZipFile(fullpath,'r') as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_cma(f.encode('cp437').decode('gbk'), os.path.join(root,f)) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) return Response(status = status.HTTP_200_OK) @action(methods=['post'], detail=False, url_path='import2', url_name='cma_import2', perms_map = {'post':'cma_import2'}) def cma_import2(self, request, pk=None): """ 导入能力2 """ filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith('.rar'): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace('.rar','') os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() # CMA.objects.filter(type='sub').delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_cma2(f, os.path.join(root,f)) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) elif fullpath.endswith('.zip'): fulldir = fullpath.replace('.zip','') os.mkdir(fulldir) os.chdir(fulldir) # CMA.objects.filter(type='sub').delete() with zipfile.ZipFile(fullpath,'r') as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_cma2(f.encode('cp437').decode('gbk'), os.path.join(root,f)) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) return Response(status = status.HTTP_200_OK) from django.db.models.query import QuerySet class QualificationViewSet(RecordMixin, ModelViewSet): """ 资质能力:增删改查 """ perms_map = {'get': 'qualification_view', 'post': 'qualificaiton_create', 'put': 'qualification_update', 'delete': 'qualification_delete'} queryset = Qualification.objects.all() serializer_class = QualificationSerializer search_fields = ['cma', 'cnas', 'sszx', 'other', 'service'] ordering_fields = ['sszx'] ordering = ['sszx'] filterset_fields = ['sszx'] def get_queryset(self): assert self.queryset is not None, ( "'%s' should either include a `queryset` attribute, " "or override the `get_queryset()` method." % self.__class__.__name__ ) queryset = self.queryset if isinstance(queryset, QuerySet): # Ensure queryset is re-evaluated on each request. queryset = queryset.all() if hasattr(self.get_serializer_class(), 'setup_eager_loading'): queryset = self.get_serializer_class().setup_eager_loading(queryset) return queryset @action(methods=['get'], detail=False,url_name='qualification_group_by', perms_map = {'*':'*'}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get('group_by', None): group_by = request.query_params.get('group_by') group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i['count']: ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]}) return Response(ret) @action(methods=['post'], detail=False, url_path='import', url_name='qualification_import', perms_map = {'post':'qualification_import'}) def qualification_import(self, request, pk=None): filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath if fullpath.endswith('.xlsx'): import_qualification(fullpath) return Response(status = status.HTTP_200_OK) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) class QualificationotherViewSet(RecordMixin, PageOrNot,ModelViewSet): """ 资质能力:增删改查 """ perms_map = {'get': 'qualification_view', 'post': 'qualificationother_create', 'put': 'qualificationother_update', 'delete': 'qualificationother_delete'} queryset = Qualificationother.objects.all() serializer_class = QualificationotherSerializer search_fields = ['qualification__cma', 'name','description','qualification__cnas', 'qualification__ssbm__id', 'qualification__service'] filterset_fields = ['qualification__ssbm__name'] ordering_fields = ['qualification__ssbm__name'] ordering = ['create_time'] @action(methods=['post'], detail=False, url_path='deletes', url_name='qualificationother_deletes', perms_map = {'post':'qualificationother_deletes'}) def deletes(self, request): array = request.data['ids'] Qualificationother.objects.filter(pk__in=array).delete() return Response(status = status.HTTP_200_OK) @action(methods=['get'], detail=False,url_name='qualification_group_by', perms_map = {'*':'*'}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get('group_by', None): group_by = request.query_params.get('group_by') group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by('qualification__ssbm__sort', group_by)) for i in group_by_data: if i[group_by] and i['count']: ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]}) return Response(ret) class InspectionViewSet(RecordMixin, ModelViewSet): """ 检验能力:增删改查 """ perms_map = {'get': 'inspection_view', 'post': 'inspection_create', 'put': 'inspection_update', 'delete': 'inspection_delete'} queryset = Inspection.objects.all() serializer_class = InspectionSerializer search_fields = ['dlmc', 'jydx', 'dxxh','jyxmmc','jybz','sszx','sm'] ordering_fields = ['dlmc'] ordering = ['create_time', 'jyxmxh'] filterset_fields = ['sszx'] @action(methods=['post'], detail=False, url_path='deletes', url_name='inspection_deletes', perms_map = {'post':'inspection_deletes'}) def deletes(self, request): array = request.data['ids'] Inspection.objects.filter(pk__in=array).delete() return Response(status = status.HTTP_200_OK) @action(methods=['get'], detail=False,url_name='inspection_group_by', perms_map = {'*':'*'}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get('group_by', None): group_by = request.query_params.get('group_by') group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i['count']: ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]}) return Response(ret) @action(methods=['post'], detail=False, url_path='inspection2', url_name='inspection_inspection2', perms_map = {'post':'inspection_inspection2'}) def inspection_import2(self, request, pk=None): """ 导入能力2 """ filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith('.rar'): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace('.rar','') os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() # CMA.objects.filter(type='sub').delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_inspection(f, os.path.join(root,f)) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) elif fullpath.endswith('.zip'): fulldir = fullpath.replace('.zip','') os.mkdir(fulldir) os.chdir(fulldir) # CMA.objects.filter(type='sub').delete() with zipfile.ZipFile(fullpath,'r') as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_inspection(f.encode('cp437').decode('gbk'), os.path.join(root,f)) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) return Response(status = status.HTTP_200_OK) class CNASViewSet(RecordMixin, ModelViewSet): """ CNAS检测能力:增删改查 """ perms_map = {'get': 'cma_view', 'post': 'cnas_create', 'put': 'cnas_update', 'delete': 'cnas_delete'} queryset = CNAS.objects.all() serializer_class = CNASSerializer search_fields = ['bzbh', 'bzmc', 'sszx', 'xmmc', 'bztk'] ordering_fields = ['bzmc'] filterset_fields = ['sszx'] ordering = ['create_time'] @action(methods=['get'], detail=False,url_name='cnas_group_by', perms_map = {'*':'*'}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get('group_by', None): group_by = request.query_params.get('group_by') group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i['count']: ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]}) return Response(ret) @action(methods=['post'], detail=False, url_path='import', url_name='cnas_import', perms_map = {'post':'cnas_import'}) def cnas_import(self, request, pk=None): """ 导入能力 """ filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith('.rar'): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace('.rar','') os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() CNAS.objects.all().delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xls'): return Response('不支持旧xls格式', status = status.HTTP_400_BAD_REQUEST) import_cnas(f, os.path.join(root,f)) elif fullpath.endswith('.zip'): fulldir = fullpath.replace('.zip','') os.mkdir(fulldir) os.chdir(fulldir) CNAS.objects.all().delete() with zipfile.ZipFile(fullpath,'r') as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: import_cnas(f.encode('cp437').decode('gbk'), os.path.join(root,f)) return Response(status = status.HTTP_200_OK) def import_qualification(path): wb = load_workbook(path) sheet = wb.worksheets[0] i = 3 max_row = sheet.max_row obj = {} Qualificationother.objects.all().delete() Qualification.objects.all().delete() while i