diff --git a/apps/enm/services.py b/apps/enm/services.py index 0175e4e3..6693e35c 100644 --- a/apps/enm/services.py +++ b/apps/enm/services.py @@ -1,13 +1,9 @@ -from apps.enm.models import Mpoint, MpointStat, EnStat, MpLogx +from apps.enm.models import Mpoint, MpointStat, MpLogx import re import logging -import traceback -from apps.mtm.services import get_mgroup_goals from django.db.models import Q from django.utils import timezone from django.core.cache import cache -import concurrent.futures -from django.db import connection, transaction from datetime import datetime, timedelta from apps.utils.decorators import auto_log from django.db import IntegrityError @@ -44,9 +40,9 @@ def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_s if match == 'self': expr_str = expr_str.replace(f"${{{match}}}", str(current_val)) else: - mpoint_data = get_mpoint_cache(match) + mpoint_data = MpointCache(match).data if mpoint_data: - expr_str = expr_str.replace(f"{{{match}}}", str(mpoint_data['last_data']['last_val'])) + expr_str = expr_str.replace(f"{{{match}}}", str(mpoint_data['last_data']['last_val'])) # 这个last_val可能不存在 rval = eval(expr_str) return rval if isinstance(current_val, bool): @@ -118,48 +114,81 @@ def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_s # print(traceback.format_exc()) # return goal_data, score - -def get_mpoint_cache(code: str, force_update=False, update_mplogx=True): +class MpointCache: + """测点缓存类 """ - 获取或更新mpoint的缓存数据 - 返回空代表无该测点 - """ - key = Mpoint.cache_key(code) - mpoint_data = cache.get(key, None) - if mpoint_data is None or force_update: - try: - mpoint = Mpoint.objects.get(code=code) - except Exception: - return None - mpoint_data = MpointSerializer(instance=mpoint).data - if update_mplogx: - now = timezone.now() - last_mplogx = MpLogx.objects.filter(mpoint=mpoint, timex__gte=now - timedelta(minutes=5)).order_by("-timex").first() - if last_mplogx: # 核心数据 - mpoint_data["last_data"] = {"last_val": getattr(last_mplogx, "val_" + mpoint_data["val_type"]), "last_timex": last_mplogx.timex} - cache.set(key, mpoint_data, timeout=None) - return mpoint_data + def __init__(self, code: str): + self.code = code + self.cache_key = f'mpoint_{code}' + self.data = self.get() + + def get(self, force_update=False, update_mplogx=True): + key = self.cache_key + code = self.code + mpoint_data = cache.get(key, None) + if mpoint_data is None or force_update: + try: + mpoint = Mpoint.objects.get(code=code) + except Exception: + return None + mpoint_data = MpointSerializer(instance=mpoint).data + mpoint_data['last_data'] = {'last_val': None, 'last_timex': None} + if update_mplogx: + now = timezone.now() + last_mplogx = MpLogx.objects.filter(mpoint=mpoint, timex__gte=now - timedelta(minutes=5)).order_by("-timex").first() + if last_mplogx: # 核心数据 + mpoint_data["last_data"] = {"last_val": getattr(last_mplogx, "val_" + mpoint_data["val_type"]), "last_timex": last_mplogx.timex} + cache.set(key, mpoint_data, timeout=None) + return mpoint_data + def set_fail(self, reason: int = -1): + """ + -1 存库失败 -2 掉线 + """ + self.data['gather_state'] = reason + cache.set(self.cache_key, self.data, timeout=None) -def update_mpoint_cache(cache_key: str, current_cache_val: dict, last_timex: datetime, last_val, last_mrs): - """ - 更新mpoint的缓存数据并执行某些操作 - last_mrs: 所监测的设备运行状态值可不传 - """ - ep_belong_id = current_cache_val.get("ep_belong", None) - ep_monitored_id = current_cache_val.get("ep_monitored", False) - last_data = current_cache_val["last_data"] - current_cache_val['gather_state'] = 0 - last_data["last_val"] = last_val - last_data["last_mrs"] = last_mrs - last_data["last_timex"] = last_timex - cache.set(cache_key, current_cache_val, timeout=None) - # 更新设备状态值 - if ep_belong_id: - set_eq_rs(ep_belong_id, last_timex, Equipment.RUNING) - if ep_monitored_id: - set_eq_rs(ep_monitored_id, last_timex, last_mrs) + def set(self, last_timex: datetime, last_val): + current_cache_val = self.data + cache_key = self.cache_key + last_data = current_cache_val["last_data"] + last_data["last_val"] = last_val + last_data["last_timex"] = last_timex + last_mrs = None # 设备状态信号 + mpoint_is_rep_ep_running_state = current_cache_val['is_rep_ep_running_state'] + if mpoint_is_rep_ep_running_state: + mpoint_ep_rs_val = current_cache_val.get("ep_rs_val", None) + mpoint_ep_rs_expr = current_cache_val.get("ep_rs_expr", None) + last_mrs = transfer_mpoint_val_to_ep_running_state(last_val, mpoint_ep_rs_val, mpoint_ep_rs_expr) + last_data["last_mrs"] = last_mrs + current_cache_val['gather_state'] = 0 # 表明数据已更新 + cache.set(cache_key, current_cache_val, timeout=None) + # 存库 + save_dict = { + "timex": last_timex, + "mpoint": Mpoint.objects.get(id=current_cache_val["id"]), + "val_mrs": last_mrs + } + save_dict[f"val_{current_cache_val['val_type']}"] = last_val + MpLogx.objects.create(**save_dict) + + mf_code = current_cache_val.get('mpoint_affect') + if mf_code: # 如果该测点影响到另一个测点,要同步更新另一个测点 + mc = MpointCache(mf_code) + mf_data = mc.data + # 只有自采测点才可影响计算测点只针对开关信号 + if mf_data and current_cache_val['type'] == Mpoint.MT_AUTO and mf_data['type'] == Mpoint.MT_COMPUTE and mf_data['is_rep_ep_running_state'] and mf_data['ep_rs_expr']: + mc.set(last_timex, None) + + # 下面开始更新设备信号 + ep_belong_id = current_cache_val.get("ep_belong") + ep_monitored_id = current_cache_val.get("ep_monitored") + # 更新设备状态值 + if ep_belong_id: + set_eq_rs(ep_belong_id, last_timex, Equipment.RUNING) + if ep_monitored_id and mpoint_is_rep_ep_running_state: + set_eq_rs(ep_monitored_id, last_timex, last_mrs) def king_sync(projectName: str, json_path: str = ""): """ @@ -271,16 +300,14 @@ def insert_mplogx_item(code, val, timex, enp_mpoints_dict): """ 存入超表 """ - cache_key = Mpoint.cache_key(code) - mpoint_data = get_mpoint_cache(code) + mc = MpointCache(code) + mpoint_data = mc.data if mpoint_data is None or not mpoint_data['enabled']: return - val_type = mpoint_data["val_type"] + mpoint_interval = mpoint_data["interval"] mpoint_last_timex = mpoint_data.get('last_data', {}).get('last_timex', None) - mpoint_is_rep_ep_running_state = mpoint_data.get("is_rep_ep_running_state", False) - mpoint_ep_rs_val = mpoint_data.get("ep_rs_val", None) - mpoint_ep_rs_expr = mpoint_data.get("ep_rs_expr", None) + # 控制采集间隔 can_save = False if mpoint_last_timex: @@ -289,18 +316,8 @@ def insert_mplogx_item(code, val, timex, enp_mpoints_dict): else: can_save = True if can_save: - save_dict = { - "timex": timex, - "mpoint": Mpoint.objects.get(id=mpoint_data["id"]), - } - save_dict[f"val_{val_type}"] = val - val_mrs = None try: - if mpoint_is_rep_ep_running_state: - val_mrs = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_rs_val, mpoint_ep_rs_expr) - save_dict["val_mrs"] = val_mrs - MpLogx.objects.create(**save_dict) - update_mpoint_cache(cache_key, mpoint_data, timex, val, val_mrs) + MpointCache(code).set(timex, val) # 此处代码用于更新envdata表里的数据 enp_field = mpoint_data.get("enp_field", None) @@ -309,16 +326,15 @@ def insert_mplogx_item(code, val, timex, enp_mpoints_dict): if enp_mpoints_dict.get(ep_monitored, None) is None: enp_mpoints_dict[ep_monitored] = {"timex": timex, "equipment": Equipment.objects.get(id=ep_monitored)} if enp_field == "running_state": - enp_mpoints_dict[ep_monitored].update({enp_field: val_mrs}) + enp_mpoints_dict[ep_monitored].update({enp_field: mpoint_data['last_data']['last_mrs']}) else: enp_mpoints_dict[ep_monitored].update({enp_field: val}) enp_mpoints_dict[ep_monitored].update({enp_field: val}) except IntegrityError: # 忽略唯一性错误 pass except Exception: - myLogger.error(f"mpoint_cache_key: {cache_key} 存库失败", exc_info=True) - mpoint_data['gather_state'] = -1 - cache.set(cache_key, mpoint_data, timeout=None) + myLogger.error(f"mpoint_code: {code} 存库失败", exc_info=True) + mc.set_fail(-1) def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is_offset=True): diff --git a/apps/enm/tasks.py b/apps/enm/tasks.py index ad51fba2..18c7fa5c 100644 --- a/apps/enm/tasks.py +++ b/apps/enm/tasks.py @@ -21,7 +21,7 @@ from django.utils import timezone from django.db.models import Max from apps.third.king.k import kingClient from apps.third.king.king_api import kapis -from apps.enm.services import insert_mplogx_from_king_rest_chunk, get_mpoint_cache +from apps.enm.services import insert_mplogx_from_king_rest_chunk, MpointCache from django.core.cache import cache from django.utils.timezone import localtime @@ -741,13 +741,12 @@ def check_mpoint_offline(seconds=300): """监测测点采集掉线""" now = localtime() for mpoint in Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO): - cache_key = Mpoint.cache_key(mpoint.code) - mpoint_data = get_mpoint_cache(mpoint.id) + mc = MpointCache(mpoint.code) + mpoint_data = mc.data last_data = mpoint_data.get('last_data', None) is_offline = True if last_data and last_data['last_timex'] and (now-last_data['last_timex']).total_seconds() < seconds: is_offline = False if is_offline: - mpoint_data['gather_state'] = -2 # 掉线了 - cache.set(cache_key, mpoint_data, timeout=None) + mc.set_fail(-2) \ No newline at end of file diff --git a/apps/enm/views.py b/apps/enm/views.py index 8c2766c7..20322fb2 100644 --- a/apps/enm/views.py +++ b/apps/enm/views.py @@ -11,7 +11,7 @@ from rest_framework.response import Response from rest_framework.serializers import Serializer from rest_framework.decorators import action from apps.enm.tasks import cal_mpointstats_duration -from apps.enm.services import king_sync, get_mpoint_cache +from apps.enm.services import king_sync, MpointCache from django.db import transaction @@ -33,13 +33,13 @@ class MpointViewSet(CustomModelViewSet): def perform_create(self, serializer): instance = serializer.save() if instance.code: - get_mpoint_cache(instance.code, True) + MpointCache(instance.code).get(True) @transaction.atomic def perform_update(self, serializer): instance = serializer.save() if instance.code: - get_mpoint_cache(instance.code, True) + MpointCache(instance.code).get(True) @action(methods=["post"], detail=False, perms_map={"post": "mpoint.create"}, serializer_class=Serializer) def king_sync(self, request, *args, **kwargs):