diff --git a/apps/enm/services.py b/apps/enm/services.py index d352dc32..8a483b3a 100644 --- a/apps/enm/services.py +++ b/apps/enm/services.py @@ -1,18 +1,19 @@ -from apps.enm.models import Mpoint, MpointStat, EnStat, MpLog, MpLogx +from apps.enm.models import Mpoint, MpointStat, EnStat, MpLogx import re +import logging import traceback from apps.mtm.services import get_mgroup_goals from django.db.models import Q -import datetime from django.utils import timezone from django.core.cache import cache import concurrent.futures from django.db import connection -from django.utils import timezone from datetime import datetime, timedelta -import time from apps.utils.decorators import auto_log +from django.db import IntegrityError +from apps.utils.tasks import ctask_run +myLogger = logging.getLogger('log') def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int): """ @@ -85,32 +86,35 @@ def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21): # return goal_data, score -def shutdown_or_startup(mplog: MpLog): +def shutdown_or_startup(mpointId: str, last_val, last_timex: datetime): + """ + last_val 可能是不同类型的值(bool 或 int) + """ from apps.wpm.models import StLog from apps.wpm.tasks import cal_exp_duration_hour from apps.wpm.services import get_sflog - mpoint = mplog.mpoint + mpoint = Mpoint.objects.get(id=mpointId) mgroup = mpoint.mgroup last_stlog = StLog.objects.filter( mgroup=mgroup, is_shutdown=True).order_by('start_time').last() # 找到最后一次停机记录 if last_stlog: - if mplog.tag_update >= last_stlog.start_time: # 认为是有效信号 - if last_stlog.end_time is None and mplog.tag_val == 1: # 从停到开 - last_stlog.end_time = mplog.tag_update + if last_timex >= last_stlog.start_time: # 认为是有效信号 + if last_stlog.end_time is None and last_val in (1, True): # 从停到开 + last_stlog.end_time = last_timex last_stlog.duration = ( last_stlog.end_time - last_stlog.start_time).total_seconds()/3600 last_stlog.save() mgroup.is_runing = True mgroup.save() cal_exp_duration_hour(last_stlog.id) # 触发时间分配 - elif last_stlog.end_time and mplog.tag_val == 0 and mplog.tag_update > last_stlog.end_time: # 从开到停 + elif last_stlog.end_time and last_val in (0, False) and last_timex > last_stlog.end_time: # 从开到停 StLog.objects.create( title='停机', is_shutdown=True, mgroup=mgroup, end_time=None, - start_time=mplog.tag_update, - sflog=get_sflog(mgroup, mplog.tag_update) + start_time=last_timex, + sflog=get_sflog(mgroup, last_timex) ) mgroup.is_runing = False mgroup.save() @@ -120,27 +124,39 @@ def shutdown_or_startup(mplog: MpLog): is_shutdown=True, mgroup=mgroup, end_time=None, - start_time=mplog.tag_update, - sflog=get_sflog(mgroup, mplog.tag_update)) + start_time=last_timex, + sflog=get_sflog(mgroup, last_timex)) mgroup.is_runing = False mgroup.save() -def king_sync(projectName: str): +def king_sync(projectName: str, json_path: str=""): """ 同步亚控测点 """ - from apps.third.king.k import kingClient - from apps.third.king import king_api - _, res = kingClient.request(**king_api["read_variables"], params={"projectInstanceName": projectName}) + if json_path: + import os + import json + from django.conf import settings + with open(os.path.join(settings.BASE_DIR, json_path), 'r', encoding='utf-8') as f: + res = json.loads(f.read()) + else: + from apps.third.king.k import kingClient + from apps.third.king import king_api + _, res = kingClient.request(**king_api["read_variables"], params={"projectInstanceName": projectName}) + t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"} - for item in res['objectList']: - if 't' in item and item['t']: - code = f'king_{item["n"]}' + for index, item in enumerate(res['objectList']): + if 't' in item and item['t'] and 'SystemTag' not in item['d']: + code = f'K_{item["n"]}' item['from'] = 'king' + group = item['g'] + name = item['d'] + if group: + name = f'{group}.{name}' Mpoint.objects.get_or_create(code=code, defaults={ - "name": item["d"], + "name": name, "code": code, "enabled": False, "is_auto": True, @@ -148,12 +164,13 @@ def king_sync(projectName: str): "third_info": item }) +@auto_log('缓存测点') def cache_mpoints(mpointId: str = ''): """ 缓存所有可用的测点 """ if mpointId: - mpoints_data = Mpoint.objects.filter(id=mpointId).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info") + mpoints_data = Mpoint.objects.filter(id=mpointId, enabled=True).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info") else: mpoints_data = Mpoint.objects.filter(is_auto=True, enabled=True).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info") for item in list(mpoints_data): @@ -161,8 +178,9 @@ def cache_mpoints(mpointId: str = ''): cache_mpoint = cache.get(key, {}) cache_mpoint.update(item) cache.set(key, cache_mpoint, timeout=None) + # print(cache.get('mpoint_K_IP2_MM_SC_IW14')) - +test_data = {"PNs":{"1":"V","2":"T","3":"Q"},"PVs":{"1":-725,"2":"2024-04-08 13:43:53.140","3":192},"Objs":[{"N":"IP2_MM_IW3"},{"N":"IP2_MM_IW4","1":-6386},{"N":"IP2_MM_IW5","1":-7835},{"N":"IP2_MM_IW7","1":-6864},{"N":"IP2_MM_IW15","1":29},{"N":"IP2_MM_SC_IW3","1":337},{"N":"IP2_MM_SC_IW13","1":-5511,"2":24},{"N":"IP2_MM_SC_IW14","1":-4640,"2":24},{"N":"IP2_MM_SC_IW15","1":-5586,"2":24},{"N":"IP2_MM_SC_IW16","1":-6634,"2":24},{"N":"IP2_MM_SC_IW17","1":-2768,"2":24},{"N":"IP2_MM_SC_IW18","1":-2946,"2":24},{"N":"IP2_MM_SC_IW19","1":-2550,"2":24},{"N":"IP2_MM_SC_IW20","1":-1512,"2":24},{"N":"IP2_MM_SC_IW21","1":-1775,"2":24},{"N":"IP2_MM_SC_IW22","1":-194,"2":24},{"N":"IP2_MM_SC_IW23","1":-366,"2":24},{"N":"IP2_MM_SC_IW24","1":-9291,"2":24},{"N":"IP2_MM_SC_IW25","1":-6888,"2":24},{"N":"IP2_MM_SC_IW26","1":-2360,"2":24},{"N":"IP2_MM_SC_IW29","1":164,"2":24},{"N":"IP2_MM_SC_IW30","1":914,"2":24},{"N":"IP2_MM_SC_IW31","1":849,"2":24},{"N":"IP2_MM_SC_IW37","1":-125,"2":24},{"N":"IP2_MM_SC_IW38","1":-3009,"2":24},{"N":"IP2_MM_SC_IW40","1":-1394,"2":24},{"N":"IP2_MM_SC_IW43","1":758,"2":24},{"N":"IP3_SC_IW1","1":11557,"2":107},{"N":"IP3_SC_IW2","1":7624,"2":107},{"N":"IP3_SC_IW6","1":11159,"2":107},{"N":"IP3_SC_IW7","1":8073,"2":107},{"N":"IP3_SC_IW9","1":4490,"2":107},{"N":"IP3_SC_IW10","1":5437,"2":107},{"N":"IP3_SC_IW11","1":9244,"2":107},{"N":"IP3_SC_IW12","1":7886,"2":107},{"N":"IP3_SC_IW13","1":-2962,"2":107},{"N":"IP3_SC_IW14","1":-159,"2":107},{"N":"IP3_SC_IW26","1":15,"2":107},{"N":"IP3_SC_IW27","1":15,"2":107}]} def insert_mplogx_from_king_realdata(data: dict, is_offset=True): """ 从king mqtt数据插入超表 @@ -175,38 +193,36 @@ def insert_mplogx_from_king_realdata(data: dict, is_offset=True): num_chunks = (len(objs) + chunk_size - 1) // chunk_size with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: - threads = [] + futures = [] for i in range(num_chunks): start = i * chunk_size end = min(start + chunk_size, len_objs) chunk = objs[start:end] - threads.append(executor.submit(insert_mplogx_from_king_realdata_chunk, chunk, pvs, is_offset)) + futures.append(executor.submit(insert_mplogx_from_king_realdata_chunk, chunk, pvs, is_offset)) + concurrent.futures.wait(futures) + # for future in futures: + # print(future.result(), end=', ') - concurrent.futures.wait(threads) - -@auto_log +@auto_log('亚控存库') def insert_mplogx_from_king_realdata_chunk(objs: list, pvs: dict, is_offset=True): """ - 分批存库 + 分批存库, 亚控 38.00,00000.11011 版本偏移只是时间戳偏移。另外其实可以不在乎 """ - oval = pvs['1'] + # oval = pvs['1'] otime_str = pvs['2'] otime_obj = timezone.make_aware(datetime.strptime(otime_str, '%Y-%m-%d %H:%M:%S.%f')) + insert_db_data = [] insert_data = [] for obj in objs: n = obj['N'] - val = obj.get('1', None) + val = obj["1"] timex = obj.get("2", None) - cache_key = f'mpoint_king_{n}' + cache_key = f'mpoint_K_{n}' mpoint_data = cache.get(cache_key, None) if mpoint_data is None: - return + continue val_type = mpoint_data['val_type'] if is_offset: - if val is None: - val = oval - else: - val = oval + val if timex is None: timex = otime_obj else: @@ -218,17 +234,48 @@ def insert_mplogx_from_king_realdata_chunk(objs: list, pvs: dict, is_offset=True # 控制采集间隔 can_save = False if mpoint_last_timex: - if (timex - mpoint_last_timex).total_seconds() > mpoint_interval: + if (otime_obj - mpoint_last_timex).total_seconds() > mpoint_interval: can_save = True + else: + can_save = True if can_save: save_dict = { "timex": timex, "mpoint": Mpoint.objects.get(id=mpoint_data['id']), } - # 更新对应缓存 save_dict[f'val_{val_type}'] = val - mpoint_data.update({'last_val': val, 'last_timex': timex}) - cache.set(cache_key, mpoint_data) - insert_data.append(MpLogx(**save_dict)) - MpLogx.objects.bulk_create(insert_data) + insert_data.append((cache_key, mpoint_data, val, timex)) + insert_db_data.append(MpLogx(**save_dict)) + # 先尝试批量存库/发生异常则单个存储 + is_bulk_insert = True + try: + MpLogx.objects.bulk_create(insert_db_data) + except IntegrityError: + is_bulk_insert = False + for item1, item2 in zip(insert_data, insert_db_data): + try: + MpLogx.objects.create(**item2) + update_mpoint_cache_and_do_func(item1) + except Exception: + myLogger.error(f'mpoint_cache_key: {item1[0]} 存库失败', exc_info=True) + continue + + if is_bulk_insert: + # 批量存库成功后更新缓存 + for item in insert_data: + update_mpoint_cache_and_do_func(item) + + +def update_mpoint_cache_and_do_func(item: dict): + cache_key = item[0] + mpoint_data = item[1] + mpoint_id = mpoint_data['id'] + last_timex: datetime = item[3] + last_val = item[2] + mpoint_data['last_timex'] = last_timex + mpoint_data['last_val'] = last_val + cache.set(cache_key, mpoint_data, timeout=None) + mpoint_func: str = mpoint_data.get('func_on_change', '') + if mpoint_func: + ctask_run.delay(mpoint_func, mpoint_id, last_val, last_timex) \ No newline at end of file diff --git a/apps/enm/tasks.py b/apps/enm/tasks.py index 576f3ee2..0ba6ad15 100644 --- a/apps/enm/tasks.py +++ b/apps/enm/tasks.py @@ -6,7 +6,7 @@ from apps.utils.sql import DbConnection from server.settings import get_sysconfig, update_sysconfig import importlib from django.core.cache import cache -from apps.enm.models import MpLog, Mpoint, MpointStat, EnStat, EnStat2 +from apps.enm.models import MpLogx, Mpoint, MpointStat, EnStat, EnStat2 from apps.wpm.models import SfLog, StLog import datetime from django.db.models import Sum, Avg @@ -23,6 +23,7 @@ from django.db.models import F from apps.wpm.services import get_pcoal_heat import traceback from django.utils import timezone +from django.db.models import Max myLogger = logging.getLogger('log') @@ -32,48 +33,51 @@ def get_current_and_previous_time(): return now, pre -@shared_task(base=CustomTask) -def get_tag_val(): - config = get_sysconfig() - with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor: - last_tag_id = config['enm'].get('last_tag_id', None) - if not last_tag_id: - mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first() - if mr is None: - last_tag_id = 0 - else: - last_tag_id = mr.tag_id - update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) - cursor.execute( - "select id, val, tag_code, update_time from tag_value where id > %s order by update_time, id", (last_tag_id)) - results = cursor.fetchall() # 获取数据后保存至本地 - need_func = {} # 需要执行测点函数的字典, 此种情况只执行一次 - for row in results: - mr_one = MpLog() - mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row - mpoint, _ = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={ - 'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'}) - mr_one.mpoint = mpoint - mr_one.save() - last_tag_id = mr_one.tag_id - if mpoint.func_on_change: - need_func[mpoint.id] = mr_one.id - # 执行测点函数 - for key in need_func: - mpoint_val_on_change.delay(need_func[key]) - update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) +# @shared_task(base=CustomTask) +# def get_tag_val(): +# """ +# 应该用不着 +# """ +# config = get_sysconfig() +# with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor: +# last_tag_id = config['enm'].get('last_tag_id', None) +# if not last_tag_id: +# mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first() +# if mr is None: +# last_tag_id = 0 +# else: +# last_tag_id = mr.tag_id +# update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) +# cursor.execute( +# "select id, val, tag_code, update_time from tag_value where id > %s order by update_time, id", (last_tag_id)) +# results = cursor.fetchall() # 获取数据后保存至本地 +# need_func = {} # 需要执行测点函数的字典, 此种情况只执行一次 +# for row in results: +# mr_one = MpLog() +# mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row +# mpoint, _ = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={ +# 'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'}) +# mr_one.mpoint = mpoint +# mr_one.save() +# last_tag_id = mr_one.tag_id +# if mpoint.func_on_change: +# need_func[mpoint.id] = mr_one.id +# # 执行测点函数 +# for key in need_func: +# mpoint_val_on_change.delay(need_func[key]) +# update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) -@shared_task(base=CustomTask) -def mpoint_val_on_change(mpLogId: str): - """测点值变化的时候执行任务 - """ - mplog = MpLog.objects.get(id=mpLogId) - mpoint = mplog.mpoint - module, func = mpoint.func_on_change.rsplit(".", 1) - m = importlib.import_module(module) - f = getattr(m, func) - f(mplog) # 同步执行 +# @shared_task(base=CustomTask) +# def mpoint_val_on_change(mpLogId: str): +# """测点值变化的时候执行任务(废弃) +# """ +# mplog = MpLog.objects.get(id=mpLogId) +# mpoint = mplog.mpoint +# module, func = mpoint.func_on_change.rsplit(".", 1) +# m = importlib.import_module(module) +# f = getattr(m, func) +# f(mplog) # 同步执行 @shared_task(base=CustomTask) @@ -99,27 +103,35 @@ def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: in mytz = tz.gettz(settings.TIME_ZONE) dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz) - if mpoint.material: # 如果计量的是物料 + if mpoint.material: # 如果计量的是物料 # 累计量 有的会清零,需要额外处理(还未做) params = {'mpoint': mpoint, 'type': 'hour'} params['year'], params['month'], params['day'], params['hour'] = year, month, day, hour val = 0 + val_type = 'float' if mpoint.formula: formula = mpoint.formula try: val = translate_eval_formula(formula, year, month, day, hour) - except: + except Exception: myLogger.error( '公式执行错误:{}-{}'.format(mpoint.id, formula), exc_info=True) return else: - mrs = MpLog.objects.filter( + mrs = MpLogx.objects.filter( mpoint=mpoint, - tag_update__year=params['year'], - tag_update__month=params['month'], - tag_update__day=params['day'], - tag_update__hour=params['hour']).order_by('tag_update') + timex__year=params['year'], + timex__month=params['month'], + timex__day=params['day'], + timex__hour=params['hour']).order_by('timex') if mrs.exists(): - val = mrs.last().tag_val - mrs.first().tag_val + last_val = mrs.last().val_float + first_val = mrs.first().val_float + if last_val >= first_val: + val = last_val - first_val + else: + # 这里判断有可能清零了 + max_val = mrs.aggregate(max=Max('val_float'))['max'] + val = max_val - first_val + last_val ms, _ = MpointStat.objects.get_or_create(**params, defaults=params) ms.val = val ms.save() @@ -309,8 +321,7 @@ def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, mytz = tz.gettz(settings.TIME_ZONE) dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz) - sflog = SfLog.objects.get( - start_time__lt=dt, end_time__gte=dt, mgroup=mgroup) + sflog = get_sflog(mgroup, dt) if sflog: year_s, month_s, day_s = sflog.get_ymd team = sflog.team