from __future__ import absolute_import, unicode_literals

from apps.utils.tasks import CustomTask
from apps.am.models import Area
from celery import shared_task
from django.core.cache import cache
from shapely.geometry import Polygon


def _extract_points(raw_points):
    """Convert raw vertex dicts into ``(longitude, latitude)`` tuples.

    Returns ``None`` as soon as a vertex lacks either coordinate, so the
    caller can skip the whole area instead of building a partial polygon.
    """
    points = []
    for item in raw_points:
        # Both keys are read below, so both must be present; checking only
        # 'longitude' would let a missing 'latitude' raise KeyError and
        # abort the entire cache refresh.
        if 'longitude' not in item or 'latitude' not in item:
            return None
        points.append((item['longitude'], item['latitude']))
    return points


@shared_task(base=CustomTask)
def cache_areas_info():
    """Cache area information.

    Collects every non-hidden ``Area`` whose ``third_info`` has a non-null
    ``xx_rail`` entry (ordered by ``number``), builds a shapely ``Polygon``
    from its vertex list, and stores the resulting list of dicts in the
    Django cache under the key ``'area_list'`` with ``timeout=None``.

    Areas with any vertex missing a longitude or latitude are skipped.

    Returns:
        The number of areas written to the cache.
    """
    area_list = []
    areas = (
        Area.objects
        .filter(is_hidden=False)
        .exclude(third_info__xx_rail=None)
        .order_by('number')
    )
    for area in areas:
        detail = area.third_info['xx_rail']['detail']
        points = _extract_points(detail['polygon']['points'])
        if points is None:
            # Incomplete vertex data: leave this area out of the cache.
            continue
        area_list.append({
            'id': area.id,
            'name': area.name,
            'type': area.type,
            'floor_no': detail['floorNo'],
            'polygon': Polygon(points),
            'stay_minute_min': area.stay_minute_min,
            'stay_minute_max': area.stay_minute_max
        })
    cache.set('area_list', area_list, timeout=None)
    return len(area_list)