from datetime import timedelta from apps.vm.models import Visit from apps.ai.main import ai_analyse, ai_analyse_2 from apps.opm.models import Operation, Opl, OplWorker from apps.utils.sms import send_sms import requests from apps.am.models import Access, Area from apps.am.tasks import cache_areas_info from apps.ecm.models import AlgoChannel, Event, EventCate, Eventdo, NotifySetting, Remind from apps.hrm.models import ClockRecord, Employee from apps.system.models import User from apps.third.models import TDevice from apps.utils.queryset import get_child_queryset2, get_parent_queryset from django.core.cache import cache import time from shapely.geometry import Point from apps.third.dahua import dhClient from apps.third.xunxi import xxClient from apps.third.speaker import spClient from django.utils import timezone from django.conf import settings import os from apps.utils.speech import generate_voice from threading import Thread from apps.utils.tools import timestamp_to_time from apps.third.tapis import xxapis import logging from datetime import datetime from apps.utils.img import compress_image import traceback myLogger = logging.getLogger('log') requests.packages.urllib3.disable_warnings() def update_remind_read(event: Event, user: User): qs = Remind.objects.filter(event=event, recipient=user, is_read=False) if qs.exists(): qs.update(is_read=True) def get_area_info_from_cache(target: str, cache: list): """从区域信息缓存里匹配到所在区域 """ for i in cache: if i['id'] == target: return i return None def save_dahua_pic(pic_url: str, save_path: str = '/media/'): """保存大华报警图片到本地(全路径) 返回本地路径 """ file_name = pic_url.split('/')[-1].split('?')[0] res = requests.get(url=pic_url, verify=False) path = save_path + timezone.now().strftime('%Y/%m/%d/') full_path = settings.BASE_DIR + path if not os.path.exists(full_path): os.makedirs(full_path) with open(full_path + file_name, 'wb') as f: f.write(res.content) return path + file_name def compress_global_img(path_file): full_path_file = settings.BASE_DIR + path_file try: out_file, _ = compress_image(full_path_file) return out_file.replace(settings.BASE_DIR, '') except: myLogger.error(f'图片压缩失败-{full_path_file}-{traceback.format_exc()}') return None def get_ep_default(): """返回人员默认位置信息 """ return { 'area_fix_id': None, # 当前所在固定区域ID 'area_fix_name': None, # 当前所在固定区域Name 'area_temp_id': None, # 当前所在临时区域ID 'xx_detail': {}, # 寻息定位的详细信息 'time0': None, # 定位首次出现时间戳 "time1": None, # 首次在该区域时间戳 "time2": int(time.time()), # 当前时间戳 } def gen_params(event: Event): """ 生成短信模板发送参数 """ # 生成通知文本 ep = event.employee obj_cate = event.obj_cate area_name = event.area.name if event.area else '未知区域' params = {'area': area_name, 'employee': '', 'event': ''} if event.opl: # 主要针对的是作业人员未就位事件 params['employee'] = event.opl.cate.name + '作业' elif ep: ep_name = ep.name ep_type = '正式员工' if ep.type == 'remployee': ep_type = '相关方人员' elif ep.type == 'visitor': ep_type = '访客' elif ep.type == 'driver': ep_type = '货车司机' params['employee'] = ep_type + ep_name elif obj_cate == 'people': params['employee'] = '人员' cats_list = [] for i in event.cates.all(): cats_list.append(i.name) params['event'] = ','.join(cats_list) return params def notify_event(event: Event): """事件后续处理: Args: event (Event): _description_ """ from apps.ecm.tasks import event_push params = {} if event.voice_msg: # 如果已经生成通知文本了就不再处理 pass else: params = gen_params(event) if params['employee']: event.voice_msg = '位于{}的{},{},请及时处理'.format( params['area'], params['employee'], params['event']) else: event.voice_msg = '在{}下,发生{},请及时处理'.format( params['area'], params['event']) event.save() # 喇叭播放(任何情况下) Thread(target=save_voice_and_speak, args=(event,), daemon=True).start() # 事件ws推送 event_push.delay(event.id) # 如果还未创建提醒 if not event.is_pushed: if not params: params = gen_params(event) Thread(target=create_remind, args=(event, params), daemon=True).start() def save_voice_and_speak(event: Event): """生成语音同时喇叭播放 Args: event (Event): _description_ """ try: main_cate = event.cates.all().order_by('priority', 'create_time').first() v_p, v_num = main_cate.voice_person, main_cate.voice_num # if event.voice: # 如果已经生成了报警声音不用再请求 # pass # else: # _, event.voice, _ = generate_voice(event.voice_msg, v_p) # event.save() if main_cate.speaker_on and event.voice_msg: sps = [] if event.area: # 如果事件存在发生区域 sps = list(TDevice.objects.filter(area=event.area, type=TDevice.DEVICE_SPEAKER).values_list('code', flat=True)) if len(sps) == 0: # 如果当前区域没有喇叭就找覆盖区的喇叭 sps = list(TDevice.objects.filter(areas=event.area, type=TDevice.DEVICE_SPEAKER).values_list('code', flat=True)) # 查找固定喇叭 sps2 = list(TDevice.objects.filter( ec_speakers__event_cates=event).values_list('code', flat=True)) for i in sps2: if i not in sps: sps.append(i) if sps: myLogger.info(f'喇叭播放:{event.voice_msg}, {sps}, {v_num}') spClient.speak(event.voice_msg, sps, v_num, v_p=v_p) except Exception: myLogger.error('喇叭播放失败', exc_info=True) def create_remind(event: Event, params: dict): """ 创建事件提醒并发送短信/微信 """ from apps.ecm.tasks import remind_push # 向当事人本人发送通知 # if event.employee and event.employee.phone: # t_sms = Thread(target=send_sms, args=(event.employee.phone, '1001', {'code': '5678'}), daemon=True) # t_sms.start() # 查找所有提醒配置 n_s = NotifySetting.objects.filter( event_cate__in=event.cates.all()).order_by('sort') area_level = event.area.level if event.area else Area.AREA_4 for i in n_s: if i.user and area_level >= i.filter_area_level: Remind.objects.get_or_create(event=event, recipient=i.user, defaults={ 'event': event, 'recipient': i.user, 'notify_setting': i, 'can_handle': i.can_handle, }) elif i.post and area_level >= i.filter_area_level: qs = User.objects.filter(posts=i.post) if i.filter_recipient == 20 and event.employee and event.employee.user: qs = qs.filter(depts__in=get_parent_queryset( event.employee.user.belong_dept)) elif i.filter_recipient == 10 and event.employee and event.employee.user: qs = qs.filter(depts=event.employee.user.belong_dept) elif i.filter_recipient == 40 and event.area.belong_dept: qs = qs.filter(depts__in=get_parent_queryset( event.area.belong_dept)) elif i.filter_recipient == 30 and event.area.belong_dept: qs = qs.filter(depts=event.area.belong_dept) else: qs = User.objects.none() for m in qs: Remind.objects.get_or_create(event=event, recipient=m, defaults={ 'event': event, 'recipient': m, 'notify_setting': i, 'can_handle': i.can_handle, }) elif i.variable and area_level >= i.filter_area_level: if i.variable == 'self': if event.employee and event.employee.user: Remind.objects.get_or_create(event=event, recipient=event.employee.user, defaults={ 'event': event, 'recipient': event.employee.user, 'notify_setting': i, 'can_handle': i.can_handle, }) elif i.variable == 'area_manager': if event.area and event.area.manager: Remind.objects.get_or_create(event=event, recipient=event.area.manager, defaults={ 'event': event, 'recipient': event.area.manager, 'notify_setting': i, 'can_handle': i.can_handle, }) elif i.variable == 'visit_receptionist': if event.employee and event.employee.type == 'visitor': # 确定是访客 visit = Visit.objects.filter( visitors__employee=event.employee, state=Visit.V_WORKING).first() if visit: Remind.objects.get_or_create(event=event, recipient=visit.receptionist, defaults={ 'event': event, 'recipient': visit.receptionist, 'notify_setting': i, 'can_handle': i.can_handle, }) # 开始发送通知 for i in Remind.objects.filter(event=event): remind_push.delay(i.id) if i.notify_setting.sms_enable and i.recipient.phone: # 发送短信通知 Thread(target=send_sms, args=(i.recipient.phone, 1003, params), daemon=True).start() if i.notify_setting.wechat_enable: pass event.is_pushed = True event.save() def check_not_in_place(opl: Opl): # 检查作业人员未就位事件(直接创建事件) end_time = opl.end_time if timezone.now() >= end_time: # 如果大于预计结束时间不做检查了 return area = opl.operation.area if 'xx_rail' in area.third_info: railId = area.third_info['xx_rail']['id'] json = { "railId": railId, "type": "" } is_ok, res = xxClient.request( **xxapis['rail_ibeacon_list'], json=json, raise_exception=False) if is_ok == 'success': blt_list = res['recordList'] macs = [] # 所有该区域在线标签 for i in blt_list: macs.append(i['userId']) tds = TDevice.objects.filter( type=TDevice.DEVICE_BLT, employee__user__opl_worker__opl=opl) # 所有工作人员的定位标签 for i in tds: if i.code not in macs: # 触发作业人员未就位事件 cate = EventCate.objects.get(code='not_in_place') event = Event() event.area = area event.obj_cate = 'opl' event.employee = i.employee event.opl = opl event.happen_time = timezone.now() event.save() Eventdo.objects.get_or_create(cate=cate, event=event, defaults={ 'cate': cate, 'event': event }) notify_event(event) break def check_miss_lcard(ep: Employee, area: Area): # 获取当前区域下的定位卡列表 if 'xx_rail' in area.third_info: railId = area.third_info['xx_rail']['id'] json = { "railId": railId, "type": "" } is_ok, res = xxClient.request( **xxapis['rail_ibeacon_list'], json=json, raise_exception=False) if is_ok == 'success': blt_list = res['recordList'] macs = [] # 所有该区域在线标签 for i in blt_list: macs.append(i['userId']) td = TDevice.objects.filter( type=TDevice.DEVICE_BLT, employee=ep).first() if td and td.code in macs: return False return True def dispatch_dahua_event(data: dict): """分发大华事件进行处理 """ vchannel_code = data['info']['nodeCode'] alarm_type = data['info']['alarmType'] vchannel = TDevice.objects.filter(code=vchannel_code).first() event = None if vchannel is None: return face_img_o = None global_img_o = None area = vchannel.area obj_cate = 'other' ep = None if alarm_type in [1001003, 1001000]: obj_cate = 'people' face_img_o = dhClient.get_full_pic(data['info']['alarmPicture']) global_img_o = dhClient.get_full_pic( data['info']['extend']['globalScenePicUrl']) else: global_img_o = dhClient.get_full_pic(data['info']['alarmPicture']) happen_time = timestamp_to_time(int(data['info']['alarmDate'])) algo_channels = AlgoChannel.objects.filter(vchannel=vchannel).exclude( algo__code=None).order_by('algo__priority', 'algo__create_time').values('id', 'algo', 'algo__code') cates = [] # 触发的事件种类 algo_codes = [] # 需要执行的ai算法 ec_codes = {} # 算法返回的信息 for i in algo_channels: if i['algo__code'] == str(alarm_type): cates.append(i['algo']) else: algo_codes.append(i['algo__code']) if algo_codes: ec_codes = ai_analyse_2( algo_codes, global_img=global_img_o, face_img=face_img_o) # 算法处理 for m in ec_codes.keys(): for n in algo_channels: if m == n['algo__code']: cates.append(n['algo']) if alarm_type == 1001003 and area: # 内部人员需要执行未带定位卡算法 ep = Employee.objects.filter( id_number=data['info']['extend']['candidateInfo'][0]['id']).first() # 检查是否携带定位卡只针对内部员工和相关方 if 'miss_lcard' in algo_codes and ep: is_happend = check_miss_lcard(ep=ep, area=area) if is_happend: cate_obj = EventCate.objects.filter(code='miss_lcard').first() if cate_obj and check_same_allow_minute_and_raise_event(cate_obj, ep): cates.append(cate_obj.id) if cates: event = Event() event.global_img = ec_codes['global_img'] if ec_codes.get( 'global_img', None) else save_dahua_pic(global_img_o) event.global_img_compressed = compress_global_img(event.global_img) if face_img_o: event.face_img = save_dahua_pic(face_img_o) event.area = area event.obj_cate = obj_cate event.vchannel = vchannel event.employee = ep event.happen_time = happen_time event.save() current_level = 10 for i in cates: cate = EventCate.objects.get(id=i) if cate.origin_level > current_level: event.current_level = current_level Eventdo.objects.get_or_create(cate=cate, event=event, defaults={ 'cate': cate, 'event': event }) event.save() if event: notify_event(event) def rail_in(data): """围栏进入 """ # 找到所在围栏 area = Area.objects.filter(third_info__xx_rail__id=data['railId']).first() # 找到进入对象 blts = TDevice.objects.filter(code=data['userId']).first() if area and blts and blts.employee: # 如果是人 ep_blt = blts.employee # 标签绑定人员 if ep_blt: for i in Access.objects.filter(area=area).order_by('sort'): # 优先自定义权限过滤 if i.post: # 如果是按岗位设定的 eps_access = Employee.objects.filter(user__posts=i.post) if ep_blt in eps_access and i.type == Access.ACCESS_IN_YES: return elif ep_blt in eps_access and i.type == Access.ACCESS_IN_NO: # 触发违规进入事件 handle_xx_event_2('i_enter', ep=ep_blt, area=area) elif i.dept: # 如果是按部门设定的 if i.dept.type == 'dept': # 如果是内部部门 depts = get_child_queryset2(i.dept) if ep_blt.belong_dept in depts and i.type == Access.ACCESS_IN_YES: return elif ep_blt.belong_dept in depts and i.type == Access.ACCESS_IN_NO: # 触发违规进入事件 return handle_xx_event_2('i_enter', ep=ep_blt, area=area) elif i.dept.type == 'rparty': # 如果是相关方 if ep_blt.belong_dept == i.dept and i.type == Access.ACCESS_IN_YES: return elif ep_blt.belong_dept == i.dept and i.type == Access.ACCESS_IN_NO: # 触发违规进入事件 return handle_xx_event_2('i_enter', ep=ep_blt, area=area) elif i.employee: # 如果是按人设定的 if ep_blt == i.employee and i.type == Access.ACCESS_IN_YES: return elif ep_blt == i.employee and i.type == Access.ACCESS_IN_NO: # 触发违规进入事件 return handle_xx_event_2('i_enter', ep=ep_blt, area=area) # 通用权限设置过滤 access_list = area.access_list if ep_blt.type not in access_list: # 触发违规进入事件 return handle_xx_event_2('i_enter', ep=ep_blt, area=area) # elif area and (not blts): # # 触发未知标签进入事件 # e_i_enter(ep=ep_blt, area=area) def rail_out(data): pass def loc_change(data): blts = TDevice.objects.filter(code=data['userId']).first() if blts and blts.employee: # 从缓存查询人员位置信息 key_str = 'ep_{}'.format(blts.employee.id) ep_loc_dict = cache.get_or_set( key_str, get_ep_default(), timeout=None ) ep_loc_dict['time2'] = int(time.time()) ep_loc_dict['xx_detail'] = data area_fix, area_temp = get_area_from_point( data['longitude'], data['latitude'], data['floorNo'], ep_loc_dict['area_fix_id']) time2 = int(time.time()) ep_loc_dict['area_temp_id'] = area_temp['id'] if area_temp else None ep_loc_dict['time2'] = time2 if area_fix and ep_loc_dict['area_fix_id'] == area_fix['id']: ep_loc_dict['area_fix_name'] = area_fix.get( 'name', None) # 兼容性处理,后续可去除 # 如果停留在该区域 cache.set(key_str, ep_loc_dict, timeout=None) # 在该固定区域停留时间(分钟) stay_minute = int((ep_loc_dict['time2']-ep_loc_dict['time1'])/60) # 判断停留时间是否合理 # 先通过自定义权限过滤(暂未做) # 再经过通用设置过滤 code_name = '' # if 0 < stay_minute < area_fix['stay_minute_min']: # # 触发离岗事件 # code_name = 'leave_area' if area_fix['stay_minute_max'] > 0 and area_fix['stay_minute_max'] < stay_minute: # 触发超时滞留事件 code_name = 'stand_area' if code_name: handle_xx_event_2(code_name, ep=blts.employee, area=Area.objects.get(id=area_fix['id'])) else: ep_loc_dict['time1'] = time2 ep_loc_dict['area_fix_id'] = area_fix['id'] if area_fix else None ep_loc_dict['area_fix_name'] = area_fix.get( 'name', None) if area_fix else None cache.set(key_str, ep_loc_dict, timeout=None) return ep_loc_dict def check_same_allow_minute_and_raise_event(cate: EventCate, employee: Employee = None, area: Area = None, obj_cate: str = None): """ 根据配置告警间隔判断是否需要报出事件 """ filters = {'cates': cate} if employee: filters['employee'] = employee if area: filters['area'] = area if obj_cate: filters['obj_cate'] = obj_cate last_event = Event.objects.filter( **filters).order_by('-create_time').first() minutes = cate.same_allow_minute if minutes > 0 and last_event and last_event.create_time + timedelta(minutes=minutes) > timezone.now(): return False return True def handle_xx_event(name: str, data: dict): # 有绑定对象再提示事件(包括一键报警事件/低电量) blts = TDevice.objects.filter(code=data['mac']).first() ep_loc_dict = {} if blts and blts.employee: # 触发事件 cate = EventCate.objects.filter(code=name).first() # 找到最近未处理同一人发生的事件 if cate: # 5分钟内不再次触发 if check_same_allow_minute_and_raise_event(cate, blts.employee) is False: return event = Event() # 查询定位信息 key_str = 'ep_{}'.format(blts.employee.id) ep_loc_dict = cache.get(key_str, None) if ep_loc_dict: area = None if ep_loc_dict['area_fix_id']: area = Area.objects.get(id=ep_loc_dict['area_fix_id']) event.area = area event.location = ep_loc_dict event.obj_cate = 'people' event.employee = blts.employee event.happen_time = timezone.now() event.save() Eventdo.objects.get_or_create(cate=cate, event=event, defaults={ 'cate': cate, 'event': event }) notify_event(event) return ep_loc_dict def handle_xx_event_2(name: str, ep: Employee, area: Area): # 违规进入事件特殊处理 # 找寻该区域下审批和进行的作业, 本厂或相关方人员, 如是就不触发 cate = EventCate.objects.filter(code=name).first() if cate: if name == 'i_enter' and ep.type in ['employee', 'remployee']: if check_same_allow_minute_and_raise_event(cate, ep, area) is False: return ops = Operation.objects.filter(area=area, state__in=[ Operation.OP_AUDIT, Operation.OP_WAIT, Operation.OP_WORK]) if OplWorker.objects.filter(opl__operation__in=ops, worker__employee=ep).exists(): # 如果是作业人员 return elif ops.filter(coordinator__employee=ep).exists(): # 如果是协调员 return elif Opl.objects.filter(operation__in=ops, charger__employee=ep).exists(): # 如果是作业负责人 return elif Opl.objects.filter(operation__in=ops, monitor__employee=ep).exists(): # 如果是作业监护人 return event = Event() event.area = area # 查询定位信息 key_str = 'ep_{}'.format(ep.id) ep_loc_dict = cache.get(key_str, None) if ep_loc_dict: event.location = ep_loc_dict event.obj_cate = 'people' event.employee = ep event.happen_time = timezone.now() event.save() Eventdo.objects.get_or_create(cate=cate, event=event, defaults={ 'cate': cate, 'event': event }) notify_event(event) def blt_online(data): # 定位在线 blts = TDevice.objects.filter(code=data['userId']).first() if blts.employee: ep = blts.employee if ep.type == 'employee' and ep.is_atwork is False: # 上班打卡 now = timezone.now() cr_10 = ClockRecord.objects.filter(type=10, employee=ep, create_time__year=now.year, create_time__month=now.month, create_time__day=now.day).first() if cr_10: if now < cr_10.create_by: cr_10.create_by = now cr_10.trigger = 'location' cr_10.detail = data cr_10.save() else: cr_10 = ClockRecord() cr_10.type = 10 cr_10.employee = ep cr_10.trigger = 'location' cr_10.detail = data cr_10.save() ep.is_atwork = True ep.last_check_time = now ep.save() def blt_offline(data): blts = TDevice.objects.filter(code=data['userId']).first() # 定位离线 if blts.employee: ep = blts.employee if ep.type == 'employee' and ep.is_atwork: # 下班打卡 now = timezone.now() cr_20 = ClockRecord.objects.filter(type=20, employee=ep, create_time__year=now.year, create_time__month=now.month, create_time__day=now.day).first() if cr_20: if now > cr_20.create_time: cr_20.create_time = now cr_20.trigger = 'location' cr_20.detail = data cr_20.save() else: cr_20 = ClockRecord() cr_20.type = 20 cr_20.employee = ep cr_20.trigger = 'location' cr_20.detail = data cr_20.save() ep.is_atwork = False ep.last_check_time = now ep.save() def get_area_from_point(x: int, y: int, floorNo: str, area_fix_id): """ 从位置信息获取所在固定区域(可能有一个区域) 返回一个固定区域, 一个临时区域 """ area_fix = None area_temp = None area_list = cache.get('area_list', None) if not area_list: cache_areas_info() area_list = cache.get('area_list', None) point = Point(x, y) for i in area_list: if floorNo == i['floor_no']: if i['polygon'].intersects(point): # 如果点在多边形中 if i['id'] == area_fix_id: area_fix = i elif i['type'] == Area.AREA_TYPE_FIX: area_fix = i elif i['type'] == Area.AREA_TYPE_TEMP: area_temp = i return area_fix, area_temp def snap_and_analyse(vchannel: TDevice, algo_codes: list, opl: Opl = None): global_img_o = dhClient.snap(vchannel.code, raise_exception=False) happen_time = timezone.now() ckey = 'vchannel_' + vchannel.id cvalue = cache.get(ckey, {}) if global_img_o is None: # 说明视频抓拍失败 if cvalue: try: cvalue.update( {'snap': 'fail', 'snap_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")}) cache.set(ckey, cvalue, timeout=60) except: pass return else: if cvalue: try: cvalue.update( {'snap': 'success', 'snap_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")}) cache.set(ckey, cvalue, timeout=60) except: pass ec_codes = ai_analyse_2(algo_codes, global_img=global_img_o) # 算法处理返回的事件结果 if ec_codes: # 获取本次所有发生事件种类 ecs = EventCate.objects.filter(code__in=ec_codes.keys()) if not ecs.exists(): return obj_cate = 'opl' if opl else 'other' ep = None if 'helmet' in ec_codes or 'helmet2' in ec_codes and obj_cate == 'other': obj_cate = 'people' # 如果存在安全帽事件 # 尝试以图搜图找到当事人 # res = dhClient.face_search(path=global_img_o) # if res and res[0]: # ep = Employee.objects.filter(id_number=res[0]['identity']).first() pass ecs = [cate for cate in ecs if check_same_allow_minute_and_raise_event( cate, ep, None, obj_cate) is True] if ecs: event = Event() event.global_img = ec_codes['global_img'] if ec_codes.get( 'global_img', None) else save_dahua_pic(global_img_o) event.global_img_compressed = compress_global_img(event.global_img) event.vchannel = vchannel event.area = vchannel.area event.obj_cate = obj_cate event.employee = ep event.happen_time = happen_time event.opl = opl event.save() for i in ecs: Eventdo.objects.get_or_create(cate=i, event=event, defaults={ 'cate': i, 'event': event }) if event: notify_event(event) def handle_xx_event_3(name: str, area: Area): cate = EventCate.objects.filter(code=name).first() if cate: if check_same_allow_minute_and_raise_event(cate, None, area) is False: return event = Event() event.area = area event.obj_cate = 'area' event.happen_time = timezone.now() event.save() Eventdo.objects.get_or_create(cate=cate, event=event, defaults={ 'cate': cate, 'event': event }) voice_msg = area.name + '下有' + \ str(area.count_people) + '人,' + cate.name + ',请及时处理' notify_event(event, voice_msg=voice_msg)