1058 lines
		
	
	
		
			56 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			1058 lines
		
	
	
		
			56 KiB
		
	
	
	
		
			Python
		
	
	
	
| # Create your tasks here
 | |
| from apps.utils.tasks import CustomTask
 | |
| from celery import shared_task
 | |
| from apps.enm.models import MpLogx, Mpoint, MpointStat, EnStat, EnStat2, Xscript
 | |
| from apps.wpm.models import SfLog
 | |
| import datetime
 | |
| from django.db.models import Q
 | |
| from django.db.models import Sum, Avg
 | |
| from dateutil import tz
 | |
| from django.conf import settings
 | |
| from apps.wpm.services import get_sflog
 | |
| from apps.mtm.models import Mgroup, Material
 | |
| from apps.fim.services import get_cost_unit, get_price_unit
 | |
| from apps.fim.models import Fee
 | |
| from apps.enm.services import translate_eval_formula
 | |
| import logging
 | |
| from server.settings import get_sysconfig, update_sysconfig
 | |
| from django.db.models import F
 | |
| from apps.wpm.services import get_pcoal_heat
 | |
| from django.utils import timezone
 | |
| from django.db.models import Max
 | |
| from apps.third.king.k import kingClient
 | |
| from apps.third.king.king_api import kapis
 | |
| from apps.enm.services import insert_mplogx_from_king_rest_chunk, MpointCache
 | |
| from django.utils.timezone import localtime
 | |
| from apps.wpm.tasks import get_total_sec_now, cal_exp_duration_sec
 | |
| from apps.utils.sql import DbConnection
 | |
| from apps.enm.services import db_insert_mplogx_batch, get_elec_level
 | |
| from apps.enm.xscript import main
 | |
| from django.core.exceptions import ObjectDoesNotExist
 | |
| from django.utils.timezone import make_aware
 | |
| myLogger = logging.getLogger("log")
 | |
| 
 | |
| 
 | |
| def get_current_and_previous_time():
 | |
|     now = datetime.datetime.now()
 | |
|     pre = now - datetime.timedelta(hours=1)
 | |
|     return now, pre
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def insert_mplogx_from_xscript(xscript_id):
 | |
|     xscript = Xscript.objects.get(id=xscript_id)
 | |
|     mcodes_list = Mpoint.objects.filter(enabled=True).values_list('code', flat=True)
 | |
|     if mcodes_list:
 | |
|         main(xscript, mcodes_list)
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def db_insert_mplogx(limit:bool=True):
 | |
|     """
 | |
|     从数据库转存到超表
 | |
|     """
 | |
|     config = get_sysconfig()
 | |
|     with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor:
 | |
|         last_tag_id = config['enm'].get('last_tag_id', None)
 | |
|         if last_tag_id is None:
 | |
|             raise Exception("last_tag_id is None")
 | |
|         cursor.execute("select count(id) from tag_value where id > %s", (last_tag_id))
 | |
|         count = cursor.fetchone()[0]
 | |
|         if limit and count > 400:
 | |
|             raise Exception("db inset count > 400")
 | |
|         cursor.execute(
 | |
|             "select id, val, tag_code, update_time from tag_value where id > %s order by id, update_time", (last_tag_id, ))
 | |
|         rows = cursor.fetchall()  # 获取数据后保存至本地
 | |
|         if rows:
 | |
|             last_tag_id = rows[-1][0]
 | |
|         db_insert_mplogx_batch(rows)
 | |
|         update_sysconfig({'enm': {'last_tag_id': last_tag_id}})
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def db_ins_mplogx():
 | |
|     """
 | |
|     从数据库转存到超表
 | |
|     """
 | |
|     config = get_sysconfig()
 | |
|     with DbConnection(config['enm1']['db_host'], config['enm1']['db_user'], config['enm1']['db_password'], config['enm1']['db_database1']) as cursor:
 | |
|         bill_date = config['enm1'].get('bill_date', None)
 | |
|         batchs = config['enm1'].get('batch', None)
 | |
|         if not batchs:
 | |
|             raise Exception("batch is None")
 | |
|         try:
 | |
|             bill_date = datetime.datetime.strptime(bill_date, '%Y-%m-%d %H:%M:%S')
 | |
|         except ValueError:
 | |
|             raise Exception(f"Invalid date format in {bill_date}")
 | |
|         if bill_date is None:
 | |
|             raise Exception("bill_date is None")
 | |
|         query = """
 | |
|                     SELECT id, de_real_quantity, CONCAT('x', inv_name) AS inv_name, bill_date
 | |
|                     FROM sa_weigh_view
 | |
|                     WHERE bill_date >= %s and de_real_quantity > 0
 | |
|                     AND inv_name IN %s
 | |
|                     ORDER BY bill_date
 | |
|                 """
 | |
|         cursor.execute(query, (bill_date, tuple(batchs)))
 | |
|         rows = cursor.fetchall()  # 获取数据后保存至本地
 | |
|         if rows:
 | |
|             bill_date_x = rows[-1][-1]
 | |
|             db_insert_mplogx_batch(rows)
 | |
|             update_sysconfig({'enm1': {'bill_date': bill_date_x.strftime('%Y-%m-%d %H:%M:%S')}})
 | |
|         
 | |
| 
 | |
| 
 | |
| # @shared_task(base=CustomTask)
 | |
| # def mpoint_val_on_change(mpLogId: str):
 | |
| #     """测点值变化的时候执行任务(废弃)
 | |
| #     """
 | |
| #     mplog = MpLog.objects.get(id=mpLogId)
 | |
| #     mpoint = mplog.mpoint
 | |
| #     module, func = mpoint.func_on_change.rsplit(".", 1)
 | |
| #     m = importlib.import_module(module)
 | |
| #     f = getattr(m, func)
 | |
| #     f(mplog)  # 同步执行
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def cal_mpointstats_duration(start_time: str, end_time: str, m_code_list=[], cal_attrs=[]):
 | |
|     """
 | |
|     重跑某一段时间的任务
 | |
|     """
 | |
|     mytz = tz.gettz(settings.TIME_ZONE)
 | |
|     start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
 | |
|     start_time.replace(tzinfo=mytz)
 | |
|     end_time = datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
 | |
|     start_time.replace(tzinfo=mytz)
 | |
|     current_time = start_time
 | |
|     while current_time <= end_time:
 | |
|         year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour
 | |
|         cal_mpointstats(0, year, month, day, hour, m_code_list, cal_attrs)
 | |
|         myLogger.info("now: {}  cal_mpointstats completed: {}".format(datetime.datetime.now(), current_time))
 | |
|         current_time += datetime.timedelta(hours=1)
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def correct_bill_date():
 | |
|     """
 | |
|     定时更新bill_date时间为前一天
 | |
|     """
 | |
|     now_time = datetime.datetime.now()
 | |
|     update_time = now_time - datetime.timedelta(hours=24)
 | |
|     bill_date = make_aware(update_time)
 | |
|     update_sysconfig({'enm1': {'bill_date': bill_date.strftime('%Y-%m-%d %H:%M:%S')}})
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def cal_mpointstats_scheduled_tasks(m_code_list=None, cal_attrs=None):
 | |
|     """
 | |
|     重跑某一段时间的任务
 | |
|     """
 | |
|     if m_code_list is None:
 | |
|         m_code_list = []
 | |
|     if cal_attrs is None:
 | |
|         cal_attrs = []
 | |
|     end_time = datetime.datetime.now()
 | |
|     start_time = end_time - datetime.timedelta(hours=24)
 | |
|     start_time = make_aware(start_time)
 | |
|     end_time = make_aware(end_time)
 | |
|     current_time = start_time
 | |
|     while current_time <= end_time:
 | |
|         year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour
 | |
|         cal_mpointstats(0, year, month, day, hour, m_code_list, cal_attrs)
 | |
|         current_time += datetime.timedelta(hours=1)
 | |
| 
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int, cascade=True, sflog_hours=[]):
 | |
|     """
 | |
|     计算某一测点, 某一时间点某一小时的统计值
 | |
|     """
 | |
|     val_level = None
 | |
|     mpoint = Mpoint.objects.get(id=mpointId)
 | |
|     mytz = tz.gettz(settings.TIME_ZONE)
 | |
|     dt = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=0, second=0, tzinfo=mytz)  # 整点时间
 | |
|     dt_hour_p= dt - datetime.timedelta(hours=1) # 上个整点
 | |
|     dt_hour_n= dt + datetime.timedelta(hours=1) # 下个整点
 | |
|     if (mpoint.material or mpoint.type == Mpoint.MT_COMPUTE) and mpoint.val_type in ['float', 'int']:  # 如果计量的是物料  # 累计量 有的会清零,需要额外处理(还未做)
 | |
|         material_code = mpoint.material.code if mpoint.material else None
 | |
|         params = {"mpoint": mpoint, "type": "hour"}
 | |
|         params["year"], params["month"], params["day"], params["hour"] = year, month, day, hour
 | |
|         val = 0
 | |
|         val_type = mpoint.val_type
 | |
|         if mpoint.type == Mpoint.MT_AUTO:
 | |
|             if mpoint.is_unit:
 | |
|                 val = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt, timex__lt=dt_hour_n).aggregate(sum=Sum(f'val_{mpoint.val_type}'))["sum"]
 | |
|                 if val is None:
 | |
|                     val = 0
 | |
|             else:
 | |
|                 mrs0 = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt_hour_p, timex__lte=dt).order_by("timex")
 | |
|                 mrs = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt, timex__lte=dt_hour_n).order_by("timex")
 | |
|                 if mrs0.exists() and mrs.exists():
 | |
|                     last_val = getattr(mrs.last(), f'val_{val_type}')
 | |
|                     first_val = getattr(mrs0.last(), f'val_{val_type}')
 | |
|                     if last_val >= first_val or first_val == 0:
 | |
|                         val = last_val - first_val
 | |
|                     elif first_val - last_val > 0 and (first_val - last_val)/first_val < 0.01:
 | |
|                         val = 0
 | |
|                         myLogger.info(f'{mpoint.code}--{dt}--{last_val}--{first_val}--last_val 小于 first_val')
 | |
|                     else:
 | |
|                         # 这里判断有可能清零了
 | |
|                         max_val = max(mrs.aggregate(max=Max(f'val_{val_type}'))["max"], first_val)
 | |
|                         myLogger.info(f'{mpoint.id}--{mpoint.code}--{dt}--{last_val}--{first_val}--清零')
 | |
|                         val = max_val - first_val + last_val
 | |
|             # if mpoint.code == 'x水泥+P.O42.5 散装':
 | |
|             
 | |
|         elif mpoint.type == Mpoint.MT_COMPUTE and mpoint.formula:
 | |
|             formula = mpoint.formula
 | |
|             val = translate_eval_formula(formula, year, month, day, hour)
 | |
|         else:
 | |
|             return
 | |
|         ms, _ = MpointStat.safe_get_or_create(**params)
 | |
|         ms.val_origin = val
 | |
|         need_cal_co = False
 | |
|         if ms.current_cal_coefficient is None and mpoint.cal_coefficient is not None:
 | |
|             ms.current_cal_coefficient = mpoint.cal_coefficient
 | |
|             need_cal_co = True
 | |
|         elif ms.current_cal_coefficient is not None:
 | |
|             need_cal_co = True
 | |
|         
 | |
|         if need_cal_co:
 | |
|             if mpoint.cal_coefficient_method == 1:
 | |
|                 val = val * ms.current_cal_coefficient
 | |
|             elif mpoint.cal_coefficient_method == 2:
 | |
|                 val = val / ms.current_cal_coefficient
 | |
|         # 如果有correct直接用correct代替
 | |
|         ms.val = ms.val_correct if ms.val_correct is not None else val
 | |
|         if material_code == 'elec':
 | |
|             val_level = get_elec_level(month, hour)
 | |
|             ms.val_level = val_level
 | |
|         # ms.val_level
 | |
|         ms.save()
 | |
| 
 | |
|         # 更新更高级别的值
 | |
|         if cascade or hour == 23:
 | |
|             params_day = {"type": "day", "mpoint": mpoint, "year": year, "month": month, "day": day}
 | |
|             ms_day, _ = MpointStat.safe_get_or_create(**params_day, defaults=params_day)
 | |
|             if ms_day.val_correct is not None:
 | |
|                 ms_day.val = ms_day.val_correct
 | |
|             else:
 | |
|                 sum_dict_day = MpointStat.objects.filter(type="hour", mpoint=mpoint, year=year, month=month, day=day).aggregate(sum=Sum("val"))
 | |
|                 ms_day.val = sum_dict_day['sum'] if sum_dict_day['sum'] is not None else 0
 | |
|                 ms_day.val_origin = ms_day.val
 | |
|             ms_day.save()
 | |
| 
 | |
|         if cascade or day in [28, 29, 30, 31]:
 | |
|             params_month = {"type": "month", "mpoint": mpoint, "year": year, "month": month}
 | |
|             ms_month, _ = MpointStat.safe_get_or_create(**params_month, defaults=params_month)
 | |
|             if ms_month.val_correct is not None:
 | |
|                 ms_month.val = ms_month.val_correct
 | |
|             else:
 | |
|                 sum_dict_month = MpointStat.objects.filter(type="day", mpoint=mpoint, year=year, month=month).aggregate(sum=Sum("val"))
 | |
|                 ms_month.val = sum_dict_month['sum'] if sum_dict_month['sum'] is not None else 0
 | |
|                 ms_month.val_origin = ms_month.val
 | |
|             ms_month.save()
 | |
| 
 | |
|         if cascade or month == 12:
 | |
|             params_year = {"type": "year", "mpoint": mpoint, "year": year}
 | |
|             ms_year, _ = MpointStat.safe_get_or_create(**params_year, defaults=params_year)
 | |
|             if ms_year.val_correct is not None:
 | |
|                 ms_year.val = ms_year.val_correct
 | |
|             else:
 | |
|                 sum_dict_year = MpointStat.objects.filter(type="month", mpoint=mpoint, year=year).aggregate(sum=Sum("val"))
 | |
|                 ms_year.val = sum_dict_year['sum'] if sum_dict_year['sum'] is not None else 0
 | |
|                 ms_year.val_origin = ms_year.val
 | |
|             ms_year.save()
 | |
| 
 | |
|         mgroup = mpoint.mgroup
 | |
|         if mgroup:
 | |
|                 sflog = get_sflog(mgroup, dt)
 | |
|                 if sflog is None:
 | |
|                     myLogger.error(f'{mgroup.name}--{dt}')
 | |
|                 year_s, month_s, day_s = sflog.get_ymd
 | |
| 
 | |
|                 params_hour_s = {
 | |
|                     "type": "hour_s",
 | |
|                     "mpoint": mpoint,
 | |
|                     "sflog": sflog,
 | |
|                     "mgroup": mgroup,
 | |
|                     "year": year,
 | |
|                     "month": month,
 | |
|                     "day": day,
 | |
|                     "year_s": year_s,
 | |
|                     "month_s": month_s,
 | |
|                     "day_s": day_s,
 | |
|                     "hour": hour,
 | |
|                 }
 | |
|                 ms_hour_s, _ = MpointStat.safe_get_or_create(**params_hour_s, defaults=params_hour_s)
 | |
|                 ms_hour_s.val = ms_hour_s.val_correct  if ms_hour_s.val_correct is not None else ms.val
 | |
|                 ms_hour_s.save()
 | |
| 
 | |
|                 # 开始往上计算
 | |
|                 params_sflog_s = {"type": "sflog", "mpoint": mpoint, "sflog": sflog, "year_s": year_s, "month_s": month_s, "day_s": day_s, "mgroup": mgroup}
 | |
|                 ms_sflog_s, _ = MpointStat.safe_get_or_create(**params_sflog_s, defaults=params_sflog_s)
 | |
|                 if ms_sflog_s.val_correct is not None:
 | |
|                     ms_sflog_s.val = ms_sflog_s.val_correct
 | |
|                 else:
 | |
|                     sum_dict_sflog_s = MpointStat.objects.filter(type="hour_s", mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, sflog=sflog).aggregate(sum=Sum("val"))
 | |
|                     ms_sflog_s.val = sum_dict_sflog_s['sum'] if sum_dict_sflog_s['sum'] is not None else 0
 | |
|                     ms_sflog_s.val_origin = ms_sflog_s.val
 | |
|                 ms_sflog_s.save()
 | |
| 
 | |
|                 # next_cal_dict = [mpoint.material.id, sflog.id, year, month, day, hour, year_s, month_s, day_s]
 | |
|                 # if next_cal_dict == cache.get('enm_cal_dict', None):
 | |
|                 #     next_cal = 0
 | |
|                 # else:
 | |
|                 #     next_cal = 1
 | |
|                 #     cache.set('enm_cal_dict', next_cal_dict, 60)
 | |
|                 cal_mpointstat_manual(mpoint.id, sflog.id, mgroup.id, year, month, day, hour, year_s, month_s, day_s, 0)
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def cal_mpointstats(is_now=1, year=None, month=None, day=None, hour=None, m_code_list=[], cal_attrs=[]):
 | |
|     """
 | |
|     计算所有自动采集测点的统计值,默认当前小时, 可手动传入时间和测点编号集
 | |
|     cal_attrs: 需要计算的属性,空列表默认计算所有属性["material", "pcoal", "run_hour"]
 | |
|     """
 | |
| 
 | |
|     if year and month and day and hour is not None:
 | |
|         pass
 | |
|     else:
 | |
|         now, pre = get_current_and_previous_time()
 | |
|         if is_now:
 | |
|             year, month, day, hour = now.year, now.month, now.day, now.hour
 | |
|         else:
 | |
|             year, month, day, hour = pre.year, pre.month, pre.day, pre.hour
 | |
|     if m_code_list:
 | |
|         mpoints1 = Mpoint.objects.filter(code__in=m_code_list)
 | |
|         for item in mpoints1:
 | |
|             cal_mpointstat_hour(item.id, year, month, day, hour)
 | |
|         mpoints_related = Mpoint.objects.none()
 | |
|         code2 = []
 | |
|         for code in m_code_list:
 | |
|             mpoints_related = mpoints_related | Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True,  material__isnull=False, formula__contains='{' + code + '}')
 | |
|             code2.extend(mpoints_related.values_list('code', flat=True))
 | |
|         code2 = list(set(code2))
 | |
|         for code in code2:
 | |
|             mpoints_related = mpoints_related | Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True,  material__isnull=False, formula__contains='{' + code + '}')
 | |
|         mpoints_related = mpoints_related.distinct().order_by('report_sortstr', 'create_time')
 | |
|         for item in mpoints_related:
 | |
|             cal_mpointstat_hour(item.id, year, month, day, hour)
 | |
|     else:
 | |
|         # 先统计自动采集的测点
 | |
|         mpoints_auto = Mpoint.objects.filter(type=Mpoint.MT_AUTO, enabled=True)
 | |
|         # mpoints_without_formula_group = []
 | |
|         for item in mpoints_auto:
 | |
|             # mpoints_without_formula_group.append(cal_mpointstat_hour.s(item.id, year, month, day, hour))
 | |
|             cal_mpointstat_hour(item.id, year, month, day, hour)
 | |
| 
 | |
|         # 再统计计算测点
 | |
|         mpoints_compute = Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True).exclude(formula="").order_by('report_sortstr', 'create_time')
 | |
|         # mpoints_other_group = []
 | |
|         for item in mpoints_compute:
 | |
|             # mpoints_other_group.append(cal_mpointstat_hour.s(item.id, year, month, day, hour))
 | |
|             cal_mpointstat_hour(item.id, year, month, day, hour)
 | |
| 
 | |
|     # 先调整一下班时间,以防计算错误
 | |
|     # 为了保持一致使用统计的时间点
 | |
|     nowx = timezone.now()
 | |
|     get_total_sec_now(now=nowx)  # 再处理total_sec
 | |
|     cal_exp_duration_sec(now=nowx)  # 先处理shut_sec
 | |
|     
 | |
| 
 | |
|     # 开始计算enstat
 | |
|     mgroups = Mgroup.objects.filter(need_enm=True).order_by("sort")
 | |
|     # mgroups_group = []
 | |
|     year_s, month_s, day_s = 0, 0, 0
 | |
|     for mgroup in mgroups:
 | |
|         # mgroups_group.append(cal_enstat.s('hour_s', None, mgroup.id, year, month, day, hour, None, None, None, True, ['material', 'run_hour']))
 | |
|         year_s, month_s, day_s = cal_enstat("hour_s", None, mgroup.id, year, month, day, hour, None, None, None, True, cal_attrs)
 | |
| 
 | |
|     # mgroups_t = mgroups.filter(name__in=['回转窑', '水泥磨'])
 | |
|     # mgroups_t_group = []
 | |
|     # for mgroup in mgroups_t:
 | |
|     #     mgroups_t_group.append(cal_enstat.s('hour_s', None, mgroup.id, year, month, day, hour, None, None, None, True, ['pcoal']))
 | |
| 
 | |
|     # 最后计算enstat2
 | |
|     cal_enstat2(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s)
 | |
|     # task_chain = chain(group(mpoints_without_formula_group), group(mpoints_other_group), group(mgroups_group), group(mgroups_t_group), group([cal_enstat2.s(year_s=year, month_s=month)]))
 | |
|     # task_chain.delay()
 | |
|     if hour == 21:
 | |
|         enm_alarm.delay(year_s, month_s, day_s)
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def cal_mpointstat_manual(mpointId: str, sflogId: str, mgroupId: str, year: int, month: int, day: int, hour: int, year_s: int, month_s: int, day_s: int, next_cal=0):
 | |
|     """
 | |
|     手动录入的测点数据进行往上统计,一级一级往上
 | |
|     """
 | |
|     mpoint = Mpoint.objects.get(id=mpointId)
 | |
|     mgroup = Mgroup.objects.get(id=mgroupId)
 | |
|     if sflogId:
 | |
|         params_day_s = {"type": "day_s", "mpoint": mpoint, "year_s": year_s, "month_s": month_s, "day_s": day_s, "mgroup": mgroup}
 | |
|         ms_day_s, _ = MpointStat.safe_get_or_create(**params_day_s, defaults=params_day_s)
 | |
|         if ms_day_s.val_correct is not None:
 | |
|             ms_day_s.val = ms_day_s.val_correct
 | |
|         else:
 | |
|             sum_dict_day_s = MpointStat.objects.filter(type="sflog", mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, mgroup=mgroup).aggregate(sum=Sum("val"))
 | |
|             ms_day_s.val = sum_dict_day_s['sum'] if sum_dict_day_s['sum'] is not None else 0
 | |
|             ms_day_s.val_origin = ms_day_s.val
 | |
|         ms_day_s.save()
 | |
| 
 | |
|     if day_s:
 | |
|         params_month_s = {"type": "month_s", "mpoint": mpoint, "year_s": year_s, "month_s": month_s, "mgroup": mgroup}
 | |
|         ms_month_s, _ = MpointStat.safe_get_or_create(**params_month_s, defaults=params_month_s)
 | |
|         if ms_month_s.val_correct is not None:
 | |
|             ms_month_s.val = ms_month_s.val_correct
 | |
|         else:
 | |
|             sum_dict_month_s = MpointStat.objects.filter(type="day_s", mpoint=mpoint, year_s=year_s, month_s=month_s, mgroup=mgroup).aggregate(sum=Sum("val"))
 | |
|             ms_month_s.val = sum_dict_month_s['sum'] if sum_dict_month_s['sum'] is not None else 0
 | |
|             ms_month_s.val_origin = ms_month_s.val
 | |
|         ms_month_s.save()
 | |
|     
 | |
|     if month_s:
 | |
|         params_year_s = {"type": "year_s", "mpoint": mpoint, "year_s": year_s, "mgroup": mgroup}
 | |
|         ms_year_s, _ = MpointStat.safe_get_or_create(**params_year_s, defaults=params_year_s)
 | |
|         if ms_year_s.val_correct is not None:
 | |
|             ms_year_s.val = ms_year_s.val_correct
 | |
|         else:
 | |
|             sum_dict_year_s = MpointStat.objects.filter(type="month_s", mpoint=mpoint, year_s=year_s, mgroup=mgroup).aggregate(sum=Sum("val"))
 | |
|             ms_year_s.val = sum_dict_year_s['sum'] if sum_dict_year_s['sum'] is not None else 0
 | |
|             ms_year_s.val_origin = ms_year_s.val
 | |
|         ms_year_s.save()
 | |
| 
 | |
|     if next_cal:  # 计算工段级数据,手动录入时调用
 | |
|         start_cal_type = 'hour_s'
 | |
|         if hour:
 | |
|             start_cal_type = 'hour_s'
 | |
|         elif sflogId:
 | |
|             start_cal_type = 'sflog'
 | |
|         elif day_s:
 | |
|             start_cal_type = 'day_s'
 | |
|         elif month_s:
 | |
|             start_cal_type = 'month_s'
 | |
|         elif year_s:
 | |
|             start_cal_type = 'year_s'
 | |
|         cal_enstat(start_cal_type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)
 | |
| 
 | |
| 
 | |
| 
 | |
| types_list = ["hour_s", "sflog", "day_s", "month_st", "month_s", "year_s"]
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, cascade=True, cal_attrs=[]):
 | |
|     """
 | |
|     计算能源数据统计
 | |
|     """
 | |
|     if cascade:
 | |
|         if type in types_list:
 | |
|             start_index = types_list.index(type)
 | |
|             new_types_list = types_list[start_index:]
 | |
|             for type in new_types_list:
 | |
|                 year_s, month_s, day_s = cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, False)
 | |
|         return year_s, month_s, day_s
 | |
|     if not cal_attrs:
 | |
|         this_cal_attrs = ["material", "pcoal", "run_hour"]
 | |
|     else:
 | |
|         this_cal_attrs = cal_attrs[:]
 | |
| 
 | |
|     mgroup = Mgroup.objects.get(id=mgroupId)
 | |
|     sflog = None  # 默认无班次
 | |
|     if sflogId:
 | |
|         sflog = SfLog.objects.get(id=sflogId)
 | |
|     elif year and month and day and hour is not None:
 | |
|         mytz = tz.gettz(settings.TIME_ZONE)
 | |
|         dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz)
 | |
|         sflog = get_sflog(mgroup, dt)
 | |
|     team = None
 | |
|     if sflog:
 | |
|         year_s, month_s, day_s = sflog.get_ymd
 | |
|         team = sflog.team
 | |
|     # 如果没有班组,那么month_st无需计算
 | |
|     if team is None and type == "month_st":
 | |
|         return year_s, month_s, day_s
 | |
|     if type == "hour_s":
 | |
|         enstat, _ = EnStat.safe_get_or_create(
 | |
|             type="hour_s",
 | |
|             mgroup=mgroup,
 | |
|             year=year,
 | |
|             month=month,
 | |
|             day=day,
 | |
|             hour=hour,
 | |
|             defaults={"type": "hour_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s, "year": year, "month": month, "day": day, "hour": hour, "sflog": sflog},
 | |
|         )
 | |
|     elif type == "sflog":
 | |
|         enstat, _ = EnStat.safe_get_or_create(type="sflog", sflog=sflog, defaults={"type": "sflog", "sflog": sflog, "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s})
 | |
|     elif type == "day_s":
 | |
|         enstat, _ = EnStat.safe_get_or_create(
 | |
|             type="day_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, defaults={"type": "day_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s}
 | |
|         )
 | |
|     elif type == "month_st":
 | |
|         enstat, _ = EnStat.safe_get_or_create(
 | |
|             type="month_st", mgroup=mgroup, team=team, year_s=year_s, month_s=month_s, defaults={"type": "month_st", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "team": team}
 | |
|         )
 | |
|     elif type == "month_s":
 | |
|         enstat, _ = EnStat.safe_get_or_create(type="month_s", mgroup=mgroup, year_s=year_s, month_s=month_s, defaults={"type": "month_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s})
 | |
|     elif type == "year_s":
 | |
|         enstat, _ = EnStat.safe_get_or_create(type="year_s", mgroup=mgroup, year_s=year_s, defaults={"type": "year_s", "mgroup": mgroup, "year_s": year_s})
 | |
| 
 | |
|     if "material" in this_cal_attrs:
 | |
|         # 消耗物料统计(包括电耗)
 | |
|         input_materials = []
 | |
|         input_materials = mgroup.input_materials
 | |
|         input_materials_dict = {}
 | |
|         has_p_cost = False
 | |
|         if mgroup.product:
 | |
|             input_materials_dict[mgroup.product.id] = "P"
 | |
|         if mgroup.product_cost:
 | |
|             input_materials_dict[mgroup.product_cost.id] = "P_COST"
 | |
|             has_p_cost = True
 | |
|         input_materials_dict.update({i : "M" for i in input_materials if i not in input_materials_dict})
 | |
|         imaterial_cost_unit = 0
 | |
|         imaterial_data = []
 | |
|         imaterial_data_dict = {}
 | |
|         for mid, mtype in input_materials_dict.items():
 | |
|             material = Material.objects.get(id=mid)
 | |
|             if type == "hour_s":
 | |
|                 mps = MpointStat.objects.filter(type="hour_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, hour=hour, mpoint__material=material)
 | |
|             elif type == "sflog":
 | |
|                 mps = MpointStat.objects.filter(type="sflog", sflog=sflog, mpoint__material=material)
 | |
|             elif type == "day_s":
 | |
|                 mps = MpointStat.objects.filter(type="day_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, mpoint__material=material)
 | |
|             elif type == "month_st":
 | |
|                 mps = MpointStat.objects.filter(type="sflog", mgroup=mgroup, sflog__team=team, year_s=year_s, month_s=month_s, mpoint__material=material)
 | |
|             elif type == "month_s":
 | |
|                 mps = MpointStat.objects.filter(type="month_s", mgroup=mgroup, year_s=year_s, month_s=month_s, mpoint__material=material)
 | |
|             elif type == "year_s":
 | |
|                 mps = MpointStat.objects.filter(type="year_s", mgroup=mgroup, year_s=year_s, mpoint__material=material)
 | |
|             if mps.filter(mpoint__is_rep_mgroup=True).exists():
 | |
|                 mps = mps.filter(mpoint__is_rep_mgroup=True)
 | |
|             amount_consume = mps.aggregate(sum=Sum("val"))["sum"]
 | |
|             if amount_consume is None:
 | |
|                 amount_consume = 0
 | |
|             if mtype == "P":  # 如果是产量
 | |
|                 enstat.total_production = amount_consume
 | |
|             elif mtype == "P_COST":  # 如果是产量(用于计算成本)
 | |
|                 enstat.total_production_cost = amount_consume
 | |
|             else:
 | |
|                 if material.code in ["cair", "steam"]:
 | |
|                     price_unit = 0
 | |
|                 else:
 | |
|                     price_unit = get_price_unit(material, year_s, month_s)
 | |
|                 cost = amount_consume * price_unit
 | |
|                 try:
 | |
|                     cost_unit = cost / enstat.total_production if has_p_cost is False else cost / enstat.total_production_cost
 | |
|                 except ZeroDivisionError:
 | |
|                     cost_unit = 0
 | |
|                 imaterial_cost_unit = imaterial_cost_unit + cost_unit
 | |
|                 if material.code == "elec":
 | |
|                     enstat.elec_consume = amount_consume
 | |
|                     enstat.elec_coal_consume = enstat.elec_consume * 0.1229 / 1000
 | |
|                     try:
 | |
|                         enstat.elec_consume_unit = enstat.elec_consume / enstat.total_production
 | |
|                     except ZeroDivisionError:
 | |
|                         enstat.elec_consume_unit = 0
 | |
|                 elif material.code == "water":
 | |
|                     enstat.water_consume = amount_consume
 | |
|                 elif material.code == "pcoal":
 | |
|                     enstat.pcoal_consume = amount_consume
 | |
|                 elif material.code == "cair":
 | |
|                     enstat.cair_consume = amount_consume
 | |
|                 elif material.code == "steam":
 | |
|                     enstat.out_steam = amount_consume
 | |
|                     enstat.out_steam_coal = enstat.out_steam * 128.6 / 1000
 | |
|                 elif material.code == "ammonia":
 | |
|                     enstat.ammonia_consume = amount_consume
 | |
|                 elif material.code == "ccr":
 | |
|                     enstat.ccr_consume = amount_consume
 | |
|                     enstat.kiln_end_heat = enstat.total_production - enstat.ccr_consume
 | |
| 
 | |
|                 imaterial_item = {
 | |
|                     "material": mid,
 | |
|                     "material_name": material.name,
 | |
|                     "material_code": material.code,
 | |
|                     "material_type": material.type,
 | |
|                     "price_unit": price_unit,
 | |
|                     "amount_consume": amount_consume,
 | |
|                     "cost": cost,
 | |
|                     "cost_unit": cost_unit,
 | |
|                 }
 | |
|                 imaterial_data.append(imaterial_item)
 | |
|                 imaterial_data_dict.setdefault(material.name, imaterial_item)
 | |
|         enstat.imaterial_data = imaterial_data
 | |
|         enstat.imaterial_data_dict = imaterial_data_dict
 | |
|         # 其他成本数据
 | |
|         other_cost_data = []
 | |
|         other_cost_unit = 0
 | |
|         fee_qs = Fee.objects.order_by("sort")
 | |
|         for fee in fee_qs:
 | |
|             item = {"element": fee.element, "cate": fee.cate, "name": fee.name, "id": fee.id}
 | |
|             item["cost_unit"] = get_cost_unit(mgroup, fee, year_s, month_s)
 | |
|             other_cost_unit = other_cost_unit + item["cost_unit"]
 | |
|             other_cost_data.append(item)
 | |
|         enstat.other_cost_data = other_cost_data
 | |
|         enstat.production_cost_unit = imaterial_cost_unit + other_cost_unit
 | |
|         enstat.save()
 | |
|         if mgroup.cate == 'section':
 | |
|             # 更新所监测设备测点的total_production
 | |
|             total_production = enstat.total_production
 | |
|             if total_production == 0:
 | |
|                 elec_consume_unit = 0
 | |
|             else:
 | |
|                 elec_consume_unit = F("val") / total_production
 | |
|             MpointStat.objects.filter(
 | |
|                 mgroup=enstat.mgroup, mpoint__material__code="elec", type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour
 | |
|             ).update(total_production=total_production, elec_consume_unit=elec_consume_unit)
 | |
|             # 更新余热发电的production_elec_unit
 | |
|             if mgroup.name == "回转窑":
 | |
|                 if enstat.total_production == 0:
 | |
|                     production_elec_unit_yr = 0
 | |
|                 else:
 | |
|                     production_elec_unit_yr = F("total_production") / enstat.total_production
 | |
|                 EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="余热发电").update(
 | |
|                     production_elec_unit=production_elec_unit_yr
 | |
|                 )
 | |
| 
 | |
|     # 其他一些触发计算
 | |
|     if mgroup.cate == "section":
 | |
|         if "material" in this_cal_attrs and mgroup.name != "回转窑":
 | |
|             try:
 | |
|                 enstat.en_consume_unit = enstat.elec_coal_consume / enstat.total_production
 | |
|             except ZeroDivisionError:
 | |
|                 enstat.en_consume_unit = 0
 | |
|             enstat.save()
 | |
|         # 计算一些其他数据
 | |
|         if type == "month_st" and "material" in this_cal_attrs:  # 如果计算的是班月,把主要设备电耗数据拉过来, 方便查询
 | |
|             res = MpointStat.objects.filter(type='sflog', year_s=year_s, month_s=month_s, sflog__team=enstat.team, mpoint__material__code='elec', mpoint__ep_monitored__isnull=False).values(
 | |
|                 equipment=F('mpoint__ep_monitored__id'), equipment_name=F('mpoint__ep_monitored__name')).annotate(consume=Sum('val'))
 | |
|             res = list(res)
 | |
|             for item in res:
 | |
|                 try:
 | |
|                     item['consume_unit'] = item['consume'] / enstat.total_production
 | |
|                 except ZeroDivisionError:
 | |
|                     item['consume_unit'] = None
 | |
|             enstat.equip_elec_data = res
 | |
|             enstat.save()
 | |
|         # 开始计算互相影响的数据
 | |
|         if mgroup.code == "ylm":
 | |
|             if enstat.type in ["hour_s", "day_s", "year_s", "month_s"]:
 | |
|                 hzy_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first()
 | |
|                 if hzy_enstat:
 | |
|                     try:
 | |
|                         hzy_enstat.celec_consume_unit = (hzy_enstat.elec_consume + enstat.elec_consume) / hzy_enstat.total_production
 | |
|                     except ZeroDivisionError:
 | |
|                         hzy_enstat.celec_consume_unit = 0
 | |
|                     hzy_enstat.save()
 | |
|         if enstat.mgroup.name == "回转窑":  # 算单位产品(综合电耗/标煤耗/综合能耗)
 | |
|             # 综合电耗
 | |
|             if enstat.type in ["hour_s", "day_s", "year_s", "month_s"]:
 | |
|                 ylm_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__code="ylm)").first()
 | |
|                 if ylm_enstat:
 | |
|                     try:
 | |
|                         enstat.celec_consume_unit = (enstat.elec_consume +  ylm_enstat.elec_consume)/enstat.total_production
 | |
|                     except ZeroDivisionError:
 | |
|                         enstat.celec_consume_unit = 0
 | |
|                     enstat.save()
 | |
|                 # pre_enstat2 = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="煤磨").first()
 | |
|                 # if pre_enstat2:
 | |
|                 #     try:
 | |
|                 #         pre_enstat2.production_elec_consume_unit = pre_enstat2.elec_consume / enstat.total_production
 | |
|                 #     except ZeroDivisionError:
 | |
|                 #         pre_enstat2.production_elec_consume_unit = 0
 | |
|                 #     pre_enstat2.save(update_fields=["production_elec_consume_unit"])
 | |
|                 #     enstat.celec_consume_unit += pre_enstat2.production_elec_consume_unit
 | |
|             # 算标煤耗
 | |
|             if "pcoal" in this_cal_attrs:
 | |
|                 if type in ["hour_s", "sflog", "day_s"]:
 | |
|                     enstat.pcoal_heat = get_pcoal_heat(enstat.year_s, enstat.month_s, enstat.day_s)
 | |
|                     enstat.pcoal_coal_consume = enstat.pcoal_consume * enstat.pcoal_heat / 29307.6
 | |
|                 elif type == "month_st":
 | |
|                     enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=year_s, month_s=month_s, sflog__team=enstat.team).aggregate(
 | |
|                         sum=Sum("pcoal_coal_consume")
 | |
|                     )["sum"]
 | |
|                 elif type == "month_s":
 | |
|                     enstat.pcoal_coal_consume = EnStat.objects.filter(type="day_s", mgroup=enstat.mgroup, year_s=year_s, month_s=month_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"]
 | |
|                 elif type == "year_s":
 | |
|                     enstat.pcoal_coal_consume = EnStat.objects.filter(type="month_s", mgroup=enstat.mgroup, year_s=year_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"]
 | |
| 
 | |
|                 if enstat.pcoal_coal_consume is None:
 | |
|                     enstat.pcoal_coal_consume = 0
 | |
|                 
 | |
|                 oil_consume = enstat.imaterial_data_dict.get("柴油", {}).get("amount_consume", 0)
 | |
| 
 | |
|                 # 算单位产品标煤耗
 | |
|                 try:
 | |
|                     enstat.coal_consume_unit = (enstat.pcoal_coal_consume + oil_consume*1.46) * 1000 / enstat.total_production
 | |
|                 except ZeroDivisionError:
 | |
|                     enstat.coal_consume_unit = 0
 | |
| 
 | |
|                 # 综合能耗
 | |
|                 enstat.cen_consume_unit = enstat.coal_consume_unit + 0.1229 * enstat.elec_consume_unit
 | |
|                 enstat.save()
 | |
| 
 | |
|                 # 触发水泥磨综合能耗
 | |
|                 snm_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥磨").first()
 | |
|                 if snm_enstat:
 | |
|                     hzy_enstat = enstat
 | |
|                     snbz_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥包装").first()
 | |
|                     snbz_enstat_elec_consume = snbz_enstat.elec_consume if snbz_enstat else 0
 | |
|                     hzy_enstat_cen_consume_unit = hzy_enstat.cen_consume_unit if hzy_enstat else 0
 | |
|                     if snm_enstat.total_production == 0:
 | |
|                         snm_enstat_cen_consume_unit = 0
 | |
|                     else:
 | |
|                         slpb = snm_enstat.imaterial_data_dict.get("入磨熟料", {}).get("amount_consume", 0) / snm_enstat.total_production
 | |
|                         snm_enstat_cen_consume_unit = ((snm_enstat.elec_consume + snbz_enstat_elec_consume)/snm_enstat.total_production) * 0.1229 + slpb * hzy_enstat_cen_consume_unit
 | |
|                     snm_enstat.cen_consume_unit = snm_enstat_cen_consume_unit
 | |
|                     snm_enstat.save()
 | |
| 
 | |
|         elif enstat.mgroup.name == "水泥磨" and enstat.type not in ["month_st", "sflog"] and "pcoal" in this_cal_attrs:
 | |
|             hzy_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first()
 | |
|             snbz_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥包装").first()
 | |
|             snbz_enstat_elec_consume = snbz_enstat.elec_consume if snbz_enstat else 0
 | |
|             hzy_enstat_cen_consume_unit = hzy_enstat.cen_consume_unit if hzy_enstat else 0
 | |
|             if enstat.total_production == 0:
 | |
|                 enstat_cen_consume_unit = 0
 | |
|             else:
 | |
|                 slpb = enstat.imaterial_data_dict.get("入磨熟料", {}).get("amount_consume", 0) / enstat.total_production
 | |
|                 enstat_cen_consume_unit = ((enstat.elec_consume + snbz_enstat_elec_consume)/enstat.total_production) * 0.1229 + slpb * hzy_enstat_cen_consume_unit
 | |
|             enstat.cen_consume_unit = enstat_cen_consume_unit
 | |
|             enstat.save()
 | |
|         elif enstat.mgroup.name == "水泥包装" and enstat.type not in ["month_st", "sflog"] and "pcoal" in this_cal_attrs:
 | |
|             snm_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥磨").first()
 | |
|             if snm_enstat:
 | |
|                 hzy_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first()
 | |
|                 snbz_enstat = enstat
 | |
|                 snbz_enstat_elec_consume = snbz_enstat.elec_consume if snbz_enstat else 0
 | |
|                 hzy_enstat_cen_consume_unit = hzy_enstat.cen_consume_unit if hzy_enstat else 0
 | |
|                 if snm_enstat.total_production == 0:
 | |
|                     snm_enstat_cen_consume_unit = 0
 | |
|                 else:
 | |
|                     slpb = snm_enstat.imaterial_data_dict.get("入磨熟料", {}).get("amount_consume", 0) / snm_enstat.total_production
 | |
|                     snm_enstat_cen_consume_unit = ((snm_enstat.elec_consume + snbz_enstat_elec_consume)/snm_enstat.total_production) * 0.1229 + slpb * hzy_enstat_cen_consume_unit
 | |
|                 snm_enstat.cen_consume_unit = snm_enstat_cen_consume_unit
 | |
|                 snm_enstat.save()
 | |
|         elif enstat.mgroup.name == '余热发电':
 | |
|             hzy_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first()
 | |
|             if hzy_enstat:
 | |
|                 # 吨熟料发电量
 | |
|                 try:
 | |
|                     enstat.production_elec_unit = enstat.total_production / hzy_enstat.total_production
 | |
|                 except ZeroDivisionError:
 | |
|                     enstat.production_elec_unit = 0
 | |
|                 enstat.save()
 | |
|         # 运转时长相关
 | |
|         if type != "hour_s" and "run_hour" in this_cal_attrs:
 | |
|             enstat.total_sec_now, enstat.shut_sec = get_total_sec_now_and_shut_sec(enstat)
 | |
|             enstat.run_sec = enstat.total_sec_now - enstat.shut_sec if enstat.total_sec_now - enstat.shut_sec >0 else 0
 | |
|             try:
 | |
|                 enstat.run_rate = (enstat.run_sec / enstat.total_sec_now) * 100
 | |
|             except ZeroDivisionError:
 | |
|                 enstat.run_rate = 0
 | |
|             try:
 | |
|                 enstat.production_hour = enstat.total_production * 3600 / enstat.run_sec
 | |
|             except ZeroDivisionError:
 | |
|                 enstat.production_hour = 0
 | |
|             enstat.save()
 | |
|     return year_s, month_s, day_s  # 返回这个值主要为了全厂数据计算而用
 | |
| 
 | |
| 
 | |
| def get_total_sec_now_and_shut_sec(enstat: EnStat):
 | |
|     from apps.wpm.models import SfLog
 | |
| 
 | |
|     # if enstat.type == 'hour_s':
 | |
|     #     # 找到停机记录,并划分到该小时
 | |
|     #     end_time = datetime.datetime(enstat.year, enstat.month, enstat.day, enstat.hour)
 | |
|     #     start_time = end_time - datetime.timedelta(hours=1)
 | |
|     #     sts = StLog.objects.filter(mgroup=enstat.mgroup)
 | |
|     #     sts = (sts.filter(start_time__gt=start_time, start_time__lt=end_time)|sts.filter(start_time__lt=start_time, end_time=None)|sts.filter(end_time__gt=start_time, end_time__lt=end_time)).distinct()
 | |
|     #     shut_hour = 0
 | |
|     #     for i in sts:
 | |
|     #         if i.end_time is None:
 | |
|     #             run_hour = 0
 | |
|     #             if i.start_time > start_time:
 | |
|     #                 run_hour = (i.start_time - start_time).total_seconds/3600
 | |
|     #             shut_hour = 1 - run_hour
 | |
|     #         shut_hour = shut_hour +
 | |
|     #     return 1, 0
 | |
|     if enstat.type == "sflog":
 | |
|         sflog = enstat.sflog
 | |
|         return sflog.total_sec_now, sflog.shut_sec
 | |
|     elif enstat.type == "day_s":
 | |
|         res = SfLog.objects.filter(work_date__year=enstat.year_s, work_date__month=enstat.month_s, work_date__day=enstat.day_s, mgroup=enstat.mgroup).aggregate(
 | |
|             sum1=Sum("total_sec_now"), sum2=Sum("shut_sec")
 | |
|         )
 | |
|         return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0
 | |
|     elif enstat.type == "month_st":
 | |
|         res = SfLog.objects.filter(work_date__year=enstat.year_s, work_date__month=enstat.month_s, mgroup=enstat.mgroup, team=enstat.team).aggregate(
 | |
|             sum1=Sum("total_sec_now"), sum2=Sum("shut_sec")
 | |
|         )
 | |
|         return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0
 | |
|     elif enstat.type == "month_s":
 | |
|         res = SfLog.objects.filter(work_date__year=enstat.year_s, work_date__month=enstat.month_s, mgroup=enstat.mgroup).aggregate(sum1=Sum("total_sec_now"), sum2=Sum("shut_sec"))
 | |
|         return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0
 | |
|     elif enstat.type == "year_s":
 | |
|         res = SfLog.objects.filter(work_date__year=enstat.year_s, mgroup=enstat.mgroup).aggregate(sum1=Sum("total_sec_now"), sum2=Sum("shut_sec"))
 | |
|         return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def cal_enstat2(type: str, year_s: int, month_s: int, day_s: int, cascade=True):
 | |
|     if cascade:
 | |
|         if type == "day_s":
 | |
|             cal_enstat2("day_s", year_s, month_s, day_s, False)
 | |
|             cal_enstat2("month_s", year_s, month_s, day_s, False)
 | |
|         elif type == "month_s":
 | |
|             cal_enstat2("month_s", year_s, month_s, day_s, False)
 | |
|         else:
 | |
|             return
 | |
|     if type == "month_s":
 | |
|         enstat2, _ = EnStat2.safe_get_or_create(type="month_s", year_s=year_s, month_s=month_s, defaults={"year_s": year_s, "month_s": month_s, "type": "month_s"})
 | |
|     elif type == "day_s":
 | |
|         enstat2, _ = EnStat2.safe_get_or_create(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s, defaults={"year_s": year_s, "month_s": month_s, "day_s": day_s, "type": "day_s"})
 | |
|     # enstat2 = EnStat2.objects.select_for_update().get(id=enstat2.id)  # 加锁
 | |
|     try:
 | |
|         material_bulk_clinker = Material.objects.get(code="bulk_clinker") # 散装熟料
 | |
|         enstat2.bulk_clinker_price = get_price_unit(material_bulk_clinker, year_s, month_s)
 | |
|     except ObjectDoesNotExist:
 | |
|         enstat2.bulk_clinker_price = 0
 | |
|     try:
 | |
|         material_bulk_cement = Material.objects.get(code="bulk_cement") # 散装水泥
 | |
|         enstat2.bulk_cement_price = get_price_unit(material_bulk_cement, year_s, month_s)
 | |
|     except ObjectDoesNotExist:
 | |
|         enstat2.bulk_cement_price = 0
 | |
|     try:
 | |
|         material_bag_cement = Material.objects.get(code="bag_cement") # 袋装水泥
 | |
|         enstat2.bag_cement_price = get_price_unit(material_bag_cement, year_s, month_s)
 | |
|     except ObjectDoesNotExist:
 | |
|         enstat2.bag_cement_price = 0
 | |
| 
 | |
| 
 | |
|     if type == "month_s":
 | |
|         enstat2.bulk_cement_val = MpointStat.objects.filter(type="month_s", mpoint__material__code="bulk_cement", year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))["sum"]
 | |
|     elif type == "day_s":
 | |
|         enstat2.bulk_cement_val = MpointStat.objects.filter(type="day_s", mpoint__material__code="bulk_cement", year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))["sum"]
 | |
|     if enstat2.bulk_cement_val is None:
 | |
|         enstat2.bulk_cement_val = 0
 | |
| 
 | |
|     if type == "month_s":
 | |
|         enstat2.bag_cement_val = MpointStat.objects.filter(type="month_s", mpoint__material__code='bag_cement', year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))["sum"]
 | |
|     elif type == "day_s":
 | |
|         enstat2.bag_cement_val = MpointStat.objects.filter(type="day_s", mpoint__material__code='bag_cement', year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))["sum"]
 | |
|     if enstat2.bag_cement_val is None:
 | |
|         enstat2.bag_cement_val = 0
 | |
| 
 | |
|     
 | |
|     if type == "month_s":
 | |
|         enstat2.bulk_clinker_val = MpointStat.objects.filter(type="month_s", mpoint__material__code='bulk_clinker', year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))["sum"]
 | |
|     elif type == "day_s":
 | |
|         enstat2.bulk_clinker_val = MpointStat.objects.filter(type="day_s", mpoint__material__code='bulk_clinker', year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))["sum"]
 | |
|     if enstat2.bulk_clinker_val is None:
 | |
|         enstat2.bulk_clinker_val = 0
 | |
| 
 | |
|     enstat2.industry_total_val = (
 | |
|         enstat2.bulk_cement_val * enstat2.bulk_cement_price + 
 | |
|         enstat2.bag_cement_val * enstat2.bag_cement_price + 
 | |
|         enstat2.bulk_clinker_val * enstat2.bulk_clinker_price) / 10000
 | |
| 
 | |
|     # 出磨水泥产量
 | |
|     if type == "month_s":
 | |
|         res = EnStat.objects.filter(mgroup__product__code="cement", type="month_s", year_s=year_s, month_s=month_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit"))
 | |
|     elif type == "day_s":
 | |
|         res = EnStat.objects.filter(mgroup__product__code="cement", type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit"))
 | |
|     
 | |
|     enstat2.cement_val = res["sum"] if res["sum"] else 0
 | |
|     enstat2.cement_cost_unit = res["avg"] if res["avg"] else 0
 | |
| 
 | |
|     # 出窑熟料产量
 | |
|     if type == "month_s":
 | |
|         res = EnStat.objects.filter(mgroup__product__code="clinker", type="month_s", year_s=year_s, month_s=month_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit"))
 | |
|     elif type == "day_s":
 | |
|         res = EnStat.objects.filter(mgroup__product__code="clinker", type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit"))
 | |
|     
 | |
|     # 出窑熟料成本
 | |
|     enstat2.cliker_price_cost = res["avg"] if res["avg"] else 0
 | |
|     enstat2.clinker_val = res["sum"] if res["sum"] else 0
 | |
| 
 | |
|     # 出厂总产量
 | |
|     if type == "month_s":
 | |
|         res = EnStat.objects.filter(mgroup__product__code="out_cement", type="month_s", year_s=year_s, month_s=month_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit"))
 | |
|     elif type == "day_s":
 | |
|         res = EnStat.objects.filter(mgroup__product__code="out_cement", type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit"))
 | |
|     
 | |
|     enstat2.out_cement_val = res["sum"] if res["sum"] else 0
 | |
|     # enstat2.out_cement_cost_unit = res["avg"] if res["avg"] else 0
 | |
| 
 | |
|     # enstat2.out_cement_val = enstat2.bag_cement_val + enstat2.bulk_cement_val + enstat2.bulk_clinker_val
 | |
| 
 | |
|     
 | |
|     enstat2.industry_add_val = enstat2.industry_total_val - enstat2.out_cement_val * enstat2.cement_cost_unit / 10000 - enstat2.bulk_clinker_val*enstat2.cliker_price_cost / 10000
 | |
| 
 | |
|     # 全厂电量
 | |
|     # 全厂的耗电量得单独处理
 | |
|     use_mpoint_elec_val = False
 | |
|     mp_elecs = Mpoint.objects.filter(material__code="elec", code__endswith='__all')
 | |
|     if mp_elecs.exists():  # 
 | |
|         use_mpoint_elec_val = True
 | |
|     if type == "month_s":
 | |
|         enstat_qs = EnStat.objects.filter(type="month_s", year_s=year_s, month_s=month_s)
 | |
|     elif type == "day_s":
 | |
|         enstat_qs = EnStat.objects.filter(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s)
 | |
|     res_elec_pcoal = enstat_qs.aggregate(
 | |
|         sum1=Sum("elec_consume"), sum2=Sum("elec_coal_consume"), sum3=Sum("pcoal_consume"), sum4=Sum("pcoal_coal_consume"), sum5=Sum("water_consume"), sum6=Sum("cair_consume")
 | |
|     )
 | |
|     if use_mpoint_elec_val:
 | |
|         if type == 'day_s':
 | |
|             enstat2.elec_consume = MpointStat.objects.filter(type='day_s', mpoint__in=mp_elecs, year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))['sum']
 | |
|         elif type == 'month_s':
 | |
|             enstat2.elec_consume = MpointStat.objects.filter(type='month_s', mpoint__in=mp_elecs, year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))['sum']
 | |
|         if enstat2.elec_consume is None:
 | |
|             enstat2.elec_consume = 0
 | |
|     else:
 | |
|         enstat2.elec_consume = res_elec_pcoal["sum1"] if res_elec_pcoal["sum1"] else 0
 | |
|     enstat2.elec_coal_consume = enstat2.elec_consume * 0.1229 / 1000
 | |
| 
 | |
|     # 其他的统计工段合就行
 | |
|     enstat2.pcoal_consume = res_elec_pcoal["sum3"] if res_elec_pcoal["sum3"] else 0
 | |
|     enstat2.pcoal_coal_consume = res_elec_pcoal["sum4"] if res_elec_pcoal["sum4"] else 0
 | |
|     enstat2.water_consume = res_elec_pcoal["sum5"] if res_elec_pcoal["sum5"] else 0
 | |
|     enstat2.cair_consume = res_elec_pcoal["sum6"] if res_elec_pcoal["sum6"] else 0
 | |
|     enstat2.en_consume = enstat2.pcoal_coal_consume + enstat2.elec_coal_consume
 | |
|     try:
 | |
|         enstat2.en_consume_unit = enstat2.en_consume / enstat2.industry_total_val
 | |
|     except ZeroDivisionError:
 | |
|         enstat2.en_consume_unit = 0
 | |
|     try:
 | |
|         enstat2.en_add_consume_unit = enstat2.en_consume / enstat2.industry_add_val
 | |
|     except ZeroDivisionError:
 | |
|         enstat2.en_add_consume_unit = 0
 | |
|     enstat2.save()
 | |
| 
 | |
| 
 | |
| def cal_enstat_pcoal_change(enstat: EnStat, new_pcoal_heat):
 | |
|     type = enstat.type
 | |
|     if type in ["hour_s", "sflog", "day_s"]:
 | |
|         enstat.pcoal_heat = new_pcoal_heat
 | |
|         enstat.pcoal_coal_consume = enstat.pcoal_consume * enstat.pcoal_heat / 29307.6
 | |
|     elif type == "month_st":
 | |
|         enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=enstat.year_s, month_s=enstat.month_s, sflog__team=enstat.team).aggregate(
 | |
|             sum=Sum("pcoal_coal_consume")
 | |
|         )["sum"]
 | |
|     elif type == "month_s":
 | |
|         enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=enstat.year_s, month_s=enstat.month_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"]
 | |
|     elif type == "year_s":
 | |
|         enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=enstat.year_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"]
 | |
| 
 | |
|     if enstat.pcoal_coal_consume is None:
 | |
|         enstat.pcoal_coal_consume = 0
 | |
| 
 | |
|     oil_consume = enstat.imaterial_data_dict.get("柴油", {}).get("amount_consume", 0)
 | |
| 
 | |
|     # 算单位产品标煤耗
 | |
|     try:
 | |
|         enstat.coal_consume_unit = (enstat.pcoal_coal_consume + oil_consume*1.46) * 1000 / enstat.total_production
 | |
|     except ZeroDivisionError:
 | |
|         enstat.coal_consume_unit = 0
 | |
| 
 | |
|     # 综合能耗
 | |
|     enstat.cen_consume_unit = enstat.coal_consume_unit + 0.1229 * enstat.elec_consume_unit
 | |
|     # enstat.save()
 | |
|     enstat.save(update_fields=["pcoal_heat", "pcoal_coal_consume", "coal_consume_unit", "cen_consume_unit"])
 | |
| 
 | |
|     # 同步更新水泥磨的综合能耗,这步有可能不成功,因为水泥磨是后算的, 但是当pcoal_change时这个就有用了
 | |
|     if type not in ["month_st", "sflog"]:
 | |
|         snm_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥磨").first()
 | |
|         if snm_enstat:
 | |
|             hzy_enstat = enstat
 | |
|             snbz_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥包装").first()
 | |
|             snbz_enstat_elec_consume = snbz_enstat.elec_consume if snbz_enstat else 0
 | |
|             hzy_enstat_cen_consume_unit = hzy_enstat.cen_consume_unit if hzy_enstat else 0
 | |
|             if snm_enstat.total_production == 0:
 | |
|                 snm_enstat_cen_consume_unit = 0
 | |
|             else:
 | |
|                 slpb = snm_enstat.imaterial_data_dict.get("入磨熟料", {}).get("amount_consume", 0) / snm_enstat.total_production
 | |
|                 snm_enstat_cen_consume_unit = ((snm_enstat.elec_consume + snbz_enstat_elec_consume)/snm_enstat.total_production) * 0.1229 + slpb * hzy_enstat_cen_consume_unit
 | |
|             snm_enstat.cen_consume_unit = snm_enstat_cen_consume_unit
 | |
|             snm_enstat.save()
 | |
| 
 | |
| enm_alarms_list = [
 | |
| 
 | |
|     ["电石渣", "elec_consume_unit", "单位产品分布电耗"],
 | |
|     ["生料工序(二次配料)", "elec_consume_unit", "单位产品分布电耗"],
 | |
|     ["原料磨", "elec_consume_unit", "单位产品分布电耗"],
 | |
|     ["回转窑", "celec_consume_unit", "单位产品综合电耗"], 
 | |
|     ["回转窑", "coal_consume_unit", "单位产品标煤耗"], 
 | |
|     ["煤磨", "elec_consume_unit", "单位产品分布电耗"],
 | |
|     ["水泥磨", "elec_consume_unit", "单位产品分布电耗"],
 | |
|     ["水泥包装", "elec_consume_unit", "单位产品分布电耗"]
 | |
| ]
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def enm_alarm(year_s: int, month_s: int, day_s: int):
 | |
|     """
 | |
|     enm报警任务
 | |
|     """
 | |
|     from apps.ecm.models import Event, EventCate, Eventdo
 | |
|     from apps.mtm.models import Goal
 | |
|     from apps.ecm.service import notify_event
 | |
| 
 | |
|     now = timezone.now()
 | |
|     event_cate, _ = EventCate.safe_get_or_create(code="consume_exceed", defaults={"name": "能耗超过目标值", "code": "consume_exceed", "trigger": 30})
 | |
|     for item in enm_alarms_list:
 | |
|         mgroups = Mgroup.objects.filter(name=item[0])
 | |
|         for mgroup in mgroups:
 | |
|             enstat = EnStat.objects.filter(mgroup=mgroup, type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).first()
 | |
|             if enstat:
 | |
|                 mgroup_name = item[0]
 | |
|                 goal_cate_str = item[1]
 | |
|                 goal_cate_str_chin = item[2]
 | |
|                 real_val = getattr(enstat, goal_cate_str, None)
 | |
|                 goal = Goal.objects.filter(goal_cate__code=goal_cate_str, year=year_s, mgroup=mgroup).first()
 | |
|                 if goal:
 | |
|                     goal_val = getattr(goal, f"goal_val_{month_s}", None)
 | |
|                     if goal_val and real_val and real_val > goal_val:  # 触发事件
 | |
|                         event = Event()
 | |
|                         event.obj_cate = "enm"
 | |
|                         event.happen_time = now
 | |
|                         event.voice_msg = f"{mgroup_name}{item[2]}超过设定目标值"
 | |
|                         event.enm_data = {
 | |
|                             "mgroup": mgroup.id,
 | |
|                             "mgroup_name": mgroup.name,
 | |
|                             "type": f"{goal_cate_str}.exceed",
 | |
|                             "type_chin": f"{goal_cate_str_chin}-超过设定目标值",
 | |
|                             "year_s": year_s,
 | |
|                             "month_s": month_s,
 | |
|                             "day_s": day_s,
 | |
|                             "val": real_val,
 | |
|                             "goal_val": goal_val,
 | |
|                             "enstat": enstat.id
 | |
|                         }
 | |
|                         event.save()
 | |
|                         Eventdo.safe_get_or_create(cate=event_cate, event=event, defaults={"cate": event_cate, "event": event})
 | |
|                         notify_event(event)
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def king_insert_mplogx():
 | |
|     mpoint_codes = Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO, code__startswith='K_').values_list('code', flat=True)
 | |
|     ml = []
 | |
|     for m in mpoint_codes:
 | |
|         ml.append({"N": m.replace('K_', '')})
 | |
|     _, res = kingClient.request(**kapis['read_batchtagrealvalue'], json={"objs": ml})
 | |
|     insert_mplogx_from_king_rest_chunk(res)
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def check_mpoint_offline(seconds=100):
 | |
|     """监测测点采集掉线"""
 | |
|     now = localtime()
 | |
|     for mpoint in Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO, is_unit=False)| Mpoint.objects.filter(enabled=True, type=Mpoint.MT_COMPUTE, is_rep_ep_running_state=True, is_unit=False):
 | |
|         mc = MpointCache(mpoint.code)
 | |
|         mpoint_data = mc.data
 | |
|         last_data = mpoint_data.get('last_data', None)
 | |
|         is_offline = True
 | |
|         if last_data and last_data['last_timex'] and (now-last_data['last_timex']).total_seconds() < seconds:
 | |
|             is_offline = False
 | |
|         if is_offline:
 | |
|             mc.set_fail(-2, now) |