128 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			128 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Python
		
	
	
	
| from __future__ import absolute_import, unicode_literals
 | |
| from multiprocessing import Event
 | |
| from threading import Thread
 | |
| 
 | |
| from celery import shared_task
 | |
| 
 | |
| from apps.am.models import Area
 | |
| from apps.ecm.models import EventCate, Eventdo
 | |
| from apps.ecm.service import algo_handle, notify_event, save_dahua_pic
 | |
| from apps.hrm.models import Employee
 | |
| from apps.opm.models import Opl
 | |
| from apps.third.clients import xxClient
 | |
| from apps.third.clients import dhClient
 | |
| from apps.third.models import TDevice
 | |
| from apps.third.tapis import xxapis
 | |
| from django.utils import timezone
 | |
| import time
 | |
| from django.utils import timezone
 | |
| 
 | |
| 
 | |
@shared_task
def update_count_people(i: Area):
    """Refresh the people count of an area and raise staffing events.

    If the area's ``third_info`` has an ``xx_rail`` entry, the beacons bound
    to that rail are fetched through ``xxClient``. The people count is the
    number of ``TDevice`` rows of type ``DEVICE_BLT`` with ``obj_cate``
    ``'people'`` whose code is one of the beacon MACs and which have an
    employee attached. The count is saved on the area and then compared with
    the area's ``count_people_max`` / ``count_people_min`` limits.

    Args:
        i: The area to update. Areas without an ``xx_rail`` entry are
            left untouched.
    """
    rail = i.third_info.get('xx_rail', None)
    if not rail:
        return
    payload = {"railId": rail['id'], "type": ""}
    _, res = xxClient.request(**xxapis['rail_ibeacon_list'], json=payload)
    # The loop variable must not reuse the name ``i``: doing so replaced the
    # area with a beacon dict and broke the attribute writes below.
    macs = [beacon['mac'] for beacon in res['recordList']]
    i.count_people = TDevice.objects.filter(
        type=TDevice.DEVICE_BLT, obj_cate='people', code__in=macs).exclude(employee=None).count()
    i.save()
    if i.count_people >= i.count_people_max:
        # Trigger the over-staffed event.
        handle_xx_event_3('over_man', i)
    elif i.count_people < i.count_people_min:
        # Trigger the under-staffed event.
        handle_xx_event_3('lack_man', i)
 | |
| 
 | |
| 
 | |
def handle_xx_event_3(name: str, area: Area):
    """Create an area-level event for an event category and send a notification.

    Looks up the ``EventCate`` whose ``code`` equals *name*. When it exists,
    a new event is saved for *area*, linked to the category through
    ``Eventdo``, and ``notify_event`` is called with a voice message.
    Nothing happens when no category matches.

    Args:
        name: Code of the event category (e.g. ``'over_man'``).
        area: The area the event belongs to.
    """
    # At module level ``Event`` is bound to ``multiprocessing.Event``, which
    # has no ``save()``. Import the ORM model locally instead.
    # NOTE(review): assumes apps.ecm.models defines ``Event`` next to
    # ``EventCate`` / ``Eventdo`` - confirm.
    from apps.ecm.models import Event

    cate = EventCate.objects.filter(code=name).first()
    if not cate:
        return
    event = Event()
    event.area = area
    event.obj_cate = 'area'
    event.happen_time = timezone.now()
    event.save()
    # The lookup kwargs are already used when creating the row, so no
    # separate ``defaults`` are needed.
    Eventdo.objects.get_or_create(cate=cate, event=event)
    # NOTE(review): the original read ``area.count``; the rest of this module
    # only uses ``count_people`` on Area, so that attribute is used here -
    # confirm against the Area model.
    voice_msg = area.name + '下有' + str(area.count_people) + '人,' + cate.name + ',请及时处理'
    notify_event(event, voice_msg=voice_msg)
 | |
| 
 | |
| 
 | |
@shared_task
def cal_area_count():
    """Compute the number of people inside each fixed area.

    Starts one daemon thread per area of type ``Area.AREA_TYPE_FIX``; each
    thread runs ``update_count_people`` for its area. The threads are not
    joined, so this task returns without waiting for them.
    """
    fixed_areas = Area.objects.filter(type=Area.AREA_TYPE_FIX)
    for area in fixed_areas:
        worker = Thread(target=update_count_people, args=(area,), daemon=True)
        worker.start()
 | |
| 
 | |
| 
 | |
@shared_task
def check_event_timeout():
    """Mark unhandled events as timed out once their handling window has passed.

    For every event with no ``handle_user`` and ``is_timeout`` still false,
    the first category ordered by ``priority`` then ``create_time`` is used.
    If that category's ``hanle_minute`` is positive and more than that many
    minutes have elapsed since the event's ``create_time``, the event is
    saved with ``is_timeout = True``.
    """
    # At module level ``Event`` is bound to ``multiprocessing.Event``, which
    # has no ``objects`` manager. Import the ORM model locally instead.
    # NOTE(review): assumes apps.ecm.models defines ``Event`` - confirm.
    from apps.ecm.models import Event

    for event in Event.objects.filter(handle_user=None, is_timeout=False):
        cate = event.cates.all().order_by('priority', 'create_time').first()
        if cate is None:
            # An event without any category has no deadline to check.
            continue
        # ``total_seconds()`` includes whole days; ``.seconds`` wraps every
        # 24 hours and would miss events that are more than a day old.
        elapsed = (timezone.now() - event.create_time).total_seconds()
        if cate.hanle_minute > 0 and elapsed > cate.hanle_minute * 60:
            event.is_timeout = True
            event.save()
 | |
| 
 | |
| 
 | |
@shared_task
def snap_and_analyse(code: str, algo_codes: list, opl_id: str):
    """Take a snapshot from a video channel, run the algorithms and record any event.

    A snapshot is requested through ``dhClient.snap`` and *algo_codes* are
    passed to ``algo_handle``. When the algorithms report event category
    codes, one event is saved, linked to every matching ``EventCate``
    through ``Eventdo``, and ``notify_event`` is called. When nothing is
    reported the task returns without creating anything.

    Args:
        code: Channel code, passed to ``dhClient.snap`` and used to look up
            the ``TDevice`` row.
        algo_codes: Algorithm codes handed to ``algo_handle``.
        opl_id: Primary key of the ``Opl`` the event is attached to.

    Raises:
        Opl.DoesNotExist: If an event is reported and no ``Opl`` with
            *opl_id* exists.
    """
    # At module level ``Event`` is bound to ``multiprocessing.Event``, which
    # has no ``save()``. Import the ORM model locally instead.
    # NOTE(review): assumes apps.ecm.models defines ``Event`` - confirm.
    from apps.ecm.models import Event

    global_img_o = dhClient.snap(code)
    happen_time = timezone.now()
    vchannel = TDevice.objects.filter(code=code).first()
    ec_codes = algo_handle(algo_codes, data={})  # event codes returned by the algorithms
    if not ec_codes:
        # Nothing detected. The original fell through to ``if event:`` with
        # ``event`` unbound and raised UnboundLocalError.
        return

    # All event categories that occurred in this run.
    ecs = EventCate.objects.filter(code__in=ec_codes)
    obj_cate = 'other'
    ep = None
    if 'helmet' in ec_codes:
        # A helmet event occurred: try a face search on the snapshot to
        # identify the person involved.
        res = dhClient.face_search(path=global_img_o)
        if res and res[0]:
            ep = Employee.objects.filter(id_number=res[0]['identity']).first()
            if ep:
                obj_cate = 'people'

    event = Event()
    event.global_img = save_dahua_pic(global_img_o)
    if vchannel:
        event.vchannel = vchannel
        event.area = vchannel.area
    event.obj_cate = obj_cate
    event.employee = ep
    event.happen_time = happen_time
    event.opl = Opl.objects.get(id=opl_id)
    event.save()
    for cate in ecs:
        # The lookup kwargs are already used when creating the row, so no
        # separate ``defaults`` are needed.
        Eventdo.objects.get_or_create(cate=cate, event=event)
    notify_event(event)
 | |
| 
 | |
| 
 | |
@shared_task
def opl_task(vc_codes: list, opl_id: str):
    """Operation monitoring task.

    Loads the codes of the event categories linked to the operation's
    category, then loops forever: one ``snap_and_analyse`` task is queued
    per video channel code with a 2 second pause after each, followed by a
    4 second pause after every full pass. The loop has no exit condition in
    this function.

    Args:
        vc_codes: Video channel codes to snapshot in turn.
        opl_id: Primary key of the ``Opl`` being monitored.
    """
    opl = Opl.objects.get(id=opl_id)
    # Find the algorithms to load for this operation category.
    loaded_codes = EventCate.objects.filter(opl_cates=opl.cate).values_list('code', flat=True)
    algo_codes = list(loaded_codes)
    while True:
        for vc_code in vc_codes:
            snap_and_analyse.delay(vc_code, algo_codes, opl_id)
            time.sleep(2)
        time.sleep(4)
 |