from apps.enm.models import Mpoint, MpointStat, MpLogx import re import logging from django.db.models import Q from django.utils import timezone from django.core.cache import cache from datetime import datetime, timedelta from apps.utils.decorators import auto_log from django.db import IntegrityError from .serializers import MpointSerializer from apps.enp.models import EnvData from apps.em.models import Equipment from apps.em.services import set_eq_rs from server.settings import get_sysconfig from apps.enm.models import EnStat from django.db.models import Sum from typing import Dict, Any myLogger = logging.getLogger("log") def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int): """ 传入 """ pattern = r"\{(.*?)}" matches = re.findall(pattern, exp_str) for match in matches: if match in ["enm_lhxs"]: exp_str = exp_str.replace(f"{{{match}}}", str(get_sysconfig(f"enm.{match}"))) else: mpst = MpointStat.objects.filter(Q(mpoint__id=match) | Q(mpoint__name=match) | Q(mpoint__code=match), type="hour", year=year, month=month, day=day, hour=hour).first() if mpst: exp_str = exp_str.replace(f"{{{match}}}", str(mpst.val)) rval = eval(exp_str) return rval def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_str: str): """ 将测点值转换所监测设备的运行状态值 base_expr: 三元表达式 """ if expr_str: # 优先使用表达式 pattern = r"\{(.*?)}" matches = re.findall(pattern, expr_str) for match in matches: if match == "self": expr_str = expr_str.replace("{self}", str(current_val)) else: match_val = 50 mpoint_data = MpointCache(match).data if mpoint_data and mpoint_data.get("gather_state", -2) == 0: # 测点正常采集 match_val = mpoint_data["last_data"]["last_mrs"] expr_str = expr_str.replace(f"{{{match}}}", str(match_val)) rval = eval(expr_str) return rval if isinstance(current_val, bool): if current_val: return Equipment.RUNING return Equipment.STOP rs = Equipment.RUNING if base_val: if current_val < base_val: rs = Equipment.STOP else: if current_val == 0: rs = Equipment.STOP return rs # def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21): # """ # 根据给定的小时数, 计算出班天 # """ # if hour <= hour_split: # return year, month, day # else: # now = datetime.datetime(year, month, day, hour) # now2 = now + datetime.timedelta(days=1) # return now2.year, now2.month, now2.day # cal_rule = { # "电石渣": { # "total_production": 0.4, # "elec_consume_unit": 0.4, # "production_cost_unit": 0.2 # }, # "原料磨":{ # "production_hour": 0.3, # "elec_consume_unit": 0.3, # "production_cost_unit": 0.1, # "辅料_细度":0.05, # "辅料_水分":0.04, # "干混生料_CaO":0.04 # } # } # def cal_team_score(data): # """ # 计算月度绩效 # """ # qua_rate = {} # month_s = data['month_s'] # for item in data['qua_data']: # qua_rate[f'{item["material_name"]}_{item["testitem_name"]}'] = item["rate_pass"] # goal_dict = get_mgroup_goals(data['mgroup'], data['year_s'], False) # goal_data = {} # try: # rule = cal_rule[data['mgroup_name']] # score = 0 # for key in rule: # new_key = f'{key}_{month_s}' # goal_data[new_key] = goal_dict[new_key] # if '-' in key: # score = score + qua_rate.get(key, 0)/goal_data[new_key]*rule[key] # else: # score = score + data.get(key)/goal_data[new_key]*rule[key] # print(score) # # enstat.goal_data = goal_data # # enstat.score =score # # enstat.save(update_fields=['goal_data', 'score']) # except: # print(traceback.format_exc()) # return goal_data, score class MpointCache: """测点缓存类""" def __init__(self, code: str): self.code = code self.cache_key = Mpoint.cache_key(code) self.data = self.get() def get(self, force_update=False, update_mplogx=True): key = self.cache_key code = self.code mpoint_data = cache.get(key, None) if mpoint_data is None or force_update: try: mpoint = Mpoint.objects.get(code=code) except Exception: return None mpoint_data = MpointSerializer(instance=mpoint).data mpoint_data["last_data"] = {"last_val": None, "last_timex": None, "last_mrs": None} # 初始化 if update_mplogx: now = timezone.now() last_mplogx = MpLogx.objects.filter(mpoint=mpoint, timex__gte=now - timedelta(minutes=5)).order_by("-timex").first() if last_mplogx: # 核心数据 mpoint_data["last_data"] = {"last_val": getattr(last_mplogx, "val_" + mpoint_data["val_type"]), "last_timex": last_mplogx.timex} cache.set(key, mpoint_data, timeout=None) return mpoint_data def set_fail(self, reason: int = -1, timex: datetime = datetime.now()): """ -1 存库失败 -2 掉线 """ self.data["gather_state"] = reason cache.set(self.cache_key, self.data, timeout=None) if reason == -2: is_rep_ep_running_state = self.data.get("is_rep_ep_running_state", False) ep_monitored_id = self.data.get("ep_monitored") ep_belong_id = self.data.get("ep_belong") if ep_monitored_id and is_rep_ep_running_state and ep_belong_id != ep_monitored_id: set_eq_rs(ep_monitored_id, timex, Equipment.OFFLINE) if ep_belong_id: set_eq_rs(ep_belong_id, timex, Equipment.OFFLINE) def set(self, last_timex: datetime, last_val): current_cache_val = self.data cache_key = self.cache_key last_data = current_cache_val["last_data"] last_data["last_val"] = last_val last_data["last_timex"] = last_timex last_mrs = None # 设备状态信号 mpoint_is_rep_ep_running_state = current_cache_val["is_rep_ep_running_state"] if mpoint_is_rep_ep_running_state: mpoint_ep_rs_val = current_cache_val.get("ep_rs_val", None) mpoint_ep_rs_expr = current_cache_val.get("ep_rs_expr", None) last_mrs = transfer_mpoint_val_to_ep_running_state(last_val, mpoint_ep_rs_val, mpoint_ep_rs_expr) last_data["last_mrs"] = last_mrs current_cache_val["gather_state"] = 0 # 表明数据已更新 cache.set(cache_key, current_cache_val, timeout=None) # 存库 save_dict = {"timex": last_timex, "mpoint": Mpoint.objects.get(id=current_cache_val["id"]), "val_mrs": last_mrs} save_dict[f"val_{current_cache_val['val_type']}"] = last_val MpLogx.objects.create(**save_dict) # 下面开始更新设备信号 ep_belong_id = current_cache_val.get("ep_belong") ep_monitored_id = current_cache_val.get("ep_monitored") if ep_monitored_id and mpoint_is_rep_ep_running_state and ep_belong_id != ep_monitored_id: set_eq_rs(ep_monitored_id, last_timex, last_mrs) if ep_belong_id: set_eq_rs(ep_belong_id, last_timex, Equipment.RUNING) mf_code = current_cache_val.get("mpoint_affect") if mf_code: # 如果该测点影响到另一个测点,要同步更新另一个测点 mc = MpointCache(mf_code) mf_data = mc.data # 只有自采测点才可影响计算测点只针对开关信号 if mf_data and current_cache_val["type"] == Mpoint.MT_AUTO and mf_data["type"] == Mpoint.MT_COMPUTE and mf_data["is_rep_ep_running_state"] and mf_data["ep_rs_expr"]: mc.set(last_timex, None) def king_sync(projectName: str, json_path: str = ""): """ 同步亚控测点 """ if json_path: import os import json from django.conf import settings with open(os.path.join(settings.BASE_DIR, json_path), "r", encoding="utf-8") as f: res = json.loads(f.read()) else: from apps.third.king.k import kingClient from apps.third.king.king_api import kapis _, res = kingClient.request(**kapis["read_variables"], params={"projectInstanceName": projectName}) t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"} for index, item in enumerate(res["objectList"]): if "t" in item and item["t"] and "SystemTag" not in item["d"]: code = f'K_{item["n"]}' item["from"] = "king" group = item["g"] name = item["d"] if group: name = f"{group}.{name}" ins, created = Mpoint.objects.get_or_create(code=code, defaults={"name": name, "code": code, "enabled": False, "type": Mpoint.MT_AUTO, "val_type": t_dict[item["t"]], "third_info": item}) if not created and ins.val_type != t_dict[item["t"]]: # 如果数据类型变了要同步更新 ins.val_type = t_dict[item["t"]] ins.third_info = item ins.save(update_fields=["val_type", "third_info"]) test_data = { "PNs": {"1": "V", "2": "T", "3": "Q"}, "PVs": {"1": -725, "2": "2024-04-08 13:43:53.140", "3": 192}, "Objs": [ {"N": "IP2_MM_IW3"}, {"N": "IP2_MM_IW4", "1": -6386}, {"N": "IP2_MM_IW5", "1": -7835}, {"N": "IP2_MM_IW7", "1": -6864}, {"N": "IP2_MM_IW15", "1": 29}, {"N": "IP2_MM_SC_IW3", "1": 337}, {"N": "IP2_MM_SC_IW13", "1": -5511, "2": 24}, {"N": "IP2_MM_SC_IW14", "1": -4640, "2": 24}, {"N": "IP2_MM_SC_IW15", "1": -5586, "2": 24}, {"N": "IP2_MM_SC_IW16", "1": -6634, "2": 24}, {"N": "IP2_MM_SC_IW17", "1": -2768, "2": 24}, {"N": "IP2_MM_SC_IW18", "1": -2946, "2": 24}, {"N": "IP2_MM_SC_IW19", "1": -2550, "2": 24}, {"N": "IP2_MM_SC_IW20", "1": -1512, "2": 24}, {"N": "IP2_MM_SC_IW21", "1": -1775, "2": 24}, {"N": "IP2_MM_SC_IW22", "1": -194, "2": 24}, {"N": "IP2_MM_SC_IW23", "1": -366, "2": 24}, {"N": "IP2_MM_SC_IW24", "1": -9291, "2": 24}, {"N": "IP2_MM_SC_IW25", "1": -6888, "2": 24}, {"N": "IP2_MM_SC_IW26", "1": -2360, "2": 24}, {"N": "IP2_MM_SC_IW29", "1": 164, "2": 24}, {"N": "IP2_MM_SC_IW30", "1": 914, "2": 24}, {"N": "IP2_MM_SC_IW31", "1": 849, "2": 24}, {"N": "IP2_MM_SC_IW37", "1": -125, "2": 24}, {"N": "IP2_MM_SC_IW38", "1": -3009, "2": 24}, {"N": "IP2_MM_SC_IW40", "1": -1394, "2": 24}, {"N": "IP2_MM_SC_IW43", "1": 758, "2": 24}, {"N": "IP3_SC_IW1", "1": 11557, "2": 107}, {"N": "IP3_SC_IW2", "1": 7624, "2": 107}, {"N": "IP3_SC_IW6", "1": 11159, "2": 107}, {"N": "IP3_SC_IW7", "1": 8073, "2": 107}, {"N": "IP3_SC_IW9", "1": 4490, "2": 107}, {"N": "IP3_SC_IW10", "1": 5437, "2": 107}, {"N": "IP3_SC_IW11", "1": 9244, "2": 107}, {"N": "IP3_SC_IW12", "1": 7886, "2": 107}, {"N": "IP3_SC_IW13", "1": -2962, "2": 107}, {"N": "IP3_SC_IW14", "1": -159, "2": 107}, {"N": "IP3_SC_IW26", "1": 15, "2": 107}, {"N": "IP3_SC_IW27", "1": 15, "2": 107}, ], } @auto_log("亚控存库") def insert_mplogx_from_king_mqtt(data: dict, is_offset=True): """ 从king mqtt数据插入超表 注释的代码是分批存的, 但是实际上不需要, 暂时注释,有需要再加 """ objs = data["Objs"] # len_objs = len(objs) pvs = data["PVs"] # chunk_size = 200 # num_chunks = (len(objs) + chunk_size - 1) // chunk_size otime_obj = timezone.make_aware(datetime.strptime(pvs["2"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0) # 只保留到秒级的精度 oval = pvs["1"] insert_mplogx_from_king_mqtt_chunk(objs, oval, otime_obj, is_offset) # with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: # futures = [] # for i in range(num_chunks): # start = i * chunk_size # end = min(start + chunk_size, len_objs) # chunk = objs[start:end] # futures.append(executor.submit(insert_mplogx_from_king_mqtt_chunk, chunk, otime_obj, is_offset)) # concurrent.futures.wait(futures) # for future in futures: # print(future.result(), end=', ') def insert_mplogx_item(code, val, timex, enp_mpoints_dict): """ 存入超表 """ mc = MpointCache(code) mpoint_data = mc.data if mpoint_data is None or not mpoint_data["enabled"]: return mpoint_interval = mpoint_data["interval"] mpoint_last_timex = mpoint_data.get("last_data", {}).get("last_timex", None) # 控制采集间隔 can_save = False if mpoint_last_timex: if (timex - mpoint_last_timex).total_seconds() > mpoint_interval: can_save = True else: can_save = True if can_save: try: MpointCache(code).set(timex, val) # 此处代码用于更新envdata表里的数据 enp_field = mpoint_data.get("enp_field", None) ep_monitored = mpoint_data.get("ep_monitored", None) if enp_field and ep_monitored: if enp_mpoints_dict.get(ep_monitored, None) is None: enp_mpoints_dict[ep_monitored] = {"timex": timex, "equipment": Equipment.objects.get(id=ep_monitored)} if enp_field == "running_state": enp_mpoints_dict[ep_monitored].update({enp_field: mpoint_data["last_data"]["last_mrs"]}) else: enp_mpoints_dict[ep_monitored].update({enp_field: val}) enp_mpoints_dict[ep_monitored].update({enp_field: val}) except IntegrityError: # 忽略唯一性错误 pass except Exception: myLogger.error(f"mpoint_code: {code} 存库失败", exc_info=True) mc.set_fail(-1, timex) def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is_offset=True): """ 分批存库, 亚控 38.00,00000.11011 版本偏移只是时间戳偏移。另外其实可以不在乎 """ # oval = pvs['1'] enp_mpoints_dict = {} # 这个地方主要是需要更新envdata表里的数据 for obj in objs: n = obj["N"] val = obj.get("1", oval) # timex = obj.get("2", None) code = f"K_{n}" insert_mplogx_item(code, val, otime_obj, enp_mpoints_dict) # # 先尝试批量存库/发生异常则单个存储 # is_bulk_insert = True # try: # MpLogx.objects.bulk_create(insert_db_data) # except IntegrityError: # is_bulk_insert = False # if is_bulk_insert: # # 批量存库成功后更新缓存 # for item in update_cache_data: # need_gather_filed_mpoints.append(update_mpoint_cache_and_do_func(item)) # else: # {'eq_belong': {'dust_rtd': 1.0}} # 额外需要处理的数据(存储到envdata表里) if enp_mpoints_dict: for _, item in enp_mpoints_dict: EnvData.objects(**item) def insert_mplogx_from_king_rest_chunk(objs: list): enp_mpoints_dict = {} for obj in objs: n = obj["N"] code = f"K_{n}" timex = timezone.make_aware(datetime.strptime(obj["T"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0) insert_mplogx_item(code, obj["V"], timex, enp_mpoints_dict) if enp_mpoints_dict: for _, item in enp_mpoints_dict.items(): try: EnvData.objects.create(**item) except IntegrityError as e: # 忽略唯一性错误 myLogger.error(e, exc_info=True) def get_analyse_data_mgroups_duration(start_date: datetime, end_date: datetime) -> Dict[str, Any]: """ 获取一段时间范围的工段分析数据 """ start_year, start_month, start_day = start_date.year, start_date.month, start_date.day end_year, end_month, end_day = end_date.year, end_date.month, end_date.day total_sec = (end_date - start_date).total_seconds() + 3600 * 24 qs = ( EnStat.objects.filter(mgroup__cate="section") .filter(Q(year_s__gt=start_year) | Q(year_s=start_year, month_s__gt=start_month) | Q(year_s=start_year, month_s=start_month, day_s__gte=start_day)) .filter(Q(year_s__lt=end_year) | Q(year_s=end_year, month_s__lt=end_month) | Q(year_s=end_year, month_s=end_month, day_s__lte=end_day)) ) res = ( qs.values("mgroup", "mgroup__name") .annotate(total_production=Sum("total_production"), run_sec=Sum("run_sec"), elec_consume=Sum("elec_consume"), pcoal_coal_consume=Sum("pcoal_coal_consume")) .order_by("mgroup__sort") ) res_dict = {} for item in res: res_dict[item["mgroup__name"]] = item for key, value in item.items(): if value is None: item[key] = 0 for item in res: item["mgroup_name"] = item["mgroup__name"] item["production_hour"] = round(item["total_production"] * 3600 / item["run_sec"], 2) if item["run_sec"] > 0 else 0 item["elec_consume_unit"] = round(item["elec_consume"] / item["total_production"], 2) if item["total_production"] > 0 else 0 item["run_hour"] = round(item["run_sec"] / 3600, 2) if item["run_sec"] > 0 else 0 item["run_rate"] = round(item["run_sec"] * 100 / total_sec, 4) if total_sec > 0 else 0 item["coal_consume_unit"] = round(item["pcoal_coal_consume"] * 1000 / item["total_production"], 2) if item["total_production"] > 0 else 0 if item["mgroup_name"] == "回转窑": total_production_ylm = res_dict.get("原料磨", {}).get("total_production", 0) elec_consume_ylm = res_dict.get("原料磨", {}).get("elec_consume", 0) elec_consume_mm = res_dict.get("煤磨", {}).get("elec_consume", 0) if item["total_production"] == 0: item["celec_consume_unit"] = 0 item["en_consume_unit"] = 0 else: item["celec_consume_unit"] = ((elec_consume_mm + item["elec_consume"]) / item["total_production"] + get_sysconfig("enm.enm_lhxs") * elec_consume_ylm / total_production_ylm, 2) item["en_consume_unit"] = item["coal_consume_unit"] + 0.1229 * item["elec_consume_unit"] return res