feat: 测点统计任务优化

This commit is contained in:
caoqianming 2023-07-10 10:13:45 +08:00
parent 56d9c50a39
commit 2884becf3f
7 changed files with 172 additions and 52 deletions

28
apps/enm/filters.py Normal file
View File

@ -0,0 +1,28 @@
from django_filters import rest_framework as filters
from apps.enm.models import MpointStat
class MpointStatFilter(filters.FilterSet):
has_create_by = filters.BooleanFilter(method='filter_has_create_by')
class Meta:
model = MpointStat
fields = {
"mpoint": ["exact"],
"mpoint__mgroup": ["exact"],
"mpoint__material": ["exact"],
"mpoint__mgroup__belong_dept": ["exact"],
"sflog": ["exact"],
"hour": ["exact"],
"day": ["exact"],
"month": ["exact"],
"year": ["exact"],
"day_s": ["exact"],
"month_s": ["exact"],
"year_s": ["exact"]
}
def filter_has_create_by(self, queryset, name, value):
if value:
queryset = queryset.exclude(create_by=None)
else:
queryset = queryset.filter(create_by=None)
return queryset

View File

@ -0,0 +1,24 @@
# Generated by Django 3.2.12 on 2023-07-07 05:40
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('enm', '0003_auto_20230704_1129'),
]
operations = [
migrations.RemoveField(
model_name='mpointstat',
name='material',
),
migrations.AlterField(
model_name='mpointstat',
name='mpoint',
field=models.ForeignKey(default=3349214860231012352, on_delete=django.db.models.deletion.CASCADE, to='enm.mpoint', verbose_name='关联测点'),
preserve_default=False,
),
]

View File

@ -0,0 +1,31 @@
# Generated by Django 3.2.12 on 2023-07-07 09:07
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('enm', '0004_auto_20230707_1340'),
]
operations = [
migrations.AddField(
model_name='mpoint',
name='is_auto',
field=models.BooleanField(default=True, verbose_name='是否自动采集'),
),
migrations.AddField(
model_name='mpointstat',
name='create_by',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='mpointstat_create_by', to=settings.AUTH_USER_MODEL, verbose_name='创建人'),
),
migrations.AddField(
model_name='mpointstat',
name='update_by',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='mpointstat_update_by', to=settings.AUTH_USER_MODEL, verbose_name='最后编辑人'),
),
]

View File

@ -1,5 +1,5 @@
from django.db import models from django.db import models
from apps.utils.models import BaseModel, CommonBModel from apps.utils.models import BaseModel, CommonBModel, CommonADModel
from apps.wpm.models import SfLog from apps.wpm.models import SfLog
from apps.mtm.models import Material, Mgroup from apps.mtm.models import Material, Mgroup
@ -15,6 +15,7 @@ class Mpoint(CommonBModel):
ep_monitored = models.ForeignKey('em.equipment', verbose_name='监测哪个设备', related_name='mp_ep_monitored', on_delete=models.SET_NULL, null=True, blank=True) ep_monitored = models.ForeignKey('em.equipment', verbose_name='监测哪个设备', related_name='mp_ep_monitored', on_delete=models.SET_NULL, null=True, blank=True)
ep_belong = models.ForeignKey('em.equipment', verbose_name='属于哪个设备', related_name='mp_ep_belong', on_delete=models.SET_NULL, null=True, blank=True) ep_belong = models.ForeignKey('em.equipment', verbose_name='属于哪个设备', related_name='mp_ep_belong', on_delete=models.SET_NULL, null=True, blank=True)
mgroup = models.ForeignKey('mtm.mgroup', verbose_name='所在集合', on_delete=models.SET_NULL, null=True, blank=True) mgroup = models.ForeignKey('mtm.mgroup', verbose_name='所在集合', on_delete=models.SET_NULL, null=True, blank=True)
is_auto = models.BooleanField('是否自动采集', default=True)
class MpLog(BaseModel): class MpLog(BaseModel):
@ -27,7 +28,7 @@ class MpLog(BaseModel):
tag_val = models.FloatField('当前值') tag_val = models.FloatField('当前值')
class MpointStat(BaseModel): class MpointStat(CommonADModel):
"""测点统计表 """测点统计表
""" """
type = models.CharField('统计维度', max_length=50, default='hour', help_text='year/month/day/year_s/month_s/day_s/sflog/hour') type = models.CharField('统计维度', max_length=50, default='hour', help_text='year/month/day/year_s/month_s/day_s/sflog/hour')
@ -41,8 +42,7 @@ class MpointStat(BaseModel):
hour = models.PositiveSmallIntegerField('', null=True, blank=True) hour = models.PositiveSmallIntegerField('', null=True, blank=True)
sflog = models.ForeignKey(SfLog, verbose_name='关联值班记录', on_delete=models.CASCADE, null=True, blank=True) sflog = models.ForeignKey(SfLog, verbose_name='关联值班记录', on_delete=models.CASCADE, null=True, blank=True)
mpoint = models.ForeignKey(Mpoint, verbose_name='关联测点', on_delete=models.CASCADE, null=True, blank=True) mpoint = models.ForeignKey(Mpoint, verbose_name='关联测点', on_delete=models.CASCADE)
material = models.ForeignKey(Material, verbose_name='计量的物料', on_delete=models.CASCADE, null=True, blank=True)
val = models.FloatField('统计值', default=0) val = models.FloatField('统计值', default=0)

View File

@ -42,7 +42,7 @@ class MpointStatSerializer(CustomModelSerializer):
class Meta: class Meta:
model = MpointStat model = MpointStat
fields = '__all__' fields = '__all__'
read_only_fields = EXCLUDE_FIELDS + ['mpoint_name', 'type', 'mpoint', 'year', 'month', 'day'] read_only_fields = EXCLUDE_FIELDS + ['mpoint_name', 'type', 'year', 'month', 'day']
def check_required_keys(dictionary, keys): def check_required_keys(dictionary, keys):
for key in keys: for key in keys:
@ -51,16 +51,21 @@ class MpointStatSerializer(CustomModelSerializer):
return True return True
def validate(self, attrs): def validate(self, attrs):
if 'sflog' in attrs and attrs['sflog']: mpoint = attrs['mpoint']
if mpoint.material and mpoint.is_auto is False and 'sflog' in attrs and attrs['sflog']:
attrs['type'] = 'sflog' attrs['type'] = 'sflog'
sflog = attrs['sflog'] sflog = attrs['sflog']
end_time_local = localtime(sflog.end_time) end_time_local = localtime(sflog.end_time)
attrs['year_s'], attrs['month_s'], attrs['day_s'] = end_time_local.year, end_time_local.month, end_time_local.day attrs['year_s'], attrs['month_s'], attrs['day_s'] = end_time_local.year, end_time_local.month, end_time_local.day
else: else:
keys = ['hour', 'day_s', 'month_s', 'year_s'] raise ParseError('该数据不支持手工录入')
for ind, key in enumerate(keys): # if 'sflog' in attrs and attrs['sflog']:
if key in attrs and attrs['key']:
if not self.check_required_keys(attrs, keys[ind+1:]): # else:
raise ParseError('缺少数据') # keys = ['hour', 'day_s', 'month_s', 'year_s']
attrs['type'] = key # for ind, key in enumerate(keys):
# if key in attrs and attrs['key']:
# if not self.check_required_keys(attrs, keys[ind+1:]):
# raise ParseError('缺少数据')
# attrs['type'] = key
return super().validate(attrs) return super().validate(attrs)

View File

@ -70,32 +70,31 @@ def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: in
ms_day.val = sum_dict_day['sum'] ms_day.val = sum_dict_day['sum']
ms_day.save() ms_day.save()
sum_dict_month = MpointStat.objects.filter(type='hour', mpoint=mpoint, year=year, month=month).aggregate(sum=Sum('val')) sum_dict_month = MpointStat.objects.filter(type='day', mpoint=mpoint, year=year, month=month).aggregate(sum=Sum('val'))
params_month = {'type':'month', 'mpoint': mpoint, 'year': year, 'month': month} params_month = {'type':'month', 'mpoint': mpoint, 'year': year, 'month': month}
ms_month, _ = MpointStat.objects.get_or_create(**params_month, defaults=params_month) ms_month, _ = MpointStat.objects.get_or_create(**params_month, defaults=params_month)
ms_month.val = sum_dict_month['sum'] ms_month.val = sum_dict_month['sum']
ms_month.save() ms_month.save()
sum_dict_year = MpointStat.objects.filter(type='hour', mpoint=mpoint, year=year).aggregate(sum=Sum('val')) sum_dict_year = MpointStat.objects.filter(type='month', mpoint=mpoint, year=year).aggregate(sum=Sum('val'))
params_year = {'type':'year', 'mpoint': mpoint, 'year': year} params_year = {'type':'year', 'mpoint': mpoint, 'year': year}
ms_year, _ = MpointStat.objects.get_or_create(**params_year, defaults=params_year) ms_year, _ = MpointStat.objects.get_or_create(**params_year, defaults=params_year)
ms_year.val = sum_dict_year['sum'] ms_year.val = sum_dict_year['sum']
ms_year.save() ms_year.save()
# 绑定值班记录 # 查找并绑定值班记录
sflog = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mpoint.mgroup).first() sflog = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mpoint.mgroup).first()
year_s, month_s, day_s = 0, 0, 0 if sflog is None: # 需要创建值班记录
if sflog: from apps.wpm.services import make_sflogs
ms.sflog = sflog make_sflogs(mgroup=mpoint.mgroup, start_date=(dt-datetime.timedelta(days=1)).date(), end_date=dt.date())
end_time_local = localtime(sflog.end_time) sflog = SfLog.objects.filter(start_time__lte=dt, end_time__gt=dt, mgroup=mpoint.mgroup).first()
year_s, month_s, day_s = end_time_local.year, end_time_local.month, end_time_local.day ms.sflog = sflog
ms.year_s = year_s end_time_local = localtime(sflog.end_time)
ms.month_s = month_s year_s, month_s, day_s = end_time_local.year, end_time_local.month, end_time_local.day
ms.day_s = day_s ms.year_s = year_s
ms.save() ms.month_s = month_s
else: ms.day_s = day_s
raise Exception('未找到值班记录') ms.save()
if year_s and month_s and day_s:
# 这种是距离点相减 # 这种是距离点相减
# params_s = {'mpoint': mpoint, 'type': 'sflog'} # params_s = {'mpoint': mpoint, 'type': 'sflog'}
# mrs = MpLog.objects.filter( # mrs = MpLog.objects.filter(
@ -109,30 +108,30 @@ def cal_mpointstat_hour(mpointId: str, year: int, month: int, day: int, hour: in
# ms.val = val # ms.val = val
# ms.save() # ms.save()
# 这种是加和 # 开始往上计算
sum_dict_sflog_s = MpointStat.objects.filter(type='hour', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, sflog=sflog).aggregate(sum=Sum('val')) sum_dict_sflog_s = MpointStat.objects.filter(type='hour', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s, sflog=sflog).aggregate(sum=Sum('val'))
params_sflog_s = {'type':'sflog', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s} params_sflog_s = {'type':'sflog', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s}
ms_sflog_s, _ = MpointStat.objects.get_or_create(**params_sflog_s, defaults=params_sflog_s) ms_sflog_s, _ = MpointStat.objects.get_or_create(**params_sflog_s, defaults=params_sflog_s)
ms_sflog_s.val = sum_dict_sflog_s['sum'] ms_sflog_s.val = sum_dict_sflog_s['sum']
ms_sflog_s.save() ms_sflog_s.save()
sum_dict_day_s = MpointStat.objects.filter(type='hour', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum('val')) sum_dict_day_s = MpointStat.objects.filter(type='sflog', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum('val'))
params_day_s = {'type':'day_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s} params_day_s = {'type':'day_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s}
ms_day_s, _ = MpointStat.objects.get_or_create(**params_day_s, defaults=params_day_s) ms_day_s, _ = MpointStat.objects.get_or_create(**params_day_s, defaults=params_day_s)
ms_day_s.val = sum_dict_day_s['sum'] ms_day_s.val = sum_dict_day_s['sum']
ms_day_s.save() ms_day_s.save()
sum_dict_month_s = MpointStat.objects.filter(type='hour', mpoint=mpoint, year_s=year_s, month=month_s).aggregate(sum=Sum('val')) sum_dict_month_s = MpointStat.objects.filter(type='day_s', mpoint=mpoint, year_s=year_s, month_s=month_s).aggregate(sum=Sum('val'))
params_month_s = {'type':'month', 'mpoint': mpoint, 'year': year_s, 'month': month_s} params_month_s = {'type':'month_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s}
ms_month_s, _ = MpointStat.objects.get_or_create(**params_month_s, defaults=params_month_s) ms_month_s, _ = MpointStat.objects.get_or_create(**params_month_s, defaults=params_month_s)
ms_month_s.val = sum_dict_month_s['sum'] ms_month_s.val = sum_dict_month_s['sum']
ms_month_s.save() ms_month_s.save()
sum_dict_year_s = MpointStat.objects.filter(type='hour', mpoint=mpoint, year_s=year_s).aggregate(sum=Sum('val')) sum_dict_year_s = MpointStat.objects.filter(type='month_s', mpoint=mpoint, year_s=year_s).aggregate(sum=Sum('val'))
params_year_s = {'type':'year', 'mpoint': mpoint, 'year': year_s} params_year_s = {'type':'year_s', 'mpoint': mpoint, 'year_s': year_s}
ms_year_s, _ = MpointStat.objects.get_or_create(**params_year_s, defaults=params_year_s) ms_year_s, _ = MpointStat.objects.get_or_create(**params_year_s, defaults=params_year_s)
ms_year_s.val = sum_dict_year_s['sum'] ms_year_s.val = sum_dict_year_s['sum']
ms_year_s.save() ms_year_s.save()
@shared_task(base=CustomTask) @shared_task(base=CustomTask)
def cal_mpointstats(is_now=1): def cal_mpointstats(is_now=1):
@ -141,11 +140,33 @@ def cal_mpointstats(is_now=1):
""" """
now, pre = get_current_and_previous_time() now, pre = get_current_and_previous_time()
if is_now: if is_now:
for mpoint in Mpoint.objects.all(): for mpoint in Mpoint.objects.filter(is_auto=True):
cal_mpointstat_hour.delay(mpoint.id, now.year, now.moth, now.day, now.hour) cal_mpointstat_hour.delay(mpoint.id, now.year, now.moth, now.day, now.hour)
else: else:
for mpoint in Mpoint.objects.all(): for mpoint in Mpoint.objects.filter(is_auto=True):
cal_mpointstat_hour.delay(mpoint.id, pre.year, pre.month, pre.day, pre.hour) cal_mpointstat_hour.delay(mpoint.id, pre.year, pre.month, pre.day, pre.hour)
def cal_mpointstat_manual(mpointId: str, year_s: int, month_s: int, day_s: int|None):
"""
手动录入的测点数据进行往上统计一级一级往上
"""
mpoint = Mpoint.objects.get(id=mpointId)
if day_s is not None:
sum_dict_day_s = MpointStat.objects.filter(type='sflog', mpoint=mpoint, year_s=year_s, month_s=month_s, day_s=day_s).aggregate(sum=Sum('val'))
params_day_s = {'type':'day_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s, 'day_s': day_s}
ms_day_s, _ = MpointStat.objects.get_or_create(**params_day_s, defaults=params_day_s)
ms_day_s.val = sum_dict_day_s['sum']
ms_day_s.save()
sum_dict_month_s = MpointStat.objects.filter(type='day_s', mpoint=mpoint, year_s=year_s, month_s=month_s).aggregate(sum=Sum('val'))
params_month_s = {'type':'month_s', 'mpoint': mpoint, 'year_s': year_s, 'month_s': month_s}
ms_month_s, _ = MpointStat.objects.get_or_create(**params_month_s, defaults=params_month_s)
ms_month_s.val = sum_dict_month_s['sum']
ms_month_s.save()
sum_dict_year_s = MpointStat.objects.filter(type='month_s', mpoint=mpoint, year_s=year_s).aggregate(sum=Sum('val'))
params_year_s = {'type':'year_s', 'mpoint': mpoint, 'year_s': year_s}
ms_year_s, _ = MpointStat.objects.get_or_create(**params_year_s, defaults=params_year_s)
ms_year_s.val = sum_dict_year_s['sum']
ms_year_s.save()

View File

@ -4,6 +4,8 @@ from apps.utils.viewsets import CustomModelViewSet, CustomGenericViewSet
from rest_framework.mixins import ListModelMixin from rest_framework.mixins import ListModelMixin
from apps.utils.mixins import BulkCreateModelMixin, BulkDestroyModelMixin from apps.utils.mixins import BulkCreateModelMixin, BulkDestroyModelMixin
from apps.enm.serializers import (MpointSerializer, MpLogSerializer, MpointStatSerializer) from apps.enm.serializers import (MpointSerializer, MpLogSerializer, MpointStatSerializer)
from apps.enm.filters import MpointStatFilter
from apps.enm.tasks import cal_mpointstat_manual
class MpointViewSet(CustomModelViewSet): class MpointViewSet(CustomModelViewSet):
@ -42,4 +44,13 @@ class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, ListModelMi
queryset = MpointStat.objects.all() queryset = MpointStat.objects.all()
serializer_class = MpointStatSerializer serializer_class = MpointStatSerializer
select_related_fields = ['mpoint'] select_related_fields = ['mpoint']
filterset_fields = ['mpoint', 'mpoint__mgroup', 'mpoint__mgroup__belong_dept', 'sflog', 'hour', 'day', 'month', 'year', 'day_s', 'month_s', 'year_s'] filterset_class = MpointStatFilter
def perform_create(self, serializer):
ins = serializer.save()
cal_mpointstat_manual(ins.mpoint.id, ins.year_s, ins.month_s, ins.day_s)
def perform_destroy(self, instance):
mpoint, year_s, month_s, day_s = instance.mpoint, instance.year_s, instance.month_s, instance.day_s
instance.delete()
cal_mpointstat_manual(mpoint.id, year_s, month_s, day_s)