factory/apps/ecm/tasks.py

92 lines
3.0 KiB
Python

from __future__ import absolute_import, unicode_literals
from multiprocessing import Event
from threading import Thread
from celery import shared_task
from apps.ai.main import ai_analyse
from apps.am.models import Area
from apps.ecm.models import EventCate, Eventdo
from apps.ecm.service import notify_event, snap_and_analyse
from apps.hrm.models import Employee
from apps.opm.models import Opl
from apps.third.clients import xxClient
from apps.third.clients import dhClient
from apps.third.models import TDevice
from apps.third.tapis import xxapis
from django.utils import timezone
import time
from django.utils import timezone
@shared_task
def update_count_people(i: Area):
    """Refresh the people count of one area and raise staffing events.

    The area's ``third_info['xx_rail']['id']`` is sent to the
    ``rail_ibeacon_list`` endpoint of ``xxClient``; the ``mac`` of every entry
    in the response's ``recordList`` is then matched against ``TDevice`` rows
    of type ``DEVICE_BLT`` with ``obj_cate='people'`` that are bound to an
    employee. The resulting count is saved on the area.

    Args:
        i: The area to refresh. Areas without an ``xx_rail`` entry in
            ``third_info`` are left untouched.
    """
    rail_info = i.third_info.get('xx_rail', None)
    if not rail_info:
        return

    payload = {"railId": rail_info['id'], "type": ""}
    _, res = xxClient.request(**xxapis['rail_ibeacon_list'], json=payload)

    # Use a dedicated loop name: reusing ``i`` here would rebind the area
    # parameter to a beacon record and break the attribute writes below.
    macs = [beacon['mac'] for beacon in res['recordList']]

    i.count_people = TDevice.objects.filter(
        type=TDevice.DEVICE_BLT, obj_cate='people', code__in=macs).exclude(employee=None).count()
    i.save()

    if i.count_people >= i.count_people_max:
        # Trigger the over-staffing event.
        handle_xx_event_3('over_man', i)
    elif i.count_people < i.count_people_min:
        # Trigger the under-staffing event.
        handle_xx_event_3('lack_man', i)
def handle_xx_event_3(name: str, area: Area):
    """Create, categorise and announce an area-level event.

    Looks up the ``EventCate`` whose ``code`` equals ``name``. When one
    exists, a new event attached to ``area`` is saved, linked to the category
    through ``Eventdo``, and passed to ``notify_event`` together with a voice
    message. When no such category exists, nothing happens.

    Args:
        name: Event category code (callers in this module pass ``'over_man'``
            or ``'lack_man'``).
        area: The area the event is raised for.
    """
    # The module-level ``Event`` name is bound to ``multiprocessing.Event``,
    # which has no ``save()``; import the model explicitly.
    # NOTE(review): assumes the Event model lives in apps.ecm.models next to
    # EventCate/Eventdo - confirm.
    from apps.ecm.models import Event

    cate = EventCate.objects.filter(code=name).first()
    if not cate:
        return

    event = Event()
    event.area = area
    event.obj_cate = 'area'
    event.happen_time = timezone.now()
    event.save()

    # The lookup kwargs are already used as creation values, so no
    # ``defaults`` are needed.
    Eventdo.objects.get_or_create(cate=cate, event=event)

    # ``count_people`` is the attribute update_count_people maintains on the area.
    voice_msg = area.name + '下有' + str(area.count_people) + '人,' + cate.name + ',请及时处理'
    notify_event(event, voice_msg=voice_msg)
@shared_task
def cal_area_count():
    """Recount the people present in every fixed-type area.

    Starts one daemon thread per area of type ``Area.AREA_TYPE_FIX``; each
    thread runs ``update_count_people`` for its area. The threads are not
    joined, so this task returns as soon as they have all been started.
    """
    fixed_areas = Area.objects.filter(type=Area.AREA_TYPE_FIX)
    for area in fixed_areas:
        worker = Thread(target=update_count_people, args=(area,), daemon=True)
        worker.start()
@shared_task
def check_event_timeout():
    """Flag unhandled events whose handling deadline has passed.

    Every event with no ``handle_user`` and ``is_timeout=False`` is checked
    against its first category (ordered by ``priority``, then
    ``create_time``). When that category's ``hanle_minute`` is positive and
    more than that many minutes have elapsed since the event's
    ``create_time``, the event is saved with ``is_timeout=True``.
    Events without any category are skipped.
    """
    # The module-level ``Event`` name is bound to ``multiprocessing.Event``,
    # which has no ``objects`` manager; import the model explicitly.
    # NOTE(review): assumes the Event model lives in apps.ecm.models next to
    # EventCate/Eventdo - confirm.
    from apps.ecm.models import Event

    now = timezone.now()
    for event in Event.objects.filter(handle_user=None, is_timeout=False):
        cate = event.cates.all().order_by('priority', 'create_time').first()
        if cate is None:
            # No category means no deadline to compare against.
            continue
        # total_seconds() counts whole days too; ``.seconds`` wraps every
        # 24 hours and would miss long-overdue events.
        elapsed = (now - event.create_time).total_seconds()
        # ``hanle_minute`` (sic) is the attribute name used by the model.
        if cate.hanle_minute > 0 and elapsed > cate.hanle_minute * 60:
            event.is_timeout = True
            event.save()
@shared_task
def opl_task(vc_codes: list, opl_id: str):
    """Operation-monitoring task: repeatedly snapshot and analyse channels.

    Loads the ``Opl`` with the given id, collects the ``code`` of every
    ``EventCate`` linked to that operation's category (the algorithms to
    run), and then loops forever: on each pass one thread per matching
    ``TDevice`` is started running ``snap_and_analyse``, followed by a
    two-second pause.

    The loop has no exit condition, so this task never returns on its own
    and must be stopped externally.

    Args:
        vc_codes: ``TDevice.code`` values of the channels to monitor.
        opl_id: Primary key of the ``Opl`` being monitored.

    Raises:
        Opl.DoesNotExist: If no ``Opl`` has the given id.
    """
    # Fetch the operation once; it supplies both the category and the
    # object handed to the analysis threads.
    opl = Opl.objects.get(id=opl_id)
    # Find the algorithms this operation needs to load.
    algo_codes = list(EventCate.objects.filter(opl_cates=opl.cate).values_list('code', flat=True))
    vchannels = TDevice.objects.filter(code__in=vc_codes)
    while True:
        for vchannel in vchannels:
            Thread(target=snap_and_analyse, args=(vchannel, algo_codes, opl)).start()
        # NOTE(review): the pause is assumed to apply once per pass over all
        # channels, not once per channel - confirm against the original layout.
        time.sleep(2)