factory/apps/auth1/views.py

192 lines
5.3 KiB
Python
Executable File

from rest_framework.exceptions import ParseError
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.contrib.auth import authenticate, login, logout
from rest_framework.generics import CreateAPIView
from rest_framework.permissions import IsAuthenticated
from apps.auth1.errors import USERNAME_OR_PASSWORD_WRONG
from rest_framework.exceptions import ParseError
import requests
import json
from django.conf import settings
from rest_framework_simplejwt.tokens import RefreshToken
from django.core.cache import cache
from apps.utils.sms import send_sms
from apps.utils.tools import rannum
from apps.utils.wxmp import wxmpClient
from apps.utils.wx import wxClient
from apps.auth1.serializers import CodeLoginSerializer, LoginSerializer, SendCodeSerializer, WxCodeSerializer
from apps.system.models import User
from apps.utils.viewsets import CustomGenericViewSet, CustomModelViewSet
# Create your views here.
def get_tokens_for_user(user: User):
refresh = RefreshToken.for_user(user)
return {
'refresh': str(refresh),
'access': str(refresh.access_token),
}
class TokenBlackView(APIView):
permission_classes = [IsAuthenticated]
def post(self, request, *args, **kwargs):
"""
Token拉黑
Token拉黑
"""
return Response(status=status.HTTP_200_OK)
class LoginView(CreateAPIView):
"""
Session登录
账户密码Session登录
"""
authentication_classes = []
permission_classes = []
serializer_class = LoginSerializer
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
vdata = serializer.validated_data
user = authenticate(username=vdata.get('username'),
password=vdata.get('password'))
if user is not None:
login(request, user)
return Response(status=201)
raise ParseError(**USERNAME_OR_PASSWORD_WRONG)
class LogoutView(APIView):
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs):
"""
退出登录
退出登录
"""
logout(request)
return Response()
class WxmpLogin(CreateAPIView):
"""微信小程序自动登录
微信小程序自动登录
"""
authentication_classes = []
permission_classes = []
serializer_class = WxCodeSerializer
def post(self, request):
code = request.data['code']
info = wxmpClient.get_basic_info(code=code)
openid = info['openid']
session_key = info['session_key']
try:
user = User.objects.get(wxmp_openid=openid, is_active=True)
ret = get_tokens_for_user(user)
ret['wxmp_session_key'] = session_key
ret['wxmp_openid'] = openid
cache.set(code, ret, 60*5)
return Response(ret)
except Exception:
return Response({'wxmp_openid': openid, 'wxmp_session_key': session_key}, status=400)
class WxLogin(CreateAPIView):
"""微信公众号授权登录
微信公众号授权登录
"""
authentication_classes = []
permission_classes = []
serializer_class = WxCodeSerializer
def post(self, request):
code = request.data['code']
info = wxClient.get_basic_info(code=code)
openid = info['openid']
access = info['access_token']
try:
user = User.objects.get(wx_openid=openid, is_active=True)
ret = get_tokens_for_user(user)
ret['wx_token'] = access
ret['wx_openid'] = openid
cache.set(code, ret, 60*5)
return Response(ret)
except Exception:
return Response({'wx_openid': openid, 'wx_token': access}, status=400)
class GetTokenFromCache(CreateAPIView):
"""以code获取token
以code获取token
"""
authentication_classes = []
permission_classes = []
serializer_class = WxCodeSerializer
def post(self, request):
code = request.data['code']
ret = cache.get(code, {})
return Response(ret)
class SendCode(CreateAPIView):
authentication_classes = []
permission_classes = []
serializer_class = SendCodeSerializer
def post(self, request):
"""短信验证码发送
短信验证码发送
"""
phone = request.data['phone']
code = rannum(4)
is_ok, _ = send_sms(phone, 505, {'code': code})
cache.set(phone, code, 60*5)
if is_ok:
return Response()
raise ParseError('短信发送失败,请确认手机号')
class CodeLogin(CreateAPIView):
"""手机验证码登录
手机验证码登录
"""
authentication_classes = []
permission_classes = []
serializer_class = CodeLoginSerializer
def post(self, request):
phone = request.data['phone']
code = request.data['code']
code_exist = cache.get(phone, None)
if code_exist == code:
user = User.objects.filter(phone=phone).first()
if user:
ret = get_tokens_for_user(user)
return Response(ret)
raise ParseError('账户不存在或已禁用')
raise ParseError('验证码错误')