feat: enm 修改重跑能源计算 不用从mplogx 开始计算

This commit is contained in:
TianyangZhang 2026-04-24 15:00:47 +08:00
parent b6b79da3b1
commit 7577a46900
3 changed files with 176 additions and 8 deletions

View File

@ -206,6 +206,7 @@ class EnStat2Serializer(CustomModelSerializer):
class ReCalSerializer(serializers.Serializer):
    """Validate the payload of the enm recalculation endpoint.

    start_time / end_time bound the span to re-run. mpoint_stat=True requests
    the fast path that re-aggregates from existing hourly MpointStat rows
    instead of recomputing everything from MpLogx.
    """
    start_time = serializers.DateTimeField(label="开始时间")
    end_time = serializers.DateTimeField(label="结束时间")
    # Optional flag; omitted/False keeps the original full recalculation path.
    mpoint_stat = serializers.BooleanField(label="从MpointStat开始计算", required=False, default=False)
class MpointStatCorrectSerializer(CustomModelSerializer):

View File

@ -114,22 +114,185 @@ def db_ins_mplogx():
@shared_task(base=CustomTask)
def cal_mpointstats_duration(start_time: str, end_time: str, m_code_list=None, cal_attrs=None, mpoint_stat=False):
    """Re-run energy statistics for a time span, hour by hour.

    Args:
        start_time: inclusive span start, "%Y-%m-%d %H:%M:%S" (project-local time).
        end_time: inclusive span end, same format.
        m_code_list: optional mpoint codes to restrict the recalculation.
        cal_attrs: optional attribute names to recalculate.
        mpoint_stat: when True, skip the raw MpLogx scan and rebuild only the
            day/month/year/sflog aggregates from existing hourly MpointStat
            rows (much faster).
    """
    # Normalise to fresh lists — avoids the shared-mutable-default pitfall.
    m_code_list = list(m_code_list) if m_code_list else []
    cal_attrs = list(cal_attrs) if cal_attrs else []
    mytz = tz.gettz(settings.TIME_ZONE)
    # Parse the naive timestamps and attach the project timezone.
    # (datetime.replace returns a new object; its result must be assigned.)
    start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S").replace(tzinfo=mytz)
    end_time = datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S").replace(tzinfo=mytz)
    if mpoint_stat:
        # Fast path: aggregate from already-computed hourly stats.
        _recal_from_mpointstat(start_time, end_time, m_code_list, cal_attrs)
    else:
        # Full path: recompute each hour in the span from raw logs.
        current_time = start_time
        while current_time <= end_time:
            year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour
            cal_mpointstats(0, year, month, day, hour, m_code_list, cal_attrs)
            myLogger.info("now: {} cal_mpointstats completed: {}".format(datetime.datetime.now(), current_time))
            current_time += datetime.timedelta(hours=1)
def _rollup_stat(mpoints, mpoint_ids, src_type, src_filter, dst_params_base):
    """Sum `src_type` MpointStat rows per mpoint and upsert the aggregate row described by `dst_params_base`."""
    # One query per period: mpoint_id -> sum(val) over the source rows.
    sums = dict(
        MpointStat.objects.filter(
            type=src_type, mpoint_id__in=mpoint_ids, **src_filter
        ).values('mpoint_id').annotate(total=Sum('val')).values_list('mpoint_id', 'total')
    )
    for mp in mpoints:
        val = sums.get(mp.id)
        if val is None:
            continue  # no source rows for this mpoint in this period
        params = dict(dst_params_base, mpoint=mp)
        ms, _ = MpointStat.safe_get_or_create(**params, defaults=params)
        # A manual correction (val_correct) always wins — never overwrite it.
        if ms.val_correct is None:
            ms.val = val
            ms.val_origin = val
            ms.save()


def _recal_from_mpointstat(start_time, end_time, m_code_list=None, cal_attrs=None):
    """Rebuild day/month/year and shift (sflog) aggregates from hourly MpointStat rows.

    Skips reading MpLogx entirely, so it is far faster than a full recalculation.

    Args:
        start_time: timezone-aware inclusive span start.
        end_time: timezone-aware inclusive span end.
        m_code_list: optional mpoint codes to restrict the recalculation.
        cal_attrs: optional attribute names, forwarded to cal_mpointstats/cal_enstat.
    """
    # Normalise to fresh lists — avoids the shared-mutable-default pitfall.
    m_code_list = list(m_code_list) if m_code_list else []
    cal_attrs = list(cal_attrs) if cal_attrs else []
    # Determine which mpoints need recalculation.
    if m_code_list:
        mpoints = list(Mpoint.objects.filter(code__in=m_code_list, enabled=True))
        # Also include compute-type mpoints whose formula references any requested code.
        related = Mpoint.objects.none()
        for code in m_code_list:
            related = related | Mpoint.objects.filter(
                type=Mpoint.MT_COMPUTE, enabled=True, material__isnull=False,
                formula__contains='{' + code + '}'
            )
        mpoints.extend(list(related.distinct()))
        mpoints = list({mp.id: mp for mp in mpoints}.values())  # de-duplicate by id
    else:
        mpoints = list(Mpoint.objects.filter(enabled=True, material__isnull=False))
    mpoint_ids = [mp.id for mp in mpoints]

    # Collect every (year, month, day), (year, month) and year touched by the span.
    days_set = set()
    months_set = set()
    years_set = set()
    current_time = start_time
    while current_time <= end_time:
        days_set.add((current_time.year, current_time.month, current_time.day))
        months_set.add((current_time.year, current_time.month))
        years_set.add(current_time.year)
        current_time += datetime.timedelta(hours=1)
    myLogger.info(f"_recal_from_mpointstat: {len(mpoints)} mpoints, {len(days_set)} days")

    # day = sum(hour), month = sum(day), year = sum(month) — same rollup, three grains.
    for year, month, day in days_set:
        _rollup_stat(mpoints, mpoint_ids, "hour",
                     {"year": year, "month": month, "day": day},
                     {"type": "day", "year": year, "month": month, "day": day})
    for year, month in months_set:
        _rollup_stat(mpoints, mpoint_ids, "day",
                     {"year": year, "month": month},
                     {"type": "month", "year": year, "month": month})
    for year in years_set:
        _rollup_stat(mpoints, mpoint_ids, "month",
                     {"year": year},
                     {"type": "year", "year": year})

    # Rebuild shift-based stats: hour_s rows, then their per-shift (sflog) aggregates.
    mytz = tz.gettz(settings.TIME_ZONE)
    mgroups = Mgroup.objects.filter(need_enm=True).order_by("sort")
    current_time = start_time
    # Two separate caches (the original overloaded one dict with two key shapes):
    sflog_by_hour = {}       # (mgroup_id, year, month, day, hour) -> sflog or None
    done_sflog_keys = set()  # (mpoint_id, sflog_id, year_s, month_s, day_s) already aggregated
    while current_time <= end_time:
        year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour
        dt = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=0, second=0, tzinfo=mytz)
        for mgroup in mgroups:
            cache_key = (mgroup.id, year, month, day, hour)
            if cache_key not in sflog_by_hour:
                sflog_by_hour[cache_key] = get_sflog(mgroup, dt)
            sflog = sflog_by_hour[cache_key]
            if sflog is None:
                continue  # no shift covers this hour for this group
            year_s, month_s, day_s = sflog.get_ymd
            # Mpoints belonging to this group.
            group_mpoints = [mp for mp in mpoints if mp.mgroup_id == mgroup.id]
            for mp in group_mpoints:
                # Mirror the plain hourly value into the shift-keyed hour_s row.
                ms_hour = MpointStat.objects.filter(
                    type="hour", mpoint=mp, year=year, month=month, day=day, hour=hour
                ).first()
                if ms_hour is None:
                    continue
                params_hour_s = {
                    "type": "hour_s", "mpoint": mp, "sflog": sflog, "mgroup": mgroup,
                    "year": year, "month": month, "day": day,
                    "year_s": year_s, "month_s": month_s, "day_s": day_s, "hour": hour,
                }
                ms_hour_s, _ = MpointStat.safe_get_or_create(**params_hour_s, defaults=params_hour_s)
                # Manual correction wins; otherwise copy the hourly value.
                ms_hour_s.val = ms_hour_s.val_correct if ms_hour_s.val_correct is not None else ms_hour.val
                ms_hour_s.save()
            for mp in group_mpoints:
                sflog_key = (mp.id, sflog.id, year_s, month_s, day_s)
                if sflog_key in done_sflog_keys:
                    continue  # this shift was already aggregated for this mpoint
                done_sflog_keys.add(sflog_key)
                params_sflog_s = {
                    "type": "sflog", "mpoint": mp, "sflog": sflog,
                    "year_s": year_s, "month_s": month_s, "day_s": day_s, "mgroup": mgroup,
                }
                ms_sflog_s, _ = MpointStat.safe_get_or_create(**params_sflog_s, defaults=params_sflog_s)
                if ms_sflog_s.val_correct is None:
                    sum_val = MpointStat.objects.filter(
                        type="hour_s", mpoint=mp, year_s=year_s, month_s=month_s, day_s=day_s, sflog=sflog
                    ).aggregate(sum=Sum("val"))
                    ms_sflog_s.val = sum_val['sum'] if sum_val['sum'] is not None else 0
                    ms_sflog_s.val_origin = ms_sflog_s.val
                    ms_sflog_s.save()
        myLogger.info("now: {} _recal_from_mpointstat completed: {}".format(datetime.datetime.now(), current_time))
        current_time += datetime.timedelta(hours=1)

    # Recompute EnStat rows over the span.
    # NOTE(review): this re-invokes cal_mpointstats per hour — confirm it does not
    # rescan MpLogx here, which would defeat the fast path this function provides.
    current_time = start_time
    while current_time <= end_time:
        year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour
        cal_mpointstats(0, year, month, day, hour, m_code_list, cal_attrs)
        myLogger.info("now: {} cal_mpointstats completed: {}".format(datetime.datetime.now(), current_time))
        for mgroup in mgroups:
            cal_enstat("hour_s", None, mgroup.id, year, month, day, hour, None, None, None, True, cal_attrs)
        current_time += datetime.timedelta(hours=1)
    myLogger.info("_recal_from_mpointstat completed all")
@shared_task(base=CustomTask)
def correct_bill_date():

View File

@ -378,10 +378,14 @@ class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, CustomListM
重新运行某段时间的enm计算
"""
data = request.data
sr = ReCalSerializer(data=data)
sr = ReCalSerializer(data=request.data)
sr.is_valid(raise_exception=True)
task = cal_mpointstats_duration.delay(data["start_time"], data["end_time"])
data = sr.validated_data
task = cal_mpointstats_duration.delay(
data["start_time"].strftime("%Y-%m-%d %H:%M:%S"),
data["end_time"].strftime("%Y-%m-%d %H:%M:%S"),
mpoint_stat=data.get("mpoint_stat", False)
)
return Response({"task_id": task.task_id})
@action(methods=["get"], detail=False, perms_map={"get": "*"})