"""Auth: 两条 login 路径,签同款 JWT(§7 D' 过渡形态)。 模型: - `PLATFORM_KEY` env(必填):platform 服务端 / zcbot 间机器对机器共享密钥 - `JWT_SECRET` env(必填):HS256 签 token;泄漏 = 任意伪造,与 PLATFORM_KEY 同级保护 - **`invites` 表**(0005)dev SPA 邀请码登录后端:token PK / name UNIQUE / created_at; user_id 由 `uuid5(_INVITE_NAMESPACE, name)` 推导,重启稳定;改 name 会换身份(数据看不到)。 表空 → `/v1/auth/login_invite` 全 403(发码:`INSERT INTO invites(token, name) VALUES(...)`) - `POST /v1/auth/login {user_id, platform_key}` → JWT(platform 服务端用,自带 user_id 注入) - `POST /v1/auth/login_invite {token}` → JWT(dev SPA 用,name → user_id 服务端推导) - 后续 `/v1/*`(除 /healthz、/docs、/openapi.json、/、/v1/auth/login*)走 `Authorization: Bearer ` - Token TTL: `ZCBOT_JWT_TTL_SECONDS` env 覆盖,默 7 天 OIDC(D')替换:只动 `/v1/auth/login` 实现(校验 ID token 代替 key);invite 路径同期可下线。 """ from __future__ import annotations import os import time from typing import Optional from uuid import UUID, uuid5 import jwt from fastapi import Depends, HTTPException from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from sqlalchemy import select from core.storage import session_scope from core.storage.models import Invite, User _DEFAULT_TTL_SECONDS = 7 * 24 * 3600 # 7d # uuid5 命名空间 — 别改,改了所有邀请码用户身份漂移、历史数据全丢 _INVITE_NAMESPACE = UUID("9b5e7a2a-3c8e-5f4d-8c1a-f0e6b9d7c3a1") class AuthConfig: """App 启动时一次性读 env + 校验存在性;create_app 调 `AuthConfig.from_env()` 拿到。""" def __init__(self, platform_key: str, jwt_secret: str, ttl_seconds: int): self.platform_key = platform_key self.jwt_secret = jwt_secret self.ttl_seconds = ttl_seconds @classmethod def from_env(cls) -> "AuthConfig": key = os.environ.get("PLATFORM_KEY", "").strip() secret = os.environ.get("JWT_SECRET", "").strip() missing = [] if not key: missing.append("PLATFORM_KEY") if not secret: missing.append("JWT_SECRET") if missing: raise RuntimeError( f"{', '.join(missing)} env not set. zcbot web requires both:\n" " PLATFORM_KEY=\n" " JWT_SECRET=" ) ttl_raw = os.environ.get("ZCBOT_JWT_TTL_SECONDS", "").strip() try: ttl = int(ttl_raw) if ttl_raw else _DEFAULT_TTL_SECONDS except ValueError: raise RuntimeError( f"ZCBOT_JWT_TTL_SECONDS must be int seconds, got {ttl_raw!r}" ) if ttl <= 0: raise RuntimeError(f"ZCBOT_JWT_TTL_SECONDS must be > 0, got {ttl}") return cls(platform_key=key, jwt_secret=secret, ttl_seconds=ttl) def resolve_invite(token: str) -> Optional[tuple[str, UUID]]: """查 invites 表;命中返 `(name, user_id)`,user_id 由 uuid5(NS, name) 推导。 返 None 表示 token 未命中(空串 / 不存在 / 已 DELETE 都走这条)。每次 login 一次 SELECT,5 人级别用户开销可忽略;不缓存避免 DELETE 后还能登的不一致窗口。 """ t = (token or "").strip() if not t: return None with session_scope() as s: row = s.execute( select(Invite.name).where(Invite.token == t) ).scalar_one_or_none() if row is None: return None return row, uuid5(_INVITE_NAMESPACE, row) def mint_token(cfg: AuthConfig, user_id: UUID) -> tuple[str, int]: """签 JWT。返回 `(token, exp_unix_seconds)`。""" now = int(time.time()) exp = now + cfg.ttl_seconds payload = {"sub": str(user_id), "iat": now, "exp": exp} token = jwt.encode(payload, cfg.jwt_secret, algorithm="HS256") return token, exp def verify_token(cfg: AuthConfig, token: str) -> UUID: """验签 + 取 sub。失败抛 HTTPException 401。""" try: payload = jwt.decode(token, cfg.jwt_secret, algorithms=["HS256"]) except jwt.ExpiredSignatureError: raise HTTPException(401, "token expired") except jwt.InvalidTokenError as e: raise HTTPException(401, f"invalid token: {e}") sub = payload.get("sub", "") try: return UUID(sub) except (ValueError, TypeError): raise HTTPException(401, f"invalid sub in token: {sub!r}") def ensure_user_row(user_id: UUID) -> None: """幂等 INSERT 一行 users 占位(`ON CONFLICT DO NOTHING`)。 邀请码登录(uuid5 推导)、platform_key 登录(显式传入)、未来 OIDC 都走这条 — 新用户首次登录建行,既有用户复登 no-op。真用户 profile(email/oidc_subject 等) 在 D' OIDC 阶段再走专门的 register/sync 路径。 """ from sqlalchemy.dialects.postgresql import insert stmt = insert(User).values(user_id=user_id).on_conflict_do_nothing( index_elements=["user_id"] ) with session_scope() as s: s.execute(stmt) # ──────────────── FastAPI Depends ──────────────── # auto_error=False 让我们自己出 401 文案,而不是 FastAPI 默认 "Not authenticated" _bearer = HTTPBearer(auto_error=False) def make_require_user(cfg: AuthConfig): """工厂:返回一个 Depends 函数,闭包持有 cfg(避免 app 启动后改 env)。 用法: require_user = make_require_user(cfg) @app.get("/v1/...", dependencies=[Depends(require_user)]) def route(user_id: UUID = Depends(require_user)): ... 实际使用建议直接 `user_id: UUID = Depends(require_user)`,既验签又拿到 user_id。 """ async def require_user( creds: Optional[HTTPAuthorizationCredentials] = Depends(_bearer), ) -> UUID: if creds is None or not creds.credentials: raise HTTPException(401, "missing Authorization: Bearer ") if creds.scheme.lower() != "bearer": raise HTTPException(401, f"unsupported auth scheme: {creds.scheme!r}") return verify_token(cfg, creds.credentials) return require_user __all__ = [ "AuthConfig", "ensure_user_row", "make_require_user", "mint_token", "resolve_invite", "verify_token", ]