235 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			235 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
| from apps.enm.models import Mpoint, MpointStat, EnStat, MpLog, MpLogx
 | |
| import re
 | |
| import traceback
 | |
| from apps.mtm.services import get_mgroup_goals
 | |
| from django.db.models import Q
 | |
| import datetime
 | |
| from django.utils import timezone
 | |
| from django.core.cache import cache
 | |
| import concurrent.futures
 | |
| from django.db import connection
 | |
| from django.utils import timezone
 | |
| from datetime import datetime, timedelta
 | |
| import time
 | |
| from apps.utils.decorators import auto_log
 | |
| 
 | |
| 
 | |
def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int):
    """
    Evaluate a formula whose ``${...}`` placeholders refer to measuring points.

    Each placeholder is looked up as a Mpoint id, name or code. The value of the
    matching hourly MpointStat (for the given year/month/day/hour) is substituted
    into the expression, and the resulting expression is evaluated.

    Placeholders without a matching hourly stat are left in the expression
    unchanged, so evaluating it will raise (``${x}`` is not valid Python).

    :param exp_str: formula text, e.g. ``"${a} + ${b} * 2"``
    :param year: year of the hourly stat to read
    :param month: month of the hourly stat to read
    :param day: day of the hourly stat to read
    :param hour: hour of the hourly stat to read
    :return: whatever the evaluated expression yields
    """
    # dict.fromkeys de-duplicates while keeping first-seen order, so a
    # placeholder used several times costs a single query.
    refs = dict.fromkeys(re.findall(r"\${(.*?)}", exp_str))
    for ref in refs:
        mpst = MpointStat.objects.filter(
            Q(mpoint__id=ref) | Q(mpoint__name=ref) | Q(mpoint__code=ref),
            type='hour', year=year, month=month, day=day, hour=hour).first()
        if mpst:
            # Parenthesise the substituted value: a bare negative number would
            # otherwise interact with neighbouring operators
            # (e.g. "${a}**2" with a = -5 would become "-5**2" == -25).
            exp_str = exp_str.replace(f"${{{ref}}}", f"({mpst.val!s})")
    # NOTE(review): eval() executes arbitrary Python. This is only acceptable
    # if formulas can be edited exclusively by trusted users -- confirm.
    return eval(exp_str)
 | |
| 
 | |
| 
 | |
def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21):
    """
    Return the shift day, as ``(year, month, day)``, that a given hour belongs to.

    Hours up to and including ``hour_split`` belong to the same calendar day;
    later hours are attributed to the following calendar day.

    :param year: calendar year of the timestamp
    :param month: calendar month of the timestamp
    :param day: calendar day of the timestamp
    :param hour: hour of the timestamp
    :param hour_split: last hour that still counts towards the same day
    :return: tuple ``(year, month, day)`` of the shift day
    """
    # Imported locally under private aliases: at module level the name
    # ``datetime`` is bound first to the module and later to the class, so
    # ``datetime.datetime(...)`` / ``datetime.timedelta(...)`` fail at runtime.
    from datetime import datetime as _datetime, timedelta as _timedelta

    if hour <= hour_split:
        return year, month, day
    next_day = _datetime(year, month, day, hour) + _timedelta(days=1)
    return next_day.year, next_day.month, next_day.day
 | |
| 
 | |
| # cal_rule = {
 | |
| #     "电石渣": {
 | |
| #         "total_production": 0.4,
 | |
| #         "elec_consume_unit": 0.4,
 | |
| #         "production_cost_unit": 0.2
 | |
| #     },
 | |
| #     "原料磨":{
 | |
| #         "production_hour": 0.3,
 | |
| #         "elec_consume_unit": 0.3,
 | |
| #         "production_cost_unit": 0.1,
 | |
| #         "辅料_细度":0.05,
 | |
| #         "辅料_水分":0.04,
 | |
| #         "干混生料_CaO":0.04
 | |
| #     }
 | |
| # }
 | |
| # def cal_team_score(data):
 | |
| #     """
 | |
| #     计算月度绩效
 | |
| #     """
 | |
| #     qua_rate = {}
 | |
| #     month_s = data['month_s']
 | |
| #     for item in data['qua_data']:
 | |
| #         qua_rate[f'{item["material_name"]}_{item["testitem_name"]}'] = item["rate_pass"]
 | |
| 
 | |
| #     goal_dict = get_mgroup_goals(data['mgroup'], data['year_s'], False)
 | |
| #     goal_data = {}
 | |
| #     try:
 | |
| #         rule = cal_rule[data['mgroup_name']]
 | |
| #         score = 0
 | |
| #         for key in rule:
 | |
| #             new_key = f'{key}_{month_s}'
 | |
| #             goal_data[new_key] = goal_dict[new_key]
 | |
| #             if '-' in key:
 | |
| #                 score = score + qua_rate.get(key, 0)/goal_data[new_key]*rule[key]
 | |
| #             else:
 | |
| #                 score = score + data.get(key)/goal_data[new_key]*rule[key]
 | |
| #                 print(score)
 | |
| #         # enstat.goal_data = goal_data
 | |
| #         # enstat.score =score
 | |
| #         # enstat.save(update_fields=['goal_data', 'score'])
 | |
| #     except:
 | |
| #         print(traceback.format_exc())
 | |
| #     return goal_data, score
 | |
| 
 | |
| 
 | |
def shutdown_or_startup(mplog: MpLog):
    """
    Update the shutdown log (StLog) of a work group from a run/stop signal.

    The signal is ``mplog.tag_val`` at time ``mplog.tag_update`` (1 closes an
    open shutdown record, 0 opens a new one). Only the most recent shutdown
    record of the measuring point's mgroup is considered, and signals older
    than that record's start time are ignored.

    :param mplog: the log entry carrying the signal value and its timestamp
    """
    from apps.wpm.models import StLog
    from apps.wpm.tasks import cal_exp_duration_hour
    from apps.wpm.services import get_sflog

    mgroup = mplog.mpoint.mgroup

    def _open_shutdown_record():
        # Start a new, still open shutdown record and mark the group as stopped.
        StLog.objects.create(
            title='停机',
            is_shutdown=True,
            mgroup=mgroup,
            end_time=None,
            start_time=mplog.tag_update,
            sflog=get_sflog(mgroup, mplog.tag_update),
        )
        mgroup.is_runing = False
        mgroup.save()

    # Most recent shutdown record of this group, by start time.
    latest = StLog.objects.filter(
        mgroup=mgroup, is_shutdown=True).order_by('start_time').last()

    if not latest:
        # No shutdown history yet: a record is opened regardless of tag_val.
        # NOTE(review): this also happens for a "running" signal -- confirm intended.
        _open_shutdown_record()
        return

    if not mplog.tag_update >= latest.start_time:
        # Signal predates the latest record; treat it as invalid.
        return

    if latest.end_time is None and mplog.tag_val == 1:
        # Stopped -> running: close the open record and store its length in hours.
        latest.end_time = mplog.tag_update
        latest.duration = (
            latest.end_time - latest.start_time).total_seconds()/3600
        latest.save()
        mgroup.is_runing = True
        mgroup.save()
        cal_exp_duration_hour(latest.id)  # trigger time allocation
    elif latest.end_time and mplog.tag_val == 0 and mplog.tag_update > latest.end_time:
        # Running -> stopped, strictly after the previous shutdown ended.
        _open_shutdown_record()
 | |
| 
 | |
| 
 | |
| 
 | |
def king_sync(projectName: str):
    """
    Synchronise measuring points from a King (亚控) project.

    Reads the project's variable list through the King client and makes sure a
    Mpoint exists for every variable that has a truthy type code ``t``. Newly
    created points are disabled, flagged as automatic, and keep the raw
    variable description in ``third_info``. Existing points are not modified.

    :param projectName: King project instance name to read variables from
    """
    from apps.third.king.k import kingClient
    from apps.third.king import king_api

    _, res = kingClient.request(**king_api["read_variables"], params={"projectInstanceName": projectName})
    # King type code -> Mpoint.val_type
    val_type_by_code = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"}

    for variable in res['objectList']:
        if 't' not in variable or not variable['t']:
            continue
        point_code = f'king_{variable["n"]}'
        variable['from'] = 'king'  # record the data source inside third_info
        new_point_fields = {
            "name": variable["d"],
            "code": point_code,
            "enabled": False,
            "is_auto": True,
            "val_type": val_type_by_code[variable["t"]],
            "third_info": variable,
        }
        Mpoint.objects.get_or_create(code=point_code, defaults=new_point_fields)
 | |
| 
 | |
def cache_mpoints(mpointId: str = ''):
    """
    Store measuring-point settings in the cache under ``mpoint_<code>``.

    With ``mpointId`` only that point is refreshed; otherwise every point that
    is both automatic and enabled is cached. Existing cache entries are merged
    with the fresh database values (extra keys already in the entry are kept)
    and stored without expiry.

    :param mpointId: id of a single Mpoint to refresh; empty means all
        automatic, enabled points
    """
    cached_fields = (
        "id", "code", "name", "val_type", "enabled", "is_auto",
        "interval", "func_on_change", "formula", "third_info",
    )
    if mpointId:
        selected = Mpoint.objects.filter(id=mpointId)
    else:
        selected = Mpoint.objects.filter(is_auto=True, enabled=True)

    for row in list(selected.values(*cached_fields)):
        cache_key = f'mpoint_{row["code"]}'
        entry = cache.get(cache_key, {})
        entry.update(row)
        cache.set(cache_key, entry, timeout=None)
 | |
| 
 | |
| 
 | |
def insert_mplogx_from_king_realdata(data: dict, is_offset=True):
    """
    Insert King MQTT real-time data into the MpLogx table.

    ``data['Objs']`` is cut into batches of 200 entries. Each batch is handed
    to ``insert_mplogx_from_king_realdata_chunk`` on a pool of 4 worker
    threads, together with the shared ``data['PVs']`` values. The call returns
    once every batch has finished.

    :param data: message payload with the keys ``'Objs'`` and ``'PVs'``
    :param is_offset: passed through unchanged to the chunk handler
    """
    objs = data['Objs']
    total = len(objs)
    pvs = data['PVs']
    batch_size = 200

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        pending = [
            pool.submit(
                insert_mplogx_from_king_realdata_chunk,
                objs[offset:offset + batch_size],
                pvs,
                is_offset,
            )
            for offset in range(0, total, batch_size)
        ]
        # wait() only blocks until completion; it does not re-raise worker errors.
        concurrent.futures.wait(pending)
 | |
| 
 | |
@auto_log
def insert_mplogx_from_king_realdata_chunk(objs: list, pvs: dict, is_offset=True):
    """
    Store one batch of King real-time values in MpLogx.

    For every entry the measuring-point settings are read from the cache key
    ``mpoint_king_<N>``. Entries whose point is not cached are skipped. A value
    is stored only if the point has no previously stored sample, or if more
    than ``interval`` seconds have passed since the last stored sample. Stored
    samples also update ``last_val`` / ``last_timex`` in the cache entry.

    :param objs: entries with key ``'N'`` (variable name) and optional keys
        ``'1'`` (value) and ``'2'`` (time)
    :param pvs: base values; ``pvs['1']`` is the base value and ``pvs['2']``
        the base time string (``%Y-%m-%d %H:%M:%S.%f``)
    :param is_offset: when true, an entry's value is added to the base value
        and its time is a millisecond offset from the base time (missing fields
        fall back to the base); when false, the entry's value is used as is and
        its time is parsed as an absolute time string
    """
    oval = pvs['1']
    otime_str = pvs['2']
    otime_obj = timezone.make_aware(datetime.strptime(otime_str, '%Y-%m-%d %H:%M:%S.%f'))
    insert_data = []
    for obj in objs:
        n = obj['N']
        val = obj.get('1', None)
        timex = obj.get("2", None)
        cache_key = f'mpoint_king_{n}'
        mpoint_data = cache.get(cache_key, None)
        if mpoint_data is None:
            # Unknown / uncached point: skip only this entry. Returning here
            # would drop the rest of the batch and the rows collected so far.
            continue
        val_type = mpoint_data['val_type']
        if is_offset:
            val = oval if val is None else oval + val
            if timex is None:
                timex = otime_obj
            else:
                timex = otime_obj + timedelta(milliseconds=timex)
        else:
            timex = timezone.make_aware(datetime.strptime(timex, '%Y-%m-%d %H:%M:%S.%f'))
        mpoint_interval = mpoint_data['interval']
        mpoint_last_timex = mpoint_data.get('last_timex', None)
        # Throttle to the configured sampling interval. A point without a
        # previous sample is always stored; otherwise last_timex would never
        # be initialised and the point could never be stored at all.
        if mpoint_last_timex and (timex - mpoint_last_timex).total_seconds() <= mpoint_interval:
            continue
        # NOTE(review): this issues one query per stored row; passing the id
        # directly (mpoint_id=...) would avoid it -- confirm the field name.
        save_dict = {
            "timex": timex,
            "mpoint": Mpoint.objects.get(id=mpoint_data['id']),
            f'val_{val_type}': val,
        }
        # Keep the cache entry in step with what is stored. timeout=None
        # matches cache_mpoints, so the entry does not expire after a write.
        mpoint_data.update({'last_val': val, 'last_timex': timex})
        cache.set(cache_key, mpoint_data, timeout=None)
        insert_data.append(MpLogx(**save_dict))
    MpLogx.objects.bulk_create(insert_data)
 | |
| 
 |