diff --git a/apps/ecm/tasks.py b/apps/ecm/tasks.py index 9d2cd5a4..6458f58b 100644 --- a/apps/ecm/tasks.py +++ b/apps/ecm/tasks.py @@ -22,7 +22,8 @@ from apps.utils.tasks import CustomTask from apps.ecm.models import AlgoChannel from datetime import timedelta from apps.ai.main import algo_dict - +from datetime import datetime +import uuid @shared_task(base=CustomTask) def store_img(code: str, duration: int): @@ -122,28 +123,30 @@ def opl_task(vc_codes: list, opl_id: str): @shared_task(base=CustomTask) -def monitor_check(): - """监控任务监控/每晚执行一次 +def monitor_check(vc_ids = []): """ - from apps.third.models import TDevice - from celery.app.control import Control - from server.celery import app - celery_control = Control(app=app) - vcs = TDevice.objects.filter(ac_vchannel__always_on=True) - for vc in vcs: # 获取每一个视频通道 + + 监控任务监控重启/可每隔一段时间执行一次 + """ + if vc_ids: + vcs = TDevice.objects.filter(id__in=vc_ids, type=TDevice.DEVICE_VCHANNEL) + else: + vcs = TDevice.objects.filter(ac_vchannel__always_on=True, type=TDevice.DEVICE_VCHANNEL) + for vc in vcs: # 获取每一个要开启的视频通道 algo_codes = AlgoChannel.objects.filter(vchannel=vc, always_on=True).values_list('algo__code', flat=True) - cache.set(vc.code, {'algo_codes': algo_codes}) - if vc.mtask_uid: # 如果存在进行中的任务 - celery_control.revoke(vc.mtask_uid, terminate=True) # 先关闭,再开启 - task = loop_and_analyse.delay(vc.code) - vc.mtask_uid = task.id - vc.save() + ckey = 'vchannel_' + vc.id + if algo_codes: + tid = str(uuid.uuid4()) + cache.set(ckey, {'id': tid, 'algo_codes': algo_codes, 'start_time': datetime.strftime("%Y-%m-%d %H:%M:%S")}) + Thread(target=loop_and_analyse, args=(vc, tid), daemon=True).start() + else: + cache.delete(ckey) @shared_task(base=CustomTask) -def loop_and_analyse(vchannel_code: str): - vc = TDevice.objects.filter(code=vchannel_code).first() - while True: - vc_dict = cache.get(vchannel_code,{}) +def loop_and_analyse(vc: TDevice, tid: str): + ckey = 'vchannel_' + vc.id + while cache.get(ckey, {}).get('id', None) == tid: + vc_dict = cache.get(ckey, {}) if 'algo_codes' in vc_dict: intersection = list(set(vc_dict['algo_codes']) & set(algo_dict.keys())) # 取布设与算法的交集 if intersection: diff --git a/apps/opm/models.py b/apps/opm/models.py index 1662e3d6..2ca62208 100644 --- a/apps/opm/models.py +++ b/apps/opm/models.py @@ -117,7 +117,7 @@ class Opl(CommonBDModel): on_delete=models.SET_NULL, related_name='opl_ticket', null=True, blank=True) - mtask_uid = models.CharField('监控任务ID', max_length=100, null=True, blank=True) + mtask_uid = models.CharField('监控任务ID', max_length=100, null=True, blank=True) # 废弃字段 class OplWorker(BaseModel): diff --git a/apps/third/models.py b/apps/third/models.py index 6b339138..f19fe7f3 100755 --- a/apps/third/models.py +++ b/apps/third/models.py @@ -45,7 +45,7 @@ class TDevice(BaseModel): is_clock = models.BooleanField('是否打卡设备', default=False) access_list = models.JSONField('自动下发人员类型', default=list, null=False, blank=True, help_text='employee/remployee/visitor/driver') - mtask_uid = models.CharField('监控任务ID', max_length=100, null=True, blank=True) + mtask_uid = models.CharField('监控任务ID', max_length=100, null=True, blank=True) # 废弃字段 # algos = models.ManyToManyField('ecm.eventcate', through='ecm.algochannel', blank=True) third_info = models.JSONField('三方信息', default=dict, null=False, blank=True) diff --git a/apps/third/serializers.py b/apps/third/serializers.py index 34a2181f..5b32c91a 100755 --- a/apps/third/serializers.py +++ b/apps/third/serializers.py @@ -4,6 +4,7 @@ from apps.am.models import Area from apps.hrm.serializers import EmployeeSimpleSerializer from apps.third.models import BltBind, TDevice, Tlog from apps.utils.serializers import CustomModelSerializer +from django.core.cache import cache class PicSerializer(serializers.Serializer): @@ -13,11 +14,14 @@ class PicSerializer(serializers.Serializer): class TDeviceSerializer(CustomModelSerializer): employee_ = EmployeeSimpleSerializer(source='employee', read_only=True) area_name = serializers.CharField(source='area.name', read_only=True) + mtask_ = serializers.SerializerMethodField() class Meta: model = TDevice fields = '__all__' + def get_mtask_(self, obj): + return cache.get('vchannel_' + obj.code, {}) class TDeviceUpdateSerializer(CustomModelSerializer): class Meta: diff --git a/apps/third/views_d.py b/apps/third/views_d.py index 2227d405..7e3b0f98 100644 --- a/apps/third/views_d.py +++ b/apps/third/views_d.py @@ -15,6 +15,7 @@ from rest_framework.views import APIView from rest_framework.exceptions import ParseError from django.db import transaction from rest_framework.mixins import ListModelMixin, CreateModelMixin, DestroyModelMixin, UpdateModelMixin +from apps.utils.serializers import PkSerializer class BltViewSet(CustomGenericViewSet): @@ -166,6 +167,19 @@ class TDeviceViewSet(ListModelMixin, UpdateModelMixin, DestroyModelMixin, Custom else: return Response(data) + @action(methods=['post'], detail=False, perms_map={'post': '*'}, + serializer_class=PkSerializer) + def start_mtask(self, request): + """开启视频算法监控 + + 开启视频算法监控 + """ + request_data = request.data + pks = request_data.get('pks', []) + from apps.ecm.tasks import monitor_check + monitor_check(pks) + return Response() + @action(methods=['post'], detail=False, perms_map={'post': '*'}, serializer_class=Serializer) @transaction.atomic