factory/apps/enm/services.py

235 lines
8.1 KiB
Python

from apps.enm.models import Mpoint, MpointStat, EnStat, MpLog, MpLogx
import re
import traceback
from apps.mtm.services import get_mgroup_goals
from django.db.models import Q
import datetime
from django.utils import timezone
from django.core.cache import cache
import concurrent.futures
from django.db import connection
from django.utils import timezone
from datetime import datetime, timedelta
import time
from apps.utils.decorators import auto_log
def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int):
    """
    Evaluate a formula whose ``${...}`` placeholders refer to measuring points.

    The text inside each placeholder is matched against a measuring point's
    id, name or code. The placeholder is replaced by the ``val`` of that
    point's hourly ``MpointStat`` row for the given year/month/day/hour, and
    the resulting string is evaluated as a Python expression.

    Placeholders without a matching hourly row are left in the text, so the
    evaluation step fails for them (normally with ``SyntaxError``).

    :param exp_str: formula text, e.g. ``"${a} + ${b} * 2"``
    :param year: year of the hourly statistic to read
    :param month: month of the hourly statistic to read
    :param day: day of the hourly statistic to read
    :param hour: hour of the hourly statistic to read
    :return: the value the substituted expression evaluates to
    """
    pattern = r"\${(.*?)}"
    # A placeholder may occur several times; str.replace already substitutes
    # every occurrence, so each distinct name needs only one lookup.
    for match in dict.fromkeys(re.findall(pattern, exp_str)):
        mpst = MpointStat.objects.filter(
            Q(mpoint__id=match) | Q(mpoint__name=match) | Q(mpoint__code=match),
            type='hour', year=year, month=month, day=day, hour=hour).first()
        if mpst:
            # Parenthesise the value so a negative number keeps its meaning
            # next to tighter-binding operators ("${x}**2" with x = -5 must
            # be (-5)**2, not -(5**2)).
            exp_str = exp_str.replace(f"${{{match}}}", "(" + str(mpst.val) + ")")
    # SECURITY: eval() executes arbitrary Python. exp_str must only ever come
    # from trusted configuration, never from end-user input.
    rval = eval(exp_str)
    return rval
def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21):
    """
    Work out the shift day that a given calendar hour belongs to.

    Hours up to and including ``hour_split`` belong to the same calendar day;
    later hours are counted towards the following day.

    :param year: calendar year
    :param month: calendar month
    :param day: calendar day of month
    :param hour: hour of day (0-23)
    :param hour_split: last hour that still belongs to the current day
    :return: ``(year, month, day)`` of the shift day
    """
    # Imported locally under an alias: at module level the name ``datetime``
    # is rebound to the class by ``from datetime import datetime, timedelta``,
    # so ``datetime.datetime(...)`` would raise AttributeError here.
    import datetime as _dt

    if hour <= hour_split:
        return year, month, day
    next_day = _dt.datetime(year, month, day, hour) + _dt.timedelta(days=1)
    return next_day.year, next_day.month, next_day.day
# cal_rule = {
# "电石渣": {
# "total_production": 0.4,
# "elec_consume_unit": 0.4,
# "production_cost_unit": 0.2
# },
# "原料磨":{
# "production_hour": 0.3,
# "elec_consume_unit": 0.3,
# "production_cost_unit": 0.1,
# "辅料_细度":0.05,
# "辅料_水分":0.04,
# "干混生料_CaO":0.04
# }
# }
# def cal_team_score(data):
# """
# 计算月度绩效
# """
# qua_rate = {}
# month_s = data['month_s']
# for item in data['qua_data']:
# qua_rate[f'{item["material_name"]}_{item["testitem_name"]}'] = item["rate_pass"]
# goal_dict = get_mgroup_goals(data['mgroup'], data['year_s'], False)
# goal_data = {}
# try:
# rule = cal_rule[data['mgroup_name']]
# score = 0
# for key in rule:
# new_key = f'{key}_{month_s}'
# goal_data[new_key] = goal_dict[new_key]
# if '-' in key:
# score = score + qua_rate.get(key, 0)/goal_data[new_key]*rule[key]
# else:
# score = score + data.get(key)/goal_data[new_key]*rule[key]
# print(score)
# # enstat.goal_data = goal_data
# # enstat.score =score
# # enstat.save(update_fields=['goal_data', 'score'])
# except:
# print(traceback.format_exc())
# return goal_data, score
def shutdown_or_startup(mplog: MpLog):
    """
    Update the shutdown log (StLog) and the running flag of a work group from
    one run/stop signal.

    The work group is taken from ``mplog.mpoint.mgroup`` and its most recent
    shutdown record (by ``start_time``) is looked up.

    * Record exists, still open (``end_time`` is None) and ``tag_val == 1``:
      the record is closed at ``mplog.tag_update``, its duration is stored in
      hours, the group is marked running and ``cal_exp_duration_hour`` is
      called for the record.
    * Record exists, already closed, ``tag_val == 0`` and the signal is newer
      than the record's ``end_time``: a new open shutdown record is created
      and the group is marked not running.
    * Signals older than the last record's ``start_time`` are ignored.
    * No shutdown record at all: a new open shutdown record is created and
      the group is marked not running.

    :param mplog: the measuring-point log entry carrying ``tag_val`` and
        ``tag_update``
    """
    # NOTE(review): indentation was lost in this copy of the file; the final
    # ``else`` is assumed to pair with ``if last_stlog:`` - confirm against
    # the repository version.
    from apps.wpm.models import StLog
    from apps.wpm.tasks import cal_exp_duration_hour
    from apps.wpm.services import get_sflog
    mpoint = mplog.mpoint
    mgroup = mpoint.mgroup
    last_stlog = StLog.objects.filter(
        mgroup=mgroup, is_shutdown=True).order_by('start_time').last()  # most recent shutdown record
    if last_stlog:
        if mplog.tag_update >= last_stlog.start_time:  # treated as a valid signal
            if last_stlog.end_time is None and mplog.tag_val == 1:  # stopped -> running
                last_stlog.end_time = mplog.tag_update
                # seconds / 3600: the duration is stored in hours
                last_stlog.duration = (
                    last_stlog.end_time - last_stlog.start_time).total_seconds()/3600
                last_stlog.save()
                mgroup.is_runing = True
                mgroup.save()
                cal_exp_duration_hour(last_stlog.id)  # trigger time allocation
            elif last_stlog.end_time and mplog.tag_val == 0 and mplog.tag_update > last_stlog.end_time:  # running -> stopped
                # open a new shutdown record starting at the signal time
                StLog.objects.create(
                    title='停机',
                    is_shutdown=True,
                    mgroup=mgroup,
                    end_time=None,
                    start_time=mplog.tag_update,
                    sflog=get_sflog(mgroup, mplog.tag_update)
                )
                mgroup.is_runing = False
                mgroup.save()
    else:
        # no shutdown record exists yet for this group: create the first one
        # (note: this branch does not look at mplog.tag_val)
        StLog.objects.create(
            title='停机',
            is_shutdown=True,
            mgroup=mgroup,
            end_time=None,
            start_time=mplog.tag_update,
            sflog=get_sflog(mgroup, mplog.tag_update))
        mgroup.is_runing = False
        mgroup.save()
def king_sync(projectName: str):
    """
    Synchronise measuring points from a King (亚控) project.

    Reads the project's variable list through the King client and, for every
    variable that carries a truthy type code ``t``, makes sure a ``Mpoint``
    with code ``king_<variable name>`` exists. Newly created points start
    disabled, are flagged as automatic, and keep the raw variable description
    (tagged with ``from = 'king'``) in ``third_info``.

    :param projectName: King project instance name sent as
        ``projectInstanceName``
    """
    from apps.third.king.k import kingClient
    from apps.third.king import king_api

    query = {"projectInstanceName": projectName}
    _, res = kingClient.request(**king_api["read_variables"], params=query)
    # King type code -> value type name used by Mpoint.val_type
    type_names = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"}
    for variable in res['objectList']:
        # variables without a (truthy) type code are skipped
        if not variable.get('t'):
            continue
        point_code = f'king_{variable["n"]}'
        variable['from'] = 'king'
        new_point_fields = {
            "name": variable["n"],
            "code": point_code,
            "enabled": False,
            "is_auto": True,
            "val_type": type_names[variable["t"]],
            "third_info": variable
        }
        Mpoint.objects.get_or_create(code=point_code, defaults=new_point_fields)
def cache_mpoints(mpointId: str = ''):
    """
    Cache measuring-point metadata under ``mpoint_<code>`` keys.

    With ``mpointId`` given, only that point is refreshed; otherwise every
    point that is both automatic and enabled is cached. Existing cache
    entries are merged with the fresh database values (keys already in the
    cached dict but not read from the database are kept) and stored without
    expiry.

    :param mpointId: id of a single measuring point, or empty for all
        automatic, enabled points
    """
    fields = ("id", "code", "name", "val_type", "enabled", "is_auto",
              "interval", "func_on_change", "formula", "third_info")
    if mpointId:
        criteria = {"id": mpointId}
    else:
        criteria = {"is_auto": True, "enabled": True}
    for row in Mpoint.objects.filter(**criteria).values(*fields):
        cache_key = f'mpoint_{row["code"]}'
        merged = cache.get(cache_key, {})
        merged.update(row)
        cache.set(cache_key, merged, timeout=None)
def insert_mplogx_from_king_realdata(data: dict, is_offset=True):
    """
    Store one King MQTT real-time payload into the MpLogx table.

    The payload's ``Objs`` list is cut into batches of 200 objects; each
    batch is handed, together with the shared ``PVs`` dict, to
    ``insert_mplogx_from_king_realdata_chunk`` on a pool of 4 worker threads.
    The call returns once every batch has finished.

    :param data: payload dict with the keys ``Objs`` and ``PVs``
    :param is_offset: passed through unchanged to the chunk function
    """
    objs = data['Objs']
    total = len(objs)
    pvs = data['PVs']
    batch_size = 200
    batches = [objs[pos:pos + batch_size] for pos in range(0, total, batch_size)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        pending = [
            pool.submit(insert_mplogx_from_king_realdata_chunk, batch, pvs, is_offset)
            for batch in batches
        ]
        concurrent.futures.wait(pending)
@auto_log
def insert_mplogx_from_king_realdata_chunk(objs: list, pvs: dict, is_offset=True):
    """
    Store one batch of King real-time objects into MpLogx.

    ``pvs['1']`` is the base value and ``pvs['2']`` the base timestamp
    (``%Y-%m-%d %H:%M:%S.%f``, made timezone-aware). For every object the
    cached point metadata is read from ``mpoint_king_<N>``:

    * ``is_offset=True``: the object's ``'1'`` is added to the base value
      (or the base value is used if absent) and its ``'2'`` is a millisecond
      offset added to the base timestamp (or the base timestamp if absent).
    * ``is_offset=False``: the object's ``'1'`` is used as-is and its ``'2'``
      is parsed as a full timestamp string.

    A sample is stored when the point has no previously stored timestamp, or
    when more than ``interval`` seconds have passed since it. Stored samples
    update ``last_val`` / ``last_timex`` in the cache. All rows of the batch
    are written with a single ``bulk_create``.

    :param objs: objects of this batch (each needs key ``N``)
    :param pvs: shared base values of the payload (keys ``'1'`` and ``'2'``)
    :param is_offset: whether object values/timestamps are offsets from ``pvs``
    """
    time_format = '%Y-%m-%d %H:%M:%S.%f'
    oval = pvs['1']
    otime_str = pvs['2']
    otime_obj = timezone.make_aware(datetime.strptime(otime_str, time_format))
    insert_data = []
    for obj in objs:
        n = obj['N']
        val = obj.get('1', None)
        timex = obj.get("2", None)
        cache_key = f'mpoint_king_{n}'
        mpoint_data = cache.get(cache_key, None)
        if mpoint_data is None:
            # Point is not cached (unknown / not enabled): skip only this
            # object. Returning here would abort the whole batch and drop the
            # rows already collected, whose cache state was already advanced.
            continue
        val_type = mpoint_data['val_type']
        if is_offset:
            val = oval if val is None else oval + val
            if timex is None:
                timex = otime_obj
            else:
                timex = otime_obj + timedelta(milliseconds=timex)
        else:
            timex = timezone.make_aware(datetime.strptime(timex, time_format))
        mpoint_interval = mpoint_data['interval']
        mpoint_last_timex = mpoint_data.get('last_timex', None)
        # Throttle to the configured sampling interval. A point without a
        # previous timestamp is always stored: last_timex is only written
        # when a sample is stored, so otherwise it could never start.
        if (mpoint_last_timex is not None
                and (timex - mpoint_last_timex).total_seconds() <= mpoint_interval):
            continue
        save_dict = {
            "timex": timex,
            "mpoint": Mpoint.objects.get(id=mpoint_data['id']),
        }
        save_dict[f'val_{val_type}'] = val
        # Keep the cached point state in step with what is being stored.
        # timeout=None matches cache_mpoints: with the default timeout the
        # point metadata would expire and the point would silently stop
        # being recorded.
        mpoint_data.update({'last_val': val, 'last_timex': timex})
        cache.set(cache_key, mpoint_data, timeout=None)
        insert_data.append(MpLogx(**save_dict))
    MpLogx.objects.bulk_create(insert_data)