from apps.enm.models import Mpoint, MpointStat, EnStat, MpLog, MpLogx import re import traceback from apps.mtm.services import get_mgroup_goals from django.db.models import Q import datetime from django.utils import timezone from django.core.cache import cache import concurrent.futures from django.db import connection from django.utils import timezone from datetime import datetime, timedelta import time from apps.utils.decorators import auto_log def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int): """ 传入 """ pattern = r"\${(.*?)}" matches = re.findall(pattern, exp_str) for match in matches: mpst = MpointStat.objects.filter(Q(mpoint__id=match) | Q(mpoint__name=match) | Q( mpoint__code=match), type='hour', year=year, month=month, day=day, hour=hour).first() if mpst: exp_str = exp_str.replace(f"${{{match}}}", str(mpst.val)) rval = eval(exp_str) return rval def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21): """ 根据给定的小时数, 计算出班天 """ if hour <= hour_split: return year, month, day else: now = datetime.datetime(year, month, day, hour) now2 = now + datetime.timedelta(days=1) return now2.year, now2.month, now2.day # cal_rule = { # "电石渣": { # "total_production": 0.4, # "elec_consume_unit": 0.4, # "production_cost_unit": 0.2 # }, # "原料磨":{ # "production_hour": 0.3, # "elec_consume_unit": 0.3, # "production_cost_unit": 0.1, # "辅料_细度":0.05, # "辅料_水分":0.04, # "干混生料_CaO":0.04 # } # } # def cal_team_score(data): # """ # 计算月度绩效 # """ # qua_rate = {} # month_s = data['month_s'] # for item in data['qua_data']: # qua_rate[f'{item["material_name"]}_{item["testitem_name"]}'] = item["rate_pass"] # goal_dict = get_mgroup_goals(data['mgroup'], data['year_s'], False) # goal_data = {} # try: # rule = cal_rule[data['mgroup_name']] # score = 0 # for key in rule: # new_key = f'{key}_{month_s}' # goal_data[new_key] = goal_dict[new_key] # if '-' in key: # score = score + qua_rate.get(key, 0)/goal_data[new_key]*rule[key] # else: # score = score + data.get(key)/goal_data[new_key]*rule[key] # print(score) # # enstat.goal_data = goal_data # # enstat.score =score # # enstat.save(update_fields=['goal_data', 'score']) # except: # print(traceback.format_exc()) # return goal_data, score def shutdown_or_startup(mplog: MpLog): from apps.wpm.models import StLog from apps.wpm.tasks import cal_exp_duration_hour from apps.wpm.services import get_sflog mpoint = mplog.mpoint mgroup = mpoint.mgroup last_stlog = StLog.objects.filter( mgroup=mgroup, is_shutdown=True).order_by('start_time').last() # 找到最后一次停机记录 if last_stlog: if mplog.tag_update >= last_stlog.start_time: # 认为是有效信号 if last_stlog.end_time is None and mplog.tag_val == 1: # 从停到开 last_stlog.end_time = mplog.tag_update last_stlog.duration = ( last_stlog.end_time - last_stlog.start_time).total_seconds()/3600 last_stlog.save() mgroup.is_runing = True mgroup.save() cal_exp_duration_hour(last_stlog.id) # 触发时间分配 elif last_stlog.end_time and mplog.tag_val == 0 and mplog.tag_update > last_stlog.end_time: # 从开到停 StLog.objects.create( title='停机', is_shutdown=True, mgroup=mgroup, end_time=None, start_time=mplog.tag_update, sflog=get_sflog(mgroup, mplog.tag_update) ) mgroup.is_runing = False mgroup.save() else: StLog.objects.create( title='停机', is_shutdown=True, mgroup=mgroup, end_time=None, start_time=mplog.tag_update, sflog=get_sflog(mgroup, mplog.tag_update)) mgroup.is_runing = False mgroup.save() def king_sync(projectName: str): """ 同步亚控测点 """ from apps.third.king.k import kingClient from apps.third.king import king_api _, res = kingClient.request(**king_api["read_variables"], params={"projectInstanceName": projectName}) t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"} for item in res['objectList']: if 't' in item and item['t']: code = f'king_{item["n"]}' item['from'] = 'king' Mpoint.objects.get_or_create(code=code, defaults={ "name": item["n"], "code": code, "enabled": False, "is_auto": True, "val_type": t_dict[item["t"]], "third_info": item }) def cache_mpoints(mpointId: str|None = None): """ 缓存所有可用的测点 """ if mpointId: mpoints_data = Mpoint.objects.filter(id=mpointId).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info") else: mpoints_data = Mpoint.objects.filter(is_auto=True, enabled=True).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info") for item in list(mpoints_data): key = f'mpoint_{item["code"]}' cache_mpoint = cache.get(key, {}) cache_mpoint.update(item) cache.set(key, cache_mpoint, timeout=None) def insert_mplogx_from_king_realdata(data: dict, is_offset=True): """ 从king mqtt数据插入超表 """ objs = data['Objs'] len_objs = len(objs) pvs = data['PVs'] chunk_size = 200 num_chunks = (len(objs) + chunk_size - 1) // chunk_size with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: threads = [] for i in range(num_chunks): start = i * chunk_size end = min(start + chunk_size, len_objs) chunk = objs[start:end] threads.append(executor.submit(insert_mplogx_from_king_realdata_chunk, chunk, pvs, is_offset)) concurrent.futures.wait(threads) @auto_log def insert_mplogx_from_king_realdata_chunk(objs: list, pvs: dict, is_offset=True): """ 分批存库 """ oval = pvs['1'] otime_str = pvs['2'] otime_obj = timezone.make_aware(datetime.strptime(otime_str, '%Y-%m-%d %H:%M:%S.%f')) insert_data = [] for obj in objs: n = obj['N'] val = obj.get('1', None) timex = obj.get("2", None) cache_key = f'mpoint_king_{n}' mpoint_data = cache.get(cache_key, None) if mpoint_data is None: return val_type = mpoint_data['val_type'] if is_offset: if val is None: val = oval else: val = oval + val if timex is None: timex = otime_obj else: timex = otime_obj + timedelta(milliseconds=timex) else: timex = timezone.make_aware(datetime.strptime(timex, '%Y-%m-%d %H:%M:%S.%f')) mpoint_interval = mpoint_data['interval'] mpoint_last_timex = mpoint_data.get('last_timex', None) # 控制采集间隔 can_save = False if mpoint_last_timex: if (timex - mpoint_last_timex).total_seconds() > mpoint_interval: can_save = True if can_save: save_dict = { "timex": timex, "mpoint": Mpoint.objects.get(id=mpoint_data['id']), } # 更新对应缓存 save_dict[f'val_{val_type}'] = val mpoint_data.update({'last_val': val, 'last_timex': timex}) cache.set(cache_key, mpoint_data) insert_data.append(MpLogx(**save_dict)) MpLogx.objects.bulk_create(insert_data)