import logging
import re
from datetime import datetime, timedelta
from typing import Any, Dict

from django.core.cache import cache
from django.db import IntegrityError
from django.db.models import Q, Sum
from django.utils import timezone

from apps.em.models import Equipment
from apps.em.services import set_eq_rs
from apps.enm.models import EnStat, Mpoint, MpointStat, MpLogx
from apps.enp.models import EnvData
from apps.utils.decorators import auto_log
from server.settings import get_sysconfig

from .serializers import MpointSerializer

myLogger = logging.getLogger("log")


def db_insert_mplogx_batch(rows):
    """Insert a batch of gathered tag rows into MpLogx, one item at a time."""
    for row in rows:
        _, tag_val, tag_code, tag_update = row
        if tag_update.tzinfo is None:
            tag_update = timezone.make_aware(tag_update)
        insert_mplogx_item(tag_code, tag_val, tag_update, {})


def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int):
    """
    Replace every {code} placeholder in exp_str with the hourly MpointStat value
    for the given year/month/day/hour, then evaluate the resulting expression.
    """
    pattern = r"\{(.*?)}"
    matches = re.findall(pattern, exp_str)
    exp_str2 = exp_str
    for match in matches:
        mpst = MpointStat.objects.filter(mpoint__code=match, type="hour", year=year, month=month, day=day, hour=hour).first()
        if mpst is None:
            mpoint = Mpoint.objects.get(code=match)
            mpst, _ = MpointStat.objects.get_or_create(mpoint=mpoint, type="hour", year=year, month=month, day=day, hour=hour, defaults={"val": 0})
            myLogger.error(f"No hourly timeline data for measuring point {match} at {year}-{month}-{day} {hour}:00, defaulting to 0")
        if mpst:
            # Prefer the corrected value, if any, so computed points stay calibrated.
            exp_str2 = exp_str2.replace(f"{{{match}}}", str(mpst.val if mpst.val_correct is None else mpst.val_correct))
    rval = 0
    try:
        rval = eval(exp_str2)
    except Exception as e:
        myLogger.error(f"Formula evaluation error: {e}, {exp_str}, --{exp_str2}")
        rval = 0
    return rval


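# Illustrative example (hypothetical point codes): for a formula string such as
#     "{MP_KILN_FEED} * 0.85 + {MP_COAL_FEED}"
# translate_eval_formula("{MP_KILN_FEED} * 0.85 + {MP_COAL_FEED}", 2024, 4, 8, 13)
# substitutes the 13:00 hourly values (val_correct when present, otherwise val) of
# both points and returns the evaluated number, or 0 if evaluation fails.
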
def get_can_save_from_save_expr(expr_str: str, self_val) -> bool:
    """Evaluate a point's save expression to decide whether the new value should be stored."""
    pattern = r"\{(.*?)}"
    matches = re.findall(pattern, expr_str)
    for match in matches:
        if match == "self":
            expr_str = expr_str.replace("{self}", str(self_val))
        else:
            mpoint_data = MpointCache(match).data
            current_val = mpoint_data.get("last_data", {}).get("last_val", None)
            if current_val is None:
                return True
            else:
                expr_str = expr_str.replace(f"{{{match}}}", str(current_val))
    try:
        rval = eval(expr_str)
    except Exception as e:
        myLogger.error(f"Save expression evaluation error: {e}, {expr_str}")
        rval = False
    return rval


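# Illustrative example (hypothetical expression): a point configured with the save
# expression "{self} != {K_IP2_MM_IW3}" only stores a reading when it differs from
# the cached last value of K_IP2_MM_IW3; if that referenced point has no cached
# value yet, get_can_save_from_save_expr returns True and the reading is stored.
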
def transfer_mpoint_val_to_ep_running_state(current_val, base_val: float, expr_str: str):
    """
    Convert a measuring-point value into the running state of the monitored equipment.

    expr_str: optional (ternary-style) expression; when present it takes priority.
    base_val: threshold below which the equipment is considered stopped.
    """
    if expr_str:  # the expression takes priority
        pattern = r"\{(.*?)}"
        matches = re.findall(pattern, expr_str)
        for match in matches:
            if match == "self":
                expr_str = expr_str.replace("{self}", str(current_val))
            else:
                match_val = 50
                mpoint_data = MpointCache(match).data
                if mpoint_data and mpoint_data.get("gather_state", -2) == 0:  # point is gathering normally
                    match_val = mpoint_data["last_data"]["last_mrs"]
                expr_str = expr_str.replace(f"{{{match}}}", str(match_val))
        rval = eval(expr_str)
        return rval
    if isinstance(current_val, bool):
        if current_val:
            return Equipment.RUNING
        return Equipment.STOP
    rs = Equipment.RUNING
    if base_val:
        if current_val < base_val:
            rs = Equipment.STOP
    else:
        if current_val == 0:
            rs = Equipment.STOP
    return rs


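# Illustrative example (hypothetical expression): with expr_str = "{self} > 5",
# a current_val of 8.0 makes eval("8.0 > 5") return True, and that result is used
# directly as the running-state value; without an expression, a numeric current_val
# is compared against base_val (or against 0 when no base_val is configured) and
# Equipment.RUNING / Equipment.STOP is returned.
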
# def get_day_s(year: int, month: int, day: int, hour: int, hour_split: int = 21):
#     """
#     Work out the shift day for the given hour.
#     """
#     if hour <= hour_split:
#         return year, month, day
#     else:
#         now = datetime.datetime(year, month, day, hour)
#         now2 = now + datetime.timedelta(days=1)
#         return now2.year, now2.month, now2.day


# cal_rule = {
#     "电石渣": {
#         "total_production": 0.4,
#         "elec_consume_unit": 0.4,
#         "production_cost_unit": 0.2
#     },
#     "原料磨": {
#         "production_hour": 0.3,
#         "elec_consume_unit": 0.3,
#         "production_cost_unit": 0.1,
#         "辅料_细度": 0.05,
#         "辅料_水分": 0.04,
#         "干混生料_CaO": 0.04
#     }
# }
# def cal_team_score(data):
#     """
#     Compute the monthly team performance score.
#     """
#     qua_rate = {}
#     month_s = data['month_s']
#     for item in data['qua_data']:
#         qua_rate[f'{item["material_name"]}_{item["testitem_name"]}'] = item["rate_pass"]

#     goal_dict = get_mgroup_goals(data['mgroup'], data['year_s'], False)
#     goal_data = {}
#     try:
#         rule = cal_rule[data['mgroup_name']]
#         score = 0
#         for key in rule:
#             new_key = f'{key}_{month_s}'
#             goal_data[new_key] = goal_dict[new_key]
#             if '-' in key:
#                 score = score + qua_rate.get(key, 0)/goal_data[new_key]*rule[key]
#             else:
#                 score = score + data.get(key)/goal_data[new_key]*rule[key]
#                 print(score)
#         # enstat.goal_data = goal_data
#         # enstat.score = score
#         # enstat.save(update_fields=['goal_data', 'score'])
#     except:
#         print(traceback.format_exc())
#     return goal_data, score


class MpointCache:
    """Cache wrapper around a measuring point (Mpoint) and its latest reading."""

    def __init__(self, code: str):
        self.code = code
        self.cache_key = Mpoint.cache_key(code)
        self.data = self.get()

    def get(self, force_update=False, update_mplogx=True):
        key = self.cache_key
        code = self.code
        mpoint_data = cache.get(key, None)
        if mpoint_data is None or force_update:
            try:
                mpoint = Mpoint.objects.get(code=code)
            except Exception as e:
                myLogger.error(f"Failed to load measuring point for cache: {e}, {code}")
                cache.set(key, {}, timeout=None)
                return {}
            mpoint_data = MpointSerializer(instance=mpoint).data
            mpoint_data["last_data"] = {"last_val": None, "last_timex": None, "last_mrs": None}  # initialise
            if update_mplogx:
                now = timezone.now()
                last_mplogx = MpLogx.objects.filter(mpoint=mpoint, timex__gte=now - timedelta(minutes=10)).order_by("-timex").first()
                if last_mplogx:  # core data: latest reading within the last 10 minutes
                    # keep the last_mrs key so later lookups do not raise KeyError
                    mpoint_data["last_data"] = {"last_val": getattr(last_mplogx, "val_" + mpoint_data["val_type"]), "last_timex": last_mplogx.timex, "last_mrs": last_mplogx.val_mrs}
            cache.set(key, mpoint_data, timeout=None)
        return mpoint_data

    def set_fail(self, reason: int = -1, timex: datetime = None):
        """
        Mark the point as failed: -1 database write failed, -2 offline.
        """
        if timex is None:
            timex = datetime.now()
        self.data["gather_state"] = reason
        cache.set(self.cache_key, self.data, timeout=None)
        if reason == -2:
            is_rep_ep_running_state = self.data.get("is_rep_ep_running_state", False)
            ep_monitored_id = self.data.get("ep_monitored")
            ep_belong_id = self.data.get("ep_belong")
            if ep_monitored_id and is_rep_ep_running_state and ep_belong_id != ep_monitored_id:
                set_eq_rs(ep_monitored_id, timex, Equipment.OFFLINE)
            if ep_belong_id:
                set_eq_rs(ep_belong_id, timex, Equipment.OFFLINE)

    def set(self, last_timex: datetime, last_val):
        # keep last_timex at second precision
        last_timex = last_timex.replace(microsecond=0)
        current_cache_val = self.data
        cache_key = self.cache_key
        last_data = current_cache_val["last_data"]
        if isinstance(last_val, (int, float)):
            last_val = last_val * current_cache_val.get('coefficient', 1.0)
        if (last_val == 0 and
            current_cache_val.get("type", 0) == 10 and
            current_cache_val.get('is_rep_ep_running_state', None) is False and
            get_sysconfig('enm.use_cache_when_w_el_0', False) is True and
            current_cache_val.get("material_name", "") in ["动力电", "工业水"]):  # meter reads 0 (e.g. the power meter lost supply): fall back to the cached value
            last_val = last_data["last_val"] if last_data["last_val"] is not None else 0
        else:
            last_data["last_val"] = last_val
        last_data["last_timex"] = last_timex
        last_mrs = None  # equipment running-state signal
        mpoint_is_rep_ep_running_state = current_cache_val["is_rep_ep_running_state"]
        if mpoint_is_rep_ep_running_state:
            mpoint_ep_rs_val = current_cache_val.get("ep_rs_val", None)
            mpoint_ep_rs_expr = current_cache_val.get("ep_rs_expr", None)
            last_mrs = transfer_mpoint_val_to_ep_running_state(last_val, mpoint_ep_rs_val, mpoint_ep_rs_expr)
            last_data["last_mrs"] = last_mrs
        current_cache_val["gather_state"] = 0  # mark the data as refreshed
        cache.set(cache_key, current_cache_val, timeout=None)

        # persist to the database
        mpoint = Mpoint.objects.filter(id=current_cache_val["id"]).first()
        if mpoint is None:
            cache.delete(cache_key)
            return
        save_dict = {"timex": last_timex, "mpoint": mpoint, "val_mrs": last_mrs}
        save_dict[f"val_{current_cache_val['val_type']}"] = last_val
        try:
            MpLogx.objects.create(**save_dict)
        except IntegrityError:
            pass

        # update the equipment running-state signals
        ep_belong_id = current_cache_val.get("ep_belong")
        ep_monitored_id = current_cache_val.get("ep_monitored")
        mpoint_is_rep_ep0_running_state = current_cache_val.get("is_rep_ep0_running_state", False)
        if ep_monitored_id and mpoint_is_rep_ep_running_state:
            set_eq_rs(ep_monitored_id, last_timex, last_mrs)
        if ep_belong_id and mpoint_is_rep_ep0_running_state and ep_belong_id != ep_monitored_id:
            set_eq_rs(ep_belong_id, last_timex, Equipment.RUNING)

        mf_code = current_cache_val.get("mpoint_affect")
        if mf_code:  # this point drives another point, so that point has to be refreshed as well
            mc = MpointCache(mf_code)
            mf_data = mc.data
            # only an auto-gathered point may drive a computed point, and only for on/off signals
            if mf_data and current_cache_val["type"] == Mpoint.MT_AUTO and mf_data["type"] == Mpoint.MT_COMPUTE and mf_data["is_rep_ep_running_state"] and mf_data["ep_rs_expr"]:
                mc.set(last_timex, None)


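# Minimal usage sketch for MpointCache (illustrative; assumes a point with this code
# exists and is enabled):
#     mc = MpointCache("K_IP2_MM_IW3")   # load the serialized point plus its last reading from the cache
#     mc.set(timezone.now(), 42.0)       # refresh the cache, write an MpLogx row, update equipment state if configured
#     mc.set_fail(-2)                    # mark the point offline and, if configured, set its equipment to OFFLINE
# Entries are cached with timeout=None, so call get(force_update=True) after editing
# the underlying Mpoint record.
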
def king_sync(projectName: str, json_path: str = ""):
    """
    Synchronise measuring points from KingView (亚控).
    """
    if json_path:
        import os
        import json
        from django.conf import settings

        with open(os.path.join(settings.BASE_DIR, json_path), "r", encoding="utf-8") as f:
            res = json.loads(f.read())
    else:
        from apps.third.king.k import kingClient
        from apps.third.king.king_api import kapis

        _, res = kingClient.request(**kapis["read_variables"], params={"projectInstanceName": projectName})

    t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"}
    for item in res["objectList"]:
        if "t" in item and item["t"] and "SystemTag" not in item["d"]:
            code = f'K_{item["n"]}'
            item["from"] = "king"
            group = item["g"]
            name = item["d"]
            if group:
                name = f"{group}.{name}"
            ins, created = Mpoint.objects.get_or_create(code=code, defaults={"name": name, "code": code, "enabled": False, "type": Mpoint.MT_AUTO, "val_type": t_dict[item["t"]], "third_info": item})
            if not created and ins.val_type != t_dict[item["t"]]:  # the value type changed upstream, keep it in sync
                ins.val_type = t_dict[item["t"]]
                ins.third_info = item
                ins.save(update_fields=["val_type", "third_info"])


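# Illustrative mapping (hypothetical tag): a KingView variable such as
#     {"n": "IP2_MM_IW3", "t": 3, "g": "MM", "d": "Feed rate"}
# is stored as Mpoint(code="K_IP2_MM_IW3", name="MM.Feed rate", val_type="float",
# enabled=False, type=Mpoint.MT_AUTO), with the raw tag kept in third_info.
# Variables whose description contains "SystemTag" are skipped.
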
test_data = {
    "PNs": {"1": "V", "2": "T", "3": "Q"},
    "PVs": {"1": -725, "2": "2024-04-08 13:43:53.140", "3": 192},
    "Objs": [
        {"N": "IP2_MM_IW3"},
        {"N": "IP2_MM_IW4", "1": -6386},
        {"N": "IP2_MM_IW5", "1": -7835},
        {"N": "IP2_MM_IW7", "1": -6864},
        {"N": "IP2_MM_IW15", "1": 29},
        {"N": "IP2_MM_SC_IW3", "1": 337},
        {"N": "IP2_MM_SC_IW13", "1": -5511, "2": 24},
        {"N": "IP2_MM_SC_IW14", "1": -4640, "2": 24},
        {"N": "IP2_MM_SC_IW15", "1": -5586, "2": 24},
        {"N": "IP2_MM_SC_IW16", "1": -6634, "2": 24},
        {"N": "IP2_MM_SC_IW17", "1": -2768, "2": 24},
        {"N": "IP2_MM_SC_IW18", "1": -2946, "2": 24},
        {"N": "IP2_MM_SC_IW19", "1": -2550, "2": 24},
        {"N": "IP2_MM_SC_IW20", "1": -1512, "2": 24},
        {"N": "IP2_MM_SC_IW21", "1": -1775, "2": 24},
        {"N": "IP2_MM_SC_IW22", "1": -194, "2": 24},
        {"N": "IP2_MM_SC_IW23", "1": -366, "2": 24},
        {"N": "IP2_MM_SC_IW24", "1": -9291, "2": 24},
        {"N": "IP2_MM_SC_IW25", "1": -6888, "2": 24},
        {"N": "IP2_MM_SC_IW26", "1": -2360, "2": 24},
        {"N": "IP2_MM_SC_IW29", "1": 164, "2": 24},
        {"N": "IP2_MM_SC_IW30", "1": 914, "2": 24},
        {"N": "IP2_MM_SC_IW31", "1": 849, "2": 24},
        {"N": "IP2_MM_SC_IW37", "1": -125, "2": 24},
        {"N": "IP2_MM_SC_IW38", "1": -3009, "2": 24},
        {"N": "IP2_MM_SC_IW40", "1": -1394, "2": 24},
        {"N": "IP2_MM_SC_IW43", "1": 758, "2": 24},
        {"N": "IP3_SC_IW1", "1": 11557, "2": 107},
        {"N": "IP3_SC_IW2", "1": 7624, "2": 107},
        {"N": "IP3_SC_IW6", "1": 11159, "2": 107},
        {"N": "IP3_SC_IW7", "1": 8073, "2": 107},
        {"N": "IP3_SC_IW9", "1": 4490, "2": 107},
        {"N": "IP3_SC_IW10", "1": 5437, "2": 107},
        {"N": "IP3_SC_IW11", "1": 9244, "2": 107},
        {"N": "IP3_SC_IW12", "1": 7886, "2": 107},
        {"N": "IP3_SC_IW13", "1": -2962, "2": 107},
        {"N": "IP3_SC_IW14", "1": -159, "2": 107},
        {"N": "IP3_SC_IW26", "1": 15, "2": 107},
        {"N": "IP3_SC_IW27", "1": 15, "2": 107},
    ],
}


@auto_log("亚控存库")
def insert_mplogx_from_king_mqtt(data: dict, is_offset=True):
    """
    Insert KingView MQTT data into the hypertable.

    The commented-out code stored the payload in chunks; that is not needed at the
    moment, so it is kept here only in case it becomes useful again.
    """
    objs = data["Objs"]
    # len_objs = len(objs)
    pvs = data["PVs"]

    # chunk_size = 200
    # num_chunks = (len(objs) + chunk_size - 1) // chunk_size

    otime_obj = timezone.make_aware(datetime.strptime(pvs["2"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0)  # keep second-level precision only
    oval = pvs["1"]
    insert_mplogx_from_king_mqtt_chunk(objs, oval, otime_obj, is_offset)
    # with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    #     futures = []
    #     for i in range(num_chunks):
    #         start = i * chunk_size
    #         end = min(start + chunk_size, len_objs)
    #         chunk = objs[start:end]
    #         futures.append(executor.submit(insert_mplogx_from_king_mqtt_chunk, chunk, otime_obj, is_offset))
    #     concurrent.futures.wait(futures)
    # for future in futures:
    #     print(future.result(), end=', ')


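# Payload shape handled above (see test_data): "PNs" names the columns ("1" = value V,
# "2" = timestamp T, "3" = quality Q), "PVs" carries the batch-wide defaults, and each
# entry in "Objs" overrides the default value through its own "1" key where present;
# the timestamp used for every reading is the one from PVs.
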
def insert_mplogx_item(code: str, val, timex: datetime, enp_mpoints_dict):
    """
    Store a single reading in the hypertable.
    """
    mc = MpointCache(code)
    mpoint_data = mc.data
    if mpoint_data in (None, {}) or not mpoint_data["enabled"]:
        return

    mpoint_interval = mpoint_data["interval"]
    mpoint_last_timex = mpoint_data.get("last_data", {}).get("last_timex", None)
    # throttle by the configured gathering interval
    can_save = False
    if mpoint_last_timex:
        if (timex - mpoint_last_timex).total_seconds() > mpoint_interval or timex < mpoint_last_timex:
            can_save = True
    else:
        can_save = True

    if mpoint_data.get("save_expr", None):
        can_save = get_can_save_from_save_expr(mpoint_data["save_expr"], val)

    if can_save:
        try:
            MpointCache(code).set(timex, val)

            # update the corresponding row in the envdata table
            enp_field = mpoint_data.get("enp_field", None)
            ep_monitored = mpoint_data.get("ep_monitored", None)
            if enp_field and ep_monitored:
                if enp_mpoints_dict.get(ep_monitored, None) is None:
                    enp_mpoints_dict[ep_monitored] = {"timex": timex, "equipment": Equipment.objects.get(id=ep_monitored)}
                if enp_field == "running_state":
                    enp_mpoints_dict[ep_monitored].update({enp_field: mpoint_data["last_data"]["last_mrs"]})
                else:
                    enp_mpoints_dict[ep_monitored].update({enp_field: val})
        except IntegrityError:  # ignore uniqueness errors
            pass
        except Exception:
            myLogger.error(f"mpoint_code: {code} failed to save", exc_info=True)
            mc.set_fail(-1, timex)


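# Example of the throttling above: with interval=60 a reading that arrives 30 s after
# last_timex is dropped, one arriving 90 s later is stored, and a reading whose
# timestamp lies before last_timex (a back-filled value) is always stored; a
# configured save_expr then has the final say via get_can_save_from_save_expr.
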
def insert_mplogx_from_king_mqtt_chunk(objs: list, oval, otime_obj: datetime, is_offset=True):
    """
    Store one chunk of the MQTT payload. In KingView 38.00,00000.11011 the offset
    applies to the timestamp only, so it can effectively be ignored here.
    """
    # oval = pvs['1']
    enp_mpoints_dict = {}  # collects rows that also need to be written to the envdata table
    for obj in objs:
        n = obj["N"]
        val = obj.get("1", oval)
        # timex = obj.get("2", None)
        code = f"K_{n}"
        insert_mplogx_item(code, val, otime_obj, enp_mpoints_dict)

    # # Try a bulk insert first and fall back to single inserts on error
    # is_bulk_insert = True
    # try:
    #     MpLogx.objects.bulk_create(insert_db_data)
    # except IntegrityError:
    #     is_bulk_insert = False

    # if is_bulk_insert:
    #     # update the cache after a successful bulk insert
    #     for item in update_cache_data:
    #         need_gather_filed_mpoints.append(update_mpoint_cache_and_do_func(item))
    # else:
    # {'eq_belong': {'dust_rtd': 1.0}}

    # extra data that has to be stored in the envdata table
    if enp_mpoints_dict:
        for _, item in enp_mpoints_dict.items():
            EnvData.objects.get_or_create(timex=item["timex"], equipment=item["equipment"], defaults=item)


def insert_mplogx_from_king_rest_chunk(objs: list):
    """Store one chunk of readings fetched through the KingView REST interface."""
    enp_mpoints_dict = {}
    for obj in objs:
        n = obj["N"]
        code = f"K_{n}"
        timex = timezone.make_aware(datetime.strptime(obj["T"], "%Y-%m-%d %H:%M:%S.%f")).replace(microsecond=0)
        insert_mplogx_item(code, obj["V"], timex, enp_mpoints_dict)

    if enp_mpoints_dict:
        for _, item in enp_mpoints_dict.items():
            # try:
            EnvData.objects.get_or_create(timex=item["timex"], equipment=item["equipment"], defaults=item)
            # except IntegrityError as e:  # ignore uniqueness errors
            #     myLogger.error(e, exc_info=True)


def get_analyse_data_mgroups_duration(start_date: datetime, end_date: datetime) -> Dict[str, Any]:
    """
    Return per-section (工段) analysis data aggregated over a date range.
    """
    start_year, start_month, start_day = start_date.year, start_date.month, start_date.day
    end_year, end_month, end_day = end_date.year, end_date.month, end_date.day
    # total_sec = (end_date - start_date).total_seconds() + 3600 * 24

    qs = (
        EnStat.objects.filter(mgroup__cate="section", type="day_s")
        .filter(Q(year_s__gt=start_year) | Q(year_s=start_year, month_s__gt=start_month) | Q(year_s=start_year, month_s=start_month, day_s__gte=start_day))
        .filter(Q(year_s__lt=end_year) | Q(year_s=end_year, month_s__lt=end_month) | Q(year_s=end_year, month_s=end_month, day_s__lte=end_day))
    )
    res = (
        qs.values("mgroup", "mgroup__name")
        .annotate(total_production=Sum("total_production"),
                  run_sec=Sum("run_sec"), elec_consume=Sum("elec_consume"),
                  pcoal_coal_consume=Sum("pcoal_coal_consume"),
                  total_sec_now=Sum("total_sec_now"))
        .order_by("mgroup__sort")
    )
    res_dict = {}
    for item in res:
        res_dict[item["mgroup__name"]] = item
        for key, value in item.items():
            if value is None:
                item[key] = 0

    for item in res:
        item["mgroup_name"] = item["mgroup__name"]
        item["production_hour"] = round(item["total_production"] * 3600 / item["run_sec"], 2) if item["run_sec"] > 0 else 0
        item["elec_consume_unit"] = round(item["elec_consume"] / item["total_production"], 2) if item["total_production"] > 0 else 0
        item["run_hour"] = round(item["run_sec"] / 3600, 2) if item["run_sec"] > 0 else 0
        item["run_rate"] = round(item["run_sec"] * 100 / item["total_sec_now"], 2) if item["total_sec_now"] > 0 else 0
        item["coal_consume_unit"] = round(item["pcoal_coal_consume"] * 1000 / item["total_production"], 2) if item["total_production"] > 0 else 0
        if item["mgroup_name"] == "回转窑":
            total_production_ylm = res_dict.get("原料磨", {}).get("total_production", 0)
            elec_consume_ylm = res_dict.get("原料磨", {}).get("elec_consume", 0)
            elec_consume_mm = res_dict.get("煤磨", {}).get("elec_consume", 0)
            if item["total_production"] == 0:
                item["celec_consume_unit"] = 0
                item["cen_consume_unit"] = 0
            else:
                item["celec_consume_unit"] = round((elec_consume_mm + item["elec_consume"]) / get_safe_value(item["total_production"]) + get_sysconfig("enm.enm_lhxs") * elec_consume_ylm / get_safe_value(total_production_ylm), 2)
                item["cen_consume_unit"] = round(item["coal_consume_unit"] + 0.1229 * item["elec_consume_unit"], 2)
        item["total_production"] = round(item["total_production"], 2)
    return res


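# Illustrative call (hypothetical dates):
#     get_analyse_data_mgroups_duration(datetime(2024, 4, 1), datetime(2024, 4, 30))
# yields one dict per production section with the summed totals plus derived metrics
# such as production_hour, elec_consume_unit, run_hour, run_rate and coal_consume_unit;
# the "回转窑" entry additionally gets celec_consume_unit and cen_consume_unit.
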
def get_safe_value(value, default=1):
    """Return value, or default when value is 0, to avoid dividing by zero."""
    return value if value != 0 else default


def get_elec_level(month, hour):
    """
    Electricity tariff level for a given month and hour (used to update a reading's val_level):
    peak: months 1/11/12 at hours 19-21, and month 7 at hours 21-22
    deep: months 5-8 at hours 14-15 (deep valley)
    high: hours 8-10 and 19-23 (peak-load period)
    low:  hours 4-7 and 13-16 (off-peak period)
    flat: hours 11-12, 17-18 and 0-3 (default)
    """
    if (month in [1, 11, 12] and hour in [19, 20, 21]) or (month == 7 and hour in [21, 22]):
        return 'peak'
    elif month in [5, 6, 7, 8] and hour in [14, 15]:
        return 'deep'
    elif hour in [8, 9, 10, 19, 20, 21, 22, 23]:
        return 'high'
    elif hour in [4, 5, 6, 7, 13, 14, 15, 16]:
        return 'low'
    elif hour in [11, 12, 17, 18, 0, 1, 2, 3]:
        return 'flat'
    return 'flat'
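

# Examples: get_elec_level(1, 20) -> 'peak', get_elec_level(6, 14) -> 'deep',
# get_elec_level(3, 9) -> 'high', get_elec_level(3, 5) -> 'low', get_elec_level(3, 12) -> 'flat'.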