factory/apps/enm/tasks.py

1281 lines
67 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Create your tasks here
from apps.utils.tasks import CustomTask
from celery import shared_task
from apps.enm.models import MpLogx, Mpoint, MpointStat, EnStat, EnStat2, Xscript
from apps.wpm.models import SfLog, StLog
import datetime
from django.db.models import Q
from django.db.models import Sum, Avg
from dateutil import tz
from django.conf import settings
from apps.wpm.services import get_sflog
from apps.mtm.models import Mgroup, Material
from apps.fim.services import get_cost_unit, get_price_unit
from apps.fim.models import Fee
from apps.enm.services import translate_eval_formula
import logging
from server.settings import get_sysconfig, update_sysconfig
from django.db.models import F
from apps.wpm.services import get_pcoal_heat
from django.utils import timezone
from django.db.models import Max
from apps.third.king.k import kingClient
from apps.third.king.king_api import kapis
from apps.enm.services import insert_mplogx_from_king_rest_chunk, MpointCache
from django.utils.timezone import localtime
from apps.wpm.tasks import get_total_sec_now, cal_exp_duration_sec
from apps.utils.sql import DbConnection
from apps.enm.services import db_insert_mplogx_batch, get_elec_level
from apps.enm.xscript import main
from django.core.exceptions import ObjectDoesNotExist
from django.utils.timezone import make_aware
myLogger = logging.getLogger("log")
def get_current_and_previous_time():
now = datetime.datetime.now()
pre = now - datetime.timedelta(hours=1)
return now, pre
@shared_task(base=CustomTask)
def insert_mplogx_from_xscript(xscript_id):
xscript = Xscript.objects.get(id=xscript_id)
mcodes_list = Mpoint.objects.filter(enabled=True).values_list('code', flat=True)
if mcodes_list:
main(xscript, mcodes_list)
@shared_task(base=CustomTask)
def db_insert_mplogx(limit:bool=True):
"""
从数据库转存到超表
"""
config = get_sysconfig()
with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor:
last_tag_id = config['enm'].get('last_tag_id', None)
if last_tag_id is None:
raise Exception("last_tag_id is None")
cursor.execute("select count(id) from tag_value where id > %s", (last_tag_id))
count = cursor.fetchone()[0]
if limit and count > 400:
raise Exception("db inset count > 400")
cursor.execute(
"select id, val, tag_code, data_time from tag_value where id > %s order by id, data_time", (last_tag_id, ))
rows = cursor.fetchall() # 获取数据后保存至本地
if rows:
last_tag_id = rows[-1][0]
db_insert_mplogx_batch(rows)
update_sysconfig({'enm': {'last_tag_id': last_tag_id}})
@shared_task(base=CustomTask)
def db_ins_mplogx():
"""
从数据库转存到超表
"""
config = get_sysconfig()
with DbConnection(config['enm1']['db_host'], config['enm1']['db_user'], config['enm1']['db_password'], config['enm1']['db_database1']) as cursor:
bill_date = config['enm1'].get('bill_date', None)
batchs = config['enm1'].get('batch', None)
if not batchs:
raise Exception("batch is None")
try:
bill_date = datetime.datetime.strptime(bill_date, '%Y-%m-%d %H:%M:%S')
except ValueError:
raise Exception(f"Invalid date format in {bill_date}")
if bill_date is None:
raise Exception("bill_date is None")
query = """
SELECT id, de_real_quantity, inv_code, bill_date
FROM sa_weigh_view
WHERE bill_date >= %s and de_real_quantity > 0
AND inv_code IN %s
ORDER BY bill_date
"""
cursor.execute(query, (bill_date, tuple(batchs)))
rows = cursor.fetchall() # 获取数据后保存至本地
if rows:
bill_date_x = rows[-1][-1]
db_insert_mplogx_batch(rows)
update_sysconfig({'enm1': {'bill_date': bill_date_x.strftime('%Y-%m-%d %H:%M:%S')}})
# @shared_task(base=CustomTask)
# def mpoint_val_on_change(mpLogId: str):
# """测点值变化的时候执行任务(废弃)
# """
# mplog = MpLog.objects.get(id=mpLogId)
# mpoint = mplog.mpoint
# module, func = mpoint.func_on_change.rsplit(".", 1)
# m = importlib.import_module(module)
# f = getattr(m, func)
# f(mplog) # 同步执行
@shared_task(base=CustomTask)
def cal_mpointstats_duration(start_time: str, end_time: str, m_code_list=[], cal_attrs=[], mpoint_stat=False):
"""
重跑某一段时间的任务
mpoint_stat: True时从已有的MpointStat hour记录开始重算(跳过MpLogx)只重算day/month/year/sflog聚合速度更快
"""
mytz = tz.gettz(settings.TIME_ZONE)
start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
start_time = start_time.replace(tzinfo=mytz)
end_time = datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
end_time = end_time.replace(tzinfo=mytz)
if mpoint_stat:
_recal_from_mpointstat(start_time, end_time, m_code_list, cal_attrs)
else:
current_time = start_time
while current_time <= end_time:
year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour
cal_mpointstats(0, year, month, day, hour, m_code_list, cal_attrs)
myLogger.info("now: {} cal_mpointstats completed: {}".format(datetime.datetime.now(), current_time))
current_time += datetime.timedelta(hours=1)
def _recal_from_mpointstat(start_time, end_time, m_code_list=[], cal_attrs=[]):
"""
从已有的MpointStat hour记录开始批量重算day/month/year/sflog聚合
不读取MpLogx速度远快于完整重算
"""
# 确定需要重算的测点
if m_code_list:
mpoints = list(Mpoint.objects.filter(code__in=m_code_list, enabled=True))
# 也要包含依赖这些测点的计算测点
related = Mpoint.objects.none()
for code in m_code_list:
related = related | Mpoint.objects.filter(
type=Mpoint.MT_COMPUTE, enabled=True, material__isnull=False,
formula__contains='{' + code + '}'
)
mpoints.extend(list(related.distinct()))
mpoints = list({mp.id: mp for mp in mpoints}.values()) # 去重
else:
mpoints = list(Mpoint.objects.filter(enabled=True, material__isnull=False))
mpoint_ids = [mp.id for mp in mpoints]
# 收集需要重算的所有 (year, month, day) 和 (year, month)
days_set = set()
months_set = set()
years_set = set()
current_time = start_time
while current_time <= end_time:
days_set.add((current_time.year, current_time.month, current_time.day))
months_set.add((current_time.year, current_time.month))
years_set.add(current_time.year)
current_time += datetime.timedelta(hours=1)
myLogger.info(f"_recal_from_mpointstat: {len(mpoints)} mpoints, {len(days_set)} days")
# 批量重算 day = sum(hour)
for year, month, day in days_set:
# 一次查询拿到所有测点该天的hour汇总
hour_sums = dict(
MpointStat.objects.filter(
type="hour", mpoint_id__in=mpoint_ids, year=year, month=month, day=day
).values('mpoint_id').annotate(total=Sum('val')).values_list('mpoint_id', 'total')
)
for mp in mpoints:
val = hour_sums.get(mp.id)
if val is None:
continue
params_day = {"type": "day", "mpoint": mp, "year": year, "month": month, "day": day}
ms_day, _ = MpointStat.safe_get_or_create(**params_day, defaults=params_day)
if ms_day.val_correct is None:
ms_day.val = val
ms_day.val_origin = val
ms_day.save()
# 批量重算 month = sum(day)
for year, month in months_set:
day_sums = dict(
MpointStat.objects.filter(
type="day", mpoint_id__in=mpoint_ids, year=year, month=month
).values('mpoint_id').annotate(total=Sum('val')).values_list('mpoint_id', 'total')
)
for mp in mpoints:
val = day_sums.get(mp.id)
if val is None:
continue
params_month = {"type": "month", "mpoint": mp, "year": year, "month": month}
ms_month, _ = MpointStat.safe_get_or_create(**params_month, defaults=params_month)
if ms_month.val_correct is None:
ms_month.val = val
ms_month.val_origin = val
ms_month.save()
# 批量重算 year = sum(month)
for year in years_set:
month_sums = dict(
MpointStat.objects.filter(
type="month", mpoint_id__in=mpoint_ids, year=year
).values('mpoint_id').annotate(total=Sum('val')).values_list('mpoint_id', 'total')
)
for mp in mpoints:
val = month_sums.get(mp.id)
if val is None:
continue
params_year = {"type": "year", "mpoint": mp, "year": year}
ms_year, _ = MpointStat.safe_get_or_create(**params_year, defaults=params_year)
if ms_year.val_correct is None:
ms_year.val = val
ms_year.val_origin = val
ms_year.save()
# 重算 sflog 相关统计 (hour_s -> sflog)
mytz = tz.gettz(settings.TIME_ZONE)
mgroups = Mgroup.objects.filter(need_enm=True).order_by("sort")
current_time = start_time
sflog_cache = {}
while current_time <= end_time:
year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour
dt = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=0, second=0, tzinfo=mytz)
for mgroup in mgroups:
cache_key = (mgroup.id, year, month, day, hour)
if cache_key not in sflog_cache:
sflog = get_sflog(mgroup, dt)
sflog_cache[cache_key] = sflog
sflog = sflog_cache[cache_key]
if sflog is None:
continue
year_s, month_s, day_s = sflog.get_ymd
# 获取该mgroup下的测点
group_mpoints = [mp for mp in mpoints if mp.mgroup_id == mgroup.id]
for mp in group_mpoints:
# 找到对应的hour stat
ms_hour = MpointStat.objects.filter(
type="hour", mpoint=mp, year=year, month=month, day=day, hour=hour
).first()
if ms_hour is None:
continue
params_hour_s = {
"type": "hour_s", "mpoint": mp, "sflog": sflog, "mgroup": mgroup,
"year": year, "month": month, "day": day,
"year_s": year_s, "month_s": month_s, "day_s": day_s, "hour": hour,
}
ms_hour_s, _ = MpointStat.safe_get_or_create(**params_hour_s, defaults=params_hour_s)
ms_hour_s.val = ms_hour_s.val_correct if ms_hour_s.val_correct is not None else ms_hour.val
ms_hour_s.save()
# 重算 sflog 聚合
for mp in group_mpoints:
sflog_key = (mp.id, sflog.id, year_s, month_s, day_s)
if sflog_key in sflog_cache:
continue # 同一sflog只算一次
sflog_cache[sflog_key] = True
params_sflog_s = {
"type": "sflog", "mpoint": mp, "sflog": sflog,
"year_s": year_s, "month_s": month_s, "day_s": day_s, "mgroup": mgroup,
}
ms_sflog_s, _ = MpointStat.safe_get_or_create(**params_sflog_s, defaults=params_sflog_s)
if ms_sflog_s.val_correct is None:
sum_val = MpointStat.objects.filter(
type="hour_s", mpoint=mp, year_s=year_s, month_s=month_s, day_s=day_s, sflog=sflog
).aggregate(sum=Sum("val"))
ms_sflog_s.val = sum_val['sum'] if sum_val['sum'] is not None else 0
ms_sflog_s.val_origin = ms_sflog_s.val
ms_sflog_s.save()
myLogger.info("now: {} _recal_from_mpointstat completed: {}".format(datetime.datetime.now(), current_time))
current_time += datetime.timedelta(hours=1)
# 重算 enstat
current_time = start_time
while current_time <= end_time:
year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour
for mgroup in mgroups:
cal_enstat("hour_s", None, mgroup.id, year, month, day, hour, None, None, None, True, cal_attrs)
current_time += datetime.timedelta(hours=1)
myLogger.info("_recal_from_mpointstat completed all")
@shared_task(base=CustomTask)
def correct_bill_date():
"""
定时更新bill_date时间为前一天
"""
now_time = datetime.datetime.now()
update_time = now_time - datetime.timedelta(hours=24)
bill_date = make_aware(update_time)
update_sysconfig({'enm1': {'bill_date': bill_date.strftime('%Y-%m-%d %H:%M:%S')}})
@shared_task(base=CustomTask)
def cal_mpointstats_scheduled_tasks(m_code_list=None, cal_attrs=None):
"""
重跑某一段时间的任务
"""
if m_code_list is None:
m_code_list = []
if cal_attrs is None:
cal_attrs = []
end_time = datetime.datetime.now()
start_time = end_time - datetime.timedelta(hours=24)
start_time = make_aware(start_time)
end_time = make_aware(end_time)
current_time = start_time
while current_time <= end_time:
year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour
cal_mpointstats(0, year, month, day, hour, m_code_list, cal_attrs)
current_time += datetime.timedelta(hours=1)
def get_first_stlog_time_from_duration(mgroup:Mgroup, dt_start:datetime, dt_end: datetime):
st_qs = StLog.objects.filter(is_shutdown=True, mgroup=mgroup)
st = st_qs.filter(start_time__lte=dt_start, end_time=None).order_by("start_time").last()
if st:
return st, "ending"
st = st_qs.filter(start_time__gte=dt_start, start_time__lte=dt_end, duration_sec__gte=600).order_by("start_time").last()
if st:
return st, "start"
st = st_qs.filter(end_time__gte=dt_start, end_time__lte=dt_end, duration_sec__gte=600).order_by("end_time").first()
if st:
return st, "end"
return None, ""
@shared_task(base=CustomTask)
def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int, cascade=True, sflog_hours=[]):
"""
计算某一测点, 某一时间点某一小时的统计值
"""
val_level = None
mpoint = Mpoint.objects.get(id=mpointId)
mytz = tz.gettz(settings.TIME_ZONE)
dt = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=0, second=0, tzinfo=mytz) # 整点时间
dt_hour_p= dt - datetime.timedelta(hours=1) # 上个整点
dt_hour_n= dt + datetime.timedelta(hours=1) # 下个整点
if (mpoint.material or mpoint.type == Mpoint.MT_COMPUTE) and mpoint.val_type in ['float', 'int']: # 如果计量的是物料 # 累计量 有的会清零,需要额外处理(还未做)
material_code = mpoint.material.code if mpoint.material else None
params = {"mpoint": mpoint, "type": "hour"}
params["year"], params["month"], params["day"], params["hour"] = year, month, day, hour
val = 0
val_type = mpoint.val_type
mpointfrom = mpoint.mpoint_from if mpoint.mpoint_from else mpoint
if mpoint.type == Mpoint.MT_AUTO:
if mpoint.is_unit: # 单位量
val = MpLogx.objects.filter(mpoint=mpointfrom, timex__gte=dt, timex__lt=dt_hour_n).aggregate(sum=Sum(f'val_{mpointfrom.val_type}'))["sum"]
if val is None:
val = 0
elif mpoint.up_down: # 液位量
mrs0 = MpLogx.objects.filter(mpoint=mpointfrom, timex__gte=dt_hour_p, timex__lte=dt).order_by("timex")
mrs = MpLogx.objects.filter(mpoint=mpointfrom, timex__gte=dt, timex__lte=dt_hour_n).order_by("timex")
if mrs0.exists() and mrs.exists():
last_val = getattr(mrs.last(), f'val_{val_type}')
first_val = getattr(mrs0.last(), f'val_{val_type}')
if last_val <= first_val:
val = first_val - last_val
else:
# 加入氨水 需要手动校正 ,目前先取绝对值。
val = abs(first_val - last_val)
else:
xtype = "normal"
if mpointfrom and mpoint.cal_related_mgroup_running == 20:
stlog, xtype = get_first_stlog_time_from_duration(mpoint.mgroup, dt, dt_hour_n)
if xtype == "ending":
val = 0
if xtype == "start":
dt_hour_n = stlog.start_time
elif xtype == "end":
dt = stlog.end_time
# 需要将来源于自采的测点对象传入到下面
if xtype != "ending":
mrs0 = MpLogx.objects.filter(mpoint=mpointfrom, timex__gte=dt_hour_p, timex__lte=dt).order_by("timex")
mrs = MpLogx.objects.filter(mpoint=mpointfrom, timex__gte=dt, timex__lte=dt_hour_n).order_by("timex")
if mrs0.exists() and mrs.exists():
last_val = getattr(mrs.last(), f'val_{val_type}')
first_val = getattr(mrs0.last(), f'val_{val_type}')
if last_val >= first_val or first_val == 0:
val = last_val - first_val
elif first_val - last_val > 0 and (first_val - last_val)/first_val < 0.01:
val = 0
myLogger.info(f'{mpointfrom.code}--{dt}--{last_val}--{first_val}--last_val 小于 first_val')
else:
# 这里判断有可能清零了
max_val = max(mrs.aggregate(max=Max(f'val_{val_type}'))["max"], first_val)
myLogger.info(f'{mpointfrom.id}--{mpointfrom.code}--{dt}--{last_val}--{first_val}--清零')
val = max_val - first_val + last_val
elif mpoint.type == Mpoint.MT_COMPUTE and mpoint.formula:
formula = mpoint.formula
val = translate_eval_formula(formula, year, month, day, hour)
else:
return
ms, _ = MpointStat.safe_get_or_create(**params)
ms.val_origin = val
need_cal_co = False
if ms.current_cal_coefficient is None and mpoint.cal_coefficient is not None:
ms.current_cal_coefficient = mpoint.cal_coefficient
need_cal_co = True
elif ms.current_cal_coefficient is not None:
need_cal_co = True
if need_cal_co:
if mpoint.cal_coefficient_method == 1:
val = val * ms.current_cal_coefficient
elif mpoint.cal_coefficient_method == 2:
val = val / ms.current_cal_coefficient
# 如果有correct直接用correct代替
ms.val = ms.val_correct if ms.val_correct is not None else val
if material_code == 'elec':
val_level = get_elec_level(month, hour)
ms.val_level = val_level
# ms.val_level
ms.save()
# 更新更高级别的值
if cascade or hour == 23:
params_day = {"type": "day", "mpoint": mpoint, "year": year, "month": month, "day": day}
ms_day, _ = MpointStat.safe_get_or_create(**params_day, defaults=params_day)
if ms_day.val_correct is not None:
ms_day.val = ms_day.val_correct
else:
sum_dict_day = MpointStat.objects.filter(type="hour", mpoint=mpoint, year=year, month=month, day=day).aggregate(sum=Sum("val"))
ms_day.val = sum_dict_day['sum'] if sum_dict_day['sum'] is not None else 0
ms_day.val_origin = ms_day.val
ms_day.save()
if cascade or day in [28, 29, 30, 31]:
params_month = {"type": "month", "mpoint": mpoint, "year": year, "month": month}
ms_month, _ = MpointStat.safe_get_or_create(**params_month, defaults=params_month)
if ms_month.val_correct is not None:
ms_month.val = ms_month.val_correct
else:
sum_dict_month = MpointStat.objects.filter(type="day", mpoint=mpoint, year=year, month=month).aggregate(sum=Sum("val"))
ms_month.val = sum_dict_month['sum'] if sum_dict_month['sum'] is not None else 0
ms_month.val_origin = ms_month.val
ms_month.save()
if cascade or month == 12:
params_year = {"type": "year", "mpoint": mpoint, "year": year}
ms_year, _ = MpointStat.safe_get_or_create(**params_year, defaults=params_year)
if ms_year.val_correct is not None:
ms_year.val = ms_year.val_correct
else:
sum_dict_year = MpointStat.objects.filter(type="month", mpoint=mpoint, year=year).aggregate(sum=Sum("val"))
ms_year.val = sum_dict_year['sum'] if sum_dict_year['sum'] is not None else 0
ms_year.val_origin = ms_year.val
ms_year.save()
mgroup = mpoint.mgroup
if mgroup:
sflog = get_sflog(mgroup, dt)
if sflog is None:
myLogger.error(f'{mgroup.name}--{dt}')
year_s, month_s, day_s = sflog.get_ymd
params_hour_s = {
"type": "hour_s",
"mpoint": mpoint,
"sflog": sflog,
"mgroup": mgroup,
"year": year,
"month": month,
"day": day,
"year_s": year_s,
"month_s": month_s,
"day_s": day_s,
"hour": hour,
}
ms_hour_s, _ = MpointStat.safe_get_or_create(**params_hour_s, defaults=params_hour_s)
ms_hour_s.val = ms_hour_s.val_correct if ms_hour_s.val_correct is not None else ms.val
ms_hour_s.save()
# 开始往上计算
params_sflog_s = {"type": "sflog", "mpoint": mpoint, "sflog": sflog, "year_s": year_s, "month_s": month_s, "day_s": day_s, "mgroup": mgroup}
ms_sflog_s, _ = MpointStat.safe_get_or_create(**params_sflog_s, defaults=params_sflog_s)
if ms_sflog_s.val_correct is not None:
ms_sflog_s.val = ms_sflog_s.val_correct
else:
sum_dict_sflog_s = MpointStat.objects.filter(type="hour_s", mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, sflog=sflog).aggregate(sum=Sum("val"))
ms_sflog_s.val = sum_dict_sflog_s['sum'] if sum_dict_sflog_s['sum'] is not None else 0
ms_sflog_s.val_origin = ms_sflog_s.val
ms_sflog_s.save()
# next_cal_dict = [mpoint.material.id, sflog.id, year, month, day, hour, year_s, month_s, day_s]
# if next_cal_dict == cache.get('enm_cal_dict', None):
# next_cal = 0
# else:
# next_cal = 1
# cache.set('enm_cal_dict', next_cal_dict, 60)
cal_mpointstat_manual(mpoint.id, sflog.id, mgroup.id, year, month, day, hour, year_s, month_s, day_s, 0)
@shared_task(base=CustomTask)
def cal_mpointstats(is_now=1, year=None, month=None, day=None, hour=None, m_code_list=[], cal_attrs=[]):
"""
计算所有自动采集测点的统计值,默认当前小时, 可手动传入时间和测点编号集
cal_attrs: 需要计算的属性,空列表默认计算所有属性["material", "pcoal", "run_hour"]
"""
if year and month and day and hour is not None:
pass
else:
now, pre = get_current_and_previous_time()
if is_now:
year, month, day, hour = now.year, now.month, now.day, now.hour
else:
year, month, day, hour = pre.year, pre.month, pre.day, pre.hour
if m_code_list:
mpoints1 = Mpoint.objects.filter(code__in=m_code_list)
for item in mpoints1:
cal_mpointstat_hour(item.id, year, month, day, hour)
mpoints_related = Mpoint.objects.none()
code2 = []
for code in m_code_list:
mpoints_related = mpoints_related | Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True, material__isnull=False, formula__contains='{' + code + '}')
code2.extend(mpoints_related.values_list('code', flat=True))
code2 = list(set(code2))
for code in code2:
mpoints_related = mpoints_related | Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True, material__isnull=False, formula__contains='{' + code + '}')
mpoints_related = mpoints_related.distinct().order_by('report_sortstr', 'create_time')
for item in mpoints_related:
cal_mpointstat_hour(item.id, year, month, day, hour)
else:
# 先统计自动采集的测点
mpoints_auto = Mpoint.objects.filter(type=Mpoint.MT_AUTO, enabled=True)
# mpoints_without_formula_group = []
for item in mpoints_auto:
# mpoints_without_formula_group.append(cal_mpointstat_hour.s(item.id, year, month, day, hour))
cal_mpointstat_hour(item.id, year, month, day, hour)
# 再统计计算测点
mpoints_compute = Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True).exclude(formula="").order_by('report_sortstr', 'create_time')
# mpoints_other_group = []
for item in mpoints_compute:
# mpoints_other_group.append(cal_mpointstat_hour.s(item.id, year, month, day, hour))
cal_mpointstat_hour(item.id, year, month, day, hour)
# 先调整一下班时间,以防计算错误
# 为了保持一致使用统计的时间点
nowx = timezone.now()
get_total_sec_now(now=nowx) # 再处理total_sec
cal_exp_duration_sec(now=nowx) # 先处理shut_sec
# 开始计算enstat
mgroups = Mgroup.objects.filter(need_enm=True).order_by("sort")
# mgroups_group = []
year_s, month_s, day_s = 0, 0, 0
for mgroup in mgroups:
# mgroups_group.append(cal_enstat.s('hour_s', None, mgroup.id, year, month, day, hour, None, None, None, True, ['material', 'run_hour']))
year_s, month_s, day_s = cal_enstat("hour_s", None, mgroup.id, year, month, day, hour, None, None, None, True, cal_attrs)
# mgroups_t = mgroups.filter(name__in=['回转窑', '水泥磨'])
# mgroups_t_group = []
# for mgroup in mgroups_t:
# mgroups_t_group.append(cal_enstat.s('hour_s', None, mgroup.id, year, month, day, hour, None, None, None, True, ['pcoal']))
# 最后计算enstat2
cal_enstat2(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s)
# task_chain = chain(group(mpoints_without_formula_group), group(mpoints_other_group), group(mgroups_group), group(mgroups_t_group), group([cal_enstat2.s(year_s=year, month_s=month)]))
# task_chain.delay()
if hour == 21:
enm_alarm.delay(year_s, month_s, day_s)
@shared_task(base=CustomTask)
def cal_mpointstat_manual(mpointId: str, sflogId: str, mgroupId: str, year: int, month: int, day: int, hour: int, year_s: int, month_s: int, day_s: int, next_cal=0):
"""
手动录入的测点数据进行往上统计,一级一级往上
"""
mpoint = Mpoint.objects.get(id=mpointId)
mgroup = Mgroup.objects.filter(id=mgroupId).first()
if not mgroup:
if day:
params_month = {"type": "month", "mpoint": mpoint, "year": year, "month": month}
ms_month, _ = MpointStat.safe_get_or_create(**params_month, defaults=params_month)
if ms_month.val_correct is not None:
ms_month.val = ms_month.val_correct
else:
sum_dict_month = MpointStat.objects.filter(type="day", mpoint=mpoint, year=year, month=month).aggregate(sum=Sum("val"))
ms_month.val = sum_dict_month['sum'] if sum_dict_month['sum'] is not None else 0
ms_month.val_origin = ms_month.val
ms_month.save()
elif month:
params_year = {"type": "year", "mpoint": mpoint, "year": year}
ms_year, _ = MpointStat.safe_get_or_create(**params_year, defaults=params_year)
if ms_year.val_correct is not None:
ms_year.val = ms_year.val_correct
else:
sum_dict_year = MpointStat.objects.filter(type="month", mpoint=mpoint, year=year).aggregate(sum=Sum("val"))
ms_year.val = sum_dict_year['sum'] if sum_dict_year['sum'] is not None else 0
ms_year.val_origin = ms_year.val
ms_year.save()
else:
if sflogId:
params_day_s = {"type": "day_s", "mpoint": mpoint, "year_s": year_s, "month_s": month_s, "day_s": day_s, "mgroup": mgroup}
ms_day_s, _ = MpointStat.safe_get_or_create(**params_day_s, defaults=params_day_s)
if ms_day_s.val_correct is not None:
ms_day_s.val = ms_day_s.val_correct
else:
sum_dict_day_s = MpointStat.objects.filter(type="sflog", mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, mgroup=mgroup).aggregate(sum=Sum("val"))
ms_day_s.val = sum_dict_day_s['sum'] if sum_dict_day_s['sum'] is not None else 0
ms_day_s.val_origin = ms_day_s.val
ms_day_s.save()
if day_s:
params_month_s = {"type": "month_s", "mpoint": mpoint, "year_s": year_s, "month_s": month_s, "mgroup": mgroup}
ms_month_s, _ = MpointStat.safe_get_or_create(**params_month_s, defaults=params_month_s)
if ms_month_s.val_correct is not None:
ms_month_s.val = ms_month_s.val_correct
else:
sum_dict_month_s = MpointStat.objects.filter(type="day_s", mpoint=mpoint, year_s=year_s, month_s=month_s, mgroup=mgroup).aggregate(sum=Sum("val"))
ms_month_s.val = sum_dict_month_s['sum'] if sum_dict_month_s['sum'] is not None else 0
ms_month_s.val_origin = ms_month_s.val
ms_month_s.save()
if month_s:
params_year_s = {"type": "year_s", "mpoint": mpoint, "year_s": year_s, "mgroup": mgroup}
ms_year_s, _ = MpointStat.safe_get_or_create(**params_year_s, defaults=params_year_s)
if ms_year_s.val_correct is not None:
ms_year_s.val = ms_year_s.val_correct
else:
sum_dict_year_s = MpointStat.objects.filter(type="month_s", mpoint=mpoint, year_s=year_s, mgroup=mgroup).aggregate(sum=Sum("val"))
ms_year_s.val = sum_dict_year_s['sum'] if sum_dict_year_s['sum'] is not None else 0
ms_year_s.val_origin = ms_year_s.val
ms_year_s.save()
if next_cal: # 计算工段级数据,手动录入时调用
start_cal_type = 'hour_s'
if hour:
start_cal_type = 'hour_s'
elif sflogId:
start_cal_type = 'sflog'
elif day_s:
start_cal_type = 'day_s'
elif month_s:
start_cal_type = 'month_s'
elif year_s:
start_cal_type = 'year_s'
myLogger.info('--------开始计算能源数据统计----')
cal_enstat(start_cal_type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)
types_list = ["hour_s", "sflog", "day_s", "month_st", "month_s", "year_s"]
@shared_task(base=CustomTask)
def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, cascade=True, cal_attrs=[]):
"""
计算能源数据统计
"""
if cascade:
if type in types_list:
start_index = types_list.index(type)
new_types_list = types_list[start_index:]
for type in new_types_list:
year_s, month_s, day_s = cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, False)
return year_s, month_s, day_s
if not cal_attrs:
this_cal_attrs = ["material", "pcoal", "run_hour"]
else:
this_cal_attrs = cal_attrs[:]
mgroup = Mgroup.objects.get(id=mgroupId)
sflog = None # 默认无班次
if sflogId:
sflog = SfLog.objects.get(id=sflogId)
elif year and month and day and hour is not None:
mytz = tz.gettz(settings.TIME_ZONE)
dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz)
sflog = get_sflog(mgroup, dt)
team = None
if sflog:
year_s, month_s, day_s = sflog.get_ymd
team = sflog.team
# 如果没有班组,那么month_st无需计算
if team is None and type == "month_st":
return year_s, month_s, day_s
if type == "hour_s":
enstat, _ = EnStat.safe_get_or_create(
type="hour_s",
mgroup=mgroup,
year=year,
month=month,
day=day,
hour=hour,
defaults={"type": "hour_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s, "year": year, "month": month, "day": day, "hour": hour, "sflog": sflog},
)
elif type == "sflog":
enstat, _ = EnStat.safe_get_or_create(type="sflog", sflog=sflog, defaults={"type": "sflog", "sflog": sflog, "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s})
elif type == "day_s":
enstat, _ = EnStat.safe_get_or_create(
type="day_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, defaults={"type": "day_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s}
)
elif type == "month_st":
enstat, _ = EnStat.safe_get_or_create(
type="month_st", mgroup=mgroup, team=team, year_s=year_s, month_s=month_s, defaults={"type": "month_st", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "team": team}
)
elif type == "month_s":
enstat, _ = EnStat.safe_get_or_create(type="month_s", mgroup=mgroup, year_s=year_s, month_s=month_s, defaults={"type": "month_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s})
elif type == "year_s":
enstat, _ = EnStat.safe_get_or_create(type="year_s", mgroup=mgroup, year_s=year_s, defaults={"type": "year_s", "mgroup": mgroup, "year_s": year_s})
if "material" in this_cal_attrs:
# 消耗物料统计(包括电耗)
input_materials = []
input_materials = mgroup.input_materials
input_materials_dict = {}
has_p_cost = False
if mgroup.product:
input_materials_dict[mgroup.product.id] = "P"
if mgroup.product_cost:
input_materials_dict[mgroup.product_cost.id] = "P_COST"
has_p_cost = True
input_materials_dict.update({i : "M" for i in input_materials if i not in input_materials_dict})
imaterial_cost_unit = 0
imaterial_data = []
imaterial_data_dict = {}
for mid, mtype in input_materials_dict.items():
material = Material.objects.get(id=mid)
if type == "hour_s":
mps = MpointStat.objects.filter(type="hour_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, hour=hour, mpoint__material=material)
elif type == "sflog":
mps = MpointStat.objects.filter(type="sflog", sflog=sflog, mpoint__material=material)
elif type == "day_s":
mps = MpointStat.objects.filter(type="day_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, mpoint__material=material)
elif type == "month_st":
mps = MpointStat.objects.filter(type="sflog", mgroup=mgroup, sflog__team=team, year_s=year_s, month_s=month_s, mpoint__material=material)
elif type == "month_s":
mps = MpointStat.objects.filter(type="month_s", mgroup=mgroup, year_s=year_s, month_s=month_s, mpoint__material=material)
elif type == "year_s":
mps = MpointStat.objects.filter(type="year_s", mgroup=mgroup, year_s=year_s, mpoint__material=material)
if mps.filter(mpoint__is_rep_mgroup=True).exists():
mps = mps.filter(mpoint__is_rep_mgroup=True)
amount_consume = mps.aggregate(sum=Sum("val"))["sum"]
if amount_consume is None:
amount_consume = 0
if mtype == "P": # 如果是产量
enstat.total_production = amount_consume
elif mtype == "P_COST": # 如果是产量(用于计算成本)
enstat.total_production_cost = amount_consume
else:
if material.code in ["cair", "steam"]:
price_unit = 0
else:
price_unit = get_price_unit(material, year_s, month_s)
cost = amount_consume * price_unit
try:
cost_unit = cost / enstat.total_production if has_p_cost is False else cost / enstat.total_production_cost
except ZeroDivisionError:
cost_unit = 0
imaterial_cost_unit = imaterial_cost_unit + cost_unit
if material.code == "elec":
enstat.elec_consume = amount_consume
enstat.elec_coal_consume = enstat.elec_consume * 0.1229 / 1000
try:
enstat.elec_consume_unit = enstat.elec_consume / enstat.total_production
except ZeroDivisionError:
enstat.elec_consume_unit = 0
elif material.code == "water":
enstat.water_consume = amount_consume
elif material.code == "pcoal":
enstat.pcoal_consume = amount_consume
elif material.code == "cair":
enstat.cair_consume = amount_consume
elif material.code == "steam":
enstat.out_steam = amount_consume
enstat.out_steam_coal = enstat.out_steam * 128.6 / 1000
elif material.code == "ammonia":
enstat.ammonia_consume = amount_consume
elif material.code == "ccr":
enstat.ccr_consume = amount_consume
enstat.kiln_end_heat = enstat.total_production - enstat.ccr_consume
imaterial_item = {
"material": mid,
"material_name": material.name,
"material_code": material.code,
"material_type": material.type,
"price_unit": price_unit,
"amount_consume": amount_consume,
"cost": cost,
"cost_unit": cost_unit,
}
imaterial_data.append(imaterial_item)
imaterial_data_dict.setdefault(material.name, imaterial_item)
enstat.imaterial_data = imaterial_data
enstat.imaterial_data_dict = imaterial_data_dict
# 其他成本数据
other_cost_data = []
other_cost_unit = 0
fee_qs = Fee.objects.order_by("sort")
for fee in fee_qs:
item = {"element": fee.element, "cate": fee.cate, "name": fee.name, "id": fee.id}
item["cost_unit"] = get_cost_unit(mgroup, fee, year_s, month_s)
other_cost_unit = other_cost_unit + item["cost_unit"]
other_cost_data.append(item)
enstat.other_cost_data = other_cost_data
enstat.production_cost_unit = imaterial_cost_unit + other_cost_unit
enstat.save()
if mgroup.cate == 'section':
# 更新所监测设备测点的total_production
total_production = enstat.total_production
if total_production == 0:
elec_consume_unit = 0
else:
elec_consume_unit = F("val") / total_production
MpointStat.objects.filter(
mgroup=enstat.mgroup, mpoint__material__code="elec", type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour
).update(total_production=total_production, elec_consume_unit=elec_consume_unit)
# 更新余热发电的production_elec_unit
if mgroup.name == "回转窑":
if enstat.total_production == 0:
production_elec_unit_yr = 0
else:
production_elec_unit_yr = F("total_production") / enstat.total_production
EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="余热发电").update(
production_elec_unit=production_elec_unit_yr
)
# 其他一些触发计算
if mgroup.cate == "section":
if "material" in this_cal_attrs and mgroup.name != "回转窑":
try:
enstat.en_consume_unit = enstat.elec_coal_consume / enstat.total_production
except ZeroDivisionError:
enstat.en_consume_unit = 0
enstat.save()
# 计算一些其他数据
if type == "month_st" and "material" in this_cal_attrs: # 如果计算的是班月,把主要设备电耗数据拉过来, 方便查询
res = MpointStat.objects.filter(type='sflog', year_s=year_s, month_s=month_s, sflog__team=enstat.team, mpoint__material__code='elec', mpoint__ep_monitored__isnull=False).values(
equipment=F('mpoint__ep_monitored__id'), equipment_name=F('mpoint__ep_monitored__name')).annotate(consume=Sum('val'))
res = list(res)
for item in res:
try:
item['consume_unit'] = item['consume'] / enstat.total_production
except ZeroDivisionError:
item['consume_unit'] = None
enstat.equip_elec_data = res
enstat.save()
# 开始计算互相影响的数据
if mgroup.code == "ylm":
if enstat.type in ["hour_s", "day_s", "year_s", "month_s"]:
hzy_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first()
if hzy_enstat:
try:
hzy_enstat.celec_consume_unit = (hzy_enstat.elec_consume + enstat.elec_consume) / hzy_enstat.total_production
except ZeroDivisionError:
hzy_enstat.celec_consume_unit = 0
hzy_enstat.save()
if enstat.mgroup.name == "回转窑": # 算单位产品(综合电耗/标煤耗/综合能耗)
# 综合电耗
if enstat.type in ["hour_s", "day_s", "year_s", "month_s"]:
ylm_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__code="ylm)").first()
if ylm_enstat:
try:
enstat.celec_consume_unit = (enstat.elec_consume + ylm_enstat.elec_consume)/enstat.total_production
except ZeroDivisionError:
enstat.celec_consume_unit = 0
enstat.save()
# pre_enstat2 = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="煤磨").first()
# if pre_enstat2:
# try:
# pre_enstat2.production_elec_consume_unit = pre_enstat2.elec_consume / enstat.total_production
# except ZeroDivisionError:
# pre_enstat2.production_elec_consume_unit = 0
# pre_enstat2.save(update_fields=["production_elec_consume_unit"])
# enstat.celec_consume_unit += pre_enstat2.production_elec_consume_unit
# 算标煤耗
if "pcoal" in this_cal_attrs:
if type in ["hour_s", "sflog", "day_s"]:
enstat.pcoal_heat = get_pcoal_heat(enstat.year_s, enstat.month_s, enstat.day_s)
enstat.pcoal_coal_consume = enstat.pcoal_consume * enstat.pcoal_heat / 29307.6
elif type == "month_st":
enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=year_s, month_s=month_s, sflog__team=enstat.team).aggregate(
sum=Sum("pcoal_coal_consume")
)["sum"]
elif type == "month_s":
enstat.pcoal_coal_consume = EnStat.objects.filter(type="day_s", mgroup=enstat.mgroup, year_s=year_s, month_s=month_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"]
elif type == "year_s":
enstat.pcoal_coal_consume = EnStat.objects.filter(type="month_s", mgroup=enstat.mgroup, year_s=year_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"]
if enstat.pcoal_coal_consume is None:
enstat.pcoal_coal_consume = 0
oil_consume = enstat.imaterial_data_dict.get("柴油", {}).get("amount_consume", 0)
# 算单位产品标煤耗
try:
enstat.coal_consume_unit = (enstat.pcoal_coal_consume + oil_consume*1.46) * 1000 / enstat.total_production
except ZeroDivisionError:
enstat.coal_consume_unit = 0
# 综合能耗
enstat.cen_consume_unit = enstat.coal_consume_unit + 0.1229 * enstat.elec_consume_unit
enstat.save()
# 触发水泥磨综合能耗
snm_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥磨").first()
if snm_enstat:
hzy_enstat = enstat
snbz_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥包装").first()
snbz_enstat_elec_consume = snbz_enstat.elec_consume if snbz_enstat else 0
hzy_enstat_cen_consume_unit = hzy_enstat.cen_consume_unit if hzy_enstat else 0
if snm_enstat.total_production == 0:
snm_enstat_cen_consume_unit = 0
else:
slpb = snm_enstat.imaterial_data_dict.get("入磨熟料", {}).get("amount_consume", 0) / snm_enstat.total_production
snm_enstat_cen_consume_unit = ((snm_enstat.elec_consume + snbz_enstat_elec_consume)/snm_enstat.total_production) * 0.1229 + slpb * hzy_enstat_cen_consume_unit
snm_enstat.cen_consume_unit = snm_enstat_cen_consume_unit
snm_enstat.save()
elif enstat.mgroup.name == "水泥磨" and enstat.type not in ["month_st", "sflog"] and "pcoal" in this_cal_attrs:
hzy_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first()
snbz_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥包装").first()
snbz_enstat_elec_consume = snbz_enstat.elec_consume if snbz_enstat else 0
hzy_enstat_cen_consume_unit = hzy_enstat.cen_consume_unit if hzy_enstat else 0
if enstat.total_production == 0:
enstat_cen_consume_unit = 0
else:
slpb = enstat.imaterial_data_dict.get("入磨熟料", {}).get("amount_consume", 0) / enstat.total_production
enstat_cen_consume_unit = ((enstat.elec_consume + snbz_enstat_elec_consume)/enstat.total_production) * 0.1229 + slpb * hzy_enstat_cen_consume_unit
enstat.cen_consume_unit = enstat_cen_consume_unit
enstat.save()
elif enstat.mgroup.name == "水泥包装" and enstat.type not in ["month_st", "sflog"] and "pcoal" in this_cal_attrs:
snm_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥磨").first()
if snm_enstat:
hzy_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first()
snbz_enstat = enstat
snbz_enstat_elec_consume = snbz_enstat.elec_consume if snbz_enstat else 0
hzy_enstat_cen_consume_unit = hzy_enstat.cen_consume_unit if hzy_enstat else 0
if snm_enstat.total_production == 0:
snm_enstat_cen_consume_unit = 0
else:
slpb = snm_enstat.imaterial_data_dict.get("入磨熟料", {}).get("amount_consume", 0) / snm_enstat.total_production
snm_enstat_cen_consume_unit = ((snm_enstat.elec_consume + snbz_enstat_elec_consume)/snm_enstat.total_production) * 0.1229 + slpb * hzy_enstat_cen_consume_unit
snm_enstat.cen_consume_unit = snm_enstat_cen_consume_unit
snm_enstat.save()
elif enstat.mgroup.name == '余热发电':
hzy_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first()
if hzy_enstat:
# 吨熟料发电量
try:
enstat.production_elec_unit = enstat.total_production / hzy_enstat.total_production
except ZeroDivisionError:
enstat.production_elec_unit = 0
enstat.save()
# 运转时长相关
if type != "hour_s" and "run_hour" in this_cal_attrs:
enstat.total_sec_now, enstat.shut_sec = get_total_sec_now_and_shut_sec(enstat)
enstat.run_sec = enstat.total_sec_now - enstat.shut_sec if enstat.total_sec_now - enstat.shut_sec >0 else 0
try:
enstat.run_rate = (enstat.run_sec / enstat.total_sec_now) * 100
except ZeroDivisionError:
enstat.run_rate = 0
try:
enstat.production_hour = enstat.total_production * 3600 / enstat.run_sec
except ZeroDivisionError:
enstat.production_hour = 0
enstat.save()
return year_s, month_s, day_s # 返回这个值主要为了全厂数据计算而用
def get_total_sec_now_and_shut_sec(enstat: EnStat):
from apps.wpm.models import SfLog
# if enstat.type == 'hour_s':
# # 找到停机记录,并划分到该小时
# end_time = datetime.datetime(enstat.year, enstat.month, enstat.day, enstat.hour)
# start_time = end_time - datetime.timedelta(hours=1)
# sts = StLog.objects.filter(mgroup=enstat.mgroup)
# sts = (sts.filter(start_time__gt=start_time, start_time__lt=end_time)|sts.filter(start_time__lt=start_time, end_time=None)|sts.filter(end_time__gt=start_time, end_time__lt=end_time)).distinct()
# shut_hour = 0
# for i in sts:
# if i.end_time is None:
# run_hour = 0
# if i.start_time > start_time:
# run_hour = (i.start_time - start_time).total_seconds/3600
# shut_hour = 1 - run_hour
# shut_hour = shut_hour +
# return 1, 0
if enstat.type == "sflog":
sflog = enstat.sflog
return sflog.total_sec_now, sflog.shut_sec
elif enstat.type == "day_s":
res = SfLog.objects.filter(work_date__year=enstat.year_s, work_date__month=enstat.month_s, work_date__day=enstat.day_s, mgroup=enstat.mgroup).aggregate(
sum1=Sum("total_sec_now"), sum2=Sum("shut_sec")
)
return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0
elif enstat.type == "month_st":
res = SfLog.objects.filter(work_date__year=enstat.year_s, work_date__month=enstat.month_s, mgroup=enstat.mgroup, team=enstat.team).aggregate(
sum1=Sum("total_sec_now"), sum2=Sum("shut_sec")
)
return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0
elif enstat.type == "month_s":
res = SfLog.objects.filter(work_date__year=enstat.year_s, work_date__month=enstat.month_s, mgroup=enstat.mgroup).aggregate(sum1=Sum("total_sec_now"), sum2=Sum("shut_sec"))
return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0
elif enstat.type == "year_s":
res = SfLog.objects.filter(work_date__year=enstat.year_s, mgroup=enstat.mgroup).aggregate(sum1=Sum("total_sec_now"), sum2=Sum("shut_sec"))
return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0
@shared_task(base=CustomTask)
def cal_enstat2(type: str, year_s: int, month_s: int, day_s: int, cascade=True):
if cascade:
if type == "day_s":
cal_enstat2("day_s", year_s, month_s, day_s, False)
cal_enstat2("month_s", year_s, month_s, day_s, False)
elif type == "month_s":
cal_enstat2("month_s", year_s, month_s, day_s, False)
else:
return
if type == "month_s":
enstat2, _ = EnStat2.safe_get_or_create(type="month_s", year_s=year_s, month_s=month_s, defaults={"year_s": year_s, "month_s": month_s, "type": "month_s"})
elif type == "day_s":
enstat2, _ = EnStat2.safe_get_or_create(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s, defaults={"year_s": year_s, "month_s": month_s, "day_s": day_s, "type": "day_s"})
# enstat2 = EnStat2.objects.select_for_update().get(id=enstat2.id) # 加锁
try:
material_bulk_clinker = Material.objects.get(code="bulk_clinker") # 散装熟料
enstat2.bulk_clinker_price = get_price_unit(material_bulk_clinker, year_s, month_s)
except ObjectDoesNotExist:
enstat2.bulk_clinker_price = 0
try:
material_bulk_cement = Material.objects.get(code="bulk_cement") # 散装水泥
enstat2.bulk_cement_price = get_price_unit(material_bulk_cement, year_s, month_s)
except ObjectDoesNotExist:
enstat2.bulk_cement_price = 0
try:
material_bag_cement = Material.objects.get(code="bag_cement") # 袋装水泥
enstat2.bag_cement_price = get_price_unit(material_bag_cement, year_s, month_s)
except ObjectDoesNotExist:
enstat2.bag_cement_price = 0
if type == "month_s":
enstat2.bulk_cement_val = MpointStat.objects.filter(type="month_s", mpoint__material__code="bulk_cement", year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))["sum"]
elif type == "day_s":
enstat2.bulk_cement_val = MpointStat.objects.filter(type="day_s", mpoint__material__code="bulk_cement", year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))["sum"]
if enstat2.bulk_cement_val is None:
enstat2.bulk_cement_val = 0
if type == "month_s":
enstat2.bag_cement_val = MpointStat.objects.filter(type="month_s", mpoint__material__code='bag_cement', year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))["sum"]
elif type == "day_s":
enstat2.bag_cement_val = MpointStat.objects.filter(type="day_s", mpoint__material__code='bag_cement', year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))["sum"]
if enstat2.bag_cement_val is None:
enstat2.bag_cement_val = 0
if type == "month_s":
enstat2.bulk_clinker_val = MpointStat.objects.filter(type="month_s", mpoint__material__code='bulk_clinker', year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))["sum"]
elif type == "day_s":
enstat2.bulk_clinker_val = MpointStat.objects.filter(type="day_s", mpoint__material__code='bulk_clinker', year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))["sum"]
if enstat2.bulk_clinker_val is None:
enstat2.bulk_clinker_val = 0
enstat2.industry_total_val = (
enstat2.bulk_cement_val * enstat2.bulk_cement_price +
enstat2.bag_cement_val * enstat2.bag_cement_price +
enstat2.bulk_clinker_val * enstat2.bulk_clinker_price) / 10000
# 出磨水泥产量
if type == "month_s":
res = EnStat.objects.filter(mgroup__product__code="cement", type="month_s", year_s=year_s, month_s=month_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit"))
elif type == "day_s":
res = EnStat.objects.filter(mgroup__product__code="cement", type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit"))
enstat2.cement_val = res["sum"] if res["sum"] else 0
enstat2.cement_cost_unit = res["avg"] if res["avg"] else 0
# 出窑熟料产量
if type == "month_s":
res = EnStat.objects.filter(mgroup__product__code="clinker", type="month_s", year_s=year_s, month_s=month_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit"))
elif type == "day_s":
res = EnStat.objects.filter(mgroup__product__code="clinker", type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit"))
# 出窑熟料成本
enstat2.cliker_price_cost = res["avg"] if res["avg"] else 0
enstat2.clinker_val = res["sum"] if res["sum"] else 0
# 出厂总产量
if type == "month_s":
res = EnStat.objects.filter(mgroup__product__code="out_cement", type="month_s", year_s=year_s, month_s=month_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit"))
elif type == "day_s":
res = EnStat.objects.filter(mgroup__product__code="out_cement", type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit"))
enstat2.out_cement_val = res["sum"] if res["sum"] else 0
# enstat2.out_cement_cost_unit = res["avg"] if res["avg"] else 0
# enstat2.out_cement_val = enstat2.bag_cement_val + enstat2.bulk_cement_val + enstat2.bulk_clinker_val
enstat2.industry_add_val = enstat2.industry_total_val - enstat2.out_cement_val * enstat2.cement_cost_unit / 10000 - enstat2.bulk_clinker_val*enstat2.cliker_price_cost / 10000
# 全厂电量
# 全厂的耗电量得单独处理
use_mpoint_elec_val = False
mp_elecs = Mpoint.objects.filter(material__code="elec", code__endswith='__all')
if mp_elecs.exists(): #
use_mpoint_elec_val = True
if type == "month_s":
enstat_qs = EnStat.objects.filter(type="month_s", year_s=year_s, month_s=month_s)
elif type == "day_s":
enstat_qs = EnStat.objects.filter(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s)
res_elec_pcoal = enstat_qs.aggregate(
sum1=Sum("elec_consume"), sum2=Sum("elec_coal_consume"), sum3=Sum("pcoal_consume"), sum4=Sum("pcoal_coal_consume"), sum5=Sum("water_consume"), sum6=Sum("cair_consume")
)
if use_mpoint_elec_val:
if type == 'day_s':
enstat2.elec_consume = MpointStat.objects.filter(type='day_s', mpoint__in=mp_elecs, year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))['sum']
elif type == 'month_s':
enstat2.elec_consume = MpointStat.objects.filter(type='month_s', mpoint__in=mp_elecs, year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))['sum']
if enstat2.elec_consume is None:
enstat2.elec_consume = 0
else:
enstat2.elec_consume = res_elec_pcoal["sum1"] if res_elec_pcoal["sum1"] else 0
enstat2.elec_coal_consume = enstat2.elec_consume * 0.1229 / 1000
# 其他的统计工段合就行
enstat2.pcoal_consume = res_elec_pcoal["sum3"] if res_elec_pcoal["sum3"] else 0
enstat2.pcoal_coal_consume = res_elec_pcoal["sum4"] if res_elec_pcoal["sum4"] else 0
enstat2.water_consume = res_elec_pcoal["sum5"] if res_elec_pcoal["sum5"] else 0
enstat2.cair_consume = res_elec_pcoal["sum6"] if res_elec_pcoal["sum6"] else 0
enstat2.en_consume = enstat2.pcoal_coal_consume + enstat2.elec_coal_consume
try:
enstat2.en_consume_unit = enstat2.en_consume / enstat2.industry_total_val
except ZeroDivisionError:
enstat2.en_consume_unit = 0
try:
enstat2.en_add_consume_unit = enstat2.en_consume / enstat2.industry_add_val
except ZeroDivisionError:
enstat2.en_add_consume_unit = 0
enstat2.save()
def cal_enstat_pcoal_change(enstat: EnStat, new_pcoal_heat):
type = enstat.type
if type in ["hour_s", "sflog", "day_s"]:
enstat.pcoal_heat = new_pcoal_heat
enstat.pcoal_coal_consume = enstat.pcoal_consume * enstat.pcoal_heat / 29307.6
elif type == "month_st":
enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=enstat.year_s, month_s=enstat.month_s, sflog__team=enstat.team).aggregate(
sum=Sum("pcoal_coal_consume")
)["sum"]
elif type == "month_s":
enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=enstat.year_s, month_s=enstat.month_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"]
elif type == "year_s":
enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=enstat.year_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"]
if enstat.pcoal_coal_consume is None:
enstat.pcoal_coal_consume = 0
oil_consume = enstat.imaterial_data_dict.get("柴油", {}).get("amount_consume", 0)
# 算单位产品标煤耗
try:
enstat.coal_consume_unit = (enstat.pcoal_coal_consume + oil_consume*1.46) * 1000 / enstat.total_production
except ZeroDivisionError:
enstat.coal_consume_unit = 0
# 综合能耗
enstat.cen_consume_unit = enstat.coal_consume_unit + 0.1229 * enstat.elec_consume_unit
# enstat.save()
enstat.save(update_fields=["pcoal_heat", "pcoal_coal_consume", "coal_consume_unit", "cen_consume_unit"])
# 同步更新水泥磨的综合能耗,这步有可能不成功,因为水泥磨是后算的, 但是当pcoal_change时这个就有用了
if type not in ["month_st", "sflog"]:
snm_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥磨").first()
if snm_enstat:
hzy_enstat = enstat
snbz_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥包装").first()
snbz_enstat_elec_consume = snbz_enstat.elec_consume if snbz_enstat else 0
hzy_enstat_cen_consume_unit = hzy_enstat.cen_consume_unit if hzy_enstat else 0
if snm_enstat.total_production == 0:
snm_enstat_cen_consume_unit = 0
else:
slpb = snm_enstat.imaterial_data_dict.get("入磨熟料", {}).get("amount_consume", 0) / snm_enstat.total_production
snm_enstat_cen_consume_unit = ((snm_enstat.elec_consume + snbz_enstat_elec_consume)/snm_enstat.total_production) * 0.1229 + slpb * hzy_enstat_cen_consume_unit
snm_enstat.cen_consume_unit = snm_enstat_cen_consume_unit
snm_enstat.save()
enm_alarms_list = [
["电石渣", "elec_consume_unit", "单位产品分布电耗"],
["生料工序(二次配料)", "elec_consume_unit", "单位产品分布电耗"],
["原料磨", "elec_consume_unit", "单位产品分布电耗"],
["回转窑", "celec_consume_unit", "单位产品综合电耗"],
["回转窑", "coal_consume_unit", "单位产品标煤耗"],
["煤磨", "elec_consume_unit", "单位产品分布电耗"],
["水泥磨", "elec_consume_unit", "单位产品分布电耗"],
["水泥包装", "elec_consume_unit", "单位产品分布电耗"]
]
@shared_task(base=CustomTask)
def enm_alarm(year_s: int, month_s: int, day_s: int):
"""
enm报警任务
"""
from apps.ecm.models import Event, EventCate, Eventdo
from apps.mtm.models import Goal
from apps.ecm.service import notify_event
now = timezone.now()
event_cate, _ = EventCate.safe_get_or_create(code="consume_exceed", defaults={"name": "能耗超过目标值", "code": "consume_exceed", "trigger": 30})
for item in enm_alarms_list:
mgroups = Mgroup.objects.filter(name=item[0])
for mgroup in mgroups:
enstat = EnStat.objects.filter(mgroup=mgroup, type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).first()
if enstat:
mgroup_name = item[0]
goal_cate_str = item[1]
goal_cate_str_chin = item[2]
real_val = getattr(enstat, goal_cate_str, None)
goal = Goal.objects.filter(goal_cate__code=goal_cate_str, year=year_s, mgroup=mgroup).first()
if goal:
goal_val = getattr(goal, f"goal_val_{month_s}", None)
if goal_val and real_val and real_val > goal_val: # 触发事件
event = Event()
event.obj_cate = "enm"
event.happen_time = now
event.voice_msg = f"{mgroup_name}{item[2]}超过设定目标值"
event.enm_data = {
"mgroup": mgroup.id,
"mgroup_name": mgroup.name,
"type": f"{goal_cate_str}.exceed",
"type_chin": f"{goal_cate_str_chin}-超过设定目标值",
"year_s": year_s,
"month_s": month_s,
"day_s": day_s,
"val": real_val,
"goal_val": goal_val,
"enstat": enstat.id
}
event.save()
Eventdo.safe_get_or_create(cate=event_cate, event=event, defaults={"cate": event_cate, "event": event})
notify_event(event)
@shared_task(base=CustomTask)
def king_insert_mplogx():
mpoint_codes = Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO, code__startswith='K_').values_list('code', flat=True)
ml = []
for m in mpoint_codes:
ml.append({"N": m.replace('K_', '')})
_, res = kingClient.request(**kapis['read_batchtagrealvalue'], json={"objs": ml})
insert_mplogx_from_king_rest_chunk(res)
@shared_task(base=CustomTask)
def check_mpoint_offline(seconds=100):
"""监测测点采集掉线"""
now = localtime()
for mpoint in Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO, is_unit=False)| Mpoint.objects.filter(enabled=True, type=Mpoint.MT_COMPUTE, is_rep_ep_running_state=True, is_unit=False):
mc = MpointCache(mpoint.code)
mpoint_data = mc.data
last_data = mpoint_data.get('last_data', None)
is_offline = True
if last_data and last_data['last_timex'] and (now-last_data['last_timex']).total_seconds() < seconds:
is_offline = False
if is_offline:
mc.set_fail(-2, now)