from django.conf import settings
from rest_framework.exceptions import ParseError
from apps.enm.models import Mpoint, MpointStat, EnStat, EnStat2, MpLogx, Xscript
from apps.utils.viewsets import CustomModelViewSet, CustomGenericViewSet
from apps.utils.mixins import BulkCreateModelMixin, BulkDestroyModelMixin, CustomListModelMixin, BulkUpdateModelMixin
from apps.enm.serializers import (MpointSerializer, MpLogxSerializer, MpointStatSerializer,
                                  EnStatSerializer, EnStat2Serializer, ReCalSerializer,
                                  MpointStatCorrectSerializer, EnStatAnaSerializer,
                                  EnStatUpdateSerializer, XscriptSerializer, XscriptDetailSerializer)
from apps.enm.filters import MpointStatFilter, EnStatFilter, EnStat2Filter
from apps.enm.tasks import cal_mpointstat_manual
from rest_framework.response import Response
from rest_framework.serializers import Serializer
from rest_framework.decorators import action
from rest_framework.views import APIView
from apps.enm.tasks import cal_mpointstats_duration
from apps.enm.services import king_sync, MpointCache
from django.db import transaction
from datetime import datetime
from django.utils.timezone import localtime
from apps.enm.services import get_analyse_data_mgroups_duration
from django.db.models import Sum
import logging
from django.core.cache import cache
from apps.utils.sql import query_one_dict, query_all_dict
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from django.utils import timezone

myLogger = logging.getLogger('log')


def _pk_or_none(obj):
    """Return ``obj.id``, or ``None`` when the related object itself is ``None``."""
    return None if obj is None else obj.id


class MpointViewSet(CustomModelViewSet):
    """
    list:Measuring points

    Measuring points
    """
    queryset = Mpoint.objects.all()
    serializer_class = MpointSerializer
    select_related_fields = ["create_by", "belong_dept", "ep_monitored", "ep_belong", "mgroup", "material"]
    filterset_fields = {
        "belong_dept": ["exact"],
        "ep_monitored": ["exact", "isnull"],
        "ep_monitored__power_kw": ["exact", "gte"],
        "ep_belong": ["exact"],
        "mgroup": ["exact"],
        "is_rep_mgroup": ["exact"],
        "type": ["exact"],
        "mgroup__name": ["exact"],
        "val_type": ["exact"],
        "enabled": ["exact"],
        "need_display": ["exact"],
        "formula": ["exact", "contains"],
        "material": ["exact", "isnull"],
        "material__code": ["exact", "in"],
        "code": ["exact", "contains", "in"],
        "need_change_cal_coefficient": ["exact"],
    }
    search_fields = ["name", "code", "nickname", "material__code", "material__name", "ep_rs_expr", "formula"]
    ordering = ["create_time", "name", "code"]

    @transaction.atomic
    def perform_create(self, serializer):
        """Save the new measuring point and force-refresh its cache entry when it has a code."""
        instance = serializer.save()
        if instance.code:
            MpointCache(instance.code).get(True)

    @transaction.atomic
    def perform_update(self, serializer):
        """Save the measuring point and keep its cache entry in sync.

        When the code changed, the entry stored under the old code is deleted.
        The entry for the current code is force-refreshed, and a disabled point
        is flagged through ``MpointCache.set_fail(-2, now)``.
        """
        old_code = serializer.instance.code
        instance: Mpoint = serializer.save()
        if not instance.code:
            # The cache is keyed by code; nothing to maintain without one.
            return
        if old_code != instance.code:
            cache.delete(Mpoint.cache_key(old_code))
        MpointCache(instance.code).get(True)
        if instance.enabled is False:
            MpointCache(instance.code).set_fail(-2, localtime())

    @action(methods=["post"], detail=False, perms_map={"post": "mpoint.create"}, serializer_class=Serializer)
    def king_sync(self, request, *args, **kwargs):
        """Synchronise King (亚控) collection points

        Synchronise King (亚控) collection points
        """
        king_sync(getattr(settings, "KING_PROJECTNAME", ""))
        return Response()

    @action(methods=["post"], detail=False, perms_map={"post": "mpoint.create"}, serializer_class=Serializer)
    def show_picture(self, request, *args, **kwargs):
        """Request a box-plot image from the local plotting service and store it under MEDIA_ROOT.

        The four time bounds are read from the request body and forwarded
        unchanged. The response body is written to
        ``<MEDIA_ROOT>/box_pic/<timestamp>.png``.

        Returns:
            Response: ``{"rel_path": "media/box_pic/<file>.png"}``.

        Raises:
            ParseError: the plotting service could not be reached or answered
                with an HTTP error status.
        """
        import requests
        import os

        headers = {
            "Content-Type": "application/json;charset=utf-8",
        }
        url = "http://localhost:8093/boxplot"
        payload = {
            key: request.data.get(key)
            for key in ("startTime1", "endTime1", "startTime2", "endTime2")
        }
        # NOTE(review): no timeout is set, so a hung plotting service blocks this worker.
        try:
            response = requests.request("POST", url, json=payload, headers=headers)
            response.raise_for_status()
        except requests.RequestException as exc:
            # Previously the error was only logged and the code carried on with
            # an unbound ``response``; fail explicitly instead.
            myLogger.error(exc)
            raise ParseError("boxplot service request failed") from exc

        pic_dir = os.path.join(settings.MEDIA_ROOT, "box_pic")
        os.makedirs(pic_dir, exist_ok=True)
        file_name = datetime.now().strftime('%Y%m%d_%H%M%S') + '.png'
        with open(os.path.join(pic_dir, file_name), 'wb') as f:
            f.write(response.content)
        # Always forward slashes: this path is returned to the client.
        rel_path = f"media/box_pic/{file_name}"
        return Response({"rel_path": rel_path})


class XscriptViewSet(CustomModelViewSet):
    """
    list:Executable scripts

    Executable scripts
    """
    queryset = Xscript.objects.all()
    serializer_class = XscriptSerializer
    select_related_fields = ["myschedule", "periodictask"]
    retrieve_serializer_class = XscriptDetailSerializer
    search_fields = ['name']

    @action(methods=['put'], detail=True, perms_map={'put': 'xscript.update'})
    def toggle(self, request, pk=None):
        """Toggle the enabled/disabled state

        Toggle the enabled/disabled state
        """
        obj = self.get_object()
        periodictask = obj.periodictask
        periodictask.enabled = not periodictask.enabled
        periodictask.save()
        return Response()

    @action(methods=['put'], detail=True, perms_map={'put': 'xscript.update'})
    def change_data(self, request, pk=None):
        """Update the change data

        Update the change data
        """
        obj: Xscript = self.get_object()
        obj.change_data = request.data.get('change_data', {})
        obj.save(update_fields=['change_data'])
        return Response()

    @transaction.atomic
    def perform_destroy(self, instance):
        """Delete the script together with its periodic task, if it has one."""
        periodictask = instance.periodictask
        instance.delete()
        if periodictask is not None:
            periodictask.delete()

# class MpLogViewSet(ListModelMixin, CustomGenericViewSet):
#     """
#     list:Raw measuring-point records

#     Raw measuring-point records
#     """
#     perms_map = {'get': '*'}
#     queryset = MpLog.objects.all()
#     serializer_class = MpLogSerializer
#     select_related_fields = ['mpoint']
#     filterset_fields = ['mpoint', 'mpoint__mgroup', 'mpoint__mgroup__belong_dept']


class MpLogxAPIView(APIView):
    """
    list:Collected measuring-point data

    Collected measuring-point data
    """
    perms_map = {"get": "*"}

    # Column whitelist for ORDER BY / SELECT. Only these names are ever
    # interpolated into SQL text; keep in sync with the enm_mplogx table.
    _ALLOWED_FIELDS = {"timex", "val_mrs", "val_int", "val_float"}
    _DEFAULT_FIELDS = ["timex", "val_float", "val_int"]
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    @swagger_auto_schema(manual_parameters=[
        openapi.Parameter('mpoint', openapi.IN_QUERY, description='测点ID', type=openapi.TYPE_STRING),
        openapi.Parameter('timex__gte', openapi.IN_QUERY, description='开始时间', type=openapi.TYPE_STRING),
        openapi.Parameter('timex__lte', openapi.IN_QUERY, description='结束时间', type=openapi.TYPE_STRING),
        openapi.Parameter('page', openapi.IN_QUERY, description='页码', type=openapi.TYPE_INTEGER),
        openapi.Parameter('page_size', openapi.IN_QUERY, description='每页数量', type=openapi.TYPE_INTEGER),
        openapi.Parameter('ordering', openapi.IN_QUERY, description='排序字段,如 -timex', type=openapi.TYPE_STRING),
        openapi.Parameter('fields', openapi.IN_QUERY, description='返回字段,如 timex,val_float,val_int', type=openapi.TYPE_STRING),
    ])
    def get(self, request, *args, **kwargs):
        """Return rows of ``enm_mplogx`` for one measuring point within a time window.

        Query parameters:
            mpoint: measuring point id (required).
            timex__gte: window start, ``YYYY-MM-DD HH:MM:SS`` (required).
            timex__lte: window end, same format; defaults to the current time.
            page: page number, default 1. ``0`` disables pagination and returns
                a bare list restricted to the columns named in ``fields``.
            page_size: rows per page, default 20.
            ordering: comma-separated columns, ``-`` prefix for descending;
                non-whitelisted names are ignored, default ``-timex``.
            fields: comma-separated columns, only honoured when ``page=0``.

        Raises:
            ParseError: missing required parameters, non-integer or negative
                paging values, or malformed timestamps.
        """
        params = request.query_params
        mpoint = params.get("mpoint", None)
        timex__gte_str = params.get("timex__gte", None)
        timex__lte_str = params.get("timex__lte", None)
        fields = params.get("fields", None)
        ordering = params.get("ordering", "-timex")  # newest first by default

        try:
            page = int(params.get("page", 1))
            page_size = int(params.get("page_size", 20))
        except (TypeError, ValueError) as exc:
            raise ParseError("page, page_size must be integers") from exc
        # Either value being negative is invalid (this used to require both).
        if page < 0 or page_size < 0:
            raise ParseError("page, page_size must be positive")
        if mpoint is None or timex__gte_str is None:
            raise ParseError("mpoint, timex__gte are required")

        try:
            timex__gte = timezone.make_aware(datetime.strptime(timex__gte_str, self._TIME_FORMAT))
            if timex__lte_str:
                timex__lte = timezone.make_aware(datetime.strptime(timex__lte_str, self._TIME_FORMAT))
            else:
                timex__lte = timezone.now()
        except ValueError as exc:
            raise ParseError("timex__gte, timex__lte must be formatted as YYYY-MM-DD HH:MM:SS") from exc

        order_clause = "ORDER BY " + ", ".join(self._order_terms(ordering))
        where_params = [mpoint, timex__gte, timex__lte]

        if page == 0:
            # Unpaginated export: selected columns only, no count needed.
            select_clause = ", ".join(self._select_columns(fields))
            page_sql = f"""SELECT {select_clause} FROM enm_mplogx
                           WHERE mpoint_id=%s AND timex >= %s AND timex <= %s
                           {order_clause}"""
            return Response(query_all_dict(page_sql, where_params, with_time_format=True))

        count_sql = """SELECT COUNT(*) AS total_count FROM enm_mplogx
                       WHERE mpoint_id=%s AND timex >= %s AND timex <= %s"""
        count_data = query_one_dict(count_sql, where_params, with_time_format=True)

        page_sql = f"""SELECT * FROM enm_mplogx
                       WHERE mpoint_id=%s AND timex >= %s AND timex <= %s
                       {order_clause} LIMIT %s OFFSET %s"""
        page_params = where_params + [page_size, (page - 1) * page_size]
        page_data = query_all_dict(page_sql, page_params, with_time_format=True)
        return Response({
            "count": count_data["total_count"],
            "page": page,
            "page_size": page_size,
            "results": page_data
        })

    def _order_terms(self, ordering):
        """Translate ``"-timex,val_int"`` into whitelisted ``["timex DESC", "val_int ASC"]``."""
        terms = []
        for raw in ordering.split(","):
            token = raw.strip()
            if not token:
                continue
            descending = token.startswith("-")
            column = token[1:] if descending else token
            if column in self._ALLOWED_FIELDS:
                terms.append(f"{column} {'DESC' if descending else 'ASC'}")
        # Fall back to the default ordering when nothing valid was supplied.
        return terms or ["timex DESC"]

    def _select_columns(self, fields):
        """Return the whitelisted columns named in ``fields``, or the default columns."""
        if not fields:
            return list(self._DEFAULT_FIELDS)
        wanted = [name.strip() for name in fields.split(",")]
        columns = [name for name in wanted if name in self._ALLOWED_FIELDS]
        return columns or list(self._DEFAULT_FIELDS)


class MpLogxViewSet(CustomListModelMixin, CustomGenericViewSet):
    """
    list: Collected measuring-point data

    Collected measuring-point data
    """

    perms_map = {"get": "*"}
    queryset = MpLogx.objects.all()
    serializer_class = MpLogxSerializer
    filterset_fields = {
        "timex": ["exact", "gte", "lte", "year", "month", "day"],
        "mpoint": ["exact", "in"],
        "mpoint__ep_monitored": ["exact"]
    }
    ordering_fields = ["timex"]
    ordering = ["-timex"]

    @action(methods=["get"], detail=False, perms_map={"get": "*"})
    def to_wide(self, request, *args, **kwargs):
        """Convert to a wide table

        Convert to a wide table
        """
        # NOTE(review): unfinished stub - the filtered queryset is built but
        # nothing is returned, so calling this action currently fails inside
        # DRF. The intended wide-table shape is not visible here; left as is.
        queryset = self.filter_queryset(self.get_queryset())


class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, CustomListModelMixin, CustomGenericViewSet):
    """
    list:Measuring-point statistics records

    Measuring-point statistics records
    """

    perms_map = {"get": "*", "post": "mpointstat.create", "delete": "mpointstat.delete"}
    queryset = MpointStat.objects.all()
    serializer_class = MpointStatSerializer
    select_related_fields = ["mpoint", "mpoint__ep_monitored", "mpoint__ep_belong", "mgroup", "mgroup__belong_dept"]
    filterset_class = MpointStatFilter
    ordering_fields = ['mpoint__report_sortstr', 'year', 'month', 'day', 'hour', 'year_s', 'month_s', 'day_s', 'mgroup__sort', 'create_time', "update_time"]
    ordering = ["mpoint__report_sortstr", "-year", "-month", "-day", "-year_s", "-month_s", "-day_s", "-create_time", "mgroup__sort"]

    def perform_create(self, serializer):
        """Save the record, then queue a recalculation for its shift date."""
        ins = serializer.save()
        # sflog / mgroup may be unset; pass None rather than crash on ``.id``.
        cal_mpointstat_manual.delay(
            ins.mpoint.id, _pk_or_none(ins.sflog), _pk_or_none(ins.mgroup),
            None, None, None, None,
            ins.year_s, ins.month_s, ins.day_s, next_cal=1)

    def perform_destroy(self, instance):
        """Delete the record, then queue a recalculation for its shift date."""
        # Capture everything the task needs before the row disappears.
        mpoint_id = instance.mpoint.id
        sflog_id = _pk_or_none(instance.sflog)
        mgroup_id = _pk_or_none(instance.mgroup)
        year_s, month_s, day_s = instance.year_s, instance.month_s, instance.day_s
        instance.delete()
        cal_mpointstat_manual.delay(
            mpoint_id, sflog_id, mgroup_id,
            None, None, None, None,
            year_s, month_s, day_s, next_cal=1)

    @action(methods=["post"], detail=True, perms_map={"post": "mpointstat.correct"}, serializer_class=MpointStatCorrectSerializer)
    def correct(self, request, *args, **kwargs):
        """Correct a measuring-point statistics record and its value

        Correct a measuring-point statistics record and its value
        """
        instance_id = kwargs.get("pk")
        if not instance_id:
            return Response({"detail": "ID not provided in the URL"}, status=400)
        instance: MpointStat = self.get_object()
        sr = MpointStatCorrectSerializer(data=request.data)
        sr.is_valid(raise_exception=True)

        last_record = None
        if instance.type in ['month_s', 'month']:
            # Shift-month / calendar-month statistics: last record of the same year.
            last_record = MpointStat.objects.filter(
                mpoint=instance.mpoint,
                type=instance.type,
                mgroup=instance.mgroup,
                year_s=instance.year_s,
                year=instance.year,
            ).order_by(instance.type).values("id").last()
        elif instance.type in ['day_s', 'day']:
            # Shift-day / calendar-day statistics: last record of the same month.
            last_record = MpointStat.objects.filter(
                mpoint=instance.mpoint,
                type=instance.type,
                year=instance.year,
                year_s=instance.year_s,
                month_s=instance.month_s,
                month=instance.month,
            ).order_by(instance.type).values("id").last()

        last_id = last_record["id"] if last_record else None
        # The last record found above may not be corrected.
        if str(last_id) == str(instance_id):
            raise ParseError("不能修正当日或当月数据")

        val_correct = sr.validated_data["val_correct"]
        instance.val_correct = val_correct
        instance.val = val_correct
        instance.update_by = request.user
        instance.save()

        mpoint = instance.mpoint
        # sflog and mgroup may be None.
        sflog_id = _pk_or_none(instance.sflog)
        mgroup_id = _pk_or_none(instance.mgroup)
        # Each name is bound to its own attribute; the former tuple unpacking
        # shifted hour/year/month/day by one position.
        year, month, day, hour = instance.year, instance.month, instance.day, instance.hour
        year_s, month_s, day_s = instance.year_s, instance.month_s, instance.day_s
        myLogger.info(f'{mpoint.id}--{sflog_id}--{mgroup_id}--{year}--{month}--{day}--{hour}--{year_s}--{month_s}--{day_s}')
        cal_mpointstat_manual.delay(mpoint.id, sflog_id, mgroup_id, year, month, day, hour, year_s, month_s, day_s, next_cal=1)
        return Response()

    @action(methods=["post"], detail=False, perms_map={"post": "mpointstat.correct"}, serializer_class=ReCalSerializer)
    def recal(self, request, *args, **kwargs):
        """Re-run the enm calculation for a time range

        Re-run the enm calculation for a time range
        """
        data = request.data
        sr = ReCalSerializer(data=data)
        sr.is_valid(raise_exception=True)
        # The raw request values (not validated_data) are handed to the task.
        task = cal_mpointstats_duration.delay(data["start_time"], data["end_time"])
        return Response({"task_id": task.task_id})

    @action(methods=["get"], detail=False, perms_map={"get": "*"})
    def group_values(self, request, *args, **kwargs):
        """
        Aggregated query over measuring-point statistics

        Sums ``val`` grouped by the comma-separated ``group_by`` query
        parameter; ``mpoint`` is always part of the grouping.
        """
        qs = self.filter_queryset(self.get_queryset())
        group_by = request.query_params.get("group_by", "")
        # Drop every empty entry (e.g. from "a,,b" or a trailing comma).
        group_by_list = [name.strip() for name in group_by.split(",") if name.strip()]
        if 'mpoint' not in group_by_list:
            group_by_list.append('mpoint')
        # Clear the default ordering so it does not leak into the GROUP BY.
        qs = qs.order_by()
        display_fields = ['mpoint__name', 'total_val', 'mpoint__nickname', 'mpoint__unit'] + group_by_list
        aggregate_qs = qs.values(*group_by_list).annotate(total_val=Sum('val')).values(*display_fields)
        return Response(list(aggregate_qs))


class EnStatViewSet(CustomListModelMixin, BulkUpdateModelMixin, CustomGenericViewSet):
    """
    list:Energy-consumption statistics records

    Energy-consumption statistics records
    """

    perms_map = {"get": "*", "put": "enstat.update"}
    queryset = EnStat.objects.all()
    serializer_class = EnStatSerializer
    update_serializer_class = EnStatUpdateSerializer
    select_related_fields = ["mgroup", "team", "mgroup__belong_dept"]
    filterset_class = EnStatFilter
    ordering = ["mgroup__sort", "year_s", "month_s", "day_s", "team__create_time", "create_time"]
    ordering_fields = ["mgroup__sort", "year_s", "month_s", "day_s", "hour", "update_time", "create_time", "team__create_time"]

    @action(methods=["post"], detail=False, perms_map={"post": "*"}, serializer_class=EnStatAnaSerializer)
    def analyze(self, request, *args, **kwargs):
        """Work-section analysis data over a time range

        Work-section analysis data over a time range (section comparison)
        """
        sr = EnStatAnaSerializer(data=request.data)
        sr.is_valid(raise_exception=True)
        vdata = sr.validated_data
        start_date: datetime = vdata["start_date"]
        end_date: datetime = vdata["end_date"]
        ret = get_analyse_data_mgroups_duration(start_date, end_date)
        return Response(ret)


class EnStat2ViewSet(CustomListModelMixin, CustomGenericViewSet):
    """
    list:Plant-wide statistics records

    Plant-wide statistics records
    """

    perms_map = {"get": "*"}
    queryset = EnStat2.objects.all()
    serializer_class = EnStat2Serializer
    filterset_class = EnStat2Filter