cma_search/server/apps/system/views.py

334 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import secrets

from django.contrib.auth.hashers import check_password, make_password
from django.contrib.auth.models import UserManager
from django.core.cache import cache
from django.http import request
from django.utils import timezone
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import status
from rest_framework.decorators import (action, authentication_classes,
                                       permission_classes)
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.mixins import (CreateModelMixin, DestroyModelMixin,
                                   ListModelMixin, RetrieveModelMixin,
                                   UpdateModelMixin)
from rest_framework.pagination import PageNumberPagination
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.viewsets import GenericViewSet, ModelViewSet
from rest_framework_simplejwt.tokens import RefreshToken

from utils.queryset import get_child_queryset2

from .filters import UserFilter
from .models import (Dict, DictType, File, Message, Organization, Permission,
                     Position, Role, User)
from .permission import RbacPermission, get_permission_list
from .permission_data import RbacFilterSet
from .serializers import (DictSerializer, DictTypeSerializer, FileSerializer,
                          OrganizationSerializer, PermissionSerializer,
                          PositionSerializer, RoleSerializer,
                          UserCreateSerializer, UserListSerializer,
                          UserModifySerializer)
# Module-level logger, registered under the name 'log'.
logger = logging.getLogger('log')
# Usage examples (kept for reference):
# logger.info('Request succeeded! response_code:{}response_headers:{}response_body:{}'.format(response_code, response_headers, response_body[:251]))
# logger.error('Request failed: {}'.format(error))
class LogoutView(APIView):
    """Logout endpoint that simply acknowledges the request.

    No server-side state is touched; the view answers every GET with an
    empty HTTP 200 response.
    """

    # An empty list means no permission class is evaluated for this view.
    permission_classes = []

    def get(self, request, *args, **kwargs):
        """Return an empty 200 response.

        The token could be added to a blacklist here; currently it is not.
        """
        ok = status.HTTP_200_OK
        return Response(status=ok)
import random
import smtplib
import string
from email.header import Header
from email.mime.text import MIMEText
from email.utils import formataddr
from rest_framework_simplejwt.tokens import RefreshToken
def get_tokens_for_user(user):
    """Issue a JWT pair for ``user``.

    Returns:
        dict: ``'refresh'`` holds the string form of a new refresh token,
        ``'access'`` the string form of the access token derived from it.
    """
    token = RefreshToken.for_user(user)
    refresh_text = str(token)
    access_text = str(token.access_token)
    return {'refresh': refresh_text, 'access': access_text}
import datetime
class Login2View(APIView):
    """Log a user in with an e-mailed verification code.

    The client posts ``mail`` (used as the username) and ``msg`` (the code).
    When the newest code stored for that address was created within the last
    minute and matches, a JWT pair is returned.
    """

    # Anonymous endpoint: no authentication and no permission checks.
    authentication_classes = []
    permission_classes = []

    def post(self, request):
        """Validate the submitted code and return tokens.

        Returns:
            Response: 200 with ``{'refresh', 'access'}`` on success; 400 with
            an error string when the account is unknown or the code is
            missing, expired or wrong.
        """
        # .get() instead of [] so a missing field yields a 400, not a KeyError/500.
        mail = request.data.get('mail')
        msg = request.data.get('msg')

        user = User.objects.filter(username=mail).first() if mail else None
        if user is None:
            return Response('账户不存在', status=status.HTTP_400_BAD_REQUEST)

        # timezone.now() matches the project's USE_TZ setting, unlike a naive
        # datetime.now(), so the comparison against create_time is consistent.
        cutoff = timezone.now() - datetime.timedelta(minutes=1)
        latest = Message.objects.filter(mail=mail, create_time__gte=cutoff).last()

        # Compare as strings: the code is generated as an int, while the client
        # may send it either as a JSON number or as text.
        if msg is not None and latest is not None and str(latest.msg) == str(msg):
            # NOTE(review): the code is not invalidated after use and attempts
            # are not rate-limited — consider both for a one-time code.
            return Response(get_tokens_for_user(user), status=status.HTTP_200_OK)
        return Response('验证码错误', status=status.HTTP_400_BAD_REQUEST)
class sendMsg(APIView):
    """Send a login verification code to a registered e-mail address.

    A 5-digit code is mailed to the address given in ``mail`` and stored in
    ``Message`` (replacing any earlier code for that address) so that the
    login view can check it.
    """

    # Anonymous endpoint: no authentication and no permission checks.
    authentication_classes = []
    permission_classes = []

    def post(self, request):
        """Generate, e-mail and persist a verification code.

        Returns:
            Response: empty 200 on success; 400 with an error string when the
            account does not exist or the mail could not be sent/stored.
        """
        # .get() so a missing field yields a 400 instead of a KeyError/500.
        my_user = request.data.get('mail')
        if not my_user or not User.objects.filter(username=my_user).exists():
            return Response('该账户不存在', status=status.HTTP_400_BAD_REQUEST)

        # The code is a login credential: draw it from the OS CSPRNG.
        # Same range as before: 10000..99999 inclusive.
        code = secrets.randbelow(90000) + 10000

        # SECURITY(review): sender credentials are hard-coded in source. Move
        # them to settings / environment and rotate the password.
        my_sender = 'zhulinwei@ctc.ac.cn'
        my_pass = 'Zlw901105'

        try:
            # Mail body.
            msg = MIMEText('您好,能力查询本次登陆验证码为' + str(code), 'plain', 'utf-8')
            # Sender display name and address.
            msg['From'] = formataddr(["国检集团检测能力查询平台", my_sender])
            # Recipient display name (empty) and address.
            msg['To'] = formataddr(["", my_user])
            # The subject carries the code itself.
            msg['Subject'] = Header(str(code), 'utf-8').encode()

            # Tencent Exmail SMTP over SSL on port 465.
            # The context manager always closes the connection (QUIT), even if
            # login or sending fails; the timeout keeps a stalled mail server
            # from blocking the request indefinitely.
            with smtplib.SMTP_SSL("smtp.exmail.qq.com", 465, timeout=10) as server:
                server.login(my_sender, my_pass)
                server.sendmail(my_sender, [my_user], msg.as_string())

            # Keep only the newest code per address.
            Message.objects.filter(mail=my_user).delete()
            Message.objects.create(mail=my_user, msg=code)
        except Exception:
            # Boundary handler: report failure to the client but keep the
            # traceback in the log instead of discarding it.
            logger.exception('Failed to send verification code to %s', my_user)
            return Response('验证码发送失败', status=status.HTTP_400_BAD_REQUEST)
        return Response(status=status.HTTP_200_OK)
class DictTypeViewSet(ModelViewSet):
    """
    Data dictionary types: create, retrieve, update and delete.
    """
    queryset = DictType.objects.all()
    serializer_class = DictTypeSerializer
    # Permission code per HTTP method.
    # NOTE(review): '*' presumably means "no specific permission required" —
    # confirm against RbacPermission.
    perms_map = {
        'get': '*',
        'post': 'dicttype_create',
        'put': 'dicttype_update',
        'delete': 'dicttype_delete',
    }
    # Pagination is switched off for this resource.
    pagination_class = None
    # '^' requests a starts-with match in DRF's SearchFilter.
    search_fields = ['^name']
    ordering_fields = ['id']
    ordering = 'id'
class DictViewSet(ModelViewSet):
    """
    Data dictionary entries: create, retrieve, update and delete.
    """
    queryset = Dict.objects.all()
    serializer_class = DictSerializer
    # Permission code per HTTP method.
    # NOTE(review): '*' presumably means "no specific permission required" —
    # confirm against RbacPermission.
    perms_map = {
        'get': '*',
        'post': 'dict_create',
        'put': 'dict_update',
        'delete': 'dict_delete',
    }
    # '^' requests a starts-with match in DRF's SearchFilter.
    search_fields = ['^name']
    ordering_fields = ['id']
    ordering = 'id'
class PositionViewSet(ModelViewSet):
    """
    Positions (job posts): create, retrieve, update and delete.
    """
    queryset = Position.objects.all()
    serializer_class = PositionSerializer
    # Permission code per HTTP method.
    # NOTE(review): '*' presumably means "no specific permission required" —
    # confirm against RbacPermission.
    perms_map = {
        'get': '*',
        'post': 'position_create',
        'put': 'position_update',
        'delete': 'position_delete',
    }
    # Pagination is switched off for this resource.
    pagination_class = None
    # '^' requests a starts-with match in DRF's SearchFilter.
    search_fields = ['^name']
    ordering_fields = ['id']
    ordering = 'id'
class TestView(APIView):
    """Placeholder view showing permission control on a single API."""

    # Permission code required for GET on this one endpoint.
    perms_map = {
        'get': 'test_view',
    }
class PermissionViewSet(ModelViewSet):
    """
    Permissions: create, retrieve, update and delete.
    """
    # Permission code per HTTP method.
    # NOTE(review): '*' presumably means "no specific permission required" —
    # confirm against RbacPermission.
    perms_map = {'get': '*', 'post': 'perm_create',
                 'put': 'perm_update', 'delete': 'perm_delete'}
    # A stray `queryset = Position.objects.all()` used to precede this line;
    # it was overwritten immediately and has been removed.
    queryset = Permission.objects.all()
    serializer_class = PermissionSerializer
    # Pagination is switched off for this resource.
    pagination_class = None
    search_fields = ['name']
    ordering_fields = ['sort']
    ordering = 'sort'
class OrganizationViewSet(ModelViewSet):
    """
    Organizations: create, retrieve, update and delete.
    """
    queryset = Organization.objects.all()
    serializer_class = OrganizationSerializer
    # Permission code per HTTP method.
    # NOTE(review): '*' presumably means "no specific permission required" —
    # confirm against RbacPermission.
    perms_map = {
        'get': '*',
        'post': 'org_create',
        'put': 'org_update',
        'delete': 'org_delete',
    }
    # Pagination is switched off for this resource.
    pagination_class = None
    search_fields = ['name']
    ordering_fields = ['sort']
    # Default order: by 'sort', with the primary key as tie-breaker.
    ordering = ['sort', 'pk']
class RoleViewSet(ModelViewSet):
    """
    Roles: create, retrieve, update and delete.
    """
    queryset = Role.objects.all()
    serializer_class = RoleSerializer
    # Permission code per HTTP method.
    # NOTE(review): '*' presumably means "no specific permission required" —
    # confirm against RbacPermission.
    perms_map = {
        'get': '*',
        'post': 'role_create',
        'put': 'role_update',
        'delete': 'role_delete',
    }
    # Pagination is switched off for this resource.
    pagination_class = None
    search_fields = ['name']
    ordering_fields = ['id']
    ordering = 'id'
class UserViewSet(ModelViewSet):
    """
    User management: create, retrieve, update and delete, plus actions for
    changing one's own password, fetching one's own profile and resetting
    another user's password.
    """
    # Permission code per HTTP method.
    # NOTE(review): '*' presumably means "no specific permission required" —
    # confirm against RbacPermission.
    perms_map = {'get': '*', 'post': 'user_create',
                 'put': 'user_update', 'delete': 'user_delete'}
    queryset = User.objects.all().order_by('-id')
    serializer_class = UserListSerializer
    filterset_class = UserFilter
    search_fields = ['username', 'name', 'phone', 'email']
    # OrderingFilter strips a leading '-' from the requested term before
    # looking it up here, so entries must be bare field names. The previous
    # value '-id' could never match a request.
    ordering_fields = ['id']

    def get_queryset(self):
        """Return the users visible to this request.

        Applies the serializer's ``setup_eager_loading`` hook when it has one
        and, when the ``dept`` query parameter is present, restricts the
        result to the departments returned by ``get_child_queryset2`` for it.
        An unknown or malformed ``dept`` yields an empty result instead of an
        unhandled exception.
        """
        # super() re-evaluates the class-level queryset with .all(), so its
        # result cache is never shared between requests.
        queryset = super().get_queryset()
        serializer_class = self.get_serializer_class()
        if hasattr(serializer_class, 'setup_eager_loading'):
            queryset = serializer_class.setup_eager_loading(queryset)  # performance optimisation
        dept = self.request.query_params.get('dept', None)  # all staff of this department and its sub-departments
        if dept is not None:
            try:
                root = Organization.objects.get(pk=dept)
            except (Organization.DoesNotExist, ValueError, TypeError):
                return queryset.none()
            queryset = queryset.filter(dept__in=get_child_queryset2(root))
        return queryset

    def get_serializer_class(self):
        """Pick the serializer that matches the current action."""
        if self.action == 'create':
            return UserCreateSerializer
        if self.action == 'list':
            return UserListSerializer
        return UserModifySerializer

    def create(self, request, *args, **kwargs):
        """Create a user, assigning a random password when none is supplied.

        Returns:
            Response: the serialized user (default status 200, unchanged).
        """
        raw_password = request.data.get('password')
        if not raw_password:
            # Passwords are credentials: use the CSPRNG, not `random`.
            alphabet = string.ascii_letters + string.digits
            raw_password = ''.join(secrets.choice(alphabet) for _ in range(8))
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save(password=make_password(raw_password))
        return Response(serializer.data)

    @action(methods=['put'], detail=False, permission_classes=[IsAuthenticated],
            url_name='change_password')
    def password(self, request, pk=None):
        """
        Change the current user's password.

        Expects ``old_password``, ``new_password1`` and ``new_password2``.
        Returns 400 when the old password is wrong or when the new password
        is empty/missing or the two entries differ.
        """
        user = request.user
        # .get() so missing fields produce a 400 instead of a KeyError/500;
        # check_password() returns False for a None password.
        old_password = request.data.get('old_password')
        if not check_password(old_password, user.password):
            return Response('旧密码错误!', status=status.HTTP_400_BAD_REQUEST)

        new_password1 = request.data.get('new_password1')
        new_password2 = request.data.get('new_password2')
        # Reject an empty/missing new password as well: set_password(None)
        # would silently make the account unusable.
        if not new_password1 or new_password1 != new_password2:
            return Response('新密码两次输入不一致!', status=status.HTTP_400_BAD_REQUEST)

        user.set_password(new_password2)
        user.save()
        return Response('密码修改成功!', status=status.HTTP_200_OK)

    # perms_map={'get':'*'} — permission control for a custom action
    @action(methods=['get'], detail=False, url_name='my_info', permission_classes=[IsAuthenticated])
    def info(self, request, pk=None):
        """
        Return the current user's profile, role names and permission list.
        """
        user = request.user
        perms = get_permission_list(user)
        data = {
            'id': user.id,
            'username': user.username,
            'name': user.name,
            'roles': user.roles.all().values_list('name', flat=True),
            # 'avatar': request._request._current_scheme_host + '/media/' + str(user.image),
            'avatar': user.avatar,
            'perms': perms,
        }
        return Response(data)

    @action(methods=['put'], detail=True, url_name='userpw_reset', perms_map={'put':'userpw_reset'})
    def resetpw(self, request, pk=None):
        """
        Reset the target user's password to '0000' (superusers only).

        Returns 400 with an error string when the caller is not a superuser.
        """
        # NOTE(review): a fixed, trivial reset password and a 400 (rather than
        # 403) for missing privileges are kept for client compatibility.
        if not request.user.is_superuser:
            return Response('权限不足', status=status.HTTP_400_BAD_REQUEST)
        user = self.get_object()
        user.set_password('0000')
        user.save()
        return Response('密码已重置为0000', status=status.HTTP_200_OK)
from django.conf import settings
from rest_framework.parsers import (FileUploadParser, JSONParser,
MultiPartParser)
from .mixins import CreateModelAMixin
class FileViewSet(ModelViewSet):
    """
    Files: create, retrieve, update and delete.
    """
    # No permission map is declared for this viewset.
    perms_map = None
    parser_classes = [MultiPartParser, JSONParser]
    queryset = File.objects.all()
    serializer_class = FileSerializer
    filterset_fields = ['type']
    search_fields = ['name']
    ordering = '-create_time'

    @staticmethod
    def _category_for_mime(mime):
        """Map a MIME type string to the stored file category label."""
        mime = mime or ''  # tolerate an upload without a content type
        if 'image' in mime:
            return '图片'
        if 'video' in mime:
            return '视频'
        if 'audio' in mime:
            return '音频'
        # Each substring needs its own `in` test: the former
        # `'application' or 'text' in mime` was always true, which made the
        # '其它' fallback unreachable.
        if 'application' in mime or 'text' in mime:
            return '文档'
        return '其它'

    def perform_create(self, serializer):
        """Save an uploaded file together with metadata derived from it.

        Name, size, MIME type and category are taken from the uploaded
        ``file`` object; after the first save, ``path`` is set to
        ``MEDIA_URL`` plus the stored file name and the instance is saved
        again.
        """
        fileobj = self.request.data.get('file')
        name = fileobj.name  # public accessor for the upload's file name
        size = fileobj.size
        mime = fileobj.content_type
        category = self._category_for_mime(mime)
        instance = serializer.save(create_by=self.request.user, name=name,
                                   size=size, type=category, mime=mime)
        # The stored file name is only known after the first save.
        instance.path = settings.MEDIA_URL + instance.file.name
        instance.save()