factory/apps/third/views.py

95 lines
2.9 KiB
Python

from apps.utils.dahua import dhClient
from apps.utils.xunxi import xxClient
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.permissions import IsAuthenticated
from apps.utils.viewsets import CustomGenericViewSet
from rest_framework.mixins import CreateModelMixin
from apps.third.serializers import RequestCommonSerializer
# Create your views here.
class DahuaTestView(APIView):
    """
    Dahua test endpoint.

    GET sends one fixed request to the Dahua client's
    ``/evo-apigw/admin/API/video/stream/realtime`` URL and returns the
    client's result as the response body.
    """
    permission_classes = [IsAuthenticated]

    def get(self, request, *args, **kwargs):
        # Hard-coded query used for the test call; the whole thing is wrapped
        # under a top-level "data" key before being sent as the JSON body.
        stream_query = {
            "channelId": "1001339$1$0$0",
            "streamType": "1",
            "type": "hls",
        }
        payload = {"data": stream_query}
        result = dhClient.request(
            url='/evo-apigw/admin/API/video/stream/realtime',
            method='post',
            json=payload,
        )
        # Alternative test calls, kept disabled for manual experiments.
        #
        # Device subsystem page query:
        # data = {
        #     "pageNum":1,
        #     "pageSize":100,
        #     "isOnline":1,
        #     "showChildNodeData":1,
        #     "categorys":[8]
        # }
        # res = dhClient.request(
        #     url='/evo-apigw/evo-brm/1.0.0/device/subsystem/page', method='post', json=data)
        #
        # Access-control channel "closeDoor" call:
        # data = {
        #     "channelCodeList": ["1001382$7$0$0"]
        # }
        # res = dhClient.request(
        #     url='/evo-apigw/evo-accesscontrol/1.0.0/card/accessControl/channelControl/closeDoor', method='post', json=data)
        return Response(result)
class XxTestView(APIView):
    """
    Xunxi test endpoint.

    GET calls the Xunxi client's ``/api/application/build/buildListV2`` URL
    with an empty JSON body and returns the client's result.
    """
    permission_classes = [IsAuthenticated]

    def get(self, request, *args, **kwargs):
        # No ``method`` is passed here, so whatever default the client's
        # ``request`` defines is the one that applies.
        call_kwargs = {
            'url': '/api/application/build/buildListV2',
            'json': {},
        }
        listing = xxClient.request(**call_kwargs)
        return Response(listing)
class XxCommonViewSet(CreateModelMixin, CustomGenericViewSet):
    """
    Generic pass-through endpoint for the Xunxi client.

    The POST body is validated with ``RequestCommonSerializer`` and then
    forwarded to ``xxClient.request``; the client's result is returned as-is.
    """
    perms_map = {'post': '*'}
    serializer_class = RequestCommonSerializer

    def create(self, request, *args, **kwargs):
        # Validate first; invalid input raises instead of reaching the client.
        validator = self.get_serializer(data=request.data)
        validator.is_valid(raise_exception=True)
        fields = validator.validated_data

        # ``url`` is read with a plain lookup (it must be present after
        # validation); the remaining keys fall back to defaults when absent.
        # The validated ``data`` field is what gets sent as the JSON body.
        forwarded = {
            'url': fields['url'],
            'method': fields.get('method', 'post'),
            'params': fields.get('params', {}),
            'json': fields.get('data', {}),
        }
        return Response(xxClient.request(**forwarded))
class DhCommonViewSet(CreateModelMixin, CustomGenericViewSet):
    """
    Generic pass-through endpoint for the Dahua client.

    The POST body is validated with ``RequestCommonSerializer`` and then
    forwarded to ``dhClient.request``; the client's result is returned as-is.
    """
    perms_map = {'post': '*'}
    serializer_class = RequestCommonSerializer

    def create(self, request, *args, **kwargs):
        # Validate first; invalid input raises instead of reaching the client.
        validator = self.get_serializer(data=request.data)
        validator.is_valid(raise_exception=True)
        fields = validator.validated_data

        # ``url`` is read with a plain lookup (it must be present after
        # validation); the remaining keys fall back to defaults when absent.
        # The validated ``data`` field is what gets sent as the JSON body.
        target_url = fields['url']
        http_method = fields.get('method', 'post')
        query_params = fields.get('params', {})
        json_body = fields.get('data', {})

        outcome = dhClient.request(
            url=target_url,
            method=http_method,
            params=query_params,
            json=json_body,
        )
        return Response(outcome)