From 7908b06cd1dd62248a97c53851edb19ce5bbb272 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Wed, 29 Mar 2023 10:22:41 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20opl=20monitor=E4=B8=8D=E7=94=A8celery?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/ecm/service.py | 2 +- apps/ecm/tasks.py | 3 ++- apps/opm/serializers.py | 10 +++++++- apps/opm/services.py | 57 ++++++++++++++++++++++------------------- apps/third/dahua.py | 4 +-- 5 files changed, 44 insertions(+), 32 deletions(-) diff --git a/apps/ecm/service.py b/apps/ecm/service.py index 56f3d63a..0e489f32 100644 --- a/apps/ecm/service.py +++ b/apps/ecm/service.py @@ -600,7 +600,7 @@ def get_area_from_point(x: int, y: int, floorNo: str, area_fix_id): def snap_and_analyse(vchannel: TDevice, algo_codes: list, opl: Opl = None): - global_img_o = dhClient.snap(vchannel.code) + global_img_o = dhClient.snap(vchannel.code, raise_exception=False) happen_time = timezone.now() if global_img_o is None: return diff --git a/apps/ecm/tasks.py b/apps/ecm/tasks.py index cbd324f7..9d2cd5a4 100644 --- a/apps/ecm/tasks.py +++ b/apps/ecm/tasks.py @@ -105,7 +105,7 @@ def check_event_timeout(): @shared_task def opl_task(vc_codes: list, opl_id: str): - """作业监控任务 + """作业监控任务(废弃) """ start_time = time.time() opl_cate = Opl.objects.get(id=opl_id).cate @@ -120,6 +120,7 @@ def opl_task(vc_codes: list, opl_id: str): opl.mtask_uid = None opl.save() + @shared_task(base=CustomTask) def monitor_check(): """监控任务监控/每晚执行一次 diff --git a/apps/opm/serializers.py b/apps/opm/serializers.py index 3e2b18bb..46593d34 100644 --- a/apps/opm/serializers.py +++ b/apps/opm/serializers.py @@ -12,6 +12,7 @@ from apps.am.serializers import AreaSimpleSerializer from apps.wf.serializers import TicketSerializer from apps.system.serializers import FileSerializer from apps.third.serializers import TDeviceSimpleSerializer +from django.core.cache import cache class OplCateCreateUpdateSerializer(CustomModelSerializer): @@ -226,10 +227,14 @@ class OplListSerializer(CustomModelSerializer): charger_ = UserSimpleSerializer(source='charger', read_only=True) monitor_ = UserSimpleSerializer(source='monitor', read_only=True) ticket_ = TicketSerializer(source='ticket', read_only=True) - + mtask_uid = serializers.SerializerMethodField() + class Meta: model = Opl fields = '__all__' + + def get_mtask_uid(self, obj): + return cache.get('opl_' + obj.id, None) class OplDetailSerializer(CustomModelSerializer): @@ -248,6 +253,7 @@ class OplDetailSerializer(CustomModelSerializer): close_dos_ = serializers.SerializerMethodField() create_imgs_ = FileSerializer(source='create_imgs', many=True) close_imgs_ = FileSerializer(source='close_imgs', many=True) + mtask_uid = serializers.SerializerMethodField() class Meta: model = Opl @@ -265,6 +271,8 @@ class OplDetailSerializer(CustomModelSerializer): qs = Dictionary.objects.filter(id__in=obj.close_dos).values('id', 'name') return list(qs) + def get_mtask_uid(self, obj): + return cache.get('opl_' + obj.id, None) class OplCloseSerializer(CustomModelSerializer): class Meta: diff --git a/apps/opm/services.py b/apps/opm/services.py index 24c95515..b588ab5b 100644 --- a/apps/opm/services.py +++ b/apps/opm/services.py @@ -1,12 +1,15 @@ import time -from apps.ecm.service import check_not_in_place -from apps.ecm.tasks import opl_task +from apps.ecm.service import check_not_in_place, snap_and_analyse +from apps.ecm.models import EventCate from apps.opm.models import Operation, Opl, OplWorker from apps.third.models import TDevice from apps.utils.sms import send_sms from apps.wf.models import Ticket, Transition from django_celery_results.models import TaskResult +from threading import Thread +import uuid +from django.core.cache import cache def get_op_charger(state, ticket, new_ticket_data, handler): @@ -102,35 +105,16 @@ def opl_start(ticket: Ticket): def start_mtask(opl: Opl): - close_mtask(opl) - op = opl.operation - mtask_uid = None - vc_codes = [] - # 找到作业点的摄像头, 如果指定摄像头就用指定的摄像头 - if op.vchannels: - vc_codes = list(op.vchannels.all().values_list('code', flat=True)) - opl_id = opl.id - task = opl_task.delay(vc_codes, opl_id) - mtask_uid = task.id - opl.mtask_uid = mtask_uid - opl.save() - return dict(vc_codes=vc_codes, mtask_uid=mtask_uid) + tv = uuid.uuid4() + cache.set('opl_'+opl.id, tv, timeout=10800) + Thread(target=opl_monitor, args=(opl, tv), daemon=True).start() + return dict({'mtask_uid': tv}) def close_mtask(opl: Opl): - """关闭celery任务 + """关闭监控线程 """ - from celery.app.control import Control - from server.celery import app - celery_control = Control(app=app) - if opl.mtask_uid: - celery_control.revoke(opl.mtask_uid, terminate=True) - qs = TaskResult.objects.filter(task_args__contains=opl.id, status__in=['PENDING', 'RECEIVED', 'STARTED']) - if qs: - for i in qs: - celery_control.revoke(i.task_id, terminate=True) - opl.mtask_uid = None - opl.save() + cache.delete('opl_' + opl.id) def opl_end(ticket: Ticket): @@ -150,3 +134,22 @@ def opl_end(ticket: Ticket): else: operation.state = Operation.OP_DONE operation.save() + + +def opl_monitor(opl: Opl, tv: str): + """作业视频监控 + tv: 定义的线程ID + """ + tkey = 'opl_' + opl.id + op = opl.operation + # 找到作业点的摄像头, 如果指定摄像头就用指定的摄像头 + vchannels = op.vchannels.all() + if vchannels: + # 找到作业需要加载的算法 + algo_codes = list(EventCate.objects.filter(opl_cates=opl.cate).values_list('code', flat=True)) + if algo_codes: + while tv == cache.get(tkey, None): + for i in vchannels: + Thread(target=snap_and_analyse, args=(i, algo_codes, opl), daemon=True).start() + time.sleep(10) + cache.delete(tkey) \ No newline at end of file diff --git a/apps/third/dahua.py b/apps/third/dahua.py index f720b3eb..4be22f43 100644 --- a/apps/third/dahua.py +++ b/apps/third/dahua.py @@ -118,7 +118,7 @@ class DhClient: """ return '{}/evo-apigw/evo-oss/{}?token={}'.format(settings.DAHUA_BASE_URL, path, cache.get('dh_token', '')) - def snap(self, code: str): + def snap(self, code: str, raise_exception:bool): """摄像头实时截图 Args: @@ -138,7 +138,7 @@ class DhClient: json_data['params'] = '{\"method\":\"dev.snap\",\"id\":123,\"params\":{\"DevID\":\"' + \ str(d_code) + '\",\"DevChannel\":' + str(num) + \ ',\"PicNum\":1,\"SnapType\":1,\"CmdSrc\":0}}' - is_ok, res = self.request(**dhapis['dev_snap'], json=json_data) + is_ok, res = self.request(**dhapis['dev_snap'], json=json_data, raise_exception=raise_exception) if is_ok == 'success': res = json.loads(res) return self.get_full_pic(res['params']['PicInfo'])