carbo_server/apps/auth1/views.py

388 lines
12 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from rest_framework.exceptions import ParseError
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.contrib.auth import authenticate, login, logout
from rest_framework.generics import CreateAPIView
from rest_framework.permissions import IsAuthenticated
from apps.auth1.errors import USERNAME_OR_PASSWORD_WRONG
from rest_framework_simplejwt.tokens import RefreshToken
from django.core.cache import cache
from apps.auth1.services import check_phone_code, check_email_code
from apps.utils.tools import rannum
from apps.utils.wxmp import wxmpClient
from apps.utils.wx import wxClient
from django.contrib.auth.hashers import make_password
from django.db.models import Q
from apps.auth1.services import validate_password
import base64
from apps.utils.tools import tran64
from apps.auth1.serializers import FaceLoginSerializer
from apps.auth1.serializers import (CodeLoginSerializer, LoginSerializer,
PwResetSerializer, SecretLoginSerializer, SendCodeSerializer, WxCodeSerializer, RegisterSerializer, EmailCodeSerializer, EmailCodeLoginSerializer)
from apps.system.models import User
from rest_framework_simplejwt.views import TokenObtainPairView
from apps.auth1.authentication import get_user_by_username_or
# Create your views here.
def get_tokens_for_user(user: User):
refresh = RefreshToken.for_user(user)
return {
'refresh': str(refresh),
'access': str(refresh.access_token),
}
class TokenLoginView(CreateAPIView):
"""
账户名/密码获取token
账户名/密码获取token
"""
authentication_classes = []
permission_classes = []
serializer_class = LoginSerializer
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
vdata = serializer.validated_data
password_check = vdata.get('password_check', True)
# 校验密码复杂度
is_ok = validate_password(vdata.get('password'))
if is_ok is False and password_check:
raise ParseError('密码校验失败, 请更换登录方式并修改密码')
user, _ = get_user_by_username_or(vdata.get('username'))
if user and cache.get(f"login_attempt_{user.id}", 0) > 3:
raise ParseError("登录失败次数过多,请稍后再试")
user = authenticate(username=vdata.get('username'),
password=vdata.get('password'))
if user is not None:
token_dict = get_tokens_for_user(user)
token_dict['password_ok'] = is_ok
return Response(token_dict)
raise ParseError(**USERNAME_OR_PASSWORD_WRONG)
class TokenBlackView(APIView):
permission_classes = [IsAuthenticated]
def post(self, request, *args, **kwargs):
"""
Token拉黑
Token拉黑
"""
return Response(status=status.HTTP_200_OK)
class LoginView(CreateAPIView):
"""
Session登录
账户密码Session登录
"""
authentication_classes = []
permission_classes = []
serializer_class = LoginSerializer
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
vdata = serializer.validated_data
user = authenticate(username=vdata.get('username'),
password=vdata.get('password'))
if user is not None:
login(request, user)
return Response(status=201)
raise ParseError(**USERNAME_OR_PASSWORD_WRONG)
class LogoutView(APIView):
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs):
"""
退出登录
退出登录
"""
logout(request)
return Response()
class WxmpLogin(CreateAPIView):
"""微信小程序自动登录
微信小程序自动登录
"""
authentication_classes = []
permission_classes = []
serializer_class = WxCodeSerializer
def post(self, request):
code = request.data['code']
info = wxmpClient.get_basic_info(code=code)
openid = info['openid']
session_key = info['session_key']
try:
user = User.objects.get(wxmp_openid=openid)
ret = get_tokens_for_user(user)
ret['wxmp_session_key'] = session_key
ret['wxmp_openid'] = openid
cache.set(code, ret, 60*5)
return Response(ret)
except Exception:
return Response({'wxmp_openid': openid, 'wxmp_session_key': session_key}, status=400)
class WxLogin(CreateAPIView):
"""微信公众号授权登录
微信公众号授权登录
"""
authentication_classes = []
permission_classes = []
serializer_class = WxCodeSerializer
def post(self, request):
code = request.data['code']
info = wxClient.get_basic_info(code=code)
openid = info['openid']
access = info['access_token']
try:
user = User.objects.get(wx_openid=openid)
ret = get_tokens_for_user(user)
ret['wx_token'] = access
ret['wx_openid'] = openid
cache.set(code, ret, 60*5)
return Response(ret)
except Exception:
return Response({'wx_openid': openid, 'wx_token': access}, status=400)
class SendCode(CreateAPIView):
authentication_classes = []
permission_classes = []
serializer_class = SendCodeSerializer
def post(self, request):
"""短信验证码发送
短信验证码发送
"""
from apps.utils.sms import send_sms
phone = request.data['phone']
code = rannum(6)
is_ok, _ = send_sms(phone, 505, {'code': code})
cache.set(phone, code, 60*5)
if is_ok:
return Response()
raise ParseError('短信发送失败,请确认手机号')
class CodeLogin(CreateAPIView):
"""手机验证码登录
手机验证码登录
"""
authentication_classes = []
permission_classes = []
serializer_class = CodeLoginSerializer
def post(self, request):
phone = request.data['phone']
code = request.data['code']
check_phone_code(phone, code)
user = User.objects.filter(phone=phone).first()
if user:
ret = get_tokens_for_user(user)
return Response(ret)
raise ParseError('账户不存在或已禁用')
class EmailCodeLogin(CreateAPIView):
"""邮箱验证码登录
邮箱验证码登录
"""
authentication_classes = []
permission_classes = []
serializer_class = EmailCodeLoginSerializer
def post(self, request):
email = request.data['email']
code = request.data['code']
check_email_code(email, code)
user = User.objects.filter(username=email).first()
if user:
ret = get_tokens_for_user(user)
return Response(ret)
raise ParseError('账户不存在或已禁用')
class SecretLogin(CreateAPIView):
"""App端密钥登录
App端密钥登录
"""
authentication_classes = []
permission_classes = []
serializer_class = SecretLoginSerializer
def post(self, request):
sr = SecretLoginSerializer(data=request.data)
sr.is_valid(raise_exception=True)
vdata = sr.validated_data
username = vdata['username']
secret = vdata['secret']
user = User.objects.filter(Q(username=username) | Q(phone=username) | Q(
employee__id_number=username)).filter(secret=secret).first()
if user:
ret = get_tokens_for_user(user)
return Response(ret)
raise ParseError('登录失败')
class PwResetView(CreateAPIView):
"""重置密码
重置密码
"""
authentication_classes = []
permission_classes = []
serializer_class = PwResetSerializer
def post(self, request):
sr = PwResetSerializer(data=request.data)
sr.is_valid(raise_exception=True)
vdata = sr.validated_data
check_phone_code(vdata['phone'], vdata['code'])
user = User.objects.filter(phone=vdata['phone']).first()
if user:
user.password = make_password(vdata['password'])
user.save()
return Response()
raise ParseError('账户不存在或已禁用')
class FaceLoginView(CreateAPIView):
"""人脸识别登录
人脸识别登录
"""
authentication_classes = []
permission_classes = []
serializer_class = FaceLoginSerializer
def create(self, request, *args, **kwargs):
"""
人脸识别登录
"""
from apps.hrm.services import HrmService
base64_data = base64.urlsafe_b64decode(tran64(request.data.get('base64').replace(' ', '+')))
ep, msg = HrmService.face_compare_from_base64(base64_data)
if ep:
if ep.user and ep.user.is_active and ep.user.is_deleted is False:
user = ep.user
refresh = RefreshToken.for_user(ep.user)
# # 可设为在岗
# now = timezone.now()
# now_local = timezone.localtime()
# if 8<=now_local.hour<=17:
# ins, created = ClockRecord.objects.get_or_create(
# create_by = user, create_time__hour__range = [8,18],
# create_time__year=now_local.year, create_time__month=now_local.month,
# create_time__day=now_local.day,
# defaults={
# 'type':ClockRecord.ClOCK_WORK1,
# 'create_by':user,
# 'create_time':now
# })
# # 设为在岗
# if created:
# Employee.objects.filter(user=user).update(is_atwork=True, last_check_time=now)
return Response({
'refresh': str(refresh),
'access': str(refresh.access_token),
'username':user.username,
'name':user.name
})
else:
raise ParseError('账户不存在或不可用')
raise ParseError(msg)
class EmailCodeView(CreateAPIView):
"""发送邮箱验证码
发送邮箱验证码
"""
authentication_classes = []
permission_classes = []
serializer_class = EmailCodeSerializer
def post(self, request):
"""邮箱验证码发送
邮箱验证码发送
"""
from apps.utils.tasks import send_mail_task
email = request.data['email']
code = rannum(6)
# 缓存验证码有效期5分钟
cache_key = f'email_code_{email}'
cache.set(cache_key, code, 60 * 5)
# 使用 send_mail_task 发送验证码邮件
send_mail_task.delay(
subject='登录验证码',
message=f'您的验证码是:{code}5分钟内有效。如非本人操作请忽略此邮件。',
recipient_list=[email]
)
return Response({'message': '验证码已发送'})
class EmailCodeLogin(CreateAPIView):
"""邮箱验证码登录
邮箱验证码登录
"""
authentication_classes = []
permission_classes = []
serializer_class = EmailCodeLoginSerializer
def post(self, request):
email = request.data['email']
code = request.data['code']
# 验证邮箱验证码
cache_key = f'email_code_{email}'
cached_code = cache.get(cache_key)
if not cached_code or cached_code != code:
raise ParseError('验证码错误或已过期')
# 查找用户
user = User.objects.filter(username=email).first()
if user:
ret = get_tokens_for_user(user)
# 删除已使用的验证码
cache.delete(cache_key)
return Response(ret)
raise ParseError('账户不存在或已禁用')