from __future__ import absolute_import, unicode_literals from threading import Thread from celery import shared_task from apps.ai.main import ai_analyse from apps.am.models import Area from apps.ecm.models import EventCate, Eventdo, Event from apps.ecm.service import notify_event, snap_and_analyse from apps.hrm.models import Employee from apps.opm.models import Opl from apps.third.clients import xxClient from apps.third.clients import dhClient from apps.third.models import TDevice from apps.third.tapis import xxapis from django.utils import timezone import time from django.core.cache import cache @shared_task def update_count_people(i: Area): if i.third_info.get('xx_rail', None): railId = i.third_info['xx_rail']['id'] json = {"railId": railId, "type": ""} _, res = xxClient.request(**xxapis['rail_ibeacon_list'], json=json) blt_list = res['recordList'] macs = [] for i in blt_list: macs.append(i['mac']) i.count_people = TDevice.objects.filter( type=TDevice.DEVICE_BLT, obj_cate='people', code__in=macs).exclude(employee=None).count() i.save() if i.count_people >= i.count_people_max: # 触发超员事件 handle_xx_event_3('over_man', i) elif i.count_people < i.count_people_min: # 触发缺员事件 handle_xx_event_3('lack_man', i) return {'count': i.count_people} def handle_xx_event_3(name: str, area: Area): cate = EventCate.objects.filter(code=name).first() if cate: event = Event() event.area = area event.obj_cate = 'area' event.happen_time = timezone.now() event.save() Eventdo.objects.get_or_create(cate=cate, event=event, defaults={ 'cate': cate, 'event': event }) voice_msg = area.name + '下有' + str(area.count_people) + '人,' + cate.name + ',请及时处理' notify_event(event, voice_msg=voice_msg) @shared_task def cal_area_count(): """ 计算区域内人员数量 """ for i in Area.objects.filter(type=Area.AREA_TYPE_FIX): Thread(target=update_count_people, args=(i,), daemon=True).start() @shared_task def check_event_timeout(): """判断事件处理是否超期 """ for i in Event.objects.filter(handle_user=None, is_timeout=False): cate = i.cates.all().order_by('priority', 'create_time').first() if cate.handle_minute > 0 and (timezone.now()-i.create_time).seconds > cate.handle_minute * 60: i.is_timeout = True i.save() @shared_task def opl_task(vc_codes: list, opl_id: str): """作业监控任务 """ start_time = time.time() opl_cate = Opl.objects.get(id=opl_id).cate # 找到作业需要加载的算法 algo_codes = list(EventCate.objects.filter(opl_cates=opl_cate).values_list('code', flat=True)) vchannels = TDevice.objects.filter(code__in=vc_codes) opl = Opl.objects.get(id=opl_id) while time.time()-start_time < 14400: # 一次任务不手动关闭最多持续4小时 for i in vchannels: Thread(target=snap_and_analyse, args=(i, algo_codes, opl)).start() time.sleep(10)