126 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			126 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
| 
 | ||
| import importlib
 | ||
| import os
 | ||
| import cv2
 | ||
| from django.conf import settings
 | ||
| import logging
 | ||
| import math
 | ||
| from apps.utils.tasks import send_mail_task
 | ||
| import traceback
 | ||
| 
 | ||
| myLogger = logging.getLogger('log')
 | ||
| 
 | ||
| 
 | ||
| algo_dict = {
 | ||
|     "helmet": "apps.ai.client.helmet",
 | ||
|     "fire1": "apps.ai.client.fire1",
 | ||
|     "fangtangfu": "apps.ai.client.fangtangfu",
 | ||
|     "jingjiedai": "apps.ai.client.jingjiedai",
 | ||
|     "qiping": "apps.ai.client.qiping"
 | ||
| }
 | ||
| 
 | ||
| 
 | ||
| def ai_analyse(codes: list, global_img: str, face_img: str = '', is_dahua_pic: bool = True):
 | ||
|     """算法分析图片
 | ||
| 
 | ||
|     Args:
 | ||
|         codes: 算法列表
 | ||
|         global_img (str): 全景图片url地址
 | ||
|         face_img (str): 人脸图片url地址
 | ||
|     Return:
 | ||
|         {'qiping': [[x, y]]}
 | ||
|     """
 | ||
|     results = {}  # dict key: 触发的事件标识字符 value: 多个矩形框坐标列表; 有两个图片key值
 | ||
|     global_img_path = ''
 | ||
|     face_img_path = ''
 | ||
|     if is_dahua_pic and settings.AI_IP not in ['127.0.0.1', '192.168.10.249']:  # 先保存到本地/主要是现在算法必须得用可访问的远程地址
 | ||
|         from apps.ecm.service import save_dahua_pic
 | ||
|         global_img_path = save_dahua_pic(global_img, '/media/temp/')
 | ||
|         global_img = settings.BASE_URL_OUT + global_img_path  # 用新地址
 | ||
|         if face_img:
 | ||
|             face_img_path = save_dahua_pic(face_img, '/media/temp/')
 | ||
|             face_img = settings.BASE_URL_OUT + face_img_path
 | ||
|     for i in codes:
 | ||
|         if i in algo_dict and i not in results:  # 如果算法支持且没有识别过
 | ||
|             module, func = algo_dict[i].rsplit(".", 1)
 | ||
|             m = importlib.import_module(module)
 | ||
|             f = getattr(m, func)
 | ||
|             try:
 | ||
|                 is_happend, res, rectangle_dict = False, None, {}
 | ||
|                 if i == 'helmet':  # 如果是安全帽识别
 | ||
|                     if face_img:  # 如果有小图
 | ||
|                         is_happend, res = f(ip=settings.AI_IP, pic_url=face_img)
 | ||
|                         # if is_happend:  # 补偿机制: 如果小图识别为未带安全帽,为了保证正确率,再用大图识别一次
 | ||
|                         #     is_happend, res = getattr(m, 'helmet2')(ip=settings.AI_IP, pic_url=global_img)
 | ||
|                     else:
 | ||
|                         is_happend, res = getattr(m, 'helmet2')(ip=settings.AI_IP, pic_url=global_img)
 | ||
|                 else:
 | ||
|                     is_happend, res = f(ip=settings.AI_IP, pic_url=global_img)
 | ||
|                 if i in ['fire1', 'jingjiedai', 'qiping']:  # 如果是这3类算法就无需再识别,都在一个算法里处理了
 | ||
|                     has_fire = False  # 默认没有灭火器
 | ||
|                     has_jingjiedai = False  # 默认没有警戒带
 | ||
|                     qiping_qd = False  # 气瓶倾倒未发生
 | ||
|                     rectangle_dict .update({'qiping': []})  # 气瓶倾倒的坐标字典
 | ||
|                     for x in res.FireinfoList:
 | ||
|                         if x.fire == 3:  # 气瓶倾倒
 | ||
|                             qiping_qd = True
 | ||
|                             rectangle_dict['qiping'].append(
 | ||
|                                 [(math.ceil(x.coord.uleft.x), math.ceil(x.coord.uleft.y)), (math.ceil(x.coord.lright.x), math.ceil(x.coord.lright.y))])  # 加入矩形框
 | ||
|                         if x.fire == 0:
 | ||
|                             has_fire = True
 | ||
|                         if x.fire == 2:
 | ||
|                             has_jingjiedai = True
 | ||
|                     if (i == 'fire1' and not has_fire) or (i == 'jingjiedai' and not has_jingjiedai) or (i == 'qiping' and qiping_qd):
 | ||
|                         results.update({i: rectangle_dict.get(i, [])})
 | ||
|                 if is_happend and (i not in results):
 | ||
|                     results.update({i: rectangle_dict.get(i, [])})
 | ||
|             except Exception:
 | ||
|                 myLogger.error('算法处理错误', exc_info=True)
 | ||
|     if global_img_path:
 | ||
|         os.remove(settings.BASE_DIR + global_img_path)  # 删除临时图片
 | ||
|     if face_img_path:
 | ||
|         os.remove(settings.BASE_DIR + face_img_path)
 | ||
|     return results
 | ||
| 
 | ||
| 
 | ||
| def ai_analyse_2(codes: list, global_img: str, face_img: str = '', is_dahua_pic: bool = True):
 | ||
|     """算法分析图片后保存并返回相对地址
 | ||
| 
 | ||
|     Args:
 | ||
|         codes: 算法列表
 | ||
|         global_img (str): 全景图片url地址
 | ||
|         face_img (str): 人脸图片url地址
 | ||
|     """
 | ||
|     results = ai_analyse(codes, global_img, face_img, is_dahua_pic)
 | ||
|     try:
 | ||
|         if results and is_dahua_pic:  # 如果触发事件且是大华图片先保存下来
 | ||
|             from apps.ecm.service import save_dahua_pic
 | ||
|             global_img_path = save_dahua_pic(global_img)
 | ||
|             global_img_local = settings.BASE_DIR + global_img_path
 | ||
|             draw(global_img_local, results)  # 绘制矩形框
 | ||
|             results.update({'global_img': global_img_path})
 | ||
|     except Exception as e:
 | ||
|         send_mail_task.delay(message=traceback.format_exc())
 | ||
|         myLogger.error('算法返回信息处理错误', exc_info=True)
 | ||
|     return results
 | ||
| 
 | ||
| 
 | ||
| def draw(path, results):
 | ||
|     rects = []
 | ||
|     for i in results:
 | ||
|         rects.extend(results[i])
 | ||
|     if rects:
 | ||
|         img = cv2.imread(path)
 | ||
|         font = cv2.FONT_HERSHEY_COMPLEX_SMALL
 | ||
|         for i in results:
 | ||
|             for m in results[i]:
 | ||
|                 cv2.rectangle(img, m[0], m[1], (255, 255, 0), 3)
 | ||
|                 cv2.putText(img, i, m[0], font, 2, (255, 255, 0), 1)
 | ||
|         cv2.imwrite(path, img)
 | ||
| 
 | ||
| {'qiping': [
 | ||
|     [
 | ||
|         (1142.2359619140625, 1295.6361083984375), (1363.11669921875, 1347.201904296875)
 | ||
|         ]
 | ||
|     ]
 | ||
|     } |