247 lines
7.3 KiB
Python
Executable File
247 lines
7.3 KiB
Python
Executable File
|
|
from rest_framework.exceptions import ParseError
|
|
from rest_framework.views import APIView
|
|
from rest_framework.response import Response
|
|
from rest_framework import status
|
|
from django.contrib.auth import authenticate, login, logout
|
|
from rest_framework.generics import CreateAPIView
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from apps.auth1.errors import USERNAME_OR_PASSWORD_WRONG
|
|
from rest_framework_simplejwt.tokens import RefreshToken
|
|
from django.core.cache import cache
|
|
from apps.auth1.services import check_phone_code
|
|
from apps.utils.sms import send_sms
|
|
from apps.utils.tools import rannum
|
|
from apps.utils.wxmp import wxmpClient
|
|
from apps.utils.wx import wxClient
|
|
from django.contrib.auth.hashers import make_password
|
|
from django.db.models import Q
|
|
from apps.auth1.services import validate_password
|
|
|
|
|
|
from apps.auth1.serializers import (CodeLoginSerializer, LoginSerializer,
|
|
PwResetSerializer, SecretLoginSerializer, SendCodeSerializer, WxCodeSerializer)
|
|
from apps.system.models import User
|
|
from rest_framework_simplejwt.views import TokenObtainPairView
|
|
|
|
# Create your views here.
|
|
|
|
|
|
def get_tokens_for_user(user: User):
|
|
refresh = RefreshToken.for_user(user)
|
|
return {
|
|
'refresh': str(refresh),
|
|
'access': str(refresh.access_token),
|
|
}
|
|
|
|
class TokenLoginView(CreateAPIView):
|
|
"""
|
|
账户名/密码获取token
|
|
|
|
账户名/密码获取token
|
|
"""
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
serializer_class = LoginSerializer
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
vdata = serializer.validated_data
|
|
password_check = vdata.get('password_check', True)
|
|
# 校验密码复杂度
|
|
is_ok = validate_password(vdata.get('password'))
|
|
if is_ok is False and password_check:
|
|
raise ParseError('密码校验失败, 请更换登录方式并修改密码')
|
|
user = authenticate(username=vdata.get('username'),
|
|
password=vdata.get('password'))
|
|
if user is not None:
|
|
token_dict = get_tokens_for_user(user)
|
|
token_dict['password_ok'] = is_ok
|
|
return Response(token_dict)
|
|
raise ParseError(**USERNAME_OR_PASSWORD_WRONG)
|
|
|
|
class TokenBlackView(APIView):
|
|
permission_classes = [IsAuthenticated]
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
"""
|
|
Token拉黑
|
|
|
|
|
|
Token拉黑
|
|
"""
|
|
return Response(status=status.HTTP_200_OK)
|
|
|
|
|
|
class LoginView(CreateAPIView):
|
|
"""
|
|
Session登录
|
|
|
|
|
|
账户密码Session登录
|
|
"""
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
serializer_class = LoginSerializer
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
vdata = serializer.validated_data
|
|
user = authenticate(username=vdata.get('username'),
|
|
password=vdata.get('password'))
|
|
if user is not None:
|
|
login(request, user)
|
|
return Response(status=201)
|
|
raise ParseError(**USERNAME_OR_PASSWORD_WRONG)
|
|
|
|
|
|
class LogoutView(APIView):
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
"""
|
|
退出登录
|
|
|
|
|
|
退出登录
|
|
"""
|
|
logout(request)
|
|
return Response()
|
|
|
|
|
|
class WxmpLogin(CreateAPIView):
|
|
"""微信小程序自动登录
|
|
|
|
微信小程序自动登录
|
|
"""
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
serializer_class = WxCodeSerializer
|
|
|
|
def post(self, request):
|
|
code = request.data['code']
|
|
info = wxmpClient.get_basic_info(code=code)
|
|
openid = info['openid']
|
|
session_key = info['session_key']
|
|
try:
|
|
user = User.objects.get(wxmp_openid=openid)
|
|
ret = get_tokens_for_user(user)
|
|
ret['wxmp_session_key'] = session_key
|
|
ret['wxmp_openid'] = openid
|
|
cache.set(code, ret, 60*5)
|
|
return Response(ret)
|
|
except Exception:
|
|
return Response({'wxmp_openid': openid, 'wxmp_session_key': session_key}, status=400)
|
|
|
|
|
|
class WxLogin(CreateAPIView):
|
|
"""微信公众号授权登录
|
|
|
|
微信公众号授权登录
|
|
"""
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
serializer_class = WxCodeSerializer
|
|
|
|
def post(self, request):
|
|
code = request.data['code']
|
|
info = wxClient.get_basic_info(code=code)
|
|
openid = info['openid']
|
|
access = info['access_token']
|
|
try:
|
|
user = User.objects.get(wx_openid=openid)
|
|
ret = get_tokens_for_user(user)
|
|
ret['wx_token'] = access
|
|
ret['wx_openid'] = openid
|
|
cache.set(code, ret, 60*5)
|
|
return Response(ret)
|
|
except Exception:
|
|
return Response({'wx_openid': openid, 'wx_token': access}, status=400)
|
|
|
|
|
|
class SendCode(CreateAPIView):
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
serializer_class = SendCodeSerializer
|
|
|
|
def post(self, request):
|
|
"""短信验证码发送
|
|
|
|
短信验证码发送
|
|
"""
|
|
phone = request.data['phone']
|
|
code = rannum(6)
|
|
is_ok, _ = send_sms(phone, 505, {'code': code})
|
|
cache.set(phone, code, 60*5)
|
|
if is_ok:
|
|
return Response()
|
|
raise ParseError('短信发送失败,请确认手机号')
|
|
|
|
|
|
class CodeLogin(CreateAPIView):
|
|
"""手机验证码登录
|
|
|
|
手机验证码登录
|
|
"""
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
serializer_class = CodeLoginSerializer
|
|
|
|
def post(self, request):
|
|
phone = request.data['phone']
|
|
code = request.data['code']
|
|
check_phone_code(phone, code)
|
|
user = User.objects.filter(phone=phone).first()
|
|
if user:
|
|
ret = get_tokens_for_user(user)
|
|
return Response(ret)
|
|
raise ParseError('账户不存在或已禁用')
|
|
|
|
|
|
class SecretLogin(CreateAPIView):
|
|
"""App端密钥登录
|
|
|
|
App端密钥登录
|
|
"""
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
serializer_class = SecretLoginSerializer
|
|
|
|
def post(self, request):
|
|
sr = SecretLoginSerializer(data=request.data)
|
|
sr.is_valid(raise_exception=True)
|
|
vdata = sr.validated_data
|
|
username = vdata['username']
|
|
secret = vdata['secret']
|
|
user = User.objects.filter(Q(username=username) | Q(phone=username) | Q(
|
|
employee__id_number=username)).filter(secret=secret).first()
|
|
if user:
|
|
ret = get_tokens_for_user(user)
|
|
return Response(ret)
|
|
raise ParseError('登录失败')
|
|
|
|
|
|
class PwResetView(CreateAPIView):
|
|
"""重置密码
|
|
|
|
重置密码
|
|
"""
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
serializer_class = PwResetSerializer
|
|
|
|
def post(self, request):
|
|
sr = PwResetSerializer(data=request.data)
|
|
sr.is_valid(raise_exception=True)
|
|
vdata = sr.validated_data
|
|
check_phone_code(vdata['phone'], vdata['code'])
|
|
user = User.objects.filter(phone=vdata['phone']).first()
|
|
if user:
|
|
user.password = make_password(vdata['password'])
|
|
user.save()
|
|
return Response()
|
|
raise ParseError('账户不存在或已禁用')
|