import requests from apps.am.models import Access, Area from apps.am.tasks import cache_areas_info from apps.ecm.models import Event, EventCate from apps.hrm.models import Employee from apps.system.models import User from apps.third.clients import xxClient from apps.third.models import TDevice from apps.third.tapis import xxapis from apps.utils.queryset import get_child_queryset2 from django.core.cache import cache import time import shapely.geometry from apps.third.clients import dhClient from django.utils import timezone from django.conf import settings import os requests.packages.urllib3.disable_warnings() def get_area_info_from_cache(target: str, cache: list): for i in cache: if i['id'] == target: return i return None def save_dahua_pic(pic: str): """保存大华报警图片到本地 返回本地路径 """ file_name = pic.split('/')[-1] full_url = dhClient.get_full_pic(pic) res = requests.get(url=full_url, verify=False) path = '/media/' + timezone.now().strftime('%Y/%m/%d/') full_path = settings.BASE_DIR + path if not os.path.exists(full_path): os.makedirs(full_path) with open(full_path + file_name, 'wb') as f: f.write(res.content) return path + file_name class EcmService: """事件处理服务 """ @classmethod def get_ep_default(cls): return { 'area_fix_id': None, # 当前所在固定区域ID 'time0': None, # 定位首次出现时间戳 "time1": None, # 首次在该区域时间戳 "time2": int(time.time()), # 当前时间戳 } def create_remind_and_speak(cls): """ 创建事件提醒并发送短信/微信/音响 """ pass @classmethod def dispatch_dahua_event(cls, data: dict): """分发大华事件进行处理 """ vchannel_code = data['info']['nodeCode'] alarm_type = data['info']['alarmType'] vchannel = TDevice.objects.filter(code=vchannel_code).first() print(data) if alarm_type in [1001003, 1001000] and vchannel: # 内部人员报警 # 加载算法逻辑 # 安全帽检测 ec = EventCate.objects.filter(code='helmet').first() # 模拟发生安全帽事件 # 视频区域 area = vchannel.area if ec and area: # 保存照片 face_img = save_dahua_pic(data['info']['alarmPicture']) global_img = save_dahua_pic(data['info']['extend']['globalScenePicUrl']) event = Event() event.face_img = face_img event.global_img = global_img event.area = area event.obj_cate = 'people' event.employee = None @classmethod def dispatch_xunxi_event(cls, data: dict): """分发寻息事件进行处理 """ if data.type == 'rail': if data.data.type == 1: # 围栏进入 cls.rail_in(data=data.data) elif data.data.type == 2: # 围栏离开 cls.rail_out(data=data.data) elif data.type == 'onKeyAlarm': # 一键呼救 cls.one_key_alarm(data=data) elif data.type == 'location': # 定位信息 cls.loc_change(data=data) elif data.type == 'onOffLine': if data.data.online: # 标签定位在线 pass else: # 标签定位离线 pass elif data.type == 'lowpower': # 低电量 cls.low_power(data=data.data) elif data.type == 'bltOnOffLineV2': if data.data.online: # 标签通信在线 cls.blt_online(data=data.data) else: # 标签通信离线 cls.blt_offline(data=data.data) @classmethod def rail_in(cls, data): """围栏进入事件 """ # 找到所在围栏 area = Area.objects.filter(third_info__xx_rail__id=data['railId']).first() # 找到进入对象 blts = TDevice.objects.filter(code=data['userId']).first() if area and blts and blts.employee: # 如果是人 if area.type == Area.AREA_TYPE_FIX: # 更新人员位置信息缓存 key_str = 'ep_{}'.format(blts.employee.id) ep_loc_dict = cache.get_or_set( key_str, cls.get_ep_default(), timeout=None ) if ep_loc_dict['area_fix_id'] != area.id: # 如果区域未变化则不动 ep_loc_dict['time1'] = ep_loc_dict['time2'] ep_loc_dict['area_fix_id'] = area.id cache.set(key_str, ep_loc_dict) ep_blt = blts.employee # 标签绑定人员 if ep_blt: for i in Access.objects.filter(area=area).order_by('sort'): # 优先自定义权限过滤 if i.post: # 如果是按岗位设定的 eps_access = Employee.objects.filter(user__posts=i.post) if ep_blt in eps_access and i.type == Access.ACCESS_IN_YES: return elif ep_blt in eps_access and i.type == Access.ACCESS_IN_NO: # 触发非法进入事件 pass elif i.dept: # 如果是按部门设定的 if i.dept.type == 'dept': # 如果是内部部门 depts = get_child_queryset2(i.dept) if ep_blt.belong_dept in depts and i.type == Access.ACCESS_IN_YES: return elif ep_blt.belong_dept in depts and i.type == Access.ACCESS_IN_NO: # 触发非法进入事件 pass elif i.dept.type == 'rparty': # 如果是相关方 if ep_blt.belong_dept == i.dept and i.type == Access.ACCESS_IN_YES: return elif ep_blt.belong_dept == i.dept and i.type == Access.ACCESS_IN_NO: # 触发非法进入事件 pass elif i.employee: # 如果是按人设定的 if ep_blt == i.employee and i.type == Access.ACCESS_IN_YES: return elif ep_blt == i.employee and i.type == Access.ACCESS_IN_NO: # 触发非法进入事件 pass # 通用权限设置过滤 if ep_blt.type == 'employee' and area.employee_yes: return elif ep_blt.type == 'remployee' and area.remployee_yes: return elif ep_blt.type == 'visitor' and area.visitor_yes: return else: # 触发非法进入事件 pass elif area and (not blts): # 触发未知标签进入事件 pass @classmethod def rail_out(cls, data): pass @classmethod def low_power(cls, data): # 有绑定对象再提示低电量 blts = TDevice.objects.filter(code=data['userId']).first() if blts.employee: # 触发低电量提醒事件 pass @classmethod def one_key_alarm(cls, data): pass @classmethod def loc_change(cls, data): blts = TDevice.objects.filter(code=data['userId']).first() if blts.employee: # 从缓存查询人员位置信息 time2 = int(time.time()) key_str = 'ep_{}'.format(blts.employee.id) ep_loc_dict = cache.get_or_set( key_str, cls.get_ep_default(), timeout=None ) ep_loc_dict['time2'] = time2 # 从缓存里获取固定区域列表信息 area_fix_list = cache.get('area_fix_list', None) if not area_fix_list: area_fix_list = cache_areas_info() if ep_loc_dict.get('area_fix_id', None): # 如果存在所在固定区域 area_info = get_area_info_from_cache(ep_loc_dict['area_fix_id'], area_fix_list) if area_info: # 在该固定区域停留时间(分钟) stay_minute = int((ep_loc_dict['time2']-ep_loc_dict['time1'])/60) # 判断停留时间是否合理 # 先通过自定义权限过滤(暂未做) # 再经过通用设置过滤 if 0 < stay_minute < area_info['stay_minute_min']: # 触发离岗事件 return elif area_info['stay_minute_max'] < stay_minute: # 触发超时滞留事件 return else: for i in area_fix_list: if data['floorNo'] == i['floor_no']: point = shapely.geometry.Point(data['xMillimeter'], data['yMillimeter']) if i['poly_shape'].intersects(point): # 如果点在多边形中 ep_loc_dict['time1'] = time2 ep_loc_dict['are_id'] = i['area_fix_id'] ep_loc_dict['time2'] = time2 cache.set(key_str, ep_loc_dict) @classmethod def blt_online(cls, data): pass @classmethod def blt_offline(cls, data): pass