factory/apps/enm/views.py

450 lines
18 KiB
Python

from django.conf import settings
from rest_framework.exceptions import ParseError
from apps.enm.models import Mpoint, MpointStat, EnStat, EnStat2, MpLogx, Xscript
from apps.utils.viewsets import CustomModelViewSet, CustomGenericViewSet
from apps.utils.mixins import BulkCreateModelMixin, BulkDestroyModelMixin, CustomListModelMixin, BulkUpdateModelMixin
from apps.enm.serializers import (MpointSerializer, MpLogxSerializer, MpointStatSerializer,
EnStatSerializer, EnStat2Serializer, ReCalSerializer,
MpointStatCorrectSerializer,
EnStatAnaSerializer, EnStatUpdateSerializer, XscriptSerializer, XscriptDetailSerializer)
from apps.enm.filters import MpointStatFilter, EnStatFilter, EnStat2Filter
from apps.enm.tasks import cal_mpointstat_manual
from rest_framework.response import Response
from rest_framework.serializers import Serializer
from rest_framework.decorators import action
from rest_framework.views import APIView
from apps.enm.tasks import cal_mpointstats_duration
from apps.enm.services import king_sync, MpointCache
from django.db import transaction
from datetime import datetime
from django.utils.timezone import localtime
from apps.enm.services import get_analyse_data_mgroups_duration
from django.db.models import Sum
import logging
from django.core.cache import cache
from apps.utils.sql import query_one_dict, query_all_dict
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from django.utils import timezone
# Module-level logger shared by the views below; it is the logger registered under the name 'log'.
myLogger = logging.getLogger('log')
class MpointViewSet(CustomModelViewSet):
    """
    list:测点
    测点
    """
    # CRUD endpoints for measuring points (Mpoint).
    # NOTE: the class/action docstrings are left verbatim on purpose: view
    # docstrings feed the generated API documentation, so they are runtime text.
    queryset = Mpoint.objects.all()
    serializer_class = MpointSerializer
    select_related_fields = ["create_by", "belong_dept", "ep_monitored", "ep_belong", "mgroup", "material"]
    filterset_fields = {
        "belong_dept": ["exact"],
        "ep_monitored": ["exact", "isnull"],
        "ep_monitored__power_kw": ["exact", "gte"],
        "ep_belong": ["exact"],
        "mgroup": ["exact"],
        "is_rep_mgroup": ["exact"],
        "type": ["exact"],
        "mgroup__name": ["exact"],
        "val_type": ["exact"],
        "enabled": ["exact"],
        "need_display": ["exact"],
        "formula": ["exact", "contains"],
        "material": ["exact", "isnull"],
        "material__code": ["exact", "in"],
        "code": ["exact", "contains", "in"],
        "need_change_cal_coefficient": ["exact"],
    }
    search_fields = ["name", "code", "nickname", "material__code", "material__name", "ep_rs_expr", "formula"]
    ordering = ["create_time", "name", "code"]

    @transaction.atomic
    def perform_create(self, serializer):
        # Persist the new point and, when it has a code, load it into the cache.
        instance = serializer.save()
        if instance.code:
            # presumably get(True) forces a rebuild of the cached entry - confirm in MpointCache
            MpointCache(instance.code).get(True)

    @transaction.atomic
    def perform_update(self, serializer):
        # Persist the update and keep the per-code cache consistent with it.
        old_code = serializer.instance.code
        instance: Mpoint = serializer.save()
        # Evict the entry stored under the previous code whenever the code
        # changed, including the case where the code was cleared (previously
        # the stale entry survived because this check sat under
        # `if instance.code`).
        if old_code and old_code != instance.code:
            cache.delete(Mpoint.cache_key(old_code))
        if not instance.code:
            return
        MpointCache(instance.code).get(True)
        if instance.enabled is False:
            # A disabled point is flagged in the cache with status -2 at the
            # current local time.
            mc = MpointCache(instance.code)
            now = localtime()
            mc.set_fail(-2, now)

    @action(methods=["post"], detail=False, perms_map={"post": "mpoint.create"}, serializer_class=Serializer)
    def king_sync(self, request, *args, **kwargs):
        """同步亚控采集点
        同步亚控采集点
        """
        # Inside the method body `king_sync` resolves to the module-level
        # service function imported from apps.enm.services, not this action.
        king_sync(getattr(settings, "KING_PROJECTNAME", ""))
        return Response()

    @action(methods=["post"], detail=False, perms_map={"post": "mpoint.create"}, serializer_class=Serializer)
    def show_picture(self, request, *args, **kwargs):
        # Ask the local boxplot service to render a picture for two time
        # ranges, store the returned bytes under MEDIA_ROOT/box_pic and answer
        # with the relative path of the stored file.
        #
        # Raises ParseError when the boxplot service cannot be reached or
        # answers with an error status. Previously the failure was only logged
        # and the code went on to read an unbound `response`.
        import os
        import requests

        url = "http://localhost:8093/boxplot"
        headers = {
            "Content-Type": "application/json;charset=utf-8",
        }
        payload = {
            key: request.data.get(key)
            for key in ("startTime1", "endTime1", "startTime2", "endTime2")
        }
        try:
            # A timeout keeps a hung plotting service from blocking the worker forever.
            response = requests.post(url, json=payload, headers=headers, timeout=60)
            # Do not store an error page as a .png file.
            response.raise_for_status()
        except requests.RequestException as e:
            myLogger.error(e)
            raise ParseError("boxplot service request failed")

        pic_dir = os.path.join(settings.MEDIA_ROOT, "box_pic")
        os.makedirs(pic_dir, exist_ok=True)
        file_name = datetime.now().strftime('%Y%m%d_%H%M%S') + '.png'
        pic_path = os.path.join(pic_dir, file_name)
        with open(pic_path, 'wb') as f:
            f.write(response.content)
        # Always hand back a forward-slash path, also on Windows hosts.
        rel_path = os.path.join('media/box_pic', file_name).replace('\\', '/')
        return Response({"rel_path": rel_path})
class XscriptViewSet(CustomModelViewSet):
    """
    list:执行脚本
    执行脚本
    """
    # CRUD endpoints for executable scripts (Xscript) plus two small actions.
    # Docstrings are kept verbatim because view docstrings feed the generated
    # API documentation.
    queryset = Xscript.objects.all()
    serializer_class = XscriptSerializer
    retrieve_serializer_class = XscriptDetailSerializer
    select_related_fields = ["myschedule", "periodictask"]
    search_fields = ['name']

    @action(methods=['put'], detail=True, perms_map={'put': 'xscript.update'})
    def toggle(self, request, pk=None):
        """修改启用禁用状态
        修改启用禁用状态
        """
        # Flip the enabled flag of the periodic task linked to this script.
        # NOTE(review): assumes every Xscript has a periodictask - confirm the field is non-null.
        task = self.get_object().periodictask
        task.enabled = not task.enabled
        task.save()
        return Response()

    @action(methods=['put'], detail=True, perms_map={'put': 'xscript.update'})
    def change_data(self, request, pk=None):
        """修改变动数据
        修改变动数据
        """
        # Overwrite only the change_data column; a missing key stores {}.
        script: Xscript = self.get_object()
        new_value = request.data.get('change_data', {})
        script.change_data = new_value
        script.save(update_fields=['change_data'])
        return Response()

    @transaction.atomic
    def perform_destroy(self, instance):
        # Remove the script first, then its linked periodic task, atomically.
        linked_task = instance.periodictask
        instance.delete()
        linked_task.delete()
# class MpLogViewSet(ListModelMixin, CustomGenericViewSet):
# """
# list:测点原始记录
# 测点原始记录
# """
# perms_map = {'get': '*'}
# queryset = MpLog.objects.all()
# serializer_class = MpLogSerializer
# select_related_fields = ['mpoint']
# filterset_fields = ['mpoint', 'mpoint__mgroup', 'mpoint__mgroup__belong_dept']
class MpLogxAPIView(APIView):
    """
    list:测点采集数据
    测点采集数据
    """
    # Raw-SQL read endpoint over the enm_mplogx table.
    # The docstring above is kept verbatim because view docstrings feed the
    # generated API documentation.
    perms_map = {"get": "*"}

    # Wire format expected for timex__gte / timex__lte.
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
    # Column whitelist for ORDER BY and SELECT; adjust it to the table's columns.
    # Only names from this tuple are ever interpolated into SQL text.
    _ALLOWED_FIELDS = ("timex", "val_mrs", "val_int", "val_float")
    # Columns returned by the unpaginated query when `fields` yields nothing usable.
    _DEFAULT_FIELDS = ("timex", "val_float", "val_int")

    @staticmethod
    def _parse_int(raw, name):
        # Convert a query parameter to int, answering 400 instead of crashing on junk.
        try:
            return int(raw)
        except (TypeError, ValueError):
            raise ParseError(f"{name} must be an integer") from None

    @classmethod
    def _parse_time(cls, raw, name):
        # Parse a timestamp parameter into an aware datetime, answering 400 on a bad format.
        try:
            naive = datetime.strptime(raw, cls._TIME_FORMAT)
        except ValueError:
            raise ParseError(f"{name} must be formatted as YYYY-MM-DD HH:MM:SS") from None
        return timezone.make_aware(naive)

    @classmethod
    def _order_clause(cls, ordering):
        # Build an ORDER BY clause from a comma separated list such as "-timex,val_int".
        # Unknown column names are dropped; with nothing left the default is "timex DESC".
        terms = []
        for token in ordering.split(","):
            token = token.strip()
            if not token:
                continue
            descending = token.startswith("-")
            column = token[1:] if descending else token
            if column in cls._ALLOWED_FIELDS:
                terms.append(f"{column} {'DESC' if descending else 'ASC'}")
        if not terms:
            terms = ["timex DESC"]
        return "ORDER BY " + ", ".join(terms)

    @swagger_auto_schema(manual_parameters=[
        openapi.Parameter('mpoint', openapi.IN_QUERY, description='测点ID', type=openapi.TYPE_STRING),
        openapi.Parameter('timex__gte', openapi.IN_QUERY, description='开始时间', type=openapi.TYPE_STRING),
        openapi.Parameter('timex__lte', openapi.IN_QUERY, description='结束时间', type=openapi.TYPE_STRING),
        openapi.Parameter('page', openapi.IN_QUERY, description='页码', type=openapi.TYPE_INTEGER),
        openapi.Parameter('page_size', openapi.IN_QUERY, description='每页数量', type=openapi.TYPE_INTEGER),
        openapi.Parameter('ordering', openapi.IN_QUERY, description='排序字段,如 -timex', type=openapi.TYPE_STRING),
        openapi.Parameter('fields', openapi.IN_QUERY, description='返回字段,如 timex,val_float,val_int', type=openapi.TYPE_STRING),
    ])
    def get(self, request, *args, **kwargs):
        # Query parameters:
        #   mpoint      required, id matched against mpoint_id
        #   timex__gte  required, lower time bound
        #   timex__lte  optional, upper time bound (defaults to now)
        #   page        0 returns every matching row as a bare list; >= 1 paginates
        #   page_size   rows per page when paginating (default 20)
        #   ordering    comma separated whitelist columns, "-" prefix for DESC
        #   fields      columns to return; only honoured when page == 0
        # Raises ParseError (HTTP 400) for missing or malformed parameters.
        params = request.query_params
        mpoint = params.get("mpoint", None)
        timex__gte_str = params.get("timex__gte", None)
        # Fixed: this used to read "timex__gte" again, so the upper bound
        # supplied by the client was ignored.
        timex__lte_str = params.get("timex__lte", None)
        page = self._parse_int(params.get("page", 1), "page")
        page_size = self._parse_int(params.get("page_size", 20), "page_size")
        fields = params.get("fields", None)
        # Fixed: the original used `and`, which only rejected requests where
        # BOTH values were negative. page_size is irrelevant when page == 0.
        if page < 0 or (page > 0 and page_size < 1):
            raise ParseError("page, page_size must be positive")
        ordering = params.get("ordering", "-timex")  # newest first by default
        if mpoint is None or timex__gte_str is None:
            raise ParseError("mpoint, timex__gte are required")

        # Time window
        timex__gte = self._parse_time(timex__gte_str, "timex__gte")
        timex__lte = self._parse_time(timex__lte_str, "timex__lte") if timex__lte_str else timezone.now()

        order_clause = self._order_clause(ordering)
        window_params = [mpoint, timex__gte, timex__lte]

        if page == 0:
            # Unpaginated: return the requested (whitelisted) columns for every row.
            requested = [name.strip() for name in fields.split(",")] if fields else []
            columns = [name for name in requested if name in self._ALLOWED_FIELDS]
            if not columns:
                columns = list(self._DEFAULT_FIELDS)
            select_clause = ", ".join(columns)
            page_sql = f"""SELECT {select_clause} FROM enm_mplogx
                WHERE mpoint_id=%s AND timex >= %s AND timex <= %s
                {order_clause}"""
            return Response(query_all_dict(page_sql, window_params, with_time_format=True))

        # Paginated: the total is only needed here, so the COUNT query is
        # skipped for the unpaginated branch above.
        count_sql = """SELECT COUNT(*) AS total_count FROM enm_mplogx
            WHERE mpoint_id=%s AND timex >= %s AND timex <= %s"""
        count_data = query_one_dict(count_sql, window_params, with_time_format=True)
        page_sql = f"""SELECT * FROM enm_mplogx
            WHERE mpoint_id=%s AND timex >= %s AND timex <= %s
            {order_clause} LIMIT %s OFFSET %s"""
        page_params = window_params + [page_size, (page - 1) * page_size]
        page_data = query_all_dict(page_sql, page_params, with_time_format=True)
        return Response({
            "count": count_data["total_count"],
            "page": page,
            "page_size": page_size,
            "results": page_data
        })
class MpLogxViewSet(CustomListModelMixin, CustomGenericViewSet):
    """
    list: 测点采集数据
    测点采集数据
    """
    # Read-only listing of collected measuring-point data (MpLogx).
    # Docstrings are kept verbatim because view docstrings feed the generated
    # API documentation.
    queryset = MpLogx.objects.all()
    serializer_class = MpLogxSerializer
    perms_map = {"get": "*"}
    filterset_fields = {
        "timex": ["exact", "gte", "lte", "year", "month", "day"],
        "mpoint": ["exact", "in"],
        "mpoint__ep_monitored": ["exact"]
    }
    ordering = ["-timex"]
    ordering_fields = ["timex"]

    @action(methods=["get"], detail=False, perms_map={"get": "*"})
    def to_wide(self, request, *args, **kwargs):
        """转换为宽表
        转换为宽表
        """
        # NOTE(review): this action looks unfinished. It builds the filtered
        # queryset but returns nothing, and a DRF view must return a Response.
        # The intended wide-table output is not visible in this file, so the
        # current behaviour is left unchanged.
        self.filter_queryset(self.get_queryset())
class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, CustomListModelMixin, CustomGenericViewSet):
    """
    list:测点统计记录
    测点统计记录
    """
    # List/create/delete endpoints for measuring-point statistics, plus the
    # correction, recalculation and aggregation actions.
    # Docstrings are kept verbatim because view docstrings feed the generated
    # API documentation.
    perms_map = {"get": "*", "post": "mpointstat.create", "delete": "mpointstat.delete"}
    queryset = MpointStat.objects.all()
    serializer_class = MpointStatSerializer
    select_related_fields = ["mpoint", "mpoint__ep_monitored", "mpoint__ep_belong", "mgroup", "mgroup__belong_dept"]
    filterset_class = MpointStatFilter
    ordering_fields = ['mpoint__report_sortstr', 'year', 'month', 'day', 'hour', 'year_s', 'month_s', 'day_s', 'mgroup__sort', 'create_time', "update_time"]
    ordering = ["mpoint__report_sortstr", "-year", "-month", "-day", "-year_s", "-month_s", "-day_s", "-create_time", "mgroup__sort"]

    @staticmethod
    def _id_or_none(related):
        # Return the primary key of an optional related object, or None when it is unset.
        return None if related is None else related.id

    def perform_create(self, serializer):
        # Save the record, then queue a recalculation for its shift date.
        ins = serializer.save()
        # sflog / mgroup may be None (correct() already handles that), so do
        # not dereference them blindly.
        cal_mpointstat_manual.delay(
            ins.mpoint.id, self._id_or_none(ins.sflog), self._id_or_none(ins.mgroup),
            None, None, None, None,
            ins.year_s, ins.month_s, ins.day_s, next_cal=1)

    def perform_destroy(self, instance):
        # Capture everything the recalculation needs before the row is deleted.
        mpoint_id = instance.mpoint.id
        sflog_id = self._id_or_none(instance.sflog)
        mgroup_id = self._id_or_none(instance.mgroup)
        year_s, month_s, day_s = instance.year_s, instance.month_s, instance.day_s
        instance.delete()
        cal_mpointstat_manual.delay(
            mpoint_id, sflog_id, mgroup_id,
            None, None, None, None,
            year_s, month_s, day_s, next_cal=1)

    @action(methods=["post"], detail=True, perms_map={"post": "mpointstat.correct"}, serializer_class=MpointStatCorrectSerializer)
    def correct(self, request, *args, **kwargs):
        """修正测点统计记录及统计值
        修正测点统计记录及统计值
        """
        # Overwrite val / val_correct of one record with the submitted value
        # and queue a recalculation. Raises ParseError when the record is the
        # latest one of its period (current day / current month).
        instance_id = kwargs.get("pk")
        if not instance_id:
            return Response({"detail": "ID not provided in the URL"}, status=400)
        instance: MpointStat = self.get_object()
        sr = MpointStatCorrectSerializer(data=request.data)
        sr.is_valid(raise_exception=True)

        last_record = None
        # Monthly statistics (calendar month and shift month)
        if instance.type in ['month_s', 'month']:
            last_record = MpointStat.objects.filter(
                mpoint=instance.mpoint,
                type=instance.type,
                mgroup=instance.mgroup,
                year_s=instance.year_s,
                year=instance.year,
            ).order_by(instance.type).values("id").last()
        # Daily statistics (calendar day and shift day)
        elif instance.type in ['day_s', 'day']:
            last_record = MpointStat.objects.filter(
                mpoint=instance.mpoint,
                type=instance.type,
                year=instance.year,
                year_s=instance.year_s,
                month_s=instance.month_s,
                month=instance.month,
            ).order_by(instance.type).values("id").last()
        last_id = last_record["id"] if last_record else None
        if str(last_id) == str(instance_id):
            raise ParseError("不能修正当日或当月数据")

        val_correct = sr.validated_data["val_correct"]
        instance.val_correct = val_correct
        instance.val = val_correct
        instance.update_by = request.user
        instance.save()

        # Fixed: the original tuple unpacking was misaligned, so year / month /
        # day / hour received instance.hour / year / month / day and the task
        # was queued with the wrong time keys. Read each field by name instead.
        mpoint = instance.mpoint
        year, month, day, hour = instance.year, instance.month, instance.day, instance.hour
        year_s, month_s, day_s = instance.year_s, instance.month_s, instance.day_s
        # sflog and mgroup may be None
        sflogId = self._id_or_none(instance.sflog)
        mgroupId = self._id_or_none(instance.mgroup)
        myLogger.info(f'{mpoint.id}--{sflogId}--{mgroupId}--{year}--{month}--{day}--{hour}--{year_s}--{month_s}--{day_s}')
        cal_mpointstat_manual.delay(mpoint.id, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, next_cal=1)
        return Response()

    @action(methods=["post"], detail=False, perms_map={"post": "mpointstat.correct"}, serializer_class=ReCalSerializer)
    def recal(self, request, *args, **kwargs):
        """重新运行某段时间的enm计算
        重新运行某段时间的enm计算
        """
        data = request.data
        sr = ReCalSerializer(data=data)
        sr.is_valid(raise_exception=True)
        # The raw request values (not validated_data) are forwarded to the task,
        # presumably so they stay plain serializable values - confirm against the task.
        task = cal_mpointstats_duration.delay(data["start_time"], data["end_time"])
        return Response({"task_id": task.task_id})

    @action(methods=["get"], detail=False, perms_map={"get": "*"})
    def group_values(self, request, *args, **kwargs):
        """
        测点统计数据聚合查询
        """
        # Sum `val` over the filtered records, grouped by the comma separated
        # `group_by` query parameter; `mpoint` is always part of the grouping.
        # Raises ParseError when group_by names a field that cannot be resolved.
        from django.core.exceptions import FieldError

        qs = self.filter_queryset(self.get_queryset())
        group_by = request.query_params.get("group_by", "")
        # Drop every blank entry (the original removed only the first one, so
        # input like "a,,,b" still crashed the query).
        group_by_list = [name.strip() for name in group_by.split(",") if name.strip()]
        if 'mpoint' not in group_by_list:
            group_by_list.append('mpoint')
        display_fields = ['mpoint__name', 'total_val', 'mpoint__nickname', 'mpoint__unit'] + group_by_list
        try:
            # order_by() clears the default ordering so that it does not leak
            # extra columns into the GROUP BY.
            aggregate_qs = (
                qs.values()
                .order_by()
                .values(*group_by_list)
                .annotate(total_val=Sum('val'))
                .values(*display_fields)
            )
            result = list(aggregate_qs)
        except FieldError:
            raise ParseError("group_by contains an unknown field")
        return Response(result)
class EnStatViewSet(CustomListModelMixin, BulkUpdateModelMixin, CustomGenericViewSet):
    """
    list:能耗统计记录
    能耗统计记录
    """
    # List / bulk-update endpoints for energy consumption statistics (EnStat).
    # Docstrings are kept verbatim because view docstrings feed the generated
    # API documentation.
    queryset = EnStat.objects.all()
    serializer_class = EnStatSerializer
    update_serializer_class = EnStatUpdateSerializer
    perms_map = {"get": "*", "put": "enstat.update"}
    select_related_fields = ["mgroup", "team", "mgroup__belong_dept"]
    filterset_class = EnStatFilter
    ordering_fields = ["mgroup__sort", "year_s", "month_s", "day_s", "hour", "update_time", "create_time", "team__create_time"]
    ordering = ["mgroup__sort", "year_s", "month_s", "day_s", "team__create_time", "create_time"]

    @action(methods=["post"], detail=False, perms_map={"post": "*"}, serializer_class=EnStatAnaSerializer)
    def analyze(self, request, *args, **kwargs):
        """一段时间范围的工段分析数据
        一段时间范围的工段分析数据(工段对比)
        """
        # Validate the posted date range, then delegate to the analysis service
        # and return its result unchanged.
        serializer = EnStatAnaSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        validated = serializer.validated_data
        return Response(
            get_analyse_data_mgroups_duration(validated["start_date"], validated["end_date"])
        )
class EnStat2ViewSet(CustomListModelMixin, CustomGenericViewSet):
    """
    list:全厂统计记录
    全厂统计记录
    """
    # Read-only listing of plant-wide statistics (EnStat2); any authenticated
    # GET is allowed by the "*" permission entry.
    # The docstring is kept verbatim because view docstrings feed the generated
    # API documentation.
    queryset = EnStat2.objects.all()
    serializer_class = EnStat2Serializer
    filterset_class = EnStat2Filter
    perms_map = {"get": "*"}