feat: 从数据库转存到超表

This commit is contained in:
caoqianming 2024-07-16 14:06:46 +08:00
parent ab413eed81
commit bf98ea8a48
2 changed files with 26 additions and 36 deletions

View File

@ -314,7 +314,7 @@ def insert_mplogx_from_king_mqtt(data: dict, is_offset=True):
# print(future.result(), end=', ')
def insert_mplogx_item(code, val, timex, enp_mpoints_dict):
def insert_mplogx_item(code: str, val, timex: datetime, enp_mpoints_dict):
"""
存入超表
"""
@ -403,7 +403,6 @@ def insert_mplogx_from_king_rest_chunk(objs: list):
except IntegrityError as e: # 忽略唯一性错误
myLogger.error(e, exc_info=True)
def get_analyse_data_mgroups_duration(start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""
获取一段时间范围的工段分析数据

View File

@ -14,7 +14,7 @@ from apps.fim.services import get_cost_unit, get_price_unit
from apps.fim.models import Fee
from apps.enm.services import translate_eval_formula
import logging
from server.settings import get_sysconfig
from server.settings import get_sysconfig, update_sysconfig
from django.db.models import F
from apps.wpm.services import get_pcoal_heat
from django.utils import timezone
@ -24,6 +24,10 @@ from apps.third.king.king_api import kapis
from apps.enm.services import insert_mplogx_from_king_rest_chunk, MpointCache
from django.utils.timezone import localtime
from apps.wpm.tasks import get_total_sec_now, cal_exp_duration_sec
from apps.utils.sql import DbConnection
from apps.enm.services import insert_mplogx_item
from django.utils.timezone import make_aware
from apps.utils.thread import MyThread
myLogger = logging.getLogger("log")
@ -34,39 +38,26 @@ def get_current_and_previous_time():
return now, pre
# @shared_task(base=CustomTask)
# def get_tag_val():
# """
# 应该用不着
# """
# config = get_sysconfig()
# with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor:
# last_tag_id = config['enm'].get('last_tag_id', None)
# if not last_tag_id:
# mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first()
# if mr is None:
# last_tag_id = 0
# else:
# last_tag_id = mr.tag_id
# update_sysconfig({'enm': {'last_tag_id': last_tag_id}})
# cursor.execute(
# "select id, val, tag_code, update_time from tag_value where id > %s order by update_time, id", (last_tag_id))
# results = cursor.fetchall() # 获取数据后保存至本地
# need_func = {} # 需要执行测点函数的字典, 此种情况只执行一次
# for row in results:
# mr_one = MpLog()
# mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row
# mpoint, _ = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={
# 'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'})
# mr_one.mpoint = mpoint
# mr_one.save()
# last_tag_id = mr_one.tag_id
# if mpoint.func_on_change:
# need_func[mpoint.id] = mr_one.id
# # 执行测点函数
# for key in need_func:
# mpoint_val_on_change.delay(need_func[key])
# update_sysconfig({'enm': {'last_tag_id': last_tag_id}})
def db_insert_mplogx_batch(rows):
for row in rows:
_, tag_val, tag_code, tag_update = row
insert_mplogx_item(tag_code, tag_val, make_aware(tag_update), {})
@shared_task(base=CustomTask)
def db_insert_mplogx():
"""
从数据库转存到超表
"""
config = get_sysconfig()
with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor:
last_tag_id = config['enm'].get('last_tag_id', None)
if last_tag_id is None:
raise Exception("last_tag_id is None")
cursor.execute(
"select id, val, tag_code, update_time from tag_value where id > %s order by id, update_time", (last_tag_id))
rows = cursor.fetchall() # 获取数据后保存至本地
MyThread(target=db_insert_mplogx_batch, args=(rows,)).start()
update_sysconfig({'enm': {'last_tag_id': last_tag_id}})
# @shared_task(base=CustomTask)