from __future__ import absolute_import, unicode_literals import os from threading import Thread from channels.layers import get_channel_layer from asgiref.sync import async_to_sync from celery import shared_task import requests from apps.am.models import Area from apps.ecm.models import EventCate, Eventdo, Event from apps.ecm.service import notify_event, snap_and_analyse from apps.hrm.models import Employee from apps.opm.models import Opl from apps.third.dahua import dhClient from apps.third.xunxi import xxClient from apps.third.models import TDevice from apps.third.tapis import xxapis from django.utils import timezone import time from django.core.cache import cache from django.conf import settings from apps.utils.tasks import CustomTask from apps.ecm.models import AlgoChannel from datetime import timedelta from apps.ai.main import algo_dict from datetime import datetime import uuid from apps.utils.img import compress_image from apps.ecm.serializers import EventSerializer @shared_task(base=CustomTask) def store_img(code: str, duration: int): while True: global_img = dhClient.snap(code) file_name = global_img.split('/')[-1].split('?')[0] res = requests.get(url=global_img, verify=False) path = '/media/temp/store_img/' full_path = settings.BASE_DIR + path if not os.path.exists(full_path): os.makedirs(full_path) with open(full_path + file_name, 'wb') as f: f.write(res.content) time.sleep(duration) def update_count_people(i: Area): if i.third_info.get('xx_rail', None): railId = i.third_info['xx_rail']['id'] json = {"railId": railId, "type": ""} _, res = xxClient.request(**xxapis['rail_ibeacon_list'], json=json) blt_list = res['recordList'] macs = [] for m in blt_list: macs.append(m['userId']) count_people = TDevice.objects.filter( type=TDevice.DEVICE_BLT, obj_cate='people', code__in=macs).exclude(employee=None).count() i.count_people = count_people i.save() if count_people >= i.count_people_max: # 触发超员事件 handle_xx_event_3('over_man', i) elif count_people < i.count_people_min: # 触发缺员事件 handle_xx_event_3('lack_man', i) return {'count': count_people} def handle_xx_event_3(name: str, area: Area): cate = EventCate.objects.filter(code=name).first() if cate: # 告警间隔内不触发 last_event = Event.objects.filter(cates=cate, area=area, obj_cate='area').order_by('-create_time').first() same_allow_minute = cate.same_allow_minute if same_allow_minute >0 and last_event and last_event.create_time + timedelta(minutes=cate.same_allow_minute) > timezone.now(): return event = Event() event.area = area event.obj_cate = 'area' event.happen_time = timezone.now() event.save() Eventdo.objects.get_or_create(cate=cate, event=event, defaults={ 'cate': cate, 'event': event }) voice_msg = area.name + '下有' + str(area.count_people) + '人,' + cate.name + ',请及时处理' notify_event(event, voice_msg=voice_msg) @shared_task def cal_area_count(): """ 计算区域内人员数量 """ areas = Area.objects.filter(type=Area.AREA_TYPE_FIX) for i in areas: update_count_people(i) @shared_task def check_event_timeout(): """判断事件处理是否超期 """ for i in Event.objects.filter(handle_user=None, is_timeout=False): cate = i.cates.all().order_by('priority', 'create_time').first() if cate and cate.handle_minute > 0 and (timezone.now()-i.create_time).seconds > cate.handle_minute * 60: i.is_timeout = True i.save() @shared_task def opl_task(vc_codes: list, opl_id: str): """作业监控任务(废弃) """ start_time = time.time() opl_cate = Opl.objects.get(id=opl_id).cate # 找到作业需要加载的算法 algo_codes = list(EventCate.objects.filter(opl_cates=opl_cate).values_list('code', flat=True)) vchannels = TDevice.objects.filter(code__in=vc_codes) opl = Opl.objects.get(id=opl_id) while time.time()-start_time < 10800: # 一次任务不手动关闭最多持续3小时 for i in vchannels: Thread(target=snap_and_analyse, args=(i, algo_codes, opl), daemon=True).start() time.sleep(10) opl.mtask_uid = None opl.save() @shared_task(base=CustomTask) def monitor_check(vc_ids = []): """ 监控任务监控重启/可每隔一段时间执行一次 """ if vc_ids: vcs = TDevice.objects.filter(id__in=vc_ids, type=TDevice.DEVICE_VCHANNEL) else: vcs = TDevice.objects.filter(ac_vchannel__always_on=True, type=TDevice.DEVICE_VCHANNEL) for vc in vcs: # 获取每一个要开启的视频通道 algo_codes = AlgoChannel.objects.filter(vchannel=vc, always_on=True).values_list('algo__code', flat=True) ckey = 'vchannel_' + vc.id if algo_codes: tid = str(uuid.uuid4()) cache.set(ckey, {'id': tid, 'algo_codes': algo_codes, 'start_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")}, timeout=60) Thread(target=loop_and_analyse, args=(vc, tid), daemon=True).start() else: cache.delete(ckey) @shared_task(base=CustomTask) def loop_and_analyse(vc: TDevice, tid: str): ckey = 'vchannel_' + vc.id while cache.get(ckey, {}).get('id', None) == tid: vc_dict = cache.get(ckey, {}) if 'algo_codes' in vc_dict: intersection = list(set(vc_dict['algo_codes']) & set(algo_dict.keys())) # 取布设与算法的交集 if intersection: Thread(target=snap_and_analyse, args=(vc, intersection), daemon=True).start() time.sleep(10) @shared_task(base=CustomTask) def monitor_and_analyse(vchannel_code: str, algo_codes: list): """rtsp流监控进行算法识别 Args: vchannel_code (str): 视频通道编号 algo_codes (list): 算法代号 """ # cates = EventCate.objects.filter(loop_on=True, self_algo=True) # AlgoChannel.objects.filter(algo__loop_on=True, algo__self_algo=True).values('algo__code', 'vchannel') import cv2 # RTSP URL rtsp_url = dhClient.get_rtsp(vchannel_code) # 打开RTSP流 cap = cv2.VideoCapture(rtsp_url) # 检查是否成功打开 if not cap.isOpened(): print("Error opening video stream or file") # 循环读取帧 while cap.isOpened(): ret, frame = cap.read() if not ret: break # 在这里可以对每一帧进行处理 # 显示帧 cv2.namedWindow("Frame", 0) cv2.resizeWindow("Frame", 1600, 900) cv2.imshow("Frame", frame) # 按 'q' 键退出 if cv2.waitKey(1) & 0xFF == ord('q'): break # 释放资源 cap.release() cv2.destroyAllWindows() @shared_task(base=CustomTask) def compressed_all_ecm_image(): from apps.ecm.service import compress_global_img events = Event.objects.exclude(global_img=None) for event in events: if event.global_img and event.global_img_compressed is None: event.global_img_compressed = compress_global_img(event.global_img) event.save() @shared_task(base=CustomTask) def remind_push(remindId: str): from apps.ecm.models import Remind from apps.ecm.serializers import RemindSerializer remind = Remind.objects.get(id=remindId) channel_layer = get_channel_layer() data = { 'type': 'remind', 'remind': RemindSerializer(instance=remind).data, 'msg': '' } async_to_sync(channel_layer.group_send)(f"user_{remind.recipient.id}", data) @shared_task(base=CustomTask) def event_push(eventId: str): event = Event.objects.get(id=eventId) channel_layer = get_channel_layer() data = { 'type': 'event', 'event': EventSerializer(instance=event).data, 'msg': '' } async_to_sync(channel_layer.group_send)('event', data)