add : db_ins_mplogx 增加测点插入数据库

This commit is contained in:
zty 2024-08-06 10:01:21 +08:00
parent 341a681d00
commit f09a8fe4cd
3 changed files with 79 additions and 24 deletions

View File

@ -0,0 +1,18 @@
# Generated by Django 3.2.12 on 2024-08-06 01:56
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('enm', '0039_auto_20240731_1652'),
]
operations = [
migrations.AddField(
model_name='mpoint',
name='is_unit',
field=models.BooleanField(default=False, verbose_name='是否单位量'),
),
]

View File

@ -15,6 +15,7 @@ class Mpoint(CommonBModel):
MG_OFFLINE = -2
type = models.PositiveSmallIntegerField("类型", default=MT_AUTO, help_text="10:自动采集, 20:计算测点, 30:手动录入")
is_unit = models.BooleanField("是否单位量", default=False)
name = models.CharField("测点名称", max_length=50)
nickname = models.CharField("测点别名", max_length=50, null=True, blank=True)
code = models.CharField("测点编号", max_length=50, unique=True)
@ -32,7 +33,6 @@ class Mpoint(CommonBModel):
third_info = models.JSONField("第三方信息", default=dict, blank=True)
# {"from": "king", "n": "某名称","d": "某描述或备注","g": "某组", "t": "某类型", "id": 5001, "o": "其他信息"}
enp_field = models.CharField("关联enp采集字段", max_length=50, null=True, blank=True)
is_rep_ep_running_state = models.BooleanField("是否表示所监测设备运行状态", default=False)
ep_monitored = models.ForeignKey("em.equipment", verbose_name="所监测设备", related_name="mp_ep_monitored", on_delete=models.SET_NULL, null=True, blank=True)
ep_rs_val = models.FloatField("状态量基准值", null=True, blank=True)

View File

@ -4,7 +4,7 @@ from apps.utils.tasks import CustomTask
from celery import shared_task
from apps.enm.models import MpLogx, Mpoint, MpointStat, EnStat, EnStat2
from apps.wpm.models import SfLog
from datetime import datetime, timedelta
import datetime
from django.db.models import Sum, Avg
from dateutil import tz
from django.conf import settings
@ -28,19 +28,22 @@ from apps.utils.sql import DbConnection
from apps.enm.services import insert_mplogx_item
from django.utils.timezone import make_aware
from apps.utils.thread import MyThread
from django.core.cache import cache
myLogger = logging.getLogger("log")
def get_current_and_previous_time():
now = datetime.now()
pre = now - timedelta(hours=1)
now = datetime.datetime.now()
pre = now - datetime.timedelta(hours=1)
return now, pre
def db_insert_mplogx_batch(rows):
for row in rows:
_, tag_val, tag_code, tag_update = row
if cache.get("tag_code", None) is None:
continue
insert_mplogx_item(tag_code, tag_val, make_aware(tag_update), {})
@shared_task(base=CustomTask)
@ -66,6 +69,37 @@ def db_insert_mplogx():
update_sysconfig({'enm': {'last_tag_id': last_tag_id}})
@shared_task(base=CustomTask)
def db_ins_mplogx():
"""
从数据库转存到超表
"""
config = get_sysconfig()
with DbConnection(config['enm1']['db_host'], config['enm1']['db_user'], config['enm1']['db_password'], config['enm']['db_database1']) as cursor:
bill_date = config['enm1'].get('bill_date', None)
if bill_date is None:
raise Exception("bill_date is None")
# cursor.execute("select count(id) from sa_weigh_view where bill_date > %s", (bill_date))
# count = cursor.fetchone()[0]
# if count > 400:
# raise Exception("db inset count > 400")
# materials_name = ['水泥+P.C42.5 袋装', '水泥+P.O42.5R 袋装', '水泥+P.O42.5 散装','水泥+P.O42.5 袋装', '水泥+P.O52.5 散装', '水泥+P.C42.5 散装', '水泥+P.O42.5R 散装']
query = """
SELECT id, CONCAT('x', inv_name) AS inv_name, de_real_quantity, bill_date
FROM sa_weigh_view
WHERE bill_date > %s
ORDER BY id, bill_date
"""
cursor.execute(query, (bill_date,))
rows = cursor.fetchall() # 获取数据后保存至本地
if rows:
bill_date = rows[-1][0]
print(rows)
db_insert_mplogx_batch(rows)
update_sysconfig({'enm1': {'bill_date': bill_date}})
# @shared_task(base=CustomTask)
# def mpoint_val_on_change(mpLogId: str):
# """测点值变化的时候执行任务(废弃)
@ -84,15 +118,15 @@ def cal_mpointstats_duration(start_time: str, end_time: str, m_code_list=[], cal
重跑某一段时间的任务
"""
mytz = tz.gettz(settings.TIME_ZONE)
start_time = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
start_time.replace(tzinfo=mytz)
end_time = datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
end_time = datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
start_time.replace(tzinfo=mytz)
current_time = start_time
while current_time <= end_time:
year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour
cal_mpointstats(0, year, month, day, hour, m_code_list, cal_attrs=[])
current_time += timedelta(hours=1)
current_time += datetime.timedelta(hours=1)
@shared_task(base=CustomTask)
@ -102,26 +136,29 @@ def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: in
"""
mpoint = Mpoint.objects.get(id=mpointId)
mytz = tz.gettz(settings.TIME_ZONE)
dt = datetime(year=year, month=month, day=day, hour=hour, minute=0, second=0, tzinfo=mytz) # 整点时间
dt_hour_p= dt - timedelta(hours=1) # 上个整点
dt_hour_n= dt + timedelta(hours=1) # 下个整点
dt = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=0, second=0, tzinfo=mytz) # 整点时间
dt_hour_p= dt - datetime.timedelta(hours=1) # 上个整点
dt_hour_n= dt + datetime.timedelta(hours=1) # 下个整点
if mpoint.material and mpoint.val_type in ['float', 'int']: # 如果计量的是物料 # 累计量 有的会清零,需要额外处理(还未做)
params = {"mpoint": mpoint, "type": "hour"}
params["year"], params["month"], params["day"], params["hour"] = year, month, day, hour
val = 0
val_type = mpoint.val_type
if mpoint.type == Mpoint.MT_AUTO:
mrs0 = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt_hour_p, timex__lte=dt).order_by("timex")
mrs = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt, timex__lte=dt_hour_n).order_by("timex")
if mrs0.exists() and mrs.exists():
last_val = getattr(mrs.last(), f'val_{val_type}')
first_val = getattr(mrs0.last(), f'val_{val_type}')
if last_val >= first_val:
val = last_val - first_val
else:
# 这里判断有可能清零了
max_val = max(mrs.aggregate(max=Max(f'val_{val_type}'))["max"], first_val)
val = max_val - first_val + last_val
if mpoint.is_unit:
val = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt, timex__lt=dt_hour_n).aggregate(Sum("val"))["val__sum"] or 0
else:
mrs0 = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt_hour_p, timex__lte=dt).order_by("timex")
mrs = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt, timex__lte=dt_hour_n).order_by("timex")
if mrs0.exists() and mrs.exists():
last_val = getattr(mrs.last(), f'val_{val_type}')
first_val = getattr(mrs0.last(), f'val_{val_type}')
if last_val >= first_val:
val = last_val - first_val
else:
# 这里判断有可能清零了
max_val = max(mrs.aggregate(max=Max(f'val_{val_type}'))["max"], first_val)
val = max_val - first_val + last_val
elif mpoint.type == Mpoint.MT_COMPUTE and mpoint.formula:
formula = mpoint.formula
val = translate_eval_formula(formula, year, month, day, hour)
@ -363,7 +400,7 @@ def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s,
sflog = SfLog.objects.get(id=sflogId)
elif year and month and day and hour is not None:
mytz = tz.gettz(settings.TIME_ZONE)
dt = datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz)
dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz)
sflog = get_sflog(mgroup, dt)
team = None
if sflog:
@ -597,8 +634,8 @@ def get_total_sec_now_and_shut_sec(enstat: EnStat):
# if enstat.type == 'hour_s':
# # 找到停机记录,并划分到该小时
# end_time = datetime(enstat.year, enstat.month, enstat.day, enstat.hour)
# start_time = end_time - timedelta(hours=1)
# end_time = datetime.datetime(enstat.year, enstat.month, enstat.day, enstat.hour)
# start_time = end_time - datetime.timedelta(hours=1)
# sts = StLog.objects.filter(mgroup=enstat.mgroup)
# sts = (sts.filter(start_time__gt=start_time, start_time__lt=end_time)|sts.filter(start_time__lt=start_time, end_time=None)|sts.filter(end_time__gt=start_time, end_time__lt=end_time)).distinct()
# shut_hour = 0