from apps.utils.sms import send_sms
import requests
from apps.am.models import Access, Area
from apps.am.tasks import cache_areas_info
from apps.ecm.models import AlgoChannel, Event, EventCate, Eventdo, NotifySetting, Remind
from apps.hrm.models import Employee
from apps.system.models import User
from apps.third.clients import xxClient
from apps.third.models import TDevice
from apps.third.tapis import xxapis
from apps.utils.queryset import get_child_queryset2, get_parent_queryset
from django.core.cache import cache
import time
from shapely.geometry import Point
from apps.third.clients import dhClient, spClient
from django.utils import timezone
from django.conf import settings
import os
from apps.utils.speech import generate_voice
from threading import Thread
from apps.utils.tools import timestamp_to_time
from apps.vm.models import Visit

# Pictures are fetched with verify=False below; silence the resulting
# urllib3 InsecureRequestWarning noise.
requests.packages.urllib3.disable_warnings()


def update_remind_read(event: Event, user: User):
    """Mark every unread reminder of ``event`` addressed to ``user`` as read.

    Args:
        event (Event): Event whose reminders are updated.
        user (User): Recipient whose reminders are updated.
    """
    # update() on an empty queryset is a no-op, so no prior exists() query
    # is needed.
    Remind.objects.filter(event=event, recipient=user, is_read=False).update(is_read=True)


def get_area_info_from_cache(target: str, cache: list):
    """Find the cached area entry whose ``'id'`` equals ``target``.

    Note that the ``cache`` parameter shadows the module-level Django cache
    inside this function; it is kept because it is part of the signature.

    Args:
        target (str): Area ID to look for.
        cache (list): Cached area entries (dicts with at least an ``'id'`` key).

    Returns:
        The first matching entry, or None when nothing matches.
    """
    return next((info for info in cache if info['id'] == target), None)


def save_dahua_pic(pic_url: str, timeout: float = 10):
    """Download a Dahua alarm picture and store it under the media folder.

    The file is written to ``<BASE_DIR>/media/<YYYY>/<mm>/<dd>/<file name>``.
    The HTTP status is not checked, so whatever body the server returns is
    written to disk.

    Args:
        pic_url (str): URL of the picture; the file name is the last path
            segment with any query string removed.
        timeout (float): Seconds to wait for the picture server before
            ``requests`` raises, so a stalled server cannot block the caller
            forever.

    Returns:
        str: Path relative to the project root, e.g.
        ``/media/2023/01/31/pic.jpg``.
    """
    file_name = pic_url.split('/')[-1].split('?')[0]
    res = requests.get(url=pic_url, verify=False, timeout=timeout)
    path = '/media/' + timezone.now().strftime('%Y/%m/%d/')
    # str() keeps this working whether BASE_DIR is configured as str or Path.
    full_path = str(settings.BASE_DIR) + path
    # exist_ok avoids the check-then-create race when several alarms arrive
    # at the same time.
    os.makedirs(full_path, exist_ok=True)
    with open(full_path + file_name, 'wb') as f:
        f.write(res.content)
    return path + file_name


def get_ep_default():
    """Return the default location record stored in the cache for a person.

    Returns:
        dict: Record with no fixed area and ``time2`` set to the current time.
    """
    return {
        'area_fix_id': None,  # ID of the fixed area the person is currently in
        'time0': None,  # timestamp when the position first appeared
        "time1": None,  # timestamp when the person first entered the current area
        "time2": int(time.time()),  # current timestamp
    }


def algo_handle(codes: list, data: dict):
    """Run the algorithms and return the codes of the triggered event categories.

    This is currently a stub: both arguments are ignored and ``['helmet']``
    is always returned.

    Args:
        codes (list): Algorithm codes loaded on the channel.
        data (dict): Content to be analysed.

    Returns:
        list: Codes of the event categories that were triggered.
    """
    return ['helmet']


def notify_event(event: Event):
    """Build the announcement text for ``event`` and start the notifications.

    The text is saved to ``event.voice_msg``; the speaker broadcast and the
    reminder creation are then started in two daemon threads.

    Args:
        event (Event): Event that has just been created.
    """
    ep = event.employee
    if ep:
        # 'remployee' is the related-party type value used by rail_in();
        # any type not listed here is announced as a regular employee.
        ep_type = {'remployee': '相关方人员', 'visitor': '访客'}.get(ep.type, '员工')
        voice_msg = '位于{}的{}{},'.format(event.area.name, ep_type, ep.name)
    else:
        voice_msg = '位于{}的未知人员,'.format(event.area.name)
    voice_msg += ''.join(cate.name + ',' for cate in event.cates.all())
    event.voice_msg = voice_msg + '请及时处理'
    event.save()
    # Broadcast through the speakers.
    Thread(target=save_voice_and_speak, args=(event,), daemon=True).start()
    # Create the reminders.
    Thread(target=create_remind, args=(event,), daemon=True).start()


def save_voice_and_speak(event: Event):
    """Generate the voice file for ``event`` and play it on the speakers.

    Voice settings come from the event's first category ordered by
    ``priority`` then ``create_time``. Nothing happens when the event has no
    category at all.

    Args:
        event (Event): Event whose ``voice_msg`` is spoken.
    """
    main_cate = event.cates.all().order_by('priority', 'create_time').first()
    if main_cate is None:
        # No category means no voice settings to speak with.
        return
    v_p, v_num = main_cate.voice_person, main_cate.voice_num
    _, event.voice, _ = generate_voice(event.voice_msg, v_p)
    event.save()
    sps = list(TDevice.objects.filter(
        area=event.area, type=TDevice.DEVICE_SPEAKER).values_list('code', flat=True))
    if not sps:
        # No speaker in the area itself: fall back to speakers covering it.
        sps = list(TDevice.objects.filter(
            areas=event.area, type=TDevice.DEVICE_SPEAKER).values_list('code', flat=True))
    # Add the speakers configured on the event categories.
    for cate in event.cates.all():
        for speaker in cate.speakers.all():
            if speaker.code not in sps:
                sps.append(speaker.code)
    if sps:
        spClient.speak(event.voice, sps, v_num)


def _ensure_remind(event, recipient, setting):
    """Create the reminder of ``event`` for ``recipient`` unless it exists."""
    # get_or_create() already uses the lookup kwargs when creating the row,
    # so only the extra fields need to be listed in defaults.
    Remind.objects.get_or_create(event=event, recipient=recipient, defaults={
        'notify_setting': setting,
        'can_handle': setting.can_handle,
    })


def _variable_recipient(event, variable):
    """Resolve a NotifySetting ``variable`` to a user, or None if not resolvable."""
    if variable == 'self':
        if event.employee and event.employee.user:
            return event.employee.user
    elif variable == 'area_manager':
        if event.area.manager:
            return event.area.manager
    elif variable == 'visit_receptionist':
        if event.employee and event.employee.type == 'visitor':  # really a visitor
            visit = Visit.objects.filter(
                visitors__employee=event.employee, state=Visit.V_WORKING).first()
            if visit and visit.receptionist:
                return visit.receptionist
    return None


def create_remind(event: Event):
    """Create the reminders for ``event`` and send the SMS/WeChat notices.

    Each NotifySetting attached to the event's categories is applied in
    ``sort`` order when the area level is at least the setting's
    ``filter_area_level``. A setting targets, in this order of precedence, a
    fixed user, every user holding a post (optionally narrowed by
    ``filter_recipient``), or a variable recipient. Finally
    ``event.is_pushed`` is set.

    Sending an SMS directly to the person involved is not enabled.

    Args:
        event (Event): Event to create reminders for.
    """
    n_s = NotifySetting.objects.filter(event_cate__in=event.cates.all()).order_by('sort')
    area_level = event.area.level
    for setting in n_s:
        if not (setting.user or setting.post or setting.variable):
            continue
        if area_level < setting.filter_area_level:
            continue
        if setting.user:
            _ensure_remind(event, setting.user, setting)
        elif setting.post:
            qs = User.objects.filter(posts=setting.post)
            if setting.filter_recipient == 20:
                # Department of the person involved and the departments above it.
                if event.employee and event.employee.user:
                    qs = qs.filter(
                        depts__in=get_parent_queryset(event.employee.user.belong_dept))
            elif setting.filter_recipient == 40:
                # Department owning the area and the departments above it.
                if event.area.belong_dept:
                    qs = qs.filter(depts__in=get_parent_queryset(event.area.belong_dept))
            for user in qs:
                _ensure_remind(event, user, setting)
        else:
            recipient = _variable_recipient(event, setting.variable)
            if recipient:
                _ensure_remind(event, recipient, setting)

    # Send the notifications.
    for remind in Remind.objects.filter(event=event):
        setting = remind.notify_setting
        if setting is None:
            # NOTE(review): only reachable if notify_setting is nullable — confirm.
            continue
        if setting.sms_enable:
            # A user without an employee record must not abort the whole
            # loop; Django's RelatedObjectDoesNotExist is an AttributeError,
            # so getattr() with a default covers it.
            employee = getattr(remind.recipient, 'employee', None)
            phone = getattr(employee, 'phone', None)
            if phone:
                # SMS notice; template code and parameters are the fixed
                # values below.
                Thread(target=send_sms, args=(phone, '1001', {'code': '5678'}),
                       daemon=True).start()
        if setting.wechat_enable:
            pass  # WeChat notification is not implemented yet
    event.is_pushed = True
    event.save()


def dispatch_dahua_event(data: dict):
    """Handle an alarm pushed by the Dahua platform.

    Only alarm types 1001003 (internal staff) and 1001000 (stranger) coming
    from a known video channel are processed, and only when the channel has
    algorithms loaded and is bound to an area. When the algorithms report
    event categories, an Event is created and handed to ``notify_event``.

    Args:
        data (dict): Pushed alarm; the fields read here all live under
            ``data['info']``.
    """
    vchannel_code = data['info']['nodeCode']
    alarm_type = data['info']['alarmType']
    vchannel = TDevice.objects.filter(code=vchannel_code).first()
    print(data)
    if alarm_type not in (1001003, 1001000) or not vchannel:
        return
    # Algorithms loaded on this channel, highest priority first.
    algo_codes = list(AlgoChannel.objects.filter(vchannel=vchannel).exclude(
        algo__code=None).order_by('algo__priority', 'algo__create_time').values_list(
            'algo__code', flat=True))
    area = vchannel.area  # area the camera belongs to
    if not (algo_codes and area):
        return
    face_img_o = dhClient.get_full_pic(data['info']['alarmPicture'])
    global_img_o = dhClient.get_full_pic(data['info']['extend']['globalScenePicUrl'])
    ep = None  # matched person, if any
    if alarm_type == 1001003:  # internal staff: look the person up by ID number
        ep = Employee.objects.filter(
            id_number=data['info']['extend']['candidateInfo'][0]['id']).first()
    ec_codes = algo_handle(algo_codes, data={})  # run the algorithms
    if not ec_codes:
        return
    # Every category triggered this time.
    ecs = EventCate.objects.filter(code__in=ec_codes)
    event = Event()
    event.face_img = save_dahua_pic(face_img_o)
    event.global_img = save_dahua_pic(global_img_o)
    event.area = area
    event.obj_cate = 'people'
    event.vchannel = vchannel
    event.employee = ep
    event.happen_time = timestamp_to_time(int(data['info']['alarmDate']))
    event.save()
    for cate in ecs:
        Eventdo.objects.get_or_create(cate=cate, event=event, defaults={
            'cate': cate,
            'event': event
        })
    notify_event(event)


def _xx_get(obj, key):
    """Read ``key`` from a payload that is either a mapping or an attribute object."""
    if isinstance(obj, dict):
        return obj.get(key)
    return getattr(obj, key, None)


def dispatch_xunxi_event(data: dict):
    """Route an event pushed by the Xunxi positioning platform to its handler.

    The payload is read through ``_xx_get`` so that both plain dicts (as
    annotated) and attribute-style objects are accepted.

    Args:
        data (dict): Pushed event with a ``type`` field and, for most types,
            a nested ``data`` field.
    """
    event_type = _xx_get(data, 'type')
    payload = _xx_get(data, 'data')
    if event_type == 'rail':
        rail_type = _xx_get(payload, 'type')
        if rail_type == 1:  # entered a fence
            rail_in(data=payload)
        elif rail_type == 2:  # left a fence
            rail_out(data=payload)
    elif event_type == 'onKeyAlarm':  # one-key SOS
        one_key_alarm(data=data)
    elif event_type == 'location':  # position update
        # NOTE(review): the whole message is passed here, while the other
        # handlers get the nested payload — confirm against the push format.
        loc_change(data=data)
    elif event_type == 'onOffLine':
        pass  # tag positioning online/offline: nothing to do yet
    elif event_type == 'lowpower':  # low battery
        low_power(data=payload)
    elif event_type == 'bltOnOffLineV2':
        if _xx_get(payload, 'online'):  # tag communication online
            blt_online(data=payload)
        else:  # tag communication offline
            blt_offline(data=payload)


def _access_rule_matches(rule, employee):
    """Tell whether an Access ``rule`` (by post, department or person) covers ``employee``."""
    if rule.post:  # rule defined by post
        return employee in Employee.objects.filter(user__posts=rule.post)
    if rule.dept:  # rule defined by department
        if rule.dept.type == 'dept':  # internal department: include children
            return employee.belong_dept in get_child_queryset2(rule.dept)
        if rule.dept.type == 'rparty':  # related party: exact match only
            return employee.belong_dept == rule.dept
        return False
    if rule.employee:  # rule defined for one person
        return employee == rule.employee
    return False


def rail_in(data):
    """Handle a fence-entered event.

    For a known tag bound to a person, the cached location record is updated
    when the fence is a fixed area, then the access rules are evaluated:
    custom Access rules first (in ``sort`` order), then the area's general
    per-type switches. The illegal-entry events themselves are not
    implemented yet.

    Args:
        data: Fence payload; ``railId`` and ``userId`` are read.
    """
    # Area bound to the fence and the tag that entered it.
    area = Area.objects.filter(third_info__xx_rail__id=data['railId']).first()
    blts = TDevice.objects.filter(code=data['userId']).first()
    if area and blts and blts.employee:  # the tag is bound to a person
        ep_blt = blts.employee
        if area.type == Area.AREA_TYPE_FIX:  # fixed area
            # Update the cached location record of the person.
            key_str = 'ep_{}'.format(ep_blt.id)
            ep_loc_dict = cache.get_or_set(
                key_str, get_ep_default(), timeout=None
            )
            if ep_loc_dict['area_fix_id'] != area.id:  # untouched when the area is unchanged
                ep_loc_dict['time1'] = ep_loc_dict['time2']
                ep_loc_dict['area_fix_id'] = area.id
                cache.set(key_str, ep_loc_dict)
        # Custom access rules take precedence.
        for rule in Access.objects.filter(area=area).order_by('sort'):
            if not _access_rule_matches(rule, ep_blt):
                continue
            if rule.type == Access.ACCESS_IN_YES:
                return
            if rule.type == Access.ACCESS_IN_NO:
                pass  # TODO: raise the illegal-entry event
        # General per-type switches of the area.
        if ep_blt.type == 'employee' and area.employee_yes:
            return
        elif ep_blt.type == 'remployee' and area.remployee_yes:
            return
        elif ep_blt.type == 'visitor' and area.visitor_yes:
            return
        else:
            pass  # TODO: raise the illegal-entry event
    elif area and (not blts):
        pass  # TODO: raise the unknown-tag-entered event


def rail_out(data):
    """Handle a fence-left event (not implemented yet)."""
    pass


def low_power(data):
    """Handle a low-battery event; only tags bound to a person are relevant.

    Args:
        data: Low-power payload; ``userId`` is read.
    """
    blts = TDevice.objects.filter(code=data['userId']).first()
    # .first() returns None for an unknown tag.
    if blts and blts.employee:
        pass  # TODO: raise the low-battery reminder event


def one_key_alarm(data):
    """Handle a one-key SOS event (not implemented yet)."""
    pass


def loc_change(data):
    """Handle a position update and keep the cached location record current.

    When the person already has a fixed area, the stay time is checked
    against that area's limits. Otherwise the fixed areas on the reported
    floor are searched for one whose polygon contains the position.

    Args:
        data: Location payload; ``userId``, ``floorNo``, ``xMillimeter`` and
            ``yMillimeter`` are read.
    """
    blts = TDevice.objects.filter(code=data['userId']).first()
    # .first() returns None for an unknown tag.
    if not (blts and blts.employee):
        return
    # Load the location record of the person from the cache.
    time2 = int(time.time())
    key_str = 'ep_{}'.format(blts.employee.id)
    ep_loc_dict = cache.get_or_set(
        key_str, get_ep_default(), timeout=None
    )
    ep_loc_dict['time2'] = time2
    # Load the fixed-area list from the cache, rebuilding it when missing.
    area_fix_list = cache.get('area_fix_list', None)
    if not area_fix_list:
        area_fix_list = cache_areas_info()
    if ep_loc_dict.get('area_fix_id', None):  # the person has a current fixed area
        area_info = get_area_info_from_cache(ep_loc_dict['area_fix_id'], area_fix_list)
        if area_info:
            # Minutes spent in this fixed area so far.
            stay_minute = int((ep_loc_dict['time2'] - ep_loc_dict['time1']) / 60)
            # Custom rules are not implemented; only the general limits apply.
            if 0 < stay_minute < area_info['stay_minute_min']:
                # TODO: raise the left-post event
                return
            elif area_info['stay_minute_max'] < stay_minute:
                # TODO: raise the overstay event
                return
    else:
        for area_info in area_fix_list:
            if data['floorNo'] == area_info['floor_no']:
                point = Point(data['xMillimeter'], data['yMillimeter'])
                if area_info['polygon'].intersects(point):  # position is inside the polygon
                    ep_loc_dict['time1'] = time2
                    # Store under the key the rest of the module reads, using
                    # the same 'id' field get_area_info_from_cache matches on.
                    ep_loc_dict['area_fix_id'] = area_info['id']
    ep_loc_dict['time2'] = time2
    cache.set(key_str, ep_loc_dict)


def blt_online(data):
    """Handle a tag-communication-online event (not implemented yet)."""
    pass


def blt_offline(data):
    """Handle a tag-communication-offline event (not implemented yet)."""
    pass