# Create your tasks here from __future__ import absolute_import, unicode_literals from apps.utils.tasks import CustomTask from celery import shared_task from apps.enm.models import MpLogx, Mpoint, MpointStat, EnStat, EnStat2 from apps.wpm.models import SfLog import datetime from django.db.models import Sum, Avg from dateutil import tz from django.conf import settings from apps.wpm.services import get_sflog from apps.mtm.models import Mgroup, Material from apps.fim.services import get_cost_unit, get_price_unit from apps.fim.models import Fee from apps.enm.services import translate_eval_formula import logging from server.settings import get_sysconfig from django.db.models import F from apps.wpm.services import get_pcoal_heat from django.utils import timezone from django.db.models import Max from apps.third.king.k import kingClient from apps.third.king.king_api import kapis from apps.enm.services import insert_mplogx_from_king_rest_chunk, MpointCache from django.utils.timezone import localtime from apps.wpm.tasks import get_total_sec_now, cal_exp_duration_sec myLogger = logging.getLogger("log") def get_current_and_previous_time(): now = datetime.datetime.now() pre = now - datetime.timedelta(hours=1) return now, pre # @shared_task(base=CustomTask) # def get_tag_val(): # """ # 应该用不着 # """ # config = get_sysconfig() # with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor: # last_tag_id = config['enm'].get('last_tag_id', None) # if not last_tag_id: # mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first() # if mr is None: # last_tag_id = 0 # else: # last_tag_id = mr.tag_id # update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) # cursor.execute( # "select id, val, tag_code, update_time from tag_value where id > %s order by update_time, id", (last_tag_id)) # results = cursor.fetchall() # 获取数据后保存至本地 # need_func = {} # 需要执行测点函数的字典, 此种情况只执行一次 # for row in results: # mr_one = MpLog() # mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row # mpoint, _ = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={ # 'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'}) # mr_one.mpoint = mpoint # mr_one.save() # last_tag_id = mr_one.tag_id # if mpoint.func_on_change: # need_func[mpoint.id] = mr_one.id # # 执行测点函数 # for key in need_func: # mpoint_val_on_change.delay(need_func[key]) # update_sysconfig({'enm': {'last_tag_id': last_tag_id}}) # @shared_task(base=CustomTask) # def mpoint_val_on_change(mpLogId: str): # """测点值变化的时候执行任务(废弃) # """ # mplog = MpLog.objects.get(id=mpLogId) # mpoint = mplog.mpoint # module, func = mpoint.func_on_change.rsplit(".", 1) # m = importlib.import_module(module) # f = getattr(m, func) # f(mplog) # 同步执行 @shared_task(base=CustomTask) def cal_mpointstats_duration(start_time: str, end_time: str, m_code_list=[], cal_attrs=[]): """ 重跑某一段时间的任务 """ mytz = tz.gettz(settings.TIME_ZONE) start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") start_time.replace(tzinfo=mytz) end_time = datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S") start_time.replace(tzinfo=mytz) current_time = start_time while current_time <= end_time: year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour cal_mpointstats(0, year, month, day, hour, m_code_list, cal_attrs=[]) current_time += datetime.timedelta(hours=1) @shared_task(base=CustomTask) def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int, cascade=True, sflog_hours=[]): """ 计算某一测点, 某一时间点某一小时的统计值 """ mpoint = Mpoint.objects.get(id=mpointId) mytz = tz.gettz(settings.TIME_ZONE) dt = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=0, second=0, tzinfo=mytz) # 整点时间 dt_hour_p= dt - datetime.timedelta(hours=1) # 上个整点 dt_hour_n= dt + datetime.timedelta(hours=1) # 下个整点 if mpoint.material and mpoint.val_type in ['float', 'int']: # 如果计量的是物料 # 累计量 有的会清零,需要额外处理(还未做) params = {"mpoint": mpoint, "type": "hour"} params["year"], params["month"], params["day"], params["hour"] = year, month, day, hour val = 0 val_type = mpoint.val_type if mpoint.type == Mpoint.MT_AUTO: mrs0 = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt_hour_p, timex__lte=dt).order_by("timex") mrs = MpLogx.objects.filter(mpoint=mpoint, timex__gte=dt, timex__lte=dt_hour_n).order_by("timex") if mrs0.exists() and mrs.exists(): last_val = getattr(mrs.last(), f'val_{val_type}') first_val = getattr(mrs0.last(), f'val_{val_type}') if last_val >= first_val: val = last_val - first_val else: # 这里判断有可能清零了 max_val = max(mrs.aggregate(max=Max(f'val_{val_type}'))["max"], first_val) val = max_val - first_val + last_val elif mpoint.type == Mpoint.MT_COMPUTE and mpoint.formula: formula = mpoint.formula val = translate_eval_formula(formula, year, month, day, hour) else: return ms, _ = MpointStat.objects.get_or_create(**params, defaults=params) ms.val = ms.val_correct if ms.val_correct is not None else val ms.save() # 更新更高级别的值 if cascade or hour == 23: params_day = {"type": "day", "mpoint": mpoint, "year": year, "month": month, "day": day} ms_day, _ = MpointStat.objects.get_or_create(**params_day, defaults=params_day) if ms_day.val_correct is not None: ms_day.val = ms_day.val_correct else: sum_dict_day = MpointStat.objects.filter(type="hour", mpoint=mpoint, year=year, month=month, day=day).aggregate(sum=Sum("val")) ms_day.val = sum_dict_day['sum'] if sum_dict_day['sum'] is not None else 0 ms_day.val_origin = ms_day.val ms_day.save() if cascade or day in [28, 29, 30, 31]: params_month = {"type": "month", "mpoint": mpoint, "year": year, "month": month} ms_month, _ = MpointStat.objects.get_or_create(**params_month, defaults=params_month) if ms_month.val_correct is not None: ms_month.val = ms_month.val_correct else: sum_dict_month = MpointStat.objects.filter(type="day", mpoint=mpoint, year=year, month=month).aggregate(sum=Sum("val")) ms_month.val = sum_dict_month['sum'] if sum_dict_month['sum'] is not None else 0 ms_month.val_origin = ms_month.val ms_month.save() if cascade or month == 12: params_year = {"type": "year", "mpoint": mpoint, "year": year} ms_year, _ = MpointStat.objects.get_or_create(**params_year, defaults=params_year) if ms_year.val_correct is not None: ms_year.val = ms_year.val_correct else: sum_dict_year = MpointStat.objects.filter(type="month", mpoint=mpoint, year=year).aggregate(sum=Sum("val")) ms_year.val = sum_dict_year['sum'] if sum_dict_year['sum'] is not None else 0 ms_year.val_origin = ms_year.val ms_year.save() mgroup = mpoint.mgroup if mgroup: sflog = get_sflog(mgroup, dt) if sflog is None: myLogger.error(f'{mgroup.name}--{dt}') year_s, month_s, day_s = sflog.get_ymd params_hour_s = { "type": "hour_s", "mpoint": mpoint, "sflog": sflog, "mgroup": mgroup, "year": year, "month": month, "day": day, "year_s": year_s, "month_s": month_s, "day_s": day_s, "hour": hour, } ms_hour_s, _ = MpointStat.objects.get_or_create(**params_hour_s, defaults=params_hour_s) ms_hour_s.val = ms_hour_s.val_correct if ms_hour_s.val_correct is not None else ms.val ms_hour_s.save() # 开始往上计算 params_sflog_s = {"type": "sflog", "mpoint": mpoint, "sflog": sflog, "year_s": year_s, "month_s": month_s, "day_s": day_s, "mgroup": mgroup} ms_sflog_s, _ = MpointStat.objects.get_or_create(**params_sflog_s, defaults=params_sflog_s) if ms_sflog_s.val_correct is not None: ms_sflog_s.val = ms_sflog_s.val_correct else: sum_dict_sflog_s = MpointStat.objects.filter(type="hour_s", mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, sflog=sflog).aggregate(sum=Sum("val")) ms_sflog_s.val = sum_dict_sflog_s['sum'] if sum_dict_sflog_s['sum'] is not None else 0 ms_sflog_s.val_origin = ms_sflog_s.val ms_sflog_s.save() # next_cal_dict = [mpoint.material.id, sflog.id, year, month, day, hour, year_s, month_s, day_s] # if next_cal_dict == cache.get('enm_cal_dict', None): # next_cal = 0 # else: # next_cal = 1 # cache.set('enm_cal_dict', next_cal_dict, 60) cal_mpointstat_manual(mpoint.id, sflog.id, mgroup.id, year, month, day, hour, year_s, month_s, day_s, 0) @shared_task(base=CustomTask) def cal_mpointstats(is_now=1, year=None, month=None, day=None, hour=None, m_code_list=[], cal_attrs=[]): """ 计算所有自动采集测点的统计值,默认当前小时, 可手动传入时间和测点编号集 cal_attrs: 需要计算的属性,空列表默认计算所有属性["material", "pcoal", "run_hour"] """ if year and month and day and hour is not None: pass else: now, pre = get_current_and_previous_time() if is_now: year, month, day, hour = now.year, now.month, now.day, now.hour else: year, month, day, hour = pre.year, pre.month, pre.day, pre.hour if m_code_list: mpoints1 = Mpoint.objects.filter(code__in=m_code_list) for item in mpoints1: cal_mpointstat_hour(item.id, year, month, day, hour) mpoints_related = Mpoint.objects.none() code2 = [] for code in m_code_list: mpoints_related = mpoints_related | Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True, material__isnull=False, formula__contains='{' + code + '}') code2.extend(mpoints_related.values_list('code', flat=True)) code2 = list(set(code2)) for code in code2: mpoints_related = mpoints_related | Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True, material__isnull=False, formula__contains='{' + code + '}') mpoints_related = mpoints_related.distinct().order_by('report_sortstr', 'create_time') for item in mpoints_related: cal_mpointstat_hour(item.id, year, month, day, hour) else: # 先统计自动采集的测点 mpoints_auto = Mpoint.objects.filter(type=Mpoint.MT_AUTO, enabled=True) # mpoints_without_formula_group = [] for item in mpoints_auto: # mpoints_without_formula_group.append(cal_mpointstat_hour.s(item.id, year, month, day, hour)) cal_mpointstat_hour(item.id, year, month, day, hour) # 再统计计算测点 mpoints_compute = Mpoint.objects.filter(type=Mpoint.MT_COMPUTE, enabled=True, material__isnull=False).exclude(formula="").order_by('report_sortstr', 'create_time') # mpoints_other_group = [] for item in mpoints_compute: # mpoints_other_group.append(cal_mpointstat_hour.s(item.id, year, month, day, hour)) cal_mpointstat_hour(item.id, year, month, day, hour) # 先调整一下班时间,以防计算错误 get_total_sec_now() # 先处理total_sec cal_exp_duration_sec() # 再处理shut_sec # 开始计算enstat mgroups = Mgroup.objects.filter(need_enm=True).order_by("sort") # mgroups_group = [] year_s, month_s, day_s = 0, 0, 0 for mgroup in mgroups: # mgroups_group.append(cal_enstat.s('hour_s', None, mgroup.id, year, month, day, hour, None, None, None, True, ['material', 'run_hour'])) year_s, month_s, day_s = cal_enstat("hour_s", None, mgroup.id, year, month, day, hour, None, None, None, True, cal_attrs) # mgroups_t = mgroups.filter(name__in=['回转窑', '水泥磨']) # mgroups_t_group = [] # for mgroup in mgroups_t: # mgroups_t_group.append(cal_enstat.s('hour_s', None, mgroup.id, year, month, day, hour, None, None, None, True, ['pcoal'])) # 最后计算enstat2 cal_enstat2(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s) # task_chain = chain(group(mpoints_without_formula_group), group(mpoints_other_group), group(mgroups_group), group(mgroups_t_group), group([cal_enstat2.s(year_s=year, month_s=month)])) # task_chain.delay() if hour == 21: enm_alarm.delay(year_s, month_s, day_s) @shared_task(base=CustomTask) def cal_mpointstat_manual(mpointId: str, sflogId: str, mgroupId: str, year: int, month: int, day: int, hour: int, year_s: int, month_s: int, day_s: int, next_cal=0): """ 手动录入的测点数据进行往上统计,一级一级往上 """ mpoint = Mpoint.objects.get(id=mpointId) mgroup = Mgroup.objects.get(id=mgroupId) if sflogId: params_day_s = {"type": "day_s", "mpoint": mpoint, "year_s": year_s, "month_s": month_s, "day_s": day_s, "mgroup": mgroup} ms_day_s, _ = MpointStat.objects.get_or_create(**params_day_s, defaults=params_day_s) if ms_day_s.val_correct is not None: ms_day_s.val = ms_day_s.val_correct else: sum_dict_day_s = MpointStat.objects.filter(type="sflog", mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, mgroup=mgroup).aggregate(sum=Sum("val")) ms_day_s.val = sum_dict_day_s['sum'] if sum_dict_day_s['sum'] is not None else 0 ms_day_s.val_origin = ms_day_s.val ms_day_s.save() if day_s: params_month_s = {"type": "month_s", "mpoint": mpoint, "year_s": year_s, "month_s": month_s, "mgroup": mgroup} ms_month_s, _ = MpointStat.objects.get_or_create(**params_month_s, defaults=params_month_s) if ms_month_s.val_correct is not None: ms_month_s.val = ms_month_s.val_correct else: sum_dict_month_s = MpointStat.objects.filter(type="day_s", mpoint=mpoint, year_s=year_s, month_s=month_s, mgroup=mgroup).aggregate(sum=Sum("val")) ms_month_s.val = sum_dict_month_s['sum'] if sum_dict_month_s['sum'] is not None else 0 ms_month_s.val_origin = ms_month_s.val ms_month_s.save() if month_s: params_year_s = {"type": "year_s", "mpoint": mpoint, "year_s": year_s, "mgroup": mgroup} ms_year_s, _ = MpointStat.objects.get_or_create(**params_year_s, defaults=params_year_s) if ms_year_s.val_correct is not None: ms_year_s.val = ms_year_s.val_correct else: sum_dict_year_s = MpointStat.objects.filter(type="month_s", mpoint=mpoint, year_s=year_s, mgroup=mgroup).aggregate(sum=Sum("val")) ms_year_s.val = sum_dict_year_s['sum'] if sum_dict_year_s['sum'] is not None else 0 ms_year_s.val_origin = ms_year_s.val ms_year_s.save() if next_cal: # 计算工段级数据,手动录入时调用 start_cal_type = 'hour_s' if hour: start_cal_type = 'hour_s' elif sflogId: start_cal_type = 'sflog' elif day_s: start_cal_type = 'day_s' elif month_s: start_cal_type = 'month_s' elif year_s: start_cal_type = 'year_s' cal_enstat(start_cal_type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s) types_list = ["hour_s", "sflog", "day_s", "month_st", "month_s", "year_s"] @shared_task(base=CustomTask) def cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, cascade=True, cal_attrs=[]): """ 计算能源数据统计 """ if cascade: if type in types_list: start_index = types_list.index(type) new_types_list = types_list[start_index:] for type in new_types_list: year_s, month_s, day_s = cal_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s, False) return year_s, month_s, day_s if not cal_attrs: this_cal_attrs = ["material", "pcoal", "run_hour"] else: this_cal_attrs = cal_attrs[:] mgroup = Mgroup.objects.get(id=mgroupId) sflog = None # 默认无班次 if sflogId: sflog = SfLog.objects.get(id=sflogId) elif year and month and day and hour is not None: mytz = tz.gettz(settings.TIME_ZONE) dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz) sflog = get_sflog(mgroup, dt) team = None if sflog: year_s, month_s, day_s = sflog.get_ymd team = sflog.team # 如果没有班组,那么month_st无需计算 if team is None and type == "month_st": return year_s, month_s, day_s if type == "hour_s": enstat, _ = EnStat.objects.get_or_create( type="hour_s", mgroup=mgroup, year=year, month=month, day=day, hour=hour, defaults={"type": "hour_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s, "year": year, "month": month, "day": day, "hour": hour, "sflog": sflog}, ) elif type == "sflog": enstat, _ = EnStat.objects.get_or_create(type="sflog", sflog=sflog, defaults={"type": "sflog", "sflog": sflog, "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s}) elif type == "day_s": enstat, _ = EnStat.objects.get_or_create( type="day_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, defaults={"type": "day_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "day_s": day_s} ) elif type == "month_st" and team: enstat, _ = EnStat.objects.get_or_create( type="month_st", mgroup=mgroup, team=team, year_s=year_s, month_s=month_s, defaults={"type": "month_st", "mgroup": mgroup, "year_s": year_s, "month_s": month_s, "team": team} ) elif type == "month_s": enstat, _ = EnStat.objects.get_or_create(type="month_s", mgroup=mgroup, year_s=year_s, month_s=month_s, defaults={"type": "month_s", "mgroup": mgroup, "year_s": year_s, "month_s": month_s}) elif type == "year_s": enstat, _ = EnStat.objects.get_or_create(type="year_s", mgroup=mgroup, year_s=year_s, defaults={"type": "year_s", "mgroup": mgroup, "year_s": year_s}) if "material" in this_cal_attrs: # 消耗物料统计(包括电耗) input_materials = [] has_product = False input_materials = mgroup.input_materials if mgroup.product: input_materials.insert(0, mgroup.product.id) has_product = True imaterial_cost_unit = 0 imaterial_data = [] for ind, mid in enumerate(input_materials): material = Material.objects.get(id=mid) if type == "hour_s": mps = MpointStat.objects.filter(type="hour_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, hour=hour, mpoint__material=material) elif type == "sflog": mps = MpointStat.objects.filter(type="sflog", sflog=sflog, mpoint__material=material) elif type == "day_s": mps = MpointStat.objects.filter(type="day_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, mpoint__material=material) elif type == "month_st": mps = MpointStat.objects.filter(type="sflog", mgroup=mgroup, sflog__team=team, year_s=year_s, month_s=month_s, mpoint__material=material) elif type == "month_s": mps = MpointStat.objects.filter(type="month_s", mgroup=mgroup, year_s=year_s, month_s=month_s, mpoint__material=material) elif type == "year_s": mps = MpointStat.objects.filter(type="year_s", mgroup=mgroup, year_s=year_s, mpoint__material=material) if mps.filter(mpoint__is_rep_mgroup=True).exists(): mps = mps.filter(mpoint__is_rep_mgroup=True) amount_consume = mps.aggregate(sum=Sum("val"))["sum"] if amount_consume is None: amount_consume = 0 if ind == 0 and has_product: # 如果是产量 enstat.total_production = amount_consume else: if material.code in ["pcoal", "cair", "steam"]: price_unit = 0 else: price_unit = get_price_unit(material, year_s, month_s) cost = amount_consume * price_unit try: cost_unit = cost / enstat.total_production except ZeroDivisionError: cost_unit = 0 imaterial_cost_unit = imaterial_cost_unit + cost_unit if material.code == "elec": enstat.elec_consume = amount_consume enstat.elec_coal_consume = enstat.elec_consume * 0.1229 / 1000 try: enstat.elec_consume_unit = enstat.elec_consume / enstat.total_production except ZeroDivisionError: enstat.elec_consume_unit = 0 elif material.code == "water": enstat.water_consume = amount_consume elif material.code == "pcoal": enstat.pcoal_consume = amount_consume elif material.code == "cair": enstat.cair_consume = amount_consume elif material.code == "steam": enstat.out_steam = amount_consume enstat.out_steam_coal = enstat.out_steam * 128.6 / 1000 elif material.code == "ccr": enstat.ccr_consume = amount_consume enstat.kiln_end_heat = enstat.total_production - enstat.ccr_consume imaterial_item = { "material": mid, "material_name": material.name, "material_code": material.code, "material_type": material.type, "price_unit": price_unit, "amount_consume": amount_consume, "cost": cost, "cost_unit": cost_unit, } imaterial_data.append(imaterial_item) enstat.imaterial_data = imaterial_data # 其他成本数据 other_cost_data = [] other_cost_unit = 0 fee_qs = Fee.objects.order_by("sort") for fee in fee_qs: item = {"element": fee.element, "cate": fee.cate, "name": fee.name, "id": fee.id} item["cost_unit"] = get_cost_unit(mgroup, fee, year_s, month_s) other_cost_unit = other_cost_unit + item["cost_unit"] other_cost_data.append(item) enstat.other_cost_data = other_cost_data enstat.production_cost_unit = imaterial_cost_unit + other_cost_unit enstat.save() if mgroup.cate == 'section': # 更新所监测设备测点的total_production total_production = enstat.total_production if total_production == 0: elec_consume_unit = 0 else: elec_consume_unit = F("val") / total_production MpointStat.objects.filter( mgroup=enstat.mgroup, mpoint__material__code="elec", type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour ).exclude(mpoint__ep_monitored=None).update(total_production=total_production, elec_consume_unit=elec_consume_unit) # 更新余热发电的production_elec_unit if mgroup.name == "回转窑": if enstat.total_production == 0: production_elec_unit_yr = 0 else: production_elec_unit_yr = F("total_production") / enstat.total_production EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="余热发电").update( production_elec_unit=production_elec_unit_yr ) # 其他一些触发计算 if mgroup.cate == "section": if "material" in this_cal_attrs and mgroup.name != "回转窑": try: enstat.en_consume_unit = enstat.elec_coal_consume / enstat.total_production except ZeroDivisionError: enstat.en_consume_unit = 0 enstat.save() # 计算一些其他数据 if type == "month_st" and "material" in this_cal_attrs: # 如果计算的是班月,把主要设备电耗数据拉过来, 方便查询 res = MpointStat.objects.filter(type="sflog", year_s=year_s, month_s=month_s, sflog__team=enstat.team, mpoint__ep_monitored__isnull=False).values( "elec_consume_unit", equipment=F("mpoint__ep_monitored__id"), equipment_name=F("mpoint__ep_monitored__name") ) enstat.equip_elec_data = list(res) enstat.save() if enstat.mgroup.name == "回转窑": # 算单位产品(综合电耗/标煤耗/综合能耗) # 综合电耗 if enstat.type in ["hour_s", "day_s", "year_s", "month_s"]: pre_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="原料磨").first() if pre_enstat: enstat.celec_consume_unit = enstat.elec_consume_unit + get_sysconfig('enm.enm_lhxs') * pre_enstat.elec_consume_unit enstat.save() # 算总煤耗 if "pcoal" in this_cal_attrs: if type in ["hour_s", "sflog", "day_s"]: enstat.pcoal_heat = get_pcoal_heat(enstat.year_s, enstat.month_s, enstat.day_s) enstat.pcoal_coal_consume = enstat.pcoal_consume * enstat.pcoal_heat / 7000 elif type == "month_st": enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=year_s, month_s=month_s, sflog__team=enstat.team).aggregate( sum=Sum("pcoal_coal_consume") )["sum"] elif type == "month_s": enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=year_s, month_s=month_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"] elif type == "year_s": enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=year_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"] if enstat.pcoal_coal_consume is None: enstat.pcoal_coal_consume = 0 # 算单位产品标煤耗 try: enstat.coal_consume_unit = enstat.pcoal_coal_consume * 1000 / enstat.total_production except ZeroDivisionError: enstat.coal_consume_unit = 0 # 综合能耗 enstat.cen_consume_unit = enstat.coal_consume_unit + 0.1229 * enstat.elec_consume_unit enstat.save() elif enstat.mgroup.name == "水泥磨" and enstat.type not in ["month_st", "sflog"] and "pcoal" in this_cal_attrs: pre_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first() if pre_enstat: # 综合能耗 enstat.cen_consume_unit = enstat.elec_consume_unit * 0.1229 + 0.7 * pre_enstat.cen_consume_unit enstat.save() elif enstat.mgroup.name == '余热发电': pre_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="回转窑").first() if pre_enstat: # 吨熟料发电量 try: enstat.production_elec_unit = enstat.total_production / pre_enstat.total_production enstat.save() except ZeroDivisionError: enstat.production_elec_unit = 0 enstat.save() # 运转时长相关 if type != "hour_s" and "run_hour" in this_cal_attrs: enstat.total_sec_now, enstat.shut_sec = get_total_sec_now_and_shut_sec(enstat) enstat.run_sec = enstat.total_sec_now - enstat.shut_sec try: enstat.run_rate = (enstat.run_sec / enstat.total_sec_now) * 100 except ZeroDivisionError: enstat.run_rate = 0 try: enstat.production_hour = enstat.total_production * 3600 / enstat.run_sec except ZeroDivisionError: enstat.production_hour = 0 enstat.save() enstat.save() return year_s, month_s, day_s # 返回这个值主要为了全厂数据计算而用 def get_total_sec_now_and_shut_sec(enstat: EnStat): from apps.wpm.models import SfLog # if enstat.type == 'hour_s': # # 找到停机记录,并划分到该小时 # end_time = datetime.datetime(enstat.year, enstat.month, enstat.day, enstat.hour) # start_time = end_time - datetime.timedelta(hours=1) # sts = StLog.objects.filter(mgroup=enstat.mgroup) # sts = (sts.filter(start_time__gt=start_time, start_time__lt=end_time)|sts.filter(start_time__lt=start_time, end_time=None)|sts.filter(end_time__gt=start_time, end_time__lt=end_time)).distinct() # shut_hour = 0 # for i in sts: # if i.end_time is None: # run_hour = 0 # if i.start_time > start_time: # run_hour = (i.start_time - start_time).total_seconds/3600 # shut_hour = 1 - run_hour # shut_hour = shut_hour + # return 1, 0 if enstat.type == "sflog": sflog = enstat.sflog return sflog.total_sec_now, sflog.shut_sec elif enstat.type == "day_s": res = SfLog.objects.filter(work_date__year=enstat.year_s, work_date__month=enstat.month_s, work_date__day=enstat.day_s, mgroup=enstat.mgroup).aggregate( sum1=Sum("total_sec_now"), sum2=Sum("shut_sec") ) return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0 elif enstat.type == "month_st": res = SfLog.objects.filter(work_date__year=enstat.year_s, work_date__month=enstat.month_s, mgroup=enstat.mgroup, team=enstat.team).aggregate( sum1=Sum("total_sec_now"), sum2=Sum("shut_sec") ) return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0 elif enstat.type == "month_s": res = SfLog.objects.filter(work_date__year=enstat.year_s, work_date__month=enstat.month_s, mgroup=enstat.mgroup).aggregate(sum1=Sum("total_sec_now"), sum2=Sum("shut_sec")) return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0 elif enstat.type == "year_s": res = SfLog.objects.filter(work_date__year=enstat.year_s, mgroup=enstat.mgroup).aggregate(sum1=Sum("total_sec_now"), sum2=Sum("shut_sec")) return res["sum1"] if res["sum1"] else 0, res["sum2"] if res["sum2"] else 0 @shared_task(base=CustomTask) def cal_enstat2(type: str, year_s: int, month_s: int, day_s: int, cascade=True): if cascade: if type == "day_s": cal_enstat2("day_s", year_s, month_s, day_s, False) cal_enstat2("month_s", year_s, month_s, day_s, False) elif type == "month_s": cal_enstat2("month_s", year_s, month_s, day_s, False) else: return if type == "month_s": enstat2, _ = EnStat2.objects.get_or_create(type="month_s", year_s=year_s, month_s=month_s, defaults={"year_s": year_s, "month_s": month_s, "type": "month_s"}) elif type == "day_s": enstat2, _ = EnStat2.objects.get_or_create(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s, defaults={"year_s": year_s, "month_s": month_s, "day_s": day_s, "type": "day_s"}) # enstat2 = EnStat2.objects.select_for_update().get(id=enstat2.id) # 加锁 material_cement = Material.objects.get(code="cement") material_clinker = Material.objects.get(code="clinker") material_bulk_cement = Material.objects.get(code="bulk_cement") material_bag_cement = Material.objects.get(code="bag_cement") enstat2.bulk_cement_price = get_price_unit(material_bulk_cement, year_s, month_s) enstat2.clinker_price = get_price_unit(material_clinker, year_s, month_s) enstat2.bag_cement_price = get_price_unit(material_bag_cement, year_s, month_s) if type == "month_s": enstat2.bulk_cement_val = MpointStat.objects.filter(type="month_s", mpoint__material=material_bulk_cement, year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))["sum"] elif type == "day_s": enstat2.bulk_cement_val = MpointStat.objects.filter(type="day_s", mpoint__material=material_bulk_cement, year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))["sum"] if enstat2.bulk_cement_val is None: enstat2.bulk_cement_val = 0 if type == "month_s": enstat2.bag_cement_val = MpointStat.objects.filter(type="month_s", mpoint__material=material_bag_cement, year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))["sum"] elif type == "day_s": enstat2.bag_cement_val = MpointStat.objects.filter(type="day_s", mpoint__material=material_bag_cement, year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))["sum"] if enstat2.bag_cement_val is None: enstat2.bag_cement_val = 0 if type == "month_s": enstat2.clinker_val = MpointStat.objects.filter(type="month_s", mpoint__material=material_cement, year_s=year_s, month_s=month_s).aggregate(sum=Sum("val"))["sum"] elif type == "day_s": enstat2.clinker_val = MpointStat.objects.filter(type="day_s", mpoint__material=material_cement, year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("val"))["sum"] if enstat2.clinker_val is None: enstat2.clinker_val = 0 enstat2.industry_total_val = (enstat2.bulk_cement_val * enstat2.bulk_cement_price + enstat2.bag_cement_val * enstat2.bag_cement_price + enstat2.clinker_val * enstat2.clinker_price) / 10000 if type == "month_s": res = EnStat.objects.filter(mgroup__product__code="cement", type="month_s", year_s=year_s, month_s=month_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit")) elif type == "day_s": res = EnStat.objects.filter(mgroup__product__code="cement", type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum("total_production"), avg=Avg("production_cost_unit")) enstat2.cement_val = res["sum"] if res["sum"] else 0 enstat2.cement_cost_unit = res["avg"] if res["avg"] else 0 enstat2.industry_add_val = enstat2.industry_total_val - enstat2.cement_val * enstat2.cement_cost_unit / 10000 # 全厂电量 # 全厂的耗电量和水量都得单独处理 use_mpoint_elec_val = False mp_elecs = Mpoint.objects.filter(material__code="elec", code__endswith='__all') if mp_elecs.exists(): # use_mpoint_elec_val = True if type == "month_s": enstat_qs = EnStat.objects.filter(type="month_s", year_s=year_s, month_s=month_s) elif type == "day_s": enstat_qs = EnStat.objects.filter(type="day_s", year_s=year_s, month_s=month_s, day_s=day_s) res_elec_pcoal = enstat_qs.aggregate( sum1=Sum("elec_consume"), sum2=Sum("elec_coal_consume"), sum3=Sum("pcoal_consume"), sum4=Sum("pcoal_coal_consume"), sum5=Sum("water_consume"), sum6=Sum("cair_consume") ) if use_mpoint_elec_val: if type == 'day_s': enstat2.elec_consume = MpointStat.objects.filter(type='day', mpoint__in=mp_elecs, year=year_s, month=month_s, day=day_s).aggregate(sum=Sum("val"))['sum'] elif type == 'month_s': enstat2.elec_consume = MpointStat.objects.filter(type='month', mpoint__in=mp_elecs, year=year_s, month=month_s).aggregate(sum=Sum("val"))['sum'] if enstat2.elec_consume is None: enstat2.elec_consume = 0 else: enstat2.elec_consume = res_elec_pcoal["sum1"] if res_elec_pcoal["sum1"] else 0 enstat2.elec_coal_consume = enstat2.elec_consume * 0.1229 / 1000 # 其他的统计工段合就行 enstat2.pcoal_consume = res_elec_pcoal["sum3"] if res_elec_pcoal["sum3"] else 0 enstat2.pcoal_coal_consume = res_elec_pcoal["sum4"] if res_elec_pcoal["sum4"] else 0 enstat2.water_consume = res_elec_pcoal["sum5"] if res_elec_pcoal["sum5"] else 0 enstat2.cair_consume = res_elec_pcoal["sum6"] if res_elec_pcoal["sum6"] else 0 enstat2.en_consume = enstat2.pcoal_coal_consume + enstat2.elec_coal_consume try: enstat2.en_consume_unit = enstat2.en_consume / enstat2.industry_total_val except ZeroDivisionError: enstat2.en_consume_unit = 0 try: enstat2.en_add_consume_unit = enstat2.en_consume / enstat2.industry_add_val except ZeroDivisionError: enstat2.en_add_consume_unit = 0 enstat2.save() def cal_enstat_pcoal_change(enstat: EnStat, new_pcoal_heat): type = enstat.type if type in ["hour_s", "sflog", "day_s"]: enstat.pcoal_heat = new_pcoal_heat enstat.pcoal_coal_consume = enstat.pcoal_consume * enstat.pcoal_heat / 7000 elif type == "month_st": enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=enstat.year_s, month_s=enstat.month_s, sflog__team=enstat.team).aggregate( sum=Sum("pcoal_coal_consume") )["sum"] elif type == "month_s": enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=enstat.year_s, month_s=enstat.month_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"] elif type == "year_s": enstat.pcoal_coal_consume = EnStat.objects.filter(type="sflog", mgroup=enstat.mgroup, year_s=enstat.year_s).aggregate(sum=Sum("pcoal_coal_consume"))["sum"] if enstat.pcoal_coal_consume is None: enstat.pcoal_coal_consume = 0 # 算单位产品标煤耗 try: enstat.coal_consume_unit = enstat.pcoal_coal_consume * 1000 / enstat.total_production except ZeroDivisionError: enstat.coal_consume_unit = 0 # 综合能耗 enstat.cen_consume_unit = enstat.coal_consume_unit + 0.1229 * enstat.elec_consume_unit enstat.save(update_fields=["pcoal_heat", "pcoal_coal_consume", "coal_consume_unit", "cen_consume_unit"]) # 同步更新水泥磨的综合能耗,这步有可能不成功,因为水泥磨是后算的, 但是当pcoal_change时这个就有用了 if type not in ["month_st", "sflog"]: next_enstat = EnStat.objects.filter(type=enstat.type, year_s=enstat.year_s, month_s=enstat.month_s, day_s=enstat.day_s, hour=enstat.hour, mgroup__name="水泥磨").first() if next_enstat: next_enstat.cen_consume_unit = next_enstat.elec_consume_unit * 0.1229 + 0.7 * enstat.cen_consume_unit next_enstat.save(update_fields=["cen_consume_unit"]) enm_alarms_list = [["回转窑", "celec_consume_unit", "单位产品综合电耗"], ["回转窑", "coal_consume_unit", "单位产品标煤耗"], ["水泥磨", "elec_consume_unit", "单位产品分布电耗"]] @shared_task(base=CustomTask) def enm_alarm(year_s: int, month_s: int, day_s: int): """ enm报警任务 """ from apps.ecm.models import Event, EventCate, Eventdo from apps.mtm.models import Goal from apps.ecm.service import notify_event now = timezone.now() event_cate, _ = EventCate.objects.get_or_create(code="consume_exceed", defaults={"name": "能耗超过目标值", "code": "consume_exceed", "trigger": 30}) for item in enm_alarms_list: mgroups = Mgroup.objects.filter(name=item[0]) for mgroup in mgroups: enstat = EnStat.objects.filter(mgroup=mgroup, type="day_s", year_s=year_s, month_s=month_s, day_s=day_s).first() if enstat: mgroup_name = item[0] goal_cate_str = item[1] real_val = getattr(enstat, goal_cate_str, None) goal = Goal.objects.filter(goal_cate__code=goal_cate_str, year=year_s, mgroup=mgroup).first() if goal: goal_val = getattr(goal, f"goal_val_{month_s}", None) if goal_val and real_val and real_val > goal_val: # 触发事件 event = Event() event.obj_cate = "enm" event.happen_time = now event.voice_msg = f"{mgroup_name}{item[2]}超过设定目标值" event.enm_data = { "mgroup": mgroup.id, "mgroup_name": mgroup.name, "type": f"{goal_cate_str}.exceed", "year_s": year_s, "month_s": month_s, "day_s": day_s, "val": real_val, "goal_val": goal_val, "enstat": enstat.id, } event.save() Eventdo.objects.get_or_create(cate=event_cate, event=event, defaults={"cate": event_cate, "event": event}) notify_event(event) @shared_task(base=CustomTask) def king_insert_mplogx(): mpoint_codes = Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO, code__startswith='K_').values_list('code', flat=True) ml = [] for m in mpoint_codes: ml.append({"N": m.replace('K_', '')}) _, res = kingClient.request(**kapis['read_batchtagrealvalue'], json={"objs": ml}) insert_mplogx_from_king_rest_chunk(res) @shared_task(base=CustomTask) def check_mpoint_offline(seconds=100): """监测测点采集掉线""" now = localtime() for mpoint in Mpoint.objects.filter(enabled=True, type=Mpoint.MT_AUTO)| Mpoint.objects.filter(enabled=True, type=Mpoint.MT_COMPUTE, is_rep_ep_running_state=True): mc = MpointCache(mpoint.code) mpoint_data = mc.data last_data = mpoint_data.get('last_data', None) is_offline = True if last_data and last_data['last_timex'] and (now-last_data['last_timex']).total_seconds() < seconds: is_offline = False if is_offline: mc.set_fail(-2, now)