feat: 算法布设可开启实时监控任务(循环抓拍)

This commit is contained in:
caoqianming 2023-03-21 18:17:55 +08:00
parent 74b8681a8a
commit e4aad35f06
2 changed files with 25 additions and 12 deletions

View File

@ -38,8 +38,8 @@ class EventCate(CommonAModel):
class AlgoChannel(BaseModel): class AlgoChannel(BaseModel):
algo = models.ForeignKey(EventCate, verbose_name='关联算法', on_delete=models.CASCADE) algo = models.ForeignKey(EventCate, verbose_name='关联算法', on_delete=models.CASCADE, related_name='ac_algo')
vchannel = models.ForeignKey(TDevice, verbose_name='视频通道', on_delete=models.CASCADE) vchannel = models.ForeignKey(TDevice, verbose_name='视频通道', on_delete=models.CASCADE, related_name='ac_vchannel')
always_on = models.BooleanField("实时开启", default=False) always_on = models.BooleanField("实时开启", default=False)
class Meta: class Meta:

View File

@ -21,6 +21,7 @@ from django.conf import settings
from apps.utils.tasks import CustomTask from apps.utils.tasks import CustomTask
from apps.ecm.models import AlgoChannel from apps.ecm.models import AlgoChannel
from datetime import timedelta from datetime import timedelta
from apps.ai.main import algo_dict
@shared_task(base=CustomTask) @shared_task(base=CustomTask)
@ -121,20 +122,32 @@ def opl_task(vc_codes: list, opl_id: str):
@shared_task(base=CustomTask) @shared_task(base=CustomTask)
def monitor_check(): def monitor_check():
"""监控 监控任务的执行 """监控任务监控/每晚执行一次
""" """
acs = AlgoChannel.objects.filter(always_on=True) from apps.third.models import TDevice
for i in acs: from celery.app.control import Control
td = i.vchannel from server.celery import app
if td.mtask_uid: celery_control = Control(app=app)
pass vcs = TDevice.objects.filter(ac_vchannel__always_on=True)
else: for vc in vcs: # 获取每一个视频通道
pass algo_codes = AlgoChannel.objects.filter(vchannel=vc, always_on=True).values_list('algo__code', flat=True)
cache.set(vc.code, {'algo_codes': algo_codes})
if vc.mtask_uid: # 如果存在进行中的任务
celery_control.revoke(vc.mtask_uid, terminate=True) # 先关闭,再开启
task = loop_and_analyse.delay(vc.code)
vc.mtask_uid = task.id
vc.save()
@shared_task(base=CustomTask) @shared_task(base=CustomTask)
def loop_and_analyse(vchannel_code: str): def loop_and_analyse(vchannel_code: str):
td = TDevice.objects.filter(code=vchannel_code).first() vc = TDevice.objects.filter(code=vchannel_code).first()
pass while True:
vc_dict = cache.get(vchannel_code,{})
if 'algo_codes' in vc_dict:
intersection = list(set(vc_dict['algo_codes']) & set(algo_dict.keys())) # 取布设与算法的交集
if intersection:
Thread(target=snap_and_analyse, args=(vc, intersection), daemon=True).start()
time.sleep(10)
@shared_task(base=CustomTask) @shared_task(base=CustomTask)
def monitor_and_analyse(vchannel_code: str, algo_codes: list): def monitor_and_analyse(vchannel_code: str, algo_codes: list):