from __future__ import absolute_import, unicode_literals from apps.utils.tasks import CustomTask from celery import shared_task from apps.dpm.models import CheckTaskSet, CheckWork from django.utils import timezone from datetime import timedelta from django.db import transaction @shared_task(base=CustomTask) def dispath_checkwork_task(checktaskset: str): cts = CheckTaskSet.objects.get(id=checktaskset) with transaction.atomic(): CheckWork.objects.filter(checktaskset=cts).update(usable=False) cw = CheckWork() cw.type = 20 cw.checktaskset = cts now = timezone.now() cw.time_start = now local_time = timezone.localtime(now) cw.name = "风险点排查_" + local_time.strftime('%Y%m%d%H%M%S') if cts.expire: cw.time_end = now + timedelta(hours=cts.expire) cw.user_duty = cts.user_duty cw.riskpoint = cts.riskpoint cw.note = cts.note cw.save() # 发送通知 pass @shared_task(base=CustomTask) def expire_checkwork(): now = timezone.now() CheckWork.objects.filter(expire__gte=now).update(enabled=False)