factory/apps/enm/services.py

536 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from apps.enm.models import Mpoint, MpointStat, MpLogx
import re
import logging
from django.db.models import Q
from django.utils import timezone
from django.core.cache import cache
from datetime import datetime, timedelta
from apps.utils.decorators import auto_log
from django.db import IntegrityError
from .serializers import MpointSerializer
from apps.enp.models import EnvData
from apps.em.models import Equipment
from apps.em.services import set_eq_rs
from server.settings import get_sysconfig
from apps.enm.models import EnStat
from django.db.models import Sum
from typing import Dict, Any
myLogger = logging.getLogger("log")
def db_insert_mplogx_batch(rows):
    """Store a batch of raw tag rows through ``insert_mplogx_item``.

    Each row is a 4-item sequence ``(ignored, value, code, updated_at)``.
    Naive timestamps are made timezone-aware before being stored. Every row is
    handled with its own empty env-data accumulator.
    """
    for _ignored, value, point_code, updated_at in rows:
        stamp = updated_at if updated_at.tzinfo is not None else timezone.make_aware(updated_at)
        insert_mplogx_item(point_code, value, stamp, {})
def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int):
    """Evaluate a formula whose ``{code}`` placeholders refer to measuring points.

    Every ``{code}`` is replaced by the hourly ``MpointStat`` value of that
    point for the given year/month/day/hour (``val_correct`` wins over ``val``
    when it is set). If no hourly row exists, one is created with value 0 and
    an error is logged. The substituted expression is then evaluated.

    Returns the evaluation result, or 0 when evaluation raises.
    """
    exp_str2 = exp_str
    for match in re.findall(r"\{(.*?)}", exp_str):
        slot = {"type": "hour", "year": year, "month": month, "day": day, "hour": hour}
        mpst = MpointStat.objects.filter(mpoint__code=match, **slot).first()
        if mpst is None:
            mpoint = Mpoint.objects.get(code=match)
            mpst, _ = MpointStat.objects.get_or_create(mpoint=mpoint, defaults={"val": 0}, **slot)
            myLogger.error(f"找不到该测点的时间线数据: {match}, {year}, {month}, {day}, {hour}, 赋予0值")
        if not mpst:
            continue
        # Prefer the corrected value so computed points follow manual corrections.
        chosen = mpst.val if mpst.val_correct is None else mpst.val_correct
        exp_str2 = exp_str2.replace(f"{{{match}}}", str(chosen))
    try:
        # NOTE(review): eval() runs the configured expression as Python code.
        return eval(exp_str2)
    except Exception as e:
        myLogger.error(f"表达式计算错误: {e}, {exp_str}, --{exp_str2}")
        return 0
def get_can_save_from_save_expr(expr_str: str, self_val) -> bool:
    """Decide whether a reading may be stored, based on a save expression.

    ``{self}`` in the expression is replaced by ``self_val``; any other
    ``{code}`` is replaced by the cached last value of that measuring point.

    Args:
        expr_str: Expression containing ``{...}`` placeholders.
        self_val: Current value of the point being stored (any type; it is
            converted with ``str()`` before substitution).

    Returns:
        The result of evaluating the substituted expression. ``True`` is
        returned early when a referenced point has no cached last value, and
        ``False`` when evaluation raises.
    """
    for match in re.findall(r"\{(.*?)}", expr_str):
        if match == "self":
            # str() is required: str.replace() raises TypeError when handed a
            # numeric or boolean replacement, which is what readings usually are.
            expr_str = expr_str.replace("{self}", str(self_val))
            continue
        mpoint_data = MpointCache(match).data
        current_val = mpoint_data.get("last_data", {}).get("last_val", None)
        if current_val is None:
            # Nothing to compare against yet, so allow the save.
            return True
        expr_str = expr_str.replace(f"{{{match}}}", str(current_val))
    try:
        # NOTE(review): eval() runs the configured expression as Python code;
        # the expression must only ever come from trusted configuration.
        rval = eval(expr_str)
    except Exception as e:
        myLogger.error(f"存储表达式计算错误: {e}, {expr_str}")
        rval = False
    return rval
def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_str: str):
    """Translate a measuring-point value into an equipment running state.

    When ``expr_str`` is given it takes priority: ``{self}`` is replaced by
    ``current_val`` and every other ``{code}`` by that point's cached
    ``last_mrs`` (or 50 when the point has no cache entry or its
    ``gather_state`` is not 0); the evaluated expression is returned.

    Without an expression: a bool maps directly to RUNING/STOP; otherwise the
    state is STOP when the value is below ``base_val`` (if ``base_val`` is
    truthy) or equals 0 (if it is not), and RUNING in every other case.
    """
    if not expr_str:
        if isinstance(current_val, bool):
            return Equipment.RUNING if current_val else Equipment.STOP
        if base_val:
            stopped = current_val < base_val
        else:
            stopped = current_val == 0
        return Equipment.STOP if stopped else Equipment.RUNING

    # Expression mode: substitute every placeholder, then evaluate.
    for placeholder in re.findall(r"\{(.*?)}", expr_str):
        if placeholder == "self":
            substitute = current_val
        else:
            substitute = 50  # fallback used when the referenced point is not gathering normally
            other_point = MpointCache(placeholder).data
            if other_point and other_point.get("gather_state", -2) == 0:
                substitute = other_point["last_data"]["last_mrs"]
        expr_str = expr_str.replace("{" + placeholder + "}", str(substitute))
    # NOTE(review): eval() runs the configured expression as Python code.
    return eval(expr_str)
# def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21):
# """
# 根据给定的小时数, 计算出班天
# """
# if hour <= hour_split:
# return year, month, day
# else:
# now = datetime.datetime(year, month, day, hour)
# now2 = now + datetime.timedelta(days=1)
# return now2.year, now2.month, now2.day
# cal_rule = {
# "电石渣": {
# "total_production": 0.4,
# "elec_consume_unit": 0.4,
# "production_cost_unit": 0.2
# },
# "原料磨":{
# "production_hour": 0.3,
# "elec_consume_unit": 0.3,
# "production_cost_unit": 0.1,
# "辅料_细度":0.05,
# "辅料_水分":0.04,
# "干混生料_CaO":0.04
# }
# }
# def cal_team_score(data):
# """
# 计算月度绩效
# """
# qua_rate = {}
# month_s = data['month_s']
# for item in data['qua_data']:
# qua_rate[f'{item["material_name"]}_{item["testitem_name"]}'] = item["rate_pass"]
# goal_dict = get_mgroup_goals(data['mgroup'], data['year_s'], False)
# goal_data = {}
# try:
# rule = cal_rule[data['mgroup_name']]
# score = 0
# for key in rule:
# new_key = f'{key}_{month_s}'
# goal_data[new_key] = goal_dict[new_key]
# if '-' in key:
# score = score + qua_rate.get(key, 0)/goal_data[new_key]*rule[key]
# else:
# score = score + data.get(key)/goal_data[new_key]*rule[key]
# print(score)
# # enstat.goal_data = goal_data
# # enstat.score =score
# # enstat.save(update_fields=['goal_data', 'score'])
# except:
# print(traceback.format_exc())
# return goal_data, score
class MpointCache:
    """Cache-backed view of one measuring point (``Mpoint``), looked up by code.

    ``data`` holds the serialized point plus a ``last_data`` dict with the keys
    ``last_val``, ``last_timex`` and ``last_mrs``; it is an empty dict when the
    point cannot be loaded.
    """

    def __init__(self, code: str):
        self.code = code
        self.cache_key = Mpoint.cache_key(code)
        self.data = self.get()

    def get(self, force_update=False, update_mplogx=True):
        """Return the cached point data, rebuilding it from the database if needed.

        Args:
            force_update: Rebuild even when a cache entry already exists.
            update_mplogx: When rebuilding, seed ``last_data`` from the newest
                ``MpLogx`` row of the last 10 minutes, if there is one.

        Returns:
            The point data dict, or ``{}`` (which is also cached) when the
            point cannot be fetched.
        """
        key = self.cache_key
        code = self.code
        mpoint_data = cache.get(key, None)
        if mpoint_data is None or force_update:
            try:
                mpoint = Mpoint.objects.get(code=code)
            except Exception as e:
                myLogger.error(f"测点缓存获取失败: {e}, {code}")
                cache.set(key, {}, timeout=None)
                return {}
            mpoint_data = MpointSerializer(instance=mpoint).data
            # Initial placeholder values.
            mpoint_data["last_data"] = {"last_val": None, "last_timex": None, "last_mrs": None}
            if update_mplogx:
                now = timezone.now()
                last_mplogx = MpLogx.objects.filter(mpoint=mpoint, timex__gte=now - timedelta(minutes=10)).order_by("-timex").first()
                if last_mplogx:  # core data
                    mpoint_data["last_data"] = {
                        "last_val": getattr(last_mplogx, "val_" + mpoint_data["val_type"]),
                        "last_timex": last_mplogx.timex,
                        # Keep the key present so readers of last_data["last_mrs"]
                        # never hit a KeyError on a freshly rebuilt entry.
                        "last_mrs": getattr(last_mplogx, "val_mrs", None),
                    }
            cache.set(key, mpoint_data, timeout=None)
        return mpoint_data

    def set_fail(self, reason: int = -1, timex: "datetime | None" = None):
        """Mark the point as failed and, when offline, flag its equipment.

        Args:
            reason: -1 means storing failed, -2 means the point dropped offline.
            timex: Time of the failure. Defaults to the current time, resolved
                on every call (a ``datetime.now()`` default in the signature
                would be frozen at import time).
        """
        if timex is None:
            timex = timezone.now()
        self.data["gather_state"] = reason
        cache.set(self.cache_key, self.data, timeout=None)
        if reason == -2:
            is_rep_ep_running_state = self.data.get("is_rep_ep_running_state", False)
            ep_monitored_id = self.data.get("ep_monitored")
            ep_belong_id = self.data.get("ep_belong")
            if ep_monitored_id and is_rep_ep_running_state and ep_belong_id != ep_monitored_id:
                set_eq_rs(ep_monitored_id, timex, Equipment.OFFLINE)
            if ep_belong_id:
                set_eq_rs(ep_belong_id, timex, Equipment.OFFLINE)

    def set(self, last_timex: datetime, last_val):
        """Record a new reading: update the cache, store an ``MpLogx`` row and
        propagate equipment running states.

        Args:
            last_timex: Timestamp of the reading (truncated to whole seconds).
            last_val: The reading; numeric values are multiplied by the point's
                ``coefficient``.
        """
        # Timestamps are kept at second precision.
        last_timex = last_timex.replace(microsecond=0)
        current_cache_val = self.data
        cache_key = self.cache_key
        last_data = current_cache_val["last_data"]
        if isinstance(last_val, (int, float)):
            last_val = last_val * current_cache_val.get('coefficient', 1.0)
        if (last_val == 0 and
                current_cache_val.get("type", 0) == 10 and
                current_cache_val.get('is_rep_ep_running_state', None) is False and
                get_sysconfig('enm.use_cache_when_w_el_0', False) is True and
                current_cache_val.get("material_name", "") in ["动力电", "工业水"]):
            # A meter that lost power reports 0: reuse the previously cached value.
            last_val = last_data["last_val"] if last_data["last_val"] is not None else 0
        else:
            last_data["last_val"] = last_val
        last_data["last_timex"] = last_timex

        last_mrs = None  # equipment state signal
        mpoint_is_rep_ep_running_state = current_cache_val["is_rep_ep_running_state"]
        if mpoint_is_rep_ep_running_state:
            mpoint_ep_rs_val = current_cache_val.get("ep_rs_val", None)
            mpoint_ep_rs_expr = current_cache_val.get("ep_rs_expr", None)
            last_mrs = transfer_mpoint_val_to_ep_running_state(last_val, mpoint_ep_rs_val, mpoint_ep_rs_expr)
        last_data["last_mrs"] = last_mrs
        current_cache_val["gather_state"] = 0  # data has been refreshed
        cache.set(cache_key, current_cache_val, timeout=None)

        # Persist the reading.
        mpoint = Mpoint.objects.filter(id=current_cache_val["id"]).first()
        if mpoint is None:
            # The point no longer exists: drop its stale cache entry.
            cache.delete(cache_key)
            return
        # Reuse the instance fetched above instead of querying the same row again.
        save_dict = {"timex": last_timex, "mpoint": mpoint, "val_mrs": last_mrs}
        save_dict[f"val_{current_cache_val['val_type']}"] = last_val
        try:
            MpLogx.objects.create(**save_dict)
        except IntegrityError:
            # Deliberately ignored: presumably a row for this point/time already exists.
            pass

        # Update equipment signals.
        ep_belong_id = current_cache_val.get("ep_belong")
        ep_monitored_id = current_cache_val.get("ep_monitored")
        mpoint_is_rep_ep0_running_state = current_cache_val.get("is_rep_ep0_running_state", False)
        if ep_monitored_id and mpoint_is_rep_ep_running_state:
            set_eq_rs(ep_monitored_id, last_timex, last_mrs)
        if ep_belong_id and mpoint_is_rep_ep0_running_state and ep_belong_id != ep_monitored_id:
            set_eq_rs(ep_belong_id, last_timex, Equipment.RUNING)

        mf_code = current_cache_val.get("mpoint_affect")
        if mf_code:  # this point affects another point, which must be refreshed too
            mc = MpointCache(mf_code)
            mf_data = mc.data
            # Only an auto-gathered point may drive a computed point, and only
            # for running-state signals defined by an expression.
            if mf_data and current_cache_val["type"] == Mpoint.MT_AUTO and mf_data["type"] == Mpoint.MT_COMPUTE and mf_data["is_rep_ep_running_state"] and mf_data["ep_rs_expr"]:
                mc.set(last_timex, None)
def king_sync(projectName: str, json_path: str = ""):
    """Synchronise King (亚控) variables into ``Mpoint`` records.

    The variable list is read from the JSON file at ``json_path`` (relative to
    ``settings.BASE_DIR``) when given, otherwise requested from the King API
    for ``projectName``. Each usable variable becomes a disabled, auto-gathered
    ``Mpoint`` with code ``K_<n>``; an existing point is only updated when its
    value type has changed.
    """
    if json_path:
        import os
        import json
        from django.conf import settings

        full_path = os.path.join(settings.BASE_DIR, json_path)
        with open(full_path, "r", encoding="utf-8") as f:
            res = json.loads(f.read())
    else:
        from apps.third.king.k import kingClient
        from apps.third.king.king_api import kapis

        _, res = kingClient.request(**kapis["read_variables"], params={"projectInstanceName": projectName})

    # King type id -> Mpoint value type
    t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"}
    for item in res["objectList"]:
        # Skip variables without a type and King system tags.
        if not ("t" in item and item["t"] and "SystemTag" not in item["d"]):
            continue
        code = f'K_{item["n"]}'
        item["from"] = "king"
        group = item["g"]
        name = f"{group}.{item['d']}" if group else item["d"]
        val_type = t_dict[item["t"]]
        defaults = {
            "name": name,
            "code": code,
            "enabled": False,
            "type": Mpoint.MT_AUTO,
            "val_type": val_type,
            "third_info": item,
        }
        ins, created = Mpoint.objects.get_or_create(code=code, defaults=defaults)
        if created or ins.val_type == val_type:
            continue
        # The data type changed on the King side: keep the point in sync.
        ins.val_type = val_type
        ins.third_info = item
        ins.save(update_fields=["val_type", "third_info"])
# Sample King MQTT payload kept as a reference fixture.
# It has the top-level keys that insert_mplogx_from_king_mqtt reads:
#   "PVs"  - shared values: "1" is the default value, "2" the timestamp string
#   "Objs" - one entry per tag: "N" is the tag name, "1" (optional) overrides
#            the default value
# "PNs" names the numeric keys ("1" -> "V", "2" -> "T", "3" -> "Q").
# NOTE(review): the per-object "2" entries are not read by the visible MQTT
# import code; their meaning should be confirmed against the King documentation.
test_data = {
    "PNs": {"1": "V", "2": "T", "3": "Q"},
    "PVs": {"1": -725, "2": "2024-04-08 13:43:53.140", "3": 192},
    "Objs": [
        {"N": "IP2_MM_IW3"},
        {"N": "IP2_MM_IW4", "1": -6386},
        {"N": "IP2_MM_IW5", "1": -7835},
        {"N": "IP2_MM_IW7", "1": -6864},
        {"N": "IP2_MM_IW15", "1": 29},
        {"N": "IP2_MM_SC_IW3", "1": 337},
        {"N": "IP2_MM_SC_IW13", "1": -5511, "2": 24},
        {"N": "IP2_MM_SC_IW14", "1": -4640, "2": 24},
        {"N": "IP2_MM_SC_IW15", "1": -5586, "2": 24},
        {"N": "IP2_MM_SC_IW16", "1": -6634, "2": 24},
        {"N": "IP2_MM_SC_IW17", "1": -2768, "2": 24},
        {"N": "IP2_MM_SC_IW18", "1": -2946, "2": 24},
        {"N": "IP2_MM_SC_IW19", "1": -2550, "2": 24},
        {"N": "IP2_MM_SC_IW20", "1": -1512, "2": 24},
        {"N": "IP2_MM_SC_IW21", "1": -1775, "2": 24},
        {"N": "IP2_MM_SC_IW22", "1": -194, "2": 24},
        {"N": "IP2_MM_SC_IW23", "1": -366, "2": 24},
        {"N": "IP2_MM_SC_IW24", "1": -9291, "2": 24},
        {"N": "IP2_MM_SC_IW25", "1": -6888, "2": 24},
        {"N": "IP2_MM_SC_IW26", "1": -2360, "2": 24},
        {"N": "IP2_MM_SC_IW29", "1": 164, "2": 24},
        {"N": "IP2_MM_SC_IW30", "1": 914, "2": 24},
        {"N": "IP2_MM_SC_IW31", "1": 849, "2": 24},
        {"N": "IP2_MM_SC_IW37", "1": -125, "2": 24},
        {"N": "IP2_MM_SC_IW38", "1": -3009, "2": 24},
        {"N": "IP2_MM_SC_IW40", "1": -1394, "2": 24},
        {"N": "IP2_MM_SC_IW43", "1": 758, "2": 24},
        {"N": "IP3_SC_IW1", "1": 11557, "2": 107},
        {"N": "IP3_SC_IW2", "1": 7624, "2": 107},
        {"N": "IP3_SC_IW6", "1": 11159, "2": 107},
        {"N": "IP3_SC_IW7", "1": 8073, "2": 107},
        {"N": "IP3_SC_IW9", "1": 4490, "2": 107},
        {"N": "IP3_SC_IW10", "1": 5437, "2": 107},
        {"N": "IP3_SC_IW11", "1": 9244, "2": 107},
        {"N": "IP3_SC_IW12", "1": 7886, "2": 107},
        {"N": "IP3_SC_IW13", "1": -2962, "2": 107},
        {"N": "IP3_SC_IW14", "1": -159, "2": 107},
        {"N": "IP3_SC_IW26", "1": 15, "2": 107},
        {"N": "IP3_SC_IW27", "1": 15, "2": 107},
    ],
}
@auto_log("亚控存库")
def insert_mplogx_from_king_mqtt(data: dict, is_offset=True):
    """Store one King MQTT message into the measuring-point log table.

    ``data["PVs"]["2"]`` is the message timestamp (parsed, made timezone-aware
    and truncated to whole seconds) and ``data["PVs"]["1"]`` the default value
    for objects that carry no value of their own. All of ``data["Objs"]`` is
    stored in a single pass; the chunked, threaded variant sketched in the
    trailing comment is currently not needed.
    """
    objs = data["Objs"]
    pvs = data["PVs"]
    parsed_time = datetime.strptime(pvs["2"], "%Y-%m-%d %H:%M:%S.%f")
    # Only second-level precision is kept.
    otime_obj = timezone.make_aware(parsed_time).replace(microsecond=0)
    insert_mplogx_from_king_mqtt_chunk(objs, pvs["1"], otime_obj, is_offset)
    # Disabled chunked variant, kept for reference:
    # len_objs = len(objs)
    # chunk_size = 200
    # num_chunks = (len(objs) + chunk_size - 1) // chunk_size
    # with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    #     futures = []
    #     for i in range(num_chunks):
    #         start = i * chunk_size
    #         end = min(start + chunk_size, len_objs)
    #         chunk = objs[start:end]
    #         futures.append(executor.submit(insert_mplogx_from_king_mqtt_chunk, chunk, otime_obj, is_offset))
    #     concurrent.futures.wait(futures)
    #     for future in futures:
    #         print(future.result(), end=', ')
def insert_mplogx_item(code: str, val, timex: datetime, enp_mpoints_dict):
    """Store one reading for the measuring point ``code`` if it is due.

    Args:
        code: Measuring-point code.
        val: The reading.
        timex: Timestamp of the reading.
        enp_mpoints_dict: Accumulator mutated in place. For points that have
            both ``enp_field`` and ``ep_monitored`` set, an entry keyed by the
            monitored equipment id collects ``timex``, ``equipment`` and the
            field value so the caller can write env-data rows afterwards.

    Nothing happens when the point is unknown or disabled. A reading is stored
    when no previous timestamp is cached, when more than ``interval`` seconds
    have passed, or when ``timex`` is older than the cached one; a configured
    ``save_expr`` overrides that decision. Failures other than
    ``IntegrityError`` are logged and the point is marked as failed (-1).
    """
    mc = MpointCache(code)
    mpoint_data = mc.data
    if mpoint_data in (None, {}) or not mpoint_data["enabled"]:
        return

    mpoint_interval = mpoint_data["interval"]
    mpoint_last_timex = mpoint_data.get("last_data", {}).get("last_timex", None)
    # Throttle by the configured gathering interval.
    if mpoint_last_timex:
        can_save = (timex - mpoint_last_timex).total_seconds() > mpoint_interval or timex < mpoint_last_timex
    else:
        can_save = True
    if mpoint_data.get("save_expr", None):
        can_save = get_can_save_from_save_expr(mpoint_data["save_expr"], val)
    if not can_save:
        return

    try:
        # Write through the instance whose data we hold: set() mutates
        # mc.data in place, so mpoint_data["last_data"] below reflects this
        # reading instead of the snapshot taken before the write.
        mc.set(timex, val)
        # Collect values destined for the envdata table.
        enp_field = mpoint_data.get("enp_field", None)
        ep_monitored = mpoint_data.get("ep_monitored", None)
        if enp_field and ep_monitored:
            if enp_mpoints_dict.get(ep_monitored, None) is None:
                enp_mpoints_dict[ep_monitored] = {"timex": timex, "equipment": Equipment.objects.get(id=ep_monitored)}
            if enp_field == "running_state":
                # The running state is the derived signal, not the raw reading.
                enp_mpoints_dict[ep_monitored][enp_field] = mpoint_data["last_data"]["last_mrs"]
            else:
                enp_mpoints_dict[ep_monitored][enp_field] = val
    except IntegrityError:
        # Uniqueness conflicts are deliberately ignored.
        pass
    except Exception:
        myLogger.error(f"mpoint_code: {code} 存库失败", exc_info=True)
        mc.set_fail(-1, timex)
def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is_offset=True):
    """Store a list of King MQTT objects that share one timestamp.

    Args:
        objs: Objects with the tag name under ``"N"`` and, optionally, their
            own value under ``"1"``.
        oval: Default value used for objects without a ``"1"`` entry.
        otime_obj: Timestamp applied to every object.
        is_offset: Accepted for interface compatibility; not used here.

    After all readings are stored, one ``EnvData`` row is created per
    monitored equipment collected by ``insert_mplogx_item`` (an existing row
    for the same time and equipment is left as it is).
    """
    # Filled by insert_mplogx_item: equipment id -> {"timex", "equipment", <field>: value, ...}
    enp_mpoints_dict = {}
    for obj in objs:
        n = obj["N"]
        val = obj.get("1", oval)
        code = f"K_{n}"
        insert_mplogx_item(code, val, otime_obj, enp_mpoints_dict)

    # Extra data that has to be written to the envdata table. Iterating the
    # dict itself yields only keys, and a model manager is not callable, so the
    # rows are written the same way the REST import does.
    for item in enp_mpoints_dict.values():
        EnvData.objects.get_or_create(timex=item["timex"], equipment=item["equipment"], defaults=item)
def insert_mplogx_from_king_rest_chunk(objs: list):
    """Store a list of King REST readings, each with its own timestamp.

    Every object provides the tag name (``"N"``), a timestamp string (``"T"``)
    and a value (``"V"``). The timestamp is parsed, made timezone-aware and
    truncated to whole seconds. Afterwards one ``EnvData`` row is fetched or
    created per monitored equipment collected while storing.
    """
    enp_mpoints_dict = {}
    for entry in objs:
        tag_name = entry["N"]
        parsed = datetime.strptime(entry["T"], "%Y-%m-%d %H:%M:%S.%f")
        stamp = timezone.make_aware(parsed).replace(microsecond=0)
        insert_mplogx_item(f"K_{tag_name}", entry["V"], stamp, enp_mpoints_dict)

    for env_row in enp_mpoints_dict.values():
        EnvData.objects.get_or_create(timex=env_row["timex"], equipment=env_row["equipment"], defaults=env_row)
def get_analyse_data_mgroups_duration(start_date: datetime, end_date: datetime) -> Dict[str, Any]:
    """Aggregate per-section statistics over an inclusive date range.

    Daily ``EnStat`` rows (``type="day_s"``) of section groups between
    ``start_date`` and ``end_date`` are summed per group, ``None`` sums are
    replaced by 0, and derived figures (hourly production, unit consumption,
    run hours, run rate) are added to each row. The "回转窑" row additionally
    gets combined consumption figures that use the "原料磨" and "煤磨" rows.

    Returns the ``values()`` result set whose row dicts were enriched in place.
    """
    not_before_start = (
        Q(year_s__gt=start_date.year)
        | Q(year_s=start_date.year, month_s__gt=start_date.month)
        | Q(year_s=start_date.year, month_s=start_date.month, day_s__gte=start_date.day)
    )
    not_after_end = (
        Q(year_s__lt=end_date.year)
        | Q(year_s=end_date.year, month_s__lt=end_date.month)
        | Q(year_s=end_date.year, month_s=end_date.month, day_s__lte=end_date.day)
    )
    qs = EnStat.objects.filter(mgroup__cate="section", type="day_s").filter(not_before_start).filter(not_after_end)
    res = (
        qs.values("mgroup", "mgroup__name")
        .annotate(
            total_production=Sum("total_production"),
            run_sec=Sum("run_sec"),
            elec_consume=Sum("elec_consume"),
            pcoal_coal_consume=Sum("pcoal_coal_consume"),
            total_sec_now=Sum("total_sec_now"),
        )
        .order_by("mgroup__sort")
    )

    # First pass: index rows by group name and turn missing sums into 0.
    # It must finish before the second pass, which reads other groups' rows.
    rows_by_name = {}
    for row in res:
        rows_by_name[row["mgroup__name"]] = row
        for field in [k for k, v in row.items() if v is None]:
            row[field] = 0

    # Second pass: derived indicators.
    for row in res:
        row["mgroup_name"] = row["mgroup__name"]
        production = row["total_production"]
        run_sec = row["run_sec"]
        elec = row["elec_consume"]
        has_production = production > 0
        has_runtime = run_sec > 0

        row["production_hour"] = round(production * 3600 / run_sec, 2) if has_runtime else 0
        row["elec_consume_unit"] = round(elec / production, 2) if has_production else 0
        row["run_hour"] = round(run_sec / 3600, 2) if has_runtime else 0
        row["run_rate"] = round(run_sec * 100 / row["total_sec_now"], 2) if row["total_sec_now"] > 0 else 0
        row["coal_consume_unit"] = round(row["pcoal_coal_consume"] * 1000 / production, 2) if has_production else 0

        if row["mgroup_name"] == "回转窑":
            raw_mill = rows_by_name.get("原料磨", {})
            total_production_ylm = raw_mill.get("total_production", 0)
            elec_consume_ylm = raw_mill.get("elec_consume", 0)
            elec_consume_mm = rows_by_name.get("煤磨", {}).get("elec_consume", 0)
            if production == 0:
                row["celec_consume_unit"] = 0
                row["cen_consume_unit"] = 0
            else:
                own_part = (elec_consume_mm + elec) / get_safe_value(production)
                raw_mill_part = get_sysconfig("enm.enm_lhxs") * elec_consume_ylm / get_safe_value(total_production_ylm)
                row["celec_consume_unit"] = round(own_part + raw_mill_part, 2)
                row["cen_consume_unit"] = round(row["coal_consume_unit"] + 0.1229 * row["elec_consume_unit"], 2)

        row["total_production"] = round(production, 2)
    return res
def get_safe_value(value, default=1):
    """Return ``value``, or ``default`` when ``value`` equals 0 (safe divisor)."""
    if value == 0:
        return default
    return value
def get_elec_level(month, hour):
    """Return the electricity tariff level for a month and hour of day.

    Rules are checked in this order:
      * 'peak': months 1, 11, 12 at hours 19-21, or month 7 at hours 21-22
      * 'deep': months 5-8 at hours 14-15
      * 'high': hours 8-10 and 19-23
      * 'low':  hours 4-7 and 13-16
      * 'flat': everything else (hours 0-3, 11-12, 17-18 and any other value)
    """
    winter_peak = month in (1, 11, 12) and hour in (19, 20, 21)
    summer_peak = month == 7 and hour in (21, 22)
    if winter_peak or summer_peak:
        return 'peak'
    if month in (5, 6, 7, 8) and hour in (14, 15):
        return 'deep'
    hour_levels = (
        ('high', (8, 9, 10, 19, 20, 21, 22, 23)),
        ('low', (4, 5, 6, 7, 13, 14, 15, 16)),
    )
    for level, hours in hour_levels:
        if hour in hours:
            return level
    return 'flat'