diff --git a/apps/enm/services.py b/apps/enm/services.py index 9656bb05..d19d58ad 100644 --- a/apps/enm/services.py +++ b/apps/enm/services.py @@ -314,7 +314,7 @@ def insert_mplogx_from_king_mqtt(data: dict, is_offset=True): # print(future.result(), end=', ') -def insert_mplogx_item(code, val, timex, enp_mpoints_dict): +def insert_mplogx_item(code: str, val, timex: datetime, enp_mpoints_dict): """ 存入超表 """ @@ -403,7 +403,6 @@ def insert_mplogx_from_king_rest_chunk(objs: list): except IntegrityError as e: # 忽略唯一性错误 myLogger.error(e, exc_info=True) - def get_analyse_data_mgroups_duration(start_date: datetime, end_date: datetime) -> Dict[str, Any]: """ 获取一段时间范围的工段分析数据 diff --git a/apps/enm/tasks.py b/apps/enm/tasks.py index 9f051751..481b8e0c 100644 --- a/apps/enm/tasks.py +++ b/apps/enm/tasks.py @@ -14,7 +14,7 @@ from apps.fim.services import get_cost_unit, get_price_unit from apps.fim.models import Fee from apps.enm.services import translate_eval_formula import logging -from server.settings import get_sysconfig +from server.settings import get_sysconfig, update_sysconfig from django.db.models import F from apps.wpm.services import get_pcoal_heat from django.utils import timezone @@ -24,6 +24,10 @@ from apps.third.king.king_api import kapis from apps.enm.services import insert_mplogx_from_king_rest_chunk, MpointCache from django.utils.timezone import localtime from apps.wpm.tasks import get_total_sec_now, cal_exp_duration_sec +from apps.utils.sql import DbConnection +from apps.enm.services import insert_mplogx_item +from django.utils.timezone import make_aware +from apps.utils.thread import MyThread myLogger = logging.getLogger("log") @@ -34,39 +38,26 @@ def get_current_and_previous_time(): return now, pre -# @shared_task(base=CustomTask) -# def get_tag_val(): -# """ -# 应该用不着 -# """ -# config = get_sysconfig() -# with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor: -# last_tag_id = config['enm'].get('last_tag_id', None) -# if not last_tag_id: -# mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first() -# if mr is None: -# last_tag_id = 0 -# else: -# last_tag_id = mr.tag_id -# update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) -# cursor.execute( -# "select id, val, tag_code, update_time from tag_value where id > %s order by update_time, id", (last_tag_id)) -# results = cursor.fetchall() # 获取数据后保存至本地 -# need_func = {} # 需要执行测点函数的字典, 此种情况只执行一次 -# for row in results: -# mr_one = MpLog() -# mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row -# mpoint, _ = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={ -# 'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'}) -# mr_one.mpoint = mpoint -# mr_one.save() -# last_tag_id = mr_one.tag_id -# if mpoint.func_on_change: -# need_func[mpoint.id] = mr_one.id -# # 执行测点函数 -# for key in need_func: -# mpoint_val_on_change.delay(need_func[key]) -# update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) +def db_insert_mplogx_batch(rows): + for row in rows: + _, tag_val, tag_code, tag_update = row + insert_mplogx_item(tag_code, tag_val, make_aware(tag_update), {}) + +@shared_task(base=CustomTask) +def db_insert_mplogx(): + """ + 从数据库转存到超表 + """ + config = get_sysconfig() + with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor: + last_tag_id = config['enm'].get('last_tag_id', None) + if last_tag_id is None: + raise Exception("last_tag_id is None") + cursor.execute( + "select id, val, tag_code, update_time from tag_value where id > %s order by id, update_time", (last_tag_id)) + rows = cursor.fetchall() # 获取数据后保存至本地 + MyThread(target=db_insert_mplogx_batch, args=(rows,)).start() + update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) # @shared_task(base=CustomTask)