"""注册抓取链常驻保活的 beat 周期任务。 apps.resm.tasks.ensure_fetch_running 每 60 秒检查各自触发抓取链的 alive 心跳, 链已死(重启/崩溃/空闲过期)且未被手动停止时重新点火, 保证重启后自愈。 update_or_create / get_or_create 保证迁移可安全重跑。 """ from django.db import migrations ENSURE_TASK = "apps.resm.tasks.ensure_fetch_running" ENSURE_NAME = "resm: 抓取链常驻保活(beat 点火)" def seed(apps, schema_editor): IntervalSchedule = apps.get_model("django_celery_beat", "IntervalSchedule") PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask") sched, _ = IntervalSchedule.objects.get_or_create(every=60, period="seconds") PeriodicTask.objects.update_or_create( name=ENSURE_NAME, defaults={ "task": ENSURE_TASK, "interval": sched, "crontab": None, "enabled": True, "description": "每 60 秒检查抓取自触发链 alive 心跳, 链已死且未手动停止则重新点火, 保证重启自愈", }, ) def unseed(apps, schema_editor): PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask") PeriodicTask.objects.filter(name=ENSURE_NAME).delete() class Migration(migrations.Migration): dependencies = [ ("resm", "0009_seed_monitors_and_schedule"), ("django_celery_beat", "__latest__"), ] operations = [ migrations.RunPython(seed, unseed), ]