# factory/apps/enm/tasks.py
#
# 280 lines
# 15 KiB
# Python

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from apps.utils.tasks import CustomTask
from celery import shared_task, group, chain
from apps.utils.sql import DbConnection
from server.settings import get_sysconfig
from django.core.cache import cache
from apps.enm.models import MpLog, Mpoint, MpointStat, EnStat
from apps.wpm.models import SfLog
import datetime
from django.db.models import Sum
from dateutil import tz
from django.conf import settings
from apps.wpm.services import make_sflogs
from apps.mtm.models import Mgroup, Material
from apps.fim.services import get_cost_unit, get_price_unit
from apps.fim.models import Fee
from django.core.cache import cache
from apps.enm.services import translate_eval_formula
import logging
# Module-level logger, obtained under the logger name 'log'.
myLogger = logging.getLogger('log')
def get_current_and_previous_time():
    """Return ``(now, one_hour_ago)`` as naive ``datetime`` objects.

    ``now`` comes from ``datetime.datetime.now()``; the second element is
    exactly one hour before it.
    """
    current = datetime.datetime.now()
    one_hour = datetime.timedelta(hours=1)
    return current, current - one_hour
@shared_task(base=CustomTask)
def get_tag_val():
    """
    Copy new rows of the external ``tag_value`` table into ``MpLog``.

    The id of the last imported row is kept in the cache under
    ``'last_tag_id'``. When that entry is missing it is rebuilt from ``MpLog``
    (0 when ``MpLog`` is empty). Each imported row is attached to the
    ``Mpoint`` whose ``code`` equals the row's tag code; a missing ``Mpoint``
    is created with the tag code as its name and ``'unknown'`` as its unit.
    """
    config = get_sysconfig()
    enm_conf = config['enm']
    with DbConnection(enm_conf['db_host'], enm_conf['db_user'], enm_conf['db_password'], enm_conf['db_database']) as cursor:
        last_tag_id = cache.get('last_tag_id')
        if last_tag_id is None:
            # NOTE(review): the row used as "last" is the one with the newest
            # tag_update, ties broken by the smallest tag_id — confirm this is
            # the intended high-water mark.
            mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first()
            if mr is None:
                last_tag_id = 0
            else:
                last_tag_id = mr.tag_id
            cache.set('last_tag_id', last_tag_id)
        # Query parameters must be a sequence: ``(x)`` is just ``x``, so the
        # trailing comma is required to build a one-element tuple.
        cursor.execute("select id, val, tag_code, update_time from tag_value where id > %s order by id", (last_tag_id,))
        results = cursor.fetchall()  # fetch everything first, then store it locally
        for row in results:
            mr_one = MpLog()
            mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row
            mr_one.mpoint, _ = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'})
            mr_one.save()
            # The cached id is advanced after every saved row, so it always
            # reflects the last row actually stored.
            last_tag_id = mr_one.tag_id
            cache.set('last_tag_id', last_tag_id)
@shared_task(base=CustomTask)
def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int):
    """
    Compute the statistics of one measuring point for one clock hour.

    Nothing is done unless the measuring point has a ``material``. Otherwise:

    1. The hourly value is the result of the point's formula or, when no
       formula is set, the difference between the last and the first
       ``MpLog.tag_val`` logged in that hour (0 when nothing was logged).
    2. The ``hour`` row is rolled up into the ``day``, ``month`` and ``year``
       rows of ``MpointStat``.
    3. For every entry of ``mpoint.mgroups_allocate`` the hourly value times
       the entry's ``ratio`` is stored as an ``hour_s`` row bound to the shift
       log covering the hour, rolled up into that shift's ``sflog`` row, and
       ``cal_mpointstat_manual`` continues the roll-up from there.

    :param mpointId: id of the ``Mpoint`` to compute.
    :param year: calendar year of the hour.
    :param month: calendar month of the hour.
    :param day: calendar day of the hour.
    :param hour: hour of the day; combined with the date in ``settings.TIME_ZONE``.
    :raises Exception: whatever the formula evaluation raises, re-raised after logging.
    """
    mpoint = Mpoint.objects.get(id=mpointId)
    mytz = tz.gettz(settings.TIME_ZONE)
    dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz)
    if not mpoint.material:  # only points that measure a material are processed
        return

    def _store(params, val):
        # Fetch or create the MpointStat row identified by ``params`` and
        # overwrite its value.
        stat, _ = MpointStat.objects.get_or_create(**params)
        stat.val = val
        stat.save()
        return stat

    val = 0
    if mpoint.formula:
        # NOTE(review): the original read ``mpoint.formular`` here while the
        # guard above tests ``mpoint.formula``; the same attribute is now used
        # in both places — confirm the field name against the Mpoint model.
        formula = mpoint.formula
        try:
            val = translate_eval_formula(formula, year, month, day, hour)
        except Exception:
            myLogger.error('公式执行错误:{}-{}'.format(mpoint.id, formula), exc_info=True)
            raise
    else:
        mrs = MpLog.objects.filter(
            mpoint=mpoint,
            tag_update__year=year,
            tag_update__month=month,
            tag_update__day=day,
            tag_update__hour=hour).order_by('tag_update')
        if mrs.exists():
            # Value of the hour = last logged reading minus first logged reading.
            val = mrs.last().tag_val - mrs.first().tag_val
    ms = _store({'mpoint': mpoint, 'type': 'hour', 'year': year, 'month': month, 'day': day, 'hour': hour}, val)

    # Update the higher-level values. Order matters: each level sums the rows
    # of the level written just before it.
    rollups = (
        ('day', 'hour', {'year': year, 'month': month, 'day': day}),
        ('month', 'day', {'year': year, 'month': month}),
        ('year', 'month', {'year': year}),
    )
    for target_type, source_type, period in rollups:
        total = MpointStat.objects.filter(type=source_type, mpoint=mpoint, **period).aggregate(sum=Sum('val'))['sum']
        _store({'type': target_type, 'mpoint': mpoint, **period}, total)

    if not mpoint.mgroups_allocate:  # no allocation ratios configured
        return
    for allocate in mpoint.mgroups_allocate:
        mgroup = Mgroup.objects.get(id=allocate['mgroup'])
        ratio = allocate['ratio']
        # Find the shift log covering this hour and bind the value to it.
        sflog = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mgroup).first()
        if sflog is None:  # the shift logs have to be created first
            make_sflogs(mgroup=mgroup, start_date=(dt - datetime.timedelta(days=1)).date(), end_date=dt.date())
            sflog = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mgroup).first()
        # NOTE(review): if make_sflogs() did not produce a shift covering dt,
        # sflog is still None and the next line raises AttributeError.
        year_s, month_s, day_s = sflog.get_ymd
        shift_day = {'year_s': year_s, 'month_s': month_s, 'day_s': day_s}
        _store({'type': 'hour_s', 'mpoint': mpoint, 'sflog': sflog, 'mgroup': mgroup,
                'year': year, 'month': month, 'day': day, 'hour': hour, **shift_day},
               ms.val * ratio)

        # Start rolling up. The target row is keyed by shift log and work
        # group, so only the hourly rows of that same shift log and work group
        # may be summed; the original summed every hour_s row of the shift
        # day, mixing other shifts and other work groups into the total.
        shift_total = MpointStat.objects.filter(
            type='hour_s', mpoint=mpoint, sflog=sflog, mgroup=mgroup, **shift_day
        ).aggregate(sum=Sum('val'))['sum']
        _store({'type': 'sflog', 'mpoint': mpoint, 'sflog': sflog, 'mgroup': mgroup, **shift_day}, shift_total)

        # Request the second-stage computation only when this combination
        # differs from the marker currently in the cache; the marker is stored
        # with a timeout of 60.
        next_cal_dict = [mpoint.material.id, sflog.id, year, month, day, hour, year_s, month_s, day_s]
        if next_cal_dict == cache.get('enm_cal_dict', None):
            next_cal = 0
        else:
            next_cal = 1
            cache.set('enm_cal_dict', next_cal_dict, 60)
        cal_mpointstat_manual(mpoint.id, sflog.id, mgroup.id, year, month, day, hour, year_s, month_s, day_s, next_cal)
@shared_task(base=CustomTask)
def cal_mpointstats(is_now=1, year=None, month=None, day=None, hour=None):
    """
    Compute the hourly statistics of every automatically collected measuring point.

    The hour to compute is taken from ``year``/``month``/``day``/``hour`` when
    all four are given. Otherwise it is the current hour (``is_now`` truthy)
    or the previous hour (``is_now`` falsy).

    :param is_now: selects current vs. previous hour when no explicit time is given.
    :param year: explicit year, or None.
    :param month: explicit month, or None.
    :param day: explicit day, or None.
    :param hour: explicit hour, or None. 0 (midnight) is a valid explicit hour.
    """
    # Test against None rather than truthiness: hour == 0 is a real hour and
    # must not trigger the fallback to the current time.
    if any(part is None for part in (year, month, day, hour)):
        now, pre = get_current_and_previous_time()
        ref = now if is_now else pre
        year, month, day, hour = ref.year, ref.month, ref.day, ref.hour

    # Sorting is required because some production values are derived from others.
    mgroups = Mgroup.objects.exclude(product=None).order_by('sort')
    # First compute the automatically collected production values.
    caled_mpointids = []
    for mgroup in mgroups:
        for item in Mpoint.objects.filter(material=mgroup.product, is_auto=True):
            caled_mpointids.append(item.id)
            cal_mpointstat_hour(item.id, year, month, day, hour)

    # Then compute the remaining automatic measuring points.
    mpoints = Mpoint.objects.filter(is_auto=True).exclude(id__in=caled_mpointids).order_by('material', 'mgroup')
    for i in mpoints:
        cal_mpointstat_hour(i.id, year, month, day, hour)
@shared_task(base=CustomTask)
def cal_mpointstat_manual(mpointId: str, sflogId: str, mgroupId: str, year: int, month: int, day: int, hour: int, year_s: int, month_s: int, day_s: int, next_cal=0):
    """
    Roll the shift-level statistics of a measuring point upwards, level by level.

    For the given measuring point and work group the ``sflog`` rows are summed
    into the ``day_s`` row, the ``day_s`` rows into the ``month_s`` row and the
    ``month_s`` rows into the ``year_s`` row of ``MpointStat``. When
    ``next_cal`` is truthy the energy statistics (``compute_enstat``) are then
    recomputed for every level.

    :param mpointId: id of the ``Mpoint``.
    :param sflogId: id of the ``SfLog`` the data belongs to.
    :param mgroupId: id of the ``Mgroup``.
    :param year: calendar year, passed on to ``compute_enstat``.
    :param month: calendar month, passed on to ``compute_enstat``.
    :param day: calendar day, passed on to ``compute_enstat``.
    :param hour: calendar hour; the ``hour_s`` energy statistic is computed
        only when this is not None.
    :param year_s: shift-calendar year of the rows to roll up.
    :param month_s: shift-calendar month of the rows to roll up.
    :param day_s: shift-calendar day of the rows to roll up.
    :param next_cal: truthy to trigger the second-stage ``compute_enstat`` run.
    """
    mpoint = Mpoint.objects.get(id=mpointId)
    mgroup = Mgroup.objects.get(id=mgroupId)
    owner = {'mpoint': mpoint, 'mgroup': mgroup}

    # (target level, source level, period key). Order matters: each level sums
    # the rows of the level written just before it.
    rollups = (
        ('day_s', 'sflog', {'year_s': year_s, 'month_s': month_s, 'day_s': day_s}),
        ('month_s', 'day_s', {'year_s': year_s, 'month_s': month_s}),
        ('year_s', 'month_s', {'year_s': year_s}),
    )
    for target_type, source_type, period in rollups:
        total = MpointStat.objects.filter(type=source_type, **owner, **period).aggregate(sum=Sum('val'))['sum']
        stat, _ = MpointStat.objects.get_or_create(type=target_type, **owner, **period)
        stat.val = total
        stat.save()

    if next_cal:  # second-stage computation
        # ``hour`` may legitimately be 0 (midnight), so test for None instead
        # of truthiness; otherwise the first hour of the day is never computed.
        if hour is not None:
            compute_enstat('hour_s', sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)
        for stat_type in ('sflog', 'day_s', 'month_st', 'month_s', 'year_s'):
            compute_enstat(stat_type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s)
def compute_enstat(type, sflogId, mgroupId, year, month, day, hour, year_s, month_s, day_s):
    """
    Compute (create or refresh) one energy-statistics row (``EnStat``).

    ``type`` selects the aggregation level and therefore which ``EnStat`` row
    is fetched or created and which ``MpointStat`` rows feed it:

    * ``'hour_s'``   - one work group, one clock hour
    * ``'sflog'``    - one shift log
    * ``'day_s'``    - one work group, one shift-calendar day
    * ``'month_st'`` - one work group and team, one shift-calendar month
      (fed from the ``sflog`` rows of that team)
    * ``'month_s'``  - one work group, one shift-calendar month
    * ``'year_s'``   - one work group, one shift-calendar year

    The row receives the total production (from the work group's product, if
    it has one), the per-material consumption and cost data for the work
    group's input materials, the electricity consumption (material code
    ``'elec'``), the per-fee unit costs and the summed unit production cost.

    :param type: aggregation level, one of the values listed above.
    :param sflogId: id of the ``SfLog``; also supplies the team.
    :param mgroupId: id of the ``Mgroup``.
    :param year: calendar year, used by ``'hour_s'`` only.
    :param month: calendar month, used by ``'hour_s'`` only.
    :param day: calendar day, used by ``'hour_s'`` only.
    :param hour: calendar hour, used by ``'hour_s'`` only.
    :param year_s: shift-calendar year.
    :param month_s: shift-calendar month.
    :param day_s: shift-calendar day.
    :raises ValueError: if ``type`` is not one of the supported levels.
    """
    mgroup = Mgroup.objects.get(id=mgroupId)
    sflog = SfLog.objects.get(id=sflogId)
    team = sflog.team
    shift_day = {'year_s': year_s, 'month_s': month_s, 'day_s': day_s}
    shift_month = {'year_s': year_s, 'month_s': month_s}

    # Per level:
    #   enstat_lookup - fields identifying the EnStat row (besides ``type``)
    #   enstat_extra  - additional fields set only when the row is created
    #   stat_filter   - MpointStat rows that feed this level
    if type == 'hour_s':
        enstat_lookup = {'mgroup': mgroup, 'year': year, 'month': month, 'day': day, 'hour': hour}
        enstat_extra = dict(shift_day)
        stat_filter = {'type': 'hour_s', 'mgroup': mgroup, 'hour': hour, **shift_day}
    elif type == 'sflog':
        enstat_lookup = {'sflog': sflog}
        enstat_extra = {'mgroup': mgroup, **shift_day}
        stat_filter = {'type': 'sflog', 'sflog': sflog}
    elif type == 'day_s':
        enstat_lookup = {'mgroup': mgroup, **shift_day}
        enstat_extra = {}
        stat_filter = {'type': 'day_s', 'mgroup': mgroup, **shift_day}
    elif type == 'month_st':
        enstat_lookup = {'mgroup': mgroup, 'team': team, **shift_month}
        enstat_extra = {}
        stat_filter = {'type': 'sflog', 'mgroup': mgroup, 'sflog__team': team, **shift_month}
    elif type == 'month_s':
        enstat_lookup = {'mgroup': mgroup, **shift_month}
        enstat_extra = {}
        stat_filter = {'type': 'month_s', 'mgroup': mgroup, **shift_month}
    elif type == 'year_s':
        enstat_lookup = {'mgroup': mgroup, 'year_s': year_s}
        enstat_extra = {}
        stat_filter = {'type': 'year_s', 'mgroup': mgroup, 'year_s': year_s}
    else:
        raise ValueError('unsupported enstat type: {!r}'.format(type))

    # The row is created with the same ``type`` it is looked up by. The
    # original 'month_st' branch created rows with type 'month_sf', so the
    # lookup never matched afterwards and a new row was created on every call.
    enstat, _ = EnStat.objects.get_or_create(
        type=type, **enstat_lookup,
        defaults={**enstat_extra, 'total_production': 0, 'elec_consume': 0})

    # Material statistics. Work on a copy so the list held by the model
    # instance is not modified; the product (if any) goes first.
    has_product = bool(mgroup.product)
    material_ids = list(mgroup.input_materials or [])
    if has_product:
        material_ids.insert(0, mgroup.product.id)
    cost_unit_total = 0
    imaterial_data = []
    for ind, mid in enumerate(material_ids):
        material = Material.objects.get(id=mid)
        mps = MpointStat.objects.filter(mpoint__material=material, **stat_filter)
        # When measuring points flagged ``is_all`` exist for this material,
        # only those are summed.
        if mps.filter(mpoint__is_all=True).exists():
            mps = mps.filter(mpoint__is_all=True)
        amount_consume = mps.aggregate(sum=Sum('val'))['sum']
        if amount_consume is None:
            amount_consume = 0
        if has_product and ind == 0:  # this entry is the production amount
            enstat.total_production = amount_consume
            enstat.save()
        else:
            price_unit = get_price_unit(material, year_s, month_s)
            cost = amount_consume * price_unit
            try:
                cost_unit = cost / enstat.total_production
            except Exception:
                # Best effort, as in the original: no usable production
                # figure (e.g. zero) means a unit cost of 0.
                cost_unit = 0
            cost_unit_total = cost_unit_total + cost_unit
            if material.code == 'elec':
                enstat.elec_consume = amount_consume
                enstat.save()
            imaterial_item = {'material': mid, 'material_name': material.name, 'material_type': material.type, 'price_unit': price_unit, 'amount_consume': amount_consume, 'cost': cost, 'cost_unit': cost_unit}
            imaterial_data.append(imaterial_item)
    enstat.imaterial_data = imaterial_data
    enstat.save()

    # Other costs: one entry per Fee, each adding its unit cost to the total.
    other_cost_data = []
    for fee in Fee.objects.order_by('sort'):
        item = {'element': fee.element, 'cate': fee.cate, 'name': fee.name, 'id': fee.id}
        item['cost_unit'] = get_cost_unit(mgroup, fee, year_s, month_s)
        cost_unit_total = cost_unit_total + item['cost_unit']
        other_cost_data.append(item)
    enstat.other_cost_data = other_cost_data
    enstat.production_cost_unit = cost_unit_total
    enstat.save()