109 lines
4.5 KiB
Python
109 lines
4.5 KiB
Python
|
||
import importlib
|
||
import os
|
||
import cv2
|
||
from django.conf import settings
|
||
import logging
|
||
|
||
myLogger = logging.getLogger('log')
|
||
|
||
|
||
algo_dict = {
|
||
"helmet": "apps.ai.client.helmet",
|
||
"fire1": "apps.ai.client.fire1",
|
||
"fangtangfu": "apps.ai.client.fangtangfu",
|
||
"jingjiedai": "apps.ai.client.jingjiedai",
|
||
"qiping": "apps.ai.client.qiping"
|
||
}
|
||
|
||
|
||
def ai_analyse(codes: list, global_img: str, face_img: str = '', is_dahua_pic: bool = True):
|
||
"""算法分析图片
|
||
|
||
Args:
|
||
codes: 算法列表
|
||
global_img (str): 全景图片url地址
|
||
face_img (str): 人脸图片url地址
|
||
"""
|
||
results = {} # dict key: 触发的事件标识字符 value: 多个矩形框坐标列表; 有两个图片key值
|
||
global_img_path = ''
|
||
face_img_path = ''
|
||
if is_dahua_pic: # 先保存到本地/主要是现在算法必须得用可访问的远程地址
|
||
from apps.ecm.service import save_dahua_pic
|
||
global_img_path = save_dahua_pic(global_img)
|
||
global_img = settings.BASE_URL_OUT + global_img_path # 用新地址
|
||
if face_img:
|
||
face_img_path = save_dahua_pic(face_img)
|
||
face_img = settings.BASE_URL_OUT + face_img_path
|
||
for i in codes:
|
||
if i in algo_dict and i not in results: # 如果算法支持且没有识别过
|
||
module, func = algo_dict[i].rsplit(".", 1)
|
||
m = importlib.import_module(module)
|
||
f = getattr(m, func)
|
||
try:
|
||
is_happend, res, rectangle_dict = False, None, {}
|
||
if i == 'helmet': # 如果是安全帽抠图识别
|
||
if face_img:
|
||
is_happend, res = f(ip=settings.AI_IP, pic_url=face_img)
|
||
else:
|
||
is_happend, res = getattr(m, 'helmet2')(ip=settings.AI_IP, pic_url=global_img)
|
||
else:
|
||
is_happend, res = f(ip=settings.AI_IP, pic_url=global_img)
|
||
if i in ['fire1', 'jingjiedai', 'qiping']: # 如果是这3类算法就无需再识别,都在一个算法里处理了
|
||
has_fire = False # 默认没有灭火器
|
||
has_jingjiedai = False # 默认没有警戒带
|
||
qiping_qd = False # 气瓶倾倒未发生
|
||
rectangle_dict .update({'qiping': []}) # 气瓶倾倒的坐标字典
|
||
for x in res.FireinfoList:
|
||
if x.fire == 3 and 'qiping' in codes: # 气瓶倾倒
|
||
qiping_qd = True
|
||
rectangle_dict['qiping'].append(
|
||
[(x.coord.uleft.x, x.coord.uleft.y), (x.coord.lright.x, x.coord.lright.y)]) # 加入矩形框
|
||
if x.fire == 0:
|
||
has_fire = True
|
||
if x.fire == 2:
|
||
has_jingjiedai = True
|
||
if (i == 'fire1' and not has_fire) or (i == 'jingjiedai' and not has_jingjiedai) or (i == 'qiping' and qiping_qd):
|
||
results.update({i: rectangle_dict.get(i, [])})
|
||
if is_happend and (i not in results):
|
||
results.update({i: rectangle_dict.get(i, [])})
|
||
except Exception:
|
||
myLogger.error('算法处理错误', exc_info=True)
|
||
if global_img_path:
|
||
os.remove(settings.BASE_DIR + global_img_path) # 删除临时图片
|
||
if face_img_path:
|
||
os.remove(settings.BASE_DIR + face_img_path)
|
||
return results
|
||
|
||
|
||
def ai_analyse_2(codes: list, global_img: str, face_img: str = '', is_dahua_pic: bool = True):
|
||
"""算法分析图片后保存并返回相对地址
|
||
|
||
Args:
|
||
codes: 算法列表
|
||
global_img (str): 全景图片url地址
|
||
face_img (str): 人脸图片url地址
|
||
"""
|
||
results = ai_analyse(codes, global_img, face_img, is_dahua_pic)
|
||
if results: # 如果触发事件先保存下来
|
||
from apps.ecm.service import save_dahua_pic
|
||
global_img_path = save_dahua_pic(global_img)
|
||
global_img_local = settings.BASE_DIR + global_img_path
|
||
draw(global_img_local, results)
|
||
results.update({'global_img': global_img_path})
|
||
return results
|
||
|
||
|
||
def draw(path, results):
|
||
rects = []
|
||
for i in results:
|
||
rects.extend(results[i])
|
||
if rects:
|
||
img = cv2.imread(path)
|
||
font = cv2.FONT_HERSHEY_COMPLEX_SMALL
|
||
for i in results:
|
||
for m in results[i]:
|
||
cv2.rectangle(img, results[i][m][0], results[i][m][1], (0, 255, 0), 4)
|
||
cv2.putText(img, i, results[i][m][0], font, 2, (0, 0, 255), 1)
|
||
cv2.imwrite(path, img)
|