feat: 优化亚控测点批量存库及相应enm逻辑更改

This commit is contained in:
caoqianming 2024-04-11 11:08:32 +08:00
parent 6fde78574b
commit 16417e0016
2 changed files with 153 additions and 95 deletions

View File

@ -1,18 +1,19 @@
from apps.enm.models import Mpoint, MpointStat, EnStat, MpLog, MpLogx from apps.enm.models import Mpoint, MpointStat, EnStat, MpLogx
import re import re
import logging
import traceback import traceback
from apps.mtm.services import get_mgroup_goals from apps.mtm.services import get_mgroup_goals
from django.db.models import Q from django.db.models import Q
import datetime
from django.utils import timezone from django.utils import timezone
from django.core.cache import cache from django.core.cache import cache
import concurrent.futures import concurrent.futures
from django.db import connection from django.db import connection
from django.utils import timezone
from datetime import datetime, timedelta from datetime import datetime, timedelta
import time
from apps.utils.decorators import auto_log from apps.utils.decorators import auto_log
from django.db import IntegrityError
from apps.utils.tasks import ctask_run
myLogger = logging.getLogger('log')
def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int): def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int):
""" """
@ -85,32 +86,35 @@ def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21):
# return goal_data, score # return goal_data, score
def shutdown_or_startup(mplog: MpLog): def shutdown_or_startup(mpointId: str, last_val, last_timex: datetime):
"""
last_val 可能是不同类型的值(bool int)
"""
from apps.wpm.models import StLog from apps.wpm.models import StLog
from apps.wpm.tasks import cal_exp_duration_hour from apps.wpm.tasks import cal_exp_duration_hour
from apps.wpm.services import get_sflog from apps.wpm.services import get_sflog
mpoint = mplog.mpoint mpoint = Mpoint.objects.get(id=mpointId)
mgroup = mpoint.mgroup mgroup = mpoint.mgroup
last_stlog = StLog.objects.filter( last_stlog = StLog.objects.filter(
mgroup=mgroup, is_shutdown=True).order_by('start_time').last() # 找到最后一次停机记录 mgroup=mgroup, is_shutdown=True).order_by('start_time').last() # 找到最后一次停机记录
if last_stlog: if last_stlog:
if mplog.tag_update >= last_stlog.start_time: # 认为是有效信号 if last_timex >= last_stlog.start_time: # 认为是有效信号
if last_stlog.end_time is None and mplog.tag_val == 1: # 从停到开 if last_stlog.end_time is None and last_val in (1, True): # 从停到开
last_stlog.end_time = mplog.tag_update last_stlog.end_time = last_timex
last_stlog.duration = ( last_stlog.duration = (
last_stlog.end_time - last_stlog.start_time).total_seconds()/3600 last_stlog.end_time - last_stlog.start_time).total_seconds()/3600
last_stlog.save() last_stlog.save()
mgroup.is_runing = True mgroup.is_runing = True
mgroup.save() mgroup.save()
cal_exp_duration_hour(last_stlog.id) # 触发时间分配 cal_exp_duration_hour(last_stlog.id) # 触发时间分配
elif last_stlog.end_time and mplog.tag_val == 0 and mplog.tag_update > last_stlog.end_time: # 从开到停 elif last_stlog.end_time and last_val in (0, False) and last_timex > last_stlog.end_time: # 从开到停
StLog.objects.create( StLog.objects.create(
title='停机', title='停机',
is_shutdown=True, is_shutdown=True,
mgroup=mgroup, mgroup=mgroup,
end_time=None, end_time=None,
start_time=mplog.tag_update, start_time=last_timex,
sflog=get_sflog(mgroup, mplog.tag_update) sflog=get_sflog(mgroup, last_timex)
) )
mgroup.is_runing = False mgroup.is_runing = False
mgroup.save() mgroup.save()
@ -120,27 +124,39 @@ def shutdown_or_startup(mplog: MpLog):
is_shutdown=True, is_shutdown=True,
mgroup=mgroup, mgroup=mgroup,
end_time=None, end_time=None,
start_time=mplog.tag_update, start_time=last_timex,
sflog=get_sflog(mgroup, mplog.tag_update)) sflog=get_sflog(mgroup, last_timex))
mgroup.is_runing = False mgroup.is_runing = False
mgroup.save() mgroup.save()
def king_sync(projectName: str): def king_sync(projectName: str, json_path: str=""):
""" """
同步亚控测点 同步亚控测点
""" """
from apps.third.king.k import kingClient if json_path:
from apps.third.king import king_api import os
_, res = kingClient.request(**king_api["read_variables"], params={"projectInstanceName": projectName}) import json
from django.conf import settings
with open(os.path.join(settings.BASE_DIR, json_path), 'r', encoding='utf-8') as f:
res = json.loads(f.read())
else:
from apps.third.king.k import kingClient
from apps.third.king import king_api
_, res = kingClient.request(**king_api["read_variables"], params={"projectInstanceName": projectName})
t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"} t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"}
for item in res['objectList']: for index, item in enumerate(res['objectList']):
if 't' in item and item['t']: if 't' in item and item['t'] and 'SystemTag' not in item['d']:
code = f'king_{item["n"]}' code = f'K_{item["n"]}'
item['from'] = 'king' item['from'] = 'king'
group = item['g']
name = item['d']
if group:
name = f'{group}.{name}'
Mpoint.objects.get_or_create(code=code, defaults={ Mpoint.objects.get_or_create(code=code, defaults={
"name": item["d"], "name": name,
"code": code, "code": code,
"enabled": False, "enabled": False,
"is_auto": True, "is_auto": True,
@ -148,12 +164,13 @@ def king_sync(projectName: str):
"third_info": item "third_info": item
}) })
@auto_log('缓存测点')
def cache_mpoints(mpointId: str = ''): def cache_mpoints(mpointId: str = ''):
""" """
缓存所有可用的测点 缓存所有可用的测点
""" """
if mpointId: if mpointId:
mpoints_data = Mpoint.objects.filter(id=mpointId).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info") mpoints_data = Mpoint.objects.filter(id=mpointId, enabled=True).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info")
else: else:
mpoints_data = Mpoint.objects.filter(is_auto=True, enabled=True).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info") mpoints_data = Mpoint.objects.filter(is_auto=True, enabled=True).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info")
for item in list(mpoints_data): for item in list(mpoints_data):
@ -161,8 +178,9 @@ def cache_mpoints(mpointId: str = ''):
cache_mpoint = cache.get(key, {}) cache_mpoint = cache.get(key, {})
cache_mpoint.update(item) cache_mpoint.update(item)
cache.set(key, cache_mpoint, timeout=None) cache.set(key, cache_mpoint, timeout=None)
# print(cache.get('mpoint_K_IP2_MM_SC_IW14'))
test_data = {"PNs":{"1":"V","2":"T","3":"Q"},"PVs":{"1":-725,"2":"2024-04-08 13:43:53.140","3":192},"Objs":[{"N":"IP2_MM_IW3"},{"N":"IP2_MM_IW4","1":-6386},{"N":"IP2_MM_IW5","1":-7835},{"N":"IP2_MM_IW7","1":-6864},{"N":"IP2_MM_IW15","1":29},{"N":"IP2_MM_SC_IW3","1":337},{"N":"IP2_MM_SC_IW13","1":-5511,"2":24},{"N":"IP2_MM_SC_IW14","1":-4640,"2":24},{"N":"IP2_MM_SC_IW15","1":-5586,"2":24},{"N":"IP2_MM_SC_IW16","1":-6634,"2":24},{"N":"IP2_MM_SC_IW17","1":-2768,"2":24},{"N":"IP2_MM_SC_IW18","1":-2946,"2":24},{"N":"IP2_MM_SC_IW19","1":-2550,"2":24},{"N":"IP2_MM_SC_IW20","1":-1512,"2":24},{"N":"IP2_MM_SC_IW21","1":-1775,"2":24},{"N":"IP2_MM_SC_IW22","1":-194,"2":24},{"N":"IP2_MM_SC_IW23","1":-366,"2":24},{"N":"IP2_MM_SC_IW24","1":-9291,"2":24},{"N":"IP2_MM_SC_IW25","1":-6888,"2":24},{"N":"IP2_MM_SC_IW26","1":-2360,"2":24},{"N":"IP2_MM_SC_IW29","1":164,"2":24},{"N":"IP2_MM_SC_IW30","1":914,"2":24},{"N":"IP2_MM_SC_IW31","1":849,"2":24},{"N":"IP2_MM_SC_IW37","1":-125,"2":24},{"N":"IP2_MM_SC_IW38","1":-3009,"2":24},{"N":"IP2_MM_SC_IW40","1":-1394,"2":24},{"N":"IP2_MM_SC_IW43","1":758,"2":24},{"N":"IP3_SC_IW1","1":11557,"2":107},{"N":"IP3_SC_IW2","1":7624,"2":107},{"N":"IP3_SC_IW6","1":11159,"2":107},{"N":"IP3_SC_IW7","1":8073,"2":107},{"N":"IP3_SC_IW9","1":4490,"2":107},{"N":"IP3_SC_IW10","1":5437,"2":107},{"N":"IP3_SC_IW11","1":9244,"2":107},{"N":"IP3_SC_IW12","1":7886,"2":107},{"N":"IP3_SC_IW13","1":-2962,"2":107},{"N":"IP3_SC_IW14","1":-159,"2":107},{"N":"IP3_SC_IW26","1":15,"2":107},{"N":"IP3_SC_IW27","1":15,"2":107}]}
def insert_mplogx_from_king_realdata(data: dict, is_offset=True): def insert_mplogx_from_king_realdata(data: dict, is_offset=True):
""" """
从king mqtt数据插入超表 从king mqtt数据插入超表
@ -175,38 +193,36 @@ def insert_mplogx_from_king_realdata(data: dict, is_offset=True):
num_chunks = (len(objs) + chunk_size - 1) // chunk_size num_chunks = (len(objs) + chunk_size - 1) // chunk_size
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
threads = [] futures = []
for i in range(num_chunks): for i in range(num_chunks):
start = i * chunk_size start = i * chunk_size
end = min(start + chunk_size, len_objs) end = min(start + chunk_size, len_objs)
chunk = objs[start:end] chunk = objs[start:end]
threads.append(executor.submit(insert_mplogx_from_king_realdata_chunk, chunk, pvs, is_offset)) futures.append(executor.submit(insert_mplogx_from_king_realdata_chunk, chunk, pvs, is_offset))
concurrent.futures.wait(futures)
# for future in futures:
# print(future.result(), end=', ')
concurrent.futures.wait(threads) @auto_log('亚控存库')
@auto_log
def insert_mplogx_from_king_realdata_chunk(objs: list, pvs: dict, is_offset=True): def insert_mplogx_from_king_realdata_chunk(objs: list, pvs: dict, is_offset=True):
""" """
分批存库 分批存库, 亚控 38.00,00000.11011 版本偏移只是时间戳偏移另外其实可以不在乎
""" """
oval = pvs['1'] # oval = pvs['1']
otime_str = pvs['2'] otime_str = pvs['2']
otime_obj = timezone.make_aware(datetime.strptime(otime_str, '%Y-%m-%d %H:%M:%S.%f')) otime_obj = timezone.make_aware(datetime.strptime(otime_str, '%Y-%m-%d %H:%M:%S.%f'))
insert_db_data = []
insert_data = [] insert_data = []
for obj in objs: for obj in objs:
n = obj['N'] n = obj['N']
val = obj.get('1', None) val = obj["1"]
timex = obj.get("2", None) timex = obj.get("2", None)
cache_key = f'mpoint_king_{n}' cache_key = f'mpoint_K_{n}'
mpoint_data = cache.get(cache_key, None) mpoint_data = cache.get(cache_key, None)
if mpoint_data is None: if mpoint_data is None:
return continue
val_type = mpoint_data['val_type'] val_type = mpoint_data['val_type']
if is_offset: if is_offset:
if val is None:
val = oval
else:
val = oval + val
if timex is None: if timex is None:
timex = otime_obj timex = otime_obj
else: else:
@ -218,17 +234,48 @@ def insert_mplogx_from_king_realdata_chunk(objs: list, pvs: dict, is_offset=True
# 控制采集间隔 # 控制采集间隔
can_save = False can_save = False
if mpoint_last_timex: if mpoint_last_timex:
if (timex - mpoint_last_timex).total_seconds() > mpoint_interval: if (otime_obj - mpoint_last_timex).total_seconds() > mpoint_interval:
can_save = True can_save = True
else:
can_save = True
if can_save: if can_save:
save_dict = { save_dict = {
"timex": timex, "timex": timex,
"mpoint": Mpoint.objects.get(id=mpoint_data['id']), "mpoint": Mpoint.objects.get(id=mpoint_data['id']),
} }
# 更新对应缓存
save_dict[f'val_{val_type}'] = val save_dict[f'val_{val_type}'] = val
mpoint_data.update({'last_val': val, 'last_timex': timex}) insert_data.append((cache_key, mpoint_data, val, timex))
cache.set(cache_key, mpoint_data) insert_db_data.append(MpLogx(**save_dict))
insert_data.append(MpLogx(**save_dict))
MpLogx.objects.bulk_create(insert_data)
# 先尝试批量存库/发生异常则单个存储
is_bulk_insert = True
try:
MpLogx.objects.bulk_create(insert_db_data)
except IntegrityError:
is_bulk_insert = False
for item1, item2 in zip(insert_data, insert_db_data):
try:
MpLogx.objects.create(**item2)
update_mpoint_cache_and_do_func(item1)
except Exception:
myLogger.error(f'mpoint_cache_key: {item1[0]} 存库失败', exc_info=True)
continue
if is_bulk_insert:
# 批量存库成功后更新缓存
for item in insert_data:
update_mpoint_cache_and_do_func(item)
def update_mpoint_cache_and_do_func(item: dict):
cache_key = item[0]
mpoint_data = item[1]
mpoint_id = mpoint_data['id']
last_timex: datetime = item[3]
last_val = item[2]
mpoint_data['last_timex'] = last_timex
mpoint_data['last_val'] = last_val
cache.set(cache_key, mpoint_data, timeout=None)
mpoint_func: str = mpoint_data.get('func_on_change', '')
if mpoint_func:
ctask_run.delay(mpoint_func, mpoint_id, last_val, last_timex)

View File

@ -6,7 +6,7 @@ from apps.utils.sql import DbConnection
from server.settings import get_sysconfig, update_sysconfig from server.settings import get_sysconfig, update_sysconfig
import importlib import importlib
from django.core.cache import cache from django.core.cache import cache
from apps.enm.models import MpLog, Mpoint, MpointStat, EnStat, EnStat2 from apps.enm.models import MpLogx, Mpoint, MpointStat, EnStat, EnStat2
from apps.wpm.models import SfLog, StLog from apps.wpm.models import SfLog, StLog
import datetime import datetime
from django.db.models import Sum, Avg from django.db.models import Sum, Avg
@ -23,6 +23,7 @@ from django.db.models import F
from apps.wpm.services import get_pcoal_heat from apps.wpm.services import get_pcoal_heat
import traceback import traceback
from django.utils import timezone from django.utils import timezone
from django.db.models import Max
myLogger = logging.getLogger('log') myLogger = logging.getLogger('log')
@ -32,48 +33,51 @@ def get_current_and_previous_time():
return now, pre return now, pre
@shared_task(base=CustomTask) # @shared_task(base=CustomTask)
def get_tag_val(): # def get_tag_val():
config = get_sysconfig() # """
with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor: # 应该用不着
last_tag_id = config['enm'].get('last_tag_id', None) # """
if not last_tag_id: # config = get_sysconfig()
mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first() # with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor:
if mr is None: # last_tag_id = config['enm'].get('last_tag_id', None)
last_tag_id = 0 # if not last_tag_id:
else: # mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first()
last_tag_id = mr.tag_id # if mr is None:
update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) # last_tag_id = 0
cursor.execute( # else:
"select id, val, tag_code, update_time from tag_value where id > %s order by update_time, id", (last_tag_id)) # last_tag_id = mr.tag_id
results = cursor.fetchall() # 获取数据后保存至本地 # update_sysconfig({'enm': {'last_tag_id': last_tag_id}})
need_func = {} # 需要执行测点函数的字典, 此种情况只执行一次 # cursor.execute(
for row in results: # "select id, val, tag_code, update_time from tag_value where id > %s order by update_time, id", (last_tag_id))
mr_one = MpLog() # results = cursor.fetchall() # 获取数据后保存至本地
mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row # need_func = {} # 需要执行测点函数的字典, 此种情况只执行一次
mpoint, _ = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={ # for row in results:
'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'}) # mr_one = MpLog()
mr_one.mpoint = mpoint # mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row
mr_one.save() # mpoint, _ = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={
last_tag_id = mr_one.tag_id # 'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'})
if mpoint.func_on_change: # mr_one.mpoint = mpoint
need_func[mpoint.id] = mr_one.id # mr_one.save()
# 执行测点函数 # last_tag_id = mr_one.tag_id
for key in need_func: # if mpoint.func_on_change:
mpoint_val_on_change.delay(need_func[key]) # need_func[mpoint.id] = mr_one.id
update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) # # 执行测点函数
# for key in need_func:
# mpoint_val_on_change.delay(need_func[key])
# update_sysconfig({'enm': {'last_tag_id': last_tag_id}})
@shared_task(base=CustomTask) # @shared_task(base=CustomTask)
def mpoint_val_on_change(mpLogId: str): # def mpoint_val_on_change(mpLogId: str):
"""测点值变化的时候执行任务 # """测点值变化的时候执行任务(废弃)
""" # """
mplog = MpLog.objects.get(id=mpLogId) # mplog = MpLog.objects.get(id=mpLogId)
mpoint = mplog.mpoint # mpoint = mplog.mpoint
module, func = mpoint.func_on_change.rsplit(".", 1) # module, func = mpoint.func_on_change.rsplit(".", 1)
m = importlib.import_module(module) # m = importlib.import_module(module)
f = getattr(m, func) # f = getattr(m, func)
f(mplog) # 同步执行 # f(mplog) # 同步执行
@shared_task(base=CustomTask) @shared_task(base=CustomTask)
@ -99,27 +103,35 @@ def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: in
mytz = tz.gettz(settings.TIME_ZONE) mytz = tz.gettz(settings.TIME_ZONE)
dt = datetime.datetime(year=year, month=month, dt = datetime.datetime(year=year, month=month,
day=day, hour=hour, tzinfo=mytz) day=day, hour=hour, tzinfo=mytz)
if mpoint.material: # 如果计量的是物料 if mpoint.material: # 如果计量的是物料 # 累计量 有的会清零,需要额外处理(还未做)
params = {'mpoint': mpoint, 'type': 'hour'} params = {'mpoint': mpoint, 'type': 'hour'}
params['year'], params['month'], params['day'], params['hour'] = year, month, day, hour params['year'], params['month'], params['day'], params['hour'] = year, month, day, hour
val = 0 val = 0
val_type = 'float'
if mpoint.formula: if mpoint.formula:
formula = mpoint.formula formula = mpoint.formula
try: try:
val = translate_eval_formula(formula, year, month, day, hour) val = translate_eval_formula(formula, year, month, day, hour)
except: except Exception:
myLogger.error( myLogger.error(
'公式执行错误:{}-{}'.format(mpoint.id, formula), exc_info=True) '公式执行错误:{}-{}'.format(mpoint.id, formula), exc_info=True)
return return
else: else:
mrs = MpLog.objects.filter( mrs = MpLogx.objects.filter(
mpoint=mpoint, mpoint=mpoint,
tag_update__year=params['year'], timex__year=params['year'],
tag_update__month=params['month'], timex__month=params['month'],
tag_update__day=params['day'], timex__day=params['day'],
tag_update__hour=params['hour']).order_by('tag_update') timex__hour=params['hour']).order_by('timex')
if mrs.exists(): if mrs.exists():
val = mrs.last().tag_val - mrs.first().tag_val last_val = mrs.last().val_float
first_val = mrs.first().val_float
if last_val >= first_val:
val = last_val - first_val
else:
# 这里判断有可能清零了
max_val = mrs.aggregate(max=Max('val_float'))['max']
val = max_val - first_val + last_val
ms, _ = MpointStat.objects.get_or_create(**params, defaults=params) ms, _ = MpointStat.objects.get_or_create(**params, defaults=params)
ms.val = val ms.val = val
ms.save() ms.save()
@ -309,8 +321,7 @@ def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s,
mytz = tz.gettz(settings.TIME_ZONE) mytz = tz.gettz(settings.TIME_ZONE)
dt = datetime.datetime(year=year, month=month, dt = datetime.datetime(year=year, month=month,
day=day, hour=hour, tzinfo=mytz) day=day, hour=hour, tzinfo=mytz)
sflog = SfLog.objects.get( sflog = get_sflog(mgroup, dt)
start_time__lt=dt, end_time__gte=dt, mgroup=mgroup)
if sflog: if sflog:
year_s, month_s, day_s = sflog.get_ymd year_s, month_s, day_s = sflog.get_ymd
team = sflog.team team = sflog.team