feat: 通过restful获取测点值

This commit is contained in:
caoqianming 2024-04-24 17:57:31 +08:00
parent 53ce0ea0f7
commit 14bcf0833f
3 changed files with 76 additions and 59 deletions

View File

@ -262,6 +262,52 @@ def insert_mplogx_from_king_mqtt(data: dict, is_offset=True):
# for future in futures: # for future in futures:
# print(future.result(), end=', ') # print(future.result(), end=', ')
def insert_mplogx_item(code, val, timex, enp_mpoints_dict):
"""
存入超表
"""
cache_key = Mpoint.cache_key(code)
mpoint_data = get_mpoint_cache(code)
if mpoint_data is None or not mpoint_data['enabled']:
return
val_type = mpoint_data["val_type"]
mpoint_interval = mpoint_data["interval"]
mpoint_last_timex = mpoint_data.get('last_data', {}).get('last_timex', None)
mpoint_is_rep_ep_running_state = mpoint_data.get("is_rep_ep_running_state", False)
mpoint_ep_base_val1 = mpoint_data.get("ep_base_val1", None)
# 控制采集间隔
can_save = False
if mpoint_last_timex:
if (timex - mpoint_last_timex).total_seconds() > mpoint_interval:
can_save = True
else:
can_save = True
if can_save:
save_dict = {
"timex": timex,
"mpoint": Mpoint.objects.get(id=mpoint_data["id"]),
}
save_dict[f"val_{val_type}"] = val
val_mrs = Equipment.RUNING
if mpoint_is_rep_ep_running_state:
save_dict["val_mrs"] = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_base_val1)
try:
MpLogx.objects.create(**save_dict)
update_mpoint_cache(cache_key, mpoint_data, timex, val, val_mrs)
# 此处代码用于更新envdata表里的数据
enp_field = mpoint_data.get("enp_field", None)
ep_monitored = mpoint_data.get("ep_monitored", None)
if enp_field and ep_monitored:
if enp_mpoints_dict.get(ep_monitored, None) is None:
enp_mpoints_dict[ep_monitored] = {"timex": timex, "equipment": Equipment.objects.get(id=ep_monitored)}
if enp_field == "running_state":
enp_mpoints_dict[ep_monitored].update({enp_field: val_mrs})
else:
enp_mpoints_dict[ep_monitored].update({enp_field: val})
enp_mpoints_dict[ep_monitored].update({enp_field: val})
except Exception:
myLogger.error(f"mpoint_cache_key: {cache_key} 存库失败", exc_info=True)
def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is_offset=True): def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is_offset=True):
""" """
@ -274,56 +320,7 @@ def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is
val = obj.get("1", oval) val = obj.get("1", oval)
# timex = obj.get("2", None) # timex = obj.get("2", None)
code = f"K_{n}" code = f"K_{n}"
cache_key = Mpoint.cache_key(code) insert_mplogx_item(code, val, otime_obj, enp_mpoints_dict)
mpoint_data = get_mpoint_cache(code)
if mpoint_data is None or not mpoint_data['enabled']:
continue
val_type = mpoint_data["val_type"]
# if is_offset:
# if timex is None:
# timex = otime_obj
# else:
# timex = otime_obj + timedelta(mgilliseconds=timex)
# else:
# timex = timezone.make_aware(datetime.strptime(timex, '%Y-%m-%d %H:%M:%S.%f'))
mpoint_interval = mpoint_data["interval"]
mpoint_last_timex = mpoint_data.get('last_data', {}).get('last_timex', None)
mpoint_is_rep_ep_running_state = mpoint_data.get("is_rep_ep_running_state", False)
mpoint_ep_base_val1 = mpoint_data.get("ep_base_val1", None)
# 控制采集间隔
can_save = False
if mpoint_last_timex:
if (otime_obj - mpoint_last_timex).total_seconds() > mpoint_interval:
can_save = True
else:
can_save = True
if can_save:
save_dict = {
"timex": otime_obj,
"mpoint": Mpoint.objects.get(id=mpoint_data["id"]),
}
save_dict[f"val_{val_type}"] = val
val_mrs = Equipment.RUNING
if mpoint_is_rep_ep_running_state:
save_dict["val_mrs"] = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_base_val1)
try:
MpLogx.objects.create(**save_dict)
update_mpoint_cache(cache_key, mpoint_data, otime_obj, val, val_mrs)
# 此处代码用于更新envdata表里的数据
enp_field = mpoint_data.get("enp_field", None)
ep_monitored = mpoint_data.get("ep_monitored", None)
if enp_field and ep_monitored:
if enp_mpoints_dict.get(ep_monitored, None) is None:
enp_mpoints_dict[ep_monitored] = {"timex": otime_obj, "equipment": Equipment.objects.get(id=ep_monitored)}
if enp_field == "running_state":
enp_mpoints_dict[ep_monitored].update({enp_field: val_mrs})
else:
enp_mpoints_dict[ep_monitored].update({enp_field: val})
enp_mpoints_dict[ep_monitored].update({enp_field: val})
except Exception:
myLogger.error(f"mpoint_cache_key: {cache_key} 存库失败", exc_info=True)
continue
# # 先尝试批量存库/发生异常则单个存储 # # 先尝试批量存库/发生异常则单个存储
# is_bulk_insert = True # is_bulk_insert = True
@ -343,3 +340,16 @@ def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is
if enp_mpoints_dict: if enp_mpoints_dict:
for _, item in enp_mpoints_dict: for _, item in enp_mpoints_dict:
EnvData.objects(**item) EnvData.objects(**item)
def insert_mplogx_from_king_rest_chunk(objs: list):
if objs:
timex = timezone.make_aware(datetime.strptime(objs[0]["T"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0)
enp_mpoints_dict = {}
for obj in objs:
n = obj["N"]
code = f"K_{n}"
insert_mplogx_item(code, obj["V"], timex, enp_mpoints_dict)
if enp_mpoints_dict:
for _, item in enp_mpoints_dict:
EnvData.objects(**item)

View File

@ -1,13 +1,9 @@
# Create your tasks here # Create your tasks here
from __future__ import absolute_import, unicode_literals from __future__ import absolute_import, unicode_literals
from apps.utils.tasks import CustomTask from apps.utils.tasks import CustomTask
from celery import shared_task, group, chain from celery import shared_task
from apps.utils.sql import DbConnection
from server.settings import get_sysconfig, update_sysconfig
import importlib
from django.core.cache import cache
from apps.enm.models import MpLogx, Mpoint, MpointStat, EnStat, EnStat2 from apps.enm.models import MpLogx, Mpoint, MpointStat, EnStat, EnStat2
from apps.wpm.models import SfLog, StLog from apps.wpm.models import SfLog
import datetime import datetime
from django.db.models import Sum, Avg from django.db.models import Sum, Avg
from dateutil import tz from dateutil import tz
@ -16,14 +12,15 @@ from apps.wpm.services import get_sflog
from apps.mtm.models import Mgroup, Material from apps.mtm.models import Mgroup, Material
from apps.fim.services import get_cost_unit, get_price_unit from apps.fim.services import get_cost_unit, get_price_unit
from apps.fim.models import Fee from apps.fim.models import Fee
from django.core.cache import cache
from apps.enm.services import translate_eval_formula from apps.enm.services import translate_eval_formula
import logging import logging
from django.db.models import F from django.db.models import F
from apps.wpm.services import get_pcoal_heat from apps.wpm.services import get_pcoal_heat
import traceback
from django.utils import timezone from django.utils import timezone
from django.db.models import Max from django.db.models import Max
from apps.third.king.k import kingClient
from apps.third.king.king_api import kapis
from apps.enm.services import insert_mplogx_from_king_rest_chunk
myLogger = logging.getLogger("log") myLogger = logging.getLogger("log")
@ -694,3 +691,13 @@ def enm_alarm(year_s: int, month_s: int, day_s: int):
event.save() event.save()
Eventdo.objects.get_or_create(cate=event_cate, event=event, defaults={"cate": event_cate, "event": event}) Eventdo.objects.get_or_create(cate=event_cate, event=event, defaults={"cate": event_cate, "event": event})
notify_event(event) notify_event(event)
@shared_task(base=CustomTask)
def king_insert_mplogx():
mpoint_codes = Mpoint.objects.filter(enabled=True, is_auto=True, code__startswith='K_').values_list('code', flat=True)
ml = []
for m in mpoint_codes:
ml.append({"N": m.replace('K_', '')})
_, res = kingClient.request(**kapis['read_batchtagrealvalue'], json={"objs": ml})
insert_mplogx_from_king_rest_chunk(res)

View File

@ -29,7 +29,7 @@ kapis = {
}, },
"read_batchtagrealvalue": { "read_batchtagrealvalue": {
"url": "/api/v1/batchrealvalue", "url": "/api/v1/batchrealvalue",
"method": "get" "method": "post"
}, },
"read_devicestatus": { "read_devicestatus": {
"url": "/api/v1/devicestatus", "url": "/api/v1/devicestatus",