127 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			127 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
 | 
						||
import importlib
 | 
						||
import os
 | 
						||
 | 
						||
from django.conf import settings
 | 
						||
import logging
 | 
						||
import math
 | 
						||
from apps.utils.tasks import send_mail_task
 | 
						||
import traceback
 | 
						||
 | 
						||
myLogger = logging.getLogger('log')
 | 
						||
 | 
						||
 | 
						||
algo_dict = {
 | 
						||
    "helmet": "apps.ai.client.helmet",
 | 
						||
    "fire1": "apps.ai.client.fire1",
 | 
						||
    "fangtangfu": "apps.ai.client.fangtangfu",
 | 
						||
    "jingjiedai": "apps.ai.client.jingjiedai",
 | 
						||
    "qiping": "apps.ai.client.qiping"
 | 
						||
}
 | 
						||
 | 
						||
 | 
						||
def ai_analyse(codes: list, global_img: str, face_img: str = '', is_dahua_pic: bool = True):
 | 
						||
    """算法分析图片
 | 
						||
 | 
						||
    Args:
 | 
						||
        codes: 算法列表
 | 
						||
        global_img (str): 全景图片url地址
 | 
						||
        face_img (str): 人脸图片url地址
 | 
						||
    Return:
 | 
						||
        {'qiping': [[x, y]]}
 | 
						||
    """
 | 
						||
    results = {}  # dict key: 触发的事件标识字符 value: 多个矩形框坐标列表; 有两个图片key值
 | 
						||
    global_img_path = ''
 | 
						||
    face_img_path = ''
 | 
						||
    if is_dahua_pic and settings.AI_IP not in ['127.0.0.1', '192.168.10.249']:  # 先保存到本地/主要是现在算法必须得用可访问的远程地址
 | 
						||
        from apps.ecm.service import save_dahua_pic
 | 
						||
        global_img_path = save_dahua_pic(global_img, '/media/temp/')
 | 
						||
        global_img = settings.BASE_URL_OUT + global_img_path  # 用新地址
 | 
						||
        if face_img:
 | 
						||
            face_img_path = save_dahua_pic(face_img, '/media/temp/')
 | 
						||
            face_img = settings.BASE_URL_OUT + face_img_path
 | 
						||
    for i in codes:
 | 
						||
        if i in algo_dict and i not in results:  # 如果算法支持且没有识别过
 | 
						||
            module, func = algo_dict[i].rsplit(".", 1)
 | 
						||
            m = importlib.import_module(module)
 | 
						||
            f = getattr(m, func)
 | 
						||
            try:
 | 
						||
                is_happend, res, rectangle_dict = False, None, {}
 | 
						||
                if i == 'helmet':  # 如果是安全帽识别
 | 
						||
                    if face_img:  # 如果有小图
 | 
						||
                        is_happend, res = f(ip=settings.AI_IP, pic_url=face_img)
 | 
						||
                        # if is_happend:  # 补偿机制: 如果小图识别为未带安全帽,为了保证正确率,再用大图识别一次
 | 
						||
                        #     is_happend, res = getattr(m, 'helmet2')(ip=settings.AI_IP, pic_url=global_img)
 | 
						||
                    else:
 | 
						||
                        is_happend, res = getattr(m, 'helmet2')(ip=settings.AI_IP, pic_url=global_img)
 | 
						||
                else:
 | 
						||
                    is_happend, res = f(ip=settings.AI_IP, pic_url=global_img)
 | 
						||
                if i in ['fire1', 'jingjiedai', 'qiping']:  # 如果是这3类算法就无需再识别,都在一个算法里处理了
 | 
						||
                    has_fire = False  # 默认没有灭火器
 | 
						||
                    has_jingjiedai = False  # 默认没有警戒带
 | 
						||
                    qiping_qd = False  # 气瓶倾倒未发生
 | 
						||
                    rectangle_dict .update({'qiping': []})  # 气瓶倾倒的坐标字典
 | 
						||
                    for x in res.FireinfoList:
 | 
						||
                        if x.fire == 3:  # 气瓶倾倒
 | 
						||
                            qiping_qd = True
 | 
						||
                            rectangle_dict['qiping'].append(
 | 
						||
                                [(math.ceil(x.coord.uleft.x), math.ceil(x.coord.uleft.y)), (math.ceil(x.coord.lright.x), math.ceil(x.coord.lright.y))])  # 加入矩形框
 | 
						||
                        if x.fire == 0:
 | 
						||
                            has_fire = True
 | 
						||
                        if x.fire == 2:
 | 
						||
                            has_jingjiedai = True
 | 
						||
                    if (i == 'fire1' and not has_fire) or (i == 'jingjiedai' and not has_jingjiedai) or (i == 'qiping' and qiping_qd):
 | 
						||
                        results.update({i: rectangle_dict.get(i, [])})
 | 
						||
                if is_happend and (i not in results):
 | 
						||
                    results.update({i: rectangle_dict.get(i, [])})
 | 
						||
            except Exception:
 | 
						||
                myLogger.error('算法处理错误', exc_info=True)
 | 
						||
    if global_img_path:
 | 
						||
        os.remove(settings.BASE_DIR + global_img_path)  # 删除临时图片
 | 
						||
    if face_img_path:
 | 
						||
        os.remove(settings.BASE_DIR + face_img_path)
 | 
						||
    return results
 | 
						||
 | 
						||
 | 
						||
def ai_analyse_2(codes: list, global_img: str, face_img: str = '', is_dahua_pic: bool = True):
 | 
						||
    """算法分析图片后保存并返回相对地址
 | 
						||
 | 
						||
    Args:
 | 
						||
        codes: 算法列表
 | 
						||
        global_img (str): 全景图片url地址
 | 
						||
        face_img (str): 人脸图片url地址
 | 
						||
    """
 | 
						||
    results = ai_analyse(codes, global_img, face_img, is_dahua_pic)
 | 
						||
    try:
 | 
						||
        if results and is_dahua_pic:  # 如果触发事件且是大华图片先保存下来
 | 
						||
            from apps.ecm.service import save_dahua_pic
 | 
						||
            global_img_path = save_dahua_pic(global_img)
 | 
						||
            global_img_local = settings.BASE_DIR + global_img_path
 | 
						||
            draw(global_img_local, results)  # 绘制矩形框
 | 
						||
            results.update({'global_img': global_img_path})
 | 
						||
    except Exception as e:
 | 
						||
        send_mail_task.delay(message=traceback.format_exc())
 | 
						||
        myLogger.error('算法返回信息处理错误', exc_info=True)
 | 
						||
    return results
 | 
						||
 | 
						||
 | 
						||
def draw(path, results):
 | 
						||
    import cv2
 | 
						||
    rects = []
 | 
						||
    for i in results:
 | 
						||
        rects.extend(results[i])
 | 
						||
    if rects:
 | 
						||
        img = cv2.imread(path)
 | 
						||
        font = cv2.FONT_HERSHEY_COMPLEX_SMALL
 | 
						||
        for i in results:
 | 
						||
            for m in results[i]:
 | 
						||
                cv2.rectangle(img, m[0], m[1], (255, 255, 0), 3)
 | 
						||
                cv2.putText(img, i, m[0], font, 2, (255, 255, 0), 1)
 | 
						||
        cv2.imwrite(path, img)
 | 
						||
 | 
						||
{'qiping': [
 | 
						||
    [
 | 
						||
        (1142.2359619140625, 1295.6361083984375), (1363.11669921875, 1347.201904296875)
 | 
						||
        ]
 | 
						||
    ]
 | 
						||
    } |