from __future__ import absolute_import, unicode_literals import os from threading import Thread from celery import shared_task import requests from apps.am.models import Area from apps.ecm.models import EventCate, Eventdo, Event from apps.ecm.service import notify_event, snap_and_analyse from apps.hrm.models import Employee from apps.opm.models import Opl from apps.third.dahua import dhClient from apps.third.xunxi import xxClient from apps.third.models import TDevice from apps.third.tapis import xxapis from django.utils import timezone import time from django.core.cache import cache from django.conf import settings from apps.utils.tasks import CustomTask @shared_task(base=CustomTask) def store_img(code: str, duration: int): while True: global_img = dhClient.snap(code) file_name = global_img.split('/')[-1].split('?')[0] res = requests.get(url=global_img, verify=False) path = '/media/temp/store_img/' full_path = settings.BASE_DIR + path if not os.path.exists(full_path): os.makedirs(full_path) with open(full_path + file_name, 'wb') as f: f.write(res.content) time.sleep(duration) def update_count_people(i: Area): if i.third_info.get('xx_rail', None): railId = i.third_info['xx_rail']['id'] json = {"railId": railId, "type": ""} _, res = xxClient.request(**xxapis['rail_ibeacon_list'], json=json) blt_list = res['recordList'] macs = [] for m in blt_list: macs.append(m['userId']) count_people = TDevice.objects.filter( type=TDevice.DEVICE_BLT, obj_cate='people', code__in=macs).exclude(employee=None).count() i.count_people = count_people i.save() if count_people >= i.count_people_max: # 触发超员事件 handle_xx_event_3('over_man', i) elif count_people < i.count_people_min: # 触发缺员事件 handle_xx_event_3('lack_man', i) return {'count': count_people} def handle_xx_event_3(name: str, area: Area): cate = EventCate.objects.filter(code=name).first() if cate: event = Event() event.area = area event.obj_cate = 'area' event.happen_time = timezone.now() event.save() Eventdo.objects.get_or_create(cate=cate, event=event, defaults={ 'cate': cate, 'event': event }) voice_msg = area.name + '下有' + str(area.count_people) + '人,' + cate.name + ',请及时处理' notify_event(event, voice_msg=voice_msg) @shared_task def cal_area_count(): """ 计算区域内人员数量 """ areas = Area.objects.filter(type=Area.AREA_TYPE_FIX) for i in areas: update_count_people(i) @shared_task def check_event_timeout(): """判断事件处理是否超期 """ for i in Event.objects.filter(handle_user=None, is_timeout=False): cate = i.cates.all().order_by('priority', 'create_time').first() if cate and cate.handle_minute > 0 and (timezone.now()-i.create_time).seconds > cate.handle_minute * 60: i.is_timeout = True i.save() @shared_task def opl_task(vc_codes: list, opl_id: str): """作业监控任务 """ start_time = time.time() opl_cate = Opl.objects.get(id=opl_id).cate # 找到作业需要加载的算法 algo_codes = list(EventCate.objects.filter(opl_cates=opl_cate).values_list('code', flat=True)) vchannels = TDevice.objects.filter(code__in=vc_codes) opl = Opl.objects.get(id=opl_id) while time.time()-start_time < 10800: # 一次任务不手动关闭最多持续3小时 for i in vchannels: Thread(target=snap_and_analyse, args=(i, algo_codes, opl), daemon=True).start() time.sleep(10) opl.mtask_uid = None opl.save()