346 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			346 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
| from apps.enm.models import Mpoint, MpointStat, EnStat, MpLogx
 | |
| import re
 | |
| import logging
 | |
| import traceback
 | |
| from apps.mtm.services import get_mgroup_goals
 | |
| from django.db.models import Q
 | |
| from django.utils import timezone
 | |
| from django.core.cache import cache
 | |
| import concurrent.futures
 | |
| from django.db import connection, transaction
 | |
| from datetime import datetime, timedelta
 | |
| from apps.utils.decorators import auto_log
 | |
| from django.db import IntegrityError
 | |
| from apps.utils.tasks import ctask_run
 | |
| from .serializers import MpointSerializer
 | |
| from apps.enp.models import EnvData
 | |
| from apps.em.models import Equipment, RuningState
 | |
| 
 | |
| myLogger = logging.getLogger("log")
 | |
| 
 | |
| 
 | |
| def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int):
 | |
|     """
 | |
|     传入
 | |
|     """
 | |
|     pattern = r"\${(.*?)}"
 | |
|     matches = re.findall(pattern, exp_str)
 | |
|     for match in matches:
 | |
|         mpst = MpointStat.objects.filter(Q(mpoint__id=match) | Q(mpoint__name=match) | Q(mpoint__code=match), type="hour", year=year, month=month, day=day, hour=hour).first()
 | |
|         if mpst:
 | |
|             exp_str = exp_str.replace(f"${{{match}}}", str(mpst.val))
 | |
|     rval = eval(exp_str)
 | |
|     return rval
 | |
| 
 | |
| 
 | |
| # def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21):
 | |
| #     """
 | |
| #     根据给定的小时数, 计算出班天
 | |
| #     """
 | |
| #     if hour <= hour_split:
 | |
| #         return year, month, day
 | |
| #     else:
 | |
| #         now = datetime.datetime(year, month, day, hour)
 | |
| #         now2 = now + datetime.timedelta(days=1)
 | |
| #         return now2.year, now2.month, now2.day
 | |
| 
 | |
| 
 | |
| # cal_rule = {
 | |
| #     "电石渣": {
 | |
| #         "total_production": 0.4,
 | |
| #         "elec_consume_unit": 0.4,
 | |
| #         "production_cost_unit": 0.2
 | |
| #     },
 | |
| #     "原料磨":{
 | |
| #         "production_hour": 0.3,
 | |
| #         "elec_consume_unit": 0.3,
 | |
| #         "production_cost_unit": 0.1,
 | |
| #         "辅料_细度":0.05,
 | |
| #         "辅料_水分":0.04,
 | |
| #         "干混生料_CaO":0.04
 | |
| #     }
 | |
| # }
 | |
| # def cal_team_score(data):
 | |
| #     """
 | |
| #     计算月度绩效
 | |
| #     """
 | |
| #     qua_rate = {}
 | |
| #     month_s = data['month_s']
 | |
| #     for item in data['qua_data']:
 | |
| #         qua_rate[f'{item["material_name"]}_{item["testitem_name"]}'] = item["rate_pass"]
 | |
| 
 | |
| #     goal_dict = get_mgroup_goals(data['mgroup'], data['year_s'], False)
 | |
| #     goal_data = {}
 | |
| #     try:
 | |
| #         rule = cal_rule[data['mgroup_name']]
 | |
| #         score = 0
 | |
| #         for key in rule:
 | |
| #             new_key = f'{key}_{month_s}'
 | |
| #             goal_data[new_key] = goal_dict[new_key]
 | |
| #             if '-' in key:
 | |
| #                 score = score + qua_rate.get(key, 0)/goal_data[new_key]*rule[key]
 | |
| #             else:
 | |
| #                 score = score + data.get(key)/goal_data[new_key]*rule[key]
 | |
| #                 print(score)
 | |
| #         # enstat.goal_data = goal_data
 | |
| #         # enstat.score =score
 | |
| #         # enstat.save(update_fields=['goal_data', 'score'])
 | |
| #     except:
 | |
| #         print(traceback.format_exc())
 | |
| #     return goal_data, score
 | |
| 
 | |
| 
 | |
| def get_mpoint_cache(code: str, force_update=False, update_mplogx=True):
 | |
|     """
 | |
|     获取或更新mpoint的缓存数据
 | |
|     返回空代表无该测点
 | |
|     """
 | |
|     key = Mpoint.cache_key(code)
 | |
|     mpoint_data = cache.get(key, None)
 | |
|     if mpoint_data is None or force_update:
 | |
|         try:
 | |
|             mpoint = Mpoint.objects.get(code=code)
 | |
|         except Exception:
 | |
|             return None
 | |
|         mpoint_data = MpointSerializer(instance=mpoint).data
 | |
|         if update_mplogx:
 | |
|             now = timezone.now()
 | |
|             last_mplogx = MpLogx.objects.filter(mpoint=mpoint, timex__gte=now - timedelta(minutes=5)).order_by("-timex").first()
 | |
|             if last_mplogx:  # 核心数据
 | |
|                 mpoint_data["last_data"] = {"last_val": getattr(last_mplogx, "val_" + mpoint_data["val_type"]), "last_timex": last_mplogx.timex}
 | |
|         cache.set(key, mpoint_data, timeout=None)
 | |
|     return mpoint_data
 | |
| 
 | |
| 
 | |
| def update_mpoint_cache(cache_key: str, current_cache_val: dict, last_timex: datetime, last_val, last_mrs):
 | |
|     """
 | |
|     更新mpoint的缓存数据并执行某些操作
 | |
|     last_mrs: 所监测的设备运行状态值可不传
 | |
|     """
 | |
|     ep_belong_id = current_cache_val.get("ep_belong", None)
 | |
|     ep_monitored_id = current_cache_val.get("ep_monitored", False)
 | |
|     last_data = current_cache_val["last_data"]
 | |
|     last_data["pre_val"] = last_data.get("last_val", None)
 | |
|     last_data["pre_timex"] = last_data.get("last_timex", None)
 | |
|     last_data["pre_mrs"] = last_data.get("last_mrs", None)
 | |
|     last_data["last_val"] = last_val
 | |
|     if last_mrs:
 | |
|         last_data["last_mrs"] = last_mrs
 | |
|     last_data["last_timex"] = last_timex
 | |
|     cache.set(cache_key, current_cache_val, timeout=None)
 | |
|     # 更新设备状态缓存值
 | |
|     if ep_belong_id:
 | |
|         cache.set(f"equipment_{ep_belong_id}", {"running_state": RuningState.RUNING, "running_state_timex": last_timex}, timeout=None)
 | |
|     if ep_monitored_id:
 | |
|         cache.set(f"equipment_{ep_monitored_id}", {"running_state": last_mrs, "running_state_timex": last_timex}, timeout=None)
 | |
|     # 如果state变动则触发函数
 | |
|     if last_data["pre_mrs"] != last_mrs:
 | |
|         if ep_belong_id:
 | |
|             ctask_run("apps.em.services.shutdown_or_startup", ep_belong_id, last_timex, RuningState.RUNING)
 | |
|         if ep_monitored_id:
 | |
|             ctask_run("apps.em.services.shutdown_or_startup", ep_belong_id, last_timex, last_mrs)
 | |
| 
 | |
| 
 | |
| def transfer_mpoint_val_to_ep_running_state(current_val, base_val1: float):
 | |
|     """
 | |
|     将测点值转换所监测设备的运行状态值
 | |
|     """
 | |
|     if isinstance(current_val, bool):
 | |
|         if current_val:
 | |
|             return RuningState.RUNING
 | |
|         return RuningState.STOP
 | |
|     rs = RuningState.RUNING
 | |
|     if base_val1:
 | |
|         if current_val < base_val1:
 | |
|             rs = RuningState.STOP
 | |
|     else:
 | |
|         if current_val == 0:
 | |
|             rs = RuningState.STOP
 | |
|     return rs
 | |
| 
 | |
| 
 | |
| def king_sync(projectName: str, json_path: str = ""):
 | |
|     """
 | |
|     同步亚控测点
 | |
|     """
 | |
|     if json_path:
 | |
|         import os
 | |
|         import json
 | |
|         from django.conf import settings
 | |
| 
 | |
|         with open(os.path.join(settings.BASE_DIR, json_path), "r", encoding="utf-8") as f:
 | |
|             res = json.loads(f.read())
 | |
|     else:
 | |
|         from apps.third.king.k import kingClient
 | |
|         from apps.third.king import king_api
 | |
| 
 | |
|         _, res = kingClient.request(**king_api["read_variables"], params={"projectInstanceName": projectName})
 | |
| 
 | |
|     t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"}
 | |
|     for index, item in enumerate(res["objectList"]):
 | |
|         if "t" in item and item["t"] and "SystemTag" not in item["d"]:
 | |
|             code = f'K_{item["n"]}'
 | |
|             item["from"] = "king"
 | |
|             group = item["g"]
 | |
|             name = item["d"]
 | |
|             if group:
 | |
|                 name = f"{group}.{name}"
 | |
|             Mpoint.objects.get_or_create(code=code, defaults={"name": name, "code": code, "enabled": False, "is_auto": True, "val_type": t_dict[item["t"]], "third_info": item})
 | |
| 
 | |
| 
 | |
| test_data = {
 | |
|     "PNs": {"1": "V", "2": "T", "3": "Q"},
 | |
|     "PVs": {"1": -725, "2": "2024-04-08 13:43:53.140", "3": 192},
 | |
|     "Objs": [
 | |
|         {"N": "IP2_MM_IW3"},
 | |
|         {"N": "IP2_MM_IW4", "1": -6386},
 | |
|         {"N": "IP2_MM_IW5", "1": -7835},
 | |
|         {"N": "IP2_MM_IW7", "1": -6864},
 | |
|         {"N": "IP2_MM_IW15", "1": 29},
 | |
|         {"N": "IP2_MM_SC_IW3", "1": 337},
 | |
|         {"N": "IP2_MM_SC_IW13", "1": -5511, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW14", "1": -4640, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW15", "1": -5586, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW16", "1": -6634, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW17", "1": -2768, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW18", "1": -2946, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW19", "1": -2550, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW20", "1": -1512, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW21", "1": -1775, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW22", "1": -194, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW23", "1": -366, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW24", "1": -9291, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW25", "1": -6888, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW26", "1": -2360, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW29", "1": 164, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW30", "1": 914, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW31", "1": 849, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW37", "1": -125, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW38", "1": -3009, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW40", "1": -1394, "2": 24},
 | |
|         {"N": "IP2_MM_SC_IW43", "1": 758, "2": 24},
 | |
|         {"N": "IP3_SC_IW1", "1": 11557, "2": 107},
 | |
|         {"N": "IP3_SC_IW2", "1": 7624, "2": 107},
 | |
|         {"N": "IP3_SC_IW6", "1": 11159, "2": 107},
 | |
|         {"N": "IP3_SC_IW7", "1": 8073, "2": 107},
 | |
|         {"N": "IP3_SC_IW9", "1": 4490, "2": 107},
 | |
|         {"N": "IP3_SC_IW10", "1": 5437, "2": 107},
 | |
|         {"N": "IP3_SC_IW11", "1": 9244, "2": 107},
 | |
|         {"N": "IP3_SC_IW12", "1": 7886, "2": 107},
 | |
|         {"N": "IP3_SC_IW13", "1": -2962, "2": 107},
 | |
|         {"N": "IP3_SC_IW14", "1": -159, "2": 107},
 | |
|         {"N": "IP3_SC_IW26", "1": 15, "2": 107},
 | |
|         {"N": "IP3_SC_IW27", "1": 15, "2": 107},
 | |
|     ],
 | |
| }
 | |
| 
 | |
| 
 | |
| @auto_log("亚控存库")
 | |
| def insert_mplogx_from_king_mqtt(data: dict, is_offset=True):
 | |
|     """
 | |
|     从king mqtt数据插入超表
 | |
|     注释的代码是分批存的, 但是实际上不需要, 暂时注释,有需要再加
 | |
|     """
 | |
|     objs = data["Objs"]
 | |
|     # len_objs = len(objs)
 | |
|     pvs = data["PVs"]
 | |
| 
 | |
|     # chunk_size = 200
 | |
|     # num_chunks = (len(objs) + chunk_size - 1) // chunk_size
 | |
| 
 | |
|     otime_obj = timezone.make_aware(datetime.strptime(pvs["2"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0)  # 只保留到秒级的精度
 | |
| 
 | |
|     insert_mplogx_from_king_mqtt_chunk(objs, otime_obj, is_offset)
 | |
|     # with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
 | |
|     #     futures = []
 | |
|     #     for i in range(num_chunks):
 | |
|     #         start = i * chunk_size
 | |
|     #         end = min(start + chunk_size, len_objs)
 | |
|     #         chunk = objs[start:end]
 | |
|     #         futures.append(executor.submit(insert_mplogx_from_king_mqtt_chunk, chunk, otime_obj, is_offset))
 | |
|     #     concurrent.futures.wait(futures)
 | |
|     # for future in futures:
 | |
|     #     print(future.result(), end=', ')
 | |
| 
 | |
| 
 | |
| def insert_mplogx_from_king_mqtt_chunk(objs: list, otime_obj: datetime, is_offset=True):
 | |
|     """
 | |
|     分批存库,  亚控 38.00,00000.11011 版本偏移只是时间戳偏移。另外其实可以不在乎
 | |
|     """
 | |
|     # oval = pvs['1']
 | |
|     enp_mpoints_dict = {}  # 这个地方主要是需要更新envdata表里的数据
 | |
|     for obj in objs:
 | |
|         n = obj["N"]
 | |
|         val = obj["1"]
 | |
|         # timex = obj.get("2", None)
 | |
|         code = f"K_{n}"
 | |
|         cache_key = Mpoint.cache_key(code)
 | |
|         mpoint_data = get_mpoint_cache(code)
 | |
|         if mpoint_data is None or not mpoint_data['enabled']:
 | |
|             continue
 | |
|         val_type = mpoint_data["val_type"]
 | |
|         # if is_offset:
 | |
|         #     if timex is None:
 | |
|         #         timex = otime_obj
 | |
|         #     else:
 | |
|         #         timex = otime_obj + timedelta(mgilliseconds=timex)
 | |
|         # else:
 | |
|         #     timex = timezone.make_aware(datetime.strptime(timex, '%Y-%m-%d %H:%M:%S.%f'))
 | |
|         mpoint_interval = mpoint_data["interval"]
 | |
|         mpoint_last_timex = mpoint_data.get("last_timex", None)
 | |
|         mpoint_is_rep_ep_running_state = mpoint_data.get("is_rep_ep_running_state", False)
 | |
|         mpoint_ep_base_val1 = mpoint_data.get("ep_base_val1", None)
 | |
|         # 控制采集间隔
 | |
|         can_save = False
 | |
|         if mpoint_last_timex:
 | |
|             if (otime_obj - mpoint_last_timex).total_seconds() > mpoint_interval:
 | |
|                 can_save = True
 | |
|         else:
 | |
|             can_save = True
 | |
|         if can_save:
 | |
|             save_dict = {
 | |
|                 "timex": otime_obj,
 | |
|                 "mpoint": Mpoint.objects.get(id=mpoint_data["id"]),
 | |
|             }
 | |
|             save_dict[f"val_{val_type}"] = val
 | |
|             val_mrs = RuningState.RUNING
 | |
|             if mpoint_is_rep_ep_running_state:
 | |
|                 save_dict["val_mrs"] = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_base_val1)
 | |
|             try:
 | |
|                 MpLogx.objects.create(**save_dict)
 | |
|                 update_mpoint_cache(cache_key, mpoint_data, otime_obj, val, val_mrs)
 | |
| 
 | |
|                 # 此处代码用于更新envdata表里的数据
 | |
|                 enp_field = mpoint_data.get("enp_field", None)
 | |
|                 ep_monitored = mpoint_data.get("ep_monitored", None)
 | |
|                 if enp_field and ep_monitored:
 | |
|                     if enp_mpoints_dict.get(ep_monitored, None) is None:
 | |
|                         enp_mpoints_dict[ep_monitored] = {"timex": otime_obj, "equipment": Equipment.objects.get(id=ep_monitored)}
 | |
|                     if enp_field == "running_state":
 | |
|                         enp_mpoints_dict[ep_monitored].update({enp_field: val_mrs})
 | |
|                     else:
 | |
|                         enp_mpoints_dict[ep_monitored].update({enp_field: val})
 | |
|                         enp_mpoints_dict[ep_monitored].update({enp_field: val})
 | |
|             except Exception:
 | |
|                 myLogger.error(f"mpoint_cache_key: {cache_key} 存库失败", exc_info=True)
 | |
|                 continue
 | |
| 
 | |
|     # # 先尝试批量存库/发生异常则单个存储
 | |
|     # is_bulk_insert = True
 | |
|     # try:
 | |
|     #     MpLogx.objects.bulk_create(insert_db_data)
 | |
|     # except IntegrityError:
 | |
|     #     is_bulk_insert = False
 | |
| 
 | |
|     # if is_bulk_insert:
 | |
|     #     # 批量存库成功后更新缓存
 | |
|     #     for item in update_cache_data:
 | |
|     #         need_gather_filed_mpoints.append(update_mpoint_cache_and_do_func(item))
 | |
|     # else:
 | |
|     # {'eq_belong': {'dust_rtd': 1.0}}
 | |
| 
 | |
|     # 额外需要处理的数据(存储到envdata表里)
 | |
|     if enp_mpoints_dict:
 | |
|         for _, item in enp_mpoints_dict:
 | |
|             EnvData.objects(**item)
 |