152 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			152 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Python
		
	
	
	
| # Create your tasks here
 | |
| from __future__ import absolute_import, unicode_literals
 | |
| from apps.utils.tasks import CustomTask
 | |
| from celery import shared_task
 | |
| from apps.utils.sql import DbConnection
 | |
| from server.settings import get_sysconfig
 | |
| from django.core.cache import cache
 | |
| from apps.enm.models import MpLog, Mpoint, MpointStat
 | |
| from apps.wpm.models import SfLog
 | |
| import datetime
 | |
| from django.db.models import Sum
 | |
| from dateutil import tz
 | |
| from django.conf import settings
 | |
| from django.utils.timezone import localtime
 | |
| from apps.wpm.services import make_sflogs
 | |
| 
 | |
| def get_current_and_previous_time():
 | |
|     now = datetime.datetime.now()
 | |
|     pre = now - datetime.timedelta(hours=1)
 | |
|     return now, pre
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def get_tag_val():
 | |
|     config = get_sysconfig()
 | |
|     with DbConnection(config['enm']['db_host'], config['enm']['db_user'], config['enm']['db_password'], config['enm']['db_database']) as cursor:
 | |
|         last_tag_id = cache.get('last_tag_id')
 | |
|         if last_tag_id is None:
 | |
|             mr = MpLog.objects.all().order_by('-tag_update', 'tag_id').first()
 | |
|             if mr is None:
 | |
|                 last_tag_id = 0
 | |
|             else:
 | |
|                 last_tag_id = mr.tag_id
 | |
|                 cache.set('last_tag_id', last_tag_id)
 | |
|         cursor.execute("select id, val, tag_code, update_time from tag_value where id > %s order by id", (last_tag_id))
 | |
|         results = cursor.fetchall()  # 获取数据后保存至本地
 | |
|         for row in results:
 | |
|             mr_one = MpLog()
 | |
|             mr_one.tag_id, mr_one.tag_val, mr_one.tag_code, mr_one.tag_update = row
 | |
|             mr_one.mpoint, _  = Mpoint.objects.get_or_create(code=mr_one.tag_code, defaults={'name': mr_one.tag_code, 'code': mr_one.tag_code, 'unit': 'unknown'})
 | |
|             mr_one.save()
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: int):
 | |
|     """
 | |
|     计算某一测点, 某一时间点某一小时的统计值
 | |
|     """
 | |
|     mpoint = Mpoint.objects.get(id=mpointId)
 | |
|     mytz = tz.gettz(settings.TIME_ZONE)
 | |
|     dt = datetime.datetime(year=year, month=month, day=day, hour=hour, tzinfo=mytz)
 | |
|     if mpoint.material:  # 如果计量的是物料
 | |
|         params = {'mpoint': mpoint, 'type': 'hour'}
 | |
|         params['year'], params['month'], params['day'], params['hour'] = year, month, day, hour
 | |
|         mrs = MpLog.objects.filter(
 | |
|             mpoint=mpoint, 
 | |
|             tag_update__year=params['year'],
 | |
|             tag_update__month=params['month'],
 | |
|             tag_update__day=params['day'],
 | |
|             tag_update__hour= params['hour']).order_by('tag_update')
 | |
|         val = 0
 | |
|         if mrs.exists():
 | |
|             val = mrs.last().tag_val - mrs.first().tag_val
 | |
|         ms, _ = MpointStat.objects.get_or_create(**params, defaults=params)
 | |
|         ms.val = val
 | |
|         ms.save()
 | |
|         # 更新更高级别的值
 | |
|         sum_dict_day = MpointStat.objects.filter(type='hour', mpoint=mpoint, year=year, month=month, day=day).aggregate(sum=Sum('val'))
 | |
|         params_day = {'type':'day', 'mpoint': mpoint, 'year': year, 'month': month, 'day': day}
 | |
|         ms_day, _ = MpointStat.objects.get_or_create(**params_day, defaults=params_day)
 | |
|         ms_day.val = sum_dict_day['sum']
 | |
|         ms_day.save()
 | |
| 
 | |
|         sum_dict_month = MpointStat.objects.filter(type='hour', mpoint=mpoint, year=year, month=month).aggregate(sum=Sum('val'))
 | |
|         params_month = {'type':'month', 'mpoint': mpoint, 'year': year, 'month': month}
 | |
|         ms_month, _ = MpointStat.objects.get_or_create(**params_month, defaults=params_month)
 | |
|         ms_month.val = sum_dict_month['sum']
 | |
|         ms_month.save()
 | |
| 
 | |
|         sum_dict_year = MpointStat.objects.filter(type='hour', mpoint=mpoint, year=year).aggregate(sum=Sum('val'))
 | |
|         params_year = {'type':'year', 'mpoint': mpoint, 'year': year}
 | |
|         ms_year, _ = MpointStat.objects.get_or_create(**params_year, defaults=params_year)
 | |
|         ms_year.val = sum_dict_year['sum']
 | |
|         ms_year.save()
 | |
| 
 | |
|         # 绑定值班记录
 | |
|         sflog  = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mpoint.mgroup).first()
 | |
|         year_s, month_s, day_s = 0, 0, 0
 | |
|         if sflog:
 | |
|             ms.sflog = sflog
 | |
|             end_time_local = localtime(sflog.end_time)
 | |
|             year_s, month_s, day_s = end_time_local.year, end_time_local.month, end_time_local.day
 | |
|             ms.year_s = year_s
 | |
|             ms.month_s = month_s
 | |
|             ms.day_s = day_s
 | |
|             ms.save()
 | |
|         else:
 | |
|             raise Exception('未找到值班记录')
 | |
|         if year_s and month_s and day_s:
 | |
|             # 这种是距离点相减
 | |
|             # params_s = {'mpoint': mpoint, 'type': 'sflog'}
 | |
|             # mrs = MpLog.objects.filter(
 | |
|             #             mpoint=mpoint, 
 | |
|             #             tag_update__gte=sflog.create_time, tag_update__lt=sflog.end_time).order_by('tag_update')
 | |
|             # val = 0
 | |
|             # if mrs.exists():
 | |
|             #     val = mrs.last().tag_val - mrs.first().tag_val
 | |
|             # params_s_default = {'mpoint': mpoint, 'type': 'sflog', 'year_s': year_s, 'month_s': month_s, 'day_s': day_s}
 | |
|             # ms, _ = MpointStat.objects.get_or_create(**params_s, defaults=params_s_default)
 | |
|             # ms.val = val
 | |
|             # ms.save()
 | |
| 
 | |
|             # 这种是加和
 | |
|             sum_dict_sflog_s = MpointStat.objects.filter(type='hour', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, sflog=sflog).aggregate(sum=Sum('val'))
 | |
|             params_sflog_s = {'type':'sflog', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s}
 | |
|             ms_sflog_s, _ = MpointStat.objects.get_or_create(**params_sflog_s, defaults=params_sflog_s)
 | |
|             ms_sflog_s.val = sum_dict_sflog_s['sum']
 | |
|             ms_sflog_s.save()
 | |
| 
 | |
|             sum_dict_day_s = MpointStat.objects.filter(type='hour', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum('val'))
 | |
|             params_day_s = {'type':'day_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s}
 | |
|             ms_day_s, _ = MpointStat.objects.get_or_create(**params_day_s, defaults=params_day_s)
 | |
|             ms_day_s.val = sum_dict_day_s['sum']
 | |
|             ms_day_s.save()
 | |
| 
 | |
|             sum_dict_month_s = MpointStat.objects.filter(type='hour', mpoint=mpoint, year_s=year_s, month=month_s).aggregate(sum=Sum('val'))
 | |
|             params_month_s = {'type':'month', 'mpoint': mpoint, 'year': year_s, 'month': month_s}
 | |
|             ms_month_s, _ = MpointStat.objects.get_or_create(**params_month_s, defaults=params_month_s)
 | |
|             ms_month_s.val = sum_dict_month_s['sum']
 | |
|             ms_month_s.save()
 | |
| 
 | |
|             sum_dict_year_s = MpointStat.objects.filter(type='hour', mpoint=mpoint, year_s=year_s).aggregate(sum=Sum('val'))
 | |
|             params_year_s = {'type':'year', 'mpoint': mpoint, 'year': year_s}
 | |
|             ms_year_s, _ = MpointStat.objects.get_or_create(**params_year_s, defaults=params_year_s)
 | |
|             ms_year_s.val = sum_dict_year_s['sum']
 | |
|             ms_year_s.save()
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def cal_mpointstats(is_now=1):
 | |
|     """
 | |
|     计算所有测点的统计值,默认当前小时
 | |
|     """
 | |
|     now, pre = get_current_and_previous_time()
 | |
|     if is_now:
 | |
|         for mpoint in Mpoint.objects.all():
 | |
|             cal_mpointstat_hour.delay(mpoint.id, now.year, now.moth, now.day, now.hour)
 | |
|     else:
 | |
|         for mpoint in Mpoint.objects.all():
 | |
|             cal_mpointstat_hour.delay(mpoint.id, pre.year, pre.month, pre.day, pre.hour)
 | |
| 
 | |
| 
 | |
| 
 |