280 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			280 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
| # Create your tasks here
 | |
| from __future__ import absolute_import, unicode_literals
 | |
| from apps.utils.tasks import CustomTask
 | |
| from celery import shared_task, group, chain
 | |
| from apps.utils.sql import DbConnection
 | |
| from server.settings import get_sysconfig
 | |
| from django.core.cache import cache
 | |
| from apps.enm.models import MpLog, Mpoint, MpointStat, EnStat
 | |
| from apps.wpm.models import SfLog
 | |
| import datetime
 | |
| from django.db.models import Sum
 | |
| from dateutil import tz
 | |
| from django.conf import settings
 | |
| from apps.wpm.services import make_sflogs
 | |
| from apps.mtm.models import Mgroup, Material
 | |
| from apps.fim.services import get_cost_unit, get_price_unit
 | |
| from apps.fim.models import Fee
 | |
| from django.core.cache import cache
 | |
| from apps.enm.services import translate_eval_formula
 | |
| import logging
 | |
| myLogger = logging.getLogger('log')
 | |
| 
 | |
| def get_current_and_previous_time():
 | |
|     now = datetime.datetime.now()
 | |
|     pre = now - datetime.timedelta(hours=1)
 | |
|     return now, pre
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def get_tag_val():
 | |
|     config = get_sysconfig()
 | |
|     with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor:
 | |
|         last_tag_id = cache.get('last_tag_id')
 | |
|         if last_tag_id is None:
 | |
|             mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first()
 | |
|             if mr is None:
 | |
|                 last_tag_id = 0
 | |
|             else:
 | |
|                 last_tag_id = mr.tag_id
 | |
|                 cache.set('last_tag_id', last_tag_id)
 | |
|         cursor.execute("select id, val, tag_code, update_time from tag_value where id > %s order by id", (last_tag_id))
 | |
|         results = cursor.fetchall()  # 获取数据后保存至本地
 | |
|         for row in results:
 | |
|             mr_one = MpLog()
 | |
|             mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row
 | |
|             mr_one.mpoint, _  = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'})
 | |
|             mr_one.save()
 | |
|             last_tag_id = mr_one.tag_id
 | |
|         cache.set('last_tag_id', last_tag_id)
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int):
 | |
|     """
 | |
|     计算某一测点, 某一时间点某一小时的统计值
 | |
|     """
 | |
|     mpoint = Mpoint.objects.get(id=mpointId)
 | |
|     mytz = tz.gettz(settings.TIME_ZONE)
 | |
|     dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz)
 | |
|     if mpoint.material:  # 如果计量的是物料
 | |
|         params = {'mpoint': mpoint, 'type': 'hour'}
 | |
|         params['year'], params['month'], params['day'], params['hour'] = year, month, day, hour
 | |
|         val = 0
 | |
|         if mpoint.formula:
 | |
|             formular = mpoint.formular
 | |
|             try:
 | |
|                 val = translate_eval_formula(formular, year, month, day, hour)
 | |
|             except:
 | |
|                 myLogger.error('公式执行错误:{}-{}'.format(mpoint.id, formular), exc_info=True)
 | |
|                 raise
 | |
|         else:
 | |
|             mrs = MpLog.objects.filter(
 | |
|                 mpoint=mpoint, 
 | |
|                 tag_update__year=params['year'],
 | |
|                 tag_update__month=params['month'],
 | |
|                 tag_update__day=params['day'],
 | |
|                 tag_update__hour= params['hour']).order_by('tag_update')
 | |
|             if mrs.exists():
 | |
|                 val = mrs.last().tag_val - mrs.first().tag_val
 | |
|         ms, _ = MpointStat.objects.get_or_create(**params, defaults=params)
 | |
|         ms.val = val
 | |
|         ms.save()
 | |
| 
 | |
|         # 更新更高级别的值
 | |
|         sum_dict_day = MpointStat.objects.filter(type='hour', mpoint=mpoint, year=year, month=month, day=day).aggregate(sum=Sum('val'))
 | |
|         params_day = {'type':'day', 'mpoint': mpoint, 'year': year, 'month': month, 'day': day}
 | |
|         ms_day, _ = MpointStat.objects.get_or_create(**params_day, defaults=params_day)
 | |
|         ms_day.val = sum_dict_day['sum']
 | |
|         ms_day.save()
 | |
| 
 | |
|         sum_dict_month = MpointStat.objects.filter(type='day', mpoint=mpoint, year=year, month=month).aggregate(sum=Sum('val'))
 | |
|         params_month = {'type':'month', 'mpoint': mpoint, 'year': year, 'month': month}
 | |
|         ms_month, _ = MpointStat.objects.get_or_create(**params_month, defaults=params_month)
 | |
|         ms_month.val = sum_dict_month['sum']
 | |
|         ms_month.save()
 | |
| 
 | |
|         sum_dict_year = MpointStat.objects.filter(type='month', mpoint=mpoint, year=year).aggregate(sum=Sum('val'))
 | |
|         params_year = {'type':'year', 'mpoint': mpoint, 'year': year}
 | |
|         ms_year, _ = MpointStat.objects.get_or_create(**params_year, defaults=params_year)
 | |
|         ms_year.val = sum_dict_year['sum']
 | |
|         ms_year.save()
 | |
| 
 | |
|         if mpoint.mgroups_allocate:  # 如果有分配系数
 | |
|             for allocate in mpoint.mgroups_allocate:
 | |
|                 mgroup = Mgroup.objects.get(id=allocate['mgroup']) 
 | |
|                 ratio = allocate['ratio']
 | |
|                 # 查找并绑定值班记录
 | |
|                 sflog = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mgroup).first()
 | |
|                 if sflog is None:  # 需要创建值班记录
 | |
|                     make_sflogs(mgroup=mgroup, start_date=(dt-datetime.timedelta(days=1)).date(), end_date=dt.date())
 | |
|                     sflog = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mgroup).first()
 | |
|                 year_s, month_s, day_s = sflog.get_ymd
 | |
| 
 | |
|                 params_hour_s = {'type': 'hour_s', 'mpoint': mpoint, 'sflog': sflog,  'mgroup': mgroup, 'year': year, 'month': month, 'day': day, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'hour': hour}
 | |
|                 ms_hour_s, _ = MpointStat.objects.get_or_create(**params_hour_s, defaults=params_hour_s)
 | |
|                 ms_hour_s.val = ms.val*ratio
 | |
|                 ms_hour_s.save()
 | |
|                 
 | |
| 
 | |
|                 # 开始往上计算
 | |
|                 sum_dict_sflog_s = MpointStat.objects.filter(type='hour_s', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum('val'))
 | |
|                 params_sflog_s = {'type':'sflog', 'mpoint': mpoint, 'sflog': sflog, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'mgroup': mgroup}
 | |
|                 ms_sflog_s, _ = MpointStat.objects.get_or_create(**params_sflog_s, defaults=params_sflog_s)
 | |
|                 ms_sflog_s.val = sum_dict_sflog_s['sum']
 | |
|                 ms_sflog_s.save()
 | |
| 
 | |
|                 next_cal_dict = [mpoint.material.id, sflog.id, year, month, day, hour, year_s, month_s, day_s]
 | |
|                 if next_cal_dict == cache.get('enm_cal_dict', None):
 | |
|                     next_cal = 0
 | |
|                 else:
 | |
|                     next_cal = 1
 | |
|                     cache.set('enm_cal_dict', next_cal_dict, 60)
 | |
|                 cal_mpointstat_manual(mpoint.id, sflog.id, mgroup.id, year, month, day, hour, year_s, month_s, day_s, next_cal)
 | |
| 
 | |
|         
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def cal_mpointstats(is_now=1, year=None,  month=None, day=None, hour=None):
 | |
|     """
 | |
|     计算所有自动采集测点的统计值,默认当前小时, 可手动传入时间
 | |
|     """
 | |
|     if year and month and day and hour:
 | |
|         pass
 | |
|     else:
 | |
|         now, pre = get_current_and_previous_time()
 | |
|         if is_now:
 | |
|             year, month, day, hour = now.year, now.month, now.day, now.hour
 | |
|         else:
 | |
|             year, month, day, hour = pre.year, pre.month, pre.day, pre.hour
 | |
| 
 | |
|     mgroups = Mgroup.objects.exclude(product=None).order_by('sort')  # 必须要进行排序, 因为有的产量是经过计算而得的
 | |
|     # 先统计自动采集的产量值
 | |
|     caled_mpointids = []
 | |
|     for mgroup in mgroups:
 | |
|         product = mgroup.product
 | |
|         mpoints = Mpoint.objects.filter(material=product, is_auto=True)
 | |
|         for ind, item in enumerate(mpoints):
 | |
|             caled_mpointids.append(item.id)
 | |
|             cal_mpointstat_hour(item.id, year, month, day, hour)
 | |
| 
 | |
|     # 统计其他测点
 | |
|     mpoints = Mpoint.objects.filter(is_auto=True).exclude(id__in=caled_mpointids).order_by('material', 'mgroup')
 | |
|     for i in mpoints:
 | |
|         cal_mpointstat_hour(i.id, year, month, day, hour)
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def cal_mpointstat_manual(mpointId: str, sflogId: str, mgroupId: str, year: int, month: int, day: int, hour: int, year_s: int, month_s: int, day_s: int, next_cal=0):
 | |
|     """
 | |
|     手动录入的测点数据进行往上统计,一级一级往上
 | |
|     """
 | |
|     mpoint = Mpoint.objects.get(id=mpointId)
 | |
|     mgroup = Mgroup.objects.get(id=mgroupId)
 | |
|     sum_dict_day_s = MpointStat.objects.filter(type='sflog', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, mgroup=mgroup).aggregate(sum=Sum('val'))
 | |
|     params_day_s = {'type':'day_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'mgroup': mgroup}
 | |
|     ms_day_s, _ = MpointStat.objects.get_or_create(**params_day_s, defaults=params_day_s)
 | |
|     ms_day_s.val = sum_dict_day_s['sum']
 | |
|     ms_day_s.save()
 | |
| 
 | |
|     sum_dict_month_s = MpointStat.objects.filter(type='day_s', mpoint=mpoint, year_s=year_s, month_s=month_s, mgroup=mgroup).aggregate(sum=Sum('val'))
 | |
|     params_month_s = {'type':'month_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'mgroup': mgroup}
 | |
|     ms_month_s, _ = MpointStat.objects.get_or_create(**params_month_s, defaults=params_month_s)
 | |
|     ms_month_s.val = sum_dict_month_s['sum']
 | |
|     ms_month_s.save()
 | |
| 
 | |
|     sum_dict_year_s = MpointStat.objects.filter(type='month_s', mpoint=mpoint, year_s=year_s, mgroup=mgroup).aggregate(sum=Sum('val'))
 | |
|     params_year_s = {'type':'year_s', 'mpoint': mpoint, 'year_s': year_s, 'mgroup': mgroup}
 | |
|     ms_year_s, _ = MpointStat.objects.get_or_create(**params_year_s, defaults=params_year_s)
 | |
|     ms_year_s.val = sum_dict_year_s['sum']
 | |
|     ms_year_s.save()
 | |
|     
 | |
|     if next_cal:  # 二次计算
 | |
|         if hour:
 | |
|             compute_enstat('hour_s', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)
 | |
|         compute_enstat('sflog', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)
 | |
|         compute_enstat('day_s', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)
 | |
|         compute_enstat('month_st', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)
 | |
|         compute_enstat('month_s', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)
 | |
|         compute_enstat('year_s', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)
 | |
| 
 | |
| 
 | |
| def compute_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s):
 | |
|     """
 | |
|     计算能源数据统计
 | |
|     """
 | |
|     mgroup = Mgroup.objects.get(id=mgroupId)
 | |
|     sflog = SfLog.objects.get(id=sflogId)
 | |
|     team = sflog.team
 | |
|     if type == 'hour_s':
 | |
|         enstat, _ = EnStat.objects.get_or_create(type="hour_s", mgroup=mgroup, year=year, month=month, day=day, hour=hour, 
 | |
|                                                  defaults={'type': 'hour_s', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 
 | |
|                                                            'year': year, 'month': month, 'day': day, 'hour': hour,
 | |
|                                                            'total_production': 0, 'elec_consume': 0})
 | |
|     elif type == 'sflog':
 | |
|         enstat, _ = EnStat.objects.get_or_create(type="sflog", sflog=sflog, 
 | |
|                                                 defaults={'type': 'sflog', 'sflog': sflog, 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'total_production': 0, 'elec_consume': 0})
 | |
|     elif type == 'day_s':
 | |
|         enstat, _ = EnStat.objects.get_or_create(type="day_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s,
 | |
|                                                 defaults={'type': 'day_s', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'total_production': 0, 'elec_consume': 0})
 | |
|     elif type == 'month_st':
 | |
|         enstat, _ = EnStat.objects.get_or_create(type="month_st", mgroup=mgroup, team=team, year_s=year_s, month_s=month_s,
 | |
|                                                 defaults={'type': 'month_sf', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'team': team, 'total_production': 0, 'elec_consume': 0})
 | |
|     elif type == 'month_s':
 | |
|         enstat, _ = EnStat.objects.get_or_create(type="month_s", mgroup=mgroup, year_s=year_s, month_s=month_s,
 | |
|                                                 defaults={'type': 'month_s', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'total_production': 0, 'elec_consume': 0})
 | |
|     elif type == 'year_s':
 | |
|         enstat, _ = EnStat.objects.get_or_create(type="year_s", mgroup=mgroup, year_s=year_s,
 | |
|                                                 defaults={'type': 'year_s', 'mgroup': mgroup, 'year_s': year_s, 'total_production': 0, 'elec_consume': 0})
 | |
| 
 | |
|     # 物料统计
 | |
|     input_materials = mgroup.input_materials
 | |
|     if mgroup.product:
 | |
|         input_materials.insert(0, mgroup.product.id)
 | |
|     cost_unit_total = 0
 | |
|     imaterial_data = []
 | |
|     for ind, mid in enumerate(input_materials):
 | |
|         material = Material.objects.get(id=mid)
 | |
|         if type == 'hour_s':
 | |
|             mps = MpointStat.objects.filter(type='hour_s', mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, hour=hour, mpoint__material=material)
 | |
|         elif type == 'sflog':
 | |
|             mps = MpointStat.objects.filter(type='sflog', sflog=sflog, mpoint__material=material)
 | |
|         elif type == 'day_s':
 | |
|             mps = MpointStat.objects.filter(type='day_s', mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, mpoint__material=material)
 | |
|         elif type == 'month_st':
 | |
|             mps = MpointStat.objects.filter(type='sflog', mgroup=mgroup, sflog__team=team, year_s=year_s, month_s=month_s, mpoint__material=material)
 | |
|         elif type == 'month_s':
 | |
|             mps = MpointStat.objects.filter(type='month_s', mgroup=mgroup, year_s=year_s, month_s=month_s, mpoint__material=material)
 | |
|         elif type == 'year_s':
 | |
|             mps = MpointStat.objects.filter(type='year_s', mgroup=mgroup, year_s=year_s, mpoint__material=material) 
 | |
|         if mps.filter(mpoint__is_all=True).exists():
 | |
|             mps = mps.filter(mpoint__is_all=True)
 | |
|         amount_consume = mps.aggregate(sum=Sum('val'))['sum']
 | |
|         if amount_consume is None:
 | |
|             amount_consume = 0
 | |
|         if ind == 0:  # 如果是产量
 | |
|             enstat.total_production = amount_consume
 | |
|             enstat.save()
 | |
|         else:
 | |
|             price_unit = get_price_unit(material, year_s, month_s)
 | |
|             cost = amount_consume * price_unit
 | |
|             try:
 | |
|                 cost_unit = cost/ enstat.total_production
 | |
|             except:
 | |
|                 cost_unit = 0
 | |
|             cost_unit_total = cost_unit_total + cost_unit
 | |
|             if material.code == 'elec':
 | |
|                 enstat.elec_consume = amount_consume
 | |
|                 enstat.save()
 | |
|             imaterial_item = {'material': mid, 'material_name': material.name, 'material_type': material.type, 'price_unit': price_unit, 'amount_consume': amount_consume, 'cost': cost, 'cost_unit': cost_unit}
 | |
|             imaterial_data.append(imaterial_item)
 | |
|         enstat.imaterial_data = imaterial_data
 | |
|         enstat.save()
 | |
|     other_cost_data = []
 | |
|     for fee in Fee.objects.order_by('sort'):
 | |
|         item = {'element': fee.element, 'cate': fee.cate, 'name': fee.name, 'id': fee.id}
 | |
|         item['cost_unit'] = get_cost_unit(mgroup, fee, year_s, month_s)
 | |
|         cost_unit_total = cost_unit_total + item['cost_unit']
 | |
|         other_cost_data.append(item)
 | |
|     enstat.other_cost_data = other_cost_data
 | |
|     enstat.production_cost_unit = cost_unit_total
 | |
|     enstat.save() |