183 lines
9.2 KiB
Python
183 lines
9.2 KiB
Python
# Create your tasks here
|
|
from __future__ import absolute_import, unicode_literals
|
|
from apps.utils.tasks import CustomTask
|
|
from celery import shared_task
|
|
from apps.utils.sql import DbConnection
|
|
from server.settings import get_sysconfig
|
|
from django.core.cache import cache
|
|
from apps.enm.models import MpLog, Mpoint, MpointStat, EnStat
|
|
from apps.wpm.models import SfLog
|
|
import datetime
|
|
from django.db.models import Sum
|
|
from dateutil import tz
|
|
from django.conf import settings
|
|
from django.utils.timezone import localtime
|
|
from apps.wpm.services import make_sflogs
|
|
from apps.mtm.models import Mgroup
|
|
|
|
def get_current_and_previous_time():
    """
    Return the current time and the time exactly one hour earlier.

    Both values are naive ``datetime.datetime`` objects derived from a single
    ``datetime.datetime.now()`` call, so they are always exactly one hour apart.

    Returns:
        tuple: ``(now, pre)`` where ``pre == now - 1 hour``.
    """
    current = datetime.datetime.now()
    return current, current - datetime.timedelta(hours=1)
|
|
|
|
|
|
@shared_task(base=CustomTask)
def get_tag_val():
    """
    Copy new rows of the external ``tag_value`` table into local ``MpLog`` records.

    The id of the last imported row is kept in the cache under ``last_tag_id``.
    On a cache miss it is recovered from the stored ``MpLog`` with the newest
    ``tag_update`` (0 when no log exists yet). Every fetched row becomes one
    ``MpLog``; its ``Mpoint`` is looked up by ``tag_code`` and created with
    placeholder name/unit when it does not exist.

    After rows are stored the cached watermark is moved forward to the last
    stored id, so the next run does not fetch and store the same rows again.
    """
    enm_conf = get_sysconfig()['enm']
    with DbConnection(enm_conf['db_host'], enm_conf['db_user'], enm_conf['db_password'], enm_conf['db_database']) as cursor:
        last_tag_id = cache.get('last_tag_id')
        if last_tag_id is None:
            # Cache miss: resume from what is already stored locally.
            mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first()
            last_tag_id = 0 if mr is None else mr.tag_id
            cache.set('last_tag_id', last_tag_id)
        # Query parameters must be a sequence: (x,) is a tuple, (x) is just x.
        cursor.execute(
            "select id, val, tag_code, update_time from tag_value where id > %s order by id",
            (last_tag_id,),
        )
        results = cursor.fetchall()  # fetch everything first, then store locally
        try:
            for row in results:
                mr_one = MpLog()
                mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row
                mr_one.mpoint, _ = Mpoint.objects.get_or_create(
                    code=mr_one.tag_code,
                    defaults={'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'},
                )
                mr_one.save()
                # Rows arrive ordered by id, so the last saved id is the watermark.
                last_tag_id = mr_one.tag_id
        finally:
            # Persist progress even when a row fails midway, so rows that were
            # already stored are not imported a second time on the next run.
            cache.set('last_tag_id', last_tag_id)
|
|
|
|
def _sum_mpointstat(mpoint, stat_type, **period):
    """Return the sum of ``val`` over the ``stat_type`` stats of ``mpoint`` matching ``period`` (None if no rows)."""
    return MpointStat.objects.filter(type=stat_type, mpoint=mpoint, **period).aggregate(sum=Sum('val'))['sum']


def _store_mpointstat(mpoint, stat_type, val, **period):
    """Get or create the ``stat_type`` stat of ``mpoint`` for ``period`` and save ``val`` on it."""
    lookup = {'type': stat_type, 'mpoint': mpoint, **period}
    stat, _ = MpointStat.objects.get_or_create(**lookup, defaults=lookup)
    stat.val = val
    stat.save()


@shared_task(base=CustomTask)
def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int):
    """
    Compute the statistics of one measuring point for one specific hour.

    Nothing is written unless the measuring point has a material. Otherwise:

    1. The hour value is the ``tag_val`` of the last ``MpLog`` in that hour
       minus that of the first one (0 when the hour has no logs).
    2. The calendar day, month and year stats are refreshed, each as the sum
       of the level directly below it.
    3. For every entry of ``mpoint.mgroups_allocate`` the shift log covering
       the hour is looked up (shift logs are generated when none is found),
       the hour stat is tagged with the local date of the shift end, and the
       shift-based levels (``sflog`` scaled by the entry's ratio, ``day_s``,
       ``month_s``, ``year_s``) are refreshed.

    Args:
        mpointId: id of the ``Mpoint`` to process.
        year, month, day, hour: the hour to compute, read in ``settings.TIME_ZONE``.

    NOTE(review): the ``sflog`` stat is keyed only by measuring point and
    shift date, so with several allocation entries every iteration writes the
    same row and the last ratio wins -- confirm this is intended.
    """
    mpoint = Mpoint.objects.get(id=mpointId)
    mytz = tz.gettz(settings.TIME_ZONE)
    dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz)
    if not mpoint.material:  # only points that measure a material are processed
        return

    params = {'mpoint': mpoint, 'type': 'hour', 'year': year, 'month': month, 'day': day, 'hour': hour}
    mrs = MpLog.objects.filter(
        mpoint=mpoint,
        tag_update__year=year,
        tag_update__month=month,
        tag_update__day=day,
        tag_update__hour=hour,
    ).order_by('tag_update')
    val = 0
    if mrs.exists():
        # The hour value is the last reading minus the first reading.
        val = mrs.last().tag_val - mrs.first().tag_val
    ms, _ = MpointStat.objects.get_or_create(**params, defaults=params)
    ms.val = val
    ms.save()

    # Refresh the higher calendar levels, each from the level directly below.
    day_key = {'year': year, 'month': month, 'day': day}
    _store_mpointstat(mpoint, 'day', _sum_mpointstat(mpoint, 'hour', **day_key), **day_key)
    month_key = {'year': year, 'month': month}
    _store_mpointstat(mpoint, 'month', _sum_mpointstat(mpoint, 'day', **month_key), **month_key)
    year_key = {'year': year}
    _store_mpointstat(mpoint, 'year', _sum_mpointstat(mpoint, 'month', **year_key), **year_key)

    if mpoint.mgroups_allocate:  # the point has allocation coefficients
        for allocate in mpoint.mgroups_allocate:
            mgroup = Mgroup.objects.get(id=allocate['mgroup'])
            ratio = allocate['ratio']
            # Find the shift log covering this hour so the stat can be bound to it.
            sflog = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mgroup).first()
            if sflog is None:  # no shift log yet: generate them, then look again
                make_sflogs(mgroup=mgroup, start_date=(dt - datetime.timedelta(days=1)).date(), end_date=dt.date())
                sflog = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mgroup).first()
            # NOTE(review): if make_sflogs produced no shift covering dt, sflog is
            # still None here and the next line raises AttributeError.
            end_time_local = localtime(sflog.end_time)
            year_s, month_s, day_s = end_time_local.year, end_time_local.month, end_time_local.day
            ms.year_s = year_s
            ms.month_s = month_s
            ms.day_s = day_s
            ms.save()

            # Roll the shift-based levels up; only the sflog level is scaled by the ratio.
            shift_day_key = {'year_s': year_s, 'month_s': month_s, 'day_s': day_s}
            _store_mpointstat(mpoint, 'sflog', _sum_mpointstat(mpoint, 'hour', **shift_day_key) * ratio, **shift_day_key)
            _store_mpointstat(mpoint, 'day_s', _sum_mpointstat(mpoint, 'sflog', **shift_day_key), **shift_day_key)
            shift_month_key = {'year_s': year_s, 'month_s': month_s}
            _store_mpointstat(mpoint, 'month_s', _sum_mpointstat(mpoint, 'day_s', **shift_month_key), **shift_month_key)
            shift_year_key = {'year_s': year_s}
            _store_mpointstat(mpoint, 'year_s', _sum_mpointstat(mpoint, 'month_s', **shift_year_key), **shift_year_key)

    # TODO: trigger the energy statistics once compute_enstat is finished
    # (the original call was disabled: compute_enstat.delay(mpoint.id, year_s, month_s, day_s, hour)).
|
|
|
|
|
|
@shared_task(base=CustomTask)
def cal_mpointstats(is_now=1):
    """
    Queue the hourly statistics computation for every automatic measuring point.

    Args:
        is_now: truthy (default) to compute the current hour, falsy to compute
            the hour before it.
    """
    now, pre = get_current_and_previous_time()
    target = now if is_now else pre
    for mpoint in Mpoint.objects.filter(is_auto=True):
        cal_mpointstat_hour.delay(mpoint.id, target.year, target.month, target.day, target.hour)
|
|
|
|
|
|
def cal_mpointstat_manual(mpointId: str, year_s: int, month_s: int, day_s=None):
    """
    Roll manually entered measuring-point data up, one level at a time.

    When ``day_s`` is given, the ``day_s`` stat is refreshed from the ``sflog``
    stats of that shift day first. The ``month_s`` stat is then refreshed from
    the ``day_s`` stats, and the ``year_s`` stat from the ``month_s`` stats.

    Args:
        mpointId: id of the ``Mpoint`` to roll up.
        year_s, month_s: shift-based year and month to refresh.
        day_s: shift-based day; ``None`` skips the day level.
    """
    mpoint = Mpoint.objects.get(id=mpointId)

    # (type summed from, type written to, period both are restricted to)
    levels = []
    if day_s is not None:
        levels.append(('sflog', 'day_s', {'year_s': year_s, 'month_s': month_s, 'day_s': day_s}))
    levels.append(('day_s', 'month_s', {'year_s': year_s, 'month_s': month_s}))
    levels.append(('month_s', 'year_s', {'year_s': year_s}))

    for source_type, target_type, period in levels:
        total = MpointStat.objects.filter(type=source_type, mpoint=mpoint, **period).aggregate(sum=Sum('val'))['sum']
        lookup = {'type': target_type, 'mpoint': mpoint, **period}
        stat, _ = MpointStat.objects.get_or_create(**lookup, defaults=lookup)
        stat.val = total
        stat.save()
|
|
|
|
|
|
def compute_enstat(mpointId: str, year_s, month_s, day_s, sflogId, hour):
    """
    Compute the energy statistics (work in progress).

    Loads the measuring point, its group and the shift log. For an electricity
    point (material code ``'elec'``) it runs the hourly sum over all
    electricity points of the same group and gets or creates the hourly
    ``EnStat``; the sum is not stored yet. The branch for a point that measures
    the group's product is still empty.
    """
    mpoint = Mpoint.objects.get(id=mpointId)
    mgroup = mpoint.mgroup
    sflog = SfLog.objects.get(id=sflogId)  # not used yet, but raises if the shift log is missing
    if mpoint.material.code == 'elec':
        hourly_elec_filter = {
            'type': "hour",
            'mpoint__material__code': 'elec',
            'mpoint__mgroup': mgroup,
            'year_s': year_s,
            'month_s': month_s,
            'day_s': day_s,
            'hour': hour,
        }
        # The aggregated value is computed but not consumed yet.
        MpointStat.objects.filter(**hourly_elec_filter).aggregate(sum=Sum('val'))
        # hour
        enm_hour, _ = EnStat.objects.get_or_create(type='hour')
    elif mpoint.material == mgroup.product:
        # Product output statistics are not implemented yet.
        pass