from django.shortcuts import render from rest_framework.mixins import ListModelMixin from rest_framework.permissions import IsAuthenticated from rest_framework.viewsets import GenericViewSet, ModelViewSet from .models import * from .serializers import * from rest_framework.decorators import action from django.conf import settings from rest_framework import status from rest_framework.response import Response import zipfile import rarfile from apps.system.models import Organization from openpyxl import Workbook, load_workbook from django.db.models import Count from utils.pagination import PageOrNot from rest_framework.views import APIView from apps.supervision.models import Content, Record from apps.supervision.serializers import ContentSerializer, RecordCreateSerializer, RecordSerializer from apps.system.mixins import CreateUpdateCustomMixin from utils.queryset import get_child_queryset2 from apps.system.permission_data import RbacFilterSet from django.utils import timezone from apps.supervision.permission import RecordPermission from django.utils.decorators import method_decorator from django.views.decorators.cache import cache_page # Create your views here. import json class RecordMixin: def perform_authentication(self, request): """ Perform authentication on the incoming request. Note that if you override this and simply 'pass', then authentication will instead be performed lazily, the first time either `request.user` or `request.auth` is accessed. """ if request.user and request.method == "GET" and request.user.username != "admin": QueryRecord.objects.create( user=request.user, path=request.path, ip=request.META.get("HTTP_X_FORWARDED_FOR"), method=request.method, search=request.query_params.get("search", None), query=request.query_params ) class AbilityContentViewSet(CreateUpdateCustomMixin, ModelViewSet): """ 资质能力报送清单:增删改查 """ perms_map = {"get": "*", "post": "content", "put": "content", "delete": "content"} queryset = Content.objects.filter(cate=2) serializer_class = ContentSerializer pagination_class = None search_fields = ["name", "desc"] filterset_fields = ["type", "cate"] ordering = ["sortnum", "create_time"] def perform_create(self, serializer): if hasattr(self.queryset.model, "belong_dept"): serializer.save(create_by=self.request.user, belong_dept=self.request.user.dept, cate=2) else: serializer.save(create_by=self.request.user, cate=2) def perform_update(self, serializer): serializer.save(update_by=self.request.user) class AbilityRecordViewSet(RbacFilterSet, PageOrNot, CreateUpdateCustomMixin, ModelViewSet): perms_map = {"get": "*", "post": "*", "put": "*", "delete": "*"} queryset = Record.objects.filter(content__cate=2) serializer_class = RecordSerializer search_fields = ["content__name"] ordering_fields = ["content__sortnum", "belong_dept__sort"] ordering = ["-task", "content__sortnum", "-create_time"] filterset_fields = ["content", "content__cate", "belong_dept", "state"] def filter_queryset(self, queryset): if not self.request.query_params.get("pageoff", None): queryset = queryset.exclude(state="待发布") return super().filter_queryset(queryset) def get_serializer_class(self): if self.action == "create": return RecordCreateSerializer return super().get_serializer_class() @action(methods=["put"], detail=True, perms_map={"put": "record_up"}) def up(self, request, *args, **kwargs): """ 报送 """ obj = self.get_object() if obj.state in ["待整改", "待报送", "已报送"]: obj.files.clear() obj.files.add(*request.data["files"]) obj.noteb = request.data["noteb"] obj.state = "已报送" obj.up_user = request.user obj.up_date = timezone.now() obj.save() return Response(status=status.HTTP_200_OK) return Response("记录状态错误", status=status.HTTP_400_BAD_REQUEST) def create(self, request, *args, **kwargs): """ 主动报送 """ data = request.data if data.get("files", None): serializer = RecordCreateSerializer(data=data) serializer.is_valid(raise_exception=True) content = Content.objects.get(pk=data["content"]) if content.cate == 2: sdata = { "belong_dept": request.user.dept, "is_self": True, "content_name": content.name, "content_desc": content.desc, "state": "已报送", "up_user": request.user, "up_date": timezone.now(), } serializer.save(**sdata) return Response(status=status.HTTP_201_CREATED) return Response("该材料不是资质能力!", status=status.HTTP_400_BAD_REQUEST) return Response("未上传文件", status=status.HTTP_400_BAD_REQUEST) @action(methods=["put"], detail=True, perms_map={"put": "record_reject"}, permission_classes=[RecordPermission]) def reject(self, request, *args, **kwargs): """ 驳回 """ obj = self.get_object() if obj.state == "已报送": if request.data.get("opinion", None): obj.opinion = request.data["opinion"] obj.state = "待整改" obj.save() return Response(status=status.HTTP_200_OK) else: return Response("请填写修改意见", status=status.HTTP_400_BAD_REQUEST) return Response("记录状态错误", status=status.HTTP_400_BAD_REQUEST) @action(methods=["put"], detail=True, perms_map={"put": "record_confirm"}) def confirm(self, request, *args, **kwargs): """ 确认 """ obj = self.get_object() if obj.state in ["已报送", "待报送"]: obj.state = "已确认" obj.save() return Response(status=status.HTTP_200_OK) return Response("记录状态错误", status=status.HTTP_400_BAD_REQUEST) class QueryRecordListViewSet(ListModelMixin, GenericViewSet): perms_map = {"get": "queryrecord_view"} queryset = QueryRecord.objects.all() serializer_class = QueryRecordSerializer search_fields = ["user__name", "user__dept__name"] ordering = ["-create_time"] class CMAViewSet(RecordMixin, ModelViewSet): """ CMA检测能力:增删改查 """ perms_map = {"get": "cma_view", "post": "cma_create", "put": "cma_update", "delete": "cma_delete"} queryset = CMA.objects.all() serializer_class = CMASerializer search_fields = ["bzbh", "bzmc", "sszx", "xmmc", "glzz", "dlmc"] filterset_fields = ["sszx", "type", "glzz"] ordering_fields = ["sszx", "dlxh", "lbxh", "xmxh"] ordering = ["create_time", "xmxh"] @action(methods=["get"], detail=False, url_name="cma_group_by", perms_map={"*": "*"}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get("group_by", None): group_by = request.query_params.get("group_by") group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i["count"]: ret.append({"text": i[group_by] + "(" + str(i["count"]) + ")", "value": i[group_by]}) return Response(ret) @action(methods=["post"], detail=False, url_path="deletes", url_name="cma_deletes", perms_map={"post": "cma_deletes"}) def deletes(self, request): array = request.data.get("ids", []) sszx = request.data.get("sszx", None) if array: CMA.objects.filter(pk__in=array).delete() elif sszx: CMA.objects.filter(sszx=sszx).delete() return Response(status=status.HTTP_200_OK) @action(methods=["post"], detail=False, url_path="import", url_name="cma_import", perms_map={"post": "cma_import"}) def cma_import(self, request, pk=None): """ 导入能力 """ filepath = request.data["path"] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith(".rar"): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace(".rar", "") os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() CMA.objects.filter(type="center").delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith(".xlsx"): import_cma(f, os.path.join(root, f)) else: return Response("不支持非xlsx格式", status=status.HTTP_400_BAD_REQUEST) elif fullpath.endswith(".zip"): fulldir = fullpath.replace(".zip", "") if os.path.exists(fulldir): import shutil shutil.rmtree(fulldir) # 先删除该文件夹 os.mkdir(fulldir) os.chdir(fulldir) CMA.objects.filter(type="center").delete() with zipfile.ZipFile(fullpath, "r") as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith(".xlsx"): import_cma(f.encode("cp437").decode("gbk"), os.path.join(root, f)) else: return Response("不支持非xlsx格式", status=status.HTTP_400_BAD_REQUEST) return Response(status=status.HTTP_200_OK) @action(methods=["post"], detail=False, url_path="import2", url_name="cma_import2", perms_map={"post": "cma_import2"}) def cma_import2(self, request, pk=None): """ 导入能力2 """ filepath = request.data["path"] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith(".rar"): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace(".rar", "") os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() # CMA.objects.filter(type='sub').delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith(".xlsx"): import_cma2(f, os.path.join(root, f)) else: return Response("不支持非xlsx格式", status=status.HTTP_400_BAD_REQUEST) elif fullpath.endswith(".zip"): fulldir = fullpath.replace(".zip", "") if os.path.exists(fulldir): import shutil shutil.rmtree(fulldir) # 先删除该文件夹 os.mkdir(fulldir) os.chdir(fulldir) # CMA.objects.filter(type='sub').delete() with zipfile.ZipFile(fullpath, "r") as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith(".xlsx"): import_cma2(f.encode("cp437").decode("gbk"), os.path.join(root, f)) else: return Response("不支持非xlsx格式", status=status.HTTP_400_BAD_REQUEST) return Response(status=status.HTTP_200_OK) from django.db.models.query import QuerySet class QualificationViewSet(RecordMixin, ModelViewSet): """ 资质能力:增删改查 """ perms_map = {"get": "qualification_view", "post": "qualificaiton_create", "put": "qualification_update", "delete": "qualification_delete"} queryset = Qualification.objects.all() serializer_class = QualificationSerializer search_fields = ["cma", "cnas", "sszx", "other", "service"] ordering_fields = ["sszx"] ordering = ["sszx"] filterset_fields = ["sszx"] def get_queryset(self): assert self.queryset is not None, "'%s' should either include a `queryset` attribute, " "or override the `get_queryset()` method." % self.__class__.__name__ queryset = self.queryset if isinstance(queryset, QuerySet): # Ensure queryset is re-evaluated on each request. queryset = queryset.all() if hasattr(self.get_serializer_class(), "setup_eager_loading"): queryset = self.get_serializer_class().setup_eager_loading(queryset) return queryset @action(methods=["get"], detail=False, url_name="qualification_group_by", perms_map={"*": "*"}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get("group_by", None): group_by = request.query_params.get("group_by") group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i["count"]: ret.append({"text": i[group_by] + "(" + str(i["count"]) + ")", "value": i[group_by]}) return Response(ret) @action(methods=["post"], detail=False, url_path="import", url_name="qualification_import", perms_map={"post": "qualification_import"}) def qualification_import(self, request, pk=None): filepath = request.data["path"] fullpath = settings.BASE_DIR + filepath if fullpath.endswith(".xlsx"): import_qualification(fullpath) return Response(status=status.HTTP_200_OK) else: return Response("不支持非xlsx格式", status=status.HTTP_400_BAD_REQUEST) class QualificationotherViewSet(RecordMixin, PageOrNot, ModelViewSet): """ 资质能力:增删改查 """ perms_map = {"get": "qualification_view", "post": "qualificationother_create", "put": "qualificationother_update", "delete": "qualificationother_delete"} queryset = Qualificationother.objects.all() serializer_class = QualificationotherSerializer search_fields = ["qualification__cma", "name", "description", "qualification__cnas", "qualification__ssbm__id", "qualification__service"] filterset_fields = ["qualification__ssbm__name"] ordering_fields = ["qualification__ssbm__name"] ordering = ["create_time"] @method_decorator(cache_page(60 * 60 * 2)) def list(self, request, *args, **kwargs): return super().list(request, *args, **kwargs) @action(methods=["post"], detail=False, url_path="deletes", url_name="qualificationother_deletes", perms_map={"post": "qualificationother_deletes"}) def deletes(self, request): array = request.data["ids"] Qualificationother.objects.filter(pk__in=array).delete() return Response(status=status.HTTP_200_OK) @action(methods=["get"], detail=False, url_name="qualification_group_by", perms_map={"*": "*"}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get("group_by", None): group_by = request.query_params.get("group_by") group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by("qualification__ssbm__sort", group_by)) for i in group_by_data: if i[group_by] and i["count"]: ret.append({"text": i[group_by] + "(" + str(i["count"]) + ")", "value": i[group_by]}) return Response(ret) class InspectionViewSet(RecordMixin, ModelViewSet): """ 检验能力:增删改查 """ perms_map = {"get": "inspection_view", "post": "inspection_create", "put": "inspection_update", "delete": "inspection_delete"} queryset = Inspection.objects.all() serializer_class = InspectionSerializer search_fields = ["dlmc", "jydx", "dxxh", "jyxmmc", "jybz", "sszx", "sm"] ordering_fields = ["dlmc"] ordering = ["create_time", "jyxmxh"] filterset_fields = ["sszx"] @action(methods=["post"], detail=False, url_path="deletes", url_name="inspection_deletes", perms_map={"post": "inspection_deletes"}) def deletes(self, request): array = request.data["ids"] Inspection.objects.filter(pk__in=array).delete() return Response(status=status.HTTP_200_OK) @action(methods=["get"], detail=False, url_name="inspection_group_by", perms_map={"*": "*"}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get("group_by", None): group_by = request.query_params.get("group_by") group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i["count"]: ret.append({"text": i[group_by] + "(" + str(i["count"]) + ")", "value": i[group_by]}) return Response(ret) @action(methods=["post"], detail=False, url_path="inspection2", url_name="inspection_inspection2", perms_map={"post": "inspection_inspection2"}) def inspection_import2(self, request, pk=None): """ 导入能力2 """ filepath = request.data["path"] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith(".rar"): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace(".rar", "") os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() # CMA.objects.filter(type='sub').delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith(".xlsx"): import_inspection(f, os.path.join(root, f)) else: return Response("不支持非xlsx格式", status=status.HTTP_400_BAD_REQUEST) elif fullpath.endswith(".zip"): fulldir = fullpath.replace(".zip", "") if os.path.exists(fulldir): import shutil shutil.rmtree(fulldir) # 先删除该文件夹 os.mkdir(fulldir) os.chdir(fulldir) # CMA.objects.filter(type='sub').delete() with zipfile.ZipFile(fullpath, "r") as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith(".xlsx"): import_inspection(f.encode("cp437").decode("gbk"), os.path.join(root, f)) else: return Response("不支持非xlsx格式", status=status.HTTP_400_BAD_REQUEST) return Response(status=status.HTTP_200_OK) class CNASViewSet(RecordMixin, ModelViewSet): """ CNAS检测能力:增删改查 """ perms_map = {"get": "cma_view", "post": "cnas_create", "put": "cnas_update", "delete": "cnas_delete"} queryset = CNAS.objects.all() serializer_class = CNASSerializer search_fields = ["bzbh", "bzmc", "sszx", "xmmc", "bztk"] ordering_fields = ["bzmc"] filterset_fields = ["sszx"] ordering = ["create_time"] @action(methods=["get"], detail=False, url_name="cnas_group_by", perms_map={"*": "*"}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get("group_by", None): group_by = request.query_params.get("group_by") group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i["count"]: ret.append({"text": i[group_by] + "(" + str(i["count"]) + ")", "value": i[group_by]}) return Response(ret) @action(methods=["post"], detail=False, url_path="import", url_name="cnas_import", perms_map={"post": "cnas_import"}) def cnas_import(self, request, pk=None): """ 导入能力 """ filepath = request.data["path"] fullpath = settings.BASE_DIR + filepath import os if fullpath.endswith(".rar"): rar = rarfile.RarFile(fullpath) fulldir = fullpath.replace(".rar", "") os.mkdir(fulldir) os.chdir(fulldir) rar.extractall() rar.close() CNAS.objects.all().delete() for root, dirs, files in os.walk(fulldir): for f in files: if f.endswith(".xls"): return Response("不支持旧xls格式", status=status.HTTP_400_BAD_REQUEST) import_cnas(f, os.path.join(root, f)) elif fullpath.endswith(".zip"): fulldir = fullpath.replace(".zip", "") if os.path.exists(fulldir): import shutil shutil.rmtree(fulldir) # 先删除该文件夹 os.mkdir(fulldir) os.chdir(fulldir) CNAS.objects.all().delete() with zipfile.ZipFile(fullpath, "r") as zzz: zzz.extractall(fulldir) for root, dirs, files in os.walk(fulldir): for f in files: import_cnas(f.encode("cp437").decode("gbk"), os.path.join(root, f)) return Response(status=status.HTTP_200_OK) class CorrectViewSet(RecordMixin, ModelViewSet): """ 校验能力:增删改查 """ perms_map = {"get": "correct_view", "post": "correct_create", "put": "correct_update", "delete": "correct_delete"} queryset = Correct.objects.all() serializer_class = CorrectSerializer search_fields = ["dlmc", "lbmc", "bclmc", "jzgc"] ordering_fields = ["dlmc"] ordering = ["create_time", "dlmc"] filterset_fields = ["ssbm", "ssgs"] @action(methods=["post"], detail=False, perms_map={"post": "correct_delete"}) def deletes(self, request): array = request.data["ids"] Correct.objects.filter(pk__in=array).delete() return Response(status=status.HTTP_200_OK) @action(methods=["get"], detail=False, perms_map={"*": "*"}) def group(self, request, pk=None): """ 聚合查询列 """ queryset = self.filter_queryset(self.get_queryset()) ret = [] if request.query_params.get("group_by", None): group_by = request.query_params.get("group_by") group_by_data = list(queryset.values(group_by).annotate(count=Count(group_by)).order_by(group_by)) for i in group_by_data: if i[group_by] and i["count"]: ret.append({"text": i[group_by] + "(" + str(i["count"]) + ")", "value": i[group_by]}) return Response(ret) @action(methods=["post"], detail=False, url_path="import") def correct_import(self, request, pk=None): """ 导入校验能力 """ filepath = request.data["path"] fullpath = settings.BASE_DIR + filepath if fullpath.endswith(".xlsx"): ret = import_correct(fullpath) if ret[0]: return Response() return Response(ret[1], status=status.HTTP_400_BAD_REQUEST) else: return Response("不支持非xlsx格式", status=status.HTTP_400_BAD_REQUEST) def import_qualification(path): wb = load_workbook(path) sheet = wb.worksheets[0] i = 3 max_row = sheet.max_row obj = {} Qualificationother.objects.all().delete() Qualification.objects.all().delete() while i < max_row + 1: sszx = sheet["b" + str(i)].value cma = sheet["c" + str(i)].value cnas = sheet["d" + str(i)].value service = sheet["g" + str(i)].value name = sheet["e" + str(i)].value description = sheet["f" + str(i)].value if sszx: if Organization.objects.filter(name=sszx).exists(): ssbm = Organization.objects.get(name=sszx) if Qualification.objects.filter(ssbm=ssbm).exists(): qualification = Qualification.objects.get(ssbm=ssbm) Qualificationother.objects.create(qualification=qualification, name=name, description=description) else: qualification = Qualification.objects.create(ssbm=ssbm) qualification.cma = cma qualification.cnas = cnas qualification.service = service qualification.save() obj = qualification Qualificationother.objects.create(qualification=qualification, name=name, description=description) else: Qualificationother.objects.create(qualification=obj, name=name, description=description) i = i + 1 def import_cma(filename, path): wb = load_workbook(path) sheet = wb.worksheets[0] datalist = [] sszx = filename.replace(".xlsx", "").replace("14检验检测能力申请表-", "") i = 4 while sheet["b" + str(i)].value: data = {} data["dlxh"] = sheet["a" + str(i)].value data["dlmc"] = sheet["b" + str(i)].value data["lbxh"] = sheet["c" + str(i)].value data["lbmc"] = sheet["d" + str(i)].value data["xmxh"] = sheet["e" + str(i)].value data["xmmc"] = sheet["f" + str(i)].value data["bzmc"] = sheet["g" + str(i)].value data["bzbh"] = sheet["h" + str(i)].value data["xzfw"] = sheet["i" + str(i)].value data["bz"] = sheet["j" + str(i)].value data["sszx"] = sszx data["type"] = "center" datalist.append(CMA(**data)) i = i + 1 CMA.objects.bulk_create(datalist) import xlrd def import_cnas(filename, path): sheet = xlrd.open_workbook(path).sheet_by_name("检测能力范围") datalist = [] sszx = filename.replace(".xlsx", "").replace(".xls", "").replace("检测能力范围(含能源之星)-", "") i = 2 lbmc = "" xmmc = "" while i < sheet.nrows: data = {} data_list = sheet.row_values(i) if data_list[1]: lbmc = data_list[1] if data_list[6]: xmmc = data_list[6] data["lbmc"] = lbmc data["xmmc"] = xmmc data["bzmc"] = data_list[11] data["bzbh"] = data_list[13] data["bztk"] = data_list[15] data["sszx"] = sszx data["sm"] = data_list[17] datalist.append(CNAS(**data)) i = i + 1 CNAS.objects.bulk_create(datalist) def import_cma2(filename, path): wb = load_workbook(path, data_only=True) sheet = wb.worksheets[0] datalist = [] sszx = filename.split("-")[0] # if CMA.objects.filter(sszx=sszx, type='sub').exists(): # CMA.objects.filter(sszx=sszx, type='sub').delete() i = 3 max_row = sheet.max_row defaultv = {} while i < max_row + 1: data = {} if sheet["a" + str(i)].value: data["dlxh"] = sheet["a" + str(i)].value defaultv["dlxh"] = data["dlxh"] else: data["dlxh"] = defaultv["dlxh"] if sheet["b" + str(i)].value: data["dlmc"] = sheet["b" + str(i)].value defaultv["dlmc"] = data["dlmc"] else: data["dlmc"] = defaultv["dlmc"] if sheet["c" + str(i)].value: data["lbxh"] = sheet["c" + str(i)].value defaultv["lbxh"] = data["lbxh"] else: data["lbxh"] = defaultv.get("lbxh", "") if sheet["d" + str(i)].value: data["lbmc"] = sheet["d" + str(i)].value defaultv["lbmc"] = data["lbmc"] else: data["lbmc"] = defaultv["lbmc"] if sheet["e" + str(i)].value: data["xmxh"] = sheet["e" + str(i)].value defaultv["xmxh"] = data["xmxh"] else: data["xmxh"] = defaultv.get("xmxh", "") if sheet["f" + str(i)].value: data["xmmc"] = sheet["f" + str(i)].value defaultv["xmmc"] = data["xmmc"] else: data["xmmc"] = defaultv["xmmc"] if sheet["g" + str(i)].value: data["bzmc"] = sheet["g" + str(i)].value defaultv["bzmc"] = data["bzmc"] else: data["bzmc"] = defaultv["bzmc"] if sheet["h" + str(i)].value: data["bzbh"] = sheet["h" + str(i)].value defaultv["bzbh"] = data["bzbh"] else: data["bzbh"] = defaultv["bzbh"] data["xzfw"] = sheet["i" + str(i)].value if (sheet["i" + str(i)].value and sheet["i" + str(i)].value != "无") else None data["bz"] = sheet["j" + str(i)].value if (sheet["j" + str(i)].value and sheet["j" + str(i)].value != "无") else None data["glzz"] = sheet["k" + str(i)].value if (sheet["k" + str(i)].value and sheet["k" + str(i)].value != "无") else None data["sszx"] = sszx data["type"] = "sub" datalist.append(CMA(**data)) i = i + 1 CMA.objects.bulk_create(datalist) def import_inspection(filename, path): wb = load_workbook(path, data_only=True) sheet = wb.worksheets[0] datalist = [] sszx = filename.split("-")[0] if Inspection.objects.filter(sszx=sszx).exists(): Inspection.objects.filter(sszx=sszx).delete() i = 3 max_row = sheet.max_row defaultv = {} while i < max_row + 1: data = {} if sheet["a" + str(i)].value: data["dlxh"] = sheet["a" + str(i)].value defaultv["dlxh"] = data["dlxh"] else: data["dlxh"] = defaultv["dlxh"] if sheet["b" + str(i)].value: data["dlmc"] = sheet["b" + str(i)].value defaultv["dlmc"] = data["dlmc"] else: data["dlmc"] = defaultv["dlmc"] if sheet["c" + str(i)].value: data["dxxh"] = sheet["c" + str(i)].value defaultv["dxxh"] = data["dxxh"] else: data["dxxh"] = defaultv["dxxh"] if sheet["d" + str(i)].value: data["jydx"] = sheet["d" + str(i)].value defaultv["jydx"] = data["jydx"] else: data["jydx"] = defaultv["jydx"] if sheet["e" + str(i)].value: data["jyxmxh"] = sheet["e" + str(i)].value elif sheet["e3"].value: # 该表存在项目序号 m = i - 1 while True: if sheet["e" + str(m)].value: data["jyxmxh"] = sheet["e" + str(m)].value break m = m - 1 else: # 该表没有项目序号,自己定 pass if sheet["f" + str(i)].value: data["jyxmmc"] = sheet["f" + str(i)].value else: m = i - 1 while True: if sheet["f" + str(m)].value: data["jyxmmc"] = sheet["f" + str(m)].value break m = m - 1 if sheet["g" + str(i)].value: data["jybz"] = sheet["g" + str(i)].value elif sheet["g3"].value: m = i - 1 while True: if sheet["g" + str(m)].value: data["jybz"] = sheet["g" + str(m)].value break m = m - 1 if sheet["h" + str(i)].value: data["sm"] = sheet["h" + str(i)].value elif sheet["h3"].value: m = i - 1 while True: if sheet["h" + str(m)].value: data["sm"] = sheet["h" + str(m)].value break m = m - 1 if sheet["i" + str(i)].value: data["sxrq"] = sheet["i" + str(i)].value elif sheet["i3"].value: m = i - 1 while True: if sheet["i" + str(m)].value: data["sxrq"] = sheet["i" + str(m)].value break m = m - 1 data["sszx"] = sszx datalist.append(Inspection(**data)) i = i + 1 Inspection.objects.bulk_create(datalist) def import_correct(path): wb = load_workbook(path, data_only=True) sheet = wb.worksheets[0] datalist = [] ssgs = sheet["k3"].value try: ssbm = Organization.objects.get(name=ssgs) except: return False, ssgs + "不存在" if Correct.objects.filter(ssbm=ssbm).exists(): Correct.objects.filter(ssbm=ssbm).delete() i = 3 max_row = sheet.max_row defaultv = {} while i < max_row + 1: data = {} if sheet["a" + str(i)].value: data["dlxh"] = sheet["a" + str(i)].value defaultv["dlxh"] = data["dlxh"] else: data["dlxh"] = defaultv["dlxh"] if sheet["b" + str(i)].value: data["dlmc"] = sheet["b" + str(i)].value defaultv["dlmc"] = data["dlmc"] else: data["dlmc"] = defaultv["dlmc"] if sheet["c" + str(i)].value: data["lbxh"] = sheet["c" + str(i)].value defaultv["lbxh"] = data["lbxh"] else: data["lbxh"] = defaultv["lbxh"] if sheet["d" + str(i)].value: data["lbmc"] = sheet["d" + str(i)].value defaultv["lbmc"] = data["lbmc"] else: data["lbmc"] = defaultv["lbmc"] if sheet["e" + str(i)].value: data["bclxh"] = sheet["e" + str(i)].value defaultv["bclxh"] = data["bclxh"] else: data["bclxh"] = defaultv["bclxh"] if "bclxh" in defaultv else None if sheet["f" + str(i)].value: data["bclmc"] = sheet["f" + str(i)].value defaultv["bclmc"] = data["bclmc"] else: data["bclmc"] = defaultv["bclmc"] if sheet["g" + str(i)].value: data["jzgc"] = sheet["g" + str(i)].value defaultv["jzgc"] = data["jzgc"] else: data["jzgc"] = defaultv["jzgc"] if sheet["h" + str(i)].value: data["clfw"] = sheet["h" + str(i)].value defaultv["clfw"] = data["clfw"] else: data["clfw"] = defaultv["clfw"] data["zqddj"] = sheet["i" + str(i)].value if (sheet["i" + str(i)].value and sheet["i" + str(i)].value != "无") else None data["note"] = sheet["j" + str(i)].value if (sheet["j" + str(i)].value and sheet["j" + str(i)].value != "无") else None data["ssgs"] = ssgs data["ssbm"] = ssbm datalist.append(Correct(**data)) i = i + 1 Correct.objects.bulk_create(datalist) return True, "" from django.db.models.functions import Cast def merge_cnas(request): for i in CNAS.objects.all(): objs = Ability.objects.filter(lbmc=i.lbmc, xmmc=i.xmmc, bzbh=i.bzbh, bztk=i.bztk, bzmc=i.bzmc) if objs.exists(): obj = objs[0] obj.cnas_ok = True if obj.cnas: obj.cnas = obj.cnas + "," + i.sszx obj.save() else: obj.cnas = i.sszx obj.save() print("已修改--" + obj.xmmc + "-" + obj.cnas) else: obj = Ability() obj.lbmc = i.lbmc obj.xmmc = i.xmmc obj.bzmc = i.bzmc obj.bzbh = i.bzbh obj.bztk = i.bztk obj.cnas = i.sszx obj.cnas_ok = True obj.save() print("已添加--" + obj.xmmc + obj.bzmc) return Response({}) def merge_cma(request): for i in CMA.objects.filter(type="center"): bztk = i.bzbh.split(" ")[-1] bzbh = i.bzbh.rstrip(bztk).strip() xmxh = i.xmxh.strip() if len(bzbh) < 8: bzbh = i.bzbh bztk = "" objs = Ability.objects.filter(lbmc=i.lbmc, xmmc=i.xmmc, bzbh=bzbh, bztk=bztk, xmxh=xmxh, bzmc=i.bzmc) if objs.exists(): obj = objs[0] obj.cma = obj.cma + "," + i.sszx obj.save() print("已修改--" + obj.xmmc + "-" + obj.cma) else: obj = Ability() obj.dlxh = i.dlxh obj.dlmc = i.dlmc obj.lbxh = i.lbxh obj.lbmc = i.lbmc obj.xmxh = i.xmxh obj.xmmc = i.xmmc obj.bzmc = i.bzmc obj.bzbh = bzbh obj.bztk = bztk obj.xzfw = i.xzfw obj.bz = i.bz obj.cma = i.sszx obj.cma_ok = True obj.save() print("已添加--" + obj.xmmc + obj.bzmc) return Response({}) def correct_ability(request): zxdict = {"测试中心": "B", "玻璃中心": "E", "腐蚀中心": "P", "光伏中心": "L", "耐火中心": "Q", "石材中心": "R", "水泥中心": "D", "质检中心": "C"} placedict = { "玻璃中心-管庄": "E", "玻璃中心-密云": "E", "玻璃中心-永顺": "E", "测试中心-管庄": "B", "测试中心-光华路": "B", "测试中心-国检三层": "B", "测试中心-密云": "B", "测试中心-顺义种植园": "B", "测试中心-宋庄": "B", "测试中心-通州种植园": "B", "测试中心-永顺": "B", "腐蚀中心": "P", "光伏中心-管庄": "L", "光伏中心-密云": "L", "耐火中心": "Q", "石材中心": "R", "水泥中心": "D", "质检中心-管庄": "C", "质检中心-光华路": "C", "质检中心-国检三层": "c", "质检中心-密云": "C", "质检中心-顺义种植园": "C", "质检中心-宋庄": "C", "质检中心-通州种植园": "C", "质检中心-永顺": "C", } for i in Ability.objects.all(): # i.cma_o = i.cma i.cma = i.cma_o i.cma_oplace = i.cma_o for key in zxdict: if key in i.cma_o: i.cma = i.cma.replace(key, zxdict[key]) for key in placedict: if key in i.cma_o: i.cma_oplace = i.cma_oplace.replace(key, placedict[key]) i.save() print("已修改" + i.cma) def correct_cma(request): for i in CMA.objects.all(): i.sszx = i.sszx.replace("14检验检测能力申请表-", "") i.save() return Response()