# Create your tasks here from __future__ import absolute_import, unicode_literals from apps.utils.tasks import CustomTask from celery import shared_task from apps.utils.sql import DbConnection from server.settings import get_sysconfig from django.core.cache import cache from apps.enm.models import MpLog, Mpoint, MpointStat from apps.wpm.models import SfLog import datetime from django.db.models import Sum def get_current_and_previous_time(): now = datetime.datetime.now() pre = now - datetime.timedelta(hours=1) return now, pre @shared_task(base=CustomTask) def get_tag_val(): config = get_sysconfig() with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor: last_tag_id = cache.get('last_tag_id') if last_tag_id is None: mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first() if mr is None: last_tag_id = 0 else: last_tag_id = mr.tag_id cache.set('last_tag_id', last_tag_id) cursor.execute("select id, val, tag_code, update_time from tag_value where id > %s order by id", (last_tag_id)) results = cursor.fetchall() # 获取数据后保存至本地 for row in results: mr_one = MpLog() mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row mr_one.mpoint, _ = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'}) mr_one.save() @shared_task(base=CustomTask) def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int): """ 计算某一测点, 某一时间点某一小时的统计值 """ mpoint = Mpoint.objects.get(id=mpointId) if mpoint.cate == 'elec': # 是否是电能 params = {'mpoint': mpoint, 'type': 'hour'} params['year'], params['month'], params['day'], params['hour'] = year, month, day, hour mrs = MpLog.objects.filter( mpoint=mpoint, tag_update__year=params['year'], tag_update__month=params['month'], tag_update__day=params['day'], tag_update__hour= params['hour']).order_by('tag_update') val = 0 if mrs.exists(): val = mrs.last().tag_val - mrs.first().tag_val ms, _ = MpointStat.objects.get_or_create(**params, defaults=params) ms.val = val ms.save() # 更新更高级别的值 sum_dict_day = MpointStat.objects.filter(type='hour', mpoint=mpoint, year=year, month=month, day=day).aggregate(sum=Sum('val')) params_day = {'type':'day', 'mpoint': mpoint, 'year': year, 'month': month, 'day': day} ms_day, _ = MpointStat.objects.get_or_create(**params_day, defaults=params_day) ms_day.val = sum_dict_day['sum'] ms_day.save() sum_dict_month = MpointStat.objects.filter(type='hour', mpoint=mpoint, year=year, month=month).aggregate(sum=Sum('val')) params_month = {'type':'month', 'mpoint': mpoint, 'year': year, 'month': month} ms_month, _ = MpointStat.objects.get_or_create(**params_month, defaults=params_month) ms_month.val = sum_dict_month['sum'] ms_month.save() sum_dict_year = MpointStat.objects.filter(type='hour', mpoint=mpoint, year=year).aggregate(sum=Sum('val')) params_year = {'type':'year', 'mpoint': mpoint, 'year': year} ms_year, _ = MpointStat.objects.get_or_create(**params_year, defaults=params_year) ms_year.val = sum_dict_year['sum'] ms_year.save() @shared_task(base=CustomTask) def cal_mpointstats(is_now=1): """ 计算所有测点的统计值,默认当前小时 """ now, pre = get_current_and_previous_time() if is_now: for mpoint in Mpoint.objects.all(): cal_mpointstat_hour.delay(mpoint.id, now.year, now.moth, now.day, now.hour) else: for mpoint in Mpoint.objects.all(): cal_mpointstat_hour.delay(mpoint.id, pre.year, pre.month, pre.day, pre.hour) def cal_sflog_en_val(sflogId: str): """ 计算某值班记录相应能耗 """ sflog = SfLog.objects.get(id=sflogId) mpoints = Mpoint.objects.filter(mgroup=sflog.mgroup) # 统计电耗 mpoints_elec = mpoints.filter(cate='elec') elec_val = 0 params = {'mpoint': mpoint, 'tag_update__gte': sflog.create_time, 'tag_update__lt': sflog.end_time} for mpoint in mpoints_elec: mrs = MpLog.objects.filter(**params).order_by('tag_update') if mrs.exists(): elec_val += mrs.last().tag_val - mrs.first().tag_val SfLog.objects.filter(id=sflog).update(elec_val=elec_val, cal_time=datetime.datetime.now()) @shared_task(base=CustomTask) def cal_sflogs_en_val(is_today=1): """ 计算班组能耗值, 默认开始时间是当天的 """ now = datetime.datetime.now() if is_today: sflogs = SfLog.objects.filter(start_time__year=now.year, start_time__month=now.month, start_time__day=now.day) for i in sflogs: cal_sflog_en_val(i.id) else: pre = now - datetime.timedelta(days=1) sflogs = SfLog.objects.filter(start_time__year=pre.year, start_time__month=pre.month, start_time__day=pre.day) for i in sflogs: cal_sflog_en_val(i.id)