# Create your tasks here
from __future__ import absolute_import, unicode_literals
from apps.utils.tasks import CustomTask
from celery import shared_task
from django.utils import timezone
from apps.em.models import Equipment, RuningState
from django.core.cache import cache


@shared_task(base=CustomTask)
def check_equipment_offline(seconds=20):
    """Mark monitored equipment as offline when its cached heartbeat is stale.

    For every Equipment row that has an enabled related ``mp_ep_monitored``
    entry flagged ``is_rep_ep_running_state``, the cache entry
    ``equipment_<id>`` is read and its ``running_state_timex`` value is
    compared with the current time. If the entry is missing or empty, has no
    timestamp, or the timestamp is more than ``seconds`` old, the equipment's
    ``running_state`` is set to ``RuningState.OFFLINE`` and only that field is
    saved. Equipment that is already OFFLINE is left untouched.

    Args:
        seconds: Maximum age, in seconds, of the cached
            ``running_state_timex`` for the equipment to still count as
            online. Defaults to 20.
    """
    now = timezone.now()
    equips = Equipment.objects.filter(
        mp_ep_monitored__is_rep_ep_running_state=True,
        mp_ep_monitored__enabled=True,
    )

    # Filtering across a relation can return the same Equipment once per
    # matching related row (presumably ``mp_ep_monitored`` is a reverse
    # foreign key -- confirm against the model). De-duplicate by id so each
    # device is checked and saved at most once per run, without changing the
    # SQL that is issued.
    seen_ids = set()
    for equip in equips:
        if equip.id in seen_ids:
            continue
        seen_ids.add(equip.id)

        is_offline = True
        cache_key = f"equipment_{equip.id}"
        cache_value = cache.get(cache_key, None)
        if cache_value:
            # NOTE(review): the cache entry is expected to be a mapping whose
            # "running_state_timex" is a datetime comparable with
            # timezone.now(); verify against the code that writes this key.
            last_timex = cache_value.get("running_state_timex", None)
            if last_timex and (now - last_timex).total_seconds() <= seconds:
                is_offline = False

        # Write only on an actual state change to avoid redundant UPDATEs.
        if is_offline and equip.running_state != RuningState.OFFLINE:
            equip.running_state = RuningState.OFFLINE
            equip.save(update_fields=["running_state"])