fix: opl monitor不用celery

This commit is contained in:
caoqianming 2023-03-29 10:22:41 +08:00
parent 9b0de2e011
commit 7908b06cd1
5 changed files with 44 additions and 32 deletions

View File

@ -600,7 +600,7 @@ def get_area_from_point(x: int, y: int, floorNo: str, area_fix_id):
def snap_and_analyse(vchannel: TDevice, algo_codes: list, opl: Opl = None): def snap_and_analyse(vchannel: TDevice, algo_codes: list, opl: Opl = None):
global_img_o = dhClient.snap(vchannel.code) global_img_o = dhClient.snap(vchannel.code, raise_exception=False)
happen_time = timezone.now() happen_time = timezone.now()
if global_img_o is None: if global_img_o is None:
return return

View File

@ -105,7 +105,7 @@ def check_event_timeout():
@shared_task @shared_task
def opl_task(vc_codes: list, opl_id: str): def opl_task(vc_codes: list, opl_id: str):
"""作业监控任务 """作业监控任务(废弃)
""" """
start_time = time.time() start_time = time.time()
opl_cate = Opl.objects.get(id=opl_id).cate opl_cate = Opl.objects.get(id=opl_id).cate
@ -120,6 +120,7 @@ def opl_task(vc_codes: list, opl_id: str):
opl.mtask_uid = None opl.mtask_uid = None
opl.save() opl.save()
@shared_task(base=CustomTask) @shared_task(base=CustomTask)
def monitor_check(): def monitor_check():
"""监控任务监控/每晚执行一次 """监控任务监控/每晚执行一次

View File

@ -12,6 +12,7 @@ from apps.am.serializers import AreaSimpleSerializer
from apps.wf.serializers import TicketSerializer from apps.wf.serializers import TicketSerializer
from apps.system.serializers import FileSerializer from apps.system.serializers import FileSerializer
from apps.third.serializers import TDeviceSimpleSerializer from apps.third.serializers import TDeviceSimpleSerializer
from django.core.cache import cache
class OplCateCreateUpdateSerializer(CustomModelSerializer): class OplCateCreateUpdateSerializer(CustomModelSerializer):
@ -226,11 +227,15 @@ class OplListSerializer(CustomModelSerializer):
charger_ = UserSimpleSerializer(source='charger', read_only=True) charger_ = UserSimpleSerializer(source='charger', read_only=True)
monitor_ = UserSimpleSerializer(source='monitor', read_only=True) monitor_ = UserSimpleSerializer(source='monitor', read_only=True)
ticket_ = TicketSerializer(source='ticket', read_only=True) ticket_ = TicketSerializer(source='ticket', read_only=True)
mtask_uid = serializers.SerializerMethodField()
class Meta: class Meta:
model = Opl model = Opl
fields = '__all__' fields = '__all__'
def get_mtask_uid(self, obj):
return cache.get('opl_' + obj.id, None)
class OplDetailSerializer(CustomModelSerializer): class OplDetailSerializer(CustomModelSerializer):
cate_name = serializers.CharField(source='cate.name', read_only=True) cate_name = serializers.CharField(source='cate.name', read_only=True)
@ -248,6 +253,7 @@ class OplDetailSerializer(CustomModelSerializer):
close_dos_ = serializers.SerializerMethodField() close_dos_ = serializers.SerializerMethodField()
create_imgs_ = FileSerializer(source='create_imgs', many=True) create_imgs_ = FileSerializer(source='create_imgs', many=True)
close_imgs_ = FileSerializer(source='close_imgs', many=True) close_imgs_ = FileSerializer(source='close_imgs', many=True)
mtask_uid = serializers.SerializerMethodField()
class Meta: class Meta:
model = Opl model = Opl
@ -265,6 +271,8 @@ class OplDetailSerializer(CustomModelSerializer):
qs = Dictionary.objects.filter(id__in=obj.close_dos).values('id', 'name') qs = Dictionary.objects.filter(id__in=obj.close_dos).values('id', 'name')
return list(qs) return list(qs)
def get_mtask_uid(self, obj):
return cache.get('opl_' + obj.id, None)
class OplCloseSerializer(CustomModelSerializer): class OplCloseSerializer(CustomModelSerializer):
class Meta: class Meta:

View File

@ -1,12 +1,15 @@
import time import time
from apps.ecm.service import check_not_in_place from apps.ecm.service import check_not_in_place, snap_and_analyse
from apps.ecm.tasks import opl_task from apps.ecm.models import EventCate
from apps.opm.models import Operation, Opl, OplWorker from apps.opm.models import Operation, Opl, OplWorker
from apps.third.models import TDevice from apps.third.models import TDevice
from apps.utils.sms import send_sms from apps.utils.sms import send_sms
from apps.wf.models import Ticket, Transition from apps.wf.models import Ticket, Transition
from django_celery_results.models import TaskResult from django_celery_results.models import TaskResult
from threading import Thread
import uuid
from django.core.cache import cache
def get_op_charger(state, ticket, new_ticket_data, handler): def get_op_charger(state, ticket, new_ticket_data, handler):
@ -102,35 +105,16 @@ def opl_start(ticket: Ticket):
def start_mtask(opl: Opl): def start_mtask(opl: Opl):
close_mtask(opl) tv = uuid.uuid4()
op = opl.operation cache.set('opl_'+opl.id, tv, timeout=10800)
mtask_uid = None Thread(target=opl_monitor, args=(opl, tv), daemon=True).start()
vc_codes = [] return dict({'mtask_uid': tv})
# 找到作业点的摄像头, 如果指定摄像头就用指定的摄像头
if op.vchannels:
vc_codes = list(op.vchannels.all().values_list('code', flat=True))
opl_id = opl.id
task = opl_task.delay(vc_codes, opl_id)
mtask_uid = task.id
opl.mtask_uid = mtask_uid
opl.save()
return dict(vc_codes=vc_codes, mtask_uid=mtask_uid)
def close_mtask(opl: Opl): def close_mtask(opl: Opl):
"""关闭celery任务 """关闭监控线程
""" """
from celery.app.control import Control cache.delete('opl_' + opl.id)
from server.celery import app
celery_control = Control(app=app)
if opl.mtask_uid:
celery_control.revoke(opl.mtask_uid, terminate=True)
qs = TaskResult.objects.filter(task_args__contains=opl.id, status__in=['PENDING', 'RECEIVED', 'STARTED'])
if qs:
for i in qs:
celery_control.revoke(i.task_id, terminate=True)
opl.mtask_uid = None
opl.save()
def opl_end(ticket: Ticket): def opl_end(ticket: Ticket):
@ -150,3 +134,22 @@ def opl_end(ticket: Ticket):
else: else:
operation.state = Operation.OP_DONE operation.state = Operation.OP_DONE
operation.save() operation.save()
def opl_monitor(opl: Opl, tv: str):
"""作业视频监控
tv: 定义的线程ID
"""
tkey = 'opl_' + opl.id
op = opl.operation
# 找到作业点的摄像头, 如果指定摄像头就用指定的摄像头
vchannels = op.vchannels.all()
if vchannels:
# 找到作业需要加载的算法
algo_codes = list(EventCate.objects.filter(opl_cates=opl.cate).values_list('code', flat=True))
if algo_codes:
while tv == cache.get(tkey, None):
for i in vchannels:
Thread(target=snap_and_analyse, args=(i, algo_codes, opl), daemon=True).start()
time.sleep(10)
cache.delete(tkey)

View File

@ -118,7 +118,7 @@ class DhClient:
""" """
return '{}/evo-apigw/evo-oss/{}?token={}'.format(settings.DAHUA_BASE_URL, path, cache.get('dh_token', '')) return '{}/evo-apigw/evo-oss/{}?token={}'.format(settings.DAHUA_BASE_URL, path, cache.get('dh_token', ''))
def snap(self, code: str): def snap(self, code: str, raise_exception:bool):
"""摄像头实时截图 """摄像头实时截图
Args: Args:
@ -138,7 +138,7 @@ class DhClient:
json_data['params'] = '{\"method\":\"dev.snap\",\"id\":123,\"params\":{\"DevID\":\"' + \ json_data['params'] = '{\"method\":\"dev.snap\",\"id\":123,\"params\":{\"DevID\":\"' + \
str(d_code) + '\",\"DevChannel\":' + str(num) + \ str(d_code) + '\",\"DevChannel\":' + str(num) + \
',\"PicNum\":1,\"SnapType\":1,\"CmdSrc\":0}}' ',\"PicNum\":1,\"SnapType\":1,\"CmdSrc\":0}}'
is_ok, res = self.request(**dhapis['dev_snap'], json=json_data) is_ok, res = self.request(**dhapis['dev_snap'], json=json_data, raise_exception=raise_exception)
if is_ok == 'success': if is_ok == 'success':
res = json.loads(res) res = json.loads(res)
return self.get_full_pic(res['params']['PicInfo']) return self.get_full_pic(res['params']['PicInfo'])