diff --git a/apps/enm/migrations/0040_mpoint_is_unit.py b/apps/enm/migrations/0040_mpoint_is_unit.py new file mode 100644 index 00000000..4456d077 --- /dev/null +++ b/apps/enm/migrations/0040_mpoint_is_unit.py @@ -0,0 +1,18 @@ +# Generated by Django 3.2.12 on 2024-08-06 01:56 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('enm', '0039_auto_20240731_1652'), + ] + + operations = [ + migrations.AddField( + model_name='mpoint', + name='is_unit', + field=models.BooleanField(default=False, verbose_name='是否单位量'), + ), + ] diff --git a/apps/enm/models.py b/apps/enm/models.py index 883b8e9c..7998e633 100644 --- a/apps/enm/models.py +++ b/apps/enm/models.py @@ -15,6 +15,7 @@ class Mpoint(CommonBModel): MG_OFFLINE = -2 type = models.PositiveSmallIntegerField("类型", default=MT_AUTO, help_text="10:自动采集, 20:计算测点, 30:手动录入") + is_unit = models.BooleanField("是否单位量", default=False) name = models.CharField("测点名称", max_length=50) nickname = models.CharField("测点别名", max_length=50, null=True, blank=True) code = models.CharField("测点编号", max_length=50, unique=True) @@ -32,7 +33,6 @@ class Mpoint(CommonBModel): third_info = models.JSONField("第三方信息", default=dict, blank=True) # {"from": "king", "n": "某名称","d": "某描述或备注","g": "某组", "t": "某类型", "id": 5001, "o": "其他信息"} enp_field = models.CharField("关联enp采集字段", max_length=50, null=True, blank=True) - is_rep_ep_running_state = models.BooleanField("是否表示所监测设备运行状态", default=False) ep_monitored = models.ForeignKey("em.equipment", verbose_name="所监测设备", related_name="mp_ep_monitored", on_delete=models.SET_NULL, null=True, blank=True) ep_rs_val = models.FloatField("状态量基准值", null=True, blank=True) diff --git a/apps/enm/tasks.py b/apps/enm/tasks.py index 18bd4eb1..2a3cfaba 100644 --- a/apps/enm/tasks.py +++ b/apps/enm/tasks.py @@ -4,7 +4,7 @@ from apps.utils.tasks import CustomTask from celery import shared_task from apps.enm.models import MpLogx, Mpoint, MpointStat, EnStat, EnStat2 from apps.wpm.models import SfLog -from datetime import datetime, timedelta +import datetime from django.db.models import Sum, Avg from dateutil import tz from django.conf import settings @@ -28,19 +28,22 @@ from apps.utils.sql import DbConnection from apps.enm.services import insert_mplogx_item from django.utils.timezone import make_aware from apps.utils.thread import MyThread +from django.core.cache import cache myLogger = logging.getLogger("log") def get_current_and_previous_time(): - now = datetime.now() - pre = now - timedelta(hours=1) + now = datetime.datetime.now() + pre = now - datetime.timedelta(hours=1) return now, pre def db_insert_mplogx_batch(rows): for row in rows: _, tag_val, tag_code, tag_update = row + if cache.get("tag_code", None) is None: + continue insert_mplogx_item(tag_code, tag_val, make_aware(tag_update), {}) @shared_task(base=CustomTask) @@ -66,6 +69,37 @@ def db_insert_mplogx(): update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) +@shared_task(base=CustomTask) +def db_ins_mplogx(): + """ + 从数据库转存到超表 + """ + config = get_sysconfig() + with DbConnection(config['enm1']['db_host'], config['enm1']['db_user'], config['enm1']['db_password'], config['enm']['db_database1']) as cursor: + bill_date = config['enm1'].get('bill_date', None) + if bill_date is None: + raise Exception("bill_date is None") + # cursor.execute("select count(id) from sa_weigh_view where bill_date > %s", (bill_date)) + # count = cursor.fetchone()[0] + # if count > 400: + # raise Exception("db inset count > 400") + # materials_name = ['水泥+P.C42.5 袋装', '水泥+P.O42.5R 袋装', '水泥+P.O42.5 散装','水泥+P.O42.5 袋装', '水泥+P.O52.5 散装', '水泥+P.C42.5 散装', '水泥+P.O42.5R 散装'] + query = """ + SELECT id, CONCAT('x', inv_name) AS inv_name, de_real_quantity, bill_date + FROM sa_weigh_view + WHERE bill_date > %s + ORDER BY id, bill_date + """ + cursor.execute(query, (bill_date,)) + rows = cursor.fetchall() # 获取数据后保存至本地 + if rows: + bill_date = rows[-1][0] + print(rows) + db_insert_mplogx_batch(rows) + update_sysconfig({'enm1': {'bill_date': bill_date}}) + + + # @shared_task(base=CustomTask) # def mpoint_val_on_change(mpLogId: str): # """测点值变化的时候执行任务(废弃) @@ -84,15 +118,15 @@ def cal_mpointstats_duration(start_time: str, end_time: str, m_code_list=[], cal 重跑某一段时间的任务 """ mytz = tz.gettz(settings.TIME_ZONE) - start_time = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") + start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") start_time.replace(tzinfo=mytz) - end_time = datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S") + end_time = datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S") start_time.replace(tzinfo=mytz) current_time = start_time while current_time <= end_time: year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour cal_mpointstats(0, year, month, day, hour, m_code_list, cal_attrs=[]) - current_time += timedelta(hours=1) + current_time += datetime.timedelta(hours=1) @shared_task(base=CustomTask) @@ -102,26 +136,29 @@ def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: in """ mpoint = Mpoint.objects.get(id=mpointId) mytz = tz.gettz(settings.TIME_ZONE) - dt = datetime(year=year, month=month, day=day, hour=hour, minute=0, second=0, tzinfo=mytz) # 整点时间 - dt_hour_p= dt - timedelta(hours=1) # 上个整点 - dt_hour_n= dt + timedelta(hours=1) # 下个整点 + dt = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=0, second=0, tzinfo=mytz) # 整点时间 + dt_hour_p= dt - datetime.timedelta(hours=1) # 上个整点 + dt_hour_n= dt + datetime.timedelta(hours=1) # 下个整点 if mpoint.material and mpoint.val_type in ['float', 'int']: # 如果计量的是物料 # 累计量 有的会清零,需要额外处理(还未做) params = {"mpoint": mpoint, "type": "hour"} params["year"], params["month"], params["day"], params["hour"] = year, month, day, hour val = 0 val_type = mpoint.val_type if mpoint.type == Mpoint.MT_AUTO: - mrs0 = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt_hour_p, timex__lte=dt).order_by("timex") - mrs = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt, timex__lte=dt_hour_n).order_by("timex") - if mrs0.exists() and mrs.exists(): - last_val = getattr(mrs.last(), f'val_{val_type}') - first_val = getattr(mrs0.last(), f'val_{val_type}') - if last_val >= first_val: - val = last_val - first_val - else: - # 这里判断有可能清零了 - max_val = max(mrs.aggregate(max=Max(f'val_{val_type}'))["max"], first_val) - val = max_val - first_val + last_val + if mpoint.is_unit: + val = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt, timex__lt=dt_hour_n).aggregate(Sum("val"))["val__sum"] or 0 + else: + mrs0 = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt_hour_p, timex__lte=dt).order_by("timex") + mrs = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt, timex__lte=dt_hour_n).order_by("timex") + if mrs0.exists() and mrs.exists(): + last_val = getattr(mrs.last(), f'val_{val_type}') + first_val = getattr(mrs0.last(), f'val_{val_type}') + if last_val >= first_val: + val = last_val - first_val + else: + # 这里判断有可能清零了 + max_val = max(mrs.aggregate(max=Max(f'val_{val_type}'))["max"], first_val) + val = max_val - first_val + last_val elif mpoint.type == Mpoint.MT_COMPUTE and mpoint.formula: formula = mpoint.formula val = translate_eval_formula(formula, year, month, day, hour) @@ -363,7 +400,7 @@ def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, sflog = SfLog.objects.get(id=sflogId) elif year and month and day and hour is not None: mytz = tz.gettz(settings.TIME_ZONE) - dt = datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz) + dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz) sflog = get_sflog(mgroup, dt) team = None if sflog: @@ -597,8 +634,8 @@ def get_total_sec_now_and_shut_sec(enstat: EnStat): # if enstat.type == 'hour_s': # # 找到停机记录,并划分到该小时 - # end_time = datetime(enstat.year, enstat.month, enstat.day, enstat.hour) - # start_time = end_time - timedelta(hours=1) + # end_time = datetime.datetime(enstat.year, enstat.month, enstat.day, enstat.hour) + # start_time = end_time - datetime.timedelta(hours=1) # sts = StLog.objects.filter(mgroup=enstat.mgroup) # sts = (sts.filter(start_time__gt=start_time, start_time__lt=end_time)|sts.filter(start_time__lt=start_time, end_time=None)|sts.filter(end_time__gt=start_time, end_time__lt=end_time)).distinct() # shut_hour = 0