368 lines
14 KiB
Python
368 lines
14 KiB
Python
from apps.enm.models import Mpoint, MpointStat, EnStat, MpLogx
|
|
import re
|
|
import logging
|
|
import traceback
|
|
from apps.mtm.services import get_mgroup_goals
|
|
from django.db.models import Q
|
|
from django.utils import timezone
|
|
from django.core.cache import cache
|
|
import concurrent.futures
|
|
from django.db import connection, transaction
|
|
from datetime import datetime, timedelta
|
|
from apps.utils.decorators import auto_log
|
|
from django.db import IntegrityError
|
|
from .serializers import MpointSerializer
|
|
from apps.enp.models import EnvData
|
|
from apps.em.models import Equipment
|
|
from apps.em.services import set_eq_rs
|
|
|
|
myLogger = logging.getLogger("log")
|
|
|
|
|
|
def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int):
|
|
"""
|
|
传入
|
|
"""
|
|
pattern = r"\${(.*?)}"
|
|
matches = re.findall(pattern, exp_str)
|
|
for match in matches:
|
|
mpst = MpointStat.objects.filter(Q(mpoint__id=match) | Q(mpoint__name=match) | Q(mpoint__code=match), type="hour", year=year, month=month, day=day, hour=hour).first()
|
|
val = 0
|
|
if mpst:
|
|
val = mpst.val
|
|
exp_str = exp_str.replace(f"${{{match}}}", str(val))
|
|
rval = eval(exp_str)
|
|
return rval
|
|
|
|
def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_str: str):
|
|
"""
|
|
将测点值转换所监测设备的运行状态值
|
|
base_expr: 三元表达式
|
|
"""
|
|
if expr_str: # 优先使用表达式
|
|
pattern = r"\${(.*?)}"
|
|
matches = re.findall(pattern, expr_str)
|
|
for match in matches:
|
|
if match == 'self':
|
|
expr_str = expr_str.replace(f"${{{match}}}", str(current_val))
|
|
else:
|
|
mpoint_data = get_mpoint_cache(match)
|
|
if mpoint_data:
|
|
expr_str = expr_str.replace(f"${{{match}}}", str(mpoint_data['last_data']['last_val']))
|
|
rval = eval(expr_str)
|
|
return rval
|
|
if isinstance(current_val, bool):
|
|
if current_val:
|
|
return Equipment.RUNING
|
|
return Equipment.STOP
|
|
rs = Equipment.RUNING
|
|
if base_val:
|
|
if current_val < base_val:
|
|
rs = Equipment.STOP
|
|
else:
|
|
if current_val == 0:
|
|
rs = Equipment.STOP
|
|
return rs
|
|
|
|
# def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21):
|
|
# """
|
|
# 根据给定的小时数, 计算出班天
|
|
# """
|
|
# if hour <= hour_split:
|
|
# return year, month, day
|
|
# else:
|
|
# now = datetime.datetime(year, month, day, hour)
|
|
# now2 = now + datetime.timedelta(days=1)
|
|
# return now2.year, now2.month, now2.day
|
|
|
|
|
|
# cal_rule = {
|
|
# "电石渣": {
|
|
# "total_production": 0.4,
|
|
# "elec_consume_unit": 0.4,
|
|
# "production_cost_unit": 0.2
|
|
# },
|
|
# "原料磨":{
|
|
# "production_hour": 0.3,
|
|
# "elec_consume_unit": 0.3,
|
|
# "production_cost_unit": 0.1,
|
|
# "辅料_细度":0.05,
|
|
# "辅料_水分":0.04,
|
|
# "干混生料_CaO":0.04
|
|
# }
|
|
# }
|
|
# def cal_team_score(data):
|
|
# """
|
|
# 计算月度绩效
|
|
# """
|
|
# qua_rate = {}
|
|
# month_s = data['month_s']
|
|
# for item in data['qua_data']:
|
|
# qua_rate[f'{item["material_name"]}_{item["testitem_name"]}'] = item["rate_pass"]
|
|
|
|
# goal_dict = get_mgroup_goals(data['mgroup'], data['year_s'], False)
|
|
# goal_data = {}
|
|
# try:
|
|
# rule = cal_rule[data['mgroup_name']]
|
|
# score = 0
|
|
# for key in rule:
|
|
# new_key = f'{key}_{month_s}'
|
|
# goal_data[new_key] = goal_dict[new_key]
|
|
# if '-' in key:
|
|
# score = score + qua_rate.get(key, 0)/goal_data[new_key]*rule[key]
|
|
# else:
|
|
# score = score + data.get(key)/goal_data[new_key]*rule[key]
|
|
# print(score)
|
|
# # enstat.goal_data = goal_data
|
|
# # enstat.score =score
|
|
# # enstat.save(update_fields=['goal_data', 'score'])
|
|
# except:
|
|
# print(traceback.format_exc())
|
|
# return goal_data, score
|
|
|
|
|
|
def get_mpoint_cache(code: str, force_update=False, update_mplogx=True):
|
|
"""
|
|
获取或更新mpoint的缓存数据
|
|
返回空代表无该测点
|
|
"""
|
|
key = Mpoint.cache_key(code)
|
|
mpoint_data = cache.get(key, None)
|
|
if mpoint_data is None or force_update:
|
|
try:
|
|
mpoint = Mpoint.objects.get(code=code)
|
|
except Exception:
|
|
return None
|
|
mpoint_data = MpointSerializer(instance=mpoint).data
|
|
if update_mplogx:
|
|
now = timezone.now()
|
|
last_mplogx = MpLogx.objects.filter(mpoint=mpoint, timex__gte=now - timedelta(minutes=5)).order_by("-timex").first()
|
|
if last_mplogx: # 核心数据
|
|
mpoint_data["last_data"] = {"last_val": getattr(last_mplogx, "val_" + mpoint_data["val_type"]), "last_timex": last_mplogx.timex}
|
|
cache.set(key, mpoint_data, timeout=None)
|
|
return mpoint_data
|
|
|
|
|
|
def update_mpoint_cache(cache_key: str, current_cache_val: dict, last_timex: datetime, last_val, last_mrs):
|
|
"""
|
|
更新mpoint的缓存数据并执行某些操作
|
|
last_mrs: 所监测的设备运行状态值可不传
|
|
"""
|
|
ep_belong_id = current_cache_val.get("ep_belong", None)
|
|
ep_monitored_id = current_cache_val.get("ep_monitored", False)
|
|
last_data = current_cache_val["last_data"]
|
|
current_cache_val['gather_state'] = 0
|
|
last_data["last_val"] = last_val
|
|
last_data["last_mrs"] = last_mrs
|
|
last_data["last_timex"] = last_timex
|
|
cache.set(cache_key, current_cache_val, timeout=None)
|
|
# 更新设备状态值
|
|
if ep_belong_id:
|
|
set_eq_rs(ep_belong_id, last_timex, Equipment.RUNING)
|
|
if ep_monitored_id:
|
|
set_eq_rs(ep_monitored_id, last_timex, last_mrs)
|
|
|
|
|
|
def king_sync(projectName: str, json_path: str = ""):
|
|
"""
|
|
同步亚控测点
|
|
"""
|
|
if json_path:
|
|
import os
|
|
import json
|
|
from django.conf import settings
|
|
|
|
with open(os.path.join(settings.BASE_DIR, json_path), "r", encoding="utf-8") as f:
|
|
res = json.loads(f.read())
|
|
else:
|
|
from apps.third.king.k import kingClient
|
|
from apps.third.king.king_api import kapis
|
|
|
|
_, res = kingClient.request(**kapis["read_variables"], params={"projectInstanceName": projectName})
|
|
|
|
t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"}
|
|
for index, item in enumerate(res["objectList"]):
|
|
if "t" in item and item["t"] and "SystemTag" not in item["d"]:
|
|
code = f'K_{item["n"]}'
|
|
item["from"] = "king"
|
|
group = item["g"]
|
|
name = item["d"]
|
|
if group:
|
|
name = f"{group}.{name}"
|
|
ins, created = Mpoint.objects.get_or_create(code=code, defaults={"name": name, "code": code, "enabled": False, "type": Mpoint.MT_AUTO, "val_type": t_dict[item["t"]], "third_info": item})
|
|
if not created and ins.val_type != t_dict[item["t"]]: # 如果数据类型变了要同步更新
|
|
ins.val_type = t_dict[item["t"]]
|
|
ins.third_info = item
|
|
ins.save(update_fields=["val_type", "third_info"])
|
|
|
|
test_data = {
|
|
"PNs": {"1": "V", "2": "T", "3": "Q"},
|
|
"PVs": {"1": -725, "2": "2024-04-08 13:43:53.140", "3": 192},
|
|
"Objs": [
|
|
{"N": "IP2_MM_IW3"},
|
|
{"N": "IP2_MM_IW4", "1": -6386},
|
|
{"N": "IP2_MM_IW5", "1": -7835},
|
|
{"N": "IP2_MM_IW7", "1": -6864},
|
|
{"N": "IP2_MM_IW15", "1": 29},
|
|
{"N": "IP2_MM_SC_IW3", "1": 337},
|
|
{"N": "IP2_MM_SC_IW13", "1": -5511, "2": 24},
|
|
{"N": "IP2_MM_SC_IW14", "1": -4640, "2": 24},
|
|
{"N": "IP2_MM_SC_IW15", "1": -5586, "2": 24},
|
|
{"N": "IP2_MM_SC_IW16", "1": -6634, "2": 24},
|
|
{"N": "IP2_MM_SC_IW17", "1": -2768, "2": 24},
|
|
{"N": "IP2_MM_SC_IW18", "1": -2946, "2": 24},
|
|
{"N": "IP2_MM_SC_IW19", "1": -2550, "2": 24},
|
|
{"N": "IP2_MM_SC_IW20", "1": -1512, "2": 24},
|
|
{"N": "IP2_MM_SC_IW21", "1": -1775, "2": 24},
|
|
{"N": "IP2_MM_SC_IW22", "1": -194, "2": 24},
|
|
{"N": "IP2_MM_SC_IW23", "1": -366, "2": 24},
|
|
{"N": "IP2_MM_SC_IW24", "1": -9291, "2": 24},
|
|
{"N": "IP2_MM_SC_IW25", "1": -6888, "2": 24},
|
|
{"N": "IP2_MM_SC_IW26", "1": -2360, "2": 24},
|
|
{"N": "IP2_MM_SC_IW29", "1": 164, "2": 24},
|
|
{"N": "IP2_MM_SC_IW30", "1": 914, "2": 24},
|
|
{"N": "IP2_MM_SC_IW31", "1": 849, "2": 24},
|
|
{"N": "IP2_MM_SC_IW37", "1": -125, "2": 24},
|
|
{"N": "IP2_MM_SC_IW38", "1": -3009, "2": 24},
|
|
{"N": "IP2_MM_SC_IW40", "1": -1394, "2": 24},
|
|
{"N": "IP2_MM_SC_IW43", "1": 758, "2": 24},
|
|
{"N": "IP3_SC_IW1", "1": 11557, "2": 107},
|
|
{"N": "IP3_SC_IW2", "1": 7624, "2": 107},
|
|
{"N": "IP3_SC_IW6", "1": 11159, "2": 107},
|
|
{"N": "IP3_SC_IW7", "1": 8073, "2": 107},
|
|
{"N": "IP3_SC_IW9", "1": 4490, "2": 107},
|
|
{"N": "IP3_SC_IW10", "1": 5437, "2": 107},
|
|
{"N": "IP3_SC_IW11", "1": 9244, "2": 107},
|
|
{"N": "IP3_SC_IW12", "1": 7886, "2": 107},
|
|
{"N": "IP3_SC_IW13", "1": -2962, "2": 107},
|
|
{"N": "IP3_SC_IW14", "1": -159, "2": 107},
|
|
{"N": "IP3_SC_IW26", "1": 15, "2": 107},
|
|
{"N": "IP3_SC_IW27", "1": 15, "2": 107},
|
|
],
|
|
}
|
|
|
|
|
|
@auto_log("亚控存库")
|
|
def insert_mplogx_from_king_mqtt(data: dict, is_offset=True):
|
|
"""
|
|
从king mqtt数据插入超表
|
|
注释的代码是分批存的, 但是实际上不需要, 暂时注释,有需要再加
|
|
"""
|
|
objs = data["Objs"]
|
|
# len_objs = len(objs)
|
|
pvs = data["PVs"]
|
|
|
|
# chunk_size = 200
|
|
# num_chunks = (len(objs) + chunk_size - 1) // chunk_size
|
|
|
|
otime_obj = timezone.make_aware(datetime.strptime(pvs["2"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0) # 只保留到秒级的精度
|
|
oval = pvs["1"]
|
|
insert_mplogx_from_king_mqtt_chunk(objs, oval, otime_obj, is_offset)
|
|
# with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
|
|
# futures = []
|
|
# for i in range(num_chunks):
|
|
# start = i * chunk_size
|
|
# end = min(start + chunk_size, len_objs)
|
|
# chunk = objs[start:end]
|
|
# futures.append(executor.submit(insert_mplogx_from_king_mqtt_chunk, chunk, otime_obj, is_offset))
|
|
# concurrent.futures.wait(futures)
|
|
# for future in futures:
|
|
# print(future.result(), end=', ')
|
|
|
|
def insert_mplogx_item(code, val, timex, enp_mpoints_dict):
|
|
"""
|
|
存入超表
|
|
"""
|
|
cache_key = Mpoint.cache_key(code)
|
|
mpoint_data = get_mpoint_cache(code)
|
|
if mpoint_data is None or not mpoint_data['enabled']:
|
|
return
|
|
val_type = mpoint_data["val_type"]
|
|
mpoint_interval = mpoint_data["interval"]
|
|
mpoint_last_timex = mpoint_data.get('last_data', {}).get('last_timex', None)
|
|
mpoint_is_rep_ep_running_state = mpoint_data.get("is_rep_ep_running_state", False)
|
|
mpoint_ep_rs_val = mpoint_data.get("ep_rs_val", None)
|
|
mpoint_ep_rs_expr = mpoint_data.get("ep_rs_expr", None)
|
|
# 控制采集间隔
|
|
can_save = False
|
|
if mpoint_last_timex:
|
|
if (timex - mpoint_last_timex).total_seconds() > mpoint_interval:
|
|
can_save = True
|
|
else:
|
|
can_save = True
|
|
if can_save:
|
|
save_dict = {
|
|
"timex": timex,
|
|
"mpoint": Mpoint.objects.get(id=mpoint_data["id"]),
|
|
}
|
|
save_dict[f"val_{val_type}"] = val
|
|
val_mrs = None
|
|
try:
|
|
if mpoint_is_rep_ep_running_state:
|
|
val_mrs = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_rs_val, mpoint_ep_rs_expr)
|
|
save_dict["val_mrs"] = val_mrs
|
|
MpLogx.objects.create(**save_dict)
|
|
update_mpoint_cache(cache_key, mpoint_data, timex, val, val_mrs)
|
|
|
|
# 此处代码用于更新envdata表里的数据
|
|
enp_field = mpoint_data.get("enp_field", None)
|
|
ep_monitored = mpoint_data.get("ep_monitored", None)
|
|
if enp_field and ep_monitored:
|
|
if enp_mpoints_dict.get(ep_monitored, None) is None:
|
|
enp_mpoints_dict[ep_monitored] = {"timex": timex, "equipment": Equipment.objects.get(id=ep_monitored)}
|
|
if enp_field == "running_state":
|
|
enp_mpoints_dict[ep_monitored].update({enp_field: val_mrs})
|
|
else:
|
|
enp_mpoints_dict[ep_monitored].update({enp_field: val})
|
|
enp_mpoints_dict[ep_monitored].update({enp_field: val})
|
|
except IntegrityError: # 忽略唯一性错误
|
|
pass
|
|
except Exception:
|
|
myLogger.error(f"mpoint_cache_key: {cache_key} 存库失败", exc_info=True)
|
|
mpoint_data['gather_state'] = -1
|
|
cache.set(cache_key, mpoint_data, timeout=None)
|
|
|
|
|
|
def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is_offset=True):
|
|
"""
|
|
分批存库, 亚控 38.00,00000.11011 版本偏移只是时间戳偏移。另外其实可以不在乎
|
|
"""
|
|
# oval = pvs['1']
|
|
enp_mpoints_dict = {} # 这个地方主要是需要更新envdata表里的数据
|
|
for obj in objs:
|
|
n = obj["N"]
|
|
val = obj.get("1", oval)
|
|
# timex = obj.get("2", None)
|
|
code = f"K_{n}"
|
|
insert_mplogx_item(code, val, otime_obj, enp_mpoints_dict)
|
|
|
|
# # 先尝试批量存库/发生异常则单个存储
|
|
# is_bulk_insert = True
|
|
# try:
|
|
# MpLogx.objects.bulk_create(insert_db_data)
|
|
# except IntegrityError:
|
|
# is_bulk_insert = False
|
|
|
|
# if is_bulk_insert:
|
|
# # 批量存库成功后更新缓存
|
|
# for item in update_cache_data:
|
|
# need_gather_filed_mpoints.append(update_mpoint_cache_and_do_func(item))
|
|
# else:
|
|
# {'eq_belong': {'dust_rtd': 1.0}}
|
|
|
|
# 额外需要处理的数据(存储到envdata表里)
|
|
if enp_mpoints_dict:
|
|
for _, item in enp_mpoints_dict:
|
|
EnvData.objects(**item)
|
|
|
|
def insert_mplogx_from_king_rest_chunk(objs: list):
|
|
enp_mpoints_dict = {}
|
|
for obj in objs:
|
|
n = obj["N"]
|
|
code = f"K_{n}"
|
|
timex = timezone.make_aware(datetime.strptime(obj["T"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0)
|
|
insert_mplogx_item(code, obj["V"], timex, enp_mpoints_dict)
|
|
|
|
if enp_mpoints_dict:
|
|
for _, item in enp_mpoints_dict.items():
|
|
EnvData.objects(**item) |