factory/apps/enm/tasks.py

186 lines
8.5 KiB
Python

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from apps.utils.tasks import CustomTask
from celery import shared_task
from apps.utils.sql import DbConnection
from server.settings import get_sysconfig
from django.core.cache import cache
from apps.enm.models import MpLog, Mpoint, MpointStat
from apps.wpm.models import SfLog
import datetime
from django.db.models import Sum
from dateutil import tz
from django.conf import settings
from django.utils.timezone import localtime
from apps.wpm.services import make_sflogs
def get_current_and_previous_time():
    """Return the current time together with the moment one hour earlier.

    Returns:
        tuple: ``(now, pre)`` -- ``now`` is the naive value of
        ``datetime.datetime.now()`` and ``pre`` is exactly one hour before it.
    """
    one_hour = datetime.timedelta(hours=1)
    current = datetime.datetime.now()
    return current, current - one_hour
@shared_task(base=CustomTask)
def get_tag_val():
    """Copy new rows from the external ``tag_value`` table into ``MpLog``.

    The id of the last imported row is kept in the Django cache under
    ``'last_tag_id'`` and used as a watermark: only rows with a greater id
    are fetched. When the cache entry is missing, the watermark is rebuilt
    from the highest ``tag_id`` already stored in ``MpLog`` (or 0 when the
    table is empty).

    For every fetched row an ``MpLog`` is saved and linked to the ``Mpoint``
    whose ``code`` equals the row's ``tag_code``; a placeholder ``Mpoint``
    (name = code, unit ``'unknown'``) is created when none exists yet.
    """
    enm_conf = get_sysconfig()['enm']
    with DbConnection(enm_conf['db_host'], enm_conf['db_user'],
                      enm_conf['db_password'], enm_conf['db_database']) as cursor:
        last_tag_id = cache.get('last_tag_id')
        if last_tag_id is None:
            # The watermark is compared against ``tag_value.id``, so rebuild
            # it from the largest imported id rather than the latest timestamp.
            latest = MpLog.objects.order_by('-tag_id').first()
            last_tag_id = 0 if latest is None else latest.tag_id

        # Query parameters must be passed as a sequence: note the trailing comma.
        cursor.execute(
            "select id, val, tag_code, update_time from tag_value where id > %s order by id",
            (last_tag_id,))
        results = cursor.fetchall()  # fetch everything, then store locally

        try:
            for row in results:
                mr_one = MpLog()
                mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row
                mr_one.mpoint, _ = Mpoint.objects.get_or_create(
                    code=mr_one.tag_code,
                    defaults={'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'})
                mr_one.save()
                # Rows arrive ordered by id, so the last saved id is the watermark.
                last_tag_id = mr_one.tag_id
        finally:
            # Persist the watermark even if a row fails, so rows that were
            # already saved are not imported a second time on the next run.
            cache.set('last_tag_id', last_tag_id)
def _rollup_hour_stats(mpoint, stat_type, key_fields, extra_filter=None):
    """Sum matching hourly stats into the single ``stat_type`` row.

    Args:
        mpoint: The ``Mpoint`` whose statistics are rolled up.
        stat_type: Value of ``MpointStat.type`` for the target row.
        key_fields: Field/value pairs that select the hourly rows to sum and
            also identify (get or create) the target row.
        extra_filter: Optional additional field/value pairs applied only when
            selecting the hourly rows.

    Returns:
        The saved ``MpointStat`` row holding the sum.
    """
    hour_filter = dict(key_fields)
    if extra_filter:
        hour_filter.update(extra_filter)
    total = MpointStat.objects.filter(
        type='hour', mpoint=mpoint, **hour_filter).aggregate(sum=Sum('val'))['sum']
    stat, _ = MpointStat.objects.get_or_create(type=stat_type, mpoint=mpoint, **key_fields)
    stat.val = total
    stat.save()
    return stat


@shared_task(base=CustomTask)
def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int):
    """Compute the statistics of one measuring point for one specific hour.

    Only points whose ``cate`` is ``'elec'`` are processed. The hourly value
    is the last ``tag_val`` logged in that hour minus the first one (0 when
    the hour has no logs). The hourly row is then bound to the shift record
    (``SfLog``) covering the hour, and the calendar (day/month/year) and
    shift-based (sflog/day_s/month_s/year_s) totals are refreshed by summing
    the hourly rows.

    Args:
        mpointId: Primary key of the ``Mpoint``.
        year, month, day, hour: The hour to compute, interpreted in
            ``settings.TIME_ZONE``.

    Raises:
        Exception: When no shift record covers the given hour. The hourly
            value itself has already been saved at that point.
    """
    mpoint = Mpoint.objects.get(id=mpointId)
    mytz = tz.gettz(settings.TIME_ZONE)
    dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz)
    if mpoint.cate != 'elec':  # only electric-energy points are handled
        return

    # Hourly value: difference between the last and first reading of the hour.
    mrs = MpLog.objects.filter(
        mpoint=mpoint,
        tag_update__year=year,
        tag_update__month=month,
        tag_update__day=day,
        tag_update__hour=hour).order_by('tag_update')
    val = 0
    if mrs.exists():
        val = mrs.last().tag_val - mrs.first().tag_val
    ms, _ = MpointStat.objects.get_or_create(
        type='hour', mpoint=mpoint, year=year, month=month, day=day, hour=hour)
    ms.val = val
    ms.save()

    # Bind the shift record whose [start_time, end_time) window contains dt.
    sflog = SfLog.objects.filter(
        start_time__lte=dt, end_time__gt=dt, mgroup=mpoint.mgroup).first()
    if sflog is None:
        raise Exception('未找到值班记录')
    # The shift date is the local calendar date on which the shift ends.
    end_time_local = localtime(sflog.end_time)
    year_s, month_s, day_s = end_time_local.year, end_time_local.month, end_time_local.day
    ms.sflog = sflog
    ms.year_s = year_s
    ms.month_s = month_s
    ms.day_s = day_s
    ms.save()

    # Refresh the calendar-based totals.
    _rollup_hour_stats(mpoint, 'day', {'year': year, 'month': month, 'day': day})
    _rollup_hour_stats(mpoint, 'month', {'year': year, 'month': month})
    _rollup_hour_stats(mpoint, 'year', {'year': year})

    # Refresh the shift-based totals. The shift total is the sum of its hourly
    # values, not the difference between the shift's first and last reading.
    shift_day = {'year_s': year_s, 'month_s': month_s, 'day_s': day_s}
    # NOTE(review): the 'sflog' row is keyed by shift date only; two shifts
    # ending on the same date would share one row -- confirm this is intended.
    _rollup_hour_stats(mpoint, 'sflog', shift_day, extra_filter={'sflog': sflog})
    _rollup_hour_stats(mpoint, 'day_s', shift_day)
    # Shift-based month/year totals get their own 'month_s'/'year_s' rows so
    # they no longer overwrite the calendar 'month'/'year' rows written above.
    _rollup_hour_stats(mpoint, 'month_s', {'year_s': year_s, 'month_s': month_s})
    _rollup_hour_stats(mpoint, 'year_s', {'year_s': year_s})
@shared_task(base=CustomTask)
def cal_mpointstats(is_now=1):
    """Queue the hourly statistics task for every measuring point.

    Args:
        is_now: When truthy (the default) the current hour is computed;
            otherwise the hour that started one hour ago is computed.
    """
    now, pre = get_current_and_previous_time()
    target = now if is_now else pre
    for mpoint in Mpoint.objects.all():
        # One asynchronous task per measuring point.
        cal_mpointstat_hour.delay(mpoint.id, target.year, target.month, target.day, target.hour)
def cal_sflog_en_val(sflogId: str):
    """Compute the energy consumption that belongs to one shift record.

    For every ``'elec'`` measuring point in the shift's ``mgroup`` the
    consumption is the last logged ``tag_val`` inside the shift window minus
    the first one; the per-point values are summed and written to the shift
    record's ``elec_val`` together with the calculation time ``cal_time``.

    Args:
        sflogId: Primary key of the ``SfLog`` to compute.
    """
    sflog = SfLog.objects.get(id=sflogId)
    mpoints = Mpoint.objects.filter(mgroup=sflog.mgroup)
    # Electricity consumption
    mpoints_elec = mpoints.filter(cate='elec')
    elec_val = 0
    for mpoint in mpoints_elec:
        # The filter depends on the current point, so build it inside the loop.
        # NOTE(review): the window starts at sflog.create_time, not
        # sflog.start_time -- confirm which one marks the shift start.
        mrs = MpLog.objects.filter(
            mpoint=mpoint,
            tag_update__gte=sflog.create_time,
            tag_update__lt=sflog.end_time).order_by('tag_update')
        if mrs.exists():
            elec_val += mrs.last().tag_val - mrs.first().tag_val
    # Filter by primary key value, not by the model instance itself.
    SfLog.objects.filter(id=sflog.id).update(elec_val=elec_val, cal_time=datetime.datetime.now())
@shared_task(base=CustomTask)
def cal_sflogs_en_val(is_today=1):
    """Compute the energy consumption of every shift starting on one day.

    Args:
        is_today: When truthy (the default) shifts whose ``start_time`` falls
            on the current date are processed; otherwise those of the
            previous date.
    """
    target_day = datetime.datetime.now()
    if not is_today:
        target_day = target_day - datetime.timedelta(days=1)
    shift_logs = SfLog.objects.filter(
        start_time__year=target_day.year,
        start_time__month=target_day.month,
        start_time__day=target_day.day)
    for shift_log in shift_logs:
        cal_sflog_en_val(shift_log.id)