feat: ctask_run delay执行及相应优化

This commit is contained in:
caoqianming 2024-04-24 23:54:35 +08:00
parent c834f996f5
commit 53b1ceafa0
3 changed files with 25 additions and 16 deletions

View File

@ -2,7 +2,19 @@ from .models import Equipment
from apps.system.models import Dept from apps.system.models import Dept
from apps.mtm.models import Mgroup from apps.mtm.models import Mgroup
import datetime import datetime
from django.core.cache import cache
def set_equip_rs(equipId: str, last_timex: datetime, last_mrs):
"""更新设备运行状态缓存
"""
cache.set(f"equipment_{equipId}", {"running_state": last_mrs, "running_state_timex": last_timex}, timeout=None) # 更新缓存
def get_equip_rs(equipId):
"""返回设备运行状态缓存
{"running_state": None, "running_state_timex": None}
"""
return cache.get(f"equipment_{equipId}", {"running_state": None, "running_state_timex": None})
def shutdown_or_startup(equipId: str, last_timex: datetime, last_mrs): def shutdown_or_startup(equipId: str, last_timex: datetime, last_mrs):
""" """
@ -15,6 +27,7 @@ def shutdown_or_startup(equipId: str, last_timex: datetime, last_mrs):
equip = Equipment.objects.get(id=equipId) equip = Equipment.objects.get(id=equipId)
equip.running_state = last_mrs equip.running_state = last_mrs
equip.save(update_fields=["running_state"]) equip.save(update_fields=["running_state"])
update_equip_rs(equipId, last_timex, last_mrs)
mgroup: Mgroup = equip.mgroup mgroup: Mgroup = equip.mgroup
indicate = equip.indicate_mgroup_running indicate = equip.indicate_mgroup_running

View File

@ -8,7 +8,7 @@ from django.core.cache import cache
@shared_task(base=CustomTask) @shared_task(base=CustomTask)
def check_equipment_offline(seconds=20): def check_equipment_offline(seconds=30):
"""监测设备是否掉线(根据测点采集) """监测设备是否掉线(根据测点采集)
监测设备是否掉线 监测设备是否掉线

View File

@ -15,6 +15,7 @@ from apps.utils.tasks import ctask_run
from .serializers import MpointSerializer from .serializers import MpointSerializer
from apps.enp.models import EnvData from apps.enp.models import EnvData
from apps.em.models import Equipment from apps.em.models import Equipment
from apps.em.services import set_equip_rs, get_equip_rs
myLogger = logging.getLogger("log") myLogger = logging.getLogger("log")
@ -120,26 +121,21 @@ def update_mpoint_cache(cache_key: str, current_cache_val: dict, last_timex: dat
ep_belong_id = current_cache_val.get("ep_belong", None) ep_belong_id = current_cache_val.get("ep_belong", None)
ep_monitored_id = current_cache_val.get("ep_monitored", False) ep_monitored_id = current_cache_val.get("ep_monitored", False)
last_data = current_cache_val["last_data"] last_data = current_cache_val["last_data"]
last_data["pre_val"] = last_data.get("last_val", None)
pre_timex = last_data.get("last_timex", None)
last_data["pre_timex"] = pre_timex
last_data["pre_mrs"] = last_data.get("last_mrs", None)
last_data["last_val"] = last_val last_data["last_val"] = last_val
if last_mrs: last_data["last_mrs"] = last_mrs
last_data["last_mrs"] = last_mrs
last_data["last_timex"] = last_timex last_data["last_timex"] = last_timex
cache.set(cache_key, current_cache_val, timeout=None) cache.set(cache_key, current_cache_val, timeout=None)
# 更新设备状态缓存值 # 更新设备状态缓存值
if ep_belong_id: if ep_belong_id:
cache.set(f"equipment_{ep_belong_id}", {"running_state": Equipment.RUNING, "running_state_timex": last_timex}, timeout=None) if get_equip_rs(ep_belong_id)["running_state"] != Equipment.RUNING: # 如果状态变动了要调用方法否则只需更新缓存
ctask_run.delay("apps.em.services.shutdown_or_startup", ep_belong_id, last_timex, Equipment.RUNING)
else:
set_equip_rs(ep_belong_id, last_timex, Equipment.RUNING)
if ep_monitored_id: if ep_monitored_id:
cache.set(f"equipment_{ep_monitored_id}", {"running_state": last_mrs, "running_state_timex": last_timex}, timeout=None) if get_equip_rs(ep_monitored_id)["running_state"] != last_mrs:
# 如果state变动则触发函数 ctask_run.delay("apps.em.services.shutdown_or_startup", ep_monitored_id, last_timex, last_mrs)
if last_data["pre_mrs"] != last_mrs or pre_timex is None or (timezone.now()-pre_timex).total_seconds()>30: else:
if ep_belong_id: set_equip_rs(ep_monitored_id, last_timex, last_mrs)
ctask_run("apps.em.services.shutdown_or_startup", ep_belong_id, last_timex, Equipment.RUNING)
if ep_monitored_id:
ctask_run("apps.em.services.shutdown_or_startup", ep_monitored_id, last_timex, last_mrs)
def transfer_mpoint_val_to_ep_running_state(current_val, base_val1: float): def transfer_mpoint_val_to_ep_running_state(current_val, base_val1: float):
@ -289,7 +285,7 @@ def insert_mplogx_item(code, val, timex, enp_mpoints_dict):
"mpoint": Mpoint.objects.get(id=mpoint_data["id"]), "mpoint": Mpoint.objects.get(id=mpoint_data["id"]),
} }
save_dict[f"val_{val_type}"] = val save_dict[f"val_{val_type}"] = val
val_mrs = Equipment.RUNING val_mrs = None
if mpoint_is_rep_ep_running_state: if mpoint_is_rep_ep_running_state:
val_mrs = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_base_val1) val_mrs = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_base_val1)
save_dict["val_mrs"] = val_mrs save_dict["val_mrs"] = val_mrs