"""Celery tasks that create and expire CheckWork records."""
from datetime import timedelta

from celery import shared_task
from django.utils import timezone

from apps.dpm.models import CheckTaskSet, CheckWork
from apps.utils.tasks import CustomTask


@shared_task(base=CustomTask)
def dispath_checkwork_task(checktaskset: str):
    """Create and save a CheckWork derived from a CheckTaskSet.

    The new CheckWork copies ``user_duty``, ``riskpoint`` and ``note`` from
    the task set, starts at the current time and, when the task set defines
    a truthy ``expire`` (used here as a number of hours), ends ``expire``
    hours later. Otherwise ``time_end`` is left at the model default.

    Args:
        checktaskset: Primary key of the CheckTaskSet to dispatch from.

    Raises:
        CheckTaskSet.DoesNotExist: If no CheckTaskSet has the given id.
    """
    cts = CheckTaskSet.objects.get(id=checktaskset)
    now = timezone.now()

    cw = CheckWork()
    cw.type = 20  # NOTE(review): meaning of type code 20 is defined on the model - confirm
    cw.checktaskset = cts
    cw.name = '风险点检查(自动派发)'
    cw.time_start = now
    if cts.expire:
        cw.time_end = now + timedelta(hours=cts.expire)
    cw.user_duty = cts.user_duty
    cw.riskpoint = cts.riskpoint
    cw.note = cts.note
    cw.save()
    # TODO: send a notification once the CheckWork has been created.


@shared_task(base=CustomTask)
def expire_checkwork():
    """Disable every CheckWork whose ``time_end`` is at or before now.

    Rows without a ``time_end`` (NULL) never match the lookup and are
    therefore left untouched.
    """
    now = timezone.now()
    # The deadline written by dispath_checkwork_task is `time_end`, and a
    # record is expired when that deadline is in the past (<= now). The
    # previous lookup (`expire__gte=now`) used a field this module never
    # sets on CheckWork and selected deadlines that are still in the future.
    # NOTE(review): confirm `time_end` / `enabled` against the CheckWork model.
    CheckWork.objects.filter(time_end__lte=now).update(enabled=False)