368 lines
11 KiB
Python
Executable File
368 lines
11 KiB
Python
Executable File
|
||
from rest_framework.exceptions import ParseError
|
||
from rest_framework.views import APIView
|
||
from rest_framework.response import Response
|
||
from rest_framework import status
|
||
from django.contrib.auth import authenticate, login, logout
|
||
from rest_framework.generics import CreateAPIView
|
||
from rest_framework.permissions import IsAuthenticated
|
||
from apps.auth1.errors import USERNAME_OR_PASSWORD_WRONG
|
||
from rest_framework_simplejwt.tokens import RefreshToken
|
||
from django.core.cache import cache
|
||
from apps.auth1.services import check_phone_code
|
||
|
||
from apps.utils.tools import rannum
|
||
from apps.utils.wxmp import wxmpClient
|
||
from apps.utils.wx import wxClient
|
||
from django.contrib.auth.hashers import make_password
|
||
from django.db.models import Q
|
||
from apps.auth1.services import validate_password
|
||
import base64
|
||
from apps.utils.tools import tran64
|
||
from apps.auth1.serializers import FaceLoginSerializer
|
||
|
||
|
||
from apps.auth1.serializers import (CodeLoginSerializer, LoginSerializer,
|
||
PwResetSerializer, SecretLoginSerializer, SendCodeSerializer, WxCodeSerializer, RegisterSerializer, EmailCodeSerializer, EmailCodeLoginSerializer)
|
||
from apps.system.models import User
|
||
from rest_framework_simplejwt.views import TokenObtainPairView
|
||
from apps.auth1.authentication import get_user_by_username_or
|
||
|
||
# Create your views here.
|
||
|
||
|
||
def get_tokens_for_user(user: User):
|
||
refresh = RefreshToken.for_user(user)
|
||
return {
|
||
'refresh': str(refresh),
|
||
'access': str(refresh.access_token),
|
||
}
|
||
|
||
class TokenLoginView(CreateAPIView):
|
||
"""
|
||
账户名/密码获取token
|
||
|
||
账户名/密码获取token
|
||
"""
|
||
authentication_classes = []
|
||
permission_classes = []
|
||
serializer_class = LoginSerializer
|
||
|
||
def create(self, request, *args, **kwargs):
|
||
serializer = self.get_serializer(data=request.data)
|
||
serializer.is_valid(raise_exception=True)
|
||
vdata = serializer.validated_data
|
||
password_check = vdata.get('password_check', True)
|
||
# 校验密码复杂度
|
||
is_ok = validate_password(vdata.get('password'))
|
||
if is_ok is False and password_check:
|
||
raise ParseError('密码校验失败, 请更换登录方式并修改密码')
|
||
|
||
user, _ = get_user_by_username_or(vdata.get('username'))
|
||
if user and cache.get(f"login_attempt_{user.id}", 0) > 3:
|
||
raise ParseError("登录失败次数过多,请稍后再试")
|
||
|
||
user = authenticate(username=vdata.get('username'),
|
||
password=vdata.get('password'))
|
||
|
||
if user is not None:
|
||
token_dict = get_tokens_for_user(user)
|
||
token_dict['password_ok'] = is_ok
|
||
return Response(token_dict)
|
||
raise ParseError(**USERNAME_OR_PASSWORD_WRONG)
|
||
|
||
class TokenBlackView(APIView):
|
||
permission_classes = [IsAuthenticated]
|
||
|
||
def post(self, request, *args, **kwargs):
|
||
"""
|
||
Token拉黑
|
||
|
||
|
||
Token拉黑
|
||
"""
|
||
return Response(status=status.HTTP_200_OK)
|
||
|
||
|
||
class LoginView(CreateAPIView):
|
||
"""
|
||
Session登录
|
||
|
||
|
||
账户密码Session登录
|
||
"""
|
||
authentication_classes = []
|
||
permission_classes = []
|
||
serializer_class = LoginSerializer
|
||
|
||
def create(self, request, *args, **kwargs):
|
||
serializer = self.get_serializer(data=request.data)
|
||
serializer.is_valid(raise_exception=True)
|
||
vdata = serializer.validated_data
|
||
user = authenticate(username=vdata.get('username'),
|
||
password=vdata.get('password'))
|
||
if user is not None:
|
||
login(request, user)
|
||
return Response(status=201)
|
||
raise ParseError(**USERNAME_OR_PASSWORD_WRONG)
|
||
|
||
|
||
class LogoutView(APIView):
|
||
authentication_classes = []
|
||
permission_classes = []
|
||
|
||
def post(self, request, *args, **kwargs):
|
||
"""
|
||
退出登录
|
||
|
||
|
||
退出登录
|
||
"""
|
||
logout(request)
|
||
return Response()
|
||
|
||
|
||
class WxmpLogin(CreateAPIView):
|
||
"""微信小程序自动登录
|
||
|
||
微信小程序自动登录
|
||
"""
|
||
authentication_classes = []
|
||
permission_classes = []
|
||
serializer_class = WxCodeSerializer
|
||
|
||
def post(self, request):
|
||
code = request.data['code']
|
||
info = wxmpClient.get_basic_info(code=code)
|
||
openid = info['openid']
|
||
session_key = info['session_key']
|
||
try:
|
||
user = User.objects.get(wxmp_openid=openid)
|
||
ret = get_tokens_for_user(user)
|
||
ret['wxmp_session_key'] = session_key
|
||
ret['wxmp_openid'] = openid
|
||
cache.set(code, ret, 60*5)
|
||
return Response(ret)
|
||
except Exception:
|
||
return Response({'wxmp_openid': openid, 'wxmp_session_key': session_key}, status=400)
|
||
|
||
|
||
class WxLogin(CreateAPIView):
|
||
"""微信公众号授权登录
|
||
|
||
微信公众号授权登录
|
||
"""
|
||
authentication_classes = []
|
||
permission_classes = []
|
||
serializer_class = WxCodeSerializer
|
||
|
||
def post(self, request):
|
||
code = request.data['code']
|
||
info = wxClient.get_basic_info(code=code)
|
||
openid = info['openid']
|
||
access = info['access_token']
|
||
try:
|
||
user = User.objects.get(wx_openid=openid)
|
||
ret = get_tokens_for_user(user)
|
||
ret['wx_token'] = access
|
||
ret['wx_openid'] = openid
|
||
cache.set(code, ret, 60*5)
|
||
return Response(ret)
|
||
except Exception:
|
||
return Response({'wx_openid': openid, 'wx_token': access}, status=400)
|
||
|
||
|
||
class SendCode(CreateAPIView):
|
||
authentication_classes = []
|
||
permission_classes = []
|
||
serializer_class = SendCodeSerializer
|
||
|
||
def post(self, request):
|
||
"""短信验证码发送
|
||
|
||
短信验证码发送
|
||
"""
|
||
from apps.utils.sms import send_sms
|
||
phone = request.data['phone']
|
||
code = rannum(6)
|
||
is_ok, _ = send_sms(phone, 505, {'code': code})
|
||
cache.set(phone, code, 60*5)
|
||
if is_ok:
|
||
return Response()
|
||
raise ParseError('短信发送失败,请确认手机号')
|
||
|
||
|
||
class CodeLogin(CreateAPIView):
|
||
"""手机验证码登录
|
||
|
||
手机验证码登录
|
||
"""
|
||
authentication_classes = []
|
||
permission_classes = []
|
||
serializer_class = CodeLoginSerializer
|
||
|
||
def post(self, request):
|
||
phone = request.data['phone']
|
||
code = request.data['code']
|
||
check_phone_code(phone, code)
|
||
user = User.objects.filter(phone=phone).first()
|
||
if user:
|
||
ret = get_tokens_for_user(user)
|
||
return Response(ret)
|
||
raise ParseError('账户不存在或已禁用')
|
||
|
||
|
||
class SecretLogin(CreateAPIView):
|
||
"""App端密钥登录
|
||
|
||
App端密钥登录
|
||
"""
|
||
authentication_classes = []
|
||
permission_classes = []
|
||
serializer_class = SecretLoginSerializer
|
||
|
||
def post(self, request):
|
||
sr = SecretLoginSerializer(data=request.data)
|
||
sr.is_valid(raise_exception=True)
|
||
vdata = sr.validated_data
|
||
username = vdata['username']
|
||
secret = vdata['secret']
|
||
user = User.objects.filter(Q(username=username) | Q(phone=username) | Q(
|
||
employee__id_number=username)).filter(secret=secret).first()
|
||
if user:
|
||
ret = get_tokens_for_user(user)
|
||
return Response(ret)
|
||
raise ParseError('登录失败')
|
||
|
||
|
||
class PwResetView(CreateAPIView):
|
||
"""重置密码
|
||
|
||
重置密码
|
||
"""
|
||
authentication_classes = []
|
||
permission_classes = []
|
||
serializer_class = PwResetSerializer
|
||
|
||
def post(self, request):
|
||
sr = PwResetSerializer(data=request.data)
|
||
sr.is_valid(raise_exception=True)
|
||
vdata = sr.validated_data
|
||
check_phone_code(vdata['phone'], vdata['code'])
|
||
user = User.objects.filter(phone=vdata['phone']).first()
|
||
if user:
|
||
user.password = make_password(vdata['password'])
|
||
user.save()
|
||
return Response()
|
||
raise ParseError('账户不存在或已禁用')
|
||
|
||
|
||
class FaceLoginView(CreateAPIView):
|
||
"""人脸识别登录
|
||
|
||
人脸识别登录
|
||
"""
|
||
authentication_classes = []
|
||
permission_classes = []
|
||
serializer_class = FaceLoginSerializer
|
||
|
||
|
||
def create(self, request, *args, **kwargs):
|
||
"""
|
||
人脸识别登录
|
||
"""
|
||
from apps.hrm.services import HrmService
|
||
base64_data = base64.urlsafe_b64decode(tran64(request.data.get('base64').replace(' ', '+')))
|
||
ep, msg = HrmService.face_compare_from_base64(base64_data)
|
||
if ep:
|
||
if ep.user and ep.user.is_active and ep.user.is_deleted is False:
|
||
user = ep.user
|
||
refresh = RefreshToken.for_user(ep.user)
|
||
# # 可设为在岗
|
||
# now = timezone.now()
|
||
# now_local = timezone.localtime()
|
||
# if 8<=now_local.hour<=17:
|
||
# ins, created = ClockRecord.objects.get_or_create(
|
||
# create_by = user, create_time__hour__range = [8,18],
|
||
# create_time__year=now_local.year, create_time__month=now_local.month,
|
||
# create_time__day=now_local.day,
|
||
# defaults={
|
||
# 'type':ClockRecord.ClOCK_WORK1,
|
||
# 'create_by':user,
|
||
# 'create_time':now
|
||
# })
|
||
# # 设为在岗
|
||
# if created:
|
||
# Employee.objects.filter(user=user).update(is_atwork=True, last_check_time=now)
|
||
|
||
return Response({
|
||
'refresh': str(refresh),
|
||
'access': str(refresh.access_token),
|
||
'username':user.username,
|
||
'name':user.name
|
||
})
|
||
else:
|
||
raise ParseError('账户不存在或不可用')
|
||
raise ParseError(msg)
|
||
|
||
|
||
class EmailCodeView(CreateAPIView):
|
||
"""发送邮箱验证码
|
||
|
||
发送邮箱验证码
|
||
"""
|
||
authentication_classes = []
|
||
permission_classes = []
|
||
serializer_class = EmailCodeSerializer
|
||
|
||
def post(self, request):
|
||
"""邮箱验证码发送
|
||
|
||
邮箱验证码发送
|
||
"""
|
||
from apps.utils.tasks import send_mail_task
|
||
email = request.data['email']
|
||
code = rannum(6)
|
||
|
||
# 缓存验证码,有效期5分钟
|
||
cache_key = f'email_code_{email}'
|
||
cache.set(cache_key, code, 60 * 5)
|
||
|
||
# 使用 send_mail_task 发送验证码邮件
|
||
send_mail_task.delay(
|
||
subject='登录验证码',
|
||
message=f'您的验证码是:{code},5分钟内有效。如非本人操作,请忽略此邮件。',
|
||
recipient_list=[email]
|
||
)
|
||
|
||
return Response({'message': '验证码已发送'})
|
||
|
||
|
||
class EmailCodeLogin(CreateAPIView):
|
||
"""邮箱验证码登录
|
||
|
||
邮箱验证码登录
|
||
"""
|
||
authentication_classes = []
|
||
permission_classes = []
|
||
serializer_class = EmailCodeLoginSerializer
|
||
|
||
def post(self, request):
|
||
email = request.data['email']
|
||
code = request.data['code']
|
||
|
||
# 验证邮箱验证码
|
||
cache_key = f'email_code_{email}'
|
||
cached_code = cache.get(cache_key)
|
||
if not cached_code or cached_code != code:
|
||
raise ParseError('验证码错误或已过期')
|
||
|
||
# 查找用户
|
||
user = User.objects.filter(username=email).first()
|
||
if user:
|
||
ret = get_tokens_for_user(user)
|
||
# 删除已使用的验证码
|
||
cache.delete(cache_key)
|
||
return Response(ret)
|
||
raise ParseError('账户不存在或已禁用')
|