examtest/test_server/rbac/views/user.py

140 lines
5.5 KiB
Python

# -*- coding: utf-8 -*-
from operator import itemgetter
# import jwt
from django.conf import settings
from django.contrib.auth import authenticate,login,logout
from django.contrib.auth.hashers import check_password
from django_filters.rest_framework import DjangoFilterBackend
from django.contrib.auth.hashers import make_password
from rest_framework.decorators import action
from rest_framework.filters import SearchFilter, OrderingFilter
from rest_framework.generics import ListAPIView
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from rest_framework import status
from utils.custom import CommonPagination
from utils.child import get_child_queryset
from ..models import UserProfile, Menu, Organization
from ..serializers.menu_serializer import MenuSerializer
from ..serializers.user_serializer import UserListSerializer, UserCreateSerializer, UserModifySerializer, \
UserInfoListSerializer
from ..permission import get_all_menu_queryset,get_permission_list,RbacPermission
class UserLogoutView(APIView):
    """
    Logout endpoint.

    No authentication or permission classes are configured for this view,
    and the handler performs no server-side work: it only acknowledges the
    request with an empty HTTP 200 response.
    """

    authentication_classes = ()
    permission_classes = ()

    def get(self, request, *args, **kwargs):
        """Acknowledge the logout request with an empty 200 response."""
        acknowledgement = Response(status=status.HTTP_200_OK)
        return acknowledgement
class UserInfoView(APIView):
    """
    Return the current user's basic profile together with the permission
    list produced by ``get_permission_list``.
    """

    permission_classes = (IsAuthenticated,)

    def get(self, request):
        """
        Build the profile payload for ``request.user``.

        The computed permission list is also written to
        ``request.session['perms']``. When the user has no id, a payload
        containing only an ``error`` message is returned instead.
        """
        current_user = request.user

        # Guard clause: a user without an id gets the "please log in" message.
        if current_user.id is None:
            return Response({"error": '请登录后访问!'})

        granted = get_permission_list(current_user)
        request.session['perms'] = granted  # persist the permissions in the session

        payload = dict(
            id=current_user.id,
            username=current_user.username,
            name=current_user.name,
            avatar=current_user.avatar,
            perms=granted,
            roles=current_user.roles.values_list('name', flat=True),
        )
        return Response(payload)
from utils.pagination import PageOrNot
class UserViewSet(PageOrNot, ModelViewSet):
    """
    User management: create, list, retrieve, update and soft-delete.

    Query parameters honoured by ``get_queryset``:
        name: substring matched against the user's ``name`` field.
        department: organization id; only users whose department is in the
            queryset returned by ``get_child_queryset`` are kept.
    """

    # Password given to a newly created user when the request supplies none.
    DEFAULT_PASSWORD = 'fs0000'

    # HTTP method -> permission code.
    # NOTE(review): presumably consumed by RbacPermission, which is currently
    # commented out below - confirm before relying on it.
    perms_map = ({'get': 'user_list'}, {'post': 'user_create'}, {'put': 'user_update'},
                 {'delete': 'user_delete'})
    # Soft-deleted users (is_delete set) are excluded from every endpoint.
    queryset = UserProfile.objects.filter(is_delete=0).all()
    serializer_class = UserListSerializer
    pagination_class = CommonPagination
    # filter_backends = (DjangoFilterBackend, SearchFilter, OrderingFilter)
    filter_fields = ('is_active',)
    # search_fields = ('username', 'name', 'mobile', 'email')
    ordering_fields = ('id',)
    # authentication_classes = (JSONWebTokenAuthentication,)
    # permission_classes = (RbacPermission,IsAuthenticated)

    def get_queryset(self):
        """
        Return the user queryset narrowed by the ``name`` and ``department``
        query parameters.

        If the active serializer class exposes ``setup_eager_loading``, the
        queryset is passed through it first (performance optimisation).
        """
        # Clone the class-level queryset. Returning ``self.queryset`` itself
        # would let its result cache be filled once and then reused by every
        # later request, serving stale rows.
        queryset = self.queryset.all()

        serializer_class = self.get_serializer_class()
        if hasattr(serializer_class, 'setup_eager_loading'):
            queryset = serializer_class.setup_eager_loading(queryset)

        params = self.request.query_params
        name = params.get('name')
        department = params.get('department')

        if name is not None:
            queryset = queryset.filter(name__contains=name)
        if department is not None:
            # All employees of the department and its sub-departments
            # (per the original author's note; the expansion itself is done
            # by get_child_queryset).
            dept_queryset = get_child_queryset(
                Organization.objects.all(),
                Organization.objects.filter(id=department),
                Organization.objects.none(),
            )
            queryset = queryset.filter(department__in=dept_queryset)
        return queryset

    def destroy(self, request, *args, **kwargs):
        """
        Soft-delete the addressed user.

        The row is kept; only ``is_delete`` is set, which removes the user
        from this viewset's base queryset. Responds with HTTP 204.
        """
        instance = self.get_object()
        instance.is_delete = True
        instance.save()
        return Response(status=status.HTTP_204_NO_CONTENT)

    def get_serializer_class(self):
        """Pick the serializer by action: create, list, or modify for the rest."""
        if self.action == 'create':
            return UserCreateSerializer
        elif self.action == 'list':
            return UserListSerializer
        return UserModifySerializer

    def create(self, request, *args, **kwargs):
        """
        Create a user, hashing the supplied password first.

        When the request carries no (or an empty) ``password``,
        ``DEFAULT_PASSWORD`` is used instead.

        Raises:
            rest_framework.exceptions.ValidationError: if the serializer
                rejects the payload (``raise_exception=True``).
        """
        # Work on a copy: for form-encoded / multipart requests request.data
        # is an immutable QueryDict and item assignment on it raises.
        data = request.data.copy()
        raw_password = data.get('password') or self.DEFAULT_PASSWORD
        data['password'] = make_password(raw_password)

        serializer = self.get_serializer(data=data)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)
        headers = self.get_success_headers(serializer.data)
        # NOTE(review): no explicit status is passed, so this answers 200
        # rather than 201; kept as-is so existing clients see no change.
        return Response(serializer.data, headers=headers)

    @action(methods=['put'], detail=False, permission_classes=[IsAuthenticated],  # perms_map={'put':'change_password'}
            url_name='change_password')
    def password(self, request, pk=None):
        """
        Change the password of the requesting user.

        Expects ``old_password``, ``new_password1`` and ``new_password2`` in
        the request body. Missing or empty fields yield HTTP 400. A wrong old
        password or mismatching new passwords yield an ``error`` payload with
        the default status, unchanged from the previous behaviour.
        """
        user = request.user

        old_password = request.data.get('old_password')
        if not old_password:
            # Previously a missing key raised KeyError and produced a 500.
            return Response({'error': '请输入旧密码!'},
                            status=status.HTTP_400_BAD_REQUEST)

        if not check_password(old_password, user.password):
            return Response({'error':'旧密码错误!'})

        new_password1 = request.data.get('new_password1')
        new_password2 = request.data.get('new_password2')
        if not new_password1 or not new_password2:
            # Reject missing or empty values instead of crashing or storing
            # an empty password.
            return Response({'error': '新密码不能为空!'},
                            status=status.HTTP_400_BAD_REQUEST)

        if new_password1 != new_password2:
            return Response({'error': '新密码两次输入不一致!'})

        user.set_password(new_password2)
        user.save()
        return Response('密码修改成功!')