# Create your tasks here from __future__ import absolute_import, unicode_literals from apps.utils.tasks import CustomTask from celery import shared_task from apps.enm.models import MpLogx, Mpoint, MpointStat, EnStat, EnStat2, Xscript from apps.wpm.models import SfLog import datetime from django.db.models import Q from django.db.models import Sum, Avg from dateutil import tz from django.conf import settings from apps.wpm.services import get_sflog from apps.mtm.models import Mgroup, Material from apps.fim.services import get_cost_unit, get_price_unit from apps.fim.models import Fee from apps.enm.services import translate_eval_formula import logging from server.settings import get_sysconfig, update_sysconfig from django.db.models import F from apps.wpm.services import get_pcoal_heat from django.utils import timezone from django.db.models import Max from apps.third.king.k import kingClient from apps.third.king.king_api import kapis from apps.enm.services import insert_mplogx_from_king_rest_chunk, MpointCache from django.utils.timezone import localtime from apps.wpm.tasks import get_total_sec_now, cal_exp_duration_sec from apps.utils.sql import DbConnection from apps.enm.services import db_insert_mplogx_batch, get_elec_level from apps.enm.xscript import main from django.core.exceptions import ObjectDoesNotExist from django.utils.timezone import make_aware myLogger = logging.getLogger("log") def get_current_and_previous_time(): now = datetime.datetime.now() pre = now - datetime.timedelta(hours=1) return now, pre @shared_task(base=CustomTask) def insert_mplogx_from_xscript(xscript_id): xscript = Xscript.objects.get(id=xscript_id) mcodes_list = Mpoint.objects.filter(enabled=True).values_list('code', flat=True) if mcodes_list: main(xscript, mcodes_list) @shared_task(base=CustomTask) def db_insert_mplogx(limit:bool=True): """ 从数据库转存到超表 """ config = get_sysconfig() with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor: last_tag_id = config['enm'].get('last_tag_id', None) if last_tag_id is None: raise Exception("last_tag_id is None") cursor.execute("select count(id) from tag_value where id > %s", (last_tag_id)) count = cursor.fetchone()[0] if limit and count > 400: raise Exception("db inset count > 400") cursor.execute( "select id, val, tag_code, update_time from tag_value where id > %s order by id, update_time", (last_tag_id, )) rows = cursor.fetchall() # 获取数据后保存至本地 if rows: last_tag_id = rows[-1][0] db_insert_mplogx_batch(rows) update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) @shared_task(base=CustomTask) def db_ins_mplogx(): """ 从数据库转存到超表 """ config = get_sysconfig() with DbConnection(config['enm1']['db_host'], config['enm1']['db_user'], config['enm1']['db_password'], config['enm1']['db_database1']) as cursor: bill_date = config['enm1'].get('bill_date', None) batchs = config['enm1'].get('batch', None) if not batchs: raise Exception("batch is None") try: bill_date = datetime.datetime.strptime(bill_date, '%Y-%m-%d %H:%M:%S') except ValueError: raise Exception(f"Invalid date format in {bill_date}") if bill_date is None: raise Exception("bill_date is None") query = """ SELECT id, de_real_quantity, CONCAT('x', inv_name) AS inv_name, bill_date FROM sa_weigh_view WHERE bill_date >= %s and de_real_quantity > 0 AND inv_name IN %s ORDER BY bill_date """ cursor.execute(query, (bill_date, tuple(batchs))) rows = cursor.fetchall() # 获取数据后保存至本地 if rows: bill_date = rows[-1][-1] db_insert_mplogx_batch(rows) update_sysconfig({'enm1': {'bill_date': str(bill_date)}}) # @shared_task(base=CustomTask) # def mpoint_val_on_change(mpLogId: str): # """测点值变化的时候执行任务(废弃) # """ # mplog = MpLog.objects.get(id=mpLogId) # mpoint = mplog.mpoint # module, func = mpoint.func_on_change.rsplit(".", 1) # m = importlib.import_module(module) # f = getattr(m, func) # f(mplog) # 同步执行 @shared_task(base=CustomTask) def cal_mpointstats_duration(start_time: str, end_time: str, m_code_list=[], cal_attrs=[]): """ 重跑某一段时间的任务 """ mytz = tz.gettz(settings.TIME_ZONE) start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") start_time.replace(tzinfo=mytz) end_time = datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S") start_time.replace(tzinfo=mytz) current_time = start_time while current_time <= end_time: year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour cal_mpointstats(0, year, month, day, hour, m_code_list, cal_attrs) current_time += datetime.timedelta(hours=1) @shared_task(base=CustomTask) def correct_bill_date(): now_time = datetime.datetime.now() update_time = now_time - datetime.timedelta(hours=24) bill_date = make_aware(update_time) update_sysconfig({'enm1': {'bill_date': str(bill_date)}}) @shared_task(base=CustomTask) def cal_mpointstats_scheduled_tasks(m_code_list=None, cal_attrs=None): """ 重跑某一段时间的任务 """ if m_code_list is None: m_code_list = [] if cal_attrs is None: cal_attrs = [] end_time = datetime.datetime.now() start_time = end_time - datetime.timedelta(hours=24) start_time = make_aware(start_time) end_time = make_aware(end_time) current_time = start_time while current_time <= end_time: year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour cal_mpointstats(0, year, month, day, hour, m_code_list, cal_attrs) current_time += datetime.timedelta(hours=1) @shared_task(base=CustomTask) def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int, cascade=True, sflog_hours=[]): """ 计算某一测点, 某一时间点某一小时的统计值 """ val_level = None mpoint = Mpoint.objects.get(id=mpointId) mytz = tz.gettz(settings.TIME_ZONE) dt = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=0, second=0, tzinfo=mytz) # 整点时间 dt_hour_p= dt - datetime.timedelta(hours=1) # 上个整点 dt_hour_n= dt + datetime.timedelta(hours=1) # 下个整点 if (mpoint.material or mpoint.type == Mpoint.MT_COMPUTE) and mpoint.val_type in ['float', 'int']: # 如果计量的是物料 # 累计量 有的会清零,需要额外处理(还未做) material_code = mpoint.material.code if mpoint.material else None params = {"mpoint": mpoint, "type": "hour"} params["year"], params["month"], params["day"], params["hour"] = year, month, day, hour val = 0 val_type = mpoint.val_type if mpoint.type == Mpoint.MT_AUTO: if mpoint.is_unit: val = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt, timex__lt=dt_hour_n).aggregate(sum=Sum(f'val_{mpoint.val_type}'))["sum"] if val is None: val = 0 else: mrs0 = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt_hour_p, timex__lte=dt).order_by("timex") mrs = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt, timex__lte=dt_hour_n).order_by("timex") if mrs0.exists() and mrs.exists(): last_val = getattr(mrs.last(), f'val_{val_type}') first_val = getattr(mrs0.last(), f'val_{val_type}') if last_val >= first_val or first_val == 0: val = last_val - first_val elif first_val - last_val > 0 and (first_val - last_val)/first_val < 0.01: val = 0 myLogger.info(f'{mpoint.code}--{dt}--{last_val}--{first_val}--last_val 小于 first_val') else: # 这里判断有可能清零了 max_val = max(mrs.aggregate(max=Max(f'val_{val_type}'))["max"], first_val) myLogger.info(f'{mpoint.code}--{dt}--{last_val}--{first_val}--清零') val = max_val - first_val + last_val # if mpoint.code == 'x水泥+P.O42.5 散装': elif mpoint.type == Mpoint.MT_COMPUTE and mpoint.formula: formula = mpoint.formula val = translate_eval_formula(formula, year, month, day, hour) else: return ms, _ = MpointStat.objects.get_or_create(**params, defaults=params) ms.val = ms.val_correct if ms.val_correct is not None else val if material_code == 'elec': val_level = get_elec_level(month, hour) ms.val_level = val_level # ms.val_level ms.save() # 更新更高级别的值 if cascade or hour == 23: params_day = {"type": "day", "mpoint": mpoint, "year": year, "month": month, "day": day} ms_day, _ = MpointStat.objects.get_or_create(**params_day, defaults=params_day) if ms_day.val_correct is not None: ms_day.val = ms_day.val_correct else: sum_dict_day = MpointStat.objects.filter(type="hour", mpoint=mpoint, year=year, month=month, day=day).aggregate(sum=Sum("val")) ms_day.val = sum_dict_day['sum'] if sum_dict_day['sum'] is not None else 0 ms_day.val_origin = ms_day.val ms_day.save() if cascade or day in [28, 29, 30, 31]: params_month = {"type": "month", "mpoint": mpoint, "year": year, "month": month} ms_month, _ = MpointStat.objects.get_or_create(**params_month, defaults=params_month) if ms_month.val_correct is not None: ms_month.val = ms_month.val_correct else: sum_dict_month = MpointStat.objects.filter(type="day", mpoint=mpoint, year=year, month=month).aggregate(sum=Sum("val")) ms_month.val = sum_dict_month['sum'] if sum_dict_month['sum'] is not None else 0 ms_month.val_origin = ms_month.val ms_month.save() if cascade or month == 12: params_year = {"type": "year", "mpoint": mpoint, "year": year} ms_year, _ = MpointStat.objects.get_or_create(**params_year, defaults=params_year) if ms_year.val_correct is not None: ms_year.val = ms_year.val_correct else: sum_dict_year = MpointStat.objects.filter(type="month", mpoint=mpoint, year=year).aggregate(sum=Sum("val")) ms_year.val = sum_dict_year['sum'] if sum_dict_year['sum'] is not None else 0 ms_year.val_origin = ms_year.val ms_year.save() mgroup = mpoint.mgroup if mgroup: sflog = get_sflog(mgroup, dt) if sflog is None: myLogger.error(f'{mgroup.name}--{dt}') year_s, month_s, day_s = sflog.get_ymd params_hour_s = { "type": "hour_s", "mpoint": mpoint, "sflog": sflog, "mgroup": mgroup, "year": year, "month": month, "day": day, "year_s": year_s, "month_s": month_s, "day_s": day_s, "hour": hour, } ms_hour_s, _ = MpointStat.objects.get_or_create(**params_hour_s, defaults=params_hour_s) ms_hour_s.val = ms_hour_s.val_correct if ms_hour_s.val_correct is not None else ms.val ms_hour_s.save() # 开始往上计算 params_sflog_s = {"type": "sflog", "mpoint": mpoint, "sflog": sflog, "year_s": year_s, "month_s": month_s, "day_s": day_s, "mgroup": mgroup} ms_sflog_s, _ = MpointStat.objects.get_or_create(**params_sflog_s, defaults=params_sflog_s) if ms_sflog_s.val_correct is not None: ms_sflog_s.val = ms_sflog_s.val_correct else: sum_dict_sflog_s = MpointStat.objects.filter(type="hour_s", mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, sflog=sflog).aggregate(sum=Sum("val")) ms_sflog_s.val = sum_dict_sflog_s['sum'] if sum_dict_sflog_s['sum'] is not None else 0 ms_sflog_s.val_origin = ms_sflog_s.val ms_sflog_s.save() # next_cal_dict = [mpoint.material.id, sflog.id, year, month, day, hour, year_s, month_s, day_s] # if next_cal_dict == cache.get('enm_cal_dict', None): # next_cal = 0 # else: # next_cal = 1 # cache.set('enm_cal_dict', next_cal_dict, 60) cal_mpointstat_manual(mpoint.id, sflog.id, mgroup.id, year, month, day, hour, year_s, month_s, day_s, 0) @shared_task(base=CustomTask) def cal_mpointstats(is_now=1, year=None, month=None, day=None, hour=None, m_code_list=[], cal_attrs=[]): """ 计算所有自动采集测点的统计值,默认当前小时, 可手动传入时间和测点编号集 cal_attrs: 需要计算的属性,空列表默认计算所有属性["material", "pcoal", "run_hour"] """ if year and month and day and hour is not None: pass else: now, pre = get_current_and_previous_time() if is_now: year, month, day, hour = now.year, now.month, now.day, now.hour else: year, month, day, hour = pre.year, pre.month, pre.day, pre.hour if m_code_list: mpoints1 = Mpoint.objects.filter(code__in=m_code_list) for item in mpoints1: cal_mpointstat_hour(item.id, year, month, day, hour) mpoints_related = Mpoint.objects.none() code2 = [] for code in m_code_list: mpoints_related = mpoints_related | Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True, material__isnull=False, formula__contains='{' + code + '}') code2.extend(mpoints_related.values_list('code', flat=True)) code2 = list(set(code2)) for code in code2: mpoints_related = mpoints_related | Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True, material__isnull=False, formula__contains='{' + code + '}') mpoints_related = mpoints_related.distinct().order_by('report_sortstr', 'create_time') for item in mpoints_related: cal_mpointstat_hour(item.id, year, month, day, hour) else: # 先统计自动采集的测点 mpoints_auto = Mpoint.objects.filter(type=Mpoint.MT_AUTO, enabled=True) # mpoints_without_formula_group = [] for item in mpoints_auto: # mpoints_without_formula_group.append(cal_mpointstat_hour.s(item.id, year, month, day, hour)) cal_mpointstat_hour(item.id, year, month, day, hour) # 再统计计算测点 mpoints_compute = Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True).exclude(formula="").order_by('report_sortstr', 'create_time') # mpoints_other_group = [] for item in mpoints_compute: # mpoints_other_group.append(cal_mpointstat_hour.s(item.id, year, month, day, hour)) cal_mpointstat_hour(item.id, year, month, day, hour) # 先调整一下班时间,以防计算错误 # 为了保持一致使用统计的时间点 nowx = timezone.now() get_total_sec_now(now=nowx) # 再处理total_sec cal_exp_duration_sec(now=nowx) # 先处理shut_sec # 开始计算enstat mgroups = Mgroup.objects.filter(need_enm=True).order_by("sort") # mgroups_group = [] year_s, month_s, day_s = 0, 0, 0 for mgroup in mgroups: # mgroups_group.append(cal_enstat.s('hour_s', None, mgroup.id, year, month, day, hour, None, None, None, True, ['material', 'run_hour'])) year_s, month_s, day_s = cal_enstat("hour_s", None, mgroup.id, year, month, day, hour, None, None, None, True, cal_attrs) # mgroups_t = mgroups.filter(name__in=['回转窑', '水泥磨']) # mgroups_t_group = [] # for mgroup in mgroups_t: # mgroups_t_group.append(cal_enstat.s('hour_s', None, mgroup.id, year, month, day, hour, None, None, None, True, ['pcoal'])) # 最后计算enstat2 cal_enstat2(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s) # task_chain = chain(group(mpoints_without_formula_group), group(mpoints_other_group), group(mgroups_group), group(mgroups_t_group), group([cal_enstat2.s(year_s=year, month_s=month)])) # task_chain.delay() if hour == 21: enm_alarm.delay(year_s, month_s, day_s) @shared_task(base=CustomTask) def cal_mpointstat_manual(mpointId: str, sflogId: str, mgroupId: str, year: int, month: int, day: int, hour: int, year_s: int, month_s: int, day_s: int, next_cal=0): """ 手动录入的测点数据进行往上统计,一级一级往上 """ mpoint = Mpoint.objects.get(id=mpointId) mgroup = Mgroup.objects.get(id=mgroupId) if sflogId: params_day_s = {"type": "day_s", "mpoint": mpoint, "year_s": year_s, "month_s": month_s, "day_s": day_s, "mgroup": mgroup} ms_day_s, _ = MpointStat.objects.get_or_create(**params_day_s, defaults=params_day_s) if ms_day_s.val_correct is not None: ms_day_s.val = ms_day_s.val_correct else: sum_dict_day_s = MpointStat.objects.filter(type="sflog", mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, mgroup=mgroup).aggregate(sum=Sum("val")) ms_day_s.val = sum_dict_day_s['sum'] if sum_dict_day_s['sum'] is not None else 0 ms_day_s.val_origin = ms_day_s.val ms_day_s.save() if day_s: params_month_s = {"type": "month_s", "mpoint": mpoint, "year_s": year_s, "month_s": month_s, "mgroup": mgroup} ms_month_s, _ = MpointStat.objects.get_or_create(**params_month_s, defaults=params_month_s) if ms_month_s.val_correct is not None: ms_month_s.val = ms_month_s.val_correct else: sum_dict_month_s = MpointStat.objects.filter(type="day_s", mpoint=mpoint, year_s=year_s, month_s=month_s, mgroup=mgroup).aggregate(sum=Sum("val")) ms_month_s.val = sum_dict_month_s['sum'] if sum_dict_month_s['sum'] is not None else 0 ms_month_s.val_origin = ms_month_s.val ms_month_s.save() if month_s: params_year_s = {"type": "year_s", "mpoint": mpoint, "year_s": year_s, "mgroup": mgroup} ms_year_s, _ = MpointStat.objects.get_or_create(**params_year_s, defaults=params_year_s) if ms_year_s.val_correct is not None: ms_year_s.val = ms_year_s.val_correct else: sum_dict_year_s = MpointStat.objects.filter(type="month_s", mpoint=mpoint, year_s=year_s, mgroup=mgroup).aggregate(sum=Sum("val")) ms_year_s.val = sum_dict_year_s['sum'] if sum_dict_year_s['sum'] is not None else 0 ms_year_s.val_origin = ms_year_s.val ms_year_s.save() if next_cal: # 计算工段级数据,手动录入时调用 start_cal_type = 'hour_s' if hour: start_cal_type = 'hour_s' elif sflogId: start_cal_type = 'sflog' elif day_s: start_cal_type = 'day_s' elif month_s: start_cal_type = 'month_s' elif year_s: start_cal_type = 'year_s' cal_enstat(start_cal_type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s) types_list = ["hour_s", "sflog", "day_s", "month_st", "month_s", "year_s"] @shared_task(base=CustomTask) def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, cascade=True, cal_attrs=[]): """ 计算能源数据统计 """ if cascade: if type in types_list: start_index = types_list.index(type) new_types_list = types_list[start_index:] for type in new_types_list: year_s, month_s, day_s = cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, False) return year_s, month_s, day_s if not cal_attrs: this_cal_attrs = ["material", "pcoal", "run_hour"] else: this_cal_attrs = cal_attrs[:] mgroup = Mgroup.objects.get(id=mgroupId) sflog = None # 默认无班次 if sflogId: sflog = SfLog.objects.get(id=sflogId) elif year and month and day and hour is not None: mytz = tz.gettz(settings.TIME_ZONE) dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz) sflog = get_sflog(mgroup, dt) team = None if sflog: year_s, month_s, day_s = sflog.get_ymd team = sflog.team # 如果没有班组,那么month_st无需计算 if team is None and type == "month_st": return year_s, month_s, day_s if type == "hour_s": enstat, _ = EnStat.objects.get_or_create( type="hour_s", mgroup=mgroup, year=year, month=month, day=day, hour=hour, defaults={"type": "hour_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s, "year": year, "month": month, "day": day, "hour": hour, "sflog": sflog}, ) elif type == "sflog": enstat, _ = EnStat.objects.get_or_create(type="sflog", sflog=sflog, defaults={"type": "sflog", "sflog": sflog, "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s}) elif type == "day_s": enstat, _ = EnStat.objects.get_or_create( type="day_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, defaults={"type": "day_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s} ) elif type == "month_st": enstat, _ = EnStat.objects.get_or_create( type="month_st", mgroup=mgroup, team=team, year_s=year_s, month_s=month_s, defaults={"type": "month_st", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "team": team} ) elif type == "month_s": enstat, _ = EnStat.objects.get_or_create(type="month_s", mgroup=mgroup, year_s=year_s, month_s=month_s, defaults={"type": "month_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s}) elif type == "year_s": enstat, _ = EnStat.objects.get_or_create(type="year_s", mgroup=mgroup, year_s=year_s, defaults={"type": "year_s", "mgroup": mgroup, "year_s": year_s}) if "material" in this_cal_attrs: # 消耗物料统计(包括电耗) input_materials = [] input_materials = mgroup.input_materials input_materials_dict = {} has_p_cost = False if mgroup.product: input_materials_dict[mgroup.product.id] = "P" if mgroup.product_cost: input_materials_dict[mgroup.product_cost.id] = "P_COST" has_p_cost = True input_materials_dict.update({i : "M" for i in input_materials if i not in input_materials_dict}) imaterial_cost_unit = 0 imaterial_data = [] imaterial_data_dict = {} for mid, mtype in input_materials_dict.items(): material = Material.objects.get(id=mid) if type == "hour_s": mps = MpointStat.objects.filter(type="hour_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, hour=hour, mpoint__material=material) elif type == "sflog": mps = MpointStat.objects.filter(type="sflog", sflog=sflog, mpoint__material=material) elif type == "day_s": mps = MpointStat.objects.filter(type="day_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, mpoint__material=material) elif type == "month_st": mps = MpointStat.objects.filter(type="sflog", mgroup=mgroup, sflog__team=team, year_s=year_s, month_s=month_s, mpoint__material=material) elif type == "month_s": mps = MpointStat.objects.filter(type="month_s", mgroup=mgroup, year_s=year_s, month_s=month_s, mpoint__material=material) elif type == "year_s": mps = MpointStat.objects.filter(type="year_s", mgroup=mgroup, year_s=year_s, mpoint__material=material) if mps.filter(mpoint__is_rep_mgroup=True).exists(): mps = mps.filter(mpoint__is_rep_mgroup=True) amount_consume = mps.aggregate(sum=Sum("val"))["sum"] if amount_consume is None: amount_consume = 0 if mtype == "P": # 如果是产量 enstat.total_production = amount_consume elif mtype == "P_COST": # 如果是产量(用于计算成本) enstat.total_production_cost = amount_consume else: if material.code in ["cair", "steam"]: price_unit = 0 else: price_unit = get_price_unit(material, year_s, month_s) cost = amount_consume * price_unit try: cost_unit = cost / enstat.total_production if has_p_cost is False else cost / enstat.total_production_cost except ZeroDivisionError: cost_unit = 0 imaterial_cost_unit = imaterial_cost_unit + cost_unit if material.code == "elec": enstat.elec_consume = amount_consume enstat.elec_coal_consume = enstat.elec_consume * 0.1229 / 1000 try: enstat.elec_consume_unit = enstat.elec_consume / enstat.total_production except ZeroDivisionError: enstat.elec_consume_unit = 0 elif material.code == "water": enstat.water_consume = amount_consume elif material.code == "pcoal": enstat.pcoal_consume = amount_consume elif material.code == "cair": enstat.cair_consume = amount_consume elif material.code == "steam": enstat.out_steam = amount_consume enstat.out_steam_coal = enstat.out_steam * 128.6 / 1000 elif material.code == "ammonia": enstat.ammonia_consume = amount_consume elif material.code == "ccr": enstat.ccr_consume = amount_consume enstat.kiln_end_heat = enstat.total_production - enstat.ccr_consume imaterial_item = { "material": mid, "material_name": material.name, "material_code": material.code, "material_type": material.type, "price_unit": price_unit, "amount_consume": amount_consume, "cost": cost, "cost_unit": cost_unit, } imaterial_data.append(imaterial_item) imaterial_data_dict.setdefault(material.name, imaterial_item) enstat.imaterial_data = imaterial_data enstat.imaterial_data_dict = imaterial_data_dict # 其他成本数据 other_cost_data = [] other_cost_unit = 0 fee_qs = Fee.objects.order_by("sort") for fee in fee_qs: item = {"element": fee.element, "cate": fee.cate, "name": fee.name, "id": fee.id} item["cost_unit"] = get_cost_unit(mgroup, fee, year_s, month_s) other_cost_unit = other_cost_unit + item["cost_unit"] other_cost_data.append(item) enstat.other_cost_data = other_cost_data enstat.production_cost_unit = imaterial_cost_unit + other_cost_unit enstat.save() if mgroup.cate == 'section': # 更新所监测设备测点的total_production total_production = enstat.total_production if total_production == 0: elec_consume_unit = 0 else: elec_consume_unit = F("val") / total_production MpointStat.objects.filter( mgroup=enstat.mgroup, mpoint__material__code="elec", type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour ).update(total_production=total_production, elec_consume_unit=elec_consume_unit) # 更新余热发电的production_elec_unit if mgroup.name == "回转窑": if enstat.total_production == 0: production_elec_unit_yr = 0 else: production_elec_unit_yr = F("total_production") / enstat.total_production EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="余热发电").update( production_elec_unit=production_elec_unit_yr ) # 其他一些触发计算 if mgroup.cate == "section": if "material" in this_cal_attrs and mgroup.name != "回转窑": try: enstat.en_consume_unit = enstat.elec_coal_consume / enstat.total_production except ZeroDivisionError: enstat.en_consume_unit = 0 enstat.save() # 计算一些其他数据 if type == "month_st" and "material" in this_cal_attrs: # 如果计算的是班月,把主要设备电耗数据拉过来, 方便查询 res = MpointStat.objects.filter(type='sflog', year_s=year_s, month_s=month_s, sflog__team=enstat.team, mpoint__material__code='elec', mpoint__ep_monitored__isnull=False).values( equipment=F('mpoint__ep_monitored__id'), equipment_name=F('mpoint__ep_monitored__name')).annotate(consume=Sum('val')) res = list(res) for item in res: try: item['consume_unit'] = item['consume'] / enstat.total_production except ZeroDivisionError: item['consume_unit'] = None enstat.equip_elec_data = res enstat.save() if enstat.mgroup.name == "回转窑": # 算单位产品(综合电耗/标煤耗/综合能耗) # 综合电耗 if enstat.type in ["hour_s", "day_s", "year_s", "month_s"]: pre_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="原料磨").first() pre_enstat2 = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="煤磨").first() if pre_enstat: enstat.celec_consume_unit = enstat.elec_consume_unit + get_sysconfig('enm.enm_lhxs') * pre_enstat.elec_consume_unit if pre_enstat2: try: pre_enstat2.production_elec_consume_unit = pre_enstat2.elec_consume / enstat.total_production except ZeroDivisionError: pre_enstat2.production_elec_consume_unit = 0 pre_enstat2.save(update_fields=["production_elec_consume_unit"]) enstat.celec_consume_unit += pre_enstat2.production_elec_consume_unit enstat.save() # 算总煤耗 if "pcoal" in this_cal_attrs: if type in ["hour_s", "sflog", "day_s"]: enstat.pcoal_heat = get_pcoal_heat(enstat.year_s, enstat.month_s, enstat.day_s) enstat.pcoal_coal_consume = enstat.pcoal_consume * enstat.pcoal_heat / 29307.6 elif type == "month_st": enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=year_s, month_s=month_s, sflog__team=enstat.team).aggregate( sum=Sum("pcoal_coal_consume") )["sum"] elif type == "month_s": enstat.pcoal_coal_consume = EnStat.objects.filter(type="day_s", mgroup=enstat.mgroup, year_s=year_s, month_s=month_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"] elif type == "year_s": enstat.pcoal_coal_consume = EnStat.objects.filter(type="month_s", mgroup=enstat.mgroup, year_s=year_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"] if enstat.pcoal_coal_consume is None: enstat.pcoal_coal_consume = 0 # 算单位产品标煤耗 try: enstat.coal_consume_unit = enstat.pcoal_coal_consume * 1000 / enstat.total_production except ZeroDivisionError: enstat.coal_consume_unit = 0 # 综合能耗 enstat.cen_consume_unit = enstat.coal_consume_unit + 0.1229 * enstat.elec_consume_unit enstat.save() elif enstat.mgroup.name == "水泥磨" and enstat.type not in ["month_st", "sflog"] and "pcoal" in this_cal_attrs: pre_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first() if pre_enstat: # 综合能耗 enstat.cen_consume_unit = enstat.elec_consume_unit * 0.1229 + 0.7 * pre_enstat.cen_consume_unit enstat.save() elif enstat.mgroup.name == '余热发电': pre_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first() if pre_enstat: # 吨熟料发电量 try: enstat.production_elec_unit = enstat.total_production / pre_enstat.total_production enstat.save() except ZeroDivisionError: enstat.production_elec_unit = 0 enstat.save() # 运转时长相关 if type != "hour_s" and "run_hour" in this_cal_attrs: enstat.total_sec_now, enstat.shut_sec = get_total_sec_now_and_shut_sec(enstat) enstat.run_sec = enstat.total_sec_now - enstat.shut_sec if enstat.total_sec_now - enstat.shut_sec >0 else 0 try: enstat.run_rate = (enstat.run_sec / enstat.total_sec_now) * 100 except ZeroDivisionError: enstat.run_rate = 0 try: enstat.production_hour = enstat.total_production * 3600 / enstat.run_sec except ZeroDivisionError: enstat.production_hour = 0 enstat.save() enstat.save() return year_s, month_s, day_s # 返回这个值主要为了全厂数据计算而用 def get_total_sec_now_and_shut_sec(enstat: EnStat): from apps.wpm.models import SfLog # if enstat.type == 'hour_s': # # 找到停机记录,并划分到该小时 # end_time = datetime.datetime(enstat.year, enstat.month, enstat.day, enstat.hour) # start_time = end_time - datetime.timedelta(hours=1) # sts = StLog.objects.filter(mgroup=enstat.mgroup) # sts = (sts.filter(start_time__gt=start_time, start_time__lt=end_time)|sts.filter(start_time__lt=start_time, end_time=None)|sts.filter(end_time__gt=start_time, end_time__lt=end_time)).distinct() # shut_hour = 0 # for i in sts: # if i.end_time is None: # run_hour = 0 # if i.start_time > start_time: # run_hour = (i.start_time - start_time).total_seconds/3600 # shut_hour = 1 - run_hour # shut_hour = shut_hour + # return 1, 0 if enstat.type == "sflog": sflog = enstat.sflog return sflog.total_sec_now, sflog.shut_sec elif enstat.type == "day_s": res = SfLog.objects.filter(work_date__year=enstat.year_s, work_date__month=enstat.month_s, work_date__day=enstat.day_s, mgroup=enstat.mgroup).aggregate( sum1=Sum("total_sec_now"), sum2=Sum("shut_sec") ) return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0 elif enstat.type == "month_st": res = SfLog.objects.filter(work_date__year=enstat.year_s, work_date__month=enstat.month_s, mgroup=enstat.mgroup, team=enstat.team).aggregate( sum1=Sum("total_sec_now"), sum2=Sum("shut_sec") ) return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0 elif enstat.type == "month_s": res = SfLog.objects.filter(work_date__year=enstat.year_s, work_date__month=enstat.month_s, mgroup=enstat.mgroup).aggregate(sum1=Sum("total_sec_now"), sum2=Sum("shut_sec")) return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0 elif enstat.type == "year_s": res = SfLog.objects.filter(work_date__year=enstat.year_s, mgroup=enstat.mgroup).aggregate(sum1=Sum("total_sec_now"), sum2=Sum("shut_sec")) return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0 @shared_task(base=CustomTask) def cal_enstat2(type: str, year_s: int, month_s: int, day_s: int, cascade=True): if cascade: if type == "day_s": cal_enstat2("day_s", year_s, month_s, day_s, False) cal_enstat2("month_s", year_s, month_s, day_s, False) elif type == "month_s": cal_enstat2("month_s", year_s, month_s, day_s, False) else: return if type == "month_s": enstat2, _ = EnStat2.objects.get_or_create(type="month_s", year_s=year_s, month_s=month_s, defaults={"year_s": year_s, "month_s": month_s, "type": "month_s"}) elif type == "day_s": enstat2, _ = EnStat2.objects.get_or_create(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s, defaults={"year_s": year_s, "month_s": month_s, "day_s": day_s, "type": "day_s"}) # enstat2 = EnStat2.objects.select_for_update().get(id=enstat2.id) # 加锁 try: material_bulk_clinker = Material.objects.get(code="bulk_clinker") # 散装熟料 enstat2.bulk_clinker_price = get_price_unit(material_bulk_clinker, year_s, month_s) except ObjectDoesNotExist: enstat2.bulk_clinker_price = 0 try: material_bulk_cement = Material.objects.get(code="bulk_cement") # 散装水泥 enstat2.bulk_cement_price = get_price_unit(material_bulk_cement, year_s, month_s) except ObjectDoesNotExist: enstat2.bulk_cement_price = 0 try: material_bag_cement = Material.objects.get(code="bag_cement") # 袋装水泥 enstat2.bag_cement_price = get_price_unit(material_bag_cement, year_s, month_s) except ObjectDoesNotExist: enstat2.bag_cement_price = 0 if type == "month_s": enstat2.bulk_cement_val = MpointStat.objects.filter(type="month_s", mpoint__material__code="bulk_cement", year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))["sum"] elif type == "day_s": enstat2.bulk_cement_val = MpointStat.objects.filter(type="day_s", mpoint__material__code="bulk_cement", year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))["sum"] if enstat2.bulk_cement_val is None: enstat2.bulk_cement_val = 0 if type == "month_s": enstat2.bag_cement_val = MpointStat.objects.filter(type="month_s", mpoint__material__code='bag_cement', year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))["sum"] elif type == "day_s": enstat2.bag_cement_val = MpointStat.objects.filter(type="day_s", mpoint__material__code='bag_cement', year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))["sum"] if enstat2.bag_cement_val is None: enstat2.bag_cement_val = 0 if type == "month_s": enstat2.bulk_clinker_val = MpointStat.objects.filter(type="month_s", mpoint__material__code='bulk_clinker', year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))["sum"] elif type == "day_s": enstat2.bulk_clinker_val = MpointStat.objects.filter(type="day_s", mpoint__material__code='bulk_clinker', year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))["sum"] if enstat2.bulk_clinker_val is None: enstat2.bulk_clinker_val = 0 enstat2.industry_total_val = ( enstat2.bulk_cement_val * enstat2.bulk_cement_price + enstat2.bag_cement_val * enstat2.bag_cement_price + enstat2.bulk_clinker_val * enstat2.bulk_clinker_price) / 10000 # 出磨水泥产量 if type == "month_s": res = EnStat.objects.filter(mgroup__product__code="cement", type="month_s", year_s=year_s, month_s=month_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit")) elif type == "day_s": res = EnStat.objects.filter(mgroup__product__code="cement", type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit")) enstat2.cement_val = res["sum"] if res["sum"] else 0 enstat2.cement_cost_unit = res["avg"] if res["avg"] else 0 # 出窑熟料产量 if type == "month_s": res = EnStat.objects.filter(mgroup__product__code="clinker", type="month_s", year_s=year_s, month_s=month_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit")) elif type == "day_s": res = EnStat.objects.filter(mgroup__product__code="clinker", type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit")) # 出窑熟料成本 enstat2.cliker_price_cost = res["avg"] if res["avg"] else 0 enstat2.clinker_val = res["sum"] if res["sum"] else 0 # 出厂总产量 if type == "month_s": res = EnStat.objects.filter(mgroup__product__code="out_cement", type="month_s", year_s=year_s, month_s=month_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit")) elif type == "day_s": res = EnStat.objects.filter(mgroup__product__code="out_cement", type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit")) enstat2.out_cement_val = res["sum"] if res["sum"] else 0 # enstat2.out_cement_cost_unit = res["avg"] if res["avg"] else 0 # enstat2.out_cement_val = enstat2.bag_cement_val + enstat2.bulk_cement_val + enstat2.bulk_clinker_val enstat2.industry_add_val = enstat2.industry_total_val - enstat2.out_cement_val * enstat2.cement_cost_unit / 10000 - enstat2.bulk_clinker_val*enstat2.cliker_price_cost / 10000 # 全厂电量 # 全厂的耗电量得单独处理 use_mpoint_elec_val = False mp_elecs = Mpoint.objects.filter(material__code="elec", code__endswith='__all') if mp_elecs.exists(): # use_mpoint_elec_val = True if type == "month_s": enstat_qs = EnStat.objects.filter(type="month_s", year_s=year_s, month_s=month_s) elif type == "day_s": enstat_qs = EnStat.objects.filter(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s) res_elec_pcoal = enstat_qs.aggregate( sum1=Sum("elec_consume"), sum2=Sum("elec_coal_consume"), sum3=Sum("pcoal_consume"), sum4=Sum("pcoal_coal_consume"), sum5=Sum("water_consume"), sum6=Sum("cair_consume") ) if use_mpoint_elec_val: if type == 'day_s': enstat2.elec_consume = MpointStat.objects.filter(type='day_s', mpoint__in=mp_elecs, year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))['sum'] elif type == 'month_s': enstat2.elec_consume = MpointStat.objects.filter(type='month_s', mpoint__in=mp_elecs, year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))['sum'] if enstat2.elec_consume is None: enstat2.elec_consume = 0 else: enstat2.elec_consume = res_elec_pcoal["sum1"] if res_elec_pcoal["sum1"] else 0 enstat2.elec_coal_consume = enstat2.elec_consume * 0.1229 / 1000 # 其他的统计工段合就行 enstat2.pcoal_consume = res_elec_pcoal["sum3"] if res_elec_pcoal["sum3"] else 0 enstat2.pcoal_coal_consume = res_elec_pcoal["sum4"] if res_elec_pcoal["sum4"] else 0 enstat2.water_consume = res_elec_pcoal["sum5"] if res_elec_pcoal["sum5"] else 0 enstat2.cair_consume = res_elec_pcoal["sum6"] if res_elec_pcoal["sum6"] else 0 enstat2.en_consume = enstat2.pcoal_coal_consume + enstat2.elec_coal_consume try: enstat2.en_consume_unit = enstat2.en_consume / enstat2.industry_total_val except ZeroDivisionError: enstat2.en_consume_unit = 0 try: enstat2.en_add_consume_unit = enstat2.en_consume / enstat2.industry_add_val except ZeroDivisionError: enstat2.en_add_consume_unit = 0 enstat2.save() def cal_enstat_pcoal_change(enstat: EnStat, new_pcoal_heat): type = enstat.type if type in ["hour_s", "sflog", "day_s"]: enstat.pcoal_heat = new_pcoal_heat enstat.pcoal_coal_consume = enstat.pcoal_consume * enstat.pcoal_heat / 29307.6 elif type == "month_st": enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=enstat.year_s, month_s=enstat.month_s, sflog__team=enstat.team).aggregate( sum=Sum("pcoal_coal_consume") )["sum"] elif type == "month_s": enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=enstat.year_s, month_s=enstat.month_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"] elif type == "year_s": enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=enstat.year_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"] if enstat.pcoal_coal_consume is None: enstat.pcoal_coal_consume = 0 # 算单位产品标煤耗 try: enstat.coal_consume_unit = enstat.pcoal_coal_consume * 1000 / enstat.total_production except ZeroDivisionError: enstat.coal_consume_unit = 0 # 综合能耗 enstat.cen_consume_unit = enstat.coal_consume_unit + 0.1229 * enstat.elec_consume_unit enstat.save(update_fields=["pcoal_heat", "pcoal_coal_consume", "coal_consume_unit", "cen_consume_unit"]) # 同步更新水泥磨的综合能耗,这步有可能不成功,因为水泥磨是后算的, 但是当pcoal_change时这个就有用了 if type not in ["month_st", "sflog"]: next_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥磨").first() if next_enstat: next_enstat.cen_consume_unit = next_enstat.elec_consume_unit * 0.1229 + 0.7 * enstat.cen_consume_unit next_enstat.save(update_fields=["cen_consume_unit"]) enm_alarms_list = [ ["电石渣", "elec_consume_unit", "单位产品分布电耗"], ["原料磨", "elec_consume_unit", "单位产品分布电耗"], ["回转窑", "celec_consume_unit", "单位产品综合电耗"], ["回转窑", "coal_consume_unit", "单位产品标煤耗"], ["煤磨", "elec_consume_unit", "单位产品分布电耗"], ["水泥磨", "elec_consume_unit", "单位产品分布电耗"], ["水泥包装", "elec_consume_unit", "单位产品分布电耗"] ] @shared_task(base=CustomTask) def enm_alarm(year_s: int, month_s: int, day_s: int): """ enm报警任务 """ from apps.ecm.models import Event, EventCate, Eventdo from apps.mtm.models import Goal from apps.ecm.service import notify_event now = timezone.now() event_cate, _ = EventCate.objects.get_or_create(code="consume_exceed", defaults={"name": "能耗超过目标值", "code": "consume_exceed", "trigger": 30}) for item in enm_alarms_list: mgroups = Mgroup.objects.filter(name=item[0]) for mgroup in mgroups: enstat = EnStat.objects.filter(mgroup=mgroup, type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).first() if enstat: mgroup_name = item[0] goal_cate_str = item[1] goal_cate_str_chin = item[2] real_val = getattr(enstat, goal_cate_str, None) goal = Goal.objects.filter(goal_cate__code=goal_cate_str, year=year_s, mgroup=mgroup).first() if goal: goal_val = getattr(goal, f"goal_val_{month_s}", None) if goal_val and real_val and real_val > goal_val: # 触发事件 event = Event() event.obj_cate = "enm" event.happen_time = now event.voice_msg = f"{mgroup_name}{item[2]}超过设定目标值" event.enm_data = { "mgroup": mgroup.id, "mgroup_name": mgroup.name, "type": f"{goal_cate_str}.exceed", "type_chin": f"{goal_cate_str_chin}-超过设定目标值", "year_s": year_s, "month_s": month_s, "day_s": day_s, "val": real_val, "goal_val": goal_val, "enstat": enstat.id } event.save() Eventdo.objects.get_or_create(cate=event_cate, event=event, defaults={"cate": event_cate, "event": event}) notify_event(event) @shared_task(base=CustomTask) def king_insert_mplogx(): mpoint_codes = Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO, code__startswith='K_').values_list('code', flat=True) ml = [] for m in mpoint_codes: ml.append({"N": m.replace('K_', '')}) _, res = kingClient.request(**kapis['read_batchtagrealvalue'], json={"objs": ml}) insert_mplogx_from_king_rest_chunk(res) @shared_task(base=CustomTask) def check_mpoint_offline(seconds=100): """监测测点采集掉线""" now = localtime() for mpoint in Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO, is_unit=False)| Mpoint.objects.filter(enabled=True, type=Mpoint.MT_COMPUTE, is_rep_ep_running_state=True, is_unit=False): mc = MpointCache(mpoint.code) mpoint_data = mc.data last_data = mpoint_data.get('last_data', None) is_offline = True if last_data and last_data['last_timex'] and (now-last_data['last_timex']).total_seconds() < seconds: is_offline = False if is_offline: mc.set_fail(-2, now)