diff --git a/apps/enm/serializers.py b/apps/enm/serializers.py index 96eb9caf..1525f019 100644 --- a/apps/enm/serializers.py +++ b/apps/enm/serializers.py @@ -206,6 +206,7 @@ class EnStat2Serializer(CustomModelSerializer): class ReCalSerializer(serializers.Serializer): start_time = serializers.DateTimeField(label="开始时间") end_time = serializers.DateTimeField(label="结束时间") + mpoint_stat = serializers.BooleanField(label="从MpointStat开始计算", required=False, default=False) class MpointStatCorrectSerializer(CustomModelSerializer): diff --git a/apps/enm/tasks.py b/apps/enm/tasks.py index 6edfbd93..ff6dca1c 100644 --- a/apps/enm/tasks.py +++ b/apps/enm/tasks.py @@ -114,22 +114,185 @@ def db_ins_mplogx(): @shared_task(base=CustomTask) -def cal_mpointstats_duration(start_time: str, end_time: str, m_code_list=[], cal_attrs=[]): +def cal_mpointstats_duration(start_time: str, end_time: str, m_code_list=[], cal_attrs=[], mpoint_stat=False): """ 重跑某一段时间的任务 + mpoint_stat: True时从已有的MpointStat hour记录开始重算(跳过MpLogx),只重算day/month/year/sflog聚合,速度更快 """ mytz = tz.gettz(settings.TIME_ZONE) start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") - start_time.replace(tzinfo=mytz) + start_time = start_time.replace(tzinfo=mytz) end_time = datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S") - start_time.replace(tzinfo=mytz) + end_time = end_time.replace(tzinfo=mytz) + + if mpoint_stat: + _recal_from_mpointstat(start_time, end_time, m_code_list, cal_attrs) + else: + current_time = start_time + while current_time <= end_time: + year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour + cal_mpointstats(0, year, month, day, hour, m_code_list, cal_attrs) + myLogger.info("now: {} cal_mpointstats completed: {}".format(datetime.datetime.now(), current_time)) + current_time += datetime.timedelta(hours=1) + + +def _recal_from_mpointstat(start_time, end_time, m_code_list=[], cal_attrs=[]): + """ + 从已有的MpointStat hour记录开始,批量重算day/month/year/sflog聚合 + 不读取MpLogx,速度远快于完整重算 + """ + # 确定需要重算的测点 + if m_code_list: + mpoints = list(Mpoint.objects.filter(code__in=m_code_list, enabled=True)) + # 也要包含依赖这些测点的计算测点 + related = Mpoint.objects.none() + for code in m_code_list: + related = related | Mpoint.objects.filter( + type=Mpoint.MT_COMPUTE, enabled=True, material__isnull=False, + formula__contains='{' + code + '}' + ) + mpoints.extend(list(related.distinct())) + mpoints = list({mp.id: mp for mp in mpoints}.values()) # 去重 + else: + mpoints = list(Mpoint.objects.filter(enabled=True, material__isnull=False)) + + mpoint_ids = [mp.id for mp in mpoints] + + # 收集需要重算的所有 (year, month, day) 和 (year, month) + days_set = set() + months_set = set() + years_set = set() + current_time = start_time + while current_time <= end_time: + days_set.add((current_time.year, current_time.month, current_time.day)) + months_set.add((current_time.year, current_time.month)) + years_set.add(current_time.year) + current_time += datetime.timedelta(hours=1) + + myLogger.info(f"_recal_from_mpointstat: {len(mpoints)} mpoints, {len(days_set)} days") + + # 批量重算 day = sum(hour) + for year, month, day in days_set: + # 一次查询拿到所有测点该天的hour汇总 + hour_sums = dict( + MpointStat.objects.filter( + type="hour", mpoint_id__in=mpoint_ids, year=year, month=month, day=day + ).values('mpoint_id').annotate(total=Sum('val')).values_list('mpoint_id', 'total') + ) + for mp in mpoints: + val = hour_sums.get(mp.id) + if val is None: + continue + params_day = {"type": "day", "mpoint": mp, "year": year, "month": month, "day": day} + ms_day, _ = MpointStat.safe_get_or_create(**params_day, defaults=params_day) + if ms_day.val_correct is None: + ms_day.val = val + ms_day.val_origin = val + ms_day.save() + + # 批量重算 month = sum(day) + for year, month in months_set: + day_sums = dict( + MpointStat.objects.filter( + type="day", mpoint_id__in=mpoint_ids, year=year, month=month + ).values('mpoint_id').annotate(total=Sum('val')).values_list('mpoint_id', 'total') + ) + for mp in mpoints: + val = day_sums.get(mp.id) + if val is None: + continue + params_month = {"type": "month", "mpoint": mp, "year": year, "month": month} + ms_month, _ = MpointStat.safe_get_or_create(**params_month, defaults=params_month) + if ms_month.val_correct is None: + ms_month.val = val + ms_month.val_origin = val + ms_month.save() + + # 批量重算 year = sum(month) + for year in years_set: + month_sums = dict( + MpointStat.objects.filter( + type="month", mpoint_id__in=mpoint_ids, year=year + ).values('mpoint_id').annotate(total=Sum('val')).values_list('mpoint_id', 'total') + ) + for mp in mpoints: + val = month_sums.get(mp.id) + if val is None: + continue + params_year = {"type": "year", "mpoint": mp, "year": year} + ms_year, _ = MpointStat.safe_get_or_create(**params_year, defaults=params_year) + if ms_year.val_correct is None: + ms_year.val = val + ms_year.val_origin = val + ms_year.save() + + # 重算 sflog 相关统计 (hour_s -> sflog) + mytz = tz.gettz(settings.TIME_ZONE) + mgroups = Mgroup.objects.filter(need_enm=True).order_by("sort") + current_time = start_time + sflog_cache = {} + while current_time <= end_time: + year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour + dt = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=0, second=0, tzinfo=mytz) + for mgroup in mgroups: + cache_key = (mgroup.id, year, month, day, hour) + if cache_key not in sflog_cache: + sflog = get_sflog(mgroup, dt) + sflog_cache[cache_key] = sflog + sflog = sflog_cache[cache_key] + if sflog is None: + continue + year_s, month_s, day_s = sflog.get_ymd + # 获取该mgroup下的测点 + group_mpoints = [mp for mp in mpoints if mp.mgroup_id == mgroup.id] + for mp in group_mpoints: + # 找到对应的hour stat + ms_hour = MpointStat.objects.filter( + type="hour", mpoint=mp, year=year, month=month, day=day, hour=hour + ).first() + if ms_hour is None: + continue + params_hour_s = { + "type": "hour_s", "mpoint": mp, "sflog": sflog, "mgroup": mgroup, + "year": year, "month": month, "day": day, + "year_s": year_s, "month_s": month_s, "day_s": day_s, "hour": hour, + } + ms_hour_s, _ = MpointStat.safe_get_or_create(**params_hour_s, defaults=params_hour_s) + ms_hour_s.val = ms_hour_s.val_correct if ms_hour_s.val_correct is not None else ms_hour.val + ms_hour_s.save() + + # 重算 sflog 聚合 + for mp in group_mpoints: + sflog_key = (mp.id, sflog.id, year_s, month_s, day_s) + if sflog_key in sflog_cache: + continue # 同一sflog只算一次 + sflog_cache[sflog_key] = True + params_sflog_s = { + "type": "sflog", "mpoint": mp, "sflog": sflog, + "year_s": year_s, "month_s": month_s, "day_s": day_s, "mgroup": mgroup, + } + ms_sflog_s, _ = MpointStat.safe_get_or_create(**params_sflog_s, defaults=params_sflog_s) + if ms_sflog_s.val_correct is None: + sum_val = MpointStat.objects.filter( + type="hour_s", mpoint=mp, year_s=year_s, month_s=month_s, day_s=day_s, sflog=sflog + ).aggregate(sum=Sum("val")) + ms_sflog_s.val = sum_val['sum'] if sum_val['sum'] is not None else 0 + ms_sflog_s.val_origin = ms_sflog_s.val + ms_sflog_s.save() + + myLogger.info("now: {} _recal_from_mpointstat completed: {}".format(datetime.datetime.now(), current_time)) + current_time += datetime.timedelta(hours=1) + + # 重算 enstat current_time = start_time while current_time <= end_time: year, month, day, hour = current_time.year, current_time.month, current_time.day, current_time.hour - cal_mpointstats(0, year, month, day, hour, m_code_list, cal_attrs) - myLogger.info("now: {} cal_mpointstats completed: {}".format(datetime.datetime.now(), current_time)) + for mgroup in mgroups: + cal_enstat("hour_s", None, mgroup.id, year, month, day, hour, None, None, None, True, cal_attrs) current_time += datetime.timedelta(hours=1) + myLogger.info("_recal_from_mpointstat completed all") + @shared_task(base=CustomTask) def correct_bill_date(): diff --git a/apps/enm/views.py b/apps/enm/views.py index 49d4c5a2..11a74a1b 100644 --- a/apps/enm/views.py +++ b/apps/enm/views.py @@ -378,10 +378,14 @@ class MpointStatViewSet(BulkCreateModelMixin, BulkDestroyModelMixin, CustomListM 重新运行某段时间的enm计算 """ - data = request.data - sr = ReCalSerializer(data=data) + sr = ReCalSerializer(data=request.data) sr.is_valid(raise_exception=True) - task = cal_mpointstats_duration.delay(data["start_time"], data["end_time"]) + data = sr.validated_data + task = cal_mpointstats_duration.delay( + data["start_time"].strftime("%Y-%m-%d %H:%M:%S"), + data["end_time"].strftime("%Y-%m-%d %H:%M:%S"), + mpoint_stat=data.get("mpoint_stat", False) + ) return Response({"task_id": task.task_id}) @action(methods=["get"], detail=False, perms_map={"get": "*"})