factory/apps/ecm/tasks.py

120 lines
4.2 KiB
Python

from __future__ import absolute_import, unicode_literals
import os
from threading import Thread
from celery import shared_task
import requests
from apps.am.models import Area
from apps.ecm.models import EventCate, Eventdo, Event
from apps.ecm.service import notify_event, snap_and_analyse
from apps.hrm.models import Employee
from apps.opm.models import Opl
from apps.third.dahua import dhClient
from apps.third.xunxi import xxClient
from apps.third.models import TDevice
from apps.third.tapis import xxapis
from django.utils import timezone
import time
from django.core.cache import cache
from django.conf import settings
from apps.utils.tasks import CustomTask
from datetime import timedelta
@shared_task(base=CustomTask)
def store_img(code: str, duration: int):
while True:
global_img = dhClient.snap(code)
file_name = global_img.split('/')[-1].split('?')[0]
res = requests.get(url=global_img, verify=False)
path = '/media/temp/store_img/'
full_path = settings.BASE_DIR + path
if not os.path.exists(full_path):
os.makedirs(full_path)
with open(full_path + file_name, 'wb') as f:
f.write(res.content)
time.sleep(duration)
def update_count_people(i: Area):
if i.third_info.get('xx_rail', None):
railId = i.third_info['xx_rail']['id']
json = {"railId": railId, "type": ""}
_, res = xxClient.request(**xxapis['rail_ibeacon_list'], json=json)
blt_list = res['recordList']
macs = []
for m in blt_list:
macs.append(m['userId'])
count_people = TDevice.objects.filter(
type=TDevice.DEVICE_BLT, obj_cate='people', code__in=macs).exclude(employee=None).count()
i.count_people = count_people
i.save()
if count_people >= i.count_people_max:
# 触发超员事件
handle_xx_event_3('over_man', i)
elif count_people < i.count_people_min:
# 触发缺员事件
handle_xx_event_3('lack_man', i)
return {'count': count_people}
def handle_xx_event_3(name: str, area: Area):
cate = EventCate.objects.filter(code=name).first()
if cate:
# 告警间隔内不触发
last_event = Event.objects.filter(cates=cate, area=area, obj_cate='area').order_by('-create_time').first()
same_allow_minute = cate.same_allow_minute
if same_allow_minute >0 and last_event and last_event.create_time + timedelta(minutes=cate.same_allow_minute) > timezone.now():
return
event = Event()
event.area = area
event.obj_cate = 'area'
event.happen_time = timezone.now()
event.save()
Eventdo.objects.get_or_create(cate=cate, event=event, defaults={
'cate': cate,
'event': event
})
voice_msg = area.name + '下有' + str(area.count_people) + '人,' + cate.name + ',请及时处理'
notify_event(event, voice_msg=voice_msg)
@shared_task
def cal_area_count():
"""
计算区域内人员数量
"""
areas = Area.objects.filter(type=Area.AREA_TYPE_FIX)
for i in areas:
update_count_people(i)
@shared_task
def check_event_timeout():
"""判断事件处理是否超期
"""
for i in Event.objects.filter(handle_user=None, is_timeout=False):
cate = i.cates.all().order_by('priority', 'create_time').first()
if cate and cate.handle_minute > 0 and (timezone.now()-i.create_time).seconds > cate.handle_minute * 60:
i.is_timeout = True
i.save()
@shared_task
def opl_task(vc_codes: list, opl_id: str):
"""作业监控任务
"""
start_time = time.time()
opl_cate = Opl.objects.get(id=opl_id).cate
# 找到作业需要加载的算法
algo_codes = list(EventCate.objects.filter(opl_cates=opl_cate).values_list('code', flat=True))
vchannels = TDevice.objects.filter(code__in=vc_codes)
opl = Opl.objects.get(id=opl_id)
while time.time()-start_time < 10800: # 一次任务不手动关闭最多持续3小时
for i in vchannels:
Thread(target=snap_and_analyse, args=(i, algo_codes, opl), daemon=True).start()
time.sleep(10)
opl.mtask_uid = None
opl.save()