# Create your tasks here from __future__ import absolute_import, unicode_literals from apps.utils.tasks import CustomTask from celery import shared_task from apps.utils.sql import DbConnection from server.settings import get_sysconfig from django.core.cache import cache from apps.enm.models import Mrecord, Mpoint, HourStat import datetime def get_current_and_previous_hour(): now = datetime.datetime.now() current_hour = now.hour current_date = now.date() current_start_time = datetime.datetime.combine(current_date, datetime.time(current_hour, 0, 0)) current_end_time = datetime.datetime.combine(current_date, datetime.time(current_hour, 59, 59)) current_time_range = (current_start_time, current_end_time) previous_hour = current_hour - 1 if current_hour > 0 else 23 previous_date = current_date if previous_hour < current_hour else current_date - datetime.timedelta(days=1) previous_start_time = datetime.datetime.combine(previous_date, datetime.time(previous_hour, 0, 0)) previous_end_time = datetime.datetime.combine(previous_date, datetime.time(previous_hour, 59, 59)) previous_time_range = (previous_start_time, previous_end_time) return current_time_range, previous_time_range @shared_task(base=CustomTask) def get_tag_val(): config = get_sysconfig() with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor: last_tag_id = cache.get('last_tag_id') if last_tag_id is None: mr = Mrecord.objects.all().order_by('-tag_update', 'tag_id').first() if mr is None: last_tag_id = 0 else: last_tag_id = mr.tag_id cache.set('last_tag_id', last_tag_id) cursor.execute("select id, val, tag_code, update_time from tag_value where id > %s order by id", (last_tag_id)) results = cursor.fetchall() # 获取数据后保存至本地 for row in results: mr_one = Mrecord() mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row mr_one.mpoint, _ = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'}) mr_one.save() @shared_task(base=CustomTask) def cal_hourstat(): """ 计算小时统计量,默认计算本小时和上小时 """ for mpoint in Mpoint.objects.all(): c_t_r, p_t_r = get_current_and_previous_hour() if mpoint.cate == 'elec': # 是否是电能 # 计算本小时,可能不需要 start_time = c_t_r[0] params = {'mpoint': mpoint} params['year'], params['month'], params['day'], params['hour'] = start_time.year, start_time.month, start_time.day, start_time.hour mrs = Mrecord.objects.filter(mpoint=mpoint, tag_update__gte=c_t_r[0], tag_update__lte=c_t_r[1]).order_by('tag_update') val = 0 if mrs.exists(): val = mrs.last() - mrs.first() hs, _ = HourStat.objects.get_or_create(**params, defaults=params) hs.val = val hs.save() # 计算上小时 start_time2 = p_t_r[0] params = {'mpoint': mpoint} params['year'], params['month'], params['day'], params['hour'] = start_time2.year, start_time2.month, start_time2.day, start_time2.hour hs, _ = HourStat.objects.get_or_create(**params, defaults=params) if not hs.is_calculated: mrs = Mrecord.objects.filter(mpoint=mpoint, tag_update__gte=p_t_r[0], tag_update__lte=p_t_r[1]).order_by('tag_update') val = 0 if mrs.exists(): val = mrs.last() - mrs.first() hs.val = val hs.is_calculated = True hs.save()