From dfaed5e5e504a1d49a774537868f31f239d99145 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Tue, 16 Apr 2024 15:03:59 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=87=87=E9=9B=86=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E6=95=B4=E4=BD=93=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../enm/migrations/0024_auto_20240326_1617.py | 57 ++- .../migrations/0027_mpoint_gather_field.py | 18 + .../enm/migrations/0028_auto_20240416_1454.py | 44 +++ apps/enm/models.py | 240 +++++------ apps/enm/serializers.py | 114 +++--- apps/enm/services.py | 372 ++++++++++-------- apps/enm/views.py | 62 +-- 7 files changed, 528 insertions(+), 379 deletions(-) create mode 100644 apps/enm/migrations/0027_mpoint_gather_field.py create mode 100644 apps/enm/migrations/0028_auto_20240416_1454.py diff --git a/apps/enm/migrations/0024_auto_20240326_1617.py b/apps/enm/migrations/0024_auto_20240326_1617.py index 89936acd..65165012 100644 --- a/apps/enm/migrations/0024_auto_20240326_1617.py +++ b/apps/enm/migrations/0024_auto_20240326_1617.py @@ -4,15 +4,15 @@ from django.db import migrations, models class Migration(migrations.Migration): - dependencies = [ - ('enm', '0023_mpoint_interval'), + ("enm", "0023_mpoint_interval"), ] operations = [ - migrations.RunSQL( - sql=[( - """ + migrations.RunSQL( + sql=[ + ( + """ CREATE TABLE public.enm_mplogx ( "timex" timestamptz NOT NULL, "mpoint_id" text NOT NULL, @@ -24,43 +24,42 @@ CREATE TABLE public.enm_mplogx ( ); SELECT create_hypertable('enm_mplogx', 'timex'); """ - )], - reverse_sql=[ - "DROP TABLE IF EXISTS public.enm_mplogx;" - ] + ) + ], + reverse_sql=["DROP TABLE IF EXISTS public.enm_mplogx;"], ), migrations.CreateModel( - name='MpLogx', + name="MpLogx", fields=[ - ('timex', models.DateTimeField(primary_key=True, serialize=False, verbose_name='采集时间')), - ('val_float', models.FloatField(blank=True, null=True, verbose_name='数值')), - ('val_int', models.IntegerField(blank=True, null=True, verbose_name='数值')), - ('val_bool', models.BooleanField(blank=True, null=True, verbose_name='数值')), - ('val_str', models.CharField(blank=True, max_length=100, null=True, verbose_name='数值')), + ("timex", models.DateTimeField(primary_key=True, serialize=False, verbose_name="采集时间")), + ("val_float", models.FloatField(blank=True, null=True, verbose_name="数值")), + ("val_int", models.IntegerField(blank=True, null=True, verbose_name="数值")), + ("val_bool", models.BooleanField(blank=True, null=True, verbose_name="数值")), + ("val_str", models.CharField(blank=True, max_length=100, null=True, verbose_name="数值")), ], options={ - 'db_table': 'enm_mplog', - 'managed': False, + "db_table": "enm_mplogx", + "managed": False, }, ), migrations.AddField( - model_name='mpoint', - name='enabled', - field=models.BooleanField(default=False, verbose_name='是否启用'), + model_name="mpoint", + name="enabled", + field=models.BooleanField(default=False, verbose_name="是否启用"), ), migrations.AddField( - model_name='mpoint', - name='val_type', - field=models.CharField(default='float', help_text='float, int, str, bool', max_length=50, verbose_name='值类型'), + model_name="mpoint", + name="val_type", + field=models.CharField(default="float", help_text="float, int, str, bool", max_length=50, verbose_name="值类型"), ), migrations.AlterField( - model_name='mpoint', - name='cate', - field=models.CharField(blank=True, max_length=50, null=True, verbose_name='分类'), + model_name="mpoint", + name="cate", + field=models.CharField(blank=True, max_length=50, null=True, verbose_name="分类"), ), migrations.AlterField( - model_name='mpoint', - name='unit', - field=models.CharField(blank=True, max_length=50, null=True, verbose_name='单位'), + model_name="mpoint", + name="unit", + field=models.CharField(blank=True, max_length=50, null=True, verbose_name="单位"), ), ] diff --git a/apps/enm/migrations/0027_mpoint_gather_field.py b/apps/enm/migrations/0027_mpoint_gather_field.py new file mode 100644 index 00000000..e9bfb49b --- /dev/null +++ b/apps/enm/migrations/0027_mpoint_gather_field.py @@ -0,0 +1,18 @@ +# Generated by Django 3.2.12 on 2024-04-12 05:08 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('enm', '0026_alter_mpoint_func_on_change'), + ] + + operations = [ + migrations.AddField( + model_name='mpoint', + name='gather_field', + field=models.CharField(blank=True, max_length=50, null=True, verbose_name='关联采集字段'), + ), + ] diff --git a/apps/enm/migrations/0028_auto_20240416_1454.py b/apps/enm/migrations/0028_auto_20240416_1454.py new file mode 100644 index 00000000..582b0512 --- /dev/null +++ b/apps/enm/migrations/0028_auto_20240416_1454.py @@ -0,0 +1,44 @@ +# Generated by Django 3.2.12 on 2024-04-16 06:54 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('em', '0016_auto_20240416_1454'), + ('enm', '0027_mpoint_gather_field'), + ] + + operations = [ + migrations.RemoveField( + model_name='mpoint', + name='gather_field', + ), + migrations.AddField( + model_name='mpoint', + name='enp_field', + field=models.CharField(blank=True, max_length=50, null=True, verbose_name='关联enp采集字段'), + ), + migrations.AddField( + model_name='mpoint', + name='ep_base_val1', + field=models.FloatField(blank=True, null=True, verbose_name='状态量基准值1'), + ), + migrations.AddField( + model_name='mpoint', + name='is_rep_ep_running_state', + field=models.BooleanField(default=False, verbose_name='是否表示所监测设备运行状态'), + ), + migrations.AlterField( + model_name='mpoint', + name='ep_belong', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='mp_ep_belong', to='em.equipment', verbose_name='所属设备'), + ), + migrations.AlterField( + model_name='mpoint', + name='ep_monitored', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='mp_ep_monitored', to='em.equipment', verbose_name='所监测设备'), + ), + ] diff --git a/apps/enm/models.py b/apps/enm/models.py index 39cbdfe1..8ee3bd7d 100644 --- a/apps/enm/models.py +++ b/apps/enm/models.py @@ -5,143 +5,155 @@ from apps.mtm.models import Material, Mgroup, Team class Mpoint(CommonBModel): - """测点 - """ - name = models.CharField('测点名称', max_length=50) - code = models.CharField('测点编号', max_length=50, unique=True) - unit = models.CharField('单位', max_length=50, null=True, blank=True) - cate = models.CharField('分类', max_length=50, null=True, blank=True) - material = models.ForeignKey(Material, verbose_name='计量某种物料', on_delete=models.CASCADE, null=True, blank=True) - ep_monitored = models.ForeignKey('em.equipment', verbose_name='监测哪个设备', related_name='mp_ep_monitored', on_delete=models.SET_NULL, null=True, blank=True) - ep_belong = models.ForeignKey('em.equipment', verbose_name='属于哪个设备', related_name='mp_ep_belong', on_delete=models.SET_NULL, null=True, blank=True) - mgroup = models.ForeignKey('mtm.mgroup', verbose_name='所在集合', on_delete=models.SET_NULL, null=True, blank=True) - mgroups_allocate = models.JSONField('各工段分配', default=list, blank=True, help_text='[{"mgroup":"x", "ratio": 1}]') - is_auto = models.BooleanField('是否自动采集', default=True) - is_all = models.BooleanField('是否记录是整个工段', default=False) - formula = models.TextField('计算公式', default='') - func_on_change = models.CharField('数据变动时执行方法', max_length=100, default='', blank=True) - interval = models.PositiveSmallIntegerField('采集间隔(秒)', default=10) - val_type = models.CharField('值类型', default='float', max_length=50, help_text='float, int, str, bool') - enabled = models.BooleanField('是否启用', default=False) - third_info = models.JSONField('第三方信息', default=dict, blank=True) + """测点""" + + name = models.CharField("测点名称", max_length=50) + code = models.CharField("测点编号", max_length=50, unique=True) + unit = models.CharField("单位", max_length=50, null=True, blank=True) + cate = models.CharField("分类", max_length=50, null=True, blank=True) + material = models.ForeignKey(Material, verbose_name="计量某种物料", on_delete=models.CASCADE, null=True, blank=True) + ep_monitored = models.ForeignKey("em.equipment", verbose_name="所监测设备", related_name="mp_ep_monitored", on_delete=models.SET_NULL, null=True, blank=True) + ep_belong = models.ForeignKey("em.equipment", verbose_name="所属设备", related_name="mp_ep_belong", on_delete=models.SET_NULL, null=True, blank=True) + mgroup = models.ForeignKey("mtm.mgroup", verbose_name="所在集合", on_delete=models.SET_NULL, null=True, blank=True) + mgroups_allocate = models.JSONField("各工段分配", default=list, blank=True, help_text='[{"mgroup":"x", "ratio": 1}]') + is_auto = models.BooleanField("是否自动采集", default=True) + is_all = models.BooleanField("是否记录是整个工段", default=False) + formula = models.TextField("计算公式", default="") + func_on_change = models.CharField("数据变动时执行方法", max_length=100, default="", blank=True) + interval = models.PositiveSmallIntegerField("采集间隔(秒)", default=10) + val_type = models.CharField("值类型", default="float", max_length=50, help_text="float, int, str, bool") + enabled = models.BooleanField("是否启用", default=False) + third_info = models.JSONField("第三方信息", default=dict, blank=True) # {"from": "king", "n": "某名称","d": "某描述或备注","g": "某组", "t": "某类型", "id": 5001, "o": "其他信息"} + enp_field = models.CharField("关联enp采集字段", max_length=50, null=True, blank=True) + is_rep_ep_running_state = models.BooleanField("是否表示所监测设备运行状态", default=False) + ep_base_val1 = models.FloatField("状态量基准值1", null=True, blank=True) + + @classmethod + def cache_key(cls, code: str): + return f"mpoint_{code}" + class MpLogx(models.Model): """ 测点记录超表 """ - timex = models.DateTimeField('采集时间', primary_key=True) - mpoint = models.ForeignKey( - Mpoint, verbose_name='关联测点', on_delete=models.CASCADE) - val_float = models.FloatField('数值', null=True, blank=True) - val_int = models.IntegerField('数值', null=True, blank=True) - val_bool = models.BooleanField('数值', null=True, blank=True) - val_str = models.CharField('数值', max_length=100, null=True, blank=True) - + + timex = models.DateTimeField("采集时间", primary_key=True) + mpoint = models.ForeignKey(Mpoint, verbose_name="关联测点", on_delete=models.CASCADE) + val_mrs = models.PositiveSmallIntegerField("所监测设备运行状态值", default=10) + val_float = models.FloatField("数值", null=True, blank=True) + val_int = models.IntegerField("数值", null=True, blank=True) + val_bool = models.BooleanField("数值", null=True, blank=True) + val_str = models.CharField("数值", max_length=100, null=True, blank=True) + class Meta: - db_table = 'enm_mplogx' + db_table = "enm_mplogx" managed = False - unique_together = (('mpoint', 'timex'), ) + unique_together = (("mpoint", "timex"),) + class MpLog(BaseModel): - """旧表(已废弃) - """ - mpoint = models.ForeignKey(Mpoint, verbose_name='关联测点', on_delete=models.SET_NULL, null=True, blank=True) - tag_id = models.BigIntegerField('记录ID', db_index=True) - tag_code = models.CharField('测点编号', max_length=50) - tag_update = models.DateTimeField('更新时间') - tag_val = models.FloatField('当前值') - + """旧表(已废弃)""" + + mpoint = models.ForeignKey(Mpoint, verbose_name="关联测点", on_delete=models.SET_NULL, null=True, blank=True) + tag_id = models.BigIntegerField("记录ID", db_index=True) + tag_code = models.CharField("测点编号", max_length=50) + tag_update = models.DateTimeField("更新时间") + tag_val = models.FloatField("当前值") + class MpointStat(CommonADModel): - """测点统计表 - """ - type = models.CharField('统计维度', max_length=50, default='hour', help_text='year/month/day/year_s/month_s/month_st/day_s/sflog/hour_s/hour') - year = models.PositiveSmallIntegerField('年', null=True, blank=True) - month = models.PositiveSmallIntegerField('月', null=True, blank=True) - day = models.PositiveSmallIntegerField('日', null=True, blank=True) + """测点统计表""" - year_s = models.PositiveSmallIntegerField('班年', null=True, blank=True) - month_s = models.PositiveSmallIntegerField('班月', null=True, blank=True) - day_s = models.PositiveSmallIntegerField('班日', null=True, blank=True) + type = models.CharField("统计维度", max_length=50, default="hour", help_text="year/month/day/year_s/month_s/month_st/day_s/sflog/hour_s/hour") + year = models.PositiveSmallIntegerField("年", null=True, blank=True) + month = models.PositiveSmallIntegerField("月", null=True, blank=True) + day = models.PositiveSmallIntegerField("日", null=True, blank=True) - hour = models.PositiveSmallIntegerField('时', null=True, blank=True) - sflog = models.ForeignKey(SfLog, verbose_name='关联值班记录', on_delete=models.CASCADE, null=True, blank=True) - mgroup = models.ForeignKey(Mgroup, verbose_name='关联测点集', on_delete=models.CASCADE, null=True, blank=True) - mpoint = models.ForeignKey(Mpoint, verbose_name='关联测点', on_delete=models.CASCADE) - val = models.FloatField('统计值', default=0) - total_production = models.FloatField('总产量', default=0, help_text='t') - elec_consume_unit = models.FloatField('单位产品电耗', default=0, help_text='kw·h/t') + year_s = models.PositiveSmallIntegerField("班年", null=True, blank=True) + month_s = models.PositiveSmallIntegerField("班月", null=True, blank=True) + day_s = models.PositiveSmallIntegerField("班日", null=True, blank=True) + + hour = models.PositiveSmallIntegerField("时", null=True, blank=True) + sflog = models.ForeignKey(SfLog, verbose_name="关联值班记录", on_delete=models.CASCADE, null=True, blank=True) + mgroup = models.ForeignKey(Mgroup, verbose_name="关联测点集", on_delete=models.CASCADE, null=True, blank=True) + mpoint = models.ForeignKey(Mpoint, verbose_name="关联测点", on_delete=models.CASCADE) + val = models.FloatField("统计值", default=0) + total_production = models.FloatField("总产量", default=0, help_text="t") + elec_consume_unit = models.FloatField("单位产品电耗", default=0, help_text="kw·h/t") class EnStat(BaseModel): """ 能源数据统计表 """ - type = models.CharField('统计维度', max_length=50, default='hour', help_text='year_s/month_s/month_st/day_s/sflog/hour_s') - sflog = models.ForeignKey(SfLog, verbose_name='关联值班记录', on_delete=models.CASCADE, null=True, blank=True) - team = models.ForeignKey(Team, verbose_name='关联班组', on_delete=models.CASCADE, null=True, blank=True) - mgroup = models.ForeignKey(Mgroup, verbose_name='关联工段', on_delete=models.CASCADE) - year = models.PositiveSmallIntegerField('年', null=True, blank=True) - month = models.PositiveSmallIntegerField('月', null=True, blank=True) - day = models.PositiveSmallIntegerField('日', null=True, blank=True) - hour = models.PositiveSmallIntegerField('小时', null=True, blank=True) - year_s = models.PositiveSmallIntegerField('班年', null=True, blank=True) - month_s = models.PositiveSmallIntegerField('班月', null=True, blank=True) - day_s = models.PositiveSmallIntegerField('班日', null=True, blank=True) - total_production = models.FloatField('总产量', default=0, help_text='t') - elec_consume = models.FloatField('总电耗', default=0, help_text='kw·h') - elec_coal_consume = models.FloatField('电量折标煤', default=0, help_text='tce') - pcoal_heat = models.FloatField('煤粉热值', default=0) - pcoal_consume = models.FloatField('煤粉消耗', default=0, help_text='t') - pcoal_coal_consume = models.FloatField('煤粉折标煤', default=0, help_text='tce') - water_consume = models.FloatField('水消耗', default=0, help_text='t') - cair_consume = models.FloatField('压缩空气', default=0, help_text='m3') - out_steam = models.FloatField('外送蒸汽', default=0, help_text='t') - out_steam_coal = models.FloatField('外送蒸汽折标煤', default=0, help_text='tce') - ccr_consume = models.FloatField('电石渣消耗', default=0, help_text='t') - kiln_end_heat = models.FloatField('窑尾余热', default=0, help_text='tce') - imaterial_data = models.JSONField('成本物料数据', default=list, blank=True) - other_cost_data = models.JSONField('其他成本数据', default=list, blank=True) - qua_data = models.JSONField('质检数据', default=list, blank=True) - equip_elec_data = models.JSONField('设备电耗数据', default=list, blank=True) - production_cost_unit = models.FloatField('单位产品成本', default=0, help_text='y/t') - elec_consume_unit = models.FloatField('单位产品分布电耗', default=0, help_text='kw·h/t') - celec_consume_unit = models.FloatField('单位产品综合电耗', default=0, help_text='kw·h/t') - coal_consume_unit = models.FloatField('单位产品标煤耗', default=0, help_text='kgce/t') - en_consume_unit = models.FloatField('单位产品能耗', default=0, help_text='tce/t') - cen_consume_unit = models.FloatField('单位产品综合能耗', default=0, help_text='kgce/t') - production_hour = models.FloatField('台时产量', default=0, help_text='t/h') - total_hour_now = models.FloatField('动态总时长', default=0, help_text='h') - run_hour = models.FloatField('运转时长', default=0, help_text='h') - shut_hour = models.FloatField('停机时长', default=0, help_text='h') - run_rate = models.FloatField('运转率', default=0, help_text='%') + + type = models.CharField("统计维度", max_length=50, default="hour", help_text="year_s/month_s/month_st/day_s/sflog/hour_s") + sflog = models.ForeignKey(SfLog, verbose_name="关联值班记录", on_delete=models.CASCADE, null=True, blank=True) + team = models.ForeignKey(Team, verbose_name="关联班组", on_delete=models.CASCADE, null=True, blank=True) + mgroup = models.ForeignKey(Mgroup, verbose_name="关联工段", on_delete=models.CASCADE) + year = models.PositiveSmallIntegerField("年", null=True, blank=True) + month = models.PositiveSmallIntegerField("月", null=True, blank=True) + day = models.PositiveSmallIntegerField("日", null=True, blank=True) + hour = models.PositiveSmallIntegerField("小时", null=True, blank=True) + year_s = models.PositiveSmallIntegerField("班年", null=True, blank=True) + month_s = models.PositiveSmallIntegerField("班月", null=True, blank=True) + day_s = models.PositiveSmallIntegerField("班日", null=True, blank=True) + total_production = models.FloatField("总产量", default=0, help_text="t") + elec_consume = models.FloatField("总电耗", default=0, help_text="kw·h") + elec_coal_consume = models.FloatField("电量折标煤", default=0, help_text="tce") + pcoal_heat = models.FloatField("煤粉热值", default=0) + pcoal_consume = models.FloatField("煤粉消耗", default=0, help_text="t") + pcoal_coal_consume = models.FloatField("煤粉折标煤", default=0, help_text="tce") + water_consume = models.FloatField("水消耗", default=0, help_text="t") + cair_consume = models.FloatField("压缩空气", default=0, help_text="m3") + out_steam = models.FloatField("外送蒸汽", default=0, help_text="t") + out_steam_coal = models.FloatField("外送蒸汽折标煤", default=0, help_text="tce") + ccr_consume = models.FloatField("电石渣消耗", default=0, help_text="t") + kiln_end_heat = models.FloatField("窑尾余热", default=0, help_text="tce") + imaterial_data = models.JSONField("成本物料数据", default=list, blank=True) + other_cost_data = models.JSONField("其他成本数据", default=list, blank=True) + qua_data = models.JSONField("质检数据", default=list, blank=True) + equip_elec_data = models.JSONField("设备电耗数据", default=list, blank=True) + production_cost_unit = models.FloatField("单位产品成本", default=0, help_text="y/t") + elec_consume_unit = models.FloatField("单位产品分布电耗", default=0, help_text="kw·h/t") + celec_consume_unit = models.FloatField("单位产品综合电耗", default=0, help_text="kw·h/t") + coal_consume_unit = models.FloatField("单位产品标煤耗", default=0, help_text="kgce/t") + en_consume_unit = models.FloatField("单位产品能耗", default=0, help_text="tce/t") + cen_consume_unit = models.FloatField("单位产品综合能耗", default=0, help_text="kgce/t") + production_hour = models.FloatField("台时产量", default=0, help_text="t/h") + total_hour_now = models.FloatField("动态总时长", default=0, help_text="h") + run_hour = models.FloatField("运转时长", default=0, help_text="h") + shut_hour = models.FloatField("停机时长", default=0, help_text="h") + run_rate = models.FloatField("运转率", default=0, help_text="%") class EnStat2(BaseModel): """ 能源数据统计表2 """ - type = models.CharField('统计维度', max_length=50, default='month_s', help_text='month_s/day_s') - year_s = models.PositiveSmallIntegerField('班年') - month_s = models.PositiveSmallIntegerField('班月') - day_s = models.PositiveSmallIntegerField('班日', null=True, blank=True) - industry_total_val = models.FloatField('工业总产值', default=0, help_text='万元') - industry_add_val = models.FloatField('工业增加值', default=0, help_text='万元') - elec_consume = models.FloatField('总电耗', default=0, help_text='kw·h') - water_consume = models.FloatField('水耗', default=0, help_text='t') - cair_consume = models.FloatField('压缩空气', default=0, help_text='m3') - elec_coal_consume = models.FloatField('电量折标煤', default=0, help_text='tce') - pcoal_consume = models.FloatField('煤粉消耗', default=0, help_text='t') - pcoal_coal_consume = models.FloatField('煤粉折标煤', default=0, help_text='tce') - bulk_cement_val =models.FloatField('散装水泥发运量', default=0) - bulk_cement_price = models.FloatField('散装水泥价格', default=0) - bag_cement_val = models.FloatField('袋装水泥发运量', default=0) - bag_cement_price = models.FloatField('袋装水泥价格', default=0) - clinker_val = models.FloatField('散装熟料发运量', default=0) - clinker_price = models.FloatField('散装熟料价格', default=0) - cement_val = models.FloatField('水泥产量', default=0) - cement_cost_unit = models.FloatField('水泥单位成本', default=0) - en_consume = models.FloatField('能源消耗', default=0, help_text='tce') - en_consume_unit = models.FloatField('单位工业总产值能耗', default=0) - en_add_consume_unit = models.FloatField('单位工业增加值能耗', default=0) + + type = models.CharField("统计维度", max_length=50, default="month_s", help_text="month_s/day_s") + year_s = models.PositiveSmallIntegerField("班年") + month_s = models.PositiveSmallIntegerField("班月") + day_s = models.PositiveSmallIntegerField("班日", null=True, blank=True) + industry_total_val = models.FloatField("工业总产值", default=0, help_text="万元") + industry_add_val = models.FloatField("工业增加值", default=0, help_text="万元") + elec_consume = models.FloatField("总电耗", default=0, help_text="kw·h") + water_consume = models.FloatField("水耗", default=0, help_text="t") + cair_consume = models.FloatField("压缩空气", default=0, help_text="m3") + elec_coal_consume = models.FloatField("电量折标煤", default=0, help_text="tce") + pcoal_consume = models.FloatField("煤粉消耗", default=0, help_text="t") + pcoal_coal_consume = models.FloatField("煤粉折标煤", default=0, help_text="tce") + bulk_cement_val = models.FloatField("散装水泥发运量", default=0) + bulk_cement_price = models.FloatField("散装水泥价格", default=0) + bag_cement_val = models.FloatField("袋装水泥发运量", default=0) + bag_cement_price = models.FloatField("袋装水泥价格", default=0) + clinker_val = models.FloatField("散装熟料发运量", default=0) + clinker_price = models.FloatField("散装熟料价格", default=0) + cement_val = models.FloatField("水泥产量", default=0) + cement_cost_unit = models.FloatField("水泥单位成本", default=0) + en_consume = models.FloatField("能源消耗", default=0, help_text="tce") + en_consume_unit = models.FloatField("单位工业总产值能耗", default=0) + en_add_consume_unit = models.FloatField("单位工业增加值能耗", default=0) diff --git a/apps/enm/serializers.py b/apps/enm/serializers.py index 9717c4fe..9eb2001a 100644 --- a/apps/enm/serializers.py +++ b/apps/enm/serializers.py @@ -9,40 +9,41 @@ from django.core.cache import cache class MpointSerializer(CustomModelSerializer): mgroup = serializers.PrimaryKeyRelatedField(label="测点集", queryset=Mgroup.objects.all(), required=False, allow_null=True) - mgroup_name = serializers.CharField(source='mgroup.name', read_only=True) - belong_dept_name = serializers.CharField(source='belong_dept.name', read_only=True) - ep_monitored_name = serializers.CharField(source='ep_monitored.name', read_only=True) - ep_monitored_power_kw = serializers.CharField(source='ep_monitored.power_kw', read_only=True) - ep_belong_name = serializers.CharField(source='ep_belong.name', read_only=True) - material_name = serializers.CharField(source='material.name', read_only=True) + mgroup_name = serializers.CharField(source="mgroup.name", read_only=True) + belong_dept_name = serializers.CharField(source="belong_dept.name", read_only=True) + ep_monitored_name = serializers.CharField(source="ep_monitored.name", read_only=True) + ep_monitored_power_kw = serializers.CharField(source="ep_monitored.power_kw", read_only=True) + ep_belong_name = serializers.CharField(source="ep_belong.name", read_only=True) + material_name = serializers.CharField(source="material.name", read_only=True) formula = serializers.CharField(allow_blank=True, required=False) last_data = serializers.SerializerMethodField() + class Meta: model = Mpoint - fields = '__all__' - read_only_fields = EXCLUDE_FIELDS + ['belong_dept', 'cate'] - + fields = "__all__" + read_only_fields = EXCLUDE_FIELDS + ["belong_dept", "cate"] + def get_last_data(self, obj): - last_data = cache.get(f'mpoint_{obj.code}', None) - return {'last_val': last_data.get('last_val', None) if last_data else None, 'last_timex': last_data.get('last_timex', None) if last_data else None} + last_data = cache.get(Mpoint.cache_key(obj.code), {}) + return last_data def validate(self, attrs): - if 'material' in attrs and attrs['material']: - attrs['cate'] = 'material' - if 'mgroup' in attrs and attrs['mgroup']: - attrs['belong_dept'] = attrs['mgroup'].belong_dept + if "material" in attrs and attrs["material"]: + attrs["cate"] = "material" + if "mgroup" in attrs and attrs["mgroup"]: + attrs["belong_dept"] = attrs["mgroup"].belong_dept # attrs['mgroups_allocate'] = [{'mgroup': attrs['mgroup'].id, 'mgroup_name': attrs['mgroup'].name, 'ratio': 1}] ratio_ = 0 mgroupIds = [] - for i in attrs['mgroups_allocate']: - if i['mgroup']: - ratio_ = ratio_ + i['ratio'] - if i['mgroup'] in mgroupIds: - raise ParseError('分配集错误') - mgroupIds.append(i['mgroup']) - i['mgroup_name'] = Mgroup.objects.get(id=i['mgroup']).name - if attrs['mgroups_allocate'] and round(ratio_, 3) != 1.0: - raise ParseError('比例合计错误') + for i in attrs["mgroups_allocate"]: + if i["mgroup"]: + ratio_ = ratio_ + i["ratio"] + if i["mgroup"] in mgroupIds: + raise ParseError("分配集错误") + mgroupIds.append(i["mgroup"]) + i["mgroup_name"] = Mgroup.objects.get(id=i["mgroup"]).name + if attrs["mgroups_allocate"] and round(ratio_, 3) != 1.0: + raise ParseError("比例合计错误") return attrs @@ -52,7 +53,7 @@ class MpointSerializer(CustomModelSerializer): # model = MpLog # fields = '__all__' # read_only_fields = EXCLUDE_FIELDS + ['mpoint_name'] - + class MpLogxSerializer(CustomModelSerializer): """Serializer for EnvData model""" @@ -63,18 +64,19 @@ class MpLogxSerializer(CustomModelSerializer): class MpointStatSerializer(CustomModelSerializer): - mpoint_name = serializers.CharField(source='mpoint.name', read_only=True) - ep_monitored_name = serializers.CharField(source='mpoint.ep_monitored.name', read_only=True) - ep_monitored_number = serializers.CharField(source='mpoint.ep_monitored.number', read_only=True) - ep_monitored_power_kw= serializers.CharField(source='mpoint.ep_monitored.power_kw', read_only=True) - ep_belong_name = serializers.CharField(source='mpoint.ep_belong.name', read_only=True) - mgroup_name = serializers.CharField(source='mgroup.name', read_only=True) - belong_dept_name = serializers.CharField(source='mgroup.belong_dept.name', read_only=True) + mpoint_name = serializers.CharField(source="mpoint.name", read_only=True) + ep_monitored_name = serializers.CharField(source="mpoint.ep_monitored.name", read_only=True) + ep_monitored_number = serializers.CharField(source="mpoint.ep_monitored.number", read_only=True) + ep_monitored_power_kw = serializers.CharField(source="mpoint.ep_monitored.power_kw", read_only=True) + ep_belong_name = serializers.CharField(source="mpoint.ep_belong.name", read_only=True) + mgroup_name = serializers.CharField(source="mgroup.name", read_only=True) + belong_dept_name = serializers.CharField(source="mgroup.belong_dept.name", read_only=True) + class Meta: model = MpointStat - fields = '__all__' - read_only_fields = EXCLUDE_FIELDS + ['mpoint_name', 'type', 'year', 'month', 'day'] - + fields = "__all__" + read_only_fields = EXCLUDE_FIELDS + ["mpoint_name", "type", "year", "month", "day"] + def check_required_keys(dictionary, keys): for key in keys: if key not in dictionary or not dictionary[key]: @@ -82,16 +84,16 @@ class MpointStatSerializer(CustomModelSerializer): return True def validate(self, attrs): - mpoint = attrs['mpoint'] - if mpoint.material and mpoint.is_auto is False and 'sflog' in attrs and attrs['sflog']: - attrs['type'] = 'sflog' - sflog = attrs['sflog'] - attrs['year_s'], attrs['month_s'], attrs['day_s'] = sflog.get_ymd - attrs['mgroup'] = sflog.mgroup + mpoint = attrs["mpoint"] + if mpoint.material and mpoint.is_auto is False and "sflog" in attrs and attrs["sflog"]: + attrs["type"] = "sflog" + sflog = attrs["sflog"] + attrs["year_s"], attrs["month_s"], attrs["day_s"] = sflog.get_ymd + attrs["mgroup"] = sflog.mgroup else: - raise ParseError('该数据不支持手工录入') + raise ParseError("该数据不支持手工录入") # if 'sflog' in attrs and attrs['sflog']: - + # else: # keys = ['hour', 'day_s', 'month_s', 'year_s'] # for ind, key in enumerate(keys): @@ -112,13 +114,14 @@ class MpointStatSerializer(CustomModelSerializer): class EnStatSerializer(CustomModelSerializer): - mgroup_name = serializers.CharField(source='mgroup.name', read_only=True) - team_name = serializers.CharField(source='team.name', read_only=True) - belong_dept_name = serializers.CharField(source='mgroup.belong_dept.name', read_only=True) + mgroup_name = serializers.CharField(source="mgroup.name", read_only=True) + team_name = serializers.CharField(source="team.name", read_only=True) + belong_dept_name = serializers.CharField(source="mgroup.belong_dept.name", read_only=True) + class Meta: model = EnStat - fields = '__all__' - + fields = "__all__" + def to_representation(self, instance): ret = super().to_representation(instance) my_dic_keys = list(ret.keys()) @@ -126,23 +129,24 @@ class EnStatSerializer(CustomModelSerializer): ret_one_val = ret[key] if isinstance(ret_one_val, float): ret[key] = "{:.2f}".format(round(ret_one_val, 2)) - qua_data = ret['qua_data'] - equip_elec_data = ret['equip_elec_data'] + qua_data = ret["qua_data"] + equip_elec_data = ret["equip_elec_data"] if qua_data: for item in qua_data: - ret[f'{item["material_name"]}_{item["testitem_name"]}_rate_pass'] = "{:.2f}".format(round(item['rate_pass'], 4)) + ret[f'{item["material_name"]}_{item["testitem_name"]}_rate_pass'] = "{:.2f}".format(round(item["rate_pass"], 4)) if equip_elec_data: for item in equip_elec_data: - val = item.get('consume_unit', None) + val = item.get("consume_unit", None) if val: val = "{:.2f}".format(round(val, 2)) ret[f'{item["equipment_name"]}_consume_unit'] = val return ret + class EnStat2Serializer(CustomModelSerializer): class Meta: model = EnStat2 - fields = '__all__' + fields = "__all__" def to_representation(self, instance): ret = super().to_representation(instance) @@ -152,8 +156,8 @@ class EnStat2Serializer(CustomModelSerializer): if isinstance(ret_one_val, float): ret[key] = "{:.2f}".format(round(ret_one_val, 2)) return ret - + class ReCalSerializer(serializers.Serializer): start_time = serializers.DateTimeField(label="开始时间") - end_time = serializers.DateTimeField(label="结束时间") \ No newline at end of file + end_time = serializers.DateTimeField(label="结束时间") diff --git a/apps/enm/services.py b/apps/enm/services.py index c6aa039b..0a3d9a39 100644 --- a/apps/enm/services.py +++ b/apps/enm/services.py @@ -7,13 +7,17 @@ from django.db.models import Q from django.utils import timezone from django.core.cache import cache import concurrent.futures -from django.db import connection +from django.db import connection, transaction from datetime import datetime, timedelta from apps.utils.decorators import auto_log from django.db import IntegrityError from apps.utils.tasks import ctask_run +from .serializers import MpointSerializer +from apps.enp.models import EnvData +from apps.em.models import Equipment, RuningState + +myLogger = logging.getLogger("log") -myLogger = logging.getLogger('log') def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int): """ @@ -22,24 +26,24 @@ def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: pattern = r"\${(.*?)}" matches = re.findall(pattern, exp_str) for match in matches: - mpst = MpointStat.objects.filter(Q(mpoint__id=match) | Q(mpoint__name=match) | Q( - mpoint__code=match), type='hour', year=year, month=month, day=day, hour=hour).first() + mpst = MpointStat.objects.filter(Q(mpoint__id=match) | Q(mpoint__name=match) | Q(mpoint__code=match), type="hour", year=year, month=month, day=day, hour=hour).first() if mpst: exp_str = exp_str.replace(f"${{{match}}}", str(mpst.val)) rval = eval(exp_str) return rval -def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21): - """ - 根据给定的小时数, 计算出班天 - """ - if hour <= hour_split: - return year, month, day - else: - now = datetime.datetime(year, month, day, hour) - now2 = now + datetime.timedelta(days=1) - return now2.year, now2.month, now2.day +# def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21): +# """ +# 根据给定的小时数, 计算出班天 +# """ +# if hour <= hour_split: +# return year, month, day +# else: +# now = datetime.datetime(year, month, day, hour) +# now2 = now + datetime.timedelta(days=1) +# return now2.year, now2.month, now2.day + # cal_rule = { # "电石渣": { @@ -86,52 +90,76 @@ def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21): # return goal_data, score -def shutdown_or_startup(mpointId: str, last_val, last_timex: datetime): +def get_mpoint_cache(code: str, force_update=False, update_mplogx=True): """ - last_val 可能是不同类型的值(bool 或 int) + 获取或更新mpoint的缓存数据 + 返回空代表无该测点 """ - from apps.wpm.models import StLog - from apps.wpm.tasks import cal_exp_duration_hour - from apps.wpm.services import get_sflog - mpoint = Mpoint.objects.get(id=mpointId) - mgroup = mpoint.mgroup - last_stlog = StLog.objects.filter( - mgroup=mgroup, is_shutdown=True).order_by('start_time').last() # 找到最后一次停机记录 - if last_stlog: - if last_timex >= last_stlog.start_time: # 认为是有效信号 - if last_stlog.end_time is None and last_val in (1, True): # 从停到开 - last_stlog.end_time = last_timex - last_stlog.duration = ( - last_stlog.end_time - last_stlog.start_time).total_seconds()/3600 - last_stlog.save() - mgroup.is_runing = True - mgroup.save() - cal_exp_duration_hour(last_stlog.id) # 触发时间分配 - elif last_stlog.end_time and last_val in (0, False) and last_timex > last_stlog.end_time: # 从开到停 - StLog.objects.create( - title='停机', - is_shutdown=True, - mgroup=mgroup, - end_time=None, - start_time=last_timex, - sflog=get_sflog(mgroup, last_timex) - ) - mgroup.is_runing = False - mgroup.save() + key = Mpoint.cache_key(code) + mpoint_data = cache.get(key, None) + if mpoint_data is None or force_update: + try: + mpoint = Mpoint.objects.get(code=code) + except Exception: + return None + mpoint_data = MpointSerializer(instance=mpoint).data + if update_mplogx: + now = timezone.now() + last_mplogx = MpLogx.objects.filter(mpoint=mpoint, timex__gte=now - timedelta(minutes=5)).order_by("-timex").first() + if last_mplogx: # 核心数据 + mpoint_data["last_data"] = {"last_val": getattr(last_mplogx, "val_" + mpoint_data["val_type"]), "last_timex": last_mplogx.timex} + cache.set(key, mpoint_data, timeout=None) + return mpoint_data + + +def update_mpoint_cache(cache_key: str, current_cache_val: dict, last_timex: datetime, last_val, last_mrs): + """ + 更新mpoint的缓存数据并执行某些操作 + last_mrs: 所监测的设备运行状态值可不传 + """ + ep_belong_id = current_cache_val.get("ep_belong", None) + ep_monitored_id = current_cache_val.get("ep_monitored", False) + last_data = current_cache_val["last_data"] + last_data["pre_val"] = last_data.get("last_val", None) + last_data["pre_timex"] = last_data.get("last_timex", None) + last_data["pre_mrs"] = last_data.get("last_mrs", None) + last_data["last_val"] = last_val + if last_mrs: + last_data["last_mrs"] = last_mrs + last_data["last_timex"] = last_timex + cache.set(cache_key, current_cache_val, timeout=None) + # 更新设备状态缓存值 + if ep_belong_id: + cache.set(f"equipment_{ep_belong_id}", {"running_state": RuningState.RUNING, "running_state_timex": last_timex}, timeout=None) + if ep_monitored_id: + cache.set(f"equipment_{ep_monitored_id}", {"running_state": last_mrs, "running_state_timex": last_timex}, timeout=None) + # 如果state变动则触发函数 + if last_data["pre_mrs"] != last_mrs: + if ep_belong_id: + ctask_run("apps.em.services.shutdown_or_startup", ep_belong_id, last_timex, RuningState.RUNING) + if ep_monitored_id: + ctask_run("apps.em.services.shutdown_or_startup", ep_belong_id, last_timex, last_mrs) + + +def transfer_mpoint_val_to_ep_running_state(current_val, base_val1: float): + """ + 将测点值转换所监测设备的运行状态值 + """ + if isinstance(current_val, bool): + if current_val: + return RuningState.RUNING + return RuningState.STOP + rs = RuningState.RUNING + if base_val1: + if current_val < base_val1: + rs = RuningState.STOP else: - StLog.objects.create( - title='停机', - is_shutdown=True, - mgroup=mgroup, - end_time=None, - start_time=last_timex, - sflog=get_sflog(mgroup, last_timex)) - mgroup.is_runing = False - mgroup.save() + if current_val == 0: + rs = RuningState.STOP + return rs - -def king_sync(projectName: str, json_path: str=""): +def king_sync(projectName: str, json_path: str = ""): """ 同步亚控测点 """ @@ -139,98 +167,129 @@ def king_sync(projectName: str, json_path: str=""): import os import json from django.conf import settings - with open(os.path.join(settings.BASE_DIR, json_path), 'r', encoding='utf-8') as f: + + with open(os.path.join(settings.BASE_DIR, json_path), "r", encoding="utf-8") as f: res = json.loads(f.read()) else: from apps.third.king.k import kingClient from apps.third.king import king_api + _, res = kingClient.request(**king_api["read_variables"], params={"projectInstanceName": projectName}) t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"} - for index, item in enumerate(res['objectList']): - if 't' in item and item['t'] and 'SystemTag' not in item['d']: + for index, item in enumerate(res["objectList"]): + if "t" in item and item["t"] and "SystemTag" not in item["d"]: code = f'K_{item["n"]}' - item['from'] = 'king' - group = item['g'] - name = item['d'] + item["from"] = "king" + group = item["g"] + name = item["d"] if group: - name = f'{group}.{name}' - Mpoint.objects.get_or_create(code=code, defaults={ - "name": name, - "code": code, - "enabled": False, - "is_auto": True, - "val_type": t_dict[item["t"]], - "third_info": item - }) + name = f"{group}.{name}" + Mpoint.objects.get_or_create(code=code, defaults={"name": name, "code": code, "enabled": False, "is_auto": True, "val_type": t_dict[item["t"]], "third_info": item}) -@auto_log('缓存测点') -def cache_mpoints(mpointId: str = ''): - """ - 缓存所有可用的测点 - """ - if mpointId: - mpoints_data = Mpoint.objects.filter(id=mpointId, enabled=True).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info") - else: - mpoints_data = Mpoint.objects.filter(is_auto=True, enabled=True).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info") - for item in list(mpoints_data): - key = f'mpoint_{item["code"]}' - cache_mpoint = cache.get(key, {}) - cache_mpoint.update(item) - cache.set(key, cache_mpoint, timeout=None) - # print(cache.get('mpoint_K_IP2_MM_SC_IW14')) -test_data = {"PNs":{"1":"V","2":"T","3":"Q"},"PVs":{"1":-725,"2":"2024-04-08 13:43:53.140","3":192},"Objs":[{"N":"IP2_MM_IW3"},{"N":"IP2_MM_IW4","1":-6386},{"N":"IP2_MM_IW5","1":-7835},{"N":"IP2_MM_IW7","1":-6864},{"N":"IP2_MM_IW15","1":29},{"N":"IP2_MM_SC_IW3","1":337},{"N":"IP2_MM_SC_IW13","1":-5511,"2":24},{"N":"IP2_MM_SC_IW14","1":-4640,"2":24},{"N":"IP2_MM_SC_IW15","1":-5586,"2":24},{"N":"IP2_MM_SC_IW16","1":-6634,"2":24},{"N":"IP2_MM_SC_IW17","1":-2768,"2":24},{"N":"IP2_MM_SC_IW18","1":-2946,"2":24},{"N":"IP2_MM_SC_IW19","1":-2550,"2":24},{"N":"IP2_MM_SC_IW20","1":-1512,"2":24},{"N":"IP2_MM_SC_IW21","1":-1775,"2":24},{"N":"IP2_MM_SC_IW22","1":-194,"2":24},{"N":"IP2_MM_SC_IW23","1":-366,"2":24},{"N":"IP2_MM_SC_IW24","1":-9291,"2":24},{"N":"IP2_MM_SC_IW25","1":-6888,"2":24},{"N":"IP2_MM_SC_IW26","1":-2360,"2":24},{"N":"IP2_MM_SC_IW29","1":164,"2":24},{"N":"IP2_MM_SC_IW30","1":914,"2":24},{"N":"IP2_MM_SC_IW31","1":849,"2":24},{"N":"IP2_MM_SC_IW37","1":-125,"2":24},{"N":"IP2_MM_SC_IW38","1":-3009,"2":24},{"N":"IP2_MM_SC_IW40","1":-1394,"2":24},{"N":"IP2_MM_SC_IW43","1":758,"2":24},{"N":"IP3_SC_IW1","1":11557,"2":107},{"N":"IP3_SC_IW2","1":7624,"2":107},{"N":"IP3_SC_IW6","1":11159,"2":107},{"N":"IP3_SC_IW7","1":8073,"2":107},{"N":"IP3_SC_IW9","1":4490,"2":107},{"N":"IP3_SC_IW10","1":5437,"2":107},{"N":"IP3_SC_IW11","1":9244,"2":107},{"N":"IP3_SC_IW12","1":7886,"2":107},{"N":"IP3_SC_IW13","1":-2962,"2":107},{"N":"IP3_SC_IW14","1":-159,"2":107},{"N":"IP3_SC_IW26","1":15,"2":107},{"N":"IP3_SC_IW27","1":15,"2":107}]} +test_data = { + "PNs": {"1": "V", "2": "T", "3": "Q"}, + "PVs": {"1": -725, "2": "2024-04-08 13:43:53.140", "3": 192}, + "Objs": [ + {"N": "IP2_MM_IW3"}, + {"N": "IP2_MM_IW4", "1": -6386}, + {"N": "IP2_MM_IW5", "1": -7835}, + {"N": "IP2_MM_IW7", "1": -6864}, + {"N": "IP2_MM_IW15", "1": 29}, + {"N": "IP2_MM_SC_IW3", "1": 337}, + {"N": "IP2_MM_SC_IW13", "1": -5511, "2": 24}, + {"N": "IP2_MM_SC_IW14", "1": -4640, "2": 24}, + {"N": "IP2_MM_SC_IW15", "1": -5586, "2": 24}, + {"N": "IP2_MM_SC_IW16", "1": -6634, "2": 24}, + {"N": "IP2_MM_SC_IW17", "1": -2768, "2": 24}, + {"N": "IP2_MM_SC_IW18", "1": -2946, "2": 24}, + {"N": "IP2_MM_SC_IW19", "1": -2550, "2": 24}, + {"N": "IP2_MM_SC_IW20", "1": -1512, "2": 24}, + {"N": "IP2_MM_SC_IW21", "1": -1775, "2": 24}, + {"N": "IP2_MM_SC_IW22", "1": -194, "2": 24}, + {"N": "IP2_MM_SC_IW23", "1": -366, "2": 24}, + {"N": "IP2_MM_SC_IW24", "1": -9291, "2": 24}, + {"N": "IP2_MM_SC_IW25", "1": -6888, "2": 24}, + {"N": "IP2_MM_SC_IW26", "1": -2360, "2": 24}, + {"N": "IP2_MM_SC_IW29", "1": 164, "2": 24}, + {"N": "IP2_MM_SC_IW30", "1": 914, "2": 24}, + {"N": "IP2_MM_SC_IW31", "1": 849, "2": 24}, + {"N": "IP2_MM_SC_IW37", "1": -125, "2": 24}, + {"N": "IP2_MM_SC_IW38", "1": -3009, "2": 24}, + {"N": "IP2_MM_SC_IW40", "1": -1394, "2": 24}, + {"N": "IP2_MM_SC_IW43", "1": 758, "2": 24}, + {"N": "IP3_SC_IW1", "1": 11557, "2": 107}, + {"N": "IP3_SC_IW2", "1": 7624, "2": 107}, + {"N": "IP3_SC_IW6", "1": 11159, "2": 107}, + {"N": "IP3_SC_IW7", "1": 8073, "2": 107}, + {"N": "IP3_SC_IW9", "1": 4490, "2": 107}, + {"N": "IP3_SC_IW10", "1": 5437, "2": 107}, + {"N": "IP3_SC_IW11", "1": 9244, "2": 107}, + {"N": "IP3_SC_IW12", "1": 7886, "2": 107}, + {"N": "IP3_SC_IW13", "1": -2962, "2": 107}, + {"N": "IP3_SC_IW14", "1": -159, "2": 107}, + {"N": "IP3_SC_IW26", "1": 15, "2": 107}, + {"N": "IP3_SC_IW27", "1": 15, "2": 107}, + ], +} + + +@auto_log("亚控存库") def insert_mplogx_from_king_mqtt(data: dict, is_offset=True): """ 从king mqtt数据插入超表 + 注释的代码是分批存的, 但是实际上不需要, 暂时注释,有需要再加 """ - objs = data['Objs'] - len_objs = len(objs) - pvs = data['PVs'] + objs = data["Objs"] + # len_objs = len(objs) + pvs = data["PVs"] - chunk_size = 200 - num_chunks = (len(objs) + chunk_size - 1) // chunk_size + # chunk_size = 200 + # num_chunks = (len(objs) + chunk_size - 1) // chunk_size - with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: - futures = [] - for i in range(num_chunks): - start = i * chunk_size - end = min(start + chunk_size, len_objs) - chunk = objs[start:end] - futures.append(executor.submit(insert_mplogx_from_king_mqtt_chunk, chunk, pvs, is_offset)) - concurrent.futures.wait(futures) - # for future in futures: - # print(future.result(), end=', ') + otime_obj = timezone.make_aware(datetime.strptime(pvs["2"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0) # 只保留到秒级的精度 -@auto_log('亚控存库') -def insert_mplogx_from_king_mqtt_chunk(objs: list, pvs: dict, is_offset=True): + insert_mplogx_from_king_mqtt_chunk(objs, otime_obj, is_offset) + # with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: + # futures = [] + # for i in range(num_chunks): + # start = i * chunk_size + # end = min(start + chunk_size, len_objs) + # chunk = objs[start:end] + # futures.append(executor.submit(insert_mplogx_from_king_mqtt_chunk, chunk, otime_obj, is_offset)) + # concurrent.futures.wait(futures) + # for future in futures: + # print(future.result(), end=', ') + + +def insert_mplogx_from_king_mqtt_chunk(objs: list, otime_obj: datetime, is_offset=True): """ 分批存库, 亚控 38.00,00000.11011 版本偏移只是时间戳偏移。另外其实可以不在乎 """ # oval = pvs['1'] - otime_str = pvs['2'] - otime_obj = timezone.make_aware(datetime.strptime(otime_str, '%Y-%m-%d %H:%M:%S.%f')) - insert_db_data = [] - insert_data = [] + enp_mpoints_dict = {} # 这个地方主要是需要更新envdata表里的数据 for obj in objs: - n = obj['N'] + n = obj["N"] val = obj["1"] - timex = obj.get("2", None) - cache_key = f'mpoint_K_{n}' - mpoint_data = cache.get(cache_key, None) + # timex = obj.get("2", None) + code = f"K_{n}" + cache_key = Mpoint.cache_key(code) + mpoint_data = get_mpoint_cache(code) if mpoint_data is None: continue - val_type = mpoint_data['val_type'] - if is_offset: - if timex is None: - timex = otime_obj - else: - timex = otime_obj + timedelta(milliseconds=timex) - else: - timex = timezone.make_aware(datetime.strptime(timex, '%Y-%m-%d %H:%M:%S.%f')) - mpoint_interval = mpoint_data['interval'] - mpoint_last_timex = mpoint_data.get('last_timex', None) + val_type = mpoint_data["val_type"] + # if is_offset: + # if timex is None: + # timex = otime_obj + # else: + # timex = otime_obj + timedelta(mgilliseconds=timex) + # else: + # timex = timezone.make_aware(datetime.strptime(timex, '%Y-%m-%d %H:%M:%S.%f')) + mpoint_interval = mpoint_data["interval"] + mpoint_last_timex = mpoint_data.get("last_timex", None) + mpoint_is_rep_ep_running_state = mpoint_data.get("is_rep_ep_running_state", False) + mpoint_ep_base_val1 = mpoint_data.get("ep_base_val1", None) # 控制采集间隔 can_save = False if mpoint_last_timex: @@ -240,42 +299,47 @@ def insert_mplogx_from_king_mqtt_chunk(objs: list, pvs: dict, is_offset=True): can_save = True if can_save: save_dict = { - "timex": timex, - "mpoint": Mpoint.objects.get(id=mpoint_data['id']), + "timex": otime_obj, + "mpoint": Mpoint.objects.get(id=mpoint_data["id"]), } - save_dict[f'val_{val_type}'] = val - insert_data.append((cache_key, mpoint_data, val, timex)) - insert_db_data.append(MpLogx(**save_dict)) - - # 先尝试批量存库/发生异常则单个存储 - is_bulk_insert = True - try: - MpLogx.objects.bulk_create(insert_db_data) - except IntegrityError: - is_bulk_insert = False - for item1, item2 in zip(insert_data, insert_db_data): + save_dict[f"val_{val_type}"] = val + val_mrs = RuningState.RUNING + if mpoint_is_rep_ep_running_state: + save_dict["val_mrs"] = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_base_val1) try: - MpLogx.objects.create(**item2) - update_mpoint_cache_and_do_func(item1) + MpLogx.objects.create(**save_dict) + update_mpoint_cache(cache_key, mpoint_data, otime_obj, val, val_mrs) + + # 此处代码用于更新envdata表里的数据 + enp_field = mpoint_data.get("enp_field", None) + ep_monitored = mpoint_data.get("ep_monitored", None) + if enp_field and ep_monitored: + if enp_mpoints_dict.get(ep_monitored, None) is None: + enp_mpoints_dict[ep_monitored] = {"timex": otime_obj, "equipment": Equipment.objects.get(id=ep_monitored)} + if enp_field == "running_state": + enp_mpoints_dict[ep_monitored].update({enp_field: val_mrs}) + else: + enp_mpoints_dict[ep_monitored].update({enp_field: val}) + enp_mpoints_dict[ep_monitored].update({enp_field: val}) except Exception: - myLogger.error(f'mpoint_cache_key: {item1[0]} 存库失败', exc_info=True) + myLogger.error(f"mpoint_cache_key: {cache_key} 存库失败", exc_info=True) continue - - if is_bulk_insert: - # 批量存库成功后更新缓存 - for item in insert_data: - update_mpoint_cache_and_do_func(item) + # # 先尝试批量存库/发生异常则单个存储 + # is_bulk_insert = True + # try: + # MpLogx.objects.bulk_create(insert_db_data) + # except IntegrityError: + # is_bulk_insert = False -def update_mpoint_cache_and_do_func(item: dict): - cache_key = item[0] - mpoint_data = item[1] - mpoint_id = mpoint_data['id'] - last_timex: datetime = item[3] - last_val = item[2] - mpoint_data['last_timex'] = last_timex - mpoint_data['last_val'] = last_val - cache.set(cache_key, mpoint_data, timeout=None) - mpoint_func: str = mpoint_data.get('func_on_change', '') - if mpoint_func: - ctask_run.delay(mpoint_func, mpoint_id, last_val, last_timex) \ No newline at end of file + # if is_bulk_insert: + # # 批量存库成功后更新缓存 + # for item in update_cache_data: + # need_gather_filed_mpoints.append(update_mpoint_cache_and_do_func(item)) + # else: + # {'eq_belong': {'dust_rtd': 1.0}} + + # 额外需要处理的数据(存储到envdata表里) + if enp_mpoints_dict: + for _, item in enp_mpoints_dict: + EnvData.objects(**item) diff --git a/apps/enm/views.py b/apps/enm/views.py index 84385cfc..a1ad9dac 100644 --- a/apps/enm/views.py +++ b/apps/enm/views.py @@ -4,46 +4,50 @@ from apps.enm.models import Mpoint, MpointStat, EnStat, EnStat2, MpLogx from apps.utils.viewsets import CustomModelViewSet, CustomGenericViewSet from rest_framework.mixins import ListModelMixin from apps.utils.mixins import BulkCreateModelMixin, BulkDestroyModelMixin -from apps.enm.serializers import (MpointSerializer, MpLogxSerializer, MpointStatSerializer, EnStatSerializer, EnStat2Serializer, ReCalSerializer) +from apps.enm.serializers import MpointSerializer, MpLogxSerializer, MpointStatSerializer, EnStatSerializer, EnStat2Serializer, ReCalSerializer from apps.enm.filters import MpointStatFilter, EnStatFilter, EnStat2Filter from apps.enm.tasks import cal_mpointstat_manual from rest_framework.response import Response from rest_framework.serializers import Serializer from rest_framework.decorators import action from apps.enm.tasks import cal_mpointstats_duration -from apps.enm.services import king_sync, cache_mpoints +from apps.enm.services import king_sync, get_mpoint_cache from django.db import transaction + class MpointViewSet(CustomModelViewSet): """ list:测点 测点 """ + queryset = Mpoint.objects.all() serializer_class = MpointSerializer - select_related_fields = ['create_by', 'belong_dept', 'ep_monitored', 'ep_belong', 'mgroup'] - filterset_fields = ['belong_dept', 'ep_monitored', 'ep_belong', 'mgroup', 'is_auto', 'is_all', 'mgroup__name', 'val_type', 'enabled'] - search_fields = ['name', 'code'] - ordering = ['-create_time', 'name', 'code'] + select_related_fields = ["create_by", "belong_dept", "ep_monitored", "ep_belong", "mgroup"] + filterset_fields = ["belong_dept", "ep_monitored", "ep_belong", "mgroup", "is_auto", "is_all", "mgroup__name", "val_type", "enabled"] + search_fields = ["name", "code"] + ordering = ["-create_time", "name", "code"] @transaction.atomic def perform_create(self, serializer): instance = serializer.save() - cache_mpoints(instance.id) - + if instance.code: + get_mpoint_cache(instance.code, True) + @transaction.atomic def perform_update(self, serializer): instance = serializer.save() - cache_mpoints(instance.id) + if instance.code: + get_mpoint_cache(instance.code, True) - @action(methods=['post'], detail=False, perms_map={'post': 'mpoint.create'}, serializer_class=Serializer) + @action(methods=["post"], detail=False, perms_map={"post": "mpoint.create"}, serializer_class=Serializer) def king_sync(self, request, *args, **kwargs): """同步亚控采集点 同步亚控采集点 """ - king_sync(getattr(settings, 'KING_PROJECTNAME', "")) + king_sync(getattr(settings, "KING_PROJECTNAME", "")) return Response() @@ -59,21 +63,23 @@ class MpointViewSet(CustomModelViewSet): # select_related_fields = ['mpoint'] # filterset_fields = ['mpoint', 'mpoint__mgroup', 'mpoint__mgroup__belong_dept'] + class MpLogxViewSet(ListModelMixin, CustomGenericViewSet): """ list: 测点采集数据 测点采集数据 """ - perms_map = {'get': '*'} + + perms_map = {"get": "*"} queryset = MpLogx.objects.all() serializer_class = MpLogxSerializer filterset_fields = { - "timex": ['exact', 'gte', 'lte', 'year', 'month', 'day'], - "mpoint": ['exact'], + "timex": ["exact", "gte", "lte", "year", "month", "day"], + "mpoint": ["exact"], } - ordering_fields = ['timex'] - ordering = ['-timex'] + ordering_fields = ["timex"] + ordering = ["-timex"] class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, ListModelMixin, CustomGenericViewSet): @@ -82,10 +88,11 @@ class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, ListModelMi 测点统计记录 """ - perms_map = {'get': '*', 'post': 'mpointstat.create', 'delete': 'mpointstat.delete'} + + perms_map = {"get": "*", "post": "mpointstat.create", "delete": "mpointstat.delete"} queryset = MpointStat.objects.all() serializer_class = MpointStatSerializer - select_related_fields = ['mpoint', 'mpoint__ep_monitored', 'mpoint__ep_belong', 'mgroup', 'mgroup__belong_dept'] + select_related_fields = ["mpoint", "mpoint__ep_monitored", "mpoint__ep_belong", "mgroup", "mgroup__belong_dept"] filterset_class = MpointStatFilter def perform_create(self, serializer): @@ -97,7 +104,7 @@ class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, ListModelMi instance.delete() cal_mpointstat_manual.delay(mpoint.id, sflog.id, mgroup.id, None, None, None, None, year_s, month_s, day_s) - @action(methods=['post'], detail=False, perms_map={'post': 'mpointstat.create'}, serializer_class=ReCalSerializer) + @action(methods=["post"], detail=False, perms_map={"post": "mpointstat.create"}, serializer_class=ReCalSerializer) def recal(self, request, *args, **kwargs): """重新运行某段时间的enm计算 @@ -106,9 +113,8 @@ class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, ListModelMi data = request.data sr = ReCalSerializer(data=data) sr.is_valid(raise_exception=True) - vdata = sr.validated_data - task = cal_mpointstats_duration.delay(data['start_time'], data['end_time']) - return Response({'task_id': task.task_id}) + task = cal_mpointstats_duration.delay(data["start_time"], data["end_time"]) + return Response({"task_id": task.task_id}) class EnStatViewSet(ListModelMixin, CustomGenericViewSet): @@ -117,12 +123,13 @@ class EnStatViewSet(ListModelMixin, CustomGenericViewSet): 能耗统计记录 """ - perms_map = {'get': '*'} + + perms_map = {"get": "*"} queryset = EnStat.objects.all() serializer_class = EnStatSerializer - select_related_fields = ['mgroup', 'team', 'mgroup__belong_dept'] + select_related_fields = ["mgroup", "team", "mgroup__belong_dept"] filterset_class = EnStatFilter - ordering = ['mgroup__sort', 'year_s', 'month_s', 'day_s', 'hour'] + ordering = ["mgroup__sort", "year_s", "month_s", "day_s", "hour"] class EnStat2ViewSet(ListModelMixin, CustomGenericViewSet): @@ -131,7 +138,8 @@ class EnStat2ViewSet(ListModelMixin, CustomGenericViewSet): 全厂统计记录 """ - perms_map = {'get': '*'} + + perms_map = {"get": "*"} queryset = EnStat2.objects.all() serializer_class = EnStat2Serializer - filterset_class = EnStat2Filter \ No newline at end of file + filterset_class = EnStat2Filter