import os

from django.conf import settings
from django.core.cache import cache
from django.db import connection, transaction
from django.db.models import Count, Q
from django.db.models.query import QuerySet
from django.http import request
from django.shortcuts import render
from django.utils import timezone
from openpyxl import load_workbook
from rest_framework import status
from rest_framework.decorators import action, permission_classes
from rest_framework.exceptions import PermissionDenied, ParseError
from rest_framework.mixins import CreateModelMixin, DestroyModelMixin, ListModelMixin, RetrieveModelMixin
from rest_framework.permissions import IsAdminUser
from rest_framework.response import Response
from rest_framework.serializers import Serializer
from rest_framework.viewsets import GenericViewSet, ModelViewSet

from apps.system.mixins import CreateUpdateCustomMixin, CreateUpdateModelAMixin, OptimizationMixin
from apps.system.models import Organization
from apps.system.permission import get_permission_list, has_permission
from apps.system.permission_data import RbacFilterSet
from utils.pagination import PageOrNot

from .models import *
from .serializers import *

# Create your views here.
class RegulatoryViewSet(OptimizationMixin, PageOrNot, CreateUpdateModelAMixin, ModelViewSet):
    """Model view set for ``Regulatory`` records.

    Reads are open to every permission (``'*'``); create, update and delete
    each require their own permission code from ``perms_map``.
    """

    perms_map = {'get': '*', 'post': 'regulatory_create',
                 'put': 'regulatory_update', 'delete': 'regulatory_delete'}
    queryset = Regulatory.objects.all()
    serializer_class = RegulatorySerializer
    ordering = ['-id']
    search_fields = ['provinces']
    ordering_fields = ['provinces']
    filterset_fields = ['provinces']


class ProfessionalViewSet(OptimizationMixin, PageOrNot, CreateUpdateModelAMixin, ModelViewSet):
    """Model view set for ``Professional`` records, newest first."""

    perms_map = {'get': '*', 'post': 'professional_create',
                 'put': 'professional_update', 'delete': 'professional_delete'}
    queryset = Professional.objects.all()
    serializer_class = ProfessionalSerializer
    ordering = ['-id']


class PolicyViewSet(OptimizationMixin, PageOrNot, CreateUpdateModelAMixin, ModelViewSet):
    """Model view set for ``Policy`` records.

    GET requests must carry a ``cate`` query parameter, and the
    '总部文件' category is only visible to selected departments.
    """

    perms_map = {'get': '*', 'post': 'policy_create',
                 'put': 'policy_update', 'delete': 'policy_delete'}
    queryset = Policy.objects.all()
    serializer_class = PolicySerializer
    filterset_fields = ['cate', 'name', 'year', 'month']
    search_fields = ['cate', 'name', 'description']
    ordering = ['-id']

    def can_view_top(self, user):
        """Return whether ``user`` may see the '总部文件' category.

        Access is granted when the user's department type is named '部门',
        or when the department itself is one of the two listed branches.
        A user without a department is denied instead of raising
        ``AttributeError``.
        """
        dept = getattr(user, 'dept', None)
        if dept is None:
            return False
        if dept.type and dept.type.name == '部门':
            return True
        return dept.name in ['福建分公司', '河南分公司']

    def get_queryset(self):
        """Return the policies visible for the requested ``cate``.

        Raises:
            ParseError: on a GET request that carries no ``cate`` parameter.
        """
        cate = self.request.query_params.get('cate', '')
        if not cate:
            # Only reads are forced to name a category; writes fall through.
            if self.request.method == 'GET':
                raise ParseError('请提供cate查询条件')
            return super().get_queryset()
        if cate == '总部文件':
            if self.can_view_top(self.request.user):
                return Policy.objects.filter(cate=cate)
            return Policy.objects.none()
        return super().get_queryset()


class ValidationViewSet(OptimizationMixin, PageOrNot, CreateUpdateModelAMixin, ModelViewSet):
    """Model view set for ``Validation`` records."""

    # NOTE(review): 'put' maps to 'policy_update' while every other entry uses
    # the 'validation_' prefix -- looks like a copy-paste slip; confirm against
    # the provisioned permission codes before renaming.
    perms_map = {'get': '*', 'post': 'validation_create',
                 'put': 'policy_update', 'delete': 'validation_delete'}
    queryset = Validation.objects.all()
    serializer_class = ValidationSerializer
    ordering = ['-id']
    search_fields = ['provinces']
    ordering_fields = ['provinces']
    filterset_fields = ['provinces']


class DeviceViewSet(ModelViewSet):
    """Model view set for ``Device`` records, including an Excel import."""

    perms_map = {'get': '*', 'post': 'device_import'}
    queryset = Device.objects.all()
    serializer_class = DeviceSerializer
    ordering = ['-id']
    search_fields = ['company_name', 'device_name', 'spec', 'manufactor',
                     'dec_parameter', 'procurement_time']

    # First worksheet row that holds device data.
    _FIRST_DATA_ROW = 3
    # Worksheet column letter -> Device field, in sheet order.
    _COLUMN_FIELDS = (
        ("a", "company_number"),
        ("b", "company_name"),
        ("c", "device_name"),
        ("d", "device_number"),
        ("e", "spec"),
        ("f", "manufactor"),
        ("g", "dec_parameter"),
        ("h", "range"),
        ("i", "precision"),
        ("j", "other_parameter"),
        ("k", "department"),
        ("l", "procurement_time"),
        ("m", "original_price"),
        ("n", "current_price"),
        ("o", "is_infrastructure"),
        ("p", "infras_percentage"),
        ("q", "is_instructions"),
        ("r", "custodian"),
        ("s", "depositor"),
        ("t", "contacts"),
        ("u", "tel"),
        ("v", "remark"),
    )
    # Fields whose cell holds "是" for True; anything else becomes False.
    _YES_NO_FIELDS = frozenset({"is_infrastructure", "is_instructions"})

    @action(methods=["get"], detail=False, serializer_class=Serializer)
    def no_page(self, request, *args, **kwargs):
        """Return every device in a single, unpaginated response."""
        devices = Device.objects.all()
        return Response(DeviceSerializer(devices, many=True).data)

    def make_data(self, data, sheet, i):
        """Copy worksheet row ``i`` into ``data`` keyed by Device field name.

        Args:
            data: dict to fill; it is mutated in place and also returned.
            sheet: worksheet supporting ``sheet["<col><row>"].value`` lookups.
            i: 1-based row number to read.

        Returns:
            The same ``data`` dict with one entry per mapped column.
        """
        for column, field in self._COLUMN_FIELDS:
            value = sheet[column + str(i)].value
            if field in self._YES_NO_FIELDS:
                value = value == "是"
            data[field] = value
        return data

    def _resolve_upload_path(self, path):
        """Map a request-supplied relative path to a file under BASE_DIR.

        Raises:
            ParseError: if the path escapes BASE_DIR or names no existing file.
        """
        # str() keeps this working whether BASE_DIR is a str or a pathlib.Path.
        base_dir = os.path.abspath(str(settings.BASE_DIR))
        # Strip leading separators so join() does not discard base_dir.
        candidate = os.path.abspath(os.path.join(base_dir, path.lstrip("/\\")))
        # abspath collapses ".." segments; reject anything outside base_dir.
        if not candidate.startswith(os.path.join(base_dir, "")):
            raise ParseError("文件路径不合法")
        if not os.path.isfile(candidate):
            raise ParseError("文件不存在")
        return candidate

    @action(methods=["post"], detail=False, perms_map={"post": "device_import"},
            serializer_class=Serializer)
    def imp(self, request, *args, **kwargs):
        """Import devices from an Excel sheet.

        Reads ``excel_path`` from the request body, loads the first worksheet
        of that ``.xlsx`` file and replaces every existing ``Device`` row with
        the rows found from row 3 down to the first empty cell in column B.

        Raises:
            ParseError: if ``excel_path`` is not a string ending in ``.xlsx``,
                points outside BASE_DIR, or names no existing file.
        """
        path = request.data.get("excel_path", "")
        # Validate before touching the filesystem.
        if not isinstance(path, str) or not path.endswith(".xlsx"):
            raise ParseError("请提供xlsx格式文件")
        full_path = self._resolve_upload_path(path)

        workbook = load_workbook(full_path, data_only=True)
        sheet = workbook.worksheets[0]

        instances = []
        row = self._FIRST_DATA_ROW
        while sheet["b" + str(row)].value:
            instances.append(Device(**self.make_data({}, sheet, row)))
            row += 1

        # Replace the table atomically so a failed insert cannot leave it empty.
        with transaction.atomic():
            Device.objects.all().delete()
            Device.objects.bulk_create(instances)
        return Response()

    def truncate_table(self, model):
        """Issue ``TRUNCATE TABLE`` for the database table behind ``model``."""
        # NOTE(review): TRUNCATE is not available on every backend (e.g.
        # SQLite) -- confirm the deployment database supports it.
        table_name = connection.ops.quote_name(model._meta.db_table)
        with connection.cursor() as cursor:
            cursor.execute("TRUNCATE TABLE %s;" % table_name)