factory/apps/em/views.py

167 lines
6.6 KiB
Python

from django.shortcuts import render
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from django.utils import timezone
from apps.em.models import Equipment, EcheckRecord, EInspect, Ecate
from apps.utils.viewsets import CustomModelViewSet, CustomGenericViewSet
from apps.em.serializers import EquipmentSerializer, EcheckRecordSerializer, EInspectSerializer, EcateSerializer
from apps.em.filters import EquipFilterSet
from rest_framework.exceptions import ParseError
from rest_framework.mixins import ListModelMixin, CreateModelMixin, DestroyModelMixin
from dateutil.relativedelta import relativedelta
from rest_framework.decorators import action
from rest_framework.serializers import Serializer
from django.db import transaction
from apps.em.services import daoru_equipment
from rest_framework.response import Response
from django.conf import settings
from django.db.models import Count, Case, When, IntegerField, Max, OuterRef, Subquery
from datetime import datetime, timedelta
# Create your views here.
class EcateViewSet(CustomModelViewSet):
"""
list:设备分类
设备分类
"""
queryset = Ecate.objects.all()
serializer_class = EcateSerializer
ordering = ["id", "type", "code", "create_time"]
filterset_fields = {"type": ["exact", "in"], "code": ["exact", "in", "contains"], "is_for_safe": ["exact"], "is_for_enp": ["exact"], "is_car": ["exact"]}
class EquipmentViewSet(CustomModelViewSet):
"""
list:设备列表
设备列表
"""
queryset = Equipment.objects.all()
serializer_class = EquipmentSerializer
select_related_fields = ["create_by", "belong_dept", "keeper", "mgroup"]
search_fields = ["number", "name"]
filterset_class = EquipFilterSet
# def filter_queryset(self, queryset):
# if not self.detail and not self.request.query_params.get('type', None):
# raise ParseError('请指定设备类型')
# return super().filter_queryset(queryset)
@action(methods=["post"], detail=False, perms_map={"post": "equipment.create"}, serializer_class=Serializer)
@transaction.atomic
def daoru(self, request, *args, **kwargs):
"""导入
导入
"""
daoru_equipment(settings.BASE_DIR + request.data.get("path", ""))
return Response()
@swagger_auto_schema(
manual_parameters=[
openapi.Parameter(name="has_envdata", in_=openapi.IN_QUERY, description="Include envdata in the response", type=openapi.TYPE_STRING, enum=["yes", "no"], required=False),
openapi.Parameter(name="query", in_=openapi.IN_QUERY, description="定制返回数据", type=openapi.TYPE_STRING, required=False),
]
)
def list(self, request, *args, **kwargs):
return super().list(request, *args, **kwargs)
def add_info_for_list(self, data):
if self.request.query_params.get("has_envdata", "no") == "yes":
now = timezone.localtime()
now_10_before = now - timezone.timedelta(minutes=10)
data_ids = [item["id"] for item in data]
from apps.enp.models import EnvData
from apps.enp.serializers import EnvDataSerializer
# 子查询获取每个 equipment_id 对应的最大时间戳
# 后面可以考虑从缓存里拿
last_time_subquery = EnvData.objects.filter(equipment_id=OuterRef("equipment_id"), timex__gte=now_10_before, timex__lte=now).order_by("-timex").values("timex")[:1]
# 主查询,获取每个 equipment_id 对应的完整记录
last_envdata_qs = EnvData.objects.filter(equipment_id__in=data_ids, timex=Subquery(last_time_subquery))
envdata = EnvDataSerializer(last_envdata_qs, many=True).data
envdata_dict = {item["equipment"]: item for item in envdata}
for item in data:
item["envdata"] = envdata_dict.get(item["id"], {})
return data
@action(methods=["get"], detail=False, perms_map={"get": "*"})
def count_running_state(self, request, *args, **kwargs):
"""当前运行状态统计
当前运行状态统计
"""
queryset = self.filter_queryset(self.get_queryset())
result = queryset.aggregate(
count=Count("id"),
count_running=Count(Case(When(running_state=10, then=1), output_field=IntegerField())),
count_standby=Count(Case(When(running_state=20, then=1), output_field=IntegerField())),
count_stop=Count(Case(When(running_state=30, then=1), output_field=IntegerField())),
count_fail=Count(Case(When(running_state=40, then=1), output_field=IntegerField())),
count_offline=Count(Case(When(running_state=50, then=1), output_field=IntegerField())),
)
json_result = {
"count": result["count"],
"count_running": result["count_running"],
"count_standby": result["count_standby"],
"count_stop": result["count_stop"],
"count_fail": result["count_fail"],
"count_offline": result["count_offline"],
}
return Response(json_result)
class EcheckRecordViewSet(ListModelMixin, CreateModelMixin, DestroyModelMixin, CustomGenericViewSet):
"""
list:校准/检定记录
校准/检定记录
"""
perms_map = {"get": "*", "post": "echeckrecord.create", "put": "echeckrecord.update", "delete": "echeckrecord.delete"}
queryset = EcheckRecord.objects.all()
serializer_class = EcheckRecordSerializer
select_related_fields = ["equipment", "create_by"]
filterset_fields = ["equipment", "result"]
def perform_create(self, serializer):
instance = serializer.save()
equipment = instance.equipment
if equipment.cycle:
equipment.check_date = instance.check_date
equipment.next_check_date = instance.check_date + relativedelta(months=equipment.cycle)
equipment.save()
def perform_destroy(self, instance):
instance.delete()
equipment = instance.equipment
er = EcheckRecord.objects.filter(equipment=equipment).order_by("check_date").last()
if er:
equipment.check_date = instance.check_date
equipment.next_check_date = instance.check_date + relativedelta(months=equipment.cycle)
equipment.save()
else:
equipment.check_date = None
equipment.next_check_date = None
equipment.save()
class EInspectViewSet(CustomModelViewSet):
"""
list:巡检记录
巡检记录
"""
queryset = EInspect.objects.all()
serializer_class = EInspectSerializer
select_related_fields = ["equipment", "inspect_user"]
filterset_fields = ["equipment"]