factory/apps/am/tasks.py

35 lines
1.2 KiB
Python

from __future__ import absolute_import, unicode_literals
from apps.utils.tasks import CustomTask
from apps.am.models import Area
from celery import shared_task
from django.core.cache import cache
from shapely.geometry import Polygon
@shared_task(base=CustomTask)
def cache_areas_info():
    """
    Cache area information.

    Collects every non-hidden ``Area`` whose ``third_info`` has a non-null
    ``xx_rail`` entry (ordered by ``number``), converts its rail polygon
    points into a shapely ``Polygon``, and stores the resulting list of
    dicts in the Django cache under the key ``'area_list'``.

    Each cached dict has the keys ``id``, ``type``, ``floor_no``,
    ``polygon``, ``stay_minute_min`` and ``stay_minute_max``.

    Areas with any polygon point lacking ``longitude`` or ``latitude`` are
    skipped rather than aborting the whole refresh.

    Returns:
        The number of areas written to the cache.
    """
    area_list = []
    areas = (
        Area.objects.filter(is_hidden=False)
        .exclude(third_info__xx_rail=None)
        .order_by('number')
    )
    for area in areas:
        rail_detail = area.third_info['xx_rail']['detail']
        raw_points = rail_detail['polygon']['points']

        # Skip the area when any point is incomplete. Both coordinates are
        # checked so that a missing 'latitude' is handled the same way as a
        # missing 'longitude' instead of raising KeyError.
        if not all('longitude' in p and 'latitude' in p for p in raw_points):
            continue

        points = [(p['longitude'], p['latitude']) for p in raw_points]
        area_list.append({
            'id': area.id,
            'type': area.type,
            'floor_no': rail_detail['floorNo'],
            'polygon': Polygon(points),
            'stay_minute_min': area.stay_minute_min,
            'stay_minute_max': area.stay_minute_max,
        })

    # timeout=None: per Django's cache API the entry does not expire, so it
    # stays until this task overwrites it.
    cache.set('area_list', area_list, timeout=None)
    return len(area_list)