factory/apps/enm/tasks.py

109 lines
4.9 KiB
Python

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from apps.utils.tasks import CustomTask
from celery import shared_task
from apps.utils.sql import DbConnection
from server.settings import get_sysconfig
from django.core.cache import cache
from apps.enm.models import MpLog, Mpoint, MpointStat
import datetime
from django.db.models import Sum
def get_current_and_previous_time():
    """Return the current time and the moment exactly one hour before it.

    Returns:
        A ``(now, one_hour_ago)`` tuple of naive ``datetime.datetime``
        objects, both derived from a single ``datetime.now()`` reading.
    """
    one_hour = datetime.timedelta(hours=1)
    current = datetime.datetime.now()
    return current, current - one_hour
def get_current_and_previous_hour():
    """Return the clock-hour window containing now and the window before it.

    Each window is a ``(start, end)`` tuple of naive datetimes running from
    ``HH:00:00`` to ``HH:59:59``. When the current hour is 0 the previous
    window is 23:00:00-23:59:59 of the preceding day.

    Returns:
        A ``(current_window, previous_window)`` tuple.
    """
    moment = datetime.datetime.now()
    window_length = datetime.timedelta(minutes=59, seconds=59)

    # Truncate to the top of the hour by rebuilding from the date/hour parts.
    this_start = datetime.datetime(moment.year, moment.month, moment.day, moment.hour)
    # Stepping back one hour also covers the roll-over across midnight.
    last_start = this_start - datetime.timedelta(hours=1)

    return (
        (this_start, this_start + window_length),
        (last_start, last_start + window_length),
    )
@shared_task(base=CustomTask)
def get_tag_val():
    """Copy new rows from the external ``tag_value`` table into ``MpLog``.

    The id of the last imported row is kept in the Django cache under
    ``'last_tag_id'``. When the cache entry is missing, the checkpoint is
    rebuilt from the highest ``tag_id`` already stored in ``MpLog`` (or 0 if
    the table is empty). Every remote row with a larger id is then saved as
    an ``MpLog``; a ``Mpoint`` is created on the fly for tag codes that have
    no measuring point yet.

    The cached checkpoint is advanced to the last row that was saved, even
    when a later row fails, so a following run does not import the same rows
    a second time.
    """
    enm_config = get_sysconfig()['enm']
    with DbConnection(
        enm_config['db_host'],
        enm_config['db_user'],
        enm_config['db_password'],
        enm_config['db_database'],
    ) as cursor:
        last_tag_id = cache.get('last_tag_id')
        if last_tag_id is None:
            # The remote query below filters on id, so the checkpoint must be
            # the largest id imported so far, not the id of whichever row
            # happens to carry the newest timestamp.
            newest = MpLog.objects.order_by('-tag_id').first()
            last_tag_id = 0 if newest is None else newest.tag_id
            cache.set('last_tag_id', last_tag_id)

        # Parameters must be a sequence: note the trailing comma.
        cursor.execute(
            "select id, val, tag_code, update_time from tag_value where id > %s order by id",
            (last_tag_id,),
        )
        rows = cursor.fetchall()  # fetch the data, then store it locally

        imported_up_to = last_tag_id
        try:
            for tag_id, tag_val, tag_code, tag_update in rows:
                log = MpLog()
                log.tag_id = tag_id
                log.tag_val = tag_val
                log.tag_code = tag_code
                log.tag_update = tag_update
                log.mpoint, _ = Mpoint.objects.get_or_create(
                    code=tag_code,
                    defaults={'name': tag_code, 'unit': 'unknown'},
                )
                log.save()
                imported_up_to = tag_id
        finally:
            # Rows arrive ordered by id, so the last saved id is the new
            # checkpoint; record it even if a later row raised.
            if imported_up_to != last_tag_id:
                cache.set('last_tag_id', imported_up_to)
@shared_task(base=CustomTask)
def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int):
    """Compute the statistics of one measuring point for one specific hour.

    Only points whose ``cate`` is ``'elec'`` (electric energy) are processed;
    for any other category the task does nothing.

    The hourly value is the difference between the ``tag_val`` of the last
    and the first ``MpLog`` of that hour (ordered by ``tag_update``), or 0
    when the hour has no logs. After the hourly ``MpointStat`` is stored, the
    day, month and year statistics are refreshed as the sum of the hourly
    values that fall inside each period.

    Args:
        mpointId: Primary key of the ``Mpoint`` to process.
        year: Calendar year of the hour to compute.
        month: Calendar month of the hour to compute.
        day: Day of month of the hour to compute.
        hour: Hour of day of the hour to compute.

    Raises:
        Mpoint.DoesNotExist: If no measuring point has the given id.
    """
    mpoint = Mpoint.objects.get(id=mpointId)
    if mpoint.cate != 'elec':  # only electric-energy points are handled
        return

    logs = MpLog.objects.filter(
        mpoint=mpoint,
        tag_update__year=year,
        tag_update__month=month,
        tag_update__day=day,
        tag_update__hour=hour,
    ).order_by('tag_update')

    val = 0
    first_log = logs.first()
    if first_log is not None:
        # Subtract the readings, not the model instances themselves.
        val = logs.last().tag_val - first_log.tag_val

    MpointStat.objects.update_or_create(
        type='hour',
        mpoint=mpoint,
        year=year,
        month=month,
        day=day,
        hour=hour,
        defaults={'val': val},
    )

    # Refresh the higher-level statistics from the hourly rows.
    period = {'year': year, 'month': month, 'day': day}
    rollups = (
        ('day', ('year', 'month', 'day')),
        ('month', ('year', 'month')),
        ('year', ('year',)),
    )
    for stat_type, fields in rollups:
        scope = {name: period[name] for name in fields}
        total = MpointStat.objects.filter(
            type='hour', mpoint=mpoint, **scope
        ).aggregate(sum=Sum('val'))['sum']
        MpointStat.objects.update_or_create(
            type=stat_type,
            mpoint=mpoint,
            **scope,
            defaults={'val': total},
        )
@shared_task(base=CustomTask)
def cal_mpointstats(is_now=1):
    """Queue an hourly statistics calculation for every measuring point.

    One ``cal_mpointstat_hour`` task is dispatched per ``Mpoint``.

    Args:
        is_now: Truthy (the default) to compute the hour containing the
            current time; falsy to compute the hour of the time one hour
            earlier.
    """
    now, pre = get_current_and_previous_time()
    target = now if is_now else pre
    for mpoint in Mpoint.objects.all():
        cal_mpointstat_hour.delay(
            mpoint.id, target.year, target.month, target.day, target.hour
        )