feat: 增加工段数据按任意时间范围分析

This commit is contained in:
caoqianming 2024-05-22 17:04:51 +08:00
parent bf5df19a59
commit 8627d5f9dc
3 changed files with 103 additions and 35 deletions

View File

@ -167,4 +167,8 @@ class ReCalSerializer(serializers.Serializer):
class MpointStatCorrectSerializer(CustomModelSerializer): class MpointStatCorrectSerializer(CustomModelSerializer):
class Meta: class Meta:
model = MpointStat model = MpointStat
fields = ['val_correct', 'id'] fields = ['val_correct', 'id']
class EnStatAnaSerializer(serializers.Serializer):
start_date = serializers.DateField(label="开始日期")
end_date = serializers.DateField(label="结束日期")

View File

@ -12,6 +12,9 @@ from apps.enp.models import EnvData
from apps.em.models import Equipment from apps.em.models import Equipment
from apps.em.services import set_eq_rs from apps.em.services import set_eq_rs
from server.settings import get_sysconfig from server.settings import get_sysconfig
from apps.enm.models import EnStat
from django.db.models import Sum
from typing import Dict, Any
myLogger = logging.getLogger("log") myLogger = logging.getLogger("log")
@ -23,8 +26,8 @@ def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour:
pattern = r"\{(.*?)}" pattern = r"\{(.*?)}"
matches = re.findall(pattern, exp_str) matches = re.findall(pattern, exp_str)
for match in matches: for match in matches:
if match in ['enm_lhxs']: if match in ["enm_lhxs"]:
exp_str = exp_str.replace(f"{{{match}}}", str(get_sysconfig(f'enm.{match}'))) exp_str = exp_str.replace(f"{{{match}}}", str(get_sysconfig(f"enm.{match}")))
else: else:
mpst = MpointStat.objects.filter(Q(mpoint__id=match) | Q(mpoint__name=match) | Q(mpoint__code=match), type="hour", year=year, month=month, day=day, hour=hour).first() mpst = MpointStat.objects.filter(Q(mpoint__id=match) | Q(mpoint__name=match) | Q(mpoint__code=match), type="hour", year=year, month=month, day=day, hour=hour).first()
if mpst: if mpst:
@ -32,6 +35,7 @@ def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour:
rval = eval(exp_str) rval = eval(exp_str)
return rval return rval
def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_str: str): def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_str: str):
""" """
将测点值转换所监测设备的运行状态值 将测点值转换所监测设备的运行状态值
@ -41,14 +45,14 @@ def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_s
pattern = r"\{(.*?)}" pattern = r"\{(.*?)}"
matches = re.findall(pattern, expr_str) matches = re.findall(pattern, expr_str)
for match in matches: for match in matches:
if match == 'self': if match == "self":
expr_str = expr_str.replace(f"${{{match}}}", str(current_val)) expr_str = expr_str.replace(f"${{{match}}}", str(current_val))
else: else:
mpoint_data = MpointCache(match).data mpoint_data = MpointCache(match).data
if mpoint_data: if mpoint_data:
match_val = 50 match_val = 50
if mpoint_data.get('gather_state', -2) == 0: # 测点正常采集 if mpoint_data.get("gather_state", -2) == 0: # 测点正常采集
match_val = mpoint_data['last_data']['last_mrs'] match_val = mpoint_data["last_data"]["last_mrs"]
expr_str = expr_str.replace(f"{{{match}}}", str(match_val)) expr_str = expr_str.replace(f"{{{match}}}", str(match_val))
rval = eval(expr_str) rval = eval(expr_str)
return rval return rval
@ -65,6 +69,7 @@ def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_s
rs = Equipment.STOP rs = Equipment.STOP
return rs return rs
# def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21): # def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21):
# """ # """
# 根据给定的小时数, 计算出班天 # 根据给定的小时数, 计算出班天
@ -121,14 +126,15 @@ def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_s
# print(traceback.format_exc()) # print(traceback.format_exc())
# return goal_data, score # return goal_data, score
class MpointCache: class MpointCache:
"""测点缓存类 """测点缓存类"""
"""
def __init__(self, code: str): def __init__(self, code: str):
self.code = code self.code = code
self.cache_key = Mpoint.cache_key(code) self.cache_key = Mpoint.cache_key(code)
self.data = self.get() self.data = self.get()
def get(self, force_update=False, update_mplogx=True): def get(self, force_update=False, update_mplogx=True):
key = self.cache_key key = self.cache_key
code = self.code code = self.code
@ -139,7 +145,7 @@ class MpointCache:
except Exception: except Exception:
return None return None
mpoint_data = MpointSerializer(instance=mpoint).data mpoint_data = MpointSerializer(instance=mpoint).data
mpoint_data['last_data'] = {'last_val': None, 'last_timex': None, 'last_mrs': None} # 初始化 mpoint_data["last_data"] = {"last_val": None, "last_timex": None, "last_mrs": None} # 初始化
if update_mplogx: if update_mplogx:
now = timezone.now() now = timezone.now()
last_mplogx = MpLogx.objects.filter(mpoint=mpoint, timex__gte=now - timedelta(minutes=5)).order_by("-timex").first() last_mplogx = MpLogx.objects.filter(mpoint=mpoint, timex__gte=now - timedelta(minutes=5)).order_by("-timex").first()
@ -148,11 +154,11 @@ class MpointCache:
cache.set(key, mpoint_data, timeout=None) cache.set(key, mpoint_data, timeout=None)
return mpoint_data return mpoint_data
def set_fail(self, reason: int = -1, timex: datetime=datetime.now()): def set_fail(self, reason: int = -1, timex: datetime = datetime.now()):
""" """
-1 存库失败 -2 掉线 -1 存库失败 -2 掉线
""" """
self.data['gather_state'] = reason self.data["gather_state"] = reason
cache.set(self.cache_key, self.data, timeout=None) cache.set(self.cache_key, self.data, timeout=None)
if reason == -2: if reason == -2:
is_rep_ep_running_state = self.data.get("is_rep_ep_running_state", False) is_rep_ep_running_state = self.data.get("is_rep_ep_running_state", False)
@ -162,7 +168,7 @@ class MpointCache:
set_eq_rs(ep_monitored_id, timex, Equipment.OFFLINE) set_eq_rs(ep_monitored_id, timex, Equipment.OFFLINE)
if ep_belong_id: if ep_belong_id:
set_eq_rs(ep_belong_id, timex, Equipment.OFFLINE) set_eq_rs(ep_belong_id, timex, Equipment.OFFLINE)
def set(self, last_timex: datetime, last_val): def set(self, last_timex: datetime, last_val):
current_cache_val = self.data current_cache_val = self.data
cache_key = self.cache_key cache_key = self.cache_key
@ -170,21 +176,17 @@ class MpointCache:
last_data["last_val"] = last_val last_data["last_val"] = last_val
last_data["last_timex"] = last_timex last_data["last_timex"] = last_timex
last_mrs = None # 设备状态信号 last_mrs = None # 设备状态信号
mpoint_is_rep_ep_running_state = current_cache_val['is_rep_ep_running_state'] mpoint_is_rep_ep_running_state = current_cache_val["is_rep_ep_running_state"]
if mpoint_is_rep_ep_running_state: if mpoint_is_rep_ep_running_state:
mpoint_ep_rs_val = current_cache_val.get("ep_rs_val", None) mpoint_ep_rs_val = current_cache_val.get("ep_rs_val", None)
mpoint_ep_rs_expr = current_cache_val.get("ep_rs_expr", None) mpoint_ep_rs_expr = current_cache_val.get("ep_rs_expr", None)
last_mrs = transfer_mpoint_val_to_ep_running_state(last_val, mpoint_ep_rs_val, mpoint_ep_rs_expr) last_mrs = transfer_mpoint_val_to_ep_running_state(last_val, mpoint_ep_rs_val, mpoint_ep_rs_expr)
last_data["last_mrs"] = last_mrs last_data["last_mrs"] = last_mrs
current_cache_val['gather_state'] = 0 # 表明数据已更新 current_cache_val["gather_state"] = 0 # 表明数据已更新
cache.set(cache_key, current_cache_val, timeout=None) cache.set(cache_key, current_cache_val, timeout=None)
# 存库 # 存库
save_dict = { save_dict = {"timex": last_timex, "mpoint": Mpoint.objects.get(id=current_cache_val["id"]), "val_mrs": last_mrs}
"timex": last_timex,
"mpoint": Mpoint.objects.get(id=current_cache_val["id"]),
"val_mrs": last_mrs
}
save_dict[f"val_{current_cache_val['val_type']}"] = last_val save_dict[f"val_{current_cache_val['val_type']}"] = last_val
MpLogx.objects.create(**save_dict) MpLogx.objects.create(**save_dict)
@ -196,13 +198,12 @@ class MpointCache:
if ep_belong_id: if ep_belong_id:
set_eq_rs(ep_belong_id, last_timex, Equipment.RUNING) set_eq_rs(ep_belong_id, last_timex, Equipment.RUNING)
mf_code = current_cache_val.get("mpoint_affect")
mf_code = current_cache_val.get('mpoint_affect')
if mf_code: # 如果该测点影响到另一个测点,要同步更新另一个测点 if mf_code: # 如果该测点影响到另一个测点,要同步更新另一个测点
mc = MpointCache(mf_code) mc = MpointCache(mf_code)
mf_data = mc.data mf_data = mc.data
# 只有自采测点才可影响计算测点只针对开关信号 # 只有自采测点才可影响计算测点只针对开关信号
if mf_data and current_cache_val['type'] == Mpoint.MT_AUTO and mf_data['type'] == Mpoint.MT_COMPUTE and mf_data['is_rep_ep_running_state'] and mf_data['ep_rs_expr']: if mf_data and current_cache_val["type"] == Mpoint.MT_AUTO and mf_data["type"] == Mpoint.MT_COMPUTE and mf_data["is_rep_ep_running_state"] and mf_data["ep_rs_expr"]:
mc.set(last_timex, None) mc.set(last_timex, None)
@ -233,11 +234,12 @@ def king_sync(projectName: str, json_path: str = ""):
if group: if group:
name = f"{group}.{name}" name = f"{group}.{name}"
ins, created = Mpoint.objects.get_or_create(code=code, defaults={"name": name, "code": code, "enabled": False, "type": Mpoint.MT_AUTO, "val_type": t_dict[item["t"]], "third_info": item}) ins, created = Mpoint.objects.get_or_create(code=code, defaults={"name": name, "code": code, "enabled": False, "type": Mpoint.MT_AUTO, "val_type": t_dict[item["t"]], "third_info": item})
if not created and ins.val_type != t_dict[item["t"]]: # 如果数据类型变了要同步更新 if not created and ins.val_type != t_dict[item["t"]]: # 如果数据类型变了要同步更新
ins.val_type = t_dict[item["t"]] ins.val_type = t_dict[item["t"]]
ins.third_info = item ins.third_info = item
ins.save(update_fields=["val_type", "third_info"]) ins.save(update_fields=["val_type", "third_info"])
test_data = { test_data = {
"PNs": {"1": "V", "2": "T", "3": "Q"}, "PNs": {"1": "V", "2": "T", "3": "Q"},
"PVs": {"1": -725, "2": "2024-04-08 13:43:53.140", "3": 192}, "PVs": {"1": -725, "2": "2024-04-08 13:43:53.140", "3": 192},
@ -311,18 +313,19 @@ def insert_mplogx_from_king_mqtt(data: dict, is_offset=True):
# concurrent.futures.wait(futures) # concurrent.futures.wait(futures)
# for future in futures: # for future in futures:
# print(future.result(), end=', ') # print(future.result(), end=', ')
def insert_mplogx_item(code, val, timex, enp_mpoints_dict): def insert_mplogx_item(code, val, timex, enp_mpoints_dict):
""" """
存入超表 存入超表
""" """
mc = MpointCache(code) mc = MpointCache(code)
mpoint_data = mc.data mpoint_data = mc.data
if mpoint_data is None or not mpoint_data['enabled']: if mpoint_data is None or not mpoint_data["enabled"]:
return return
mpoint_interval = mpoint_data["interval"] mpoint_interval = mpoint_data["interval"]
mpoint_last_timex = mpoint_data.get('last_data', {}).get('last_timex', None) mpoint_last_timex = mpoint_data.get("last_data", {}).get("last_timex", None)
# 控制采集间隔 # 控制采集间隔
can_save = False can_save = False
@ -342,7 +345,7 @@ def insert_mplogx_item(code, val, timex, enp_mpoints_dict):
if enp_mpoints_dict.get(ep_monitored, None) is None: if enp_mpoints_dict.get(ep_monitored, None) is None:
enp_mpoints_dict[ep_monitored] = {"timex": timex, "equipment": Equipment.objects.get(id=ep_monitored)} enp_mpoints_dict[ep_monitored] = {"timex": timex, "equipment": Equipment.objects.get(id=ep_monitored)}
if enp_field == "running_state": if enp_field == "running_state":
enp_mpoints_dict[ep_monitored].update({enp_field: mpoint_data['last_data']['last_mrs']}) enp_mpoints_dict[ep_monitored].update({enp_field: mpoint_data["last_data"]["last_mrs"]})
else: else:
enp_mpoints_dict[ep_monitored].update({enp_field: val}) enp_mpoints_dict[ep_monitored].update({enp_field: val})
enp_mpoints_dict[ep_monitored].update({enp_field: val}) enp_mpoints_dict[ep_monitored].update({enp_field: val})
@ -385,17 +388,62 @@ def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is
for _, item in enp_mpoints_dict: for _, item in enp_mpoints_dict:
EnvData.objects(**item) EnvData.objects(**item)
def insert_mplogx_from_king_rest_chunk(objs: list): def insert_mplogx_from_king_rest_chunk(objs: list):
enp_mpoints_dict = {} enp_mpoints_dict = {}
for obj in objs: for obj in objs:
n = obj["N"] n = obj["N"]
code = f"K_{n}" code = f"K_{n}"
timex = timezone.make_aware(datetime.strptime(obj["T"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0) timex = timezone.make_aware(datetime.strptime(obj["T"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0)
insert_mplogx_item(code, obj["V"], timex, enp_mpoints_dict) insert_mplogx_item(code, obj["V"], timex, enp_mpoints_dict)
if enp_mpoints_dict: if enp_mpoints_dict:
for _, item in enp_mpoints_dict.items(): for _, item in enp_mpoints_dict.items():
try: try:
EnvData.objects.create(**item) EnvData.objects.create(**item)
except IntegrityError as e: # 忽略唯一性错误 except IntegrityError as e: # 忽略唯一性错误
myLogger.error(e, exc_info=True) myLogger.error(e, exc_info=True)
def get_analyse_data_mgroups_duration(start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""
获取一段时间范围的工段分析数据
"""
start_year, start_month, start_day = start_date.year, start_date.month, start_date.day
end_year, end_month, end_day = end_date.year, end_date.month, end_date.day
total_sec = (end_date - start_date).total_seconds() + 3600 * 24
qs = (
EnStat.objects.filter(mgroup__cate="section")
.filter(Q(year_s__gt=start_year) | Q(year_s=start_year, month_s__gt=start_month) | Q(year_s=start_year, month_s=start_month, day_s__gte=start_day))
.filter(Q(year_s__lt=end_year) | Q(year_s=end_year, month_s__lt=end_month) | Q(year_s=end_year, month_s=end_month, day_s__lte=end_day))
)
res = (
qs.values("mgroup", "mgroup__name")
.annotate(total_production=Sum("total_production"), run_sec=Sum("run_sec"), elec_consume=Sum("elec_consume"), pcoal_coal_consume=Sum("pcoal_coal_consume"))
.order_by("mgroup__sort")
)
res_dict = {}
for item in res:
res_dict[item["mgroup__name"]] = item
for key, value in item.items():
if value is None:
item[key] = 0
for item in res:
item["mgroup_name"] = item["mgroup__name"]
item["production_hour"] = round(item["total_production"] * 3600 / item["run_sec"], 2) if item["run_sec"] > 0 else 0
item["elec_consume_unit"] = round(item["elec_consume"] / item["total_production"], 2) if item["total_production"] > 0 else 0
item["run_rate"] = round(item["run_sec"] * 100 / total_sec, 4) if total_sec > 0 else 0
item["coal_consume_unit"] = round(item["pcoal_coal_consume"] * 1000 / item["total_production"], 2) if item["total_production"] > 0 else 0
if item["mgroup_name"] == "回转窑":
total_production_ylm = res_dict.get("原料磨", {}).get("total_production", 0)
elec_consume_ylm = res_dict.get("原料磨", {}).get("elec_consume", 0)
elec_consume_mm = res_dict.get("煤磨", {}).get("elec_consume", 0)
if item["total_production"] == 0:
item["celec_consume_unit"] = 0
item["en_consume_unit"] = 0
else:
item["celec_consume_unit"] = ((elec_consume_mm + item["elec_consume"]) / item["total_production"] + get_sysconfig("enm.enm_lhxs") * elec_consume_ylm / total_production_ylm, 2)
item["en_consume_unit"] = item["coal_consume_unit"] + 0.1229 * item["elec_consume_unit"]
return res

View File

@ -1,9 +1,8 @@
from django.shortcuts import render
from django.conf import settings from django.conf import settings
from apps.enm.models import Mpoint, MpointStat, EnStat, EnStat2, MpLogx from apps.enm.models import Mpoint, MpointStat, EnStat, EnStat2, MpLogx
from apps.utils.viewsets import CustomModelViewSet, CustomGenericViewSet from apps.utils.viewsets import CustomModelViewSet, CustomGenericViewSet
from apps.utils.mixins import BulkCreateModelMixin, BulkDestroyModelMixin, CustomListModelMixin from apps.utils.mixins import BulkCreateModelMixin, BulkDestroyModelMixin, CustomListModelMixin
from apps.enm.serializers import MpointSerializer, MpLogxSerializer, MpointStatSerializer, EnStatSerializer, EnStat2Serializer, ReCalSerializer, MpointStatCorrectSerializer from apps.enm.serializers import (MpointSerializer, MpLogxSerializer, MpointStatSerializer, EnStatSerializer, EnStat2Serializer, ReCalSerializer, MpointStatCorrectSerializer, EnStatAnaSerializer)
from apps.enm.filters import MpointStatFilter, EnStatFilter, EnStat2Filter from apps.enm.filters import MpointStatFilter, EnStatFilter, EnStat2Filter
from apps.enm.tasks import cal_mpointstat_manual from apps.enm.tasks import cal_mpointstat_manual
from rest_framework.response import Response from rest_framework.response import Response
@ -12,6 +11,8 @@ from rest_framework.decorators import action
from apps.enm.tasks import cal_mpointstats_duration from apps.enm.tasks import cal_mpointstats_duration
from apps.enm.services import king_sync, MpointCache from apps.enm.services import king_sync, MpointCache
from django.db import transaction from django.db import transaction
from datetime import datetime
from apps.enm.services import get_analyse_data_mgroups_duration
class MpointViewSet(CustomModelViewSet): class MpointViewSet(CustomModelViewSet):
@ -169,6 +170,21 @@ class EnStatViewSet(CustomListModelMixin, CustomGenericViewSet):
filterset_class = EnStatFilter filterset_class = EnStatFilter
ordering = ["mgroup__sort", "year_s", "month_s", "day_s", "hour"] ordering = ["mgroup__sort", "year_s", "month_s", "day_s", "hour"]
@action(methods=["post"], detail=False, perms_map={"post": "*"}, serializer_class=EnStatAnaSerializer)
def analyze(self, request, *args, **kwargs):
"""一段时间范围的工段分析数据
一段时间范围的工段分析数据
"""
data = request.data
sr = EnStatAnaSerializer(data=data)
sr.is_valid(raise_exception=True)
vdata = sr.validated_data
start_date: datetime = vdata["start_date"]
end_date: datetime = vdata["end_date"]
ret = get_analyse_data_mgroups_duration(start_date, end_date)
return Response(ret)
class EnStat2ViewSet(CustomListModelMixin, CustomGenericViewSet): class EnStat2ViewSet(CustomListModelMixin, CustomGenericViewSet):
""" """