from django.shortcuts import render from rest_framework.mixins import ListModelMixin from rest_framework.permissions import IsAuthenticated from rest_framework.viewsets import GenericViewSet, ModelViewSet from .models import * from .serializers import * from rest_framework.decorators import action from django.conf import settings from rest_framework import status from rest_framework.response import Response import zipfile import rarfile from apps.system.models import Organization from openpyxl import Workbook, load_workbook from django.db.models import Count from utils.pagination import PageOrNot from rest_framework.views import APIView from apps.supervision.models import Content, Record from apps.supervision.serializers import ContentSerializer, RecordCreateSerializer, RecordSerializer from apps.system.mixins import CreateUpdateCustomMixin from utils.queryset import get_child_queryset2 from apps.system.permission_data import RbacFilterSet from django.utils import timezone from apps.supervision.permission import RecordPermission from django.utils.decorators import method_decorator from django.views.decorators.cache import cache_page # Create your views here. import json class RecordMixin(): def perform_authentication(self, request): """ Perform authentication on the incoming request. Note that if you override this and simply 'pass', then authentication will instead be performed lazily, the first time either `request.user` or `request.auth` is accessed. """ if request.user and request.method == 'GET' and request.user.username != 'admin': QueryRecord.objects.create(user=request.user,path=request.path,ip=request.META.get('HTTP_X_FORWARDED_FOR'),method=\ request.method,search=request.query_params.get('search',None),query=request.query_params) class AbilityContentViewSet(CreateUpdateCustomMixin, ModelViewSet): """ 资质能力报送清单:增删改查 """ perms_map = {'get': '*', 'post': 'content', 'put': 'content', 'delete': 'content'} queryset = Content.objects.filter(cate=2) serializer_class = ContentSerializer pagination_class = None search_fields = ['name', 'desc'] filterset_fields = ['type','cate'] ordering = ['sortnum', 'create_time'] def perform_create(self, serializer): if hasattr(self.queryset.model, 'belong_dept'): serializer.save(create_by = self.request.user, belong_dept=self.request.user.dept, cate=2) else: serializer.save(create_by = self.request.user, cate=2) def perform_update(self, serializer): serializer.save(update_by = self.request.user) class AbilityRecordViewSet(RbacFilterSet, PageOrNot, CreateUpdateCustomMixin, ModelViewSet): perms_map = {'get': '*', 'post': '*', 'put': '*', 'delete': '*'} queryset = Record.objects.filter(content__cate=2) serializer_class = RecordSerializer search_fields = ['content__name'] ordering_fields = ['content__sortnum', 'belong_dept__sort'] ordering = ['-task', 'content__sortnum', '-create_time'] filterset_fields = ['content','content__cate', 'belong_dept', 'state'] def filter_queryset(self, queryset): if not self.request.query_params.get('pageoff', None): queryset = queryset.exclude(state='待发布') return super().filter_queryset(queryset) def get_serializer_class(self): if self.action == 'create': return RecordCreateSerializer return super().get_serializer_class() @action(methods=['put'], detail=True, perms_map = {'put':'record_up'}) def up(self, request, *args, **kwargs): """ 报送 """ obj = self.get_object() if obj.state in ['待整改','待报送', '已报送']: obj.files.clear() obj.files.add(*request.data['files']) obj.noteb = request.data['noteb'] obj.state = '已报送' obj.up_user = request.user obj.up_date = timezone.now() obj.save() return Response(status=status.HTTP_200_OK) return Response('记录状态错误', status=status.HTTP_400_BAD_REQUEST) def create(self, request, *args, **kwargs): """ 主动报送 """ data = request.data if data.get('files', None): serializer = RecordCreateSerializer(data=data) serializer.is_valid(raise_exception=True) content = Content.objects.get(pk=data['content']) if content.cate==2: sdata = {'belong_dept':request.user.dept, 'is_self':True, 'content_name':content.name, 'content_desc':content.desc, 'state':'已报送', 'up_user':request.user, 'up_date':timezone.now()} serializer.save(**sdata) return Response(status=status.HTTP_201_CREATED) return Response('该材料不是资质能力!', status=status.HTTP_400_BAD_REQUEST) return Response('未上传文件', status=status.HTTP_400_BAD_REQUEST) @action(methods=['put'], detail=True, perms_map = {'put':'record_reject'}, permission_classes=[RecordPermission]) def reject(self, request, *args, **kwargs): """ 驳回 """ obj = self.get_object() if obj.state == '已报送': if request.data.get('opinion', None): obj.opinion = request.data['opinion'] obj.state = '待整改' obj.save() return Response(status=status.HTTP_200_OK) else: return Response('请填写修改意见', status=status.HTTP_400_BAD_REQUEST) return Response('记录状态错误', status=status.HTTP_400_BAD_REQUEST) @action(methods=['put'], detail=True, perms_map = {'put':'record_confirm'}) def confirm(self, request, *args, **kwargs): """ 确认 """ obj = self.get_object() if obj.state in ['已报送', '待报送']: obj.state = '已确认' obj.save() return Response(status=status.HTTP_200_OK) return Response('记录状态错误', status=status.HTTP_400_BAD_REQUEST) class QueryRecordListViewSet(ListModelMixin, GenericViewSet): perms_map = {'get': 'queryrecord_view'} queryset = QueryRecord.objects.all() serializer_class = QueryRecordSerializer search_fields = ['user__name', 'user__dept__name'] ordering = ['-create_time'] class CMAViewSet(RecordMixin, ModelViewSet): """ CMA检测能力:增删改查 """ perms_map = {'get': 'cma_view', 'post': 'cma_create', 'put': 'cma_update', 'delete': 'cma_delete'} queryset = CMA.objects.all() serializer_class = CMASerializer search_fields = ['bzbh', 'bzmc', 'sszx', 'xmmc', 'glzz', 'dlmc'] filterset_fields = ['sszx', 'type', 'glzz'] ordering_fields = ['sszx', 'dlxh', 'lbxh', 'xmxh'] ordering = ['create_time', 'xmxh'] @action(methods=['get'], detail=False,url_name='cma_group_by', perms_map = {'*':'*'}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get('group_by', None): group_by = request.query_params.get('group_by') group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i['count']: ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]}) return Response(ret) @action(methods=['post'], detail=False, url_path='deletes', url_name='cma_deletes', perms_map = {'post':'cma_deletes'}) def deletes(self, request): array = request.data.get('ids', []) sszx = request.data.get('sszx', None) if array: CMA.objects.filter(pk__in=array).delete() elif sszx: CMA.objects.filter(sszx=sszx).delete() return Response(status = status.HTTP_200_OK) @action(methods=['post'], detail=False, url_path='import', url_name='cma_import', perms_map = {'post':'cma_import'}) def cma_import(self, request, pk=None): """ 导入能力 """ filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith('.rar'): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace('.rar','') os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() CMA.objects.filter(type='center').delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_cma(f, os.path.join(root,f)) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) elif fullpath.endswith('.zip'): fulldir = fullpath.replace('.zip','') if os.path.exists(fulldir): import shutil shutil.rmtree(fulldir) # 先删除该文件夹 os.mkdir(fulldir) os.chdir(fulldir) CMA.objects.filter(type='center').delete() with zipfile.ZipFile(fullpath,'r') as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_cma(f.encode('cp437').decode('gbk'), os.path.join(root,f)) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) return Response(status = status.HTTP_200_OK) @action(methods=['post'], detail=False, url_path='import2', url_name='cma_import2', perms_map = {'post':'cma_import2'}) def cma_import2(self, request, pk=None): """ 导入能力2 """ filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith('.rar'): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace('.rar','') os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() # CMA.objects.filter(type='sub').delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_cma2(f, os.path.join(root,f)) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) elif fullpath.endswith('.zip'): fulldir = fullpath.replace('.zip','') if os.path.exists(fulldir): import shutil shutil.rmtree(fulldir) # 先删除该文件夹 os.mkdir(fulldir) os.chdir(fulldir) # CMA.objects.filter(type='sub').delete() with zipfile.ZipFile(fullpath,'r') as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_cma2(f.encode('cp437').decode('gbk'), os.path.join(root,f)) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) return Response(status = status.HTTP_200_OK) from django.db.models.query import QuerySet class QualificationViewSet(RecordMixin, ModelViewSet): """ 资质能力:增删改查 """ perms_map = {'get': 'qualification_view', 'post': 'qualificaiton_create', 'put': 'qualification_update', 'delete': 'qualification_delete'} queryset = Qualification.objects.all() serializer_class = QualificationSerializer search_fields = ['cma', 'cnas', 'sszx', 'other', 'service'] ordering_fields = ['sszx'] ordering = ['sszx'] filterset_fields = ['sszx'] def get_queryset(self): assert self.queryset is not None, ( "'%s' should either include a `queryset` attribute, " "or override the `get_queryset()` method." % self.__class__.__name__ ) queryset = self.queryset if isinstance(queryset, QuerySet): # Ensure queryset is re-evaluated on each request. queryset = queryset.all() if hasattr(self.get_serializer_class(), 'setup_eager_loading'): queryset = self.get_serializer_class().setup_eager_loading(queryset) return queryset @action(methods=['get'], detail=False,url_name='qualification_group_by', perms_map = {'*':'*'}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get('group_by', None): group_by = request.query_params.get('group_by') group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i['count']: ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]}) return Response(ret) @action(methods=['post'], detail=False, url_path='import', url_name='qualification_import', perms_map = {'post':'qualification_import'}) def qualification_import(self, request, pk=None): filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath if fullpath.endswith('.xlsx'): import_qualification(fullpath) return Response(status = status.HTTP_200_OK) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) class QualificationotherViewSet(RecordMixin, PageOrNot,ModelViewSet): """ 资质能力:增删改查 """ perms_map = {'get': 'qualification_view', 'post': 'qualificationother_create', 'put': 'qualificationother_update', 'delete': 'qualificationother_delete'} queryset = Qualificationother.objects.all() serializer_class = QualificationotherSerializer search_fields = ['qualification__cma', 'name','description','qualification__cnas', 'qualification__ssbm__id', 'qualification__service'] filterset_fields = ['qualification__ssbm__name'] ordering_fields = ['qualification__ssbm__name'] ordering = ['create_time'] @method_decorator(cache_page(60*60*2)) def list(self, request, *args, **kwargs): return super().list(request, *args, **kwargs) @action(methods=['post'], detail=False, url_path='deletes', url_name='qualificationother_deletes', perms_map = {'post':'qualificationother_deletes'}) def deletes(self, request): array = request.data['ids'] Qualificationother.objects.filter(pk__in=array).delete() return Response(status = status.HTTP_200_OK) @action(methods=['get'], detail=False,url_name='qualification_group_by', perms_map = {'*':'*'}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get('group_by', None): group_by = request.query_params.get('group_by') group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by('qualification__ssbm__sort', group_by)) for i in group_by_data: if i[group_by] and i['count']: ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]}) return Response(ret) class InspectionViewSet(RecordMixin, ModelViewSet): """ 检验能力:增删改查 """ perms_map = {'get': 'inspection_view', 'post': 'inspection_create', 'put': 'inspection_update', 'delete': 'inspection_delete'} queryset = Inspection.objects.all() serializer_class = InspectionSerializer search_fields = ['dlmc', 'jydx', 'dxxh','jyxmmc','jybz','sszx','sm'] ordering_fields = ['dlmc'] ordering = ['create_time', 'jyxmxh'] filterset_fields = ['sszx'] @action(methods=['post'], detail=False, url_path='deletes', url_name='inspection_deletes', perms_map = {'post':'inspection_deletes'}) def deletes(self, request): array = request.data['ids'] Inspection.objects.filter(pk__in=array).delete() return Response(status = status.HTTP_200_OK) @action(methods=['get'], detail=False,url_name='inspection_group_by', perms_map = {'*':'*'}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get('group_by', None): group_by = request.query_params.get('group_by') group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i['count']: ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]}) return Response(ret) @action(methods=['post'], detail=False, url_path='inspection2', url_name='inspection_inspection2', perms_map = {'post':'inspection_inspection2'}) def inspection_import2(self, request, pk=None): """ 导入能力2 """ filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith('.rar'): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace('.rar','') os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() # CMA.objects.filter(type='sub').delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_inspection(f, os.path.join(root,f)) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) elif fullpath.endswith('.zip'): fulldir = fullpath.replace('.zip','') if os.path.exists(fulldir): import shutil shutil.rmtree(fulldir) # 先删除该文件夹 os.mkdir(fulldir) os.chdir(fulldir) # CMA.objects.filter(type='sub').delete() with zipfile.ZipFile(fullpath,'r') as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xlsx'): import_inspection(f.encode('cp437').decode('gbk'), os.path.join(root,f)) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) return Response(status = status.HTTP_200_OK) class CNASViewSet(RecordMixin, ModelViewSet): """ CNAS检测能力:增删改查 """ perms_map = {'get': 'cma_view', 'post': 'cnas_create', 'put': 'cnas_update', 'delete': 'cnas_delete'} queryset = CNAS.objects.all() serializer_class = CNASSerializer search_fields = ['bzbh', 'bzmc', 'sszx', 'xmmc', 'bztk'] ordering_fields = ['bzmc'] filterset_fields = ['sszx'] ordering = ['create_time'] @action(methods=['get'], detail=False,url_name='cnas_group_by', perms_map = {'*':'*'}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get('group_by', None): group_by = request.query_params.get('group_by') group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i['count']: ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]}) return Response(ret) @action(methods=['post'], detail=False, url_path='import', url_name='cnas_import', perms_map = {'post':'cnas_import'}) def cnas_import(self, request, pk=None): """ 导入能力 """ filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith('.rar'): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace('.rar','') os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() CNAS.objects.all().delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith('.xls'): return Response('不支持旧xls格式', status = status.HTTP_400_BAD_REQUEST) import_cnas(f, os.path.join(root,f)) elif fullpath.endswith('.zip'): fulldir = fullpath.replace('.zip','') if os.path.exists(fulldir): import shutil shutil.rmtree(fulldir) # 先删除该文件夹 os.mkdir(fulldir) os.chdir(fulldir) CNAS.objects.all().delete() with zipfile.ZipFile(fullpath,'r') as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: import_cnas(f.encode('cp437').decode('gbk'), os.path.join(root,f)) return Response(status = status.HTTP_200_OK) class CorrectViewSet(RecordMixin, ModelViewSet): """ 校验能力:增删改查 """ perms_map = {'get': 'correct_view', 'post': 'correct_create', 'put': 'correct_update', 'delete': 'correct_delete'} queryset = Correct.objects.all() serializer_class = CorrectSerializer search_fields = ['dlmc', 'lbmc', 'bclmc','jzgc'] ordering_fields = ['dlmc'] ordering = ['create_time', 'dlmc'] filterset_fields = ['ssbm', 'ssgs'] @action(methods=['post'], detail=False, perms_map = {'post':'correct_delete'}) def deletes(self, request): array = request.data['ids'] Correct.objects.filter(pk__in=array).delete() return Response(status = status.HTTP_200_OK) @action(methods=['get'], detail=False, perms_map = {'*':'*'}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get('group_by', None): group_by = request.query_params.get('group_by') group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i['count']: ret.append({'text':i[group_by]+'('+ str(i['count']) +')','value':i[group_by]}) return Response(ret) @action(methods=['post'], detail=False, url_path="import") def correct_import(self, request, pk=None): """ 导入校验能力 """ filepath = request.data['path'] fullpath = settings.BASE_DIR + filepath if fullpath.endswith('.xlsx'): ret = import_correct(fullpath) if ret[0]: return Response() return Response(ret[1], status=status.HTTP_400_BAD_REQUEST) else: return Response('不支持非xlsx格式', status = status.HTTP_400_BAD_REQUEST) def import_qualification(path): wb = load_workbook(path) sheet = wb.worksheets[0] i = 3 max_row = sheet.max_row obj = {} Qualificationother.objects.all().delete() Qualification.objects.all().delete() while i