refactor: change the mpoint handling logic, add mpoint_affect support, and refactor part of the code

caoqianming 2024-05-06 00:06:08 +08:00
parent a46cfe59ed
commit 6cd255173f
3 changed files with 89 additions and 74 deletions

View File

@@ -1,13 +1,9 @@
-from apps.enm.models import Mpoint, MpointStat, EnStat, MpLogx
+from apps.enm.models import Mpoint, MpointStat, MpLogx
 import re
 import logging
-import traceback
-from apps.mtm.services import get_mgroup_goals
 from django.db.models import Q
 from django.utils import timezone
 from django.core.cache import cache
-import concurrent.futures
-from django.db import connection, transaction
 from datetime import datetime, timedelta
 from apps.utils.decorators import auto_log
 from django.db import IntegrityError
@@ -44,9 +40,9 @@ def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_s
             if match == 'self':
                 expr_str = expr_str.replace(f"${{{match}}}", str(current_val))
             else:
-                mpoint_data = get_mpoint_cache(match)
+                mpoint_data = MpointCache(match).data
                 if mpoint_data:
-                    expr_str = expr_str.replace(f"{{{match}}}", str(mpoint_data['last_data']['last_val']))
+                    expr_str = expr_str.replace(f"{{{match}}}", str(mpoint_data['last_data']['last_val']))  # this last_val may not exist
         rval = eval(expr_str)
         return rval
     if isinstance(current_val, bool):
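
(Not part of the diff.) A minimal worked example of the substitution the hunk above performs before eval(); the expression and the referenced point code "MP002" are hypothetical, and the cached last_val of MP002 is assumed to be 1. Note that the 'self' branch replaces a "${...}" placeholder while other point codes are replaced via "{...}".

# Illustration only: how an ep_rs_expr is resolved by the loop above (hypothetical values).
expr_str = "${self} > 0.5 and {MP002} == 1"                  # 'self' uses ${...}, other point codes use {...}
current_val = 0.8                                            # value of the point being evaluated
expr_str = expr_str.replace("${self}", str(current_val))     # -> "0.8 > 0.5 and {MP002} == 1"
expr_str = expr_str.replace("{MP002}", str(1))               # cached last_val of MP002 (assumed)
print(eval(expr_str))                                        # -> True; this becomes the derived running-state value (last_mrs)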
@@ -118,48 +114,81 @@ def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_s
 # print(traceback.format_exc())
 # return goal_data, score
 
-def get_mpoint_cache(code: str, force_update=False, update_mplogx=True):
-    """
-    Get or refresh the cached data for an mpoint.
-    Returning None means the measuring point does not exist.
-    """
-    key = Mpoint.cache_key(code)
-    mpoint_data = cache.get(key, None)
-    if mpoint_data is None or force_update:
-        try:
-            mpoint = Mpoint.objects.get(code=code)
-        except Exception:
-            return None
-        mpoint_data = MpointSerializer(instance=mpoint).data
-        if update_mplogx:
-            now = timezone.now()
-            last_mplogx = MpLogx.objects.filter(mpoint=mpoint, timex__gte=now - timedelta(minutes=5)).order_by("-timex").first()
-            if last_mplogx:  # core data
-                mpoint_data["last_data"] = {"last_val": getattr(last_mplogx, "val_" + mpoint_data["val_type"]), "last_timex": last_mplogx.timex}
-        cache.set(key, mpoint_data, timeout=None)
-    return mpoint_data
-
-
-def update_mpoint_cache(cache_key: str, current_cache_val: dict, last_timex: datetime, last_val, last_mrs):
-    """
-    Update the mpoint cache data and perform the related follow-up actions.
-    last_mrs: running-state value of the monitored equipment; optional.
-    """
-    ep_belong_id = current_cache_val.get("ep_belong", None)
-    ep_monitored_id = current_cache_val.get("ep_monitored", False)
-    last_data = current_cache_val["last_data"]
-    current_cache_val['gather_state'] = 0
-    last_data["last_val"] = last_val
-    last_data["last_mrs"] = last_mrs
-    last_data["last_timex"] = last_timex
-    cache.set(cache_key, current_cache_val, timeout=None)
-    # update the equipment running-state values
-    if ep_belong_id:
-        set_eq_rs(ep_belong_id, last_timex, Equipment.RUNING)
-    if ep_monitored_id:
-        set_eq_rs(ep_monitored_id, last_timex, last_mrs)
+class MpointCache:
+    """Measuring-point cache class
+    """
+    def __init__(self, code: str):
+        self.code = code
+        self.cache_key = f'mpoint_{code}'
+        self.data = self.get()
+
+    def get(self, force_update=False, update_mplogx=True):
+        key = self.cache_key
+        code = self.code
+        mpoint_data = cache.get(key, None)
+        if mpoint_data is None or force_update:
+            try:
+                mpoint = Mpoint.objects.get(code=code)
+            except Exception:
+                return None
+            mpoint_data = MpointSerializer(instance=mpoint).data
+            mpoint_data['last_data'] = {'last_val': None, 'last_timex': None}
+            if update_mplogx:
+                now = timezone.now()
+                last_mplogx = MpLogx.objects.filter(mpoint=mpoint, timex__gte=now - timedelta(minutes=5)).order_by("-timex").first()
+                if last_mplogx:  # core data
+                    mpoint_data["last_data"] = {"last_val": getattr(last_mplogx, "val_" + mpoint_data["val_type"]), "last_timex": last_mplogx.timex}
+            cache.set(key, mpoint_data, timeout=None)
+        return mpoint_data
+
+    def set_fail(self, reason: int = -1):
+        """
+        -1: failed to save to the database, -2: offline
+        """
+        self.data['gather_state'] = reason
+        cache.set(self.cache_key, self.data, timeout=None)
+
+    def set(self, last_timex: datetime, last_val):
+        current_cache_val = self.data
+        cache_key = self.cache_key
+        last_data = current_cache_val["last_data"]
+        last_data["last_val"] = last_val
+        last_data["last_timex"] = last_timex
+        last_mrs = None  # equipment running-state signal
+        mpoint_is_rep_ep_running_state = current_cache_val['is_rep_ep_running_state']
+        if mpoint_is_rep_ep_running_state:
+            mpoint_ep_rs_val = current_cache_val.get("ep_rs_val", None)
+            mpoint_ep_rs_expr = current_cache_val.get("ep_rs_expr", None)
+            last_mrs = transfer_mpoint_val_to_ep_running_state(last_val, mpoint_ep_rs_val, mpoint_ep_rs_expr)
+        last_data["last_mrs"] = last_mrs
+        current_cache_val['gather_state'] = 0  # marks the data as freshly updated
+        cache.set(cache_key, current_cache_val, timeout=None)
+        # persist to the database
+        save_dict = {
+            "timex": last_timex,
+            "mpoint": Mpoint.objects.get(id=current_cache_val["id"]),
+            "val_mrs": last_mrs
+        }
+        save_dict[f"val_{current_cache_val['val_type']}"] = last_val
+        MpLogx.objects.create(**save_dict)
+        mf_code = current_cache_val.get('mpoint_affect')
+        if mf_code:  # if this point affects another point, that point must be updated in step
+            mc = MpointCache(mf_code)
+            mf_data = mc.data
+            # only auto-collected points may affect computed points, and only for on/off signals
+            if mf_data and current_cache_val['type'] == Mpoint.MT_AUTO and mf_data['type'] == Mpoint.MT_COMPUTE and mf_data['is_rep_ep_running_state'] and mf_data['ep_rs_expr']:
+                mc.set(last_timex, None)
+        # from here on, update the equipment signals
+        ep_belong_id = current_cache_val.get("ep_belong")
+        ep_monitored_id = current_cache_val.get("ep_monitored")
+        # update the equipment running-state values
+        if ep_belong_id:
+            set_eq_rs(ep_belong_id, last_timex, Equipment.RUNING)
+        if ep_monitored_id and mpoint_is_rep_ep_running_state:
+            set_eq_rs(ep_monitored_id, last_timex, last_mrs)
 
 
 def king_sync(projectName: str, json_path: str = ""):
     """
@@ -271,16 +300,14 @@ def insert_mplogx_item(code, val, timex, enp_mpoints_dict):
     """
     Save into the hypertable
     """
-    cache_key = Mpoint.cache_key(code)
-    mpoint_data = get_mpoint_cache(code)
+    mc = MpointCache(code)
+    mpoint_data = mc.data
     if mpoint_data is None or not mpoint_data['enabled']:
         return
-    val_type = mpoint_data["val_type"]
     mpoint_interval = mpoint_data["interval"]
     mpoint_last_timex = mpoint_data.get('last_data', {}).get('last_timex', None)
-    mpoint_is_rep_ep_running_state = mpoint_data.get("is_rep_ep_running_state", False)
-    mpoint_ep_rs_val = mpoint_data.get("ep_rs_val", None)
-    mpoint_ep_rs_expr = mpoint_data.get("ep_rs_expr", None)
     # throttle the collection interval
     can_save = False
     if mpoint_last_timex:
@@ -289,18 +316,8 @@ def insert_mplogx_item(code, val, timex, enp_mpoints_dict):
     else:
         can_save = True
     if can_save:
-        save_dict = {
-            "timex": timex,
-            "mpoint": Mpoint.objects.get(id=mpoint_data["id"]),
-        }
-        save_dict[f"val_{val_type}"] = val
-        val_mrs = None
         try:
-            if mpoint_is_rep_ep_running_state:
-                val_mrs = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_rs_val, mpoint_ep_rs_expr)
-                save_dict["val_mrs"] = val_mrs
-            MpLogx.objects.create(**save_dict)
-            update_mpoint_cache(cache_key, mpoint_data, timex, val, val_mrs)
+            MpointCache(code).set(timex, val)
 
             # the code below updates the data in the envdata table
             enp_field = mpoint_data.get("enp_field", None)
@@ -309,16 +326,15 @@ def insert_mplogx_item(code, val, timex, enp_mpoints_dict):
                 if enp_mpoints_dict.get(ep_monitored, None) is None:
                     enp_mpoints_dict[ep_monitored] = {"timex": timex, "equipment": Equipment.objects.get(id=ep_monitored)}
                 if enp_field == "running_state":
-                    enp_mpoints_dict[ep_monitored].update({enp_field: val_mrs})
+                    enp_mpoints_dict[ep_monitored].update({enp_field: mpoint_data['last_data']['last_mrs']})
                 else:
                     enp_mpoints_dict[ep_monitored].update({enp_field: val})
                 enp_mpoints_dict[ep_monitored].update({enp_field: val})
         except IntegrityError:  # ignore uniqueness errors
             pass
         except Exception:
-            myLogger.error(f"mpoint_cache_key: {cache_key} 存库失败", exc_info=True)
-            mpoint_data['gather_state'] = -1
-            cache.set(cache_key, mpoint_data, timeout=None)
+            myLogger.error(f"mpoint_code: {code} 存库失败", exc_info=True)
+            mc.set_fail(-1)
 
 
 def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is_offset=True):
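
(Not part of the diff.) A sketch of how insert_mplogx_item is typically driven after this change, assuming a batch of hypothetical (code, value) samples; enp_mpoints_dict is shared across the batch so envdata rows per monitored equipment can accumulate, as the last hunk above suggests.

from django.utils import timezone
from apps.enm.services import insert_mplogx_item

samples = [("EP01_TEMP", 42.5), ("EP01_RUN", 1)]   # hypothetical point codes and raw values
enp_mpoints_dict = {}                              # keyed by monitored equipment id, filled as a side effect
now = timezone.now()
for code, val in samples:
    insert_mplogx_item(code, val, now, enp_mpoints_dict)
# the caller presumably persists enp_mpoints_dict into the envdata table afterwards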

View File

@@ -21,7 +21,7 @@ from django.utils import timezone
 from django.db.models import Max
 from apps.third.king.k import kingClient
 from apps.third.king.king_api import kapis
-from apps.enm.services import insert_mplogx_from_king_rest_chunk, get_mpoint_cache
+from apps.enm.services import insert_mplogx_from_king_rest_chunk, MpointCache
 from django.core.cache import cache
 from django.utils.timezone import localtime
@@ -741,13 +741,12 @@ def check_mpoint_offline(seconds=300):
     """Detect measuring points whose collection has gone offline"""
     now = localtime()
     for mpoint in Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO):
-        cache_key = Mpoint.cache_key(mpoint.code)
-        mpoint_data = get_mpoint_cache(mpoint.id)
+        mc = MpointCache(mpoint.code)
+        mpoint_data = mc.data
         last_data = mpoint_data.get('last_data', None)
         is_offline = True
         if last_data and last_data['last_timex'] and (now-last_data['last_timex']).total_seconds() < seconds:
             is_offline = False
         if is_offline:
-            mpoint_data['gather_state'] = -2  # went offline
-            cache.set(cache_key, mpoint_data, timeout=None)
+            mc.set_fail(-2)

View File

@@ -11,7 +11,7 @@ from rest_framework.response import Response
 from rest_framework.serializers import Serializer
 from rest_framework.decorators import action
 from apps.enm.tasks import cal_mpointstats_duration
-from apps.enm.services import king_sync, get_mpoint_cache
+from apps.enm.services import king_sync, MpointCache
 from django.db import transaction
@@ -33,13 +33,13 @@ class MpointViewSet(CustomModelViewSet):
     def perform_create(self, serializer):
         instance = serializer.save()
         if instance.code:
-            get_mpoint_cache(instance.code, True)
+            MpointCache(instance.code).get(True)
 
     @transaction.atomic
     def perform_update(self, serializer):
         instance = serializer.save()
         if instance.code:
-            get_mpoint_cache(instance.code, True)
+            MpointCache(instance.code).get(True)
 
     @action(methods=["post"], detail=False, perms_map={"post": "mpoint.create"}, serializer_class=Serializer)
     def king_sync(self, request, *args, **kwargs):
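
(Not part of the diff.) The cache-refresh call used in perform_create and perform_update above, shown standalone with a hypothetical code value: passing True as the positional force_update argument makes MpointCache.get re-serialize the Mpoint from the database instead of returning the possibly stale cache entry.

from apps.enm.services import MpointCache

MpointCache("EP01_TEMP").get(True)   # force_update=True: rebuild the cached dict right after saving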