factory/apps/ecm/tasks.py

128 lines
4.3 KiB
Python

from __future__ import absolute_import, unicode_literals
from multiprocessing import Event
from threading import Thread
from celery import shared_task
from apps.am.models import Area
from apps.ecm.models import EventCate, Eventdo
from apps.ecm.service import algo_handle, notify_event, save_dahua_pic
from apps.hrm.models import Employee
from apps.opm.models import Opl
from apps.third.clients import xxClient
from apps.third.clients import dhClient
from apps.third.models import TDevice
from apps.third.tapis import xxapis
from django.utils import timezone
import time
from django.utils import timezone
@shared_task
def update_count_people(i: Area):
if i.third_info.get('xx_rail', None):
railId = i.third_info['xx_rail']['id']
json = {"railId": railId, "type": ""}
_, res = xxClient.request(**xxapis['rail_ibeacon_list'], json=json)
blt_list = res['recordList']
macs = []
for i in blt_list:
macs.append(i['mac'])
i.count_people = TDevice.objects.filter(
type=TDevice.DEVICE_BLT, obj_cate='people', code__in=macs).exclude(employee=None).count()
i.save()
if i.count_people >= i.count_people_max:
# 触发超员事件
handle_xx_event_3('over_man', i)
elif i.count_people < i.count_people_min:
# 触发缺员事件
handle_xx_event_3('lack_man', i)
def handle_xx_event_3(name: str, area: Area):
cate = EventCate.objects.filter(code=name).first()
if cate:
event = Event()
event.area = area
event.obj_cate = 'area'
event.happen_time = timezone.now()
event.save()
Eventdo.objects.get_or_create(cate=cate, event=event, defaults={
'cate': cate,
'event': event
})
voice_msg = area.name + '下有' + str(area.count) + '人,' + cate.name + ',请及时处理'
notify_event(event, voice_msg=voice_msg)
@shared_task
def cal_area_count():
"""
计算区域内人员数量
"""
for i in Area.objects.filter(type=Area.AREA_TYPE_FIX):
Thread(target=update_count_people, args=(i,), daemon=True).start()
@shared_task
def check_event_timeout():
"""判断事件处理是否超期
"""
for i in Event.objects.filter(handle_user=None, is_timeout=False):
cate = i.cates.all().order_by('priority', 'create_time').first()
if cate.hanle_minute > 0 and (timezone.now()-i.create_time).seconds > cate.hanle_minute * 60:
i.is_timeout = True
i.save()
@shared_task
def snap_and_analyse(code: str, algo_codes: list, opl_id: str):
global_img_o = dhClient.snap(code)
happen_time = timezone.now()
vchannel = TDevice.objects.filter(code=code).first()
ec_codes = algo_handle(algo_codes, data={}) # 算法处理返回的事件结果
if ec_codes:
# 获取本次所有发生事件种类
ecs = EventCate.objects.filter(code__in=ec_codes)
obj_cate = 'other'
ep = None
if 'helmet' in ec_codes:
# 如果存在安全帽事件
# 尝试以图搜图找到当事人
res = dhClient.face_search(path=global_img_o)
if res and res[0]:
ep = Employee.objects.filter(id_number=res[0]['identity']).first()
if ep:
obj_cate = 'people'
event = Event()
event.global_img = save_dahua_pic(global_img_o)
if vchannel:
event.vchannel = vchannel
event.area = vchannel.area
event.obj_cate = obj_cate
event.employee = ep
event.happen_time = happen_time
event.opl = Opl.objects.get(id=opl_id)
event.save()
for i in ecs:
Eventdo.objects.get_or_create(cate=i, event=event, defaults={
'cate': i,
'event': event
})
if event:
notify_event(event)
@shared_task
def opl_task(vc_codes: list, opl_id: str):
"""作业监控任务
"""
opl_cate = Opl.objects.get(id=opl_id).cate
# 找到加载的算法
algo_codes = list(EventCate.objects.filter(opl_cates=opl_cate).values_list('code', flat=True))
while True:
for i in vc_codes:
snap_and_analyse.delay(i, algo_codes, opl_id)
time.sleep(2)
time.sleep(4)