feat: base 对登录失败次数过多做限制

This commit is contained in:
caoqianming 2024-12-05 14:47:38 +08:00
parent 15d37604ea
commit 1042deb817
2 changed files with 27 additions and 7 deletions

View File

@ -1,9 +1,20 @@
from django.contrib.auth.backends import ModelBackend
from django.db.models import Q
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.core.exceptions import ValidationError
UserModel = get_user_model()
def get_user_by_username_or(username: str):
try:
user = UserModel._default_manager.get(
Q(username=username) | Q(phone=username) | Q(employee__id_number=username))
return user, ""
except UserModel.DoesNotExist:
return None, 'not_exist'
except Exception as e:
return None, str(e)
class CustomBackend(ModelBackend):
def authenticate(self, request, username=None, password=None, **kwargs):
@ -11,13 +22,15 @@ class CustomBackend(ModelBackend):
username = kwargs.get(UserModel.USERNAME_FIELD)
if username is None or password is None:
return
try:
user = UserModel._default_manager.get(
Q(username=username) | Q(phone=username) | Q(employee__id_number=username))
except UserModel.DoesNotExist:
# Run the default password hasher once to reduce the timing
# difference between an existing and a nonexistent user (#20760).
user, msg = get_user_by_username_or(username)
if msg == 'not_exist':
UserModel().set_password(password)
else:
if user:
key_login_attempt = f"login_attempt_{user.id}"
if user.check_password(password) and self.user_can_authenticate(user):
cache.delete(key_login_attempt)
return user
else:
login_attempt = cache.get(key_login_attempt, 0)
cache.set(key_login_attempt, login_attempt + 1, 60)

View File

@ -26,6 +26,7 @@ from apps.auth1.serializers import (CodeLoginSerializer, LoginSerializer,
PwResetSerializer, SecretLoginSerializer, SendCodeSerializer, WxCodeSerializer)
from apps.system.models import User
from rest_framework_simplejwt.views import TokenObtainPairView
from apps.auth1.authentication import get_user_by_username_or
# Create your views here.
@ -56,8 +57,14 @@ class TokenLoginView(CreateAPIView):
is_ok = validate_password(vdata.get('password'))
if is_ok is False and password_check:
raise ParseError('密码校验失败, 请更换登录方式并修改密码')
user, _ = get_user_by_username_or(vdata.get('username'))
if user and cache.get(f"login_attempt_{user.id}", 0) > 3:
raise ParseError("登录失败次数过多,请稍后再试")
user = authenticate(username=vdata.get('username'),
password=vdata.get('password'))
if user is not None:
token_dict = get_tokens_for_user(user)
token_dict['password_ok'] = is_ok