diff --git a/apps/ecm/models.py b/apps/ecm/models.py index c5abca5f..06a21e56 100644 --- a/apps/ecm/models.py +++ b/apps/ecm/models.py @@ -38,8 +38,8 @@ class EventCate(CommonAModel): class AlgoChannel(BaseModel): - algo = models.ForeignKey(EventCate, verbose_name='关联算法', on_delete=models.CASCADE) - vchannel = models.ForeignKey(TDevice, verbose_name='视频通道', on_delete=models.CASCADE) + algo = models.ForeignKey(EventCate, verbose_name='关联算法', on_delete=models.CASCADE, related_name='ac_algo') + vchannel = models.ForeignKey(TDevice, verbose_name='视频通道', on_delete=models.CASCADE, related_name='ac_vchannel') always_on = models.BooleanField("实时开启", default=False) class Meta: diff --git a/apps/ecm/tasks.py b/apps/ecm/tasks.py index aee6c02f..cbd324f7 100644 --- a/apps/ecm/tasks.py +++ b/apps/ecm/tasks.py @@ -21,6 +21,7 @@ from django.conf import settings from apps.utils.tasks import CustomTask from apps.ecm.models import AlgoChannel from datetime import timedelta +from apps.ai.main import algo_dict @shared_task(base=CustomTask) @@ -121,20 +122,32 @@ def opl_task(vc_codes: list, opl_id: str): @shared_task(base=CustomTask) def monitor_check(): - """监控 监控任务的执行 + """监控任务监控/每晚执行一次 """ - acs = AlgoChannel.objects.filter(always_on=True) - for i in acs: - td = i.vchannel - if td.mtask_uid: - pass - else: - pass + from apps.third.models import TDevice + from celery.app.control import Control + from server.celery import app + celery_control = Control(app=app) + vcs = TDevice.objects.filter(ac_vchannel__always_on=True) + for vc in vcs: # 获取每一个视频通道 + algo_codes = AlgoChannel.objects.filter(vchannel=vc, always_on=True).values_list('algo__code', flat=True) + cache.set(vc.code, {'algo_codes': algo_codes}) + if vc.mtask_uid: # 如果存在进行中的任务 + celery_control.revoke(vc.mtask_uid, terminate=True) # 先关闭,再开启 + task = loop_and_analyse.delay(vc.code) + vc.mtask_uid = task.id + vc.save() @shared_task(base=CustomTask) def loop_and_analyse(vchannel_code: str): - td = TDevice.objects.filter(code=vchannel_code).first() - pass + vc = TDevice.objects.filter(code=vchannel_code).first() + while True: + vc_dict = cache.get(vchannel_code,{}) + if 'algo_codes' in vc_dict: + intersection = list(set(vc_dict['algo_codes']) & set(algo_dict.keys())) # 取布设与算法的交集 + if intersection: + Thread(target=snap_and_analyse, args=(vc, intersection), daemon=True).start() + time.sleep(10) @shared_task(base=CustomTask) def monitor_and_analyse(vchannel_code: str, algo_codes: list):