factory/apps/enm/services.py

281 lines
12 KiB
Python

from apps.enm.models import Mpoint, MpointStat, EnStat, MpLogx
import re
import logging
import traceback
from apps.mtm.services import get_mgroup_goals
from django.db.models import Q
from django.utils import timezone
from django.core.cache import cache
import concurrent.futures
from django.db import connection
from datetime import datetime, timedelta
from apps.utils.decorators import auto_log
from django.db import IntegrityError
from apps.utils.tasks import ctask_run
myLogger = logging.getLogger('log')
def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int):
"""
传入
"""
pattern = r"\${(.*?)}"
matches = re.findall(pattern, exp_str)
for match in matches:
mpst = MpointStat.objects.filter(Q(mpoint__id=match) | Q(mpoint__name=match) | Q(
mpoint__code=match), type='hour', year=year, month=month, day=day, hour=hour).first()
if mpst:
exp_str = exp_str.replace(f"${{{match}}}", str(mpst.val))
rval = eval(exp_str)
return rval
def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21):
"""
根据给定的小时数, 计算出班天
"""
if hour <= hour_split:
return year, month, day
else:
now = datetime.datetime(year, month, day, hour)
now2 = now + datetime.timedelta(days=1)
return now2.year, now2.month, now2.day
# cal_rule = {
# "电石渣": {
# "total_production": 0.4,
# "elec_consume_unit": 0.4,
# "production_cost_unit": 0.2
# },
# "原料磨":{
# "production_hour": 0.3,
# "elec_consume_unit": 0.3,
# "production_cost_unit": 0.1,
# "辅料_细度":0.05,
# "辅料_水分":0.04,
# "干混生料_CaO":0.04
# }
# }
# def cal_team_score(data):
# """
# 计算月度绩效
# """
# qua_rate = {}
# month_s = data['month_s']
# for item in data['qua_data']:
# qua_rate[f'{item["material_name"]}_{item["testitem_name"]}'] = item["rate_pass"]
# goal_dict = get_mgroup_goals(data['mgroup'], data['year_s'], False)
# goal_data = {}
# try:
# rule = cal_rule[data['mgroup_name']]
# score = 0
# for key in rule:
# new_key = f'{key}_{month_s}'
# goal_data[new_key] = goal_dict[new_key]
# if '-' in key:
# score = score + qua_rate.get(key, 0)/goal_data[new_key]*rule[key]
# else:
# score = score + data.get(key)/goal_data[new_key]*rule[key]
# print(score)
# # enstat.goal_data = goal_data
# # enstat.score =score
# # enstat.save(update_fields=['goal_data', 'score'])
# except:
# print(traceback.format_exc())
# return goal_data, score
def shutdown_or_startup(mpointId: str, last_val, last_timex: datetime):
"""
last_val 可能是不同类型的值(bool 或 int)
"""
from apps.wpm.models import StLog
from apps.wpm.tasks import cal_exp_duration_hour
from apps.wpm.services import get_sflog
mpoint = Mpoint.objects.get(id=mpointId)
mgroup = mpoint.mgroup
last_stlog = StLog.objects.filter(
mgroup=mgroup, is_shutdown=True).order_by('start_time').last() # 找到最后一次停机记录
if last_stlog:
if last_timex >= last_stlog.start_time: # 认为是有效信号
if last_stlog.end_time is None and last_val in (1, True): # 从停到开
last_stlog.end_time = last_timex
last_stlog.duration = (
last_stlog.end_time - last_stlog.start_time).total_seconds()/3600
last_stlog.save()
mgroup.is_runing = True
mgroup.save()
cal_exp_duration_hour(last_stlog.id) # 触发时间分配
elif last_stlog.end_time and last_val in (0, False) and last_timex > last_stlog.end_time: # 从开到停
StLog.objects.create(
title='停机',
is_shutdown=True,
mgroup=mgroup,
end_time=None,
start_time=last_timex,
sflog=get_sflog(mgroup, last_timex)
)
mgroup.is_runing = False
mgroup.save()
else:
StLog.objects.create(
title='停机',
is_shutdown=True,
mgroup=mgroup,
end_time=None,
start_time=last_timex,
sflog=get_sflog(mgroup, last_timex))
mgroup.is_runing = False
mgroup.save()
def king_sync(projectName: str, json_path: str=""):
"""
同步亚控测点
"""
if json_path:
import os
import json
from django.conf import settings
with open(os.path.join(settings.BASE_DIR, json_path), 'r', encoding='utf-8') as f:
res = json.loads(f.read())
else:
from apps.third.king.k import kingClient
from apps.third.king import king_api
_, res = kingClient.request(**king_api["read_variables"], params={"projectInstanceName": projectName})
t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"}
for index, item in enumerate(res['objectList']):
if 't' in item and item['t'] and 'SystemTag' not in item['d']:
code = f'K_{item["n"]}'
item['from'] = 'king'
group = item['g']
name = item['d']
if group:
name = f'{group}.{name}'
Mpoint.objects.get_or_create(code=code, defaults={
"name": name,
"code": code,
"enabled": False,
"is_auto": True,
"val_type": t_dict[item["t"]],
"third_info": item
})
@auto_log('缓存测点')
def cache_mpoints(mpointId: str = ''):
"""
缓存所有可用的测点
"""
if mpointId:
mpoints_data = Mpoint.objects.filter(id=mpointId, enabled=True).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info")
else:
mpoints_data = Mpoint.objects.filter(is_auto=True, enabled=True).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info")
for item in list(mpoints_data):
key = f'mpoint_{item["code"]}'
cache_mpoint = cache.get(key, {})
cache_mpoint.update(item)
cache.set(key, cache_mpoint, timeout=None)
# print(cache.get('mpoint_K_IP2_MM_SC_IW14'))
test_data = {"PNs":{"1":"V","2":"T","3":"Q"},"PVs":{"1":-725,"2":"2024-04-08 13:43:53.140","3":192},"Objs":[{"N":"IP2_MM_IW3"},{"N":"IP2_MM_IW4","1":-6386},{"N":"IP2_MM_IW5","1":-7835},{"N":"IP2_MM_IW7","1":-6864},{"N":"IP2_MM_IW15","1":29},{"N":"IP2_MM_SC_IW3","1":337},{"N":"IP2_MM_SC_IW13","1":-5511,"2":24},{"N":"IP2_MM_SC_IW14","1":-4640,"2":24},{"N":"IP2_MM_SC_IW15","1":-5586,"2":24},{"N":"IP2_MM_SC_IW16","1":-6634,"2":24},{"N":"IP2_MM_SC_IW17","1":-2768,"2":24},{"N":"IP2_MM_SC_IW18","1":-2946,"2":24},{"N":"IP2_MM_SC_IW19","1":-2550,"2":24},{"N":"IP2_MM_SC_IW20","1":-1512,"2":24},{"N":"IP2_MM_SC_IW21","1":-1775,"2":24},{"N":"IP2_MM_SC_IW22","1":-194,"2":24},{"N":"IP2_MM_SC_IW23","1":-366,"2":24},{"N":"IP2_MM_SC_IW24","1":-9291,"2":24},{"N":"IP2_MM_SC_IW25","1":-6888,"2":24},{"N":"IP2_MM_SC_IW26","1":-2360,"2":24},{"N":"IP2_MM_SC_IW29","1":164,"2":24},{"N":"IP2_MM_SC_IW30","1":914,"2":24},{"N":"IP2_MM_SC_IW31","1":849,"2":24},{"N":"IP2_MM_SC_IW37","1":-125,"2":24},{"N":"IP2_MM_SC_IW38","1":-3009,"2":24},{"N":"IP2_MM_SC_IW40","1":-1394,"2":24},{"N":"IP2_MM_SC_IW43","1":758,"2":24},{"N":"IP3_SC_IW1","1":11557,"2":107},{"N":"IP3_SC_IW2","1":7624,"2":107},{"N":"IP3_SC_IW6","1":11159,"2":107},{"N":"IP3_SC_IW7","1":8073,"2":107},{"N":"IP3_SC_IW9","1":4490,"2":107},{"N":"IP3_SC_IW10","1":5437,"2":107},{"N":"IP3_SC_IW11","1":9244,"2":107},{"N":"IP3_SC_IW12","1":7886,"2":107},{"N":"IP3_SC_IW13","1":-2962,"2":107},{"N":"IP3_SC_IW14","1":-159,"2":107},{"N":"IP3_SC_IW26","1":15,"2":107},{"N":"IP3_SC_IW27","1":15,"2":107}]}
def insert_mplogx_from_king_mqtt(data: dict, is_offset=True):
"""
从king mqtt数据插入超表
"""
objs = data['Objs']
len_objs = len(objs)
pvs = data['PVs']
chunk_size = 200
num_chunks = (len(objs) + chunk_size - 1) // chunk_size
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for i in range(num_chunks):
start = i * chunk_size
end = min(start + chunk_size, len_objs)
chunk = objs[start:end]
futures.append(executor.submit(insert_mplogx_from_king_mqtt_chunk, chunk, pvs, is_offset))
concurrent.futures.wait(futures)
# for future in futures:
# print(future.result(), end=', ')
@auto_log('亚控存库')
def insert_mplogx_from_king_mqtt_chunk(objs: list, pvs: dict, is_offset=True):
"""
分批存库, 亚控 38.00,00000.11011 版本偏移只是时间戳偏移。另外其实可以不在乎
"""
# oval = pvs['1']
otime_str = pvs['2']
otime_obj = timezone.make_aware(datetime.strptime(otime_str, '%Y-%m-%d %H:%M:%S.%f'))
insert_db_data = []
insert_data = []
for obj in objs:
n = obj['N']
val = obj["1"]
timex = obj.get("2", None)
cache_key = f'mpoint_K_{n}'
mpoint_data = cache.get(cache_key, None)
if mpoint_data is None:
continue
val_type = mpoint_data['val_type']
if is_offset:
if timex is None:
timex = otime_obj
else:
timex = otime_obj + timedelta(milliseconds=timex)
else:
timex = timezone.make_aware(datetime.strptime(timex, '%Y-%m-%d %H:%M:%S.%f'))
mpoint_interval = mpoint_data['interval']
mpoint_last_timex = mpoint_data.get('last_timex', None)
# 控制采集间隔
can_save = False
if mpoint_last_timex:
if (otime_obj - mpoint_last_timex).total_seconds() > mpoint_interval:
can_save = True
else:
can_save = True
if can_save:
save_dict = {
"timex": timex,
"mpoint": Mpoint.objects.get(id=mpoint_data['id']),
}
save_dict[f'val_{val_type}'] = val
insert_data.append((cache_key, mpoint_data, val, timex))
insert_db_data.append(MpLogx(**save_dict))
# 先尝试批量存库/发生异常则单个存储
is_bulk_insert = True
try:
MpLogx.objects.bulk_create(insert_db_data)
except IntegrityError:
is_bulk_insert = False
for item1, item2 in zip(insert_data, insert_db_data):
try:
MpLogx.objects.create(**item2)
update_mpoint_cache_and_do_func(item1)
except Exception:
myLogger.error(f'mpoint_cache_key: {item1[0]} 存库失败', exc_info=True)
continue
if is_bulk_insert:
# 批量存库成功后更新缓存
for item in insert_data:
update_mpoint_cache_and_do_func(item)
def update_mpoint_cache_and_do_func(item: dict):
cache_key = item[0]
mpoint_data = item[1]
mpoint_id = mpoint_data['id']
last_timex: datetime = item[3]
last_val = item[2]
mpoint_data['last_timex'] = last_timex
mpoint_data['last_val'] = last_val
cache.set(cache_key, mpoint_data, timeout=None)
mpoint_func: str = mpoint_data.get('func_on_change', '')
if mpoint_func:
ctask_run.delay(mpoint_func, mpoint_id, last_val, last_timex)