775 lines
41 KiB
Python
775 lines
41 KiB
Python
# Create your tasks here
|
|
from __future__ import absolute_import, unicode_literals
|
|
from apps.utils.tasks import CustomTask
|
|
from celery import shared_task
|
|
from apps.enm.models import MpLogx, Mpoint, MpointStat, EnStat, EnStat2
|
|
from apps.wpm.models import SfLog
|
|
import datetime
|
|
from django.db.models import Sum, Avg
|
|
from dateutil import tz
|
|
from django.conf import settings
|
|
from apps.wpm.services import get_sflog
|
|
from apps.mtm.models import Mgroup, Material
|
|
from apps.fim.services import get_cost_unit, get_price_unit
|
|
from apps.fim.models import Fee
|
|
from apps.enm.services import translate_eval_formula
|
|
import logging
|
|
from server.settings import get_sysconfig
|
|
from django.db.models import F, Case, When
|
|
from apps.wpm.services import get_pcoal_heat
|
|
from django.utils import timezone
|
|
from django.db.models import Max
|
|
from apps.third.king.k import kingClient
|
|
from apps.third.king.king_api import kapis
|
|
from apps.enm.services import insert_mplogx_from_king_rest_chunk, MpointCache
|
|
from django.utils.timezone import localtime
|
|
from apps.wpm.tasks import get_total_hour_now, cal_exp_duration_hour
|
|
|
|
myLogger = logging.getLogger("log")
|
|
|
|
|
|
def get_current_and_previous_time():
    """Return the current local time and the instant exactly one hour before it.

    Both values are naive ``datetime`` objects derived from a single
    ``datetime.datetime.now()`` reading.
    """
    current = datetime.datetime.now()
    one_hour_ago = current - datetime.timedelta(hours=1)
    return current, one_hour_ago
|
|
|
|
|
|
# @shared_task(base=CustomTask)
|
|
# def get_tag_val():
|
|
# """
|
|
# 应该用不着
|
|
# """
|
|
# config = get_sysconfig()
|
|
# with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor:
|
|
# last_tag_id = config['enm'].get('last_tag_id', None)
|
|
# if not last_tag_id:
|
|
# mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first()
|
|
# if mr is None:
|
|
# last_tag_id = 0
|
|
# else:
|
|
# last_tag_id = mr.tag_id
|
|
# update_sysconfig({'enm': {'last_tag_id': last_tag_id}})
|
|
# cursor.execute(
|
|
# "select id, val, tag_code, update_time from tag_value where id > %s order by update_time, id", (last_tag_id))
|
|
# results = cursor.fetchall() # 获取数据后保存至本地
|
|
# need_func = {} # 需要执行测点函数的字典, 此种情况只执行一次
|
|
# for row in results:
|
|
# mr_one = MpLog()
|
|
# mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row
|
|
# mpoint, _ = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={
|
|
# 'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'})
|
|
# mr_one.mpoint = mpoint
|
|
# mr_one.save()
|
|
# last_tag_id = mr_one.tag_id
|
|
# if mpoint.func_on_change:
|
|
# need_func[mpoint.id] = mr_one.id
|
|
# # 执行测点函数
|
|
# for key in need_func:
|
|
# mpoint_val_on_change.delay(need_func[key])
|
|
# update_sysconfig({'enm': {'last_tag_id': last_tag_id}})
|
|
|
|
|
|
# @shared_task(base=CustomTask)
|
|
# def mpoint_val_on_change(mpLogId: str):
|
|
# """测点值变化的时候执行任务(废弃)
|
|
# """
|
|
# mplog = MpLog.objects.get(id=mpLogId)
|
|
# mpoint = mplog.mpoint
|
|
# module, func = mpoint.func_on_change.rsplit(".", 1)
|
|
# m = importlib.import_module(module)
|
|
# f = getattr(m, func)
|
|
# f(mplog) # 同步执行
|
|
|
|
|
|
@shared_task(base=CustomTask)
def cal_mpointstats_duration(start_time: str, end_time: str, m_code_list=None, cal_attrs=None):
    """
    Re-run the hourly statistics task for every hour of a time range.

    start_time / end_time: wall-clock strings formatted as "%Y-%m-%d %H:%M:%S",
        interpreted in settings.TIME_ZONE. The range is walked in one-hour steps
        starting at start_time, and both ends are inclusive.
    m_code_list: optional measuring-point codes forwarded to cal_mpointstats.
    cal_attrs: optional attribute names forwarded to cal_mpointstats.
    """
    m_code_list = list(m_code_list or [])
    cal_attrs = list(cal_attrs or [])
    mytz = tz.gettz(settings.TIME_ZONE)
    time_format = "%Y-%m-%d %H:%M:%S"
    # datetime.replace() returns a new object, so the result must be re-bound.
    # The original discarded it and also applied the second call to start_time
    # instead of end_time.
    start_dt = datetime.datetime.strptime(start_time, time_format).replace(tzinfo=mytz)
    end_dt = datetime.datetime.strptime(end_time, time_format).replace(tzinfo=mytz)
    current_time = start_dt
    while current_time <= end_dt:
        # is_now=0 plus an explicit y/m/d/h makes cal_mpointstats use exactly this hour.
        # cal_attrs is forwarded as given (it used to be replaced by a literal []).
        cal_mpointstats(0, current_time.year, current_time.month, current_time.day, current_time.hour, m_code_list, cal_attrs)
        current_time += datetime.timedelta(hours=1)
|
|
|
|
|
|
@shared_task(base=CustomTask)
def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int, cascade=True, sflog_hours=None):
    """
    Compute the statistic of one measuring point for one clock hour and roll it up.

    Only points that are bound to a material and whose val_type is "float" or "int"
    are processed; everything else returns immediately.

    mpointId: primary key of the Mpoint.
    year / month / day / hour: the clock hour, interpreted in settings.TIME_ZONE.
    cascade: when true, the day / month / year totals are always refreshed; when
        false they are refreshed only at hour 23, on days 28-31 and in month 12
        respectively.
    sflog_hours: unused; kept for backward compatibility with existing callers.

    After the calendar roll-ups, the value is also stored per shift ("hour_s" and
    "sflog" rows) and cal_mpointstat_manual is called to continue the shift-based
    roll-up. Returns None.
    """
    mpoint = Mpoint.objects.get(id=mpointId)
    mytz = tz.gettz(settings.TIME_ZONE)
    dt = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=0, second=0, tzinfo=mytz)  # top of the hour
    dt_hour_p = dt - datetime.timedelta(hours=1)  # previous top of the hour
    dt_hour_n = dt + datetime.timedelta(hours=1)  # next top of the hour
    if not (mpoint.material and mpoint.val_type in ["float", "int"]):
        return

    val = 0
    val_type = mpoint.val_type
    if mpoint.type == Mpoint.MT_AUTO:
        mrs0 = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt_hour_p, timex__lte=dt).order_by("timex")
        mrs = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt, timex__lte=dt_hour_n).order_by("timex")
        if mrs0.exists() and mrs.exists():
            # Readings are treated as a running counter: the hourly amount is the
            # last reading of this hour minus the last reading of the previous hour.
            last_val = getattr(mrs.last(), f"val_{val_type}")
            first_val = getattr(mrs0.last(), f"val_{val_type}")
            if last_val >= first_val:
                val = last_val - first_val
            else:
                # The counter went backwards, so it was probably reset during the hour:
                # count up to the observed peak, then add what accumulated after the reset.
                max_val = max(mrs.aggregate(max=Max(f"val_{val_type}"))["max"], first_val)
                val = max_val - first_val + last_val
    elif mpoint.type == Mpoint.MT_COMPUTE and mpoint.formula:
        val = translate_eval_formula(mpoint.formula, year, month, day, hour)
    else:
        return

    params = {"mpoint": mpoint, "type": "hour", "year": year, "month": month, "day": day, "hour": hour}
    ms, _ = MpointStat.objects.get_or_create(**params, defaults=params)
    ms.val = val
    ms.save()

    # Refresh the coarser calendar totals: (condition, target type, source type, period keys).
    # NOTE(review): the source this was recovered from had lost its indentation; the
    # three roll-ups are treated as independent siblings, not nested in one another - confirm.
    rollups = (
        (cascade or hour == 23, "day", "hour", {"year": year, "month": month, "day": day}),
        (cascade or day in [28, 29, 30, 31], "month", "day", {"year": year, "month": month}),
        (cascade or month == 12, "year", "month", {"year": year}),
    )
    for enabled, target_type, source_type, period in rollups:
        if not enabled:
            continue
        total = MpointStat.objects.filter(type=source_type, mpoint=mpoint, **period).aggregate(sum=Sum("val"))["sum"]
        lookup = {"type": target_type, "mpoint": mpoint, **period}
        stat, _ = MpointStat.objects.get_or_create(**lookup, defaults=lookup)
        stat.val = total if total is not None else 0
        stat.save()

    mgroup = mpoint.mgroup
    if not mgroup:
        return
    sflog = get_sflog(mgroup, dt)
    if sflog is None:
        # Without a shift log there is nothing to attach the shift-based rows to.
        # The original logged and then crashed on sflog.get_ymd; stop here instead.
        myLogger.error(f"{mgroup.name}--{dt}")
        return
    year_s, month_s, day_s = sflog.get_ymd

    params_hour_s = {
        "type": "hour_s",
        "mpoint": mpoint,
        "sflog": sflog,
        "mgroup": mgroup,
        "year": year,
        "month": month,
        "day": day,
        "year_s": year_s,
        "month_s": month_s,
        "day_s": day_s,
        "hour": hour,
    }
    ms_hour_s, _ = MpointStat.objects.get_or_create(**params_hour_s, defaults=params_hour_s)
    ms_hour_s.val = ms.val
    ms_hour_s.save()

    # Roll the hourly shift rows up into the per-shift total.
    sum_dict_sflog_s = MpointStat.objects.filter(type="hour_s", mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, sflog=sflog).aggregate(sum=Sum("val"))
    params_sflog_s = {"type": "sflog", "mpoint": mpoint, "sflog": sflog, "year_s": year_s, "month_s": month_s, "day_s": day_s, "mgroup": mgroup}
    ms_sflog_s, _ = MpointStat.objects.get_or_create(**params_sflog_s, defaults=params_sflog_s)
    ms_sflog_s.val = sum_dict_sflog_s["sum"] if sum_dict_sflog_s["sum"] is not None else 0
    ms_sflog_s.save()

    # next_cal=0: only the MpointStat roll-up runs here; the EnStat recalculation is not triggered.
    cal_mpointstat_manual(mpoint.id, sflog.id, mgroup.id, year, month, day, hour, year_s, month_s, day_s, 0)
|
|
|
|
|
|
@shared_task(base=CustomTask)
def cal_mpointstats(is_now=1, year=None, month=None, day=None, hour=None, m_code_list=[], cal_attrs=[]):
    """
    Compute the hourly statistics of the measuring points, then the derived EnStat / EnStat2 rows.

    The target hour defaults to the current hour (is_now truthy) or the previous
    hour (is_now falsy); an explicit year/month/day/hour overrides both.
    m_code_list: when non-empty, only these point codes plus the computed points
        whose formula references them (followed two levels deep) are recalculated.
    cal_attrs: attributes to compute, forwarded to cal_enstat; an empty list means
        all of ["material", "pcoal", "run_hour"].
    """
    if not (year and month and day and hour is not None):
        now, pre = get_current_and_previous_time()
        ref = now if is_now else pre
        year, month, day, hour = ref.year, ref.month, ref.day, ref.hour

    def formula_dependants(code):
        # Enabled computed points bound to a material whose formula mentions {code}.
        return Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True, material__isnull=False, formula__contains="{" + code + "}")

    if m_code_list:
        for mp in Mpoint.objects.filter(code__in=m_code_list):
            cal_mpointstat_hour(mp.id, year, month, day, hour)
        related = Mpoint.objects.none()
        first_level_codes = []
        for code in m_code_list:
            related = related | formula_dependants(code)
            first_level_codes.extend(related.values_list("code", flat=True))
        # Second level: points whose formulas reference the first-level dependants.
        for code in list(set(first_level_codes)):
            related = related | formula_dependants(code)
        related = related.distinct().order_by("report_sortstr", "create_time")
        for mp in related:
            cal_mpointstat_hour(mp.id, year, month, day, hour)
    else:
        # Automatically collected points first ...
        for mp in Mpoint.objects.filter(type=Mpoint.MT_AUTO, enabled=True):
            cal_mpointstat_hour(mp.id, year, month, day, hour)
        # ... then the computed points.
        computed = Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True, material__isnull=False).exclude(formula="").order_by("report_sortstr", "create_time")
        for mp in computed:
            cal_mpointstat_hour(mp.id, year, month, day, hour)

    # Adjust the shift times first so the later calculations do not go wrong.
    get_total_hour_now()  # handle total_hour_now first
    cal_exp_duration_hour()  # then shut_hour

    # EnStat per work group; the shift date returned by the last group feeds the plant-wide step.
    year_s, month_s, day_s = 0, 0, 0
    for mgroup in Mgroup.objects.filter(need_enm=True).order_by("sort"):
        year_s, month_s, day_s = cal_enstat("hour_s", None, mgroup.id, year, month, day, hour, None, None, None, True, cal_attrs)

    # Finally the plant-wide EnStat2.
    cal_enstat2(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s)
    if hour == 21:
        enm_alarm.delay(year_s, month_s, day_s)
|
|
|
|
|
|
@shared_task(base=CustomTask)
def cal_mpointstat_manual(mpointId: str, sflogId: str, mgroupId: str, year: int, month: int, day: int, hour: int, year_s: int, month_s: int, day_s: int, next_cal=0):
    """
    Roll a measuring point's shift-based statistics up level by level.

    The "sflog" rows are summed into the "day_s" row, the "day_s" rows into the
    "month_s" row, and the "month_s" rows into the "year_s" row, all for the
    given point and work group. When next_cal is truthy (used for manual entry)
    the work group's EnStat is recalculated afterwards, starting at "hour_s"
    when hour is truthy and at "sflog" otherwise.
    """
    mpoint = Mpoint.objects.get(id=mpointId)
    mgroup = Mgroup.objects.get(id=mgroupId)

    # (target type, source type, shift-period keys shared by source filter and target row)
    levels = (
        ("day_s", "sflog", {"year_s": year_s, "month_s": month_s, "day_s": day_s}),
        ("month_s", "day_s", {"year_s": year_s, "month_s": month_s}),
        ("year_s", "month_s", {"year_s": year_s}),
    )
    for target_type, source_type, period in levels:
        total = MpointStat.objects.filter(type=source_type, mpoint=mpoint, mgroup=mgroup, **period).aggregate(sum=Sum("val"))["sum"]
        lookup = {"type": target_type, "mpoint": mpoint, "mgroup": mgroup, **period}
        stat, _ = MpointStat.objects.get_or_create(**lookup, defaults=lookup)
        stat.val = 0 if total is None else total
        stat.save()

    if not next_cal:
        return
    # NOTE(review): hour == 0 is falsy, so midnight is handled like "no hour given" - confirm intended.
    start_level = "hour_s" if hour else "sflog"
    cal_enstat(start_level, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)
|
|
|
|
|
|
# EnStat aggregation levels in cascade order: when cal_enstat is called with
# cascade=True it recalculates the requested level and every level after it in this list.
types_list = ["hour_s", "sflog", "day_s", "month_st", "month_s", "year_s"]
|
|
|
|
|
|
@shared_task(base=CustomTask)
def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, cascade=True, cal_attrs=None):
    """
    Compute the energy statistics (EnStat) of one work group at one aggregation level.

    type: one of types_list ("hour_s", "sflog", "day_s", "month_st", "month_s", "year_s").
    sflogId: shift log id; when falsy the shift log is looked up from the clock
        time (year, month, day, hour) if that is fully given.
    mgroupId: id of the work group (Mgroup).
    year_s / month_s / day_s: shift date; overwritten from the shift log when one is found.
    cascade: when true, `type` and every later level in types_list are recalculated
        via non-cascading recursive calls.
    cal_attrs: subset of ["material", "pcoal", "run_hour"] to compute; empty or None means all.

    Returns (year_s, month_s, day_s) so callers can continue with plant-wide statistics.
    Raises ValueError for an unknown `type` when cascade is false.
    """
    if cascade:
        if type in types_list:
            for level in types_list[types_list.index(type):]:
                # cal_attrs is forwarded so the attribute filter also applies to cascaded levels
                # (it used to be dropped here, which made every cascaded call compute everything).
                year_s, month_s, day_s = cal_enstat(level, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, False, cal_attrs)
        return year_s, month_s, day_s

    this_cal_attrs = list(cal_attrs) if cal_attrs else ["material", "pcoal", "run_hour"]

    mgroup = Mgroup.objects.get(id=mgroupId)
    # Start from None so a call without sflogId and without a full clock time
    # no longer fails with an unbound local name.
    sflog = None
    team = None
    if sflogId:
        sflog = SfLog.objects.get(id=sflogId)
    elif year and month and day and hour is not None:
        mytz = tz.gettz(settings.TIME_ZONE)
        dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz)
        sflog = get_sflog(mgroup, dt)
    if sflog:
        year_s, month_s, day_s = sflog.get_ymd
        team = sflog.team
    # Without a team there is no team-month row to calculate.
    if team is None and type == "month_st":
        return year_s, month_s, day_s

    # Fetch or create the EnStat row. Plain keyword lookups are reused as creation
    # values by get_or_create, so `defaults` only carries the extra fields.
    if type == "hour_s":
        enstat, _ = EnStat.objects.get_or_create(
            type="hour_s",
            mgroup=mgroup,
            year=year,
            month=month,
            day=day,
            hour=hour,
            defaults={"year_s": year_s, "month_s": month_s, "day_s": day_s, "sflog": sflog},
        )
    elif type == "sflog":
        enstat, _ = EnStat.objects.get_or_create(type="sflog", sflog=sflog, defaults={"mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s})
    elif type == "day_s":
        enstat, _ = EnStat.objects.get_or_create(type="day_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s)
    elif type == "month_st":
        enstat, _ = EnStat.objects.get_or_create(type="month_st", mgroup=mgroup, team=team, year_s=year_s, month_s=month_s)
    elif type == "month_s":
        enstat, _ = EnStat.objects.get_or_create(type="month_s", mgroup=mgroup, year_s=year_s, month_s=month_s)
    elif type == "year_s":
        enstat, _ = EnStat.objects.get_or_create(type="year_s", mgroup=mgroup, year_s=year_s)
    else:
        raise ValueError(f"unsupported enstat type: {type}")

    if "material" in this_cal_attrs:
        # Consumed materials (electricity included). Work on a copy so the list
        # stored on the Mgroup instance is not mutated by the insert below.
        input_materials = list(mgroup.input_materials or [])
        has_product = False
        if mgroup.product:
            # The product goes first so total_production is known before unit costs are derived.
            input_materials.insert(0, mgroup.product.id)
            has_product = True

        # MpointStat rows feeding this level; note that the team-month level sums the per-shift rows.
        if type == "hour_s":
            stat_filter = {"type": "hour_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s, "hour": hour}
        elif type == "sflog":
            stat_filter = {"type": "sflog", "sflog": sflog}
        elif type == "day_s":
            stat_filter = {"type": "day_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s}
        elif type == "month_st":
            stat_filter = {"type": "sflog", "mgroup": mgroup, "sflog__team": team, "year_s": year_s, "month_s": month_s}
        elif type == "month_s":
            stat_filter = {"type": "month_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s}
        else:  # year_s
            stat_filter = {"type": "year_s", "mgroup": mgroup, "year_s": year_s}

        imaterial_cost_unit = 0
        imaterial_data = []
        for ind, mid in enumerate(input_materials):
            material = Material.objects.get(id=mid)
            mps = MpointStat.objects.filter(mpoint__material=material, **stat_filter)
            # If any point is flagged is_rep_mgroup, only the flagged points are counted.
            if mps.filter(mpoint__is_rep_mgroup=True).exists():
                mps = mps.filter(mpoint__is_rep_mgroup=True)
            amount_consume = mps.aggregate(sum=Sum("val"))["sum"]
            if amount_consume is None:
                amount_consume = 0
            if ind == 0 and has_product:  # the product itself, i.e. the output quantity
                enstat.total_production = amount_consume
                continue

            if material.code in ["pcoal", "cair", "steam"]:
                price_unit = 0
            else:
                price_unit = get_price_unit(material, year_s, month_s)
            cost = amount_consume * price_unit
            try:
                cost_unit = cost / enstat.total_production
            except ZeroDivisionError:
                cost_unit = 0
            imaterial_cost_unit = imaterial_cost_unit + cost_unit
            if material.code == "elec":
                enstat.elec_consume = amount_consume
                enstat.elec_coal_consume = enstat.elec_consume * 0.1229 / 1000
                try:
                    enstat.elec_consume_unit = enstat.elec_consume / enstat.total_production
                except ZeroDivisionError:
                    enstat.elec_consume_unit = 0
            elif material.code == "water":
                enstat.water_consume = amount_consume
            elif material.code == "pcoal":
                enstat.pcoal_consume = amount_consume
            elif material.code == "cair":
                enstat.cair_consume = amount_consume
            elif material.code == "steam":
                enstat.out_steam = amount_consume
                enstat.out_steam_coal = enstat.out_steam * 128.6 / 1000
            elif material.code == "ccr":
                enstat.ccr_consume = amount_consume
                enstat.kiln_end_heat = enstat.total_production - enstat.ccr_consume

            imaterial_data.append(
                {
                    "material": mid,
                    "material_name": material.name,
                    "material_code": material.code,
                    "material_type": material.type,
                    "price_unit": price_unit,
                    "amount_consume": amount_consume,
                    "cost": cost,
                    "cost_unit": cost_unit,
                }
            )
        enstat.imaterial_data = imaterial_data

        # Other cost items.
        other_cost_data = []
        other_cost_unit = 0
        for fee in Fee.objects.order_by("sort"):
            item = {"element": fee.element, "cate": fee.cate, "name": fee.name, "id": fee.id}
            item["cost_unit"] = get_cost_unit(mgroup, fee, year_s, month_s)
            other_cost_unit = other_cost_unit + item["cost_unit"]
            other_cost_data.append(item)
        enstat.other_cost_data = other_cost_data
        enstat.production_cost_unit = imaterial_cost_unit + other_cost_unit
        enstat.save()
        if mgroup.cate == "section":
            # Push total_production to the electricity points of monitored equipment.
            # NOTE(review): this filter has no type/period restriction, so it touches the
            # group's rows of every period - confirm that this is intended.
            MpointStat.objects.filter(mgroup=enstat.mgroup, mpoint__material__code="elec").exclude(mpoint__ep_monitored=None).update(
                total_production=enstat.total_production, elec_consume_unit=Case(When(total_production__gt=0, then=F("val") / F("total_production")), default=0)
            )

    # NOTE(review): the source this was recovered from had lost its indentation. Only the
    # energy-per-unit step is kept under the "section" check; the following steps run for
    # every group (a superset of the nested reading) - confirm against the original layout.
    if enstat.mgroup.cate == "section":
        if "material" in this_cal_attrs:
            # Energy consumption per unit of product (the kiln is handled separately below).
            if enstat.mgroup.name != "回转窑":
                try:
                    enstat.en_consume_unit = enstat.elec_coal_consume / enstat.total_production
                except ZeroDivisionError:
                    enstat.en_consume_unit = 0
            enstat.save()

    if type == "month_st" and "material" in this_cal_attrs:
        # For the team-month level, copy the electricity data of equipment with
        # power_kw >= 100 onto the row so it is easy to query.
        res = MpointStat.objects.filter(type="sflog", year_s=year_s, month_s=month_s, sflog__team=enstat.team, mpoint__ep_monitored__power_kw__gte=100).values(
            "elec_consume_unit", equipment=F("mpoint__ep_monitored__id"), equipment_name=F("mpoint__ep_monitored__name")
        )
        enstat.equip_elec_data = list(res)
        enstat.save()

    if enstat.mgroup.name == "回转窑":  # per-unit comprehensive electricity / standard coal / comprehensive energy
        # Comprehensive electricity consumption: combines the "原料磨" group's figure of the same period.
        if enstat.type in ["hour_s", "day_s", "year_s", "month_s"]:
            pre_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="原料磨").first()
            if pre_enstat:
                enstat.celec_consume_unit = enstat.elec_consume_unit + get_sysconfig("enm.enm_lhxs") * pre_enstat.elec_consume_unit
                enstat.save()

        # Total coal consumption.
        if "pcoal" in this_cal_attrs:
            if type in ["hour_s", "sflog", "day_s"]:
                enstat.pcoal_heat = get_pcoal_heat(enstat.year_s, enstat.month_s, enstat.day_s)
                enstat.pcoal_coal_consume = enstat.pcoal_consume * enstat.pcoal_heat / 7000
            elif type == "month_st":
                enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=year_s, month_s=month_s, sflog__team=enstat.team).aggregate(
                    sum=Sum("pcoal_coal_consume")
                )["sum"]
            elif type == "month_s":
                enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=year_s, month_s=month_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"]
            elif type == "year_s":
                enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=year_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"]

            if enstat.pcoal_coal_consume is None:
                enstat.pcoal_coal_consume = 0

            # Standard coal consumption per unit of product.
            try:
                enstat.coal_consume_unit = enstat.pcoal_coal_consume * 1000 / enstat.total_production
            except ZeroDivisionError:
                enstat.coal_consume_unit = 0

            # Comprehensive energy consumption.
            enstat.cen_consume_unit = enstat.coal_consume_unit + 0.1229 * enstat.elec_consume_unit
            enstat.save()
    elif enstat.mgroup.name == "水泥磨" and enstat.type not in ["month_st", "sflog"] and "pcoal" in this_cal_attrs:
        pre_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first()
        if pre_enstat:
            # Comprehensive energy consumption, derived from the kiln's figure of the same period.
            enstat.cen_consume_unit = enstat.elec_consume_unit * 0.1229 + 0.7 * pre_enstat.cen_consume_unit
            enstat.save()
    elif enstat.mgroup.name == "余热发电":
        pre_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first()
        if pre_enstat:
            # Electricity generated per unit of kiln output.
            try:
                enstat.production_elec_unit = enstat.total_production / pre_enstat.total_production
            except ZeroDivisionError:
                enstat.production_elec_unit = 0
            enstat.save()

    # Running-time figures (not computed at the hourly level).
    if type != "hour_s" and "run_hour" in this_cal_attrs:
        enstat.total_hour_now, enstat.shut_hour = get_total_hour_now_and_shut_hour(enstat)
        enstat.run_hour = enstat.total_hour_now - enstat.shut_hour
        try:
            enstat.run_rate = (enstat.run_hour / enstat.total_hour_now) * 100
        except ZeroDivisionError:
            enstat.run_rate = 0
        try:
            enstat.production_hour = enstat.total_production / enstat.run_hour
        except ZeroDivisionError:
            enstat.production_hour = 0
        enstat.save()
    enstat.save()
    return year_s, month_s, day_s  # returned mainly for the plant-wide calculation
|
|
|
|
|
|
def get_total_hour_now_and_shut_hour(enstat: EnStat):
    """
    Return (total_hour_now, shut_hour) for the period an EnStat row covers.

    For type "sflog" the values come straight from the row's shift log. For
    "day_s", "month_st", "month_s" and "year_s" they are summed over the
    matching SfLog rows of the work group, with a missing sum counted as 0.
    Any other type yields None.
    """
    from apps.wpm.models import SfLog

    kind = enstat.type
    if kind == "sflog":
        shift = enstat.sflog
        return shift.total_hour_now, shift.shut_hour
    if kind not in ("day_s", "month_st", "month_s", "year_s"):
        return None

    # Build the SfLog filter from the coarsest key down to what this level needs.
    conds = {"work_date__year": enstat.year_s, "mgroup": enstat.mgroup}
    if kind != "year_s":
        conds["work_date__month"] = enstat.month_s
    if kind == "day_s":
        conds["work_date__day"] = enstat.day_s
    elif kind == "month_st":
        conds["team"] = enstat.team
    totals = SfLog.objects.filter(**conds).aggregate(sum1=Sum("total_hour_now"), sum2=Sum("shut_hour"))
    return totals["sum1"] or 0, totals["sum2"] or 0
|
|
|
|
|
|
@shared_task(base=CustomTask)
def cal_enstat2(type: str, year_s: int, month_s: int, day_s: int, cascade=True):
    """
    Compute the plant-wide statistics row (EnStat2) for a shift day or shift month.

    type: "day_s" or "month_s"; any other value is ignored.
    year_s / month_s / day_s: shift date (day_s is only used for "day_s").
    cascade: when true, "day_s" recalculates the day and then the month, while
        "month_s" recalculates the month only.
    """
    if cascade:
        if type == "day_s":
            cal_enstat2("day_s", year_s, month_s, day_s, False)
            cal_enstat2("month_s", year_s, month_s, day_s, False)
        elif type == "month_s":
            cal_enstat2("month_s", year_s, month_s, day_s, False)
        # The non-cascading calls above did all the work. The original fell through
        # here and recalculated the requested level a second time.
        return

    if type == "month_s":
        period = {"year_s": year_s, "month_s": month_s}
        cal_type, cal_period = "month", {"year": year_s, "month": month_s}
    elif type == "day_s":
        period = {"year_s": year_s, "month_s": month_s, "day_s": day_s}
        cal_type, cal_period = "day", {"year": year_s, "month": month_s, "day": day_s}
    else:
        # Unknown level: ignore it, as the cascading path does
        # (this used to fail on an unbound local variable).
        return

    lookup = {"type": type, **period}
    enstat2, _ = EnStat2.objects.get_or_create(**lookup, defaults=lookup)
    material_cement = Material.objects.get(code="cement")
    material_clinker = Material.objects.get(code="clinker")
    material_bulk_cement = Material.objects.get(code="bulk_cement")
    material_bag_cement = Material.objects.get(code="bag_cement")

    def material_total(material):
        # Sum of the point statistics of this level/period for one material; 0 when there are none.
        total = MpointStat.objects.filter(type=type, mpoint__material=material, **period).aggregate(sum=Sum("val"))["sum"]
        return 0 if total is None else total

    enstat2.bulk_cement_price = get_price_unit(material_bulk_cement, year_s, month_s)
    enstat2.clinker_price = get_price_unit(material_clinker, year_s, month_s)
    enstat2.bag_cement_price = get_price_unit(material_bag_cement, year_s, month_s)
    enstat2.bulk_cement_val = material_total(material_bulk_cement)
    enstat2.bag_cement_val = material_total(material_bag_cement)
    # NOTE(review): the clinker quantity is read from the *cement* material while the
    # price above is the clinker price. Kept as in the original - confirm whether
    # material_clinker was intended.
    enstat2.clinker_val = material_total(material_cement)

    enstat2.industry_total_val = (enstat2.bulk_cement_val * enstat2.bulk_cement_price + enstat2.bag_cement_val * enstat2.bag_cement_price + enstat2.clinker_val * enstat2.clinker_price) / 10000

    res = EnStat.objects.filter(mgroup__product__code="cement", type=type, **period).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit"))
    enstat2.cement_val = res["sum"] if res["sum"] else 0
    enstat2.cement_cost_unit = res["avg"] if res["avg"] else 0
    enstat2.industry_add_val = enstat2.industry_total_val - enstat2.cement_val * enstat2.cement_cost_unit / 10000

    # Plant-wide electricity: if dedicated points exist (material "elec", code ending
    # in "__all") their calendar day/month statistics are used instead of the
    # sum over the work groups.
    mp_elecs = Mpoint.objects.filter(material__code="elec", code__endswith="__all")
    group_totals = EnStat.objects.filter(type=type, **period).aggregate(
        elec=Sum("elec_consume"), pcoal=Sum("pcoal_consume"), pcoal_coal=Sum("pcoal_coal_consume"), water=Sum("water_consume"), cair=Sum("cair_consume")
    )
    if mp_elecs.exists():
        elec_total = MpointStat.objects.filter(type=cal_type, mpoint__in=mp_elecs, **cal_period).aggregate(sum=Sum("val"))["sum"]
        enstat2.elec_consume = 0 if elec_total is None else elec_total
    else:
        enstat2.elec_consume = group_totals["elec"] or 0
    enstat2.elec_coal_consume = enstat2.elec_consume * 0.1229 / 1000

    # Everything else is simply the sum over the work groups.
    enstat2.pcoal_consume = group_totals["pcoal"] or 0
    enstat2.pcoal_coal_consume = group_totals["pcoal_coal"] or 0
    enstat2.water_consume = group_totals["water"] or 0
    enstat2.cair_consume = group_totals["cair"] or 0
    enstat2.en_consume = enstat2.pcoal_coal_consume + enstat2.elec_coal_consume
    try:
        enstat2.en_consume_unit = enstat2.en_consume / enstat2.industry_total_val
    except ZeroDivisionError:
        enstat2.en_consume_unit = 0
    try:
        enstat2.en_add_consume_unit = enstat2.en_consume / enstat2.industry_add_val
    except ZeroDivisionError:
        enstat2.en_add_consume_unit = 0
    enstat2.save()
|
|
|
|
|
|
def cal_enstat_pcoal_change(enstat: EnStat, new_pcoal_heat):
    """Recompute the coal figures of ``enstat`` after its pulverized-coal heat value changes.

    Depending on ``enstat.type``:

    * ``hour_s`` / ``sflog`` / ``day_s``: store ``new_pcoal_heat`` on the record and
      derive ``pcoal_coal_consume`` from ``pcoal_consume * pcoal_heat / 7000``.
    * ``month_st`` / ``month_s`` / ``year_s``: set ``pcoal_coal_consume`` to the sum of
      ``pcoal_coal_consume`` over the matching ``sflog`` rows of the same mgroup
      (``month_st`` additionally restricts to the record's team).
    * any other type: ``pcoal_coal_consume`` is left as it is.

    Afterwards ``coal_consume_unit`` and ``cen_consume_unit`` are refreshed, the record
    is saved, and (except for ``month_st`` / ``sflog``) the matching "水泥磨" record of the
    same period gets its ``cen_consume_unit`` recomputed as well.

    Args:
        enstat: The statistics row to update in place and persist.
        new_pcoal_heat: New heat value; only applied for hour/shift/day rows.
    """
    stat_type = enstat.type

    if stat_type in ("hour_s", "sflog", "day_s"):
        enstat.pcoal_heat = new_pcoal_heat
        # NOTE(review): 7000 looks like the standard-coal heat value used for
        # conversion -- confirm the unit with the domain owner.
        enstat.pcoal_coal_consume = enstat.pcoal_consume * enstat.pcoal_heat / 7000
    else:
        # Aggregated periods are rebuilt from the per-shift ("sflog") rows.
        if stat_type == "month_st":
            period_filter = {"year_s": enstat.year_s, "month_s": enstat.month_s, "sflog__team": enstat.team}
        elif stat_type == "month_s":
            period_filter = {"year_s": enstat.year_s, "month_s": enstat.month_s}
        elif stat_type == "year_s":
            period_filter = {"year_s": enstat.year_s}
        else:
            period_filter = None

        if period_filter is not None:
            shift_rows = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, **period_filter)
            enstat.pcoal_coal_consume = shift_rows.aggregate(sum=Sum("pcoal_coal_consume"))["sum"]

    # An aggregate over zero rows yields None; treat that as no consumption.
    if enstat.pcoal_coal_consume is None:
        enstat.pcoal_coal_consume = 0

    # Standard coal consumption per unit of product.
    try:
        enstat.coal_consume_unit = enstat.pcoal_coal_consume * 1000 / enstat.total_production
    except ZeroDivisionError:
        enstat.coal_consume_unit = 0

    # Comprehensive energy consumption (coal part plus converted electricity part).
    enstat.cen_consume_unit = enstat.coal_consume_unit + 0.1229 * enstat.elec_consume_unit

    enstat.save(update_fields=["pcoal_heat", "pcoal_coal_consume", "coal_consume_unit", "cen_consume_unit"])

    if stat_type in ("month_st", "sflog"):
        return

    # Also refresh the cement mill's comprehensive energy consumption. This may find
    # nothing because the cement mill is calculated later, but it matters when the
    # pulverized-coal heat value is changed afterwards.
    cement_stat = EnStat.objects.filter(
        type=enstat.type,
        year_s=enstat.year_s,
        month_s=enstat.month_s,
        day_s=enstat.day_s,
        hour=enstat.hour,
        mgroup__name="水泥磨",
    ).first()
    if cement_stat:
        cement_stat.cen_consume_unit = cement_stat.elec_consume_unit * 0.1229 + 0.7 * enstat.cen_consume_unit
        cement_stat.save(update_fields=["cen_consume_unit"])
|
|
|
|
|
|
# Alarm definitions consumed by enm_alarm. Each row is
# [mgroup name, EnStat attribute / goal category code, label used in the voice message].
enm_alarms_list = [
    ["回转窑", "celec_consume_unit", "单位产品综合电耗"],
    ["回转窑", "coal_consume_unit", "单位产品标煤耗"],
    ["水泥磨", "elec_consume_unit", "单位产品分布电耗"],
]
|
|
|
|
|
|
@shared_task(base=CustomTask)
def enm_alarm(year_s: int, month_s: int, day_s: int):
    """
    Energy-management (enm) alarm task.

    For every row of ``enm_alarms_list`` and every mgroup with the row's name, the
    ``day_s`` EnStat of the given date is compared with that mgroup's yearly goal
    (attribute ``goal_val_<month_s>``). When both values are truthy and the actual
    value exceeds the goal, an Event is stored, linked to the ``consume_exceed``
    event category through Eventdo, and passed to ``notify_event``.

    Args:
        year_s: Statistics year.
        month_s: Statistics month; also selects the ``goal_val_<month_s>`` attribute.
        day_s: Statistics day.
    """
    from apps.ecm.models import Event, EventCate, Eventdo
    from apps.mtm.models import Goal
    from apps.ecm.service import notify_event

    happen_time = timezone.now()
    event_cate, _ = EventCate.objects.get_or_create(
        code="consume_exceed",
        defaults={"name": "能耗超过目标值", "code": "consume_exceed", "trigger": 30},
    )

    for mgroup_name, goal_cate_str, label in enm_alarms_list:
        for mgroup in Mgroup.objects.filter(name=mgroup_name):
            enstat = EnStat.objects.filter(mgroup=mgroup, type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).first()
            if not enstat:
                continue

            real_val = getattr(enstat, goal_cate_str, None)
            goal = Goal.objects.filter(goal_cate__code=goal_cate_str, year=year_s, mgroup=mgroup).first()
            if not goal:
                continue

            goal_val = getattr(goal, f"goal_val_{month_s}", None)
            exceeded = goal_val and real_val and real_val > goal_val
            if not exceeded:
                continue

            # Goal exceeded: record the event and notify.
            event = Event()
            event.obj_cate = "enm"
            event.happen_time = happen_time
            event.voice_msg = f"{mgroup_name}{label}超过设定目标值"
            event.enm_data = {
                "mgroup": mgroup.id,
                "mgroup_name": mgroup.name,
                "type": f"{goal_cate_str}.exceed",
                "year_s": year_s,
                "month_s": month_s,
                "day_s": day_s,
                "val": real_val,
                "goal_val": goal_val,
                "enstat": enstat.id,
            }
            event.save()
            Eventdo.objects.get_or_create(cate=event_cate, event=event, defaults={"cate": event_cate, "event": event})
            notify_event(event)
|
|
|
|
|
|
@shared_task(base=CustomTask)
def king_insert_mplogx():
    """Read real-time values for King-sourced measuring points and store them.

    Collects the codes of all enabled ``Mpoint.MT_AUTO`` measuring points whose
    code starts with ``K_``, strips that prefix to obtain the tag name, requests the
    values through the ``read_batchtagrealvalue`` King API and hands the response to
    ``insert_mplogx_from_king_rest_chunk``.
    """
    prefix = "K_"
    mpoint_codes = Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO, code__startswith=prefix).values_list("code", flat=True)

    # Strip only the leading prefix. str.replace(prefix, "") would also delete any
    # later "K_" inside the code (e.g. "K_TANK_1" -> "TAN1") and request the wrong tag.
    tags = [{"N": code[len(prefix):] if code.startswith(prefix) else code} for code in mpoint_codes]

    _, res = kingClient.request(**kapis["read_batchtagrealvalue"], json={"objs": tags})
    insert_mplogx_from_king_rest_chunk(res)
|
|
|
|
|
|
@shared_task(base=CustomTask)
def check_mpoint_offline(seconds=100):
    """Detect measuring points whose data collection has gone offline.

    Checks every enabled ``MT_AUTO`` measuring point, plus every enabled
    ``MT_COMPUTE`` point flagged ``is_rep_ep_running_state``. A point counts as
    online only when its cache holds ``last_data`` with a ``last_timex`` less than
    ``seconds`` before now; otherwise ``set_fail(-2, now)`` is called on its cache.

    Args:
        seconds: Maximum allowed age of the last sample, in seconds.
    """
    now = localtime()
    auto_points = Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO)
    running_state_points = Mpoint.objects.filter(enabled=True, type=Mpoint.MT_COMPUTE, is_rep_ep_running_state=True)

    for mpoint in auto_points | running_state_points:
        cache = MpointCache(mpoint.code)
        last_data = cache.data.get("last_data", None)

        if last_data and last_data["last_timex"] and (now - last_data["last_timex"]).total_seconds() < seconds:
            # A recent enough sample exists; the point is still online.
            continue

        # NOTE(review): -2 is presumably the "offline" failure code -- confirm in MpointCache.set_fail.
        cache.set_fail(-2, now)
|