226 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			226 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			Python
		
	
	
	
| from __future__ import absolute_import, unicode_literals
 | |
| import os
 | |
| from threading import Thread
 | |
| from channels.layers import get_channel_layer
 | |
| from asgiref.sync import async_to_sync
 | |
| from celery import shared_task
 | |
| import requests
 | |
| from apps.am.models import Area
 | |
| from apps.ecm.models import EventCate, Event
 | |
| from apps.opm.models import Opl
 | |
| from apps.third.dahua import dhClient
 | |
| from apps.third.xunxi import xxClient
 | |
| from apps.third.models import TDevice
 | |
| from apps.third.tapis import xxapis
 | |
| from django.utils import timezone
 | |
| import time
 | |
| from django.core.cache import cache
 | |
| from django.conf import settings
 | |
| from apps.utils.tasks import CustomTask
 | |
| from apps.ecm.models import AlgoChannel
 | |
| from datetime import timedelta
 | |
| from apps.ai.main import algo_dict
 | |
| from datetime import datetime
 | |
| import uuid
 | |
| from apps.ecm.serializers import EventSerializer
 | |
| from django.utils import timezone
 | |
| from django.core.exceptions import ObjectDoesNotExist
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def store_img(code: str, duration: int):
 | |
|     while True:
 | |
|         global_img = dhClient.snap(code)
 | |
|         file_name = global_img.split('/')[-1].split('?')[0]
 | |
|         res = requests.get(url=global_img, verify=False)
 | |
|         path = '/media/temp/store_img/'
 | |
|         full_path = settings.BASE_DIR + path
 | |
|         if not os.path.exists(full_path):
 | |
|             os.makedirs(full_path)
 | |
|         with open(full_path + file_name, 'wb') as f:
 | |
|             f.write(res.content)
 | |
|         time.sleep(duration)
 | |
| 
 | |
| 
 | |
| def update_count_people(i: Area):
 | |
|     from apps.ecm.service import handle_xx_event_3
 | |
|     if i.third_info.get('xx_rail', None):
 | |
|         railId = i.third_info['xx_rail']['id']
 | |
|         json = {"railId": railId, "type": ""}
 | |
|         _, res = xxClient.request(**xxapis['rail_ibeacon_list'], json=json)
 | |
|         blt_list = res['recordList']
 | |
|         macs = []
 | |
|         for m in blt_list:
 | |
|             macs.append(m['userId'])
 | |
|         count_people = TDevice.objects.filter(
 | |
|             type=TDevice.DEVICE_BLT, obj_cate='people', code__in=macs).exclude(employee=None).count()
 | |
|         i.count_people = count_people
 | |
|         i.save()
 | |
|         if count_people >= i.count_people_max:
 | |
|             # 触发超员事件
 | |
|             handle_xx_event_3('over_man', i)
 | |
|         elif count_people < i.count_people_min:
 | |
|             # 触发缺员事件
 | |
|             handle_xx_event_3('lack_man', i)
 | |
|         return {'count': count_people}
 | |
| 
 | |
| 
 | |
| @shared_task
 | |
| def cal_area_count():
 | |
|     """
 | |
|     计算区域内人员数量
 | |
|     """
 | |
|     areas = Area.objects.filter(type=Area.AREA_TYPE_FIX)
 | |
|     for i in areas:
 | |
|         update_count_people(i)
 | |
| 
 | |
| 
 | |
| @shared_task
 | |
| def check_event_timeout():
 | |
|     """判断事件处理是否超期
 | |
|     """
 | |
|     for i in Event.objects.filter(handle_user=None, is_timeout=False):
 | |
|         cate = i.cates.all().order_by('priority', 'create_time').first()
 | |
|         if cate and cate.handle_minute > 0 and (timezone.now()-i.create_time).seconds > cate.handle_minute * 60:
 | |
|             i.is_timeout = True
 | |
|             i.save()
 | |
| 
 | |
| 
 | |
| @shared_task
 | |
| def opl_task(vc_codes: list, opl_id: str):
 | |
|     """作业监控任务(废弃)
 | |
|     """
 | |
|     from apps.ecm.service import snap_and_analyse
 | |
|     start_time = time.time()
 | |
|     opl_cate = Opl.objects.get(id=opl_id).cate
 | |
|     # 找到作业需要加载的算法
 | |
|     algo_codes = list(EventCate.objects.filter(opl_cates=opl_cate).values_list('code', flat=True))
 | |
|     vchannels = TDevice.objects.filter(code__in=vc_codes)
 | |
|     opl = Opl.objects.get(id=opl_id)
 | |
|     while time.time()-start_time < 10800:   # 一次任务不手动关闭最多持续3小时
 | |
|         for i in vchannels:
 | |
|             Thread(target=snap_and_analyse, args=(i, algo_codes, opl), daemon=True).start()
 | |
|         time.sleep(10)
 | |
|     opl.mtask_uid = None
 | |
|     opl.save()
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def monitor_check(vc_ids = []):
 | |
|     """
 | |
| 
 | |
|     监控任务监控重启/可每隔一段时间执行一次
 | |
|     """
 | |
|     if vc_ids:
 | |
|         vcs = TDevice.objects.filter(id__in=vc_ids, type=TDevice.DEVICE_VCHANNEL)
 | |
|     else:
 | |
|         vcs = TDevice.objects.filter(ac_vchannel__always_on=True, type=TDevice.DEVICE_VCHANNEL)
 | |
|     for vc in vcs:   # 获取每一个要开启的视频通道
 | |
|         algo_codes = AlgoChannel.objects.filter(vchannel=vc, always_on=True).values_list('algo__code', flat=True)
 | |
|         ckey = 'vchannel_' + vc.id
 | |
|         if algo_codes:
 | |
|             tid = str(uuid.uuid4())
 | |
|             cache.set(ckey, {'id': tid, 'algo_codes': algo_codes, 'start_time':  datetime.now().strftime("%Y-%m-%d %H:%M:%S")}, timeout=60)
 | |
|             Thread(target=loop_and_analyse, args=(vc, tid), daemon=True).start()
 | |
|         else:
 | |
|             cache.delete(ckey)
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def loop_and_analyse(vc: TDevice, tid: str):
 | |
|     from apps.ecm.service import snap_and_analyse
 | |
|     ckey = 'vchannel_' + vc.id
 | |
|     while cache.get(ckey, {}).get('id', None) == tid:
 | |
|         vc_dict = cache.get(ckey, {})
 | |
|         if 'algo_codes' in vc_dict:
 | |
|             intersection = list(set(vc_dict['algo_codes']) & set(algo_dict.keys()))   # 取布设与算法的交集
 | |
|             if intersection:
 | |
|                 Thread(target=snap_and_analyse, args=(vc, intersection), daemon=True).start()
 | |
|         time.sleep(10)
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def monitor_and_analyse(vchannel_code: str, algo_codes: list):
 | |
|     """rtsp流监控进行算法识别
 | |
| 
 | |
|     Args:
 | |
|         vchannel_code (str): 视频通道编号
 | |
|         algo_codes (list): 算法代号
 | |
|     """
 | |
|     # cates = EventCate.objects.filter(loop_on=True, self_algo=True)
 | |
|     # AlgoChannel.objects.filter(algo__loop_on=True, algo__self_algo=True).values('algo__code', 'vchannel')
 | |
|     import cv2
 | |
|     # RTSP URL
 | |
|     rtsp_url = dhClient.get_rtsp(vchannel_code)
 | |
|     # 打开RTSP流
 | |
|     cap = cv2.VideoCapture(rtsp_url)
 | |
|     # 检查是否成功打开
 | |
|     if not cap.isOpened():
 | |
|         print("Error opening video stream or file")
 | |
| 
 | |
|     # 循环读取帧
 | |
|     while cap.isOpened():
 | |
|         ret, frame = cap.read()
 | |
| 
 | |
|         if not ret:
 | |
|             break
 | |
| 
 | |
|         # 在这里可以对每一帧进行处理
 | |
| 
 | |
|         # 显示帧
 | |
|         cv2.namedWindow("Frame", 0)
 | |
|         cv2.resizeWindow("Frame", 1600, 900)
 | |
|         cv2.imshow("Frame", frame)
 | |
| 
 | |
|         # 按 'q' 键退出
 | |
|         if cv2.waitKey(1) & 0xFF == ord('q'):
 | |
|             break
 | |
| 
 | |
|     # 释放资源
 | |
|     cap.release()
 | |
|     cv2.destroyAllWindows()
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def compressed_all_ecm_image():
 | |
|     from apps.ecm.service import compress_global_img
 | |
|     events = Event.objects.exclude(global_img=None)
 | |
|     for event in events:
 | |
|         if event.global_img and event.global_img_compressed is None:
 | |
|             event.global_img_compressed = compress_global_img(event.global_img)
 | |
|             event.save()
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def remind_push(remindId: str):
 | |
|     from apps.ecm.models import Remind
 | |
|     from apps.ecm.serializers import RemindSerializer
 | |
|     remind = Remind.objects.get(id=remindId)
 | |
| 
 | |
|     channel_layer = get_channel_layer()
 | |
|     data = {
 | |
|         'type': 'remind',
 | |
|         'remind': RemindSerializer(instance=remind).data,
 | |
|         'msg': ''
 | |
|     }
 | |
|     async_to_sync(channel_layer.group_send)(f"user_{remind.recipient.id}", data)
 | |
| 
 | |
| 
 | |
| @shared_task(base=CustomTask)
 | |
| def event_push(eventId: str):
 | |
|     try:
 | |
|         event = Event.objects.get(id=eventId)
 | |
|     except ObjectDoesNotExist:
 | |
|         return
 | |
|     if event.handle_time is None:
 | |
|         now = timezone.now()
 | |
|         channel_layer = get_channel_layer()
 | |
|         data = {
 | |
|             'type': 'event',
 | |
|             'event': EventSerializer(instance=event).data,
 | |
|             'msg': ''
 | |
|         }
 | |
|         async_to_sync(channel_layer.group_send)('event', data)
 | |
|         event.last_ws_pushtime = timezone.now()
 | |
|         event.save()
 | |
|         if event.create_time + timedelta(hours=48) < now:
 | |
|             pass
 | |
|         else:
 | |
|             main_cate = event.cates.all().order_by('priority', 'create_time').first()
 | |
|             unhandled_push_interval = main_cate.unhandled_push_interval
 | |
|             event_push.apply_async(args=[eventId], eta=now + timedelta(seconds=unhandled_push_interval)) |