# Create your tasks here from __future__ import absolute_import, unicode_literals from apps.utils.tasks import CustomTask from celery import shared_task, group, chain from apps.utils.sql import DbConnection from server.settings import get_sysconfig, update_sysconfig from django.core.cache import cache from apps.enm.models import MpLog, Mpoint, MpointStat, EnStat from apps.wpm.models import SfLog, StLog import datetime from django.db.models import Sum, Avg from dateutil import tz from django.conf import settings from apps.wpm.services import make_sflogs from apps.mtm.models import Mgroup, Material from apps.fim.services import get_cost_unit, get_price_unit from apps.fim.models import Fee from django.core.cache import cache from apps.enm.services import translate_eval_formula import logging from django.db.models import F from apps.wpm.services import get_pcoal_val myLogger = logging.getLogger('log') def get_current_and_previous_time(): now = datetime.datetime.now() pre = now - datetime.timedelta(hours=1) return now, pre @shared_task(base=CustomTask) def get_tag_val(): config = get_sysconfig() with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor: last_tag_id = config['enm'].get('last_tag_id', None) if not last_tag_id: mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first() if mr is None: last_tag_id = 0 else: last_tag_id = mr.tag_id update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) cursor.execute("select id, val, tag_code, update_time from tag_value where id > %s order by id", (last_tag_id)) results = cursor.fetchall() # 获取数据后保存至本地 for row in results: mr_one = MpLog() mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row mr_one.mpoint, _ = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'}) mr_one.save() last_tag_id = mr_one.tag_id update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) @shared_task(base=CustomTask) def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int): """ 计算某一测点, 某一时间点某一小时的统计值 """ mpoint = Mpoint.objects.get(id=mpointId) mytz = tz.gettz(settings.TIME_ZONE) dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz) if mpoint.material: # 如果计量的是物料 params = {'mpoint': mpoint, 'type': 'hour'} params['year'], params['month'], params['day'], params['hour'] = year, month, day, hour val = 0 if mpoint.formula: formular = mpoint.formular try: val = translate_eval_formula(formular, year, month, day, hour) except: myLogger.error('公式执行错误:{}-{}'.format(mpoint.id, formular), exc_info=True) raise else: mrs = MpLog.objects.filter( mpoint=mpoint, tag_update__year=params['year'], tag_update__month=params['month'], tag_update__day=params['day'], tag_update__hour= params['hour']).order_by('tag_update') if mrs.exists(): val = mrs.last().tag_val - mrs.first().tag_val ms, _ = MpointStat.objects.get_or_create(**params, defaults=params) ms.val = val ms.save() # 更新更高级别的值 sum_dict_day = MpointStat.objects.filter(type='hour', mpoint=mpoint, year=year, month=month, day=day).aggregate(sum=Sum('val')) params_day = {'type':'day', 'mpoint': mpoint, 'year': year, 'month': month, 'day': day} ms_day, _ = MpointStat.objects.get_or_create(**params_day, defaults=params_day) ms_day.val = sum_dict_day['sum'] ms_day.save() sum_dict_month = MpointStat.objects.filter(type='day', mpoint=mpoint, year=year, month=month).aggregate(sum=Sum('val')) params_month = {'type':'month', 'mpoint': mpoint, 'year': year, 'month': month} ms_month, _ = MpointStat.objects.get_or_create(**params_month, defaults=params_month) ms_month.val = sum_dict_month['sum'] ms_month.save() sum_dict_year = MpointStat.objects.filter(type='month', mpoint=mpoint, year=year).aggregate(sum=Sum('val')) params_year = {'type':'year', 'mpoint': mpoint, 'year': year} ms_year, _ = MpointStat.objects.get_or_create(**params_year, defaults=params_year) ms_year.val = sum_dict_year['sum'] ms_year.save() if mpoint.mgroups_allocate: # 如果有分配系数 for allocate in mpoint.mgroups_allocate: mgroup = Mgroup.objects.get(id=allocate['mgroup']) ratio = allocate['ratio'] # 查找并绑定值班记录 sflog = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mgroup).first() if sflog is None: # 需要创建值班记录 make_sflogs(mgroup=mgroup, start_date=(dt-datetime.timedelta(days=1)).date(), end_date=dt.date()) sflog = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mgroup).first() year_s, month_s, day_s = sflog.get_ymd params_hour_s = {'type': 'hour_s', 'mpoint': mpoint, 'sflog': sflog, 'mgroup': mgroup, 'year': year, 'month': month, 'day': day, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'hour': hour} ms_hour_s, _ = MpointStat.objects.get_or_create(**params_hour_s, defaults=params_hour_s) ms_hour_s.val = ms.val*ratio ms_hour_s.save() # 开始往上计算 sum_dict_sflog_s = MpointStat.objects.filter(type='hour_s', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum('val')) params_sflog_s = {'type':'sflog', 'mpoint': mpoint, 'sflog': sflog, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'mgroup': mgroup} ms_sflog_s, _ = MpointStat.objects.get_or_create(**params_sflog_s, defaults=params_sflog_s) ms_sflog_s.val = sum_dict_sflog_s['sum'] ms_sflog_s.save() next_cal_dict = [mpoint.material.id, sflog.id, year, month, day, hour, year_s, month_s, day_s] if next_cal_dict == cache.get('enm_cal_dict', None): next_cal = 0 else: next_cal = 1 cache.set('enm_cal_dict', next_cal_dict, 60) cal_mpointstat_manual(mpoint.id, sflog.id, mgroup.id, year, month, day, hour, year_s, month_s, day_s, next_cal) @shared_task(base=CustomTask) def cal_mpointstats(is_now=1, year=None, month=None, day=None, hour=None): """ 计算所有自动采集测点的统计值,默认当前小时, 可手动传入时间 """ if year and month and day and hour: pass else: now, pre = get_current_and_previous_time() if is_now: year, month, day, hour = now.year, now.month, now.day, now.hour else: year, month, day, hour = pre.year, pre.month, pre.day, pre.hour mgroups = Mgroup.objects.exclude(product=None).order_by('sort') # 必须要进行排序, 因为有的产量是经过计算而得的 # 先统计自动采集的产量值 caled_mpointids = [] for mgroup in mgroups: product = mgroup.product mpoints = Mpoint.objects.filter(material=product, is_auto=True) for ind, item in enumerate(mpoints): caled_mpointids.append(item.id) cal_mpointstat_hour(item.id, year, month, day, hour) # 统计其他测点 mpoints = Mpoint.objects.filter(is_auto=True).exclude(id__in=caled_mpointids).order_by('material', 'mgroup') for i in mpoints: cal_mpointstat_hour(i.id, year, month, day, hour) @shared_task(base=CustomTask) def cal_mpointstat_manual(mpointId: str, sflogId: str, mgroupId: str, year: int, month: int, day: int, hour: int, year_s: int, month_s: int, day_s: int, next_cal=0): """ 手动录入的测点数据进行往上统计,一级一级往上 """ mpoint = Mpoint.objects.get(id=mpointId) mgroup = Mgroup.objects.get(id=mgroupId) sum_dict_day_s = MpointStat.objects.filter(type='sflog', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, mgroup=mgroup).aggregate(sum=Sum('val')) params_day_s = {'type':'day_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'mgroup': mgroup} ms_day_s, _ = MpointStat.objects.get_or_create(**params_day_s, defaults=params_day_s) ms_day_s.val = sum_dict_day_s['sum'] ms_day_s.save() if mpoint.ep_monitored: if mpoint.material and mpoint.material.code == 'elec' and mpoint.ep_monitored.power_kw >= 100: # 统计班月数据使用的 sflog = SfLog.objects.get(id=sflogId) team = sflog.team sum_dict_day_st = MpointStat.objects.filter(type='sflog', mpoint=mpoint, year_s=year_s, month_s=month_s, sflog__team=team).aggregate(sum=Sum('val')) params_day_s = {'type':'month_st', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'mgroup': mgroup, 'team': team} ms_day_s, _ = MpointStat.objects.get_or_create(**params_day_s, defaults=params_day_s) ms_day_s.val = sum_dict_day_st['sum'] ms_day_s.save() sum_dict_month_s = MpointStat.objects.filter(type='day_s', mpoint=mpoint, year_s=year_s, month_s=month_s, mgroup=mgroup).aggregate(sum=Sum('val')) params_month_s = {'type':'month_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'mgroup': mgroup} ms_month_s, _ = MpointStat.objects.get_or_create(**params_month_s, defaults=params_month_s) ms_month_s.val = sum_dict_month_s['sum'] ms_month_s.save() sum_dict_year_s = MpointStat.objects.filter(type='month_s', mpoint=mpoint, year_s=year_s, mgroup=mgroup).aggregate(sum=Sum('val')) params_year_s = {'type':'year_s', 'mpoint': mpoint, 'year_s': year_s, 'mgroup': mgroup} ms_year_s, _ = MpointStat.objects.get_or_create(**params_year_s, defaults=params_year_s) ms_year_s.val = sum_dict_year_s['sum'] ms_year_s.save() if next_cal: # 二次计算 if hour: compute_enstat('hour_s', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s) else: compute_enstat('sflog', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s) types_list = ['hour_s', 'sflog', 'day_s', 'month_st', 'month_s', 'year_s'] def compute_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, cascade=True): """ 计算能源数据统计 """ if cascade: if type in types_list: start_index = types_list.index(type) new_types_list = types_list[start_index:] for type in new_types_list: compute_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, False) return mgroup = Mgroup.objects.get(id=mgroupId) sflog = SfLog.objects.get(id=sflogId) team = sflog.team if sflog: year_s, month_s, day_s = sflog.get_ymd if type == 'hour_s': enstat, _ = EnStat.objects.get_or_create(type="hour_s", mgroup=mgroup, year=year, month=month, day=day, hour=hour, defaults={'type': 'hour_s', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'year': year, 'month': month, 'day': day, 'hour': hour, 'sflog': sflog, 'team': team}) elif type == 'sflog': enstat, _ = EnStat.objects.get_or_create(type="sflog", sflog=sflog, defaults={'type': 'sflog', 'sflog': sflog, 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s}) elif type == 'day_s': enstat, _ = EnStat.objects.get_or_create(type="day_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, defaults={'type': 'day_s', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s}) elif type == 'month_st': enstat, _ = EnStat.objects.get_or_create(type="month_st", mgroup=mgroup, team=team, year_s=year_s, month_s=month_s, defaults={'type': 'month_st', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'team': team}) elif type == 'month_s': enstat, _ = EnStat.objects.get_or_create(type="month_s", mgroup=mgroup, year_s=year_s, month_s=month_s, defaults={'type': 'month_s', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s}) elif type == 'year_s': enstat, _ = EnStat.objects.get_or_create(type="year_s", mgroup=mgroup, year_s=year_s, defaults={'type': 'year_s', 'mgroup': mgroup, 'year_s': year_s}) # 消耗物料统计(包括电耗) input_materials = [] has_product = False input_materials = mgroup.input_materials if mgroup.product: input_materials.insert(0, mgroup.product.id) has_product = True imaterial_cost_unit = 0 imaterial_data = [] for ind, mid in enumerate(input_materials): material = Material.objects.get(id=mid) if type == 'hour_s': mps = MpointStat.objects.filter(type='hour_s', mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, hour=hour, mpoint__material=material) elif type == 'sflog': mps = MpointStat.objects.filter(type='sflog', sflog=sflog, mpoint__material=material) elif type == 'day_s': mps = MpointStat.objects.filter(type='day_s', mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, mpoint__material=material) elif type == 'month_st': mps = MpointStat.objects.filter(type='sflog', mgroup=mgroup, sflog__team=team, year_s=year_s, month_s=month_s, mpoint__material=material) elif type == 'month_s': mps = MpointStat.objects.filter(type='month_s', mgroup=mgroup, year_s=year_s, month_s=month_s, mpoint__material=material) elif type == 'year_s': mps = MpointStat.objects.filter(type='year_s', mgroup=mgroup, year_s=year_s, mpoint__material=material) if mps.filter(mpoint__is_all=True).exists(): mps = mps.filter(mpoint__is_all=True) amount_consume = mps.aggregate(sum=Sum('val'))['sum'] if amount_consume is None: amount_consume = 0 if ind == 0 and has_product: # 如果是产量 enstat.total_production = amount_consume else: if material.code in ['pcoal', 'cair', 'steam']: price_unit = 0 else: price_unit = get_price_unit(material, year_s, month_s) cost = amount_consume * price_unit try: cost_unit = cost/ enstat.total_production except: cost_unit = 0 imaterial_cost_unit = imaterial_cost_unit + cost_unit if material.code == 'elec': enstat.elec_consume = amount_consume enstat.elec_coal_consume = enstat.elec_consume*0.1229/1000 try: enstat.elec_consume_unit = enstat.elec_consume/enstat.total_production except: pass elif material.code == 'water': enstat.water = amount_consume elif material.code == 'pcoal': enstat.pcoal_consume = amount_consume elif material.code == 'cair': enstat.cair_consume = amount_consume elif material.code == 'steam': enstat.out_steam = amount_consume enstat.out_steam_coal = enstat.out_steam * 128.6 / 1000 elif material.code == 'ccr': enstat.ccr_consume = amount_consume try: enstat.kiln_end_heat = enstat.total_production - enstat.ccr_consume except: pass imaterial_item = {'material': mid, 'material_name': material.name, 'material_code': material.code, 'material_type': material.type, 'price_unit': price_unit, 'amount_consume': amount_consume, 'cost': cost, 'cost_unit': cost_unit} imaterial_data.append(imaterial_item) enstat.imaterial_data = imaterial_data # 其他成本数据 other_cost_data = [] other_cost_unit = 0 fee_qs = Fee.objects.order_by('sort') for fee in fee_qs: item = {'element': fee.element, 'cate': fee.cate, 'name': fee.name, 'id': fee.id} item['cost_unit'] = get_cost_unit(mgroup, fee, year_s, month_s) other_cost_unit = other_cost_unit + item['cost_unit'] other_cost_data.append(item) enstat.production_cost_unit = imaterial_cost_unit + other_cost_unit if enstat.mgroup.type == 'section': # 算能耗 if enstat.mgroup.name != '回转窑': try: enstat.en_consume_unit = enstat.elec_coal_consume / enstat.total_production except: pass # 计算一些其他数据 if type == 'month_sf': # 如果计算的是班月,把主要设备电耗数据拉过来 res = MpointStat.objects.filter(year_s=year_s, month_s=month_s, team=team, mgroup=mgroup, mpoint__ep_monitored__power_kw__gte=100).annotate( equipment=F('mpoint__ep_monitored__id', equipment_name=F('mpoint__ep_monitored__name')), consume=F('val')).values('equipment', 'equipment_name', 'consume') res = list(res) for item in res: try: item['consume_unit'] = item['consume'] / enstat.total_production except: pass enstat.equip_elec_data = res if enstat.mgroup.name == '回转窑': # 算单位产品(综合电耗/标煤耗/综合能耗) # 综合电耗 if enstat.type in ['hour_s', 'day_s', 'year_s', 'month_s']: pre_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name='原料磨').first() if pre_enstat: try: enstat.celec_consume_unit = enstat.elec_consume_unit + 1.45*pre_enstat.elec_consume_unit except: pass # 算标煤耗 if type in ['hour_s', 'day_s']: pcoal_val = get_pcoal_val(enstat.year_s, enstat.month_s, enstat.day_s) if pcoal_val: try: enstat.pcoal_coal_consume = enstat.pcoal_consume * pcoal_val/7000 enstat.coal_consume_unit = enstat.pcoal_coal_consume /enstat.total_production except: pass elif type == 'month_st': try: enstat.coal_consume_unit = EnStat.objects.filter(type='day_s', mgroup=enstat.mgroup, year_s=year_s, month_s=month_s, team=enstat.team).annotate(avg=Avg('coal_consume_unit'))['avg'] except: pass elif type == 'month_s': try: enstat.coal_consume_unit = EnStat.objects.filter(type='day_s', mgroup=enstat.mgroup, year_s=year_s, month_s=month_s).annotate(avg=Avg('coal_consume_unit'))['avg'] except: pass elif type == 'year_s': try: enstat.coal_consume_unit = EnStat.objects.filter(type='day_s', mgroup=enstat.mgroup, year_s=year_s).annotate(avg=Avg('coal_consume_unit'))['avg'] except: pass # 综合能耗 try: enstat.cen_consume_unit = enstat.coal_consume_unit + 0.1229 * enstat.elec_consume_unit except: pass if enstat.mgroup.name == '水泥磨' and enstat.type not in ['month_st', 'sflog']: pre_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name='回转窑').first() if pre_enstat: try: enstat.cen_consume_unit = enstat.elec_consume_unit*0.1229 + 0.7*pre_enstat.cen_consume_unit except: pass # 运转时长相关 if type != 'hour_s': enstat.total_hour_now, enstat.shut_hour = get_total_hour_now_and_shut_hour(enstat) enstat.run_hour = enstat.total_hour_now - enstat.shut_hour enstat.run_rate = (enstat.run_hour / enstat.total_hour_now)*100 enstat.production_hour = enstat.total_production / enstat.total_hour_now enstat.save() def get_total_hour_now_and_shut_hour(enstat: EnStat): from apps.wpm.models import SfLog # if enstat.type == 'hour_s': # # 找到停机记录,并划分到该小时 # end_time = datetime.datetime(enstat.year, enstat.month, enstat.day, enstat.hour) # start_time = end_time - datetime.timedelta(hours=1) # sts = StLog.objects.filter(mgroup=enstat.mgroup) # sts = (sts.filter(start_time__gt=start_time, start_time__lt=end_time)|sts.filter(start_time__lt=start_time, end_time=None)|sts.filter(end_time__gt=start_time, end_time__lt=end_time)).distinct() # shut_hour = 0 # for i in sts: # if i.end_time is None: # run_hour = 0 # if i.start_time > start_time: # run_hour = (i.start_time - start_time).total_seconds/3600 # shut_hour = 1 - run_hour # shut_hour = shut_hour + # return 1, 0 if enstat.type == 'sflog': sflog = enstat.sflog return sflog.total_hour_now, sflog.shut_hour elif enstat.type == 'day_s': res = SfLog.objects.filter(end_time__year=enstat.year_s, end_time__month=enstat.month_s, end_time__day=enstat.day_s, mgroup=enstat.mgroup).annotate( sum1 = Sum('total_hour_now'), sum2 = Sum('shut_hour') ) return res['sum1'] if res['sum1'] else 0, res['sum2'] if res['sum2'] else 0 elif enstat.type == 'month_st': res = SfLog.objects.filter(end_time__year=enstat.year_s, end_time__month=enstat.month_s, mgroup=enstat.mgroup, team=enstat.team).annotate( sum1 = Sum('total_hour_now'), sum2 = Sum('shut_hour') ) return res['sum1'] if res['sum1'] else 0, res['sum2'] if res['sum2'] else 0 elif enstat.type == 'month_s': res = SfLog.objects.filter(end_time__year=enstat.year_s, end_time__month=enstat.month_s, mgroup=enstat.mgroup).annotate( sum1 = Sum('total_hour_now'), sum2 = Sum('shut_hour') ) return res['sum1'] if res['sum1'] else 0, res['sum2'] if res['sum2'] else 0 elif enstat.type == 'year_s': res = SfLog.objects.filter(end_time__year=enstat.year_s, mgroup=enstat.mgroup).annotate( sum1 = Sum('total_hour_now'), sum2 = Sum('shut_hour') ) return res['sum1'] if res['sum1'] else 0, res['sum2'] if res['sum2'] else 0