diff --git a/apps/em/services.py b/apps/em/services.py index 4e5b0cd7..3fb06801 100644 --- a/apps/em/services.py +++ b/apps/em/services.py @@ -13,13 +13,13 @@ myLogger = logging.getLogger("log") def get_eq_rs(equipId: str): """获取设备运行状态缓存 """ - return cache.get(f"equipment_{equipId}", {"running_state": Equipment.OFFLINE, "running_state_timex": datetime.datetime(year=1990, month=4, day=4, hour=0, minute=0, second=0, tzinfo=tz.gettz(settings.TIME_ZONE))}) + return cache.get(f"equipment_{equipId}", {"update_num":0, "running_state": Equipment.OFFLINE, "running_state_timex": datetime.datetime(year=1990, month=4, day=4, hour=0, minute=0, second=0, tzinfo=tz.gettz(settings.TIME_ZONE))}) def set_eq_rs(equipId, last_timex: datetime, last_mrs: int): """更新设备运行状态(包括缓存和数据库) """ if last_mrs == Equipment.OFFLINE: # 如果是离线直接更新 - cache.set(f"equipment_{equipId}", {"running_state": last_mrs, "running_state_timex": last_timex}, timeout=None) # 更新缓存 + cache.set(f"equipment_{equipId}", {"running_state": last_mrs, "running_state_timex": last_timex, "update_num":0}, timeout=None) # 更新缓存 Equipment.objects.filter(id=equipId).update(running_state=last_mrs) # 更新数据库 else: eq_rs_cache = get_eq_rs(equipId) @@ -27,8 +27,11 @@ def set_eq_rs(equipId, last_timex: datetime, last_mrs: int): eq_rs_change = False if eq_rs_cache["running_state"] != last_mrs: # 如果状态变动了要调用方法否则只需更新缓存 eq_rs_change = True + elif eq_rs_cache.get('update_num', 0) >= 20: # 如果状态没有变动但是更新次数大于20次也要调用方法 + eq_rs_change = True - cache.set(f"equipment_{equipId}", {"running_state": last_mrs, "running_state_timex": last_timex}, timeout=None) # 更新缓存 + update_num = eq_rs_cache.get('update_num', 0) + 1 + cache.set(f"equipment_{equipId}", {"running_state": last_mrs, "running_state_timex": last_timex, "update_num":update_num}, timeout=None) # 更新缓存 if eq_rs_change: ctask_run.delay("apps.em.services.shutdown_or_startup", equipId, last_timex, last_mrs) diff --git a/apps/enm/services.py b/apps/enm/services.py index fb07f1f1..494fd695 100644 --- a/apps/enm/services.py +++ b/apps/enm/services.py @@ -192,7 +192,10 @@ class MpointCache: return save_dict = {"timex": last_timex, "mpoint": Mpoint.objects.get(id=current_cache_val["id"]), "val_mrs": last_mrs} save_dict[f"val_{current_cache_val['val_type']}"] = last_val - MpLogx.objects.create(**save_dict) + try: + MpLogx.objects.create(**save_dict) + except IntegrityError: + pass # 下面开始更新设备信号 ep_belong_id = current_cache_val.get("ep_belong") @@ -330,7 +333,6 @@ def insert_mplogx_item(code: str, val, timex: datetime, enp_mpoints_dict): mpoint_interval = mpoint_data["interval"] mpoint_last_timex = mpoint_data.get("last_data", {}).get("last_timex", None) - # 控制采集间隔 can_save = False if mpoint_last_timex: diff --git a/apps/enm/tasks.py b/apps/enm/tasks.py index 45b15df2..ac6e204e 100644 --- a/apps/enm/tasks.py +++ b/apps/enm/tasks.py @@ -29,7 +29,6 @@ from apps.enm.services import insert_mplogx_item from django.utils.timezone import make_aware from apps.utils.thread import MyThread from django.core.cache import cache - myLogger = logging.getLogger("log") @@ -42,6 +41,11 @@ def get_current_and_previous_time(): def db_insert_mplogx_batch(rows): for row in rows: _, tag_val, tag_code, tag_update = row + # if '散装' in tag_code or '袋装' in tag_code: + # myLogger.info(f"db_ins_mplogx tag_val: {tag_val} tag_code: {tag_code}-------{str(type(tag_val))}") + # mpoint = Mpoint.objects.get(code=tag_code) + # tag_val = float(tag_val) + # myLogger.info(f"db_ins_mpoint_id: {mpoint.id}-------db_ins_float_val: {tag_val}") insert_mplogx_item(tag_code, tag_val, make_aware(tag_update), {}) @shared_task(base=CustomTask) @@ -77,16 +81,11 @@ def db_ins_mplogx(): bill_date = config['enm1'].get('bill_date', None) if bill_date is None: raise Exception("bill_date is None") - # cursor.execute("select count(id) from sa_weigh_view where bill_date > %s", (bill_date)) - # count = cursor.fetchone()[0] - # if count > 400: - # raise Exception("db inset count > 400") - # materials_name = ['水泥+P.C42.5 袋装', '水泥+P.O42.5R 袋装', '水泥+P.O42.5 散装','水泥+P.O42.5 袋装', '水泥+P.O52.5 散装', '水泥+P.C42.5 散装', '水泥+P.O42.5R 散装'] query = """ SELECT id, de_real_quantity, CONCAT('x', inv_name) AS inv_name, bill_date FROM sa_weigh_view WHERE bill_date > %s - ORDER BY id, bill_date + ORDER BY bill_date """ cursor.execute(query, (bill_date,)) rows = cursor.fetchall() # 获取数据后保存至本地 @@ -854,7 +853,7 @@ def king_insert_mplogx(): def check_mpoint_offline(seconds=100): """监测测点采集掉线""" now = localtime() - for mpoint in Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO)| Mpoint.objects.filter(enabled=True, type=Mpoint.MT_COMPUTE, is_rep_ep_running_state=True): + for mpoint in Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO, is_unit=False)| Mpoint.objects.filter(enabled=True, type=Mpoint.MT_COMPUTE, is_rep_ep_running_state=True, is_unit=False): mc = MpointCache(mpoint.code) mpoint_data = mc.data last_data = mpoint_data.get('last_data', None)