import requests from apps.am.models import Access, Area from apps.am.tasks import cache_areas_info from apps.ecm.models import Event, EventCate, Eventdo from apps.hrm.models import Employee from apps.system.models import User from apps.third.clients import xxClient from apps.third.models import TDevice from apps.third.tapis import xxapis from apps.utils.queryset import get_child_queryset2 from django.core.cache import cache import time import shapely.geometry from apps.third.clients import dhClient, spClient from django.utils import timezone from django.conf import settings import os from apps.utils.speech import generate_voice requests.packages.urllib3.disable_warnings() def get_area_info_from_cache(target: str, cache: list): for i in cache: if i['id'] == target: return i return None def save_dahua_pic(pic: str): """保存大华报警图片到本地 返回本地路径 """ file_name = pic.split('/')[-1] full_url = dhClient.get_full_pic(pic) res = requests.get(url=full_url, verify=False) path = '/media/' + timezone.now().strftime('%Y/%m/%d/') full_path = settings.BASE_DIR + path if not os.path.exists(full_path): os.makedirs(full_path) with open(full_path + file_name, 'wb') as f: f.write(res.content) return path + file_name class EcmService: """事件处理服务 """ @classmethod def get_ep_default(cls): return { 'area_fix_id': None, # 当前所在固定区域ID 'time0': None, # 定位首次出现时间戳 "time1": None, # 首次在该区域时间戳 "time2": int(time.time()), # 当前时间戳 } def create_remind_and_speak(cls): """ 创建事件提醒并发送短信/微信/音响 """ pass @classmethod def dispatch_dahua_event(cls, data: dict): """分发大华事件进行处理 """ vchannel_code = data['info']['nodeCode'] alarm_type = data['info']['alarmType'] vchannel = TDevice.objects.filter(code=vchannel_code).first() print(data) if alarm_type in [1001003, 1001000] and vchannel: # 内部人员/或陌生人报警 # 加载算法逻辑 # 安全帽检测 ec = EventCate.objects.filter(code='helmet').first() # 模拟发生安全帽事件 # 视频区域 area = vchannel.area if ec and area: # 保存照片 face_img = save_dahua_pic(data['info']['alarmPicture']) global_img = save_dahua_pic(data['info']['extend']['globalScenePicUrl']) event = Event() event.face_img = face_img event.global_img = global_img event.area = area event.obj_cate = 'people' ep = None # 找到人员 if alarm_type == 1001003: ep = Employee.objects.filter(id_number=data['info']['extend']['candidateInfo'][0]['id']).first() if ep: voice_msg = '位于{}的{}{},您未佩戴安全帽,请及时处理' ep_name = ep.name ep_type = '员工' if ep.type == 'rempoyee': ep_type = '相关方人员' elif ep.type == 'visitor': ep_type = '访客' event.voice_msg = voice_msg.format(area.name, ep_type, ep_name) else: event.voice_msg = '位于{}的未知人员,您未佩戴安全帽,请及时处理'.format(area.name) event.employee = ep _, event.voice, _ = generate_voice(event.voice_msg, ec.voice_person) event.save() Eventdo.objects.get_or_create(cate=ec, event=event, defaults={ 'cate': ec, 'event': event }) # 喇叭提醒 if event.voice: sps = list(TDevice.objects.filter(area=area, type=TDevice.DEVICE_SPEAKER).values_list('code', flat=True)) if len(sps) == 0: # 找覆盖区的音响 sps = list(TDevice.objects.filter(areas=area, type=TDevice.DEVICE_SPEAKER).values_list('code', flat=True)) if sps: spClient.speak(event.voice, sps) return @classmethod def dispatch_xunxi_event(cls, data: dict): """分发寻息事件进行处理 """ if data.type == 'rail': if data.data.type == 1: # 围栏进入 cls.rail_in(data=data.data) elif data.data.type == 2: # 围栏离开 cls.rail_out(data=data.data) elif data.type == 'onKeyAlarm': # 一键呼救 cls.one_key_alarm(data=data) elif data.type == 'location': # 定位信息 cls.loc_change(data=data) elif data.type == 'onOffLine': if data.data.online: # 标签定位在线 pass else: # 标签定位离线 pass elif data.type == 'lowpower': # 低电量 cls.low_power(data=data.data) elif data.type == 'bltOnOffLineV2': if data.data.online: # 标签通信在线 cls.blt_online(data=data.data) else: # 标签通信离线 cls.blt_offline(data=data.data) @classmethod def rail_in(cls, data): """围栏进入事件 """ # 找到所在围栏 area = Area.objects.filter(third_info__xx_rail__id=data['railId']).first() # 找到进入对象 blts = TDevice.objects.filter(code=data['userId']).first() if area and blts and blts.employee: # 如果是人 if area.type == Area.AREA_TYPE_FIX: # 更新人员位置信息缓存 key_str = 'ep_{}'.format(blts.employee.id) ep_loc_dict = cache.get_or_set( key_str, cls.get_ep_default(), timeout=None ) if ep_loc_dict['area_fix_id'] != area.id: # 如果区域未变化则不动 ep_loc_dict['time1'] = ep_loc_dict['time2'] ep_loc_dict['area_fix_id'] = area.id cache.set(key_str, ep_loc_dict) ep_blt = blts.employee # 标签绑定人员 if ep_blt: for i in Access.objects.filter(area=area).order_by('sort'): # 优先自定义权限过滤 if i.post: # 如果是按岗位设定的 eps_access = Employee.objects.filter(user__posts=i.post) if ep_blt in eps_access and i.type == Access.ACCESS_IN_YES: return elif ep_blt in eps_access and i.type == Access.ACCESS_IN_NO: # 触发非法进入事件 pass elif i.dept: # 如果是按部门设定的 if i.dept.type == 'dept': # 如果是内部部门 depts = get_child_queryset2(i.dept) if ep_blt.belong_dept in depts and i.type == Access.ACCESS_IN_YES: return elif ep_blt.belong_dept in depts and i.type == Access.ACCESS_IN_NO: # 触发非法进入事件 pass elif i.dept.type == 'rparty': # 如果是相关方 if ep_blt.belong_dept == i.dept and i.type == Access.ACCESS_IN_YES: return elif ep_blt.belong_dept == i.dept and i.type == Access.ACCESS_IN_NO: # 触发非法进入事件 pass elif i.employee: # 如果是按人设定的 if ep_blt == i.employee and i.type == Access.ACCESS_IN_YES: return elif ep_blt == i.employee and i.type == Access.ACCESS_IN_NO: # 触发非法进入事件 pass # 通用权限设置过滤 if ep_blt.type == 'employee' and area.employee_yes: return elif ep_blt.type == 'remployee' and area.remployee_yes: return elif ep_blt.type == 'visitor' and area.visitor_yes: return else: # 触发非法进入事件 pass elif area and (not blts): # 触发未知标签进入事件 pass @classmethod def rail_out(cls, data): pass @classmethod def low_power(cls, data): # 有绑定对象再提示低电量 blts = TDevice.objects.filter(code=data['userId']).first() if blts.employee: # 触发低电量提醒事件 pass @classmethod def one_key_alarm(cls, data): pass @classmethod def loc_change(cls, data): blts = TDevice.objects.filter(code=data['userId']).first() if blts.employee: # 从缓存查询人员位置信息 time2 = int(time.time()) key_str = 'ep_{}'.format(blts.employee.id) ep_loc_dict = cache.get_or_set( key_str, cls.get_ep_default(), timeout=None ) ep_loc_dict['time2'] = time2 # 从缓存里获取固定区域列表信息 area_fix_list = cache.get('area_fix_list', None) if not area_fix_list: area_fix_list = cache_areas_info() if ep_loc_dict.get('area_fix_id', None): # 如果存在所在固定区域 area_info = get_area_info_from_cache(ep_loc_dict['area_fix_id'], area_fix_list) if area_info: # 在该固定区域停留时间(分钟) stay_minute = int((ep_loc_dict['time2']-ep_loc_dict['time1'])/60) # 判断停留时间是否合理 # 先通过自定义权限过滤(暂未做) # 再经过通用设置过滤 if 0 < stay_minute < area_info['stay_minute_min']: # 触发离岗事件 return elif area_info['stay_minute_max'] < stay_minute: # 触发超时滞留事件 return else: for i in area_fix_list: if data['floorNo'] == i['floor_no']: point = shapely.geometry.Point(data['xMillimeter'], data['yMillimeter']) if i['poly_shape'].intersects(point): # 如果点在多边形中 ep_loc_dict['time1'] = time2 ep_loc_dict['are_id'] = i['area_fix_id'] ep_loc_dict['time2'] = time2 cache.set(key_str, ep_loc_dict) @classmethod def blt_online(cls, data): pass @classmethod def blt_offline(cls, data): pass