zcbot/web/auth.py

334 lines
13 KiB
Python

"""Auth: 两条 login 路径,签同款 JWT(§7 D' 过渡形态)。
模型:
- `PLATFORM_KEY` env(必填):platform 服务端 / zcbot 间机器对机器共享密钥
- `JWT_SECRET` env(必填):HS256 签 token;泄漏 = 任意伪造,与 PLATFORM_KEY 同级保护
- `POST /v1/auth/login {user_id, platform_key}` → JWT(platform 服务端用,自带 user_id 注入)
- `POST /v1/auth/login_password {email, password}` → JWT
(dev SPA 用,users.email UNIQUE + users.password_hash bcrypt 校验;0005 加 UNIQUE)
- 后续 `/v1/*`(除 /healthz、/docs、/openapi.json、/、/v1/auth/login*)走 `Authorization: Bearer <jwt>`
- Token TTL: `ZCBOT_JWT_TTL_SECONDS` env 覆盖,默 7 天
发用户:`.venv/Scripts/python.exe main.py user add --email X --password Y`,后台直接
bcrypt + INSERT users;撤用户 `DELETE FROM users WHERE email=...`(messages CASCADE,
tasks 通过 FK 拦,要先 DELETE 该 user 的 tasks)。
OIDC(D')替换:只动 `/v1/auth/login` 实现(校验 ID token 代替 key);**邮箱密码路径
长期保留,与 OIDC 并存**(自有账号 + 同事试用不依赖外部 IdP)。
"""
from __future__ import annotations
import os
import time
from typing import Optional
from uuid import UUID
import bcrypt
import jwt
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy import select
from core.storage import session_scope
from core.storage.models import User
_DEFAULT_TTL_SECONDS = 7 * 24 * 3600 # 7d
class AuthConfig:
"""App 启动时一次性读 env + 校验存在性;create_app 调 `AuthConfig.from_env()` 拿到。"""
def __init__(
self,
platform_key: str,
jwt_secret: str,
ttl_seconds: int,
admin_token: Optional[str] = None,
):
self.platform_key = platform_key
self.jwt_secret = jwt_secret
self.ttl_seconds = ttl_seconds
# ZCBOT_ADMIN_TOKEN 未设 → None;此时 /v1/auth/admin/create_user 返 503(功能关闭)。
# 这是个独立的共享口令,跟 PLATFORM_KEY / JWT_SECRET 分开 —— 它是"管理员发用户"
# 的单一钥匙,不参与 platform 机器对机器 / token 签名路径。
self.admin_token = admin_token
@classmethod
def from_env(cls) -> "AuthConfig":
key = os.environ.get("PLATFORM_KEY", "").strip()
secret = os.environ.get("JWT_SECRET", "").strip()
missing = []
if not key:
missing.append("PLATFORM_KEY")
if not secret:
missing.append("JWT_SECRET")
if missing:
raise RuntimeError(
f"{', '.join(missing)} env not set. zcbot web requires both:\n"
" PLATFORM_KEY=<shared secret between platform and zcbot>\n"
" JWT_SECRET=<HMAC secret used to sign session tokens>"
)
ttl_raw = os.environ.get("ZCBOT_JWT_TTL_SECONDS", "").strip()
try:
ttl = int(ttl_raw) if ttl_raw else _DEFAULT_TTL_SECONDS
except ValueError:
raise RuntimeError(
f"ZCBOT_JWT_TTL_SECONDS must be int seconds, got {ttl_raw!r}"
)
if ttl <= 0:
raise RuntimeError(f"ZCBOT_JWT_TTL_SECONDS must be > 0, got {ttl}")
admin = os.environ.get("ZCBOT_ADMIN_TOKEN", "").strip() or None
return cls(
platform_key=key, jwt_secret=secret, ttl_seconds=ttl, admin_token=admin,
)
def hash_password(password: str) -> str:
"""bcrypt 哈希(默认 cost=12)。返 ASCII str(bcrypt 标准格式 `$2b$12$...`),直接落 DB。"""
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("ascii")
def verify_password(password: str, stored_hash: str) -> bool:
"""常数时间比对。stored_hash 是 DB 里 users.password_hash 列。"""
try:
return bcrypt.checkpw(password.encode("utf-8"), stored_hash.encode("ascii"))
except (ValueError, TypeError):
# stored_hash 格式坏(手工 INSERT 乱写)/ 不是 ASCII → 视作不匹配,别 500
return False
def resolve_user_by_email(email: str, password: str) -> Optional[tuple[UUID, str]]:
"""email + password → `(user_id, email)`;不匹配返 None(空表 / 邮箱不存在 / 密码错都走这条)。
单次 SELECT + bcrypt verify;不缓存,改密码 / 删账号下次 login 立即生效。
bcrypt.checkpw 本身是 constant-time;查不到也要跑一次 dummy hash 防 timing oracle
(5 人级别用户无所谓,但顺手做)。
"""
e = (email or "").strip().lower()
if not e or not password:
return None
with session_scope() as s:
row = s.execute(
select(User.user_id, User.email, User.password_hash).where(User.email == e)
).first()
if row is None:
# 避免 timing oracle:用户不存在时也跑一次同等开销的 verify
bcrypt.checkpw(b"x", b"$2b$12$" + b"." * 53)
return None
if not row.password_hash:
return None # 用户存在但没设密码(platform_key 入口建的)
if not verify_password(password, row.password_hash):
return None
return row.user_id, row.email
def mint_token(cfg: AuthConfig, user_id: UUID) -> tuple[str, int]:
"""签 JWT。返回 `(token, exp_unix_seconds)`。"""
now = int(time.time())
exp = now + cfg.ttl_seconds
payload = {"sub": str(user_id), "iat": now, "exp": exp}
token = jwt.encode(payload, cfg.jwt_secret, algorithm="HS256")
return token, exp
def verify_token(cfg: AuthConfig, token: str) -> UUID:
"""验签 + 取 sub。失败抛 HTTPException 401。"""
try:
payload = jwt.decode(token, cfg.jwt_secret, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
raise HTTPException(401, "token expired")
except jwt.InvalidTokenError as e:
raise HTTPException(401, f"invalid token: {e}")
sub = payload.get("sub", "")
try:
return UUID(sub)
except (ValueError, TypeError):
raise HTTPException(401, f"invalid sub in token: {sub!r}")
class UserCreateError(Exception):
"""create_user 失败:`code` 是 HTTP-style 语义码('invalid_email' / 'weak_password' /
'email_taken' / 'db_error'),`message` 是面向操作者的简短描述。
web 路由 / CLI 都用同一份;调用方决定是 raise HTTPException 还是 click.echo + exit。
"""
def __init__(self, code: str, message: str):
super().__init__(message)
self.code = code
self.message = message
def create_user(
email: str, password: str, user_id: Optional[UUID] = None, role: str = "user"
) -> tuple[UUID, str]:
"""新建用户:bcrypt(password) + INSERT users。
校验:email 含 @ + 非空;password ≥ 6 字符;role ∈ {'user','admin'}。`user_id` 不传
→ 随机 UUID4。冲突:email UNIQUE / user_id PK 撞 → `UserCreateError('email_taken'
| 'db_error')`。返回 `(user_id, normalized_email)`,供调用方记 log / 提示。
"""
from uuid import uuid4 as _uuid4
from sqlalchemy.exc import IntegrityError
e = (email or "").strip().lower()
if not e or "@" not in e:
raise UserCreateError("invalid_email", f"email 不合法: {email!r}")
if not password or len(password) < 6:
raise UserCreateError("weak_password", "password 至少 6 字符")
r = (role or "user").strip().lower()
if r not in ("user", "admin"):
raise UserCreateError("invalid_role", f"role 必须是 user / admin,收到 {role!r}")
uid = user_id or _uuid4()
try:
with session_scope() as s:
s.add(User(user_id=uid, email=e, password_hash=hash_password(password), role=r))
except IntegrityError as ex:
# email UNIQUE 撞最常见;user_id PK 撞理论上几乎不可能(uuid4)但也归一到 email_taken
# 之外 — 这里只对 email 报 409 语义,其他 DB 异常归 db_error
msg = str(getattr(ex, "orig", ex))
code = "email_taken" if "users_email" in msg or "email" in msg.lower() else "db_error"
raise UserCreateError(code, f"INSERT 失败: {msg}")
except Exception as ex:
raise UserCreateError("db_error", f"INSERT 失败: {type(ex).__name__}: {ex}")
return uid, e
def change_password(user_id: UUID, old_password: str, new_password: str) -> None:
"""改密码:验旧密码 → 校验新密码 → bcrypt 重哈希写回 users.password_hash。
错误归一到 `UserCreateError.code`(复用同一份 code 载体,web 路由映射成 HTTP):
- 'user_not_found' — user_id 无对应行(JWT 有效却查不到,极少见)
- 'no_password' — 该用户从没设过密码(platform_key 入口建的占位行),无从校验旧密码
- 'wrong_password' — 旧密码不匹配
- 'weak_password' — 新密码 < 6 字符
成功返 None;写在 session_scope 内,退出时一次 commit,下次 login 立即生效。
"""
if not new_password or len(new_password) < 6:
raise UserCreateError("weak_password", "新密码至少 6 字符")
with session_scope() as s:
user = s.execute(select(User).where(User.user_id == user_id)).scalar_one_or_none()
if user is None:
raise UserCreateError("user_not_found", "用户不存在")
if not user.password_hash:
raise UserCreateError("no_password", "该账号未设置密码,无法修改")
if not verify_password(old_password, user.password_hash):
raise UserCreateError("wrong_password", "旧密码不正确")
user.password_hash = hash_password(new_password)
def get_user_role(user_id: UUID) -> Optional[str]:
"""查 users.role。user_id 无对应行返 None;有行返 'user' / 'admin' 等。
单次 SELECT,不缓存 —— 改 role 下次请求立即生效(make_require_admin 每请求查一次,
管理员端点流量低,无需放进 JWT,也避免老 token 带过期 role)。
"""
with session_scope() as s:
row = s.execute(
select(User.role).where(User.user_id == user_id)
).first()
return row[0] if row is not None else None
def set_user_role(email: str, role: str) -> tuple[UUID, str]:
"""按 email 改 users.role。返 `(user_id, normalized_email)`。
错误归一到 `UserCreateError.code`:
- 'invalid_role' — role 不在 {'user', 'admin'}
- 'user_not_found' — email 查无此人
成功在 session_scope 内一次 commit,下次请求立即生效。
"""
r = (role or "").strip().lower()
if r not in ("user", "admin"):
raise UserCreateError("invalid_role", f"role 必须是 user / admin,收到 {role!r}")
e = (email or "").strip().lower()
with session_scope() as s:
user = s.execute(select(User).where(User.email == e)).scalar_one_or_none()
if user is None:
raise UserCreateError("user_not_found", f"email 查无此人: {email!r}")
user.role = r
return user.user_id, e
def ensure_user_row(user_id: UUID) -> None:
"""幂等 INSERT 一行 users 占位(`ON CONFLICT DO NOTHING`)。
platform_key 登录入口用 — 平台直传的 user_id 可能是 zcbot 没见过的,首次登录建行
避免下游 FK 失败。邮箱密码登录走 `main.py user add` 已经写好 users 行,不走这条。
"""
from sqlalchemy.dialects.postgresql import insert
stmt = insert(User).values(user_id=user_id).on_conflict_do_nothing(
index_elements=["user_id"]
)
with session_scope() as s:
s.execute(stmt)
# ──────────────── FastAPI Depends ────────────────
# auto_error=False 让我们自己出 401 文案,而不是 FastAPI 默认 "Not authenticated"
_bearer = HTTPBearer(auto_error=False)
def make_require_user(cfg: AuthConfig):
"""工厂:返回一个 Depends 函数,闭包持有 cfg(避免 app 启动后改 env)。
用法:
require_user = make_require_user(cfg)
@app.get("/v1/...", dependencies=[Depends(require_user)])
def route(user_id: UUID = Depends(require_user)):
...
实际使用建议直接 `user_id: UUID = Depends(require_user)`,既验签又拿到 user_id。
"""
async def require_user(
creds: Optional[HTTPAuthorizationCredentials] = Depends(_bearer),
) -> UUID:
if creds is None or not creds.credentials:
raise HTTPException(401, "missing Authorization: Bearer <token>")
if creds.scheme.lower() != "bearer":
raise HTTPException(401, f"unsupported auth scheme: {creds.scheme!r}")
return verify_token(cfg, creds.credentials)
return require_user
def make_require_admin(cfg: AuthConfig):
"""工厂:返回一个 Depends 函数,先验 JWT(同 require_user)再查 users.role=='admin'
用于 /v1/admin/* 管理端点:
require_admin = make_require_admin(cfg)
@app.get("/v1/admin/...", )
def route(user_id: UUID = Depends(require_admin)): ...
非 admin → 403(token 有效但无权限);DB 查 role(不放进 JWT),改 role 立即生效。
"""
_require_user = make_require_user(cfg)
async def require_admin(
creds: Optional[HTTPAuthorizationCredentials] = Depends(_bearer),
) -> UUID:
user_id = await _require_user(creds)
if get_user_role(user_id) != "admin":
raise HTTPException(403, "admin role required")
return user_id
return require_admin
__all__ = [
"AuthConfig",
"UserCreateError",
"change_password",
"create_user",
"ensure_user_row",
"get_user_role",
"hash_password",
"make_require_admin",
"make_require_user",
"mint_token",
"resolve_user_by_email",
"set_user_role",
"verify_password",
"verify_token",
]