# Create your tasks here from __future__ import absolute_import, unicode_literals from apps.utils.tasks import CustomTask from celery import shared_task from apps.utils.sql import DbConnection from server.settings import get_sysconfig from django.core.cache import cache from apps.enm.models import MpLog, Mpoint, MpointStat, EnStat from apps.wpm.models import SfLog import datetime from django.db.models import Sum from dateutil import tz from django.conf import settings from django.utils.timezone import localtime from apps.wpm.services import make_sflogs from apps.mtm.models import Mgroup, Material from apps.fim.services import get_cost_unit, get_price_unit from apps.fim.models import Fee def get_current_and_previous_time(): now = datetime.datetime.now() pre = now - datetime.timedelta(hours=1) return now, pre @shared_task(base=CustomTask) def get_tag_val(): config = get_sysconfig() with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor: last_tag_id = cache.get('last_tag_id') if last_tag_id is None: mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first() if mr is None: last_tag_id = 0 else: last_tag_id = mr.tag_id cache.set('last_tag_id', last_tag_id) cursor.execute("select id, val, tag_code, update_time from tag_value where id > %s order by id", (last_tag_id)) results = cursor.fetchall() # 获取数据后保存至本地 for row in results: mr_one = MpLog() mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row mr_one.mpoint, _ = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'}) mr_one.save() last_tag_id = mr_one.tag_id cache.set('last_tag_id', last_tag_id) @shared_task(base=CustomTask) def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int): """ 计算某一测点, 某一时间点某一小时的统计值 """ mpoint = Mpoint.objects.get(id=mpointId) mytz = tz.gettz(settings.TIME_ZONE) dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz) if mpoint.material: # 如果计量的是物料 params = {'mpoint': mpoint, 'type': 'hour'} params['year'], params['month'], params['day'], params['hour'] = year, month, day, hour mrs = MpLog.objects.filter( mpoint=mpoint, tag_update__year=params['year'], tag_update__month=params['month'], tag_update__day=params['day'], tag_update__hour= params['hour']).order_by('tag_update') val = 0 if mrs.exists(): val = mrs.last().tag_val - mrs.first().tag_val ms, _ = MpointStat.objects.get_or_create(**params, defaults=params) ms.val = val ms.save() # 更新更高级别的值 sum_dict_day = MpointStat.objects.filter(type='hour', mpoint=mpoint, year=year, month=month, day=day).aggregate(sum=Sum('val')) params_day = {'type':'day', 'mpoint': mpoint, 'year': year, 'month': month, 'day': day} ms_day, _ = MpointStat.objects.get_or_create(**params_day, defaults=params_day) ms_day.val = sum_dict_day['sum'] ms_day.save() sum_dict_month = MpointStat.objects.filter(type='day', mpoint=mpoint, year=year, month=month).aggregate(sum=Sum('val')) params_month = {'type':'month', 'mpoint': mpoint, 'year': year, 'month': month} ms_month, _ = MpointStat.objects.get_or_create(**params_month, defaults=params_month) ms_month.val = sum_dict_month['sum'] ms_month.save() sum_dict_year = MpointStat.objects.filter(type='month', mpoint=mpoint, year=year).aggregate(sum=Sum('val')) params_year = {'type':'year', 'mpoint': mpoint, 'year': year} ms_year, _ = MpointStat.objects.get_or_create(**params_year, defaults=params_year) ms_year.val = sum_dict_year['sum'] ms_year.save() if mpoint.mgroups_allocate: # 如果有分配系数 for allocate in mpoint.mgroups_allocate: mgroup = Mgroup.objects.get(id=allocate['mgroup']) ratio = allocate['ratio'] # 查找并绑定值班记录 sflog = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mgroup).first() if sflog is None: # 需要创建值班记录 make_sflogs(mgroup=mgroup, start_date=(dt-datetime.timedelta(days=1)).date(), end_date=dt.date()) sflog = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mgroup).first() end_time_local = localtime(sflog.end_time) year_s, month_s, day_s = end_time_local.year, end_time_local.month, end_time_local.day params_hour_s = {'type': 'hour_s', 'mpoint': mpoint, 'sflog': sflog, 'mgroup': mgroup, 'year': year, 'month': month, 'day': day, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'hour': hour} ms_hour_s, _ = MpointStat.objects.get_or_create(**params_hour_s, defaults=params_hour_s) ms_hour_s.val = ms.val*ratio ms_hour_s.save() # 开始往上计算 sum_dict_sflog_s = MpointStat.objects.filter(type='hour_s', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum('val')) params_sflog_s = {'type':'sflog', 'mpoint': mpoint, 'sflog': sflog, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'mgroup': mgroup} ms_sflog_s, _ = MpointStat.objects.get_or_create(**params_sflog_s, defaults=params_sflog_s) ms_sflog_s.val = sum_dict_sflog_s['sum'] ms_sflog_s.save() cal_mpointstat_manual(mpoint.id, mgroup.id, year_s, month_s, day_s) compute_enstat('hour_s', sflog.id, mgroup.id, year, month, day, hour, year_s, month_s, day_s) @shared_task(base=CustomTask) def cal_mpointstats(is_now=1): """ 计算所有自动采集测点的统计值,默认当前小时 """ now, pre = get_current_and_previous_time() if is_now: for mpoint in Mpoint.objects.filter(is_auto=True): cal_mpointstat_hour.delay(mpoint.id, now.year, now.month, now.day, now.hour) else: for mpoint in Mpoint.objects.filter(is_auto=True): cal_mpointstat_hour.delay(mpoint.id, pre.year, pre.month, pre.day, pre.hour) @shared_task(base=CustomTask) def cal_mpointstat_manual(mpointId: str, mgroupId: str, year_s: int, month_s: int, day_s: int): """ 手动录入的测点数据进行往上统计,一级一级往上 """ mpoint = Mpoint.objects.get(id=mpointId) mgroup = Mgroup.objects.get(id=mgroupId) sum_dict_day_s = MpointStat.objects.filter(type='sflog', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, mgroup=mgroup).aggregate(sum=Sum('val')) params_day_s = {'type':'day_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'mgroup': mgroup} ms_day_s, _ = MpointStat.objects.get_or_create(**params_day_s, defaults=params_day_s) ms_day_s.val = sum_dict_day_s['sum'] ms_day_s.save() sum_dict_month_s = MpointStat.objects.filter(type='day_s', mpoint=mpoint, year_s=year_s, month_s=month_s, mgroup=mgroup).aggregate(sum=Sum('val')) params_month_s = {'type':'month_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'mgroup': mgroup} ms_month_s, _ = MpointStat.objects.get_or_create(**params_month_s, defaults=params_month_s) ms_month_s.val = sum_dict_month_s['sum'] ms_month_s.save() sum_dict_year_s = MpointStat.objects.filter(type='month_s', mpoint=mpoint, year_s=year_s, mgroup=mgroup).aggregate(sum=Sum('val')) params_year_s = {'type':'year_s', 'mpoint': mpoint, 'year_s': year_s, 'mgroup': mgroup} ms_year_s, _ = MpointStat.objects.get_or_create(**params_year_s, defaults=params_year_s) ms_year_s.val = sum_dict_year_s['sum'] ms_year_s.save() def compute_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s): """ 计算能源数据统计 """ mgroup = Mgroup.objects.get(id=mgroupId) sflog = SfLog.objects.get(id=sflogId) if type == 'hour_s': enstat, _ = EnStat.objects.get_or_create(type="hour_s", mgroup=mgroup, year=year, month=month, day=day, hour=hour, defaults={'type': 'hour_s', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'year': year, 'month': month, 'day': day, 'hour': hour, 'total_production': 0, 'elec_consume': 0}) elif type == 'sflog': enstat, _ = EnStat.objects.get_or_create(type="sflog", sflog=sflog, defaults={'type': 'sflog', 'sflog': sflog, 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'total_production': 0, 'elec_consume': 0}) elif type == 'day_s': enstat, _ = EnStat.objects.get_or_create(type="day_s", mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, defaults={'type': 'day_s', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s, 'total_production': 0, 'elec_consume': 0}) elif type == 'month_s': enstat, _ = EnStat.objects.get_or_create(type="month_s", mgroup=mgroup, year_s=year_s, month_s=month_s, defaults={'type': 'month_s', 'mgroup': mgroup, 'year_s': year_s, 'month_s': month_s, 'total_production': 0, 'elec_consume': 0}) elif type == 'year_s': enstat, _ = EnStat.objects.get_or_create(type="year_s", mgroup=mgroup, year_s=year_s, month_s=month_s, defaults={'type': 'year_s', 'mgroup': mgroup, 'year_s': year_s, 'total_production': 0, 'elec_consume': 0}) # 物料统计 input_materials = mgroup.input_materials if mgroup.product: input_materials.insert(0, mgroup.product.id) cost_unit_total = 0 imaterial_data = [] for mid in input_materials: material = Material.objects.get(id=mid) if type == 'hour_s': mps = MpointStat.objects.filter(type='hour_s', mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, hour=hour, mpoint__material=material) elif type == 'sflog': mps = MpointStat.objects.filter(type='sflog', sflog=sflog, mpoint__material=material) elif type == 'day_s': mps = MpointStat.objects.filter(type='day_s', mgroup=mgroup, year_s=year_s, month_s=month_s, day_s=day_s, mpoint__material=material) elif type == 'month_s': mps = MpointStat.objects.filter(type='month_s', mgroup=mgroup, year_s=year_s, month_s=month_s, mpoint__material=material) elif type == 'year_s': mps = MpointStat.objects.filter(type='year_s', mgroup=mgroup, year_s=year_s, mpoint__material=material) if mps.filter(mpoint__is_all=True).exists(): mps = mps.filter(mpoint__is_all=True) amount_consume = mps.aggregate(sum=Sum('val'))['sum'] if amount_consume is None: amount_consume = 0 if mid == mgroup.product.id: enstat.total_production = amount_consume enstat.save() else: price_unit = get_price_unit(material, year_s, month_s) cost = amount_consume * price_unit try: cost_unit = cost/ enstat.total_production except: cost_unit = 0 cost_unit_total = cost_unit_total + cost_unit if material.code == 'elec': enstat.elec_consume = amount_consume enstat.save() imaterial_data.append({'material': mid, 'material_name': material.name, 'material_type': material.type, 'price_unit': price_unit, 'cost': cost, 'cost_unit': cost_unit}) enstat.imaterial_data = imaterial_data enstat.save() other_cost_data = [] for fee in Fee.objects.order_by('sort'): item = {'element': fee.element, 'cate': fee.cate, 'name': fee.name, 'id': fee.id} item['cost_unit'] = get_cost_unit(mgroup, fee, year_s, month_s) cost_unit_total = cost_unit_total + item['cost_unit'] other_cost_data.append(item) enstat.other_cost_data = other_cost_data enstat.production_cost_unit = cost_unit_total enstat.save() if type == 'hour_s': compute_enstat('sflog', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s) elif type == 'sflog': compute_enstat('day_s', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s) elif type == 'day_s': compute_enstat('month_s', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s) elif type == 'month_s': compute_enstat('year_s', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)