662 lines
		
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			662 lines
		
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
	
| # Create your tasks here
 | |
| from __future__ import absolute_import, unicode_literals
 | |
| from apps.utils.tasks import CustomTask
 | |
| from celery import shared_task, group, chain
 | |
| from apps.utils.sql import DbConnection
 | |
| from server.settings import get_sysconfig, update_sysconfig
 | |
| import importlib
 | |
| from django.core.cache import cache
 | |
| from apps.enm.models import MpLog, Mpoint, MpointStat, EnStat, EnStat2
 | |
| from apps.wpm.models import SfLog, StLog
 | |
| import datetime
 | |
| from django.db.models import Sum, Avg
 | |
| from dateutil import tz
 | |
| from django.conf import settings
 | |
| from apps.wpm.services import make_sflogs
 | |
| from apps.mtm.models import Mgroup, Material
 | |
| from apps.fim.services import get_cost_unit, get_price_unit
 | |
| from apps.fim.models import Fee
 | |
| from django.core.cache import cache
 | |
| from apps.enm.services import translate_eval_formula
 | |
| import logging
 | |
| from django.db.models import F
 | |
| from apps.wpm.services import get_pcoal_heat
 | |
| import traceback
 | |
| from django.utils import timezone
 | |
| myLogger = logging.getLogger('log')
 | |
| 
 | |
def get_current_and_previous_time():
    """Return the current naive local time together with the time one hour earlier.

    Returns:
        tuple: ``(now, pre)`` where ``pre`` is exactly one hour before ``now``.
    """
    current = datetime.datetime.now()
    return current, current - datetime.timedelta(hours=1)
 | |
| 
 | |
| 
 | |
@shared_task(base=CustomTask)
def get_tag_val():
    """Copy new rows of the external ``tag_value`` table into local ``MpLog`` rows.

    The connection settings and the ``last_tag_id`` checkpoint are read from the
    ``enm`` section of the system config.  Every remote row whose id is greater
    than the checkpoint is stored as an ``MpLog`` (the ``Mpoint`` for an unseen
    tag code is created on the fly).  For each measuring point that has
    ``func_on_change`` set, ``mpoint_val_on_change`` is queued once, for the
    last log imported for that point.  Finally the checkpoint is written back.
    """
    config = get_sysconfig()
    enm_conf = config['enm']
    with DbConnection(enm_conf['db_host'], enm_conf['db_user'], enm_conf['db_password'], enm_conf['db_database']) as cursor:
        last_tag_id = enm_conf.get('last_tag_id', None)
        if not last_tag_id:
            # No checkpoint stored yet: resume from the most recently updated local log, if any.
            latest_log = MpLog.objects.all().order_by('-tag_update', 'tag_id').first()
            if latest_log is None:
                last_tag_id = 0
            else:
                last_tag_id = latest_log.tag_id
                update_sysconfig({'enm': {'last_tag_id': last_tag_id}})
        # Parameters must be a sequence: "(x)" is just x, "(x,)" is the 1-tuple DB-API expects.
        cursor.execute("select id, val, tag_code, update_time from tag_value where id > %s order by update_time, id", (last_tag_id,))
        rows = cursor.fetchall()  # fetch everything first, then store locally
        pending_func = {}  # mpoint id -> id of its last imported log; each handler runs only once
        max_tag_id = None
        for row in rows:
            log = MpLog()
            log.tag_id, log.tag_val, log.tag_code, log.tag_update = row
            mpoint, _ = Mpoint.objects.get_or_create(code=log.tag_code, defaults={'name': log.tag_code, 'code': log.tag_code, 'unit': 'unknown'})
            log.mpoint = mpoint
            log.save()
            # Rows arrive ordered by update_time, so the last row is not necessarily the
            # highest id; the checkpoint must be the maximum id or rows get imported twice.
            if max_tag_id is None or log.tag_id > max_tag_id:
                max_tag_id = log.tag_id
            if mpoint.func_on_change:
                pending_func[mpoint.id] = log.id
        if max_tag_id is not None:
            last_tag_id = max_tag_id
        # Queue the on-change handlers.
        for mplog_id in pending_func.values():
            mpoint_val_on_change.delay(mplog_id)
        update_sysconfig({'enm': {'last_tag_id': last_tag_id}})
 | |
| 
 | |
@shared_task(base=CustomTask)
def mpoint_val_on_change(mpLogId: str):
    """Run the on-change handler of a measuring point for one of its log rows.

    The measuring point's ``func_on_change`` is a dotted path of the form
    ``package.module.function``.  The module is imported, the function looked
    up, and it is called synchronously with the ``MpLog`` instance.

    Args:
        mpLogId: primary key of the ``MpLog`` row that triggered the handler.
    """
    log_entry = MpLog.objects.get(id=mpLogId)
    dotted_path = log_entry.mpoint.func_on_change
    module_path, func_name = dotted_path.rsplit(".", 1)
    handler = getattr(importlib.import_module(module_path), func_name)
    handler(log_entry)  # executed synchronously
 | |
| 
 | |
@shared_task(base=CustomTask)
def cal_mpointstats_duration(start_time: str, end_time: str):
    """Re-run the hourly statistics for every hour of a time range.

    Starting at ``start_time`` and stepping one hour at a time,
    ``cal_mpointstats`` is called for each step that is not later than
    ``end_time``.

    Args:
        start_time: first hour to recompute, formatted ``%Y-%m-%d %H:%M:%S``.
        end_time: inclusive upper bound, same format.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    moment = datetime.datetime.strptime(start_time, time_format)
    last_moment = datetime.datetime.strptime(end_time, time_format)
    one_hour = datetime.timedelta(hours=1)
    while not moment > last_moment:
        cal_mpointstats(0, moment.year, moment.month, moment.day, moment.hour)
        moment = moment + one_hour
 | |
| 
 | |
| 
 | |
def _store_mpointstat_val(val, **lookup):
    """Get or create the ``MpointStat`` identified by *lookup*, save *val* on it and return it."""
    stat, _ = MpointStat.objects.get_or_create(**lookup, defaults=lookup)
    stat.val = val
    stat.save()
    return stat


@shared_task(base=CustomTask)
def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int):
    """Compute the statistic of one measuring point for one hour and roll it up.

    Nothing is stored unless the measuring point is bound to a material.  The
    hourly value is the result of the point's formula when it has one,
    otherwise the last ``MpLog`` reading of that hour minus the first one (0
    when the hour has no readings).  The value is saved as the ``hour``
    statistic and the ``day``, ``month`` and ``year`` statistics are re-summed
    from the level below.

    For every entry of ``mpoint.mgroups_allocate`` the hourly value multiplied
    by the entry's ``ratio`` is saved as an ``hour_s`` statistic bound to the
    shift log covering that hour (shift logs are generated when none exists),
    the ``sflog`` statistic of that shift is re-summed, and
    ``cal_mpointstat_manual`` continues the roll-up by shift date.

    Args:
        mpointId: primary key of the measuring point.
        year, month, day, hour: the hour to compute.
    """
    mpoint = Mpoint.objects.get(id=mpointId)
    mytz = tz.gettz(settings.TIME_ZONE)
    dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz)
    if not mpoint.material:  # only points that meter a material are aggregated
        return

    val = 0
    if mpoint.formula:
        formula = mpoint.formula
        try:
            val = translate_eval_formula(formula, year, month, day, hour)
        except Exception:
            # A broken formula must not abort the caller's loop over all points:
            # log it and leave this point's statistics untouched.
            myLogger.error('公式执行错误:{}-{}'.format(mpoint.id, formula), exc_info=True)
            return
    else:
        readings = MpLog.objects.filter(
            mpoint=mpoint,
            tag_update__year=year,
            tag_update__month=month,
            tag_update__day=day,
            tag_update__hour=hour).order_by('tag_update')
        if readings.exists():
            val = readings.last().tag_val - readings.first().tag_val
    ms = _store_mpointstat_val(val, mpoint=mpoint, type='hour', year=year, month=month, day=day, hour=hour)

    # Refresh the higher calendar levels, each summed from the level directly below.
    day_sum = MpointStat.objects.filter(type='hour', mpoint=mpoint, year=year, month=month, day=day).aggregate(sum=Sum('val'))['sum']
    _store_mpointstat_val(day_sum, type='day', mpoint=mpoint, year=year, month=month, day=day)

    month_sum = MpointStat.objects.filter(type='day', mpoint=mpoint, year=year, month=month).aggregate(sum=Sum('val'))['sum']
    _store_mpointstat_val(month_sum, type='month', mpoint=mpoint, year=year, month=month)

    year_sum = MpointStat.objects.filter(type='month', mpoint=mpoint, year=year).aggregate(sum=Sum('val'))['sum']
    _store_mpointstat_val(year_sum, type='year', mpoint=mpoint, year=year)

    for allocate in mpoint.mgroups_allocate or []:  # allocation ratios, if any
        mgroup = Mgroup.objects.get(id=allocate['mgroup'])
        ratio = allocate['ratio']
        # Find the shift log covering this hour; generate shift logs when it is missing.
        sflog = SfLog.objects.filter(start_time__lt=dt, end_time__gte=dt, mgroup=mgroup).first()
        if sflog is None:
            make_sflogs(mgroup=mgroup, start_date=(dt - datetime.timedelta(days=1)).date(), end_date=dt.date())
            sflog = SfLog.objects.filter(start_time__lt=dt, end_time__gte=dt, mgroup=mgroup).first()
        year_s, month_s, day_s = sflog.get_ymd

        _store_mpointstat_val(
            ms.val * ratio,
            type='hour_s', mpoint=mpoint, sflog=sflog, mgroup=mgroup,
            year=year, month=month, day=day,
            year_s=year_s, month_s=month_s, day_s=day_s, hour=hour)

        # Shift total: only the hour_s rows of THIS shift and section.  Filtering by
        # shift date alone would also add other shifts of the same date and the
        # shares allocated to other sections.
        shift_sum = MpointStat.objects.filter(
            type='hour_s', mpoint=mpoint, sflog=sflog, mgroup=mgroup,
            year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum('val'))['sum']
        _store_mpointstat_val(
            shift_sum,
            type='sflog', mpoint=mpoint, sflog=sflog,
            year_s=year_s, month_s=month_s, day_s=day_s, mgroup=mgroup)

        # Continue with day_s / month_s / year_s; EnStat is not recomputed here (next_cal=0).
        cal_mpointstat_manual(mpoint.id, sflog.id, mgroup.id, year, month, day, hour, year_s, month_s, day_s, 0)
 | |
| 
 | |
|         
 | |
| 
 | |
@shared_task(base=CustomTask)
def cal_mpointstats(is_now=1, year=None,  month=None, day=None, hour=None):
    """Compute the statistics of all automatically collected measuring points for one hour.

    When ``year``, ``month``, ``day`` and ``hour`` are all supplied they select
    the hour; otherwise the current hour (``is_now`` truthy) or the previous
    hour (``is_now`` falsy) is used.  Points without a formula are computed
    first, then the points with a formula, then ``cal_enstat`` runs for every
    work section ordered by ``sort``, and finally ``cal_enstat2`` runs for the
    shift date returned by the last ``cal_enstat`` call (``0, 0, 0`` when
    there is no work section).  When the hour is 21, ``enm_alarm`` is queued
    for that shift date.
    """
    time_given = year and month and day and hour is not None
    if not time_given:
        now, pre = get_current_and_previous_time()
        reference = now if is_now else pre
        year, month, day, hour = reference.year, reference.month, reference.day, reference.hour

    # Points without a formula first ...
    plain_points = Mpoint.objects.filter(is_auto=True, formula='', func_on_change='')
    for point in plain_points:
        cal_mpointstat_hour(point.id, year, month, day, hour)

    # ... then the remaining (formula-based) points.
    formula_points = Mpoint.objects.filter(is_auto=True, func_on_change='').exclude(formula='')
    for point in formula_points:
        cal_mpointstat_hour(point.id, year, month, day, hour)

    # Energy statistics per work section.
    year_s, month_s, day_s = 0, 0, 0
    for mgroup in Mgroup.objects.all().order_by('sort'):
        year_s, month_s, day_s = cal_enstat('hour_s', None, mgroup.id, year, month, day, hour, None, None, None, True)

    # Plant-wide statistics last.
    cal_enstat2(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s)
    if hour == 21:
        enm_alarm.delay(year_s, month_s, day_s)
 | |
|     
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def cal_mpointstat_manual(mpointId: str, sflogId: str, mgroupId: str, year: int, month: int, day: int, hour: int, year_s: int, month_s: int, day_s: int, next_cal=0):
 | |
|     """
 | |
|     手动录入的测点数据进行往上统计,一级一级往上
 | |
|     """
 | |
|     mpoint = Mpoint.objects.get(id=mpointId)
 | |
|     mgroup = Mgroup.objects.get(id=mgroupId)
 | |
|     sum_dict_day_s = MpointStat.objects.filter(type='sflog', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, mgroup=mgroup).aggregate(sum=Sum('val'))
 | |
|     params_day_s = {'type':'day_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'mgroup': mgroup}
 | |
|     ms_day_s, _ = MpointStat.objects.get_or_create(**params_day_s, defaults=params_day_s)
 | |
|     ms_day_s.val = sum_dict_day_s['sum']
 | |
|     ms_day_s.save()
 | |
| 
 | |
|     sum_dict_month_s = MpointStat.objects.filter(type='day_s', mpoint=mpoint, year_s=year_s, month_s=month_s, mgroup=mgroup).aggregate(sum=Sum('val'))
 | |
|     params_month_s = {'type':'month_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'mgroup': mgroup}
 | |
|     ms_month_s, _ = MpointStat.objects.get_or_create(**params_month_s, defaults=params_month_s)
 | |
|     ms_month_s.val = sum_dict_month_s['sum']
 | |
|     ms_month_s.save()
 | |
| 
 | |
|     sum_dict_year_s = MpointStat.objects.filter(type='month_s', mpoint=mpoint, year_s=year_s, mgroup=mgroup).aggregate(sum=Sum('val'))
 | |
|     params_year_s = {'type':'year_s', 'mpoint': mpoint, 'year_s': year_s, 'mgroup': mgroup}
 | |
|     ms_year_s, _ = MpointStat.objects.get_or_create(**params_year_s, defaults=params_year_s)
 | |
|     ms_year_s.val = sum_dict_year_s['sum']
 | |
|     ms_year_s.save()
 | |
|     
 | |
|     if next_cal:  # 二次计算
 | |
|         if hour:
 | |
|             cal_enstat('hour_s', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)
 | |
|         else:
 | |
|             cal_enstat('sflog', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)
 | |
| 
 | |
| 
 | |
| 
 | |
# Statistic levels in cascade order: computing one level also computes all later ones.
types_list = ['hour_s', 'sflog', 'day_s', 'month_st', 'month_s', 'year_s']


@shared_task(base=CustomTask)
def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, cascade=True, cal_attrs=None):
    """Compute the energy statistics (``EnStat``) of one work section.

    With ``cascade`` true the function calls itself, non-cascading, for
    ``type`` and every later level of ``types_list`` (forwarding
    ``cal_attrs``) and returns; an unknown ``type`` does nothing in that mode.

    With ``cascade`` false a single ``EnStat`` row is fetched or created for
    the level and filled in:

    * ``material``: production, consumption and unit cost per input material,
      plus the other cost items from ``Fee``;
    * ``pcoal``: coal consumption figures for the kiln and the combined energy
      consumption of the cement mill;
    * ``run_hour``: running time figures (not for ``hour_s``).

    Args:
        type: one of ``types_list``.
        sflogId: primary key of the shift log; when empty the shift log is
            located from ``year``/``month``/``day``/``hour``.
        mgroupId: primary key of the work section.
        year, month, day, hour: calendar hour being computed.
        year_s, month_s, day_s: shift date; replaced by the date of the
            resolved shift log.
        cascade: also compute all later levels.
        cal_attrs: attribute groups to compute; empty or ``None`` means all of
            ``material``, ``pcoal`` and ``run_hour``.

    Returns:
        tuple: ``(year_s, month_s, day_s)`` of the resolved shift log (the
        values passed in when nothing was computed).

    Raises:
        ValueError: in non-cascading mode, when ``type`` is unknown or the
            shift log cannot be located from the arguments.
    """
    if cascade:
        if type in types_list:
            for level in types_list[types_list.index(type):]:
                # Forward cal_attrs so a restricted calculation stays restricted at every level.
                year_s, month_s, day_s = cal_enstat(level, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, False, cal_attrs)
        return year_s, month_s, day_s

    if type not in types_list:
        raise ValueError('unknown enstat type: {}'.format(type))
    this_cal_attrs = list(cal_attrs) if cal_attrs else ['material', 'pcoal', 'run_hour']

    mgroup = Mgroup.objects.get(id=mgroupId)
    if sflogId:
        sflog = SfLog.objects.get(id=sflogId)
    elif year and month and day and hour is not None:
        mytz = tz.gettz(settings.TIME_ZONE)
        dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz)
        sflog = SfLog.objects.get(start_time__lt=dt, end_time__gte=dt, mgroup=mgroup)
    else:
        raise ValueError('cal_enstat needs sflogId or year/month/day/hour to locate the shift log')
    year_s, month_s, day_s = sflog.get_ymd
    team = sflog.team
    if team is None and type == 'month_st':
        # No team on the shift: there is no per-team monthly statistic to compute.
        return year_s, month_s, day_s

    # Identify the EnStat row of this level; `extra` is only used when the row is created.
    shift_date = {'year_s': year_s, 'month_s': month_s, 'day_s': day_s}
    extra = {}
    if type == 'hour_s':
        lookup = {'mgroup': mgroup, 'year': year, 'month': month, 'day': day, 'hour': hour}
        extra = {'sflog': sflog, **shift_date}
    elif type == 'sflog':
        lookup = {'sflog': sflog}
        extra = {'mgroup': mgroup, **shift_date}
    elif type == 'day_s':
        lookup = {'mgroup': mgroup, **shift_date}
    elif type == 'month_st':
        lookup = {'mgroup': mgroup, 'team': team, 'year_s': year_s, 'month_s': month_s}
    elif type == 'month_s':
        lookup = {'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s}
    else:  # 'year_s'
        lookup = {'mgroup': mgroup, 'year_s': year_s}
    enstat, _ = EnStat.objects.get_or_create(type=type, defaults=extra, **lookup)

    if 'material' in this_cal_attrs:
        # Consumption per input material (electricity included).
        # Work on a copy: inserting the product must not alter the model instance's list.
        material_ids = list(mgroup.input_materials or [])
        has_product = bool(mgroup.product)
        if has_product:
            material_ids.insert(0, mgroup.product.id)  # the product, when set, is evaluated first
        # MpointStat rows that feed this level (the per-team month is summed from shift rows).
        mps_scope = {
            'hour_s': {'type': 'hour_s', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'hour': hour},
            'sflog': {'type': 'sflog', 'sflog': sflog},
            'day_s': {'type': 'day_s', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s},
            'month_st': {'type': 'sflog', 'mgroup': mgroup, 'sflog__team': team, 'year_s': year_s, 'month_s': month_s},
            'month_s': {'type': 'month_s', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s},
            'year_s': {'type': 'year_s', 'mgroup': mgroup, 'year_s': year_s},
        }[type]
        imaterial_cost_unit = 0
        imaterial_data = []
        for ind, mid in enumerate(material_ids):
            material = Material.objects.get(id=mid)
            mps = MpointStat.objects.filter(mpoint__material=material, **mps_scope)
            # When points flagged is_all exist for this material, only those are summed.
            if mps.filter(mpoint__is_all=True).exists():
                mps = mps.filter(mpoint__is_all=True)
            amount_consume = mps.aggregate(sum=Sum('val'))['sum']
            if amount_consume is None:
                amount_consume = 0
            if ind == 0 and has_product:  # this entry is the production output
                enstat.total_production = amount_consume
                continue

            if material.code in ['pcoal', 'cair', 'steam']:
                price_unit = 0
            else:
                price_unit = get_price_unit(material, year_s, month_s)
            cost = amount_consume * price_unit
            try:
                cost_unit = cost / enstat.total_production
            except (ArithmeticError, TypeError):
                # No (or zero) production recorded: the unit cost is reported as 0.
                cost_unit = 0
            imaterial_cost_unit = imaterial_cost_unit + cost_unit
            if material.code == 'elec':
                enstat.elec_consume = amount_consume
                enstat.elec_coal_consume = enstat.elec_consume*0.1229/1000
                try:
                    enstat.elec_consume_unit = enstat.elec_consume/enstat.total_production
                except (ArithmeticError, TypeError):
                    pass  # no production: keep the previous unit value
            elif material.code == 'water':
                enstat.water = amount_consume
            elif material.code == 'pcoal':
                enstat.pcoal_consume = amount_consume
            elif material.code == 'cair':
                enstat.cair_consume = amount_consume
            elif material.code == 'steam':
                enstat.out_steam = amount_consume
                enstat.out_steam_coal = enstat.out_steam * 128.6 / 1000
            elif material.code == 'ccr':
                enstat.ccr_consume = amount_consume
                enstat.kiln_end_heat = enstat.total_production - enstat.ccr_consume

            imaterial_data.append({
                'material': mid, 'material_name': material.name, 'material_code': material.code,
                'material_type': material.type, 'price_unit': price_unit,
                'amount_consume': amount_consume, 'cost': cost, 'cost_unit': cost_unit})
        enstat.imaterial_data = imaterial_data

        # Other cost items.
        other_cost_data = []
        other_cost_unit = 0
        for fee in Fee.objects.order_by('sort'):
            item = {'element': fee.element, 'cate': fee.cate, 'name': fee.name, 'id': fee.id}
            item['cost_unit'] = get_cost_unit(mgroup, fee, year_s, month_s)
            other_cost_unit = other_cost_unit + item['cost_unit']
            other_cost_data.append(item)
        enstat.other_cost_data = other_cost_data
        enstat.production_cost_unit = imaterial_cost_unit + other_cost_unit
        enstat.save()

    if enstat.total_production:
        # NOTE(review): this updates every electricity MpointStat of the section that is
        # tied to monitored equipment, regardless of level or period — confirm intended.
        MpointStat.objects.filter(mgroup=enstat.mgroup, mpoint__material__code='elec').exclude(mpoint__ep_monitored=None).update(
            total_production=enstat.total_production,
            elec_consume_unit=F('val')/enstat.total_production)

    if enstat.mgroup.cate == 'section':
        if 'material' in this_cal_attrs:
            # Energy consumption per unit of product (handled separately for the kiln below).
            if enstat.mgroup.name != '回转窑':
                try:
                    enstat.en_consume_unit = enstat.elec_coal_consume / enstat.total_production
                except (ArithmeticError, TypeError):
                    pass  # no production: keep the previous value
            enstat.save()

        if type == 'month_st' and 'material' in this_cal_attrs:
            # Per-team month: copy the electricity figures of the main equipment for easy querying.
            res = MpointStat.objects.filter(type='sflog', year_s=year_s, month_s=month_s, sflog__team=enstat.team, mpoint__ep_monitored__power_kw__gte=100).values(
                'elec_consume_unit', equipment=F('mpoint__ep_monitored__id'), equipment_name=F('mpoint__ep_monitored__name'))
            enstat.equip_elec_data = list(res)
            enstat.save()

        if enstat.mgroup.name == '回转窑':  # kiln: combined electricity / standard coal / combined energy per unit
            # Combined electricity consumption, using the raw mill figures of the same period.
            if enstat.type in ['hour_s', 'day_s', 'year_s', 'month_s']:
                pre_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name='原料磨').first()
                if pre_enstat:
                    try:
                        enstat.celec_consume_unit = enstat.elec_consume_unit + 1.45*pre_enstat.elec_consume_unit
                    except (ArithmeticError, TypeError):
                        pass  # one of the unit values is missing
                    else:
                        # Saved outside the try so that database errors are not swallowed.
                        enstat.save()

            # Total coal consumption.
            if 'pcoal' in this_cal_attrs:
                if type in ['hour_s', 'sflog', 'day_s']:
                    enstat.pcoal_heat = get_pcoal_heat(enstat.year_s, enstat.month_s, enstat.day_s)
                    enstat.pcoal_coal_consume = enstat.pcoal_consume * enstat.pcoal_heat/7000
                elif type == 'month_st':
                    enstat.pcoal_coal_consume = EnStat.objects.filter(type='sflog', mgroup=enstat.mgroup, year_s=year_s, month_s=month_s, sflog__team=enstat.team).aggregate(sum=Sum('pcoal_coal_consume'))['sum']
                elif type == 'month_s':
                    enstat.pcoal_coal_consume = EnStat.objects.filter(type='sflog', mgroup=enstat.mgroup, year_s=year_s, month_s=month_s).aggregate(sum=Sum('pcoal_coal_consume'))['sum']
                elif type == 'year_s':
                    enstat.pcoal_coal_consume = EnStat.objects.filter(type='sflog', mgroup=enstat.mgroup, year_s=year_s).aggregate(sum=Sum('pcoal_coal_consume'))['sum']

                if enstat.pcoal_coal_consume is None:
                    enstat.pcoal_coal_consume = 0

                # Standard coal consumption per unit of product.
                try:
                    enstat.coal_consume_unit = enstat.pcoal_coal_consume / enstat.total_production
                except ZeroDivisionError:
                    pass

                # Combined energy consumption.
                enstat.cen_consume_unit = enstat.coal_consume_unit + 0.1229 * enstat.elec_consume_unit
                enstat.save()

        if enstat.mgroup.name == '水泥磨' and enstat.type not in ['month_st', 'sflog'] and 'pcoal' in this_cal_attrs:
            # Cement mill: combined energy consumption builds on the kiln figure of the same period.
            pre_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name='回转窑').first()
            if pre_enstat:
                enstat.cen_consume_unit = enstat.elec_consume_unit*0.1229 + 0.7*pre_enstat.cen_consume_unit
            enstat.save()

        # Running time figures.
        if type != 'hour_s' and 'run_hour' in this_cal_attrs:
            enstat.total_hour_now, enstat.shut_hour = get_total_hour_now_and_shut_hour(enstat)
            enstat.run_hour = enstat.total_hour_now - enstat.shut_hour
            try:
                enstat.run_rate = (enstat.run_hour / enstat.total_hour_now)*100
                enstat.production_hour = enstat.total_production / enstat.total_hour_now
            except ZeroDivisionError:
                pass
            enstat.save()
    enstat.save()
    return year_s, month_s, day_s  # the shift date is returned for the plant-wide calculation
 | |
| 
 | |
| 
 | |
def get_total_hour_now_and_shut_hour(enstat: EnStat):
    """Return ``(total_hour_now, shut_hour)`` for the period an ``EnStat`` covers.

    For a ``sflog`` statistic the two values are read from its shift log.  For
    ``day_s``, ``month_st``, ``month_s`` and ``year_s`` they are the sums over
    the work section's shift logs whose ``end_time`` falls in the period and
    lies before the current time (``month_st`` additionally restricts to the
    statistic's team); a missing sum is reported as 0.  Any other type,
    including ``hour_s``, yields ``None``.
    """
    from apps.wpm.models import SfLog
    now = datetime.datetime.now().replace(tzinfo=tz.gettz(settings.TIME_ZONE))
    kind = enstat.type
    if kind == 'sflog':
        shift = enstat.sflog
        return shift.total_hour_now, shift.shut_hour

    # Period-specific part of the shift log filter.
    if kind == 'day_s':
        period = dict(end_time__year=enstat.year_s, end_time__month=enstat.month_s, end_time__day=enstat.day_s)
    elif kind == 'month_st':
        period = dict(end_time__year=enstat.year_s, end_time__month=enstat.month_s, team=enstat.team)
    elif kind == 'month_s':
        period = dict(end_time__year=enstat.year_s, end_time__month=enstat.month_s)
    elif kind == 'year_s':
        period = dict(end_time__year=enstat.year_s)
    else:
        return None

    totals = SfLog.objects.filter(mgroup=enstat.mgroup, end_time__lt=now, **period).aggregate(
        sum1=Sum('total_hour_now'),
        sum2=Sum('shut_hour'),
    )
    return totals['sum1'] or 0, totals['sum2'] or 0
 | |
|     
 | |
@shared_task(base=CustomTask)
def cal_enstat2(type: str, year_s: int, month_s: int, day_s: int, cascade=True):
    """Recompute the plant-wide EnStat2 summary row for one day or one month.

    Args:
        type: 'day_s' or 'month_s'; any other value is ignored.
        year_s: year of the period.
        month_s: month of the period.
        day_s: day of the period; only used when the period type is 'day_s'.
        cascade: when True, a 'day_s' request refreshes the day row and then
            the month row, and a 'month_s' request refreshes the month row.
            When False, only the requested row is computed.

    Raises:
        Material.DoesNotExist: if one of the material codes looked up below
            ('cement', 'clinker', 'bulk_cement', 'bag_cement') is missing.
    """
    if cascade:
        if type == 'day_s':
            cal_enstat2('day_s', year_s, month_s, day_s, False)
            cal_enstat2('month_s', year_s, month_s, day_s, False)
        elif type == 'month_s':
            cal_enstat2('month_s', year_s, month_s, day_s, False)
        # The non-cascading calls above already did all the work; falling
        # through would recompute and save the requested row a second time.
        return

    # One set of lookup kwargs identifies the EnStat2 row and also selects the
    # matching MpointStat / EnStat rows of the same period.
    if type == 'month_s':
        period = {'type': 'month_s', 'year_s': year_s, 'month_s': month_s}
    elif type == 'day_s':
        period = {'type': 'day_s', 'year_s': year_s, 'month_s': month_s, 'day_s': day_s}
    else:
        # Unsupported period type: nothing to compute (same as the cascade path).
        return

    # The lookup kwargs are reused as creation values by get_or_create.
    # NOTE: the row is not locked (no select_for_update) while it is recomputed.
    enstat2, _ = EnStat2.objects.get_or_create(**period)

    material_cement = Material.objects.get(code='cement')
    material_clinker = Material.objects.get(code='clinker')
    material_bulk_cement = Material.objects.get(code='bulk_cement')
    material_bag_cement = Material.objects.get(code='bag_cement')

    def material_total(material):
        """Sum MpointStat.val over the period for points bound to `material`."""
        total = MpointStat.objects.filter(mpoint__material=material, **period).aggregate(sum=Sum('val'))['sum']
        return 0 if total is None else total

    enstat2.bulk_cement_price = get_price_unit(material_bulk_cement, year_s, month_s)
    enstat2.clinker_price = get_price_unit(material_clinker, year_s, month_s)
    enstat2.bag_cement_price = get_price_unit(material_bag_cement, year_s, month_s)

    enstat2.bulk_cement_val = material_total(material_bulk_cement)
    enstat2.bag_cement_val = material_total(material_bag_cement)
    # NOTE(review): clinker_val is summed over the *cement* material although it
    # is priced with clinker_price below. This may be a copy-paste slip for
    # material_clinker; behaviour is kept as-is until confirmed.
    enstat2.clinker_val = material_total(material_cement)

    # Total output value; the /10000 presumably converts to units of ten
    # thousand (TODO confirm against the EnStat2 field definition).
    enstat2.industry_total_val = (
        enstat2.bulk_cement_val * enstat2.bulk_cement_price
        + enstat2.bag_cement_val * enstat2.bag_cement_price
        + enstat2.clinker_val * enstat2.clinker_price
    ) / 10000

    cement = EnStat.objects.filter(mgroup__product__code='cement', **period).aggregate(
        sum=Sum('total_production'), avg=Avg('production_cost_unit'))
    enstat2.cement_val = cement['sum'] or 0
    enstat2.cement_cost_unit = cement['avg'] or 0
    enstat2.industry_add_val = enstat2.industry_total_val - enstat2.cement_val * enstat2.cement_cost_unit / 10000

    # Plant-wide energy and utility consumption, summed over every EnStat row
    # of the period.
    totals = EnStat.objects.filter(**period).aggregate(
        elec_total=Sum('elec_consume'),
        pcoal_total=Sum('pcoal_consume'),
        pcoal_coal_total=Sum('pcoal_coal_consume'),
        water_total=Sum('water_consume'),
        cair_total=Sum('cair_consume'),
    )
    enstat2.elec_consume = totals['elec_total'] or 0
    # 0.1229 is presumably the electricity-to-standard-coal factor and /1000 a
    # kg -> tonne conversion (TODO confirm units).
    enstat2.elec_coal_consume = enstat2.elec_consume * 0.1229 / 1000
    enstat2.pcoal_consume = totals['pcoal_total'] or 0
    enstat2.pcoal_coal_consume = totals['pcoal_coal_total'] or 0
    enstat2.water_consume = totals['water_total'] or 0
    enstat2.cair_consume = totals['cair_total'] or 0
    enstat2.en_consume = enstat2.pcoal_coal_consume + enstat2.elec_coal_consume

    # When a denominator is zero the previously stored ratio is left untouched.
    try:
        enstat2.en_consume_unit = enstat2.en_consume / enstat2.industry_total_val
    except ZeroDivisionError:
        pass
    try:
        enstat2.en_add_consume_unit = enstat2.en_consume / enstat2.industry_add_val
    except ZeroDivisionError:
        pass
    enstat2.save()
 | |
| 
 | |
| 
 | |
def cal_enstat_pcoal_change(enstat, new_pcoal_heat):
    """Re-derive the coal figures of `enstat` after a pulverized-coal heat change.

    For 'hour_s', 'sflog' and 'day_s' rows the new heat value is stored and the
    standard-coal consumption is recomputed from it. For 'month_st', 'month_s'
    and 'year_s' rows the standard-coal consumption is re-summed from the
    'sflog' rows of the same mgroup and period. The per-unit coal consumption
    and the comprehensive energy consumption are then refreshed and saved.

    Args:
        enstat: the EnStat row to update in place and save.
        new_pcoal_heat: heat value applied to hour/shift/day rows.
    """
    stat_type = enstat.type
    if stat_type in ('hour_s', 'sflog', 'day_s'):
        enstat.pcoal_heat = new_pcoal_heat
        # 7000 is presumably the heat value of standard coal (TODO confirm units).
        enstat.pcoal_coal_consume = enstat.pcoal_consume * enstat.pcoal_heat / 7000
    else:
        # Aggregated rows: pick the period scope, then re-sum the shift rows.
        scope = None
        if stat_type == 'month_st':
            scope = {'year_s': enstat.year_s, 'month_s': enstat.month_s, 'sflog__team': enstat.team}
        elif stat_type == 'month_s':
            scope = {'year_s': enstat.year_s, 'month_s': enstat.month_s}
        elif stat_type == 'year_s':
            scope = {'year_s': enstat.year_s}
        if scope is not None:
            shift_rows = EnStat.objects.filter(type='sflog', mgroup=enstat.mgroup, **scope)
            enstat.pcoal_coal_consume = shift_rows.aggregate(sum=Sum('pcoal_coal_consume'))['sum']

    if enstat.pcoal_coal_consume is None:
        enstat.pcoal_coal_consume = 0

    # Standard-coal consumption per unit of product; with zero production the
    # previously stored value is kept.
    try:
        enstat.coal_consume_unit = enstat.pcoal_coal_consume / enstat.total_production
    except ZeroDivisionError:
        pass

    # Comprehensive energy consumption per unit of product.
    enstat.cen_consume_unit = enstat.coal_consume_unit + 0.1229 * enstat.elec_consume_unit

    enstat.save(update_fields=['pcoal_heat', 'pcoal_coal_consume', 'coal_consume_unit', 'cen_consume_unit'])

    # Also refresh the cement mill's comprehensive energy consumption. This may
    # find nothing because the cement mill is calculated later, but it matters
    # when the coal heat value is changed afterwards.
    if stat_type in ('month_st', 'sflog'):
        return
    cement_mill_stat = EnStat.objects.filter(
        type=enstat.type,
        year_s=enstat.year_s,
        month_s=enstat.month_s,
        day_s=enstat.day_s,
        hour=enstat.hour,
        mgroup__name='水泥磨',
    ).first()
    if cement_mill_stat:
        cement_mill_stat.cen_consume_unit = cement_mill_stat.elec_consume_unit*0.1229 + 0.7*enstat.cen_consume_unit
        cement_mill_stat.save(update_fields=['cen_consume_unit'])
 | |
| 
 | |
| 
 | |
# Alarm rules checked by enm_alarm. Each entry is
# [mgroup name, EnStat attribute (also used as the Goal category code),
#  label inserted into the spoken alarm message].
# NOTE(review): '分布' in the last label may be a typo for '分步'; it is a
# runtime string, so confirm before changing it.
enm_alarms_list = [
    ['回转窑', 'celec_consume_unit', '单位产品综合电耗'],
    ['回转窑', 'coal_consume_unit', '单位产品标煤耗'],
    ['水泥磨', 'elec_consume_unit', '单位产品分布电耗'],
]
 | |
| 
 | |
@shared_task(base=CustomTask)
def enm_alarm(year_s: int, month_s: int, day_s: int):
    """
    ENM alarm task.

    For every rule in enm_alarms_list and every mgroup with the rule's name,
    compare the day's EnStat value with the month's goal value for the same
    year. When the real value exceeds the goal, an Event is saved, linked to
    the 'consume_exceed' event category through Eventdo, and notified.

    Args:
        year_s: year of the day to check.
        month_s: month of the day to check; also selects the goal_val_<month> field.
        day_s: day to check.
    """
    from apps.ecm.models import Event, EventCate,  Eventdo
    from apps.mtm.models import Goal
    from apps.ecm.service import notify_event

    happen_time = timezone.now()
    event_cate, _ = EventCate.objects.get_or_create(
        code='consume_exceed',
        defaults={'name': '能耗超过目标值', 'code': 'consume_exceed', 'trigger': 30},
    )

    for mgroup_name, goal_cate_str, label in enm_alarms_list:
        for mgroup in Mgroup.objects.filter(name=mgroup_name):
            enstat = EnStat.objects.filter(
                mgroup=mgroup, type='day_s', year_s=year_s, month_s=month_s, day_s=day_s
            ).first()
            if not enstat:
                continue

            real_val = getattr(enstat, goal_cate_str, None)
            goal = Goal.objects.filter(goal_cate__code=goal_cate_str, year=year_s, mgroup=mgroup).first()
            if not goal:
                continue

            goal_val = getattr(goal, f'goal_val_{month_s}', None)
            # Missing or zero values on either side never raise an alarm.
            if not (goal_val and real_val and real_val > goal_val):
                continue

            # Goal exceeded: record the event and notify.
            event = Event()
            event.obj_cate = 'enm'
            event.happen_time = happen_time
            event.voice_msg = f'{mgroup_name}{label}超过设定目标值'
            event.enm_data = {
                'mgroup': mgroup.id,
                'mgroup_name': mgroup.name,
                'type': f'{goal_cate_str}.exceed',
                'year_s': year_s,
                'month_s': month_s,
                'day_s': day_s,
                'val': real_val,
                'goal_val': goal_val,
                'enstat': enstat.id,
            }
            event.save()
            Eventdo.objects.get_or_create(
                cate=event_cate,
                event=event,
                defaults={'cate': event_cate, 'event': event},
            )
            notify_event(event)
 | |
| 
 | |
|      |