factory/apps/ecm/service.py

288 lines
12 KiB
Python

from apps.utils.sms import send_sms
import requests
from apps.am.models import Access, Area
from apps.am.tasks import cache_areas_info
from apps.ecm.models import Event, EventCate, Eventdo
from apps.hrm.models import Employee
from apps.system.models import User
from apps.third.clients import xxClient
from apps.third.models import TDevice
from apps.third.tapis import xxapis
from apps.utils.queryset import get_child_queryset2
from django.core.cache import cache
import time
import shapely.geometry
from apps.third.clients import dhClient, spClient
from django.utils import timezone
from django.conf import settings
import os
from apps.utils.speech import generate_voice
from threading import Thread
requests.packages.urllib3.disable_warnings()
def get_area_info_from_cache(target: str, cache: list):
for i in cache:
if i['id'] == target:
return i
return None
def save_dahua_pic(pic: str):
"""保存大华报警图片到本地
返回本地路径
"""
file_name = pic.split('/')[-1]
full_url = dhClient.get_full_pic(pic)
res = requests.get(url=full_url, verify=False)
path = '/media/' + timezone.now().strftime('%Y/%m/%d/')
full_path = settings.BASE_DIR + path
if not os.path.exists(full_path):
os.makedirs(full_path)
with open(full_path + file_name, 'wb') as f:
f.write(res.content)
return path + file_name
class EcmService:
"""事件处理服务
"""
@classmethod
def get_ep_default(cls):
return {
'area_fix_id': None, # 当前所在固定区域ID
'time0': None, # 定位首次出现时间戳
"time1": None, # 首次在该区域时间戳
"time2": int(time.time()), # 当前时间戳
}
@classmethod
def create_remind_and_speak(cls, event: Event):
"""
创建事件提醒并发送短信/微信/喇叭
"""
# 喇叭提醒
if event.voice:
sps = list(TDevice.objects.filter(area=event.area, type=TDevice.DEVICE_SPEAKER).values_list('code', flat=True))
if len(sps) == 0: # 找覆盖区的音响
sps = list(TDevice.objects.filter(areas=event.area,
type=TDevice.DEVICE_SPEAKER).values_list('code', flat=True))
if sps:
t = Thread(target=spClient.speak, args=(event.voice, sps), daemon=True)
t.start()
if event.employee and event.employee.phone:
t = Thread(target=send_sms, args=(event.employee.phone, '1001', {'code': '5678'}), daemon=True)
t.start()
@classmethod
def dispatch_dahua_event(cls, data: dict):
"""分发大华事件进行处理
"""
vchannel_code = data['info']['nodeCode']
alarm_type = data['info']['alarmType']
vchannel = TDevice.objects.filter(code=vchannel_code).first()
print(data)
if alarm_type in [1001003, 1001000] and vchannel: # 内部人员/或陌生人报警
# 加载算法逻辑
# 安全帽检测
ec = EventCate.objects.filter(code='helmet').first() # 模拟发生安全帽事件
# 视频区域
area = vchannel.area
if ec and area:
# 保存照片
face_img = save_dahua_pic(data['info']['alarmPicture'])
global_img = save_dahua_pic(data['info']['extend']['globalScenePicUrl'])
event = Event()
event.face_img = face_img
event.global_img = global_img
event.area = area
event.obj_cate = 'people'
ep = None # 找到人员
if alarm_type == 1001003:
ep = Employee.objects.filter(id_number=data['info']['extend']['candidateInfo'][0]['id']).first()
if ep:
voice_msg = '位于{}{}{},您未佩戴安全帽,请及时处理'
ep_name = ep.name
ep_type = '员工'
if ep.type == 'rempoyee':
ep_type = '相关方人员'
elif ep.type == 'visitor':
ep_type = '访客'
event.voice_msg = voice_msg.format(area.name, ep_type, ep_name)
else:
event.voice_msg = '位于{}的未知人员,您未佩戴安全帽,请及时处理'.format(area.name)
event.employee = ep
_, event.voice, _ = generate_voice(event.voice_msg, ec.voice_person)
event.save()
Eventdo.objects.get_or_create(cate=ec, event=event, defaults={
'cate': ec,
'event': event
})
cls.create_remind_and_speak(event=event)
@classmethod
def dispatch_xunxi_event(cls, data: dict):
"""分发寻息事件进行处理
"""
if data.type == 'rail':
if data.data.type == 1:
# 围栏进入
cls.rail_in(data=data.data)
elif data.data.type == 2:
# 围栏离开
cls.rail_out(data=data.data)
elif data.type == 'onKeyAlarm':
# 一键呼救
cls.one_key_alarm(data=data)
elif data.type == 'location':
# 定位信息
cls.loc_change(data=data)
elif data.type == 'onOffLine':
if data.data.online:
# 标签定位在线
pass
else:
# 标签定位离线
pass
elif data.type == 'lowpower':
# 低电量
cls.low_power(data=data.data)
elif data.type == 'bltOnOffLineV2':
if data.data.online:
# 标签通信在线
cls.blt_online(data=data.data)
else:
# 标签通信离线
cls.blt_offline(data=data.data)
@classmethod
def rail_in(cls, data):
"""围栏进入事件
"""
# 找到所在围栏
area = Area.objects.filter(third_info__xx_rail__id=data['railId']).first()
# 找到进入对象
blts = TDevice.objects.filter(code=data['userId']).first()
if area and blts and blts.employee: # 如果是人
if area.type == Area.AREA_TYPE_FIX:
# 更新人员位置信息缓存
key_str = 'ep_{}'.format(blts.employee.id)
ep_loc_dict = cache.get_or_set(
key_str, cls.get_ep_default(), timeout=None
)
if ep_loc_dict['area_fix_id'] != area.id: # 如果区域未变化则不动
ep_loc_dict['time1'] = ep_loc_dict['time2']
ep_loc_dict['area_fix_id'] = area.id
cache.set(key_str, ep_loc_dict)
ep_blt = blts.employee # 标签绑定人员
if ep_blt:
for i in Access.objects.filter(area=area).order_by('sort'):
# 优先自定义权限过滤
if i.post: # 如果是按岗位设定的
eps_access = Employee.objects.filter(user__posts=i.post)
if ep_blt in eps_access and i.type == Access.ACCESS_IN_YES:
return
elif ep_blt in eps_access and i.type == Access.ACCESS_IN_NO:
# 触发非法进入事件
pass
elif i.dept: # 如果是按部门设定的
if i.dept.type == 'dept': # 如果是内部部门
depts = get_child_queryset2(i.dept)
if ep_blt.belong_dept in depts and i.type == Access.ACCESS_IN_YES:
return
elif ep_blt.belong_dept in depts and i.type == Access.ACCESS_IN_NO:
# 触发非法进入事件
pass
elif i.dept.type == 'rparty': # 如果是相关方
if ep_blt.belong_dept == i.dept and i.type == Access.ACCESS_IN_YES:
return
elif ep_blt.belong_dept == i.dept and i.type == Access.ACCESS_IN_NO:
# 触发非法进入事件
pass
elif i.employee: # 如果是按人设定的
if ep_blt == i.employee and i.type == Access.ACCESS_IN_YES:
return
elif ep_blt == i.employee and i.type == Access.ACCESS_IN_NO:
# 触发非法进入事件
pass
# 通用权限设置过滤
if ep_blt.type == 'employee' and area.employee_yes:
return
elif ep_blt.type == 'remployee' and area.remployee_yes:
return
elif ep_blt.type == 'visitor' and area.visitor_yes:
return
else:
# 触发非法进入事件
pass
elif area and (not blts):
# 触发未知标签进入事件
pass
@classmethod
def rail_out(cls, data):
pass
@classmethod
def low_power(cls, data):
# 有绑定对象再提示低电量
blts = TDevice.objects.filter(code=data['userId']).first()
if blts.employee:
# 触发低电量提醒事件
pass
@classmethod
def one_key_alarm(cls, data):
pass
@classmethod
def loc_change(cls, data):
blts = TDevice.objects.filter(code=data['userId']).first()
if blts.employee:
# 从缓存查询人员位置信息
time2 = int(time.time())
key_str = 'ep_{}'.format(blts.employee.id)
ep_loc_dict = cache.get_or_set(
key_str, cls.get_ep_default(), timeout=None
)
ep_loc_dict['time2'] = time2
# 从缓存里获取固定区域列表信息
area_fix_list = cache.get('area_fix_list', None)
if not area_fix_list:
area_fix_list = cache_areas_info()
if ep_loc_dict.get('area_fix_id', None):
# 如果存在所在固定区域
area_info = get_area_info_from_cache(ep_loc_dict['area_fix_id'], area_fix_list)
if area_info:
# 在该固定区域停留时间(分钟)
stay_minute = int((ep_loc_dict['time2']-ep_loc_dict['time1'])/60)
# 判断停留时间是否合理
# 先通过自定义权限过滤(暂未做)
# 再经过通用设置过滤
if 0 < stay_minute < area_info['stay_minute_min']:
# 触发离岗事件
return
elif area_info['stay_minute_max'] < stay_minute:
# 触发超时滞留事件
return
else:
for i in area_fix_list:
if data['floorNo'] == i['floor_no']:
point = shapely.geometry.Point(data['xMillimeter'], data['yMillimeter'])
if i['poly_shape'].intersects(point): # 如果点在多边形中
ep_loc_dict['time1'] = time2
ep_loc_dict['are_id'] = i['area_fix_id']
ep_loc_dict['time2'] = time2
cache.set(key_str, ep_loc_dict)
@classmethod
def blt_online(cls, data):
pass
@classmethod
def blt_offline(cls, data):
pass