feat: 采集逻辑优化
This commit is contained in:
		
							parent
							
								
									9805b471d4
								
							
						
					
					
						commit
						c179d91ebd
					
				|  | @ -4,21 +4,31 @@ from apps.mtm.models import Mgroup | |||
| import datetime | ||||
| from django.core.cache import cache | ||||
| from django.utils.timezone import localtime | ||||
| from apps.utils.tasks import ctask_run | ||||
| 
 | ||||
| def set_equip_rs(equipId: str, last_timex: datetime, last_mrs, to_db=False): | ||||
|     """更新设备运行状态缓存 | ||||
|     """ | ||||
|     cache.set(f"equipment_{equipId}", {"running_state": last_mrs, "running_state_timex": last_timex}, timeout=None) # 更新缓存 | ||||
|     if to_db: | ||||
|         Equipment.objects.filter(id=equipId).update(running_state=last_mrs) | ||||
| 
 | ||||
| def get_equip_rs(equipId): | ||||
|     """返回设备运行状态缓存 | ||||
| 
 | ||||
|     {"running_state": 50, "running_state_timex": localtime} | ||||
| def get_eq_rs(equipId: str): | ||||
|     """获取设备运行状态缓存 | ||||
|     """ | ||||
|     return cache.get(f"equipment_{equipId}", {"running_state": Equipment.OFFLINE, "running_state_timex": localtime()}) | ||||
| 
 | ||||
| def set_eq_rs(equipId, last_timex: datetime, last_mrs: int): | ||||
|     """更新设备运行状态(包括缓存和数据库) | ||||
|     """ | ||||
|     if last_mrs == Equipment.OFFLINE: # 如果是离线直接更新 | ||||
|         cache.set(f"equipment_{equipId}", {"running_state": last_mrs, "running_state_timex": last_timex}, timeout=None) # 更新缓存 | ||||
|         Equipment.objects.filter(id=equipId).update(running_state=last_mrs)  # 更新数据库 | ||||
|     else: | ||||
|         eq_rs_cache = get_eq_rs() | ||||
| 
 | ||||
|         eq_rs_change = False | ||||
|         if eq_rs_cache["running_state"] != last_mrs:  # 如果状态变动了要调用方法否则只需更新缓存 | ||||
|             eq_rs_change = True | ||||
| 
 | ||||
|         cache.set(f"equipment_{equipId}", {"running_state": last_mrs, "running_state_timex": last_timex}, timeout=None) # 更新缓存 | ||||
|         if eq_rs_change: | ||||
|             ctask_run.delay("apps.em.services.shutdown_or_startup", equipId, last_timex, last_mrs) | ||||
| 
 | ||||
| 
 | ||||
| def shutdown_or_startup(equipId: str, last_timex: datetime, last_mrs): | ||||
|     """ | ||||
|     last_mrs 设备运行状态值 | ||||
|  | @ -30,7 +40,6 @@ def shutdown_or_startup(equipId: str, last_timex: datetime, last_mrs): | |||
|     equip = Equipment.objects.get(id=equipId) | ||||
|     equip.running_state = last_mrs | ||||
|     equip.save(update_fields=["running_state"]) | ||||
|     set_equip_rs(equipId, last_timex, last_mrs) | ||||
| 
 | ||||
|     mgroup: Mgroup = equip.mgroup | ||||
|     indicate = equip.indicate_mgroup_running | ||||
|  |  | |||
|  | @ -4,7 +4,7 @@ from apps.utils.tasks import CustomTask | |||
| from celery import shared_task | ||||
| from django.utils.timezone import localtime | ||||
| from apps.em.models import Equipment | ||||
| from apps.em.services import set_equip_rs, get_equip_rs | ||||
| from apps.em.services import set_eq_rs, get_eq_rs | ||||
| 
 | ||||
| 
 | ||||
| @shared_task(base=CustomTask) | ||||
|  | @ -13,12 +13,9 @@ def check_equipment_offline(seconds=30): | |||
| 
 | ||||
|     监测设备是否掉线 | ||||
|     """ | ||||
|     equips = Equipment.objects.filter(mp_ep_monitored__is_rep_ep_running_state=True, mp_ep_monitored__enabled=True) | ||||
|     equips = Equipment.objects.filter(mp_ep_monitored__is_rep_ep_running_state=True, mp_ep_monitored__enabled=True).exclude(running_state=Equipment.OFFLINE) | ||||
|     now = localtime() | ||||
|     for equip in equips: | ||||
|         rs = get_equip_rs(equip.id) | ||||
|         rs = get_eq_rs(equip.id) | ||||
|         if (now - rs['running_state_timex']).total_seconds() > seconds: | ||||
|             to_db = False | ||||
|             if equip.running_state != Equipment.OFFLINE: | ||||
|                 to_db = True | ||||
|             set_equip_rs(equip.id, now, Equipment.OFFLINE, to_db) | ||||
|                 set_eq_rs(equip.id, now, Equipment.OFFLINE) | ||||
|  | @ -31,7 +31,7 @@ class MpointStatFilter(filters.FilterSet): | |||
|             "year_s": ["exact"], | ||||
|             "create_by": ["exact"], | ||||
|             "type": ["exact"], | ||||
|             "mpoint__is_auto": ["exact"], | ||||
|             "mpoint__type": ["exact"], | ||||
|         } | ||||
| 
 | ||||
|     def filter_has_create_by(self, queryset, name, value): | ||||
|  |  | |||
|  | @ -16,8 +16,8 @@ class Migration(migrations.Migration): | |||
| CREATE TABLE public.enm_mplogx ( | ||||
|     "timex" timestamptz NOT NULL, | ||||
|     "mpoint_id" text NOT NULL, | ||||
|     "val_mrs" integer, | ||||
|     "val_int" integer, | ||||
|     "val_mrs" smallint, | ||||
|     "val_int" bigint, | ||||
|     "val_float" float, | ||||
|     "val_bool" boolean, | ||||
|     "val_str" text, | ||||
|  |  | |||
|  | @ -0,0 +1,60 @@ | |||
| # Generated by Django 3.2.12 on 2024-04-28 01:06 | ||||
| 
 | ||||
| from django.db import migrations, models | ||||
| 
 | ||||
| 
 | ||||
| class Migration(migrations.Migration): | ||||
| 
 | ||||
|     dependencies = [ | ||||
|         ('enm', '0030_mpoint_can_manual'), | ||||
|     ] | ||||
| 
 | ||||
|     operations = [ | ||||
|         migrations.RemoveField( | ||||
|             model_name='mpoint', | ||||
|             name='can_manual', | ||||
|         ), | ||||
|         migrations.RemoveField( | ||||
|             model_name='mpoint', | ||||
|             name='ep_base_val1', | ||||
|         ), | ||||
|         migrations.RemoveField( | ||||
|             model_name='mpoint', | ||||
|             name='is_auto', | ||||
|         ), | ||||
|         migrations.AddField( | ||||
|             model_name='mpoint', | ||||
|             name='ep_rs_expr', | ||||
|             field=models.TextField(blank=True, null=True, verbose_name='状态量表达式'), | ||||
|         ), | ||||
|         migrations.AddField( | ||||
|             model_name='mpoint', | ||||
|             name='ep_rs_val', | ||||
|             field=models.FloatField(blank=True, null=True, verbose_name='状态量基准值'), | ||||
|         ), | ||||
|         migrations.AddField( | ||||
|             model_name='mpoint', | ||||
|             name='is_rep_mgroup', | ||||
|             field=models.BooleanField(default=False, verbose_name='是否代表所分配集合数据'), | ||||
|         ), | ||||
|         migrations.AddField( | ||||
|             model_name='mpoint', | ||||
|             name='need_display', | ||||
|             field=models.BooleanField(default=False, verbose_name='是否需要展示'), | ||||
|         ), | ||||
|         migrations.AddField( | ||||
|             model_name='mpoint', | ||||
|             name='nickname', | ||||
|             field=models.CharField(blank=True, max_length=50, null=True, verbose_name='测点别名'), | ||||
|         ), | ||||
|         migrations.AddField( | ||||
|             model_name='mpoint', | ||||
|             name='report_sortstr', | ||||
|             field=models.CharField(blank=True, default='', max_length=50, verbose_name='在报告中的排序'), | ||||
|         ), | ||||
|         migrations.AddField( | ||||
|             model_name='mpoint', | ||||
|             name='type', | ||||
|             field=models.PositiveSmallIntegerField(default=10, help_text='10:自动采集, 20:计算测点, 30:手动录入', verbose_name='类型'), | ||||
|         ), | ||||
|     ] | ||||
|  | @ -6,18 +6,25 @@ from apps.mtm.models import Material, Mgroup, Team | |||
| 
 | ||||
| class Mpoint(CommonBModel): | ||||
|     """测点""" | ||||
|     MT_AUTO = 10 | ||||
|     MT_COMPUTE = 20 | ||||
|     MT_MANUAL = 30 | ||||
| 
 | ||||
|     MG_OK = 0  # 采集状态 | ||||
|     MG_SAVEWRONG = -1 | ||||
|     MG_OFFLINE = -2 | ||||
| 
 | ||||
|     type = models.PositiveSmallIntegerField("类型", default=MT_AUTO, help_text="10:自动采集, 20:计算测点, 30:手动录入") | ||||
|     name = models.CharField("测点名称", max_length=50) | ||||
|     nickname = models.CharField("测点别名", max_length=50, null=True, blank=True) | ||||
|     code = models.CharField("测点编号", max_length=50, unique=True) | ||||
|     unit = models.CharField("单位", max_length=50, null=True, blank=True) | ||||
|     cate = models.CharField("分类", max_length=50, null=True, blank=True) | ||||
|     material = models.ForeignKey(Material, verbose_name="计量某种物料", on_delete=models.CASCADE, null=True, blank=True) | ||||
|     ep_monitored = models.ForeignKey("em.equipment", verbose_name="所监测设备", related_name="mp_ep_monitored", on_delete=models.SET_NULL, null=True, blank=True) | ||||
|     ep_belong = models.ForeignKey("em.equipment", verbose_name="所属设备", related_name="mp_ep_belong", on_delete=models.SET_NULL, null=True, blank=True) | ||||
|     mgroup = models.ForeignKey("mtm.mgroup", verbose_name="所在集合", on_delete=models.SET_NULL, null=True, blank=True) | ||||
|     mgroups_allocate = models.JSONField("各工段分配", default=list, blank=True, help_text='[{"mgroup":"x", "ratio": 1}]') | ||||
|     is_auto = models.BooleanField("是否自动采集", default=True) | ||||
|     can_manual = models.BooleanField("是否允许手动录入", default=False) | ||||
|     is_rep_mgroup = models.BooleanField("是否代表所分配集合数据", default=False) | ||||
|     formula = models.TextField("计算公式", default="") | ||||
|     func_on_change = models.CharField("数据变动时执行方法", max_length=100, default="", blank=True)  # 废弃字段暂时不用 | ||||
|     interval = models.PositiveSmallIntegerField("采集间隔(秒)", default=10) | ||||
|  | @ -27,7 +34,12 @@ class Mpoint(CommonBModel): | |||
|     # {"from": "king", "n": "某名称","d": "某描述或备注","g": "某组", "t": "某类型",  "id": 5001, "o": "其他信息"} | ||||
|     enp_field = models.CharField("关联enp采集字段", max_length=50, null=True, blank=True) | ||||
|     is_rep_ep_running_state = models.BooleanField("是否表示所监测设备运行状态", default=False) | ||||
|     ep_base_val1 = models.FloatField("状态量基准值1", null=True, blank=True) | ||||
|     ep_monitored = models.ForeignKey("em.equipment", verbose_name="所监测设备", related_name="mp_ep_monitored", on_delete=models.SET_NULL, null=True, blank=True) | ||||
|     ep_rs_val = models.FloatField("状态量基准值", null=True, blank=True) | ||||
|     ep_rs_expr = models.TextField("状态量表达式", null=True, blank=True) | ||||
| 
 | ||||
|     need_display = models.BooleanField("是否需要展示", default=False) | ||||
|     report_sortstr = models.CharField('在报告中的排序', max_length=50, default='', blank=True) | ||||
| 
 | ||||
|     @classmethod | ||||
|     def cache_key(cls, code: str): | ||||
|  | @ -45,7 +57,7 @@ class MpLogx(models.Model): | |||
|     val_float = models.FloatField("数值", null=True, blank=True) | ||||
|     val_int = models.IntegerField("数值", null=True, blank=True) | ||||
|     val_bool = models.BooleanField("数值", null=True, blank=True) | ||||
|     val_str = models.CharField("数值", max_length=100, null=True, blank=True) | ||||
|     val_str = models.TextField("数值", null=True, blank=True) | ||||
| 
 | ||||
|     class Meta: | ||||
|         db_table = "enm_mplogx" | ||||
|  |  | |||
|  | @ -17,6 +17,7 @@ class MpointSerializer(CustomModelSerializer): | |||
|     material_name = serializers.CharField(source="material.name", read_only=True) | ||||
|     formula = serializers.CharField(allow_blank=True, required=False) | ||||
|     last_data = serializers.SerializerMethodField() | ||||
|     gather_state = serializers.SerializerMethodField() | ||||
| 
 | ||||
|     class Meta: | ||||
|         model = Mpoint | ||||
|  | @ -24,8 +25,10 @@ class MpointSerializer(CustomModelSerializer): | |||
|         read_only_fields = EXCLUDE_FIELDS + ["belong_dept", "cate"] | ||||
| 
 | ||||
|     def get_last_data(self, obj): | ||||
|         last_data = cache.get(Mpoint.cache_key(obj.code), {}).get('last_data', {}) | ||||
|         return last_data | ||||
|         return cache.get(Mpoint.cache_key(obj.code), {}).get('last_data', {}) | ||||
|      | ||||
|     def get_gather_state(self, obj): | ||||
|         return cache.get(Mpoint.cache_key(obj.code), {}).get('gather_state', -2) | ||||
| 
 | ||||
|     def validate(self, attrs): | ||||
|         if "material" in attrs and attrs["material"]: | ||||
|  | @ -87,7 +90,7 @@ class MpointStatSerializer(CustomModelSerializer): | |||
|         mpoint = attrs["mpoint"] | ||||
|         if 'mgroup' not in attrs: | ||||
|             raise ParseError("请选择工段") | ||||
|         if mpoint.material and mpoint.is_auto is False and "sflog" in attrs and attrs["sflog"]: | ||||
|         if mpoint.material and mpoint.type is Mpoint.MT_MANUAL and "sflog" in attrs and attrs["sflog"]: | ||||
|             attrs["type"] = "sflog" | ||||
|             sflog = attrs["sflog"] | ||||
|             attrs["year_s"], attrs["month_s"], attrs["day_s"] = sflog.get_ymd | ||||
|  |  | |||
|  | @ -15,7 +15,7 @@ from apps.utils.tasks import ctask_run | |||
| from .serializers import MpointSerializer | ||||
| from apps.enp.models import EnvData | ||||
| from apps.em.models import Equipment | ||||
| from apps.em.services import set_equip_rs, get_equip_rs | ||||
| from apps.em.services import set_eq_rs | ||||
| 
 | ||||
| myLogger = logging.getLogger("log") | ||||
| 
 | ||||
|  | @ -33,6 +33,35 @@ def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: | |||
|     rval = eval(exp_str) | ||||
|     return rval | ||||
| 
 | ||||
| def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_str: str): | ||||
|     """ | ||||
|     将测点值转换所监测设备的运行状态值 | ||||
|     base_expr: 三元表达式 | ||||
|     """ | ||||
|     if expr_str:  # 优先使用表达式 | ||||
|         pattern = r"\${(.*?)}" | ||||
|         matches = re.findall(pattern, expr_str) | ||||
|         for match in matches: | ||||
|             if match == 'self': | ||||
|                 expr_str = expr_str.replace(f"${{{match}}}", str(current_val)) | ||||
|             else: | ||||
|                 mpoint_data = get_mpoint_cache(match) | ||||
|                 if mpoint_data: | ||||
|                     expr_str = expr_str.replace(f"${{{match}}}", str(mpoint_data['last_data']['last_val'])) | ||||
|         rval = eval(expr_str) | ||||
|         return rval | ||||
|     if isinstance(current_val, bool): | ||||
|         if current_val: | ||||
|             return Equipment.RUNING | ||||
|         return Equipment.STOP | ||||
|     rs = Equipment.RUNING | ||||
|     if base_val: | ||||
|         if current_val < base_val: | ||||
|             rs = Equipment.STOP | ||||
|     else: | ||||
|         if current_val == 0: | ||||
|             rs = Equipment.STOP | ||||
|     return rs | ||||
| 
 | ||||
| # def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21): | ||||
| #     """ | ||||
|  | @ -121,39 +150,16 @@ def update_mpoint_cache(cache_key: str, current_cache_val: dict, last_timex: dat | |||
|     ep_belong_id = current_cache_val.get("ep_belong", None) | ||||
|     ep_monitored_id = current_cache_val.get("ep_monitored", False) | ||||
|     last_data = current_cache_val["last_data"] | ||||
|     current_cache_val['gather_state'] = 0 | ||||
|     last_data["last_val"] = last_val | ||||
|     last_data["last_mrs"] = last_mrs | ||||
|     last_data["last_timex"] = last_timex | ||||
|     cache.set(cache_key, current_cache_val, timeout=None) | ||||
|     # 更新设备状态缓存值 | ||||
|     # 更新设备状态值 | ||||
|     if ep_belong_id: | ||||
|         if get_equip_rs(ep_belong_id)["running_state"] != Equipment.RUNING:  # 如果状态变动了要调用方法否则只需更新缓存 | ||||
|             ctask_run.delay("apps.em.services.shutdown_or_startup", ep_belong_id, last_timex, Equipment.RUNING) | ||||
|         else: | ||||
|             set_equip_rs(ep_belong_id, last_timex, Equipment.RUNING) | ||||
|         set_eq_rs(ep_belong_id, last_timex, Equipment.RUNING) | ||||
|     if ep_monitored_id: | ||||
|         if get_equip_rs(ep_monitored_id)["running_state"] != last_mrs: | ||||
|             ctask_run.delay("apps.em.services.shutdown_or_startup", ep_monitored_id, last_timex, last_mrs) | ||||
|         else: | ||||
|             set_equip_rs(ep_monitored_id, last_timex, last_mrs) | ||||
| 
 | ||||
| 
 | ||||
| def transfer_mpoint_val_to_ep_running_state(current_val, base_val1: float): | ||||
|     """ | ||||
|     将测点值转换所监测设备的运行状态值 | ||||
|     """ | ||||
|     if isinstance(current_val, bool): | ||||
|         if current_val: | ||||
|             return Equipment.RUNING | ||||
|         return Equipment.STOP | ||||
|     rs = Equipment.RUNING | ||||
|     if base_val1: | ||||
|         if current_val < base_val1: | ||||
|             rs = Equipment.STOP | ||||
|     else: | ||||
|         if current_val == 0: | ||||
|             rs = Equipment.STOP | ||||
|     return rs | ||||
|         set_eq_rs(ep_belong_id, last_timex, last_mrs) | ||||
| 
 | ||||
| 
 | ||||
| def king_sync(projectName: str, json_path: str = ""): | ||||
|  | @ -182,8 +188,11 @@ def king_sync(projectName: str, json_path: str = ""): | |||
|             name = item["d"] | ||||
|             if group: | ||||
|                 name = f"{group}.{name}" | ||||
|             Mpoint.objects.get_or_create(code=code, defaults={"name": name, "code": code, "enabled": False, "is_auto": True, "val_type": t_dict[item["t"]], "third_info": item}) | ||||
| 
 | ||||
|             ins, created = Mpoint.objects.get_or_create(code=code, defaults={"name": name, "code": code, "enabled": False, "type": Mpoint.MT_AUTO, "val_type": t_dict[item["t"]], "third_info": item}) | ||||
|             if not created and ins.val_type != t_dict[item["t"]]: # 如果数据类型变了要同步更新 | ||||
|                 ins.val_type = t_dict[item["t"]] | ||||
|                 ins.third_info = item | ||||
|                 ins.save(update_fields=["val_type", "third_info"]) | ||||
| 
 | ||||
| test_data = { | ||||
|     "PNs": {"1": "V", "2": "T", "3": "Q"}, | ||||
|  | @ -258,7 +267,7 @@ def insert_mplogx_from_king_mqtt(data: dict, is_offset=True): | |||
|     #     concurrent.futures.wait(futures) | ||||
|     # for future in futures: | ||||
|     #     print(future.result(), end=', ') | ||||
| 
 | ||||
|      | ||||
| def insert_mplogx_item(code, val, timex, enp_mpoints_dict): | ||||
|     """ | ||||
|     存入超表 | ||||
|  | @ -271,7 +280,8 @@ def insert_mplogx_item(code, val, timex, enp_mpoints_dict): | |||
|     mpoint_interval = mpoint_data["interval"] | ||||
|     mpoint_last_timex = mpoint_data.get('last_data', {}).get('last_timex', None) | ||||
|     mpoint_is_rep_ep_running_state = mpoint_data.get("is_rep_ep_running_state", False) | ||||
|     mpoint_ep_base_val1 = mpoint_data.get("ep_base_val1", None) | ||||
|     mpoint_ep_rs_val = mpoint_data.get("ep_rs_val", None) | ||||
|     mpoint_ep_rs_expr = mpoint_data.get("ep_rs_expr", None) | ||||
|     # 控制采集间隔 | ||||
|     can_save = False | ||||
|     if mpoint_last_timex: | ||||
|  | @ -286,10 +296,10 @@ def insert_mplogx_item(code, val, timex, enp_mpoints_dict): | |||
|         } | ||||
|         save_dict[f"val_{val_type}"] = val | ||||
|         val_mrs = None | ||||
|         if mpoint_is_rep_ep_running_state: | ||||
|             val_mrs = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_base_val1) | ||||
|             save_dict["val_mrs"] = val_mrs | ||||
|         try: | ||||
|             if mpoint_is_rep_ep_running_state: | ||||
|                 val_mrs = transfer_mpoint_val_to_ep_running_state(val, mpoint_ep_rs_val, mpoint_ep_rs_expr) | ||||
|                 save_dict["val_mrs"] = val_mrs | ||||
|             MpLogx.objects.create(**save_dict) | ||||
|             update_mpoint_cache(cache_key, mpoint_data, timex, val, val_mrs) | ||||
| 
 | ||||
|  | @ -308,6 +318,9 @@ def insert_mplogx_item(code, val, timex, enp_mpoints_dict): | |||
|             pass | ||||
|         except Exception: | ||||
|             myLogger.error(f"mpoint_cache_key: {cache_key} 存库失败", exc_info=True) | ||||
|             mpoint_data['gather_state'] = -1 | ||||
|             cache.set(cache_key, mpoint_data, timeout=None) | ||||
| 
 | ||||
| 
 | ||||
| def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is_offset=True): | ||||
|     """ | ||||
|  | @ -342,14 +355,13 @@ def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is | |||
|             EnvData.objects(**item) | ||||
| 
 | ||||
| def insert_mplogx_from_king_rest_chunk(objs: list): | ||||
|     if objs: | ||||
|         timex = timezone.make_aware(datetime.strptime(objs[0]["T"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0)  | ||||
|     enp_mpoints_dict = {}  | ||||
|     for obj in objs: | ||||
|         n = obj["N"] | ||||
|         code = f"K_{n}" | ||||
|         timex = timezone.make_aware(datetime.strptime(obj["T"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0)  | ||||
|         insert_mplogx_item(code, obj["V"], timex, enp_mpoints_dict) | ||||
|      | ||||
|     if enp_mpoints_dict: | ||||
|         for _, item in enp_mpoints_dict: | ||||
|             EnvData.objects(**item) | ||||
|         for _, item in enp_mpoints_dict.items(): | ||||
|             EnvData.objects(**item) | ||||
|  | @ -205,17 +205,17 @@ def cal_mpointstats(is_now=1, year=None, month=None, day=None, hour=None): | |||
|         else: | ||||
|             year, month, day, hour = pre.year, pre.month, pre.day, pre.hour | ||||
| 
 | ||||
|     # 先统计不带公式的测点 | ||||
|     mpoints_without_formula = Mpoint.objects.filter(is_auto=True, enabled=True, formula="") | ||||
|     # 先统计自动采集的测点 | ||||
|     mpoints_auto = Mpoint.objects.filter(type=Mpoint.MT_AUTO, enabled=True) | ||||
|     # mpoints_without_formula_group = [] | ||||
|     for item in mpoints_without_formula: | ||||
|     for item in mpoints_auto: | ||||
|         # mpoints_without_formula_group.append(cal_mpointstat_hour.s(item.id, year, month, day, hour)) | ||||
|         cal_mpointstat_hour(item.id, year, month, day, hour) | ||||
| 
 | ||||
|     # 再统计其他测点 | ||||
|     mpoints_other = Mpoint.objects.filter(is_auto=True, enabled=True).exclude(formula="") | ||||
|     # 再统计计算测点 | ||||
|     mpoints_compute = Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True).exclude(formula="") | ||||
|     # mpoints_other_group = [] | ||||
|     for item in mpoints_other: | ||||
|     for item in mpoints_compute: | ||||
|         # mpoints_other_group.append(cal_mpointstat_hour.s(item.id, year, month, day, hour)) | ||||
|         cal_mpointstat_hour(item.id, year, month, day, hour) | ||||
| 
 | ||||
|  | @ -353,6 +353,8 @@ def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, | |||
|                 mps = MpointStat.objects.filter(type="month_s", mgroup=mgroup, year_s=year_s, month_s=month_s, mpoint__material=material) | ||||
|             elif type == "year_s": | ||||
|                 mps = MpointStat.objects.filter(type="year_s", mgroup=mgroup, year_s=year_s, mpoint__material=material) | ||||
|             if mps.filter(mpoint__is_rep_mgroup=True).exists(): | ||||
|                 mps = mps.filter(mpoint__is_rep_mgroup=True) | ||||
|             amount_consume = mps.aggregate(sum=Sum("val"))["sum"] | ||||
|             if amount_consume is None: | ||||
|                 amount_consume = 0 | ||||
|  | @ -369,7 +371,7 @@ def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, | |||
|                 cost = amount_consume * price_unit | ||||
|                 try: | ||||
|                     cost_unit = cost / enstat.total_production | ||||
|                 except Exception as e: | ||||
|                 except Exception: | ||||
|                     cost_unit = 0 | ||||
|                 imaterial_cost_unit = imaterial_cost_unit + cost_unit | ||||
|                 if material.code == "elec": | ||||
|  | @ -377,7 +379,7 @@ def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, | |||
|                     enstat.elec_coal_consume = enstat.elec_consume * 0.1229 / 1000 | ||||
|                     try: | ||||
|                         enstat.elec_consume_unit = enstat.elec_consume / enstat.total_production | ||||
|                     except Exception as e: | ||||
|                     except Exception: | ||||
|                         pass | ||||
|                 elif material.code == "water": | ||||
|                     enstat.water = amount_consume | ||||
|  | @ -417,6 +419,7 @@ def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, | |||
|         enstat.production_cost_unit = imaterial_cost_unit + other_cost_unit | ||||
|         enstat.save() | ||||
|     if enstat.total_production: | ||||
|         # 更新所监测设备测点的total_production | ||||
|         MpointStat.objects.filter(mgroup=enstat.mgroup, mpoint__material__code="elec").exclude(mpoint__ep_monitored=None).update( | ||||
|             total_production=enstat.total_production, elec_consume_unit=F("val") / enstat.total_production | ||||
|         ) | ||||
|  | @ -426,7 +429,7 @@ def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, | |||
|             if enstat.mgroup.name != "回转窑": | ||||
|                 try: | ||||
|                     enstat.en_consume_unit = enstat.elec_coal_consume / enstat.total_production | ||||
|                 except Exception as e: | ||||
|                 except Exception: | ||||
|                     pass | ||||
|             enstat.save() | ||||
|         # 计算一些其他数据 | ||||
|  | @ -453,7 +456,7 @@ def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, | |||
|                         # 料耗系数 | ||||
|                         enstat.celec_consume_unit = enstat.elec_consume_unit + get_sysconfig('enm.enm_lhxs') * pre_enstat.elec_consume_unit | ||||
|                         enstat.save() | ||||
|                     except Exception as e: | ||||
|                     except Exception: | ||||
|                         pass | ||||
| 
 | ||||
|             # 算总煤耗 | ||||
|  | @ -596,6 +599,11 @@ def cal_enstat2(type: str, year_s: int, month_s: int, day_s: int, cascade=True): | |||
|     enstat2.industry_add_val = enstat2.industry_total_val - enstat2.cement_val * enstat2.cement_cost_unit / 10000 | ||||
| 
 | ||||
|     # 全厂电量 | ||||
|     # 全厂的耗电量和水量都得单独处理 | ||||
|     use_mpoint_elec_val = False | ||||
|     mp_elecs = Mpoint.objects.filter(material__code="elec", code__endswith='__all', mgroups_allocate=[]) | ||||
|     if mp_elecs.exists():  #  | ||||
|         use_mpoint_elec_val = True | ||||
|     if type == "month_s": | ||||
|         enstat_qs = EnStat.objects.filter(type="month_s", year_s=year_s, month_s=month_s) | ||||
|     elif type == "day_s": | ||||
|  | @ -603,8 +611,18 @@ def cal_enstat2(type: str, year_s: int, month_s: int, day_s: int, cascade=True): | |||
|     res_elec_pcoal = enstat_qs.aggregate( | ||||
|         sum1=Sum("elec_consume"), sum2=Sum("elec_coal_consume"), sum3=Sum("pcoal_consume"), sum4=Sum("pcoal_coal_consume"), sum5=Sum("water_consume"), sum6=Sum("cair_consume") | ||||
|     ) | ||||
|     enstat2.elec_consume = res_elec_pcoal["sum1"] if res_elec_pcoal["sum1"] else 0 | ||||
|     if use_mpoint_elec_val: | ||||
|         if type == 'day_s': | ||||
|             typex = 'day' | ||||
|             enstat2.elec_consume = MpointStat.objects.filter(type=typex, mpoint__in=mp_elecs, year=year_s, month=month_s, day=day_s).aggregate(sum=Sum("val"))["sum"] | ||||
|         elif type == 'month_s': | ||||
|             typex = 'month' | ||||
|             enstat2.elec_consume = MpointStat.objects.filter(type=typex, mpoint__in=mp_elecs, year=year_s, month=month_s).aggregate(sum=Sum("val"))["sum"] | ||||
|     else: | ||||
|         enstat2.elec_consume = res_elec_pcoal["sum1"] if res_elec_pcoal["sum1"] else 0 | ||||
|     enstat2.elec_coal_consume = enstat2.elec_consume * 0.1229 / 1000 | ||||
| 
 | ||||
|     # 其他的统计工段合就行 | ||||
|     enstat2.pcoal_consume = res_elec_pcoal["sum3"] if res_elec_pcoal["sum3"] else 0 | ||||
|     enstat2.pcoal_coal_consume = res_elec_pcoal["sum4"] if res_elec_pcoal["sum4"] else 0 | ||||
|     enstat2.water_consume = res_elec_pcoal["sum5"] if res_elec_pcoal["sum5"] else 0 | ||||
|  | @ -703,7 +721,7 @@ def enm_alarm(year_s: int, month_s: int, day_s: int): | |||
| 
 | ||||
| @shared_task(base=CustomTask) | ||||
| def king_insert_mplogx(): | ||||
|     mpoint_codes = Mpoint.objects.filter(enabled=True, is_auto=True, code__startswith='K_').values_list('code', flat=True) | ||||
|     mpoint_codes = Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO, code__startswith='K_').values_list('code', flat=True) | ||||
|     ml = [] | ||||
|     for m in mpoint_codes: | ||||
|         ml.append({"N": m.replace('K_', '')}) | ||||
|  |  | |||
|  | @ -25,9 +25,9 @@ class MpointViewSet(CustomModelViewSet): | |||
|     queryset = Mpoint.objects.all() | ||||
|     serializer_class = MpointSerializer | ||||
|     select_related_fields = ["create_by", "belong_dept", "ep_monitored", "ep_belong", "mgroup"] | ||||
|     filterset_fields = ["belong_dept", "ep_monitored", "ep_belong", "mgroup", "is_auto", "mgroup__name", "val_type", "enabled", "can_manual"] | ||||
|     filterset_fields = ["belong_dept", "ep_monitored", "ep_belong", "mgroup", "type", "mgroup__name", "val_type", "enabled"] | ||||
|     search_fields = ["name", "code"] | ||||
|     ordering = ["-create_time", "name", "code"] | ||||
|     ordering = ["sort", "create_time", "name", "code"] | ||||
| 
 | ||||
|     @transaction.atomic | ||||
|     def perform_create(self, serializer): | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue