factory/apps/am/tasks.py

29 lines
969 B
Python

from __future__ import absolute_import, unicode_literals
from apps.utils.task import CustomTask
from apps.am.models import Area
from celery import shared_task
from django.core.cache import cache
import shapely.geometry
@shared_task(base=CustomTask)
def cache_area_info():
    """
    Cache area information.

    Selects every ``Area`` of type ``Area.AREA_TYPE_FIX`` whose
    ``third_info`` has a non-null ``xx_rail`` entry, builds a geometry from
    that entry's polygon points, and stores the resulting list in the Django
    cache under the key ``'area_list'`` (``timeout=None``).

    Returns:
        list: one dict per selected area with the keys ``'id'``,
        ``'floor_no'`` and ``'poly_shape'``.
    """
    area_list = []
    areas = Area.objects.filter(type=Area.AREA_TYPE_FIX).exclude(
        third_info__xx_rail=None
    )
    for area in areas:
        detail = area.third_info['xx_rail']['detail']
        # NOTE(review): points are read via attribute access (.x / .y) as in
        # the original code; if third_info holds plain decoded JSON these
        # would be dicts and need item['x'] / item['y'] -- confirm.
        polygon = [[point.x, point.y] for point in detail['polygon']['points']]
        # The ring built above is the single exterior ring of a single
        # polygon. The previous code passed the accumulating result list
        # (area_list) here by mistake, leaving `polygon` unused.
        poly_context = {
            'type': 'MULTIPOLYGON',
            'coordinates': [[polygon]],
        }
        area_list.append({
            'id': area.id,
            'floor_no': detail['floorNo'],
            # shape() replaces asShape(), which is deprecated in Shapely 1.8
            # and removed in 2.0.
            'poly_shape': shapely.geometry.shape(poly_context),
        })
    cache.set('area_list', area_list, timeout=None)
    return area_list