# factory/apps/ecm/tasks.py
from __future__ import absolute_import, unicode_literals

import os
import time
import uuid
from datetime import datetime, timedelta
from threading import Thread

import requests
from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer
from django.conf import settings
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
from django.utils import timezone

from apps.ai.main import algo_dict
from apps.am.models import Area
from apps.ecm.models import AlgoChannel, Event, EventCate
from apps.ecm.serializers import EventSerializer
from apps.opm.models import Opl
from apps.third.dahua import dhClient
from apps.third.models import TDevice
from apps.third.tapis import xxapis
from apps.third.xunxi import xxClient
from apps.utils.tasks import CustomTask


@shared_task(base=CustomTask)
def store_img(code: str, duration: int):
    """Snapshot the given video channel every `duration` seconds and store the image locally."""
    while True:
        global_img = dhClient.snap(code)
        file_name = global_img.split('/')[-1].split('?')[0]
        res = requests.get(url=global_img, verify=False)
        path = '/media/temp/store_img/'
        full_path = settings.BASE_DIR + path
        if not os.path.exists(full_path):
            os.makedirs(full_path)
        with open(full_path + file_name, 'wb') as f:
            f.write(res.content)
        time.sleep(duration)
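
# Usage sketch for the task above -- the channel code and 30-second interval are
# illustrative values, not taken from this project:
#     store_img.delay('vc-0001', 30)
# The task loops forever, so it is normally fired once per channel and revoked
# (or its worker restarted) to stop it.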

def update_count_people(i: Area):
    """Refresh an area's people count from its bound xx_rail and raise staffing events."""
    from apps.ecm.service import handle_xx_event_3
    if i.third_info.get('xx_rail', None):
        railId = i.third_info['xx_rail']['id']
        json = {"railId": railId, "type": ""}
        _, res = xxClient.request(**xxapis['rail_ibeacon_list'], json=json)
        blt_list = res['recordList']
        macs = []
        for m in blt_list:
            macs.append(m['userId'])
        count_people = TDevice.objects.filter(
            type=TDevice.DEVICE_BLT, obj_cate='people', code__in=macs).exclude(employee=None).count()
        i.count_people = count_people
        i.save()
        if count_people >= i.count_people_max:
            # Trigger an over-capacity event
            handle_xx_event_3('over_man', i)
        elif count_people < i.count_people_min:
            # Trigger an under-staffing event
            handle_xx_event_3('lack_man', i)
        return {'count': count_people}
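
# Worked example of the thresholds above (numbers are illustrative only): with
# count_people_min=2 and count_people_max=10, a count of 10 or more raises an
# 'over_man' event, a count below 2 raises 'lack_man', and counts of 2-9 raise nothing.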

@shared_task
def cal_area_count():
    """Count the number of people in each fixed area."""
    areas = Area.objects.filter(type=Area.AREA_TYPE_FIX)
    for i in areas:
        update_count_people(i)

@shared_task
def check_event_timeout():
    """Mark unhandled events as timed out once their category's handling window has elapsed."""
    for i in Event.objects.filter(handle_user=None, is_timeout=False):
        cate = i.cates.all().order_by('priority', 'create_time').first()
        # total_seconds() is used so events older than one day are still caught
        if cate and cate.handle_minute > 0 and (timezone.now() - i.create_time).total_seconds() > cate.handle_minute * 60:
            i.is_timeout = True
            i.save()
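
# A minimal sketch of how the two periodic tasks above could be driven by Celery
# beat. The entry names and one-minute intervals are assumptions for illustration;
# this dict would live in the Django settings module, not here:
#
#     CELERY_BEAT_SCHEDULE = {
#         'cal-area-count': {'task': 'apps.ecm.tasks.cal_area_count', 'schedule': 60.0},
#         'check-event-timeout': {'task': 'apps.ecm.tasks.check_event_timeout', 'schedule': 60.0},
#     }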

@shared_task
def opl_task(vc_codes: list, opl_id: str):
    """Operation monitoring task (deprecated)."""
    from apps.ecm.service import snap_and_analyse
    start_time = time.time()
    opl_cate = Opl.objects.get(id=opl_id).cate
    # Collect the algorithm codes this operation category requires
    algo_codes = list(EventCate.objects.filter(opl_cates=opl_cate).values_list('code', flat=True))
    vchannels = TDevice.objects.filter(code__in=vc_codes)
    opl = Opl.objects.get(id=opl_id)
    while time.time() - start_time < 10800:  # a task runs for at most 3 hours unless closed manually
        for i in vchannels:
            Thread(target=snap_and_analyse, args=(i, algo_codes, opl), daemon=True).start()
        time.sleep(10)
    opl.mtask_uid = None
    opl.save()

@shared_task(base=CustomTask)
def monitor_check(vc_ids=None):
    """(Re)start monitoring for always-on video channels; intended to run periodically."""
    if vc_ids:
        vcs = TDevice.objects.filter(id__in=vc_ids, type=TDevice.DEVICE_VCHANNEL)
    else:
        vcs = TDevice.objects.filter(ac_vchannel__always_on=True, type=TDevice.DEVICE_VCHANNEL)
    for vc in vcs:  # every video channel that should be switched on
        algo_codes = AlgoChannel.objects.filter(vchannel=vc, always_on=True).values_list('algo__code', flat=True)
        ckey = 'vchannel_' + vc.id
        if algo_codes:
            tid = str(uuid.uuid4())
            cache.set(ckey, {'id': tid, 'algo_codes': algo_codes, 'start_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")}, timeout=60)
            Thread(target=loop_and_analyse, args=(vc, tid), daemon=True).start()
        else:
            cache.delete(ckey)
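
# Design note: monitor_check and loop_and_analyse coordinate through the
# 'vchannel_<id>' cache entry. The entry is written with timeout=60, so
# monitor_check has to run again within a minute to keep a channel's loop alive;
# once the entry expires, or a later run stores a new task id, the corresponding
# loop_and_analyse thread drops out of its while condition and ends.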

@shared_task(base=CustomTask)
def loop_and_analyse(vc: TDevice, tid: str):
    from apps.ecm.service import snap_and_analyse
    ckey = 'vchannel_' + vc.id
    while cache.get(ckey, {}).get('id', None) == tid:
        vc_dict = cache.get(ckey, {})
        if 'algo_codes' in vc_dict:
            # Only run algorithms that are both configured for the channel and actually loaded
            intersection = list(set(vc_dict['algo_codes']) & set(algo_dict.keys()))
            if intersection:
                Thread(target=snap_and_analyse, args=(vc, intersection), daemon=True).start()
        time.sleep(10)

@shared_task(base=CustomTask)
def monitor_and_analyse(vchannel_code: str, algo_codes: list):
    """Run algorithm recognition against a channel's RTSP stream.

    Args:
        vchannel_code (str): video channel code
        algo_codes (list): algorithm codes
    """
    # cates = EventCate.objects.filter(loop_on=True, self_algo=True)
    # AlgoChannel.objects.filter(algo__loop_on=True, algo__self_algo=True).values('algo__code', 'vchannel')
    import cv2

    # RTSP URL
    rtsp_url = dhClient.get_rtsp(vchannel_code)
    # Open the RTSP stream
    cap = cv2.VideoCapture(rtsp_url)
    # Check whether the stream opened successfully
    if not cap.isOpened():
        print("Error opening video stream or file")
    # Read frames in a loop
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # Each frame can be processed here
        # Display the frame
        cv2.namedWindow("Frame", 0)
        cv2.resizeWindow("Frame", 1600, 900)
        cv2.imshow("Frame", frame)
        # Press 'q' to quit
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    # Release resources
    cap.release()
    cv2.destroyAllWindows()
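
# Note: cv2.imshow above needs a desktop/GUI session, which a headless Celery
# worker normally lacks. A minimal headless variant (analyse_frame is a
# hypothetical hook, not part of this project) would encode each frame instead:
#
#     ok, buf = cv2.imencode('.jpg', frame)
#     if ok:
#         analyse_frame(buf.tobytes())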

@shared_task(base=CustomTask)
def compressed_all_ecm_image():
    from apps.ecm.service import compress_global_img
    events = Event.objects.exclude(global_img=None)
    for event in events:
        if event.global_img and event.global_img_compressed is None:
            event.global_img_compressed = compress_global_img(event.global_img)
            event.save()

@shared_task(base=CustomTask)
def remind_push(remindId: str):
    from apps.ecm.models import Remind
    from apps.ecm.serializers import RemindSerializer
    remind = Remind.objects.get(id=remindId)
    channel_layer = get_channel_layer()
    data = {
        'type': 'remind',
        'remind': RemindSerializer(instance=remind).data,
        'msg': ''
    }
    async_to_sync(channel_layer.group_send)(f"user_{remind.recipient.id}", data)
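
# remind_push assumes a Channels consumer has joined the per-user group
# 'user_<recipient id>' and handles the 'remind' message type. A minimal consumer
# sketch (class and routing names are assumptions, not this project's code):
#
#     from channels.generic.websocket import AsyncJsonWebsocketConsumer
#
#     class UserConsumer(AsyncJsonWebsocketConsumer):
#         async def connect(self):
#             await self.channel_layer.group_add(f"user_{self.scope['user'].id}", self.channel_name)
#             await self.accept()
#
#         async def remind(self, event):  # invoked for messages with type 'remind'
#             await self.send_json(event)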

@shared_task(base=CustomTask)
def event_push(eventId: str):
    try:
        event = Event.objects.get(id=eventId)
    except ObjectDoesNotExist:
        return
    if event.handle_time is None:
        now = timezone.now()
        channel_layer = get_channel_layer()
        data = {
            'type': 'event',
            'event': EventSerializer(instance=event).data,
            'msg': ''
        }
        async_to_sync(channel_layer.group_send)('event', data)
        event.last_ws_pushtime = timezone.now()
        event.save()
        # Re-schedule the push using the interval of the event's main (highest-priority) category
        main_cate = event.cates.all().order_by('priority', 'create_time').first()
        if main_cate is not None:
            unhandled_push_interval = main_cate.unhandled_push_interval
            event_push.apply_async(args=[eventId], eta=now + timedelta(seconds=unhandled_push_interval))
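
# event_push keeps re-enqueueing itself with an eta while the event is unhandled,
# so subscribers of the 'event' group receive it repeatedly at the main category's
# unhandled_push_interval. For example, with unhandled_push_interval=300 the event
# is re-pushed roughly every five minutes until handle_time is set (300 seconds is
# an illustrative value, not a project default).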