feat: 采集逻辑整体重构

This commit is contained in:
caoqianming 2024-04-16 15:03:59 +08:00
parent 10632d9cae
commit dfaed5e5e5
7 changed files with 528 additions and 379 deletions

View File

@ -4,14 +4,14 @@ from django.db import migrations, models
class Migration(migrations.Migration): class Migration(migrations.Migration):
dependencies = [ dependencies = [
('enm', '0023_mpoint_interval'), ("enm", "0023_mpoint_interval"),
] ]
operations = [ operations = [
migrations.RunSQL( migrations.RunSQL(
sql=[( sql=[
(
""" """
CREATE TABLE public.enm_mplogx ( CREATE TABLE public.enm_mplogx (
"timex" timestamptz NOT NULL, "timex" timestamptz NOT NULL,
@ -24,43 +24,42 @@ CREATE TABLE public.enm_mplogx (
); );
SELECT create_hypertable('enm_mplogx', 'timex'); SELECT create_hypertable('enm_mplogx', 'timex');
""" """
)], )
reverse_sql=[ ],
"DROP TABLE IF EXISTS public.enm_mplogx;" reverse_sql=["DROP TABLE IF EXISTS public.enm_mplogx;"],
]
), ),
migrations.CreateModel( migrations.CreateModel(
name='MpLogx', name="MpLogx",
fields=[ fields=[
('timex', models.DateTimeField(primary_key=True, serialize=False, verbose_name='采集时间')), ("timex", models.DateTimeField(primary_key=True, serialize=False, verbose_name="采集时间")),
('val_float', models.FloatField(blank=True, null=True, verbose_name='数值')), ("val_float", models.FloatField(blank=True, null=True, verbose_name="数值")),
('val_int', models.IntegerField(blank=True, null=True, verbose_name='数值')), ("val_int", models.IntegerField(blank=True, null=True, verbose_name="数值")),
('val_bool', models.BooleanField(blank=True, null=True, verbose_name='数值')), ("val_bool", models.BooleanField(blank=True, null=True, verbose_name="数值")),
('val_str', models.CharField(blank=True, max_length=100, null=True, verbose_name='数值')), ("val_str", models.CharField(blank=True, max_length=100, null=True, verbose_name="数值")),
], ],
options={ options={
'db_table': 'enm_mplog', "db_table": "enm_mplogx",
'managed': False, "managed": False,
}, },
), ),
migrations.AddField( migrations.AddField(
model_name='mpoint', model_name="mpoint",
name='enabled', name="enabled",
field=models.BooleanField(default=False, verbose_name='是否启用'), field=models.BooleanField(default=False, verbose_name="是否启用"),
), ),
migrations.AddField( migrations.AddField(
model_name='mpoint', model_name="mpoint",
name='val_type', name="val_type",
field=models.CharField(default='float', help_text='float, int, str, bool', max_length=50, verbose_name='值类型'), field=models.CharField(default="float", help_text="float, int, str, bool", max_length=50, verbose_name="值类型"),
), ),
migrations.AlterField( migrations.AlterField(
model_name='mpoint', model_name="mpoint",
name='cate', name="cate",
field=models.CharField(blank=True, max_length=50, null=True, verbose_name='分类'), field=models.CharField(blank=True, max_length=50, null=True, verbose_name="分类"),
), ),
migrations.AlterField( migrations.AlterField(
model_name='mpoint', model_name="mpoint",
name='unit', name="unit",
field=models.CharField(blank=True, max_length=50, null=True, verbose_name='单位'), field=models.CharField(blank=True, max_length=50, null=True, verbose_name="单位"),
), ),
] ]

View File

@ -0,0 +1,18 @@
# Generated by Django 3.2.12 on 2024-04-12 05:08
from django.db import migrations, models
class Migration(migrations.Migration):
    """Add the optional ``gather_field`` text column to the ``mpoint`` model."""

    dependencies = [
        ("enm", "0026_alter_mpoint_func_on_change"),
    ]

    operations = [
        migrations.AddField(
            model_name="mpoint",
            name="gather_field",
            field=models.CharField(
                blank=True,
                null=True,
                max_length=50,
                verbose_name='关联采集字段',
            ),
        ),
    ]

View File

@ -0,0 +1,44 @@
# Generated by Django 3.2.12 on 2024-04-16 06:54
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
    """Swap ``gather_field`` for ``enp_field`` and add equipment-state columns on ``mpoint``.

    Also relaxes the two equipment foreign keys (``ep_belong`` / ``ep_monitored``)
    to nullable SET_NULL relations with updated verbose names.
    """

    dependencies = [
        ("em", "0016_auto_20240416_1454"),
        ("enm", "0027_mpoint_gather_field"),
    ]

    operations = [
        # Drop the short-lived field introduced by migration 0027.
        migrations.RemoveField(model_name="mpoint", name="gather_field"),
        migrations.AddField(
            model_name="mpoint",
            name="enp_field",
            field=models.CharField(
                blank=True,
                null=True,
                max_length=50,
                verbose_name='关联enp采集字段',
            ),
        ),
        migrations.AddField(
            model_name="mpoint",
            name="ep_base_val1",
            field=models.FloatField(
                blank=True,
                null=True,
                verbose_name='状态量基准值1',
            ),
        ),
        migrations.AddField(
            model_name="mpoint",
            name="is_rep_ep_running_state",
            field=models.BooleanField(
                default=False,
                verbose_name='是否表示所监测设备运行状态',
            ),
        ),
        migrations.AlterField(
            model_name="mpoint",
            name="ep_belong",
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.SET_NULL,
                related_name='mp_ep_belong',
                to='em.equipment',
                verbose_name='所属设备',
            ),
        ),
        migrations.AlterField(
            model_name="mpoint",
            name="ep_monitored",
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.SET_NULL,
                related_name='mp_ep_monitored',
                to='em.equipment',
                verbose_name='所监测设备',
            ),
        ),
    ]

View File

@ -5,143 +5,155 @@ from apps.mtm.models import Material, Mgroup, Team
class Mpoint(CommonBModel): class Mpoint(CommonBModel):
"""测点 """测点"""
"""
name = models.CharField('测点名称', max_length=50) name = models.CharField("测点名称", max_length=50)
code = models.CharField('测点编号', max_length=50, unique=True) code = models.CharField("测点编号", max_length=50, unique=True)
unit = models.CharField('单位', max_length=50, null=True, blank=True) unit = models.CharField("单位", max_length=50, null=True, blank=True)
cate = models.CharField('分类', max_length=50, null=True, blank=True) cate = models.CharField("分类", max_length=50, null=True, blank=True)
material = models.ForeignKey(Material, verbose_name='计量某种物料', on_delete=models.CASCADE, null=True, blank=True) material = models.ForeignKey(Material, verbose_name="计量某种物料", on_delete=models.CASCADE, null=True, blank=True)
ep_monitored = models.ForeignKey('em.equipment', verbose_name='监测哪个设备', related_name='mp_ep_monitored', on_delete=models.SET_NULL, null=True, blank=True) ep_monitored = models.ForeignKey("em.equipment", verbose_name="所监测设备", related_name="mp_ep_monitored", on_delete=models.SET_NULL, null=True, blank=True)
ep_belong = models.ForeignKey('em.equipment', verbose_name='属于哪个设备', related_name='mp_ep_belong', on_delete=models.SET_NULL, null=True, blank=True) ep_belong = models.ForeignKey("em.equipment", verbose_name="所属设备", related_name="mp_ep_belong", on_delete=models.SET_NULL, null=True, blank=True)
mgroup = models.ForeignKey('mtm.mgroup', verbose_name='所在集合', on_delete=models.SET_NULL, null=True, blank=True) mgroup = models.ForeignKey("mtm.mgroup", verbose_name="所在集合", on_delete=models.SET_NULL, null=True, blank=True)
mgroups_allocate = models.JSONField('各工段分配', default=list, blank=True, help_text='[{"mgroup":"x", "ratio": 1}]') mgroups_allocate = models.JSONField("各工段分配", default=list, blank=True, help_text='[{"mgroup":"x", "ratio": 1}]')
is_auto = models.BooleanField('是否自动采集', default=True) is_auto = models.BooleanField("是否自动采集", default=True)
is_all = models.BooleanField('是否记录是整个工段', default=False) is_all = models.BooleanField("是否记录是整个工段", default=False)
formula = models.TextField('计算公式', default='') formula = models.TextField("计算公式", default="")
func_on_change = models.CharField('数据变动时执行方法', max_length=100, default='', blank=True) func_on_change = models.CharField("数据变动时执行方法", max_length=100, default="", blank=True)
interval = models.PositiveSmallIntegerField('采集间隔(秒)', default=10) interval = models.PositiveSmallIntegerField("采集间隔(秒)", default=10)
val_type = models.CharField('值类型', default='float', max_length=50, help_text='float, int, str, bool') val_type = models.CharField("值类型", default="float", max_length=50, help_text="float, int, str, bool")
enabled = models.BooleanField('是否启用', default=False) enabled = models.BooleanField("是否启用", default=False)
third_info = models.JSONField('第三方信息', default=dict, blank=True) third_info = models.JSONField("第三方信息", default=dict, blank=True)
# {"from": "king", "n": "某名称","d": "某描述或备注","g": "某组", "t": "某类型", "id": 5001, "o": "其他信息"} # {"from": "king", "n": "某名称","d": "某描述或备注","g": "某组", "t": "某类型", "id": 5001, "o": "其他信息"}
enp_field = models.CharField("关联enp采集字段", max_length=50, null=True, blank=True)
is_rep_ep_running_state = models.BooleanField("是否表示所监测设备运行状态", default=False)
ep_base_val1 = models.FloatField("状态量基准值1", null=True, blank=True)
@classmethod
def cache_key(cls, code: str):
return f"mpoint_{code}"
class MpLogx(models.Model): class MpLogx(models.Model):
""" """
测点记录超表 测点记录超表
""" """
timex = models.DateTimeField('采集时间', primary_key=True)
mpoint = models.ForeignKey( timex = models.DateTimeField("采集时间", primary_key=True)
Mpoint, verbose_name='关联测点', on_delete=models.CASCADE) mpoint = models.ForeignKey(Mpoint, verbose_name="关联测点", on_delete=models.CASCADE)
val_float = models.FloatField('数值', null=True, blank=True) val_mrs = models.PositiveSmallIntegerField("所监测设备运行状态值", default=10)
val_int = models.IntegerField('数值', null=True, blank=True) val_float = models.FloatField("数值", null=True, blank=True)
val_bool = models.BooleanField('数值', null=True, blank=True) val_int = models.IntegerField("数值", null=True, blank=True)
val_str = models.CharField('数值', max_length=100, null=True, blank=True) val_bool = models.BooleanField("数值", null=True, blank=True)
val_str = models.CharField("数值", max_length=100, null=True, blank=True)
class Meta: class Meta:
db_table = 'enm_mplogx' db_table = "enm_mplogx"
managed = False managed = False
unique_together = (('mpoint', 'timex'), ) unique_together = (("mpoint", "timex"),)
class MpLog(BaseModel): class MpLog(BaseModel):
"""旧表(已废弃) """旧表(已废弃)"""
"""
mpoint = models.ForeignKey(Mpoint, verbose_name='关联测点', on_delete=models.SET_NULL, null=True, blank=True) mpoint = models.ForeignKey(Mpoint, verbose_name="关联测点", on_delete=models.SET_NULL, null=True, blank=True)
tag_id = models.BigIntegerField('记录ID', db_index=True) tag_id = models.BigIntegerField("记录ID", db_index=True)
tag_code = models.CharField('测点编号', max_length=50) tag_code = models.CharField("测点编号", max_length=50)
tag_update = models.DateTimeField('更新时间') tag_update = models.DateTimeField("更新时间")
tag_val = models.FloatField('当前值') tag_val = models.FloatField("当前值")
class MpointStat(CommonADModel): class MpointStat(CommonADModel):
"""测点统计表 """测点统计表"""
"""
type = models.CharField('统计维度', max_length=50, default='hour', help_text='year/month/day/year_s/month_s/month_st/day_s/sflog/hour_s/hour')
year = models.PositiveSmallIntegerField('', null=True, blank=True)
month = models.PositiveSmallIntegerField('', null=True, blank=True)
day = models.PositiveSmallIntegerField('', null=True, blank=True)
year_s = models.PositiveSmallIntegerField('班年', null=True, blank=True) type = models.CharField("统计维度", max_length=50, default="hour", help_text="year/month/day/year_s/month_s/month_st/day_s/sflog/hour_s/hour")
month_s = models.PositiveSmallIntegerField('班月', null=True, blank=True) year = models.PositiveSmallIntegerField("", null=True, blank=True)
day_s = models.PositiveSmallIntegerField('班日', null=True, blank=True) month = models.PositiveSmallIntegerField("", null=True, blank=True)
day = models.PositiveSmallIntegerField("", null=True, blank=True)
hour = models.PositiveSmallIntegerField('', null=True, blank=True) year_s = models.PositiveSmallIntegerField("班年", null=True, blank=True)
sflog = models.ForeignKey(SfLog, verbose_name='关联值班记录', on_delete=models.CASCADE, null=True, blank=True) month_s = models.PositiveSmallIntegerField("班月", null=True, blank=True)
mgroup = models.ForeignKey(Mgroup, verbose_name='关联测点集', on_delete=models.CASCADE, null=True, blank=True) day_s = models.PositiveSmallIntegerField("班日", null=True, blank=True)
mpoint = models.ForeignKey(Mpoint, verbose_name='关联测点', on_delete=models.CASCADE)
val = models.FloatField('统计值', default=0) hour = models.PositiveSmallIntegerField("", null=True, blank=True)
total_production = models.FloatField('总产量', default=0, help_text='t') sflog = models.ForeignKey(SfLog, verbose_name="关联值班记录", on_delete=models.CASCADE, null=True, blank=True)
elec_consume_unit = models.FloatField('单位产品电耗', default=0, help_text='kw·h/t') mgroup = models.ForeignKey(Mgroup, verbose_name="关联测点集", on_delete=models.CASCADE, null=True, blank=True)
mpoint = models.ForeignKey(Mpoint, verbose_name="关联测点", on_delete=models.CASCADE)
val = models.FloatField("统计值", default=0)
total_production = models.FloatField("总产量", default=0, help_text="t")
elec_consume_unit = models.FloatField("单位产品电耗", default=0, help_text="kw·h/t")
class EnStat(BaseModel): class EnStat(BaseModel):
""" """
能源数据统计表 能源数据统计表
""" """
type = models.CharField('统计维度', max_length=50, default='hour', help_text='year_s/month_s/month_st/day_s/sflog/hour_s')
sflog = models.ForeignKey(SfLog, verbose_name='关联值班记录', on_delete=models.CASCADE, null=True, blank=True) type = models.CharField("统计维度", max_length=50, default="hour", help_text="year_s/month_s/month_st/day_s/sflog/hour_s")
team = models.ForeignKey(Team, verbose_name='关联班组', on_delete=models.CASCADE, null=True, blank=True) sflog = models.ForeignKey(SfLog, verbose_name="关联值班记录", on_delete=models.CASCADE, null=True, blank=True)
mgroup = models.ForeignKey(Mgroup, verbose_name='关联工段', on_delete=models.CASCADE) team = models.ForeignKey(Team, verbose_name="关联班组", on_delete=models.CASCADE, null=True, blank=True)
year = models.PositiveSmallIntegerField('', null=True, blank=True) mgroup = models.ForeignKey(Mgroup, verbose_name="关联工段", on_delete=models.CASCADE)
month = models.PositiveSmallIntegerField('', null=True, blank=True) year = models.PositiveSmallIntegerField("", null=True, blank=True)
day = models.PositiveSmallIntegerField('', null=True, blank=True) month = models.PositiveSmallIntegerField("", null=True, blank=True)
hour = models.PositiveSmallIntegerField('小时', null=True, blank=True) day = models.PositiveSmallIntegerField("", null=True, blank=True)
year_s = models.PositiveSmallIntegerField('班年', null=True, blank=True) hour = models.PositiveSmallIntegerField("小时", null=True, blank=True)
month_s = models.PositiveSmallIntegerField('班月', null=True, blank=True) year_s = models.PositiveSmallIntegerField("班年", null=True, blank=True)
day_s = models.PositiveSmallIntegerField('班日', null=True, blank=True) month_s = models.PositiveSmallIntegerField("班月", null=True, blank=True)
total_production = models.FloatField('总产量', default=0, help_text='t') day_s = models.PositiveSmallIntegerField("班日", null=True, blank=True)
elec_consume = models.FloatField('总电耗', default=0, help_text='kw·h') total_production = models.FloatField("总产量", default=0, help_text="t")
elec_coal_consume = models.FloatField('电量折标煤', default=0, help_text='tce') elec_consume = models.FloatField("总电耗", default=0, help_text="kw·h")
pcoal_heat = models.FloatField('煤粉热值', default=0) elec_coal_consume = models.FloatField("电量折标煤", default=0, help_text="tce")
pcoal_consume = models.FloatField('煤粉消耗', default=0, help_text='t') pcoal_heat = models.FloatField("煤粉热值", default=0)
pcoal_coal_consume = models.FloatField('煤粉折标煤', default=0, help_text='tce') pcoal_consume = models.FloatField("煤粉消耗", default=0, help_text="t")
water_consume = models.FloatField('水消耗', default=0, help_text='t') pcoal_coal_consume = models.FloatField("煤粉折标煤", default=0, help_text="tce")
cair_consume = models.FloatField('压缩空气', default=0, help_text='m3') water_consume = models.FloatField("水消耗", default=0, help_text="t")
out_steam = models.FloatField('外送蒸汽', default=0, help_text='t') cair_consume = models.FloatField("压缩空气", default=0, help_text="m3")
out_steam_coal = models.FloatField('外送蒸汽折标煤', default=0, help_text='tce') out_steam = models.FloatField("外送蒸汽", default=0, help_text="t")
ccr_consume = models.FloatField('电石渣消耗', default=0, help_text='t') out_steam_coal = models.FloatField("外送蒸汽折标煤", default=0, help_text="tce")
kiln_end_heat = models.FloatField('窑尾余热', default=0, help_text='tce') ccr_consume = models.FloatField("电石渣消耗", default=0, help_text="t")
imaterial_data = models.JSONField('成本物料数据', default=list, blank=True) kiln_end_heat = models.FloatField("窑尾余热", default=0, help_text="tce")
other_cost_data = models.JSONField('其他成本数据', default=list, blank=True) imaterial_data = models.JSONField("成本物料数据", default=list, blank=True)
qua_data = models.JSONField('质检数据', default=list, blank=True) other_cost_data = models.JSONField("其他成本数据", default=list, blank=True)
equip_elec_data = models.JSONField('设备电耗数据', default=list, blank=True) qua_data = models.JSONField("质检数据", default=list, blank=True)
production_cost_unit = models.FloatField('单位产品成本', default=0, help_text='y/t') equip_elec_data = models.JSONField("设备电耗数据", default=list, blank=True)
elec_consume_unit = models.FloatField('单位产品分布电耗', default=0, help_text='kw·h/t') production_cost_unit = models.FloatField("单位产品成本", default=0, help_text="y/t")
celec_consume_unit = models.FloatField('单位产品综合电耗', default=0, help_text='kw·h/t') elec_consume_unit = models.FloatField("单位产品分布电耗", default=0, help_text="kw·h/t")
coal_consume_unit = models.FloatField('单位产品标煤耗', default=0, help_text='kgce/t') celec_consume_unit = models.FloatField("单位产品综合电耗", default=0, help_text="kw·h/t")
en_consume_unit = models.FloatField('单位产品能耗', default=0, help_text='tce/t') coal_consume_unit = models.FloatField("单位产品标煤耗", default=0, help_text="kgce/t")
cen_consume_unit = models.FloatField('单位产品综合能耗', default=0, help_text='kgce/t') en_consume_unit = models.FloatField("单位产品能耗", default=0, help_text="tce/t")
production_hour = models.FloatField('台时产量', default=0, help_text='t/h') cen_consume_unit = models.FloatField("单位产品综合能耗", default=0, help_text="kgce/t")
total_hour_now = models.FloatField('动态总时长', default=0, help_text='h') production_hour = models.FloatField("台时产量", default=0, help_text="t/h")
run_hour = models.FloatField('运转时长', default=0, help_text='h') total_hour_now = models.FloatField("动态总时长", default=0, help_text="h")
shut_hour = models.FloatField('停机时长', default=0, help_text='h') run_hour = models.FloatField("运转时长", default=0, help_text="h")
run_rate = models.FloatField('运转率', default=0, help_text='%') shut_hour = models.FloatField("停机时长", default=0, help_text="h")
run_rate = models.FloatField("运转率", default=0, help_text="%")
class EnStat2(BaseModel): class EnStat2(BaseModel):
""" """
能源数据统计表2 能源数据统计表2
""" """
type = models.CharField('统计维度', max_length=50, default='month_s', help_text='month_s/day_s')
year_s = models.PositiveSmallIntegerField('班年') type = models.CharField("统计维度", max_length=50, default="month_s", help_text="month_s/day_s")
month_s = models.PositiveSmallIntegerField('班月') year_s = models.PositiveSmallIntegerField("班年")
day_s = models.PositiveSmallIntegerField('班日', null=True, blank=True) month_s = models.PositiveSmallIntegerField("班月")
industry_total_val = models.FloatField('工业总产值', default=0, help_text='万元') day_s = models.PositiveSmallIntegerField("班日", null=True, blank=True)
industry_add_val = models.FloatField('工业增加值', default=0, help_text='万元') industry_total_val = models.FloatField("工业总产值", default=0, help_text="万元")
elec_consume = models.FloatField('总电耗', default=0, help_text='kw·h') industry_add_val = models.FloatField("工业增加值", default=0, help_text="万元")
water_consume = models.FloatField('水耗', default=0, help_text='t') elec_consume = models.FloatField("总电耗", default=0, help_text="kw·h")
cair_consume = models.FloatField('压缩空气', default=0, help_text='m3') water_consume = models.FloatField("水耗", default=0, help_text="t")
elec_coal_consume = models.FloatField('电量折标煤', default=0, help_text='tce') cair_consume = models.FloatField("压缩空气", default=0, help_text="m3")
pcoal_consume = models.FloatField('煤粉消耗', default=0, help_text='t') elec_coal_consume = models.FloatField("电量折标煤", default=0, help_text="tce")
pcoal_coal_consume = models.FloatField('煤粉折标煤', default=0, help_text='tce') pcoal_consume = models.FloatField("煤粉消耗", default=0, help_text="t")
bulk_cement_val =models.FloatField('散装水泥发运量', default=0) pcoal_coal_consume = models.FloatField("煤粉折标煤", default=0, help_text="tce")
bulk_cement_price = models.FloatField('散装水泥价格', default=0) bulk_cement_val = models.FloatField("散装水泥发运量", default=0)
bag_cement_val = models.FloatField('袋装水泥发运量', default=0) bulk_cement_price = models.FloatField("散装水泥价格", default=0)
bag_cement_price = models.FloatField('袋装水泥价格', default=0) bag_cement_val = models.FloatField("袋装水泥发运量", default=0)
clinker_val = models.FloatField('散装熟料发运量', default=0) bag_cement_price = models.FloatField("袋装水泥价格", default=0)
clinker_price = models.FloatField('散装熟料价格', default=0) clinker_val = models.FloatField("散装熟料发运量", default=0)
cement_val = models.FloatField('水泥产量', default=0) clinker_price = models.FloatField("散装熟料价格", default=0)
cement_cost_unit = models.FloatField('水泥单位成本', default=0) cement_val = models.FloatField("水泥产量", default=0)
en_consume = models.FloatField('能源消耗', default=0, help_text='tce') cement_cost_unit = models.FloatField("水泥单位成本", default=0)
en_consume_unit = models.FloatField('单位工业总产值能耗', default=0) en_consume = models.FloatField("能源消耗", default=0, help_text="tce")
en_add_consume_unit = models.FloatField('单位工业增加值能耗', default=0) en_consume_unit = models.FloatField("单位工业总产值能耗", default=0)
en_add_consume_unit = models.FloatField("单位工业增加值能耗", default=0)

View File

@ -9,40 +9,41 @@ from django.core.cache import cache
class MpointSerializer(CustomModelSerializer): class MpointSerializer(CustomModelSerializer):
mgroup = serializers.PrimaryKeyRelatedField(label="测点集", queryset=Mgroup.objects.all(), required=False, allow_null=True) mgroup = serializers.PrimaryKeyRelatedField(label="测点集", queryset=Mgroup.objects.all(), required=False, allow_null=True)
mgroup_name = serializers.CharField(source='mgroup.name', read_only=True) mgroup_name = serializers.CharField(source="mgroup.name", read_only=True)
belong_dept_name = serializers.CharField(source='belong_dept.name', read_only=True) belong_dept_name = serializers.CharField(source="belong_dept.name", read_only=True)
ep_monitored_name = serializers.CharField(source='ep_monitored.name', read_only=True) ep_monitored_name = serializers.CharField(source="ep_monitored.name", read_only=True)
ep_monitored_power_kw = serializers.CharField(source='ep_monitored.power_kw', read_only=True) ep_monitored_power_kw = serializers.CharField(source="ep_monitored.power_kw", read_only=True)
ep_belong_name = serializers.CharField(source='ep_belong.name', read_only=True) ep_belong_name = serializers.CharField(source="ep_belong.name", read_only=True)
material_name = serializers.CharField(source='material.name', read_only=True) material_name = serializers.CharField(source="material.name", read_only=True)
formula = serializers.CharField(allow_blank=True, required=False) formula = serializers.CharField(allow_blank=True, required=False)
last_data = serializers.SerializerMethodField() last_data = serializers.SerializerMethodField()
class Meta: class Meta:
model = Mpoint model = Mpoint
fields = '__all__' fields = "__all__"
read_only_fields = EXCLUDE_FIELDS + ['belong_dept', 'cate'] read_only_fields = EXCLUDE_FIELDS + ["belong_dept", "cate"]
def get_last_data(self, obj): def get_last_data(self, obj):
last_data = cache.get(f'mpoint_{obj.code}', None) last_data = cache.get(Mpoint.cache_key(obj.code), {})
return {'last_val': last_data.get('last_val', None) if last_data else None, 'last_timex': last_data.get('last_timex', None) if last_data else None} return last_data
def validate(self, attrs): def validate(self, attrs):
if 'material' in attrs and attrs['material']: if "material" in attrs and attrs["material"]:
attrs['cate'] = 'material' attrs["cate"] = "material"
if 'mgroup' in attrs and attrs['mgroup']: if "mgroup" in attrs and attrs["mgroup"]:
attrs['belong_dept'] = attrs['mgroup'].belong_dept attrs["belong_dept"] = attrs["mgroup"].belong_dept
# attrs['mgroups_allocate'] = [{'mgroup': attrs['mgroup'].id, 'mgroup_name': attrs['mgroup'].name, 'ratio': 1}] # attrs['mgroups_allocate'] = [{'mgroup': attrs['mgroup'].id, 'mgroup_name': attrs['mgroup'].name, 'ratio': 1}]
ratio_ = 0 ratio_ = 0
mgroupIds = [] mgroupIds = []
for i in attrs['mgroups_allocate']: for i in attrs["mgroups_allocate"]:
if i['mgroup']: if i["mgroup"]:
ratio_ = ratio_ + i['ratio'] ratio_ = ratio_ + i["ratio"]
if i['mgroup'] in mgroupIds: if i["mgroup"] in mgroupIds:
raise ParseError('分配集错误') raise ParseError("分配集错误")
mgroupIds.append(i['mgroup']) mgroupIds.append(i["mgroup"])
i['mgroup_name'] = Mgroup.objects.get(id=i['mgroup']).name i["mgroup_name"] = Mgroup.objects.get(id=i["mgroup"]).name
if attrs['mgroups_allocate'] and round(ratio_, 3) != 1.0: if attrs["mgroups_allocate"] and round(ratio_, 3) != 1.0:
raise ParseError('比例合计错误') raise ParseError("比例合计错误")
return attrs return attrs
@ -63,17 +64,18 @@ class MpLogxSerializer(CustomModelSerializer):
class MpointStatSerializer(CustomModelSerializer): class MpointStatSerializer(CustomModelSerializer):
mpoint_name = serializers.CharField(source='mpoint.name', read_only=True) mpoint_name = serializers.CharField(source="mpoint.name", read_only=True)
ep_monitored_name = serializers.CharField(source='mpoint.ep_monitored.name', read_only=True) ep_monitored_name = serializers.CharField(source="mpoint.ep_monitored.name", read_only=True)
ep_monitored_number = serializers.CharField(source='mpoint.ep_monitored.number', read_only=True) ep_monitored_number = serializers.CharField(source="mpoint.ep_monitored.number", read_only=True)
ep_monitored_power_kw= serializers.CharField(source='mpoint.ep_monitored.power_kw', read_only=True) ep_monitored_power_kw = serializers.CharField(source="mpoint.ep_monitored.power_kw", read_only=True)
ep_belong_name = serializers.CharField(source='mpoint.ep_belong.name', read_only=True) ep_belong_name = serializers.CharField(source="mpoint.ep_belong.name", read_only=True)
mgroup_name = serializers.CharField(source='mgroup.name', read_only=True) mgroup_name = serializers.CharField(source="mgroup.name", read_only=True)
belong_dept_name = serializers.CharField(source='mgroup.belong_dept.name', read_only=True) belong_dept_name = serializers.CharField(source="mgroup.belong_dept.name", read_only=True)
class Meta: class Meta:
model = MpointStat model = MpointStat
fields = '__all__' fields = "__all__"
read_only_fields = EXCLUDE_FIELDS + ['mpoint_name', 'type', 'year', 'month', 'day'] read_only_fields = EXCLUDE_FIELDS + ["mpoint_name", "type", "year", "month", "day"]
def check_required_keys(dictionary, keys): def check_required_keys(dictionary, keys):
for key in keys: for key in keys:
@ -82,14 +84,14 @@ class MpointStatSerializer(CustomModelSerializer):
return True return True
def validate(self, attrs): def validate(self, attrs):
mpoint = attrs['mpoint'] mpoint = attrs["mpoint"]
if mpoint.material and mpoint.is_auto is False and 'sflog' in attrs and attrs['sflog']: if mpoint.material and mpoint.is_auto is False and "sflog" in attrs and attrs["sflog"]:
attrs['type'] = 'sflog' attrs["type"] = "sflog"
sflog = attrs['sflog'] sflog = attrs["sflog"]
attrs['year_s'], attrs['month_s'], attrs['day_s'] = sflog.get_ymd attrs["year_s"], attrs["month_s"], attrs["day_s"] = sflog.get_ymd
attrs['mgroup'] = sflog.mgroup attrs["mgroup"] = sflog.mgroup
else: else:
raise ParseError('该数据不支持手工录入') raise ParseError("该数据不支持手工录入")
# if 'sflog' in attrs and attrs['sflog']: # if 'sflog' in attrs and attrs['sflog']:
# else: # else:
@ -112,12 +114,13 @@ class MpointStatSerializer(CustomModelSerializer):
class EnStatSerializer(CustomModelSerializer): class EnStatSerializer(CustomModelSerializer):
mgroup_name = serializers.CharField(source='mgroup.name', read_only=True) mgroup_name = serializers.CharField(source="mgroup.name", read_only=True)
team_name = serializers.CharField(source='team.name', read_only=True) team_name = serializers.CharField(source="team.name", read_only=True)
belong_dept_name = serializers.CharField(source='mgroup.belong_dept.name', read_only=True) belong_dept_name = serializers.CharField(source="mgroup.belong_dept.name", read_only=True)
class Meta: class Meta:
model = EnStat model = EnStat
fields = '__all__' fields = "__all__"
def to_representation(self, instance): def to_representation(self, instance):
ret = super().to_representation(instance) ret = super().to_representation(instance)
@ -126,23 +129,24 @@ class EnStatSerializer(CustomModelSerializer):
ret_one_val = ret[key] ret_one_val = ret[key]
if isinstance(ret_one_val, float): if isinstance(ret_one_val, float):
ret[key] = "{:.2f}".format(round(ret_one_val, 2)) ret[key] = "{:.2f}".format(round(ret_one_val, 2))
qua_data = ret['qua_data'] qua_data = ret["qua_data"]
equip_elec_data = ret['equip_elec_data'] equip_elec_data = ret["equip_elec_data"]
if qua_data: if qua_data:
for item in qua_data: for item in qua_data:
ret[f'{item["material_name"]}_{item["testitem_name"]}_rate_pass'] = "{:.2f}".format(round(item['rate_pass'], 4)) ret[f'{item["material_name"]}_{item["testitem_name"]}_rate_pass'] = "{:.2f}".format(round(item["rate_pass"], 4))
if equip_elec_data: if equip_elec_data:
for item in equip_elec_data: for item in equip_elec_data:
val = item.get('consume_unit', None) val = item.get("consume_unit", None)
if val: if val:
val = "{:.2f}".format(round(val, 2)) val = "{:.2f}".format(round(val, 2))
ret[f'{item["equipment_name"]}_consume_unit'] = val ret[f'{item["equipment_name"]}_consume_unit'] = val
return ret return ret
class EnStat2Serializer(CustomModelSerializer): class EnStat2Serializer(CustomModelSerializer):
class Meta: class Meta:
model = EnStat2 model = EnStat2
fields = '__all__' fields = "__all__"
def to_representation(self, instance): def to_representation(self, instance):
ret = super().to_representation(instance) ret = super().to_representation(instance)

View File

@ -7,13 +7,17 @@ from django.db.models import Q
from django.utils import timezone from django.utils import timezone
from django.core.cache import cache from django.core.cache import cache
import concurrent.futures import concurrent.futures
from django.db import connection from django.db import connection, transaction
from datetime import datetime, timedelta from datetime import datetime, timedelta
from apps.utils.decorators import auto_log from apps.utils.decorators import auto_log
from django.db import IntegrityError from django.db import IntegrityError
from apps.utils.tasks import ctask_run from apps.utils.tasks import ctask_run
from .serializers import MpointSerializer
from apps.enp.models import EnvData
from apps.em.models import Equipment, RuningState
myLogger = logging.getLogger("log")
myLogger = logging.getLogger('log')
def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int): def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int):
""" """
@ -22,24 +26,24 @@ def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour:
pattern = r"\${(.*?)}" pattern = r"\${(.*?)}"
matches = re.findall(pattern, exp_str) matches = re.findall(pattern, exp_str)
for match in matches: for match in matches:
mpst = MpointStat.objects.filter(Q(mpoint__id=match) | Q(mpoint__name=match) | Q( mpst = MpointStat.objects.filter(Q(mpoint__id=match) | Q(mpoint__name=match) | Q(mpoint__code=match), type="hour", year=year, month=month, day=day, hour=hour).first()
mpoint__code=match), type='hour', year=year, month=month, day=day, hour=hour).first()
if mpst: if mpst:
exp_str = exp_str.replace(f"${{{match}}}", str(mpst.val)) exp_str = exp_str.replace(f"${{{match}}}", str(mpst.val))
rval = eval(exp_str) rval = eval(exp_str)
return rval return rval
def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21): # def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21):
""" # """
根据给定的小时数, 计算出班天 # 根据给定的小时数, 计算出班天
""" # """
if hour <= hour_split: # if hour <= hour_split:
return year, month, day # return year, month, day
else: # else:
now = datetime.datetime(year, month, day, hour) # now = datetime.datetime(year, month, day, hour)
now2 = now + datetime.timedelta(days=1) # now2 = now + datetime.timedelta(days=1)
return now2.year, now2.month, now2.day # return now2.year, now2.month, now2.day
# cal_rule = { # cal_rule = {
# "电石渣": { # "电石渣": {
@ -86,52 +90,76 @@ def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21):
# return goal_data, score # return goal_data, score
def get_mpoint_cache(code: str, force_update=False, update_mplogx=True):
    """
    获取或更新mpoint的缓存数据 (fetch-or-build the cached dict for a measuring point).

    Args:
        code: ``Mpoint.code`` of the point to look up.
        force_update: rebuild the cache entry even when one already exists.
        update_mplogx: also fetch the newest ``MpLogx`` row from the last 5
            minutes and store it under ``last_data``.

    Returns:
        The cached mpoint dict, or ``None`` when no Mpoint with that code exists.
    """
    key = Mpoint.cache_key(code)
    mpoint_data = cache.get(key, None)
    if mpoint_data is None or force_update:
        try:
            mpoint = Mpoint.objects.get(code=code)
        except Exception:
            # DoesNotExist (or MultipleObjectsReturned) — treat as "no such point".
            return None
        mpoint_data = MpointSerializer(instance=mpoint).data
        # Robustness: update_mpoint_cache() indexes current_cache_val["last_data"]
        # unconditionally, so guarantee the key exists even when no recent
        # MpLogx row is found below.
        mpoint_data.setdefault("last_data", {})
        if update_mplogx:
            now = timezone.now()
            last_mplogx = MpLogx.objects.filter(mpoint=mpoint, timex__gte=now - timedelta(minutes=5)).order_by("-timex").first()
            if last_mplogx:  # 核心数据: newest raw reading within the 5-minute window
                mpoint_data["last_data"] = {"last_val": getattr(last_mplogx, "val_" + mpoint_data["val_type"]), "last_timex": last_mplogx.timex}
        cache.set(key, mpoint_data, timeout=None)  # no expiry; refreshed explicitly
    return mpoint_data
def update_mpoint_cache(cache_key: str, current_cache_val: dict, last_timex: datetime, last_val, last_mrs):
    """
    更新mpoint的缓存数据并执行某些操作 (roll the cached last/pre values forward
    and propagate equipment running-state changes).

    Args:
        cache_key: cache key of the mpoint entry being updated.
        current_cache_val: the cached mpoint dict; mutated in place and re-cached.
        last_timex: timestamp of the newest reading.
        last_val: value of the newest reading.
        last_mrs: 所监测的设备运行状态值可不传 (running state derived for the
            monitored equipment; may be falsy when not applicable).
    """
    ep_belong_id = current_cache_val.get("ep_belong", None)
    ep_monitored_id = current_cache_val.get("ep_monitored", False)
    # Shift the previous "last_*" values into "pre_*" before overwriting.
    # setdefault guards against a cache entry built without "last_data".
    last_data = current_cache_val.setdefault("last_data", {})
    last_data["pre_val"] = last_data.get("last_val", None)
    last_data["pre_timex"] = last_data.get("last_timex", None)
    last_data["pre_mrs"] = last_data.get("last_mrs", None)
    last_data["last_val"] = last_val
    if last_mrs:
        last_data["last_mrs"] = last_mrs
    last_data["last_timex"] = last_timex
    cache.set(cache_key, current_cache_val, timeout=None)
    # 更新设备状态缓存值 — a fresh reading implies the owning equipment is running.
    if ep_belong_id:
        cache.set(f"equipment_{ep_belong_id}", {"running_state": RuningState.RUNING, "running_state_timex": last_timex}, timeout=None)
    if ep_monitored_id:
        cache.set(f"equipment_{ep_monitored_id}", {"running_state": last_mrs, "running_state_timex": last_timex}, timeout=None)
    # 如果state变动则触发函数 — fire shutdown/startup handling on a state change.
    if last_data["pre_mrs"] != last_mrs:
        if ep_belong_id:
            ctask_run("apps.em.services.shutdown_or_startup", ep_belong_id, last_timex, RuningState.RUNING)
        if ep_monitored_id:
            # BUG FIX: original passed ep_belong_id here; the monitored
            # equipment's own id must be forwarded (matches cache.set above).
            ctask_run("apps.em.services.shutdown_or_startup", ep_monitored_id, last_timex, last_mrs)
def transfer_mpoint_val_to_ep_running_state(current_val, base_val1: float):
    """
    将测点值转换所监测设备的运行状态值 (map a raw point value onto the monitored
    equipment's running state).

    Booleans map directly to RUNING/STOP.  Numeric values are compared against
    ``base_val1`` when it is truthy (below threshold => STOP); otherwise a value
    of exactly 0 means STOP.
    """
    if isinstance(current_val, bool):
        return RuningState.RUNING if current_val else RuningState.STOP
    if base_val1:
        stopped = current_val < base_val1
    else:
        stopped = current_val == 0
    return RuningState.STOP if stopped else RuningState.RUNING
def king_sync(projectName: str, json_path: str = ""):
def king_sync(projectName: str, json_path: str=""):
""" """
同步亚控测点 同步亚控测点
""" """
@ -139,98 +167,129 @@ def king_sync(projectName: str, json_path: str=""):
import os import os
import json import json
from django.conf import settings from django.conf import settings
with open(os.path.join(settings.BASE_DIR, json_path), 'r', encoding='utf-8') as f:
with open(os.path.join(settings.BASE_DIR, json_path), "r", encoding="utf-8") as f:
res = json.loads(f.read()) res = json.loads(f.read())
else: else:
from apps.third.king.k import kingClient from apps.third.king.k import kingClient
from apps.third.king import king_api from apps.third.king import king_api
_, res = kingClient.request(**king_api["read_variables"], params={"projectInstanceName": projectName}) _, res = kingClient.request(**king_api["read_variables"], params={"projectInstanceName": projectName})
t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"} t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"}
for index, item in enumerate(res['objectList']): for index, item in enumerate(res["objectList"]):
if 't' in item and item['t'] and 'SystemTag' not in item['d']: if "t" in item and item["t"] and "SystemTag" not in item["d"]:
code = f'K_{item["n"]}' code = f'K_{item["n"]}'
item['from'] = 'king' item["from"] = "king"
group = item['g'] group = item["g"]
name = item['d'] name = item["d"]
if group: if group:
name = f'{group}.{name}' name = f"{group}.{name}"
Mpoint.objects.get_or_create(code=code, defaults={ Mpoint.objects.get_or_create(code=code, defaults={"name": name, "code": code, "enabled": False, "is_auto": True, "val_type": t_dict[item["t"]], "third_info": item})
"name": name,
"code": code,
"enabled": False,
"is_auto": True,
"val_type": t_dict[item["t"]],
"third_info": item
})
@auto_log('缓存测点')
def cache_mpoints(mpointId: str = ''):
"""
缓存所有可用的测点
"""
if mpointId:
mpoints_data = Mpoint.objects.filter(id=mpointId, enabled=True).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info")
else:
mpoints_data = Mpoint.objects.filter(is_auto=True, enabled=True).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info")
for item in list(mpoints_data):
key = f'mpoint_{item["code"]}'
cache_mpoint = cache.get(key, {})
cache_mpoint.update(item)
cache.set(key, cache_mpoint, timeout=None)
# print(cache.get('mpoint_K_IP2_MM_SC_IW14'))
test_data = {"PNs":{"1":"V","2":"T","3":"Q"},"PVs":{"1":-725,"2":"2024-04-08 13:43:53.140","3":192},"Objs":[{"N":"IP2_MM_IW3"},{"N":"IP2_MM_IW4","1":-6386},{"N":"IP2_MM_IW5","1":-7835},{"N":"IP2_MM_IW7","1":-6864},{"N":"IP2_MM_IW15","1":29},{"N":"IP2_MM_SC_IW3","1":337},{"N":"IP2_MM_SC_IW13","1":-5511,"2":24},{"N":"IP2_MM_SC_IW14","1":-4640,"2":24},{"N":"IP2_MM_SC_IW15","1":-5586,"2":24},{"N":"IP2_MM_SC_IW16","1":-6634,"2":24},{"N":"IP2_MM_SC_IW17","1":-2768,"2":24},{"N":"IP2_MM_SC_IW18","1":-2946,"2":24},{"N":"IP2_MM_SC_IW19","1":-2550,"2":24},{"N":"IP2_MM_SC_IW20","1":-1512,"2":24},{"N":"IP2_MM_SC_IW21","1":-1775,"2":24},{"N":"IP2_MM_SC_IW22","1":-194,"2":24},{"N":"IP2_MM_SC_IW23","1":-366,"2":24},{"N":"IP2_MM_SC_IW24","1":-9291,"2":24},{"N":"IP2_MM_SC_IW25","1":-6888,"2":24},{"N":"IP2_MM_SC_IW26","1":-2360,"2":24},{"N":"IP2_MM_SC_IW29","1":164,"2":24},{"N":"IP2_MM_SC_IW30","1":914,"2":24},{"N":"IP2_MM_SC_IW31","1":849,"2":24},{"N":"IP2_MM_SC_IW37","1":-125,"2":24},{"N":"IP2_MM_SC_IW38","1":-3009,"2":24},{"N":"IP2_MM_SC_IW40","1":-1394,"2":24},{"N":"IP2_MM_SC_IW43","1":758,"2":24},{"N":"IP3_SC_IW1","1":11557,"2":107},{"N":"IP3_SC_IW2","1":7624,"2":107},{"N":"IP3_SC_IW6","1":11159,"2":107},{"N":"IP3_SC_IW7","1":8073,"2":107},{"N":"IP3_SC_IW9","1":4490,"2":107},{"N":"IP3_SC_IW10","1":5437,"2":107},{"N":"IP3_SC_IW11","1":9244,"2":107},{"N":"IP3_SC_IW12","1":7886,"2":107},{"N":"IP3_SC_IW13","1":-2962,"2":107},{"N":"IP3_SC_IW14","1":-159,"2":107},{"N":"IP3_SC_IW26","1":15,"2":107},{"N":"IP3_SC_IW27","1":15,"2":107}]} test_data = {
"PNs": {"1": "V", "2": "T", "3": "Q"},
"PVs": {"1": -725, "2": "2024-04-08 13:43:53.140", "3": 192},
"Objs": [
{"N": "IP2_MM_IW3"},
{"N": "IP2_MM_IW4", "1": -6386},
{"N": "IP2_MM_IW5", "1": -7835},
{"N": "IP2_MM_IW7", "1": -6864},
{"N": "IP2_MM_IW15", "1": 29},
{"N": "IP2_MM_SC_IW3", "1": 337},
{"N": "IP2_MM_SC_IW13", "1": -5511, "2": 24},
{"N": "IP2_MM_SC_IW14", "1": -4640, "2": 24},
{"N": "IP2_MM_SC_IW15", "1": -5586, "2": 24},
{"N": "IP2_MM_SC_IW16", "1": -6634, "2": 24},
{"N": "IP2_MM_SC_IW17", "1": -2768, "2": 24},
{"N": "IP2_MM_SC_IW18", "1": -2946, "2": 24},
{"N": "IP2_MM_SC_IW19", "1": -2550, "2": 24},
{"N": "IP2_MM_SC_IW20", "1": -1512, "2": 24},
{"N": "IP2_MM_SC_IW21", "1": -1775, "2": 24},
{"N": "IP2_MM_SC_IW22", "1": -194, "2": 24},
{"N": "IP2_MM_SC_IW23", "1": -366, "2": 24},
{"N": "IP2_MM_SC_IW24", "1": -9291, "2": 24},
{"N": "IP2_MM_SC_IW25", "1": -6888, "2": 24},
{"N": "IP2_MM_SC_IW26", "1": -2360, "2": 24},
{"N": "IP2_MM_SC_IW29", "1": 164, "2": 24},
{"N": "IP2_MM_SC_IW30", "1": 914, "2": 24},
{"N": "IP2_MM_SC_IW31", "1": 849, "2": 24},
{"N": "IP2_MM_SC_IW37", "1": -125, "2": 24},
{"N": "IP2_MM_SC_IW38", "1": -3009, "2": 24},
{"N": "IP2_MM_SC_IW40", "1": -1394, "2": 24},
{"N": "IP2_MM_SC_IW43", "1": 758, "2": 24},
{"N": "IP3_SC_IW1", "1": 11557, "2": 107},
{"N": "IP3_SC_IW2", "1": 7624, "2": 107},
{"N": "IP3_SC_IW6", "1": 11159, "2": 107},
{"N": "IP3_SC_IW7", "1": 8073, "2": 107},
{"N": "IP3_SC_IW9", "1": 4490, "2": 107},
{"N": "IP3_SC_IW10", "1": 5437, "2": 107},
{"N": "IP3_SC_IW11", "1": 9244, "2": 107},
{"N": "IP3_SC_IW12", "1": 7886, "2": 107},
{"N": "IP3_SC_IW13", "1": -2962, "2": 107},
{"N": "IP3_SC_IW14", "1": -159, "2": 107},
{"N": "IP3_SC_IW26", "1": 15, "2": 107},
{"N": "IP3_SC_IW27", "1": 15, "2": 107},
],
}
@auto_log("亚控存库")
def insert_mplogx_from_king_mqtt(data: dict, is_offset=True):
    """
    从king mqtt数据插入超表 (store one KingView MQTT payload into the hypertable).

    注释的代码是分批存的, 但是实际上不需要, 暂时注释有需要再加 — a chunked,
    thread-pooled variant existed here; single-pass storage proved sufficient,
    so reintroduce batching only if volume requires it.
    """
    pvs = data["PVs"]
    # 只保留到秒级的精度 — the payload timestamp is truncated to whole seconds.
    otime_obj = timezone.make_aware(datetime.strptime(pvs["2"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0)
    insert_mplogx_from_king_mqtt_chunk(data["Objs"], otime_obj, is_offset)
@auto_log('亚控存库')
def insert_mplogx_from_king_mqtt_chunk(objs: list, pvs: dict, is_offset=True): def insert_mplogx_from_king_mqtt_chunk(objs: list, otime_obj: datetime, is_offset=True):
""" """
分批存库, 亚控 38.00,00000.11011 版本偏移只是时间戳偏移另外其实可以不在乎 分批存库, 亚控 38.00,00000.11011 版本偏移只是时间戳偏移另外其实可以不在乎
""" """
# oval = pvs['1'] # oval = pvs['1']
otime_str = pvs['2'] enp_mpoints_dict = {} # 这个地方主要是需要更新envdata表里的数据
otime_obj = timezone.make_aware(datetime.strptime(otime_str, '%Y-%m-%d %H:%M:%S.%f'))
insert_db_data = []
insert_data = []
for obj in objs: for obj in objs:
n = obj['N'] n = obj["N"]
val = obj["1"] val = obj["1"]
timex = obj.get("2", None) # timex = obj.get("2", None)
cache_key = f'mpoint_K_{n}' code = f"K_{n}"
mpoint_data = cache.get(cache_key, None) cache_key = Mpoint.cache_key(code)
mpoint_data = get_mpoint_cache(code)
if mpoint_data is None: if mpoint_data is None:
continue continue
val_type = mpoint_data['val_type'] val_type = mpoint_data["val_type"]
if is_offset: # if is_offset:
if timex is None: # if timex is None:
timex = otime_obj # timex = otime_obj
else: # else:
timex = otime_obj + timedelta(milliseconds=timex) # timex = otime_obj + timedelta(milliseconds=timex)
else: # else:
timex = timezone.make_aware(datetime.strptime(timex, '%Y-%m-%d %H:%M:%S.%f')) # timex = timezone.make_aware(datetime.strptime(timex, '%Y-%m-%d %H:%M:%S.%f'))
mpoint_interval = mpoint_data['interval'] mpoint_interval = mpoint_data["interval"]
mpoint_last_timex = mpoint_data.get('last_timex', None) mpoint_last_timex = mpoint_data.get("last_timex", None)
mpoint_is_rep_ep_running_state = mpoint_data.get("is_rep_ep_running_state", False)
mpoint_ep_base_val1 = mpoint_data.get("ep_base_val1", None)
# 控制采集间隔 # 控制采集间隔
can_save = False can_save = False
if mpoint_last_timex: if mpoint_last_timex:
@ -240,42 +299,47 @@ def insert_mplogx_from_king_mqtt_chunk(objs: list, pvs: dict, is_offset=True):
can_save = True can_save = True
if can_save: if can_save:
save_dict = { save_dict = {
"timex": timex, "timex": otime_obj,
"mpoint": Mpoint.objects.get(id=mpoint_data['id']), "mpoint": Mpoint.objects.get(id=mpoint_data["id"]),
} }
save_dict[f'val_{val_type}'] = val save_dict[f"val_{val_type}"] = val
insert_data.append((cache_key, mpoint_data, val, timex)) val_mrs = RuningState.RUNING
insert_db_data.append(MpLogx(**save_dict)) if mpoint_is_rep_ep_running_state:
save_dict["val_mrs"] = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_base_val1)
try:
MpLogx.objects.create(**save_dict)
update_mpoint_cache(cache_key, mpoint_data, otime_obj, val, val_mrs)
# 先尝试批量存库/发生异常则单个存储 # 此处代码用于更新envdata表里的数据
is_bulk_insert = True enp_field = mpoint_data.get("enp_field", None)
try: ep_monitored = mpoint_data.get("ep_monitored", None)
MpLogx.objects.bulk_create(insert_db_data) if enp_field and ep_monitored:
except IntegrityError: if enp_mpoints_dict.get(ep_monitored, None) is None:
is_bulk_insert = False enp_mpoints_dict[ep_monitored] = {"timex": otime_obj, "equipment": Equipment.objects.get(id=ep_monitored)}
for item1, item2 in zip(insert_data, insert_db_data): if enp_field == "running_state":
try: enp_mpoints_dict[ep_monitored].update({enp_field: val_mrs})
MpLogx.objects.create(**item2) else:
update_mpoint_cache_and_do_func(item1) enp_mpoints_dict[ep_monitored].update({enp_field: val})
enp_mpoints_dict[ep_monitored].update({enp_field: val})
except Exception: except Exception:
myLogger.error(f'mpoint_cache_key: {item1[0]} 存库失败', exc_info=True) myLogger.error(f"mpoint_cache_key: {cache_key} 存库失败", exc_info=True)
continue continue
if is_bulk_insert: # # 先尝试批量存库/发生异常则单个存储
# 批量存库成功后更新缓存 # is_bulk_insert = True
for item in insert_data: # try:
update_mpoint_cache_and_do_func(item) # MpLogx.objects.bulk_create(insert_db_data)
# except IntegrityError:
# is_bulk_insert = False
# if is_bulk_insert:
# # 批量存库成功后更新缓存
# for item in update_cache_data:
# need_gather_filed_mpoints.append(update_mpoint_cache_and_do_func(item))
# else:
# {'eq_belong': {'dust_rtd': 1.0}}
def update_mpoint_cache_and_do_func(item: dict): # 额外需要处理的数据(存储到envdata表里)
cache_key = item[0] if enp_mpoints_dict:
mpoint_data = item[1] for _, item in enp_mpoints_dict:
mpoint_id = mpoint_data['id'] EnvData.objects(**item)
last_timex: datetime = item[3]
last_val = item[2]
mpoint_data['last_timex'] = last_timex
mpoint_data['last_val'] = last_val
cache.set(cache_key, mpoint_data, timeout=None)
mpoint_func: str = mpoint_data.get('func_on_change', '')
if mpoint_func:
ctask_run.delay(mpoint_func, mpoint_id, last_val, last_timex)

View File

@ -4,46 +4,50 @@ from apps.enm.models import Mpoint, MpointStat, EnStat, EnStat2, MpLogx
from apps.utils.viewsets import CustomModelViewSet, CustomGenericViewSet from apps.utils.viewsets import CustomModelViewSet, CustomGenericViewSet
from rest_framework.mixins import ListModelMixin from rest_framework.mixins import ListModelMixin
from apps.utils.mixins import BulkCreateModelMixin, BulkDestroyModelMixin from apps.utils.mixins import BulkCreateModelMixin, BulkDestroyModelMixin
from apps.enm.serializers import (MpointSerializer, MpLogxSerializer, MpointStatSerializer, EnStatSerializer, EnStat2Serializer, ReCalSerializer) from apps.enm.serializers import MpointSerializer, MpLogxSerializer, MpointStatSerializer, EnStatSerializer, EnStat2Serializer, ReCalSerializer
from apps.enm.filters import MpointStatFilter, EnStatFilter, EnStat2Filter from apps.enm.filters import MpointStatFilter, EnStatFilter, EnStat2Filter
from apps.enm.tasks import cal_mpointstat_manual from apps.enm.tasks import cal_mpointstat_manual
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework.serializers import Serializer from rest_framework.serializers import Serializer
from rest_framework.decorators import action from rest_framework.decorators import action
from apps.enm.tasks import cal_mpointstats_duration from apps.enm.tasks import cal_mpointstats_duration
from apps.enm.services import king_sync, cache_mpoints from apps.enm.services import king_sync, get_mpoint_cache
from django.db import transaction from django.db import transaction
class MpointViewSet(CustomModelViewSet): class MpointViewSet(CustomModelViewSet):
""" """
list:测点 list:测点
测点 测点
""" """
queryset = Mpoint.objects.all() queryset = Mpoint.objects.all()
serializer_class = MpointSerializer serializer_class = MpointSerializer
select_related_fields = ['create_by', 'belong_dept', 'ep_monitored', 'ep_belong', 'mgroup'] select_related_fields = ["create_by", "belong_dept", "ep_monitored", "ep_belong", "mgroup"]
filterset_fields = ['belong_dept', 'ep_monitored', 'ep_belong', 'mgroup', 'is_auto', 'is_all', 'mgroup__name', 'val_type', 'enabled'] filterset_fields = ["belong_dept", "ep_monitored", "ep_belong", "mgroup", "is_auto", "is_all", "mgroup__name", "val_type", "enabled"]
search_fields = ['name', 'code'] search_fields = ["name", "code"]
ordering = ['-create_time', 'name', 'code'] ordering = ["-create_time", "name", "code"]
@transaction.atomic @transaction.atomic
def perform_create(self, serializer): def perform_create(self, serializer):
instance = serializer.save() instance = serializer.save()
cache_mpoints(instance.id) if instance.code:
get_mpoint_cache(instance.code, True)
@transaction.atomic @transaction.atomic
def perform_update(self, serializer): def perform_update(self, serializer):
instance = serializer.save() instance = serializer.save()
cache_mpoints(instance.id) if instance.code:
get_mpoint_cache(instance.code, True)
@action(methods=['post'], detail=False, perms_map={'post': 'mpoint.create'}, serializer_class=Serializer) @action(methods=["post"], detail=False, perms_map={"post": "mpoint.create"}, serializer_class=Serializer)
def king_sync(self, request, *args, **kwargs): def king_sync(self, request, *args, **kwargs):
"""同步亚控采集点 """同步亚控采集点
同步亚控采集点 同步亚控采集点
""" """
king_sync(getattr(settings, 'KING_PROJECTNAME', "")) king_sync(getattr(settings, "KING_PROJECTNAME", ""))
return Response() return Response()
@ -59,21 +63,23 @@ class MpointViewSet(CustomModelViewSet):
# select_related_fields = ['mpoint'] # select_related_fields = ['mpoint']
# filterset_fields = ['mpoint', 'mpoint__mgroup', 'mpoint__mgroup__belong_dept'] # filterset_fields = ['mpoint', 'mpoint__mgroup', 'mpoint__mgroup__belong_dept']
class MpLogxViewSet(ListModelMixin, CustomGenericViewSet):
    """
    list: 测点采集数据
    测点采集数据 (read-only listing of raw point samples)
    """

    # NOTE(review): "*" presumably means no extra permission is required for
    # GET under CustomGenericViewSet's perms_map convention — confirm.
    perms_map = {"get": "*"}
    queryset = MpLogx.objects.all()
    serializer_class = MpLogxSerializer
    # dict form enables per-field lookup operators in django-filter.
    filterset_fields = {
        "timex": ["exact", "gte", "lte", "year", "month", "day"],
        "mpoint": ["exact"],
    }
    ordering_fields = ["timex"]
    ordering = ["-timex"]  # newest samples first by default
class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, ListModelMixin, CustomGenericViewSet): class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, ListModelMixin, CustomGenericViewSet):
@ -82,10 +88,11 @@ class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, ListModelMi
测点统计记录 测点统计记录
""" """
perms_map = {'get': '*', 'post': 'mpointstat.create', 'delete': 'mpointstat.delete'}
perms_map = {"get": "*", "post": "mpointstat.create", "delete": "mpointstat.delete"}
queryset = MpointStat.objects.all() queryset = MpointStat.objects.all()
serializer_class = MpointStatSerializer serializer_class = MpointStatSerializer
select_related_fields = ['mpoint', 'mpoint__ep_monitored', 'mpoint__ep_belong', 'mgroup', 'mgroup__belong_dept'] select_related_fields = ["mpoint", "mpoint__ep_monitored", "mpoint__ep_belong", "mgroup", "mgroup__belong_dept"]
filterset_class = MpointStatFilter filterset_class = MpointStatFilter
def perform_create(self, serializer): def perform_create(self, serializer):
@ -97,7 +104,7 @@ class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, ListModelMi
instance.delete() instance.delete()
cal_mpointstat_manual.delay(mpoint.id, sflog.id, mgroup.id, None, None, None, None, year_s, month_s, day_s) cal_mpointstat_manual.delay(mpoint.id, sflog.id, mgroup.id, None, None, None, None, year_s, month_s, day_s)
@action(methods=['post'], detail=False, perms_map={'post': 'mpointstat.create'}, serializer_class=ReCalSerializer) @action(methods=["post"], detail=False, perms_map={"post": "mpointstat.create"}, serializer_class=ReCalSerializer)
def recal(self, request, *args, **kwargs): def recal(self, request, *args, **kwargs):
"""重新运行某段时间的enm计算 """重新运行某段时间的enm计算
@ -106,9 +113,8 @@ class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, ListModelMi
data = request.data data = request.data
sr = ReCalSerializer(data=data) sr = ReCalSerializer(data=data)
sr.is_valid(raise_exception=True) sr.is_valid(raise_exception=True)
vdata = sr.validated_data task = cal_mpointstats_duration.delay(data["start_time"], data["end_time"])
task = cal_mpointstats_duration.delay(data['start_time'], data['end_time']) return Response({"task_id": task.task_id})
return Response({'task_id': task.task_id})
class EnStatViewSet(ListModelMixin, CustomGenericViewSet):
    """
    list:能耗统计记录
    能耗统计记录 (read-only listing of energy-consumption statistics)
    """

    perms_map = {"get": "*"}
    queryset = EnStat.objects.all()
    serializer_class = EnStatSerializer
    # Pre-join foreign keys used by the serializer to avoid N+1 queries.
    select_related_fields = ["mgroup", "team", "mgroup__belong_dept"]
    filterset_class = EnStatFilter
    ordering = ["mgroup__sort", "year_s", "month_s", "day_s", "hour"]
class EnStat2ViewSet(ListModelMixin, CustomGenericViewSet): class EnStat2ViewSet(ListModelMixin, CustomGenericViewSet):
@ -131,7 +138,8 @@ class EnStat2ViewSet(ListModelMixin, CustomGenericViewSet):
全厂统计记录 全厂统计记录
""" """
perms_map = {'get': '*'}
perms_map = {"get": "*"}
queryset = EnStat2.objects.all() queryset = EnStat2.objects.all()
serializer_class = EnStat2Serializer serializer_class = EnStat2Serializer
filterset_class = EnStat2Filter filterset_class = EnStat2Filter