fix : apps/em /enm 更改缓存存储
This commit is contained in:
parent
e2c44f06c4
commit
169a31da5a
|
@ -13,13 +13,13 @@ myLogger = logging.getLogger("log")
|
|||
def get_eq_rs(equipId: str):
|
||||
"""获取设备运行状态缓存
|
||||
"""
|
||||
return cache.get(f"equipment_{equipId}", {"running_state": Equipment.OFFLINE, "running_state_timex": datetime.datetime(year=1990, month=4, day=4, hour=0, minute=0, second=0, tzinfo=tz.gettz(settings.TIME_ZONE))})
|
||||
return cache.get(f"equipment_{equipId}", {"update_num":0, "running_state": Equipment.OFFLINE, "running_state_timex": datetime.datetime(year=1990, month=4, day=4, hour=0, minute=0, second=0, tzinfo=tz.gettz(settings.TIME_ZONE))})
|
||||
|
||||
def set_eq_rs(equipId, last_timex: datetime, last_mrs: int):
|
||||
"""更新设备运行状态(包括缓存和数据库)
|
||||
"""
|
||||
if last_mrs == Equipment.OFFLINE: # 如果是离线直接更新
|
||||
cache.set(f"equipment_{equipId}", {"running_state": last_mrs, "running_state_timex": last_timex}, timeout=None) # 更新缓存
|
||||
cache.set(f"equipment_{equipId}", {"running_state": last_mrs, "running_state_timex": last_timex, "update_num":0}, timeout=None) # 更新缓存
|
||||
Equipment.objects.filter(id=equipId).update(running_state=last_mrs) # 更新数据库
|
||||
else:
|
||||
eq_rs_cache = get_eq_rs(equipId)
|
||||
|
@ -27,8 +27,11 @@ def set_eq_rs(equipId, last_timex: datetime, last_mrs: int):
|
|||
eq_rs_change = False
|
||||
if eq_rs_cache["running_state"] != last_mrs: # 如果状态变动了要调用方法否则只需更新缓存
|
||||
eq_rs_change = True
|
||||
elif eq_rs_cache.get('update_num', 0) >= 20: # 如果状态没有变动但是更新次数大于20次也要调用方法
|
||||
eq_rs_change = True
|
||||
|
||||
cache.set(f"equipment_{equipId}", {"running_state": last_mrs, "running_state_timex": last_timex}, timeout=None) # 更新缓存
|
||||
update_num = eq_rs_cache.get('update_num', 0) + 1
|
||||
cache.set(f"equipment_{equipId}", {"running_state": last_mrs, "running_state_timex": last_timex, "update_num":update_num}, timeout=None) # 更新缓存
|
||||
if eq_rs_change:
|
||||
ctask_run.delay("apps.em.services.shutdown_or_startup", equipId, last_timex, last_mrs)
|
||||
|
||||
|
|
|
@ -192,7 +192,10 @@ class MpointCache:
|
|||
return
|
||||
save_dict = {"timex": last_timex, "mpoint": Mpoint.objects.get(id=current_cache_val["id"]), "val_mrs": last_mrs}
|
||||
save_dict[f"val_{current_cache_val['val_type']}"] = last_val
|
||||
try:
|
||||
MpLogx.objects.create(**save_dict)
|
||||
except IntegrityError:
|
||||
pass
|
||||
|
||||
# 下面开始更新设备信号
|
||||
ep_belong_id = current_cache_val.get("ep_belong")
|
||||
|
@ -330,7 +333,6 @@ def insert_mplogx_item(code: str, val, timex: datetime, enp_mpoints_dict):
|
|||
|
||||
mpoint_interval = mpoint_data["interval"]
|
||||
mpoint_last_timex = mpoint_data.get("last_data", {}).get("last_timex", None)
|
||||
|
||||
# 控制采集间隔
|
||||
can_save = False
|
||||
if mpoint_last_timex:
|
||||
|
|
|
@ -29,7 +29,6 @@ from apps.enm.services import insert_mplogx_item
|
|||
from django.utils.timezone import make_aware
|
||||
from apps.utils.thread import MyThread
|
||||
from django.core.cache import cache
|
||||
|
||||
myLogger = logging.getLogger("log")
|
||||
|
||||
|
||||
|
@ -42,6 +41,11 @@ def get_current_and_previous_time():
|
|||
def db_insert_mplogx_batch(rows):
|
||||
for row in rows:
|
||||
_, tag_val, tag_code, tag_update = row
|
||||
# if '散装' in tag_code or '袋装' in tag_code:
|
||||
# myLogger.info(f"db_ins_mplogx tag_val: {tag_val} tag_code: {tag_code}-------{str(type(tag_val))}")
|
||||
# mpoint = Mpoint.objects.get(code=tag_code)
|
||||
# tag_val = float(tag_val)
|
||||
# myLogger.info(f"db_ins_mpoint_id: {mpoint.id}-------db_ins_float_val: {tag_val}")
|
||||
insert_mplogx_item(tag_code, tag_val, make_aware(tag_update), {})
|
||||
|
||||
@shared_task(base=CustomTask)
|
||||
|
@ -77,16 +81,11 @@ def db_ins_mplogx():
|
|||
bill_date = config['enm1'].get('bill_date', None)
|
||||
if bill_date is None:
|
||||
raise Exception("bill_date is None")
|
||||
# cursor.execute("select count(id) from sa_weigh_view where bill_date > %s", (bill_date))
|
||||
# count = cursor.fetchone()[0]
|
||||
# if count > 400:
|
||||
# raise Exception("db inset count > 400")
|
||||
# materials_name = ['水泥+P.C42.5 袋装', '水泥+P.O42.5R 袋装', '水泥+P.O42.5 散装','水泥+P.O42.5 袋装', '水泥+P.O52.5 散装', '水泥+P.C42.5 散装', '水泥+P.O42.5R 散装']
|
||||
query = """
|
||||
SELECT id, de_real_quantity, CONCAT('x', inv_name) AS inv_name, bill_date
|
||||
FROM sa_weigh_view
|
||||
WHERE bill_date > %s
|
||||
ORDER BY id, bill_date
|
||||
ORDER BY bill_date
|
||||
"""
|
||||
cursor.execute(query, (bill_date,))
|
||||
rows = cursor.fetchall() # 获取数据后保存至本地
|
||||
|
@ -854,7 +853,7 @@ def king_insert_mplogx():
|
|||
def check_mpoint_offline(seconds=100):
|
||||
"""监测测点采集掉线"""
|
||||
now = localtime()
|
||||
for mpoint in Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO)| Mpoint.objects.filter(enabled=True, type=Mpoint.MT_COMPUTE, is_rep_ep_running_state=True):
|
||||
for mpoint in Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO, is_unit=False)| Mpoint.objects.filter(enabled=True, type=Mpoint.MT_COMPUTE, is_rep_ep_running_state=True, is_unit=False):
|
||||
mc = MpointCache(mpoint.code)
|
||||
mpoint_data = mc.data
|
||||
last_data = mpoint_data.get('last_data', None)
|
||||
|
|
Loading…
Reference in New Issue