diff --git a/apps/enm/services.py b/apps/enm/services.py index dc0ae42d..c89aa38c 100644 --- a/apps/enm/services.py +++ b/apps/enm/services.py @@ -262,6 +262,52 @@ def insert_mplogx_from_king_mqtt(data: dict, is_offset=True): # for future in futures: # print(future.result(), end=', ') +def insert_mplogx_item(code, val, timex, enp_mpoints_dict): + """ + 存入超表 + """ + cache_key = Mpoint.cache_key(code) + mpoint_data = get_mpoint_cache(code) + if mpoint_data is None or not mpoint_data['enabled']: + return + val_type = mpoint_data["val_type"] + mpoint_interval = mpoint_data["interval"] + mpoint_last_timex = mpoint_data.get('last_data', {}).get('last_timex', None) + mpoint_is_rep_ep_running_state = mpoint_data.get("is_rep_ep_running_state", False) + mpoint_ep_base_val1 = mpoint_data.get("ep_base_val1", None) + # 控制采集间隔 + can_save = False + if mpoint_last_timex: + if (timex - mpoint_last_timex).total_seconds() > mpoint_interval: + can_save = True + else: + can_save = True + if can_save: + save_dict = { + "timex": timex, + "mpoint": Mpoint.objects.get(id=mpoint_data["id"]), + } + save_dict[f"val_{val_type}"] = val + val_mrs = Equipment.RUNING + if mpoint_is_rep_ep_running_state: + save_dict["val_mrs"] = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_base_val1) + try: + MpLogx.objects.create(**save_dict) + update_mpoint_cache(cache_key, mpoint_data, timex, val, val_mrs) + + # 此处代码用于更新envdata表里的数据 + enp_field = mpoint_data.get("enp_field", None) + ep_monitored = mpoint_data.get("ep_monitored", None) + if enp_field and ep_monitored: + if enp_mpoints_dict.get(ep_monitored, None) is None: + enp_mpoints_dict[ep_monitored] = {"timex": timex, "equipment": Equipment.objects.get(id=ep_monitored)} + if enp_field == "running_state": + enp_mpoints_dict[ep_monitored].update({enp_field: val_mrs}) + else: + enp_mpoints_dict[ep_monitored].update({enp_field: val}) + enp_mpoints_dict[ep_monitored].update({enp_field: val}) + except Exception: + myLogger.error(f"mpoint_cache_key: {cache_key} 存库失败", exc_info=True) def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is_offset=True): """ @@ -274,56 +320,7 @@ def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is val = obj.get("1", oval) # timex = obj.get("2", None) code = f"K_{n}" - cache_key = Mpoint.cache_key(code) - mpoint_data = get_mpoint_cache(code) - if mpoint_data is None or not mpoint_data['enabled']: - continue - val_type = mpoint_data["val_type"] - # if is_offset: - # if timex is None: - # timex = otime_obj - # else: - # timex = otime_obj + timedelta(mgilliseconds=timex) - # else: - # timex = timezone.make_aware(datetime.strptime(timex, '%Y-%m-%d %H:%M:%S.%f')) - mpoint_interval = mpoint_data["interval"] - mpoint_last_timex = mpoint_data.get('last_data', {}).get('last_timex', None) - mpoint_is_rep_ep_running_state = mpoint_data.get("is_rep_ep_running_state", False) - mpoint_ep_base_val1 = mpoint_data.get("ep_base_val1", None) - # 控制采集间隔 - can_save = False - if mpoint_last_timex: - if (otime_obj - mpoint_last_timex).total_seconds() > mpoint_interval: - can_save = True - else: - can_save = True - if can_save: - save_dict = { - "timex": otime_obj, - "mpoint": Mpoint.objects.get(id=mpoint_data["id"]), - } - save_dict[f"val_{val_type}"] = val - val_mrs = Equipment.RUNING - if mpoint_is_rep_ep_running_state: - save_dict["val_mrs"] = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_base_val1) - try: - MpLogx.objects.create(**save_dict) - update_mpoint_cache(cache_key, mpoint_data, otime_obj, val, val_mrs) - - # 此处代码用于更新envdata表里的数据 - enp_field = mpoint_data.get("enp_field", None) - ep_monitored = mpoint_data.get("ep_monitored", None) - if enp_field and ep_monitored: - if enp_mpoints_dict.get(ep_monitored, None) is None: - enp_mpoints_dict[ep_monitored] = {"timex": otime_obj, "equipment": Equipment.objects.get(id=ep_monitored)} - if enp_field == "running_state": - enp_mpoints_dict[ep_monitored].update({enp_field: val_mrs}) - else: - enp_mpoints_dict[ep_monitored].update({enp_field: val}) - enp_mpoints_dict[ep_monitored].update({enp_field: val}) - except Exception: - myLogger.error(f"mpoint_cache_key: {cache_key} 存库失败", exc_info=True) - continue + insert_mplogx_item(code, val, otime_obj, enp_mpoints_dict) # # 先尝试批量存库/发生异常则单个存储 # is_bulk_insert = True @@ -343,3 +340,16 @@ def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is if enp_mpoints_dict: for _, item in enp_mpoints_dict: EnvData.objects(**item) + +def insert_mplogx_from_king_rest_chunk(objs: list): + if objs: + timex = timezone.make_aware(datetime.strptime(objs[0]["T"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0) + enp_mpoints_dict = {} + for obj in objs: + n = obj["N"] + code = f"K_{n}" + insert_mplogx_item(code, obj["V"], timex, enp_mpoints_dict) + + if enp_mpoints_dict: + for _, item in enp_mpoints_dict: + EnvData.objects(**item) diff --git a/apps/enm/tasks.py b/apps/enm/tasks.py index 873811e0..fcdae7c4 100644 --- a/apps/enm/tasks.py +++ b/apps/enm/tasks.py @@ -1,13 +1,9 @@ # Create your tasks here from __future__ import absolute_import, unicode_literals from apps.utils.tasks import CustomTask -from celery import shared_task, group, chain -from apps.utils.sql import DbConnection -from server.settings import get_sysconfig, update_sysconfig -import importlib -from django.core.cache import cache +from celery import shared_task from apps.enm.models import MpLogx, Mpoint, MpointStat, EnStat, EnStat2 -from apps.wpm.models import SfLog, StLog +from apps.wpm.models import SfLog import datetime from django.db.models import Sum, Avg from dateutil import tz @@ -16,14 +12,15 @@ from apps.wpm.services import get_sflog from apps.mtm.models import Mgroup, Material from apps.fim.services import get_cost_unit, get_price_unit from apps.fim.models import Fee -from django.core.cache import cache from apps.enm.services import translate_eval_formula import logging from django.db.models import F from apps.wpm.services import get_pcoal_heat -import traceback from django.utils import timezone from django.db.models import Max +from apps.third.king.k import kingClient +from apps.third.king.king_api import kapis +from apps.enm.services import insert_mplogx_from_king_rest_chunk myLogger = logging.getLogger("log") @@ -694,3 +691,13 @@ def enm_alarm(year_s: int, month_s: int, day_s: int): event.save() Eventdo.objects.get_or_create(cate=event_cate, event=event, defaults={"cate": event_cate, "event": event}) notify_event(event) + + +@shared_task(base=CustomTask) +def king_insert_mplogx(): + mpoint_codes = Mpoint.objects.filter(enabled=True, is_auto=True, code__startswith='K_').values_list('code', flat=True) + ml = [] + for m in mpoint_codes: + ml.append({"N": m.replace('K_', '')}) + _, res = kingClient.request(**kapis['read_batchtagrealvalue'], json={"objs": ml}) + insert_mplogx_from_king_rest_chunk(res) diff --git a/apps/third/king/king_api.py b/apps/third/king/king_api.py index 9867311d..293334d9 100644 --- a/apps/third/king/king_api.py +++ b/apps/third/king/king_api.py @@ -29,7 +29,7 @@ kapis = { }, "read_batchtagrealvalue": { "url": "/api/v1/batchrealvalue", - "method": "get" + "method": "post" }, "read_devicestatus": { "url": "/api/v1/devicestatus",