"""Auth: PLATFORM_KEY → JWT token 兑换(§7 D' 过渡形态)。 模型: - `PLATFORM_KEY` env(必填)是 platform/本仓库间的共享密钥;platform 服务端 / dev 页持有它 - `JWT_SECRET` env(必填)用于 HS256 签 token;泄漏 = 任意伪造,与 PLATFORM_KEY 同级保护 - `POST /v1/auth/login {user_id, platform_key}` → `{token, expires_at}`(后端校验 key 对 → 签 JWT) - 后续 `/v1/*`(除 /healthz、/docs、/openapi.json、/、/v1/auth/login)走 `Authorization: Bearer ` - Token TTL: `ZCBOT_JWT_TTL_SECONDS` env 覆盖,默 7 天 OIDC(D')替换:只动 `/v1/auth/login` 实现(校验 ID token 代替 key),路由层 Depends 不变。 """ from __future__ import annotations import os import time from typing import Optional from uuid import UUID import jwt from fastapi import Depends, HTTPException, Request from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from core.storage import session_scope from core.storage.models import SENTINEL_USER_ID, User _DEFAULT_TTL_SECONDS = 7 * 24 * 3600 # 7d class AuthConfig: """App 启动时一次性读 env + 校验存在性;create_app 调 `AuthConfig.from_env()` 拿到。""" def __init__(self, platform_key: str, jwt_secret: str, ttl_seconds: int): self.platform_key = platform_key self.jwt_secret = jwt_secret self.ttl_seconds = ttl_seconds @classmethod def from_env(cls) -> "AuthConfig": key = os.environ.get("PLATFORM_KEY", "").strip() secret = os.environ.get("JWT_SECRET", "").strip() missing = [] if not key: missing.append("PLATFORM_KEY") if not secret: missing.append("JWT_SECRET") if missing: raise RuntimeError( f"{', '.join(missing)} env not set. zcbot web requires both:\n" " PLATFORM_KEY=\n" " JWT_SECRET=" ) ttl_raw = os.environ.get("ZCBOT_JWT_TTL_SECONDS", "").strip() try: ttl = int(ttl_raw) if ttl_raw else _DEFAULT_TTL_SECONDS except ValueError: raise RuntimeError( f"ZCBOT_JWT_TTL_SECONDS must be int seconds, got {ttl_raw!r}" ) if ttl <= 0: raise RuntimeError(f"ZCBOT_JWT_TTL_SECONDS must be > 0, got {ttl}") return cls(platform_key=key, jwt_secret=secret, ttl_seconds=ttl) def mint_token(cfg: AuthConfig, user_id: UUID) -> tuple[str, int]: """签 JWT。返回 `(token, exp_unix_seconds)`。""" now = int(time.time()) exp = now + cfg.ttl_seconds payload = {"sub": str(user_id), "iat": now, "exp": exp} token = jwt.encode(payload, cfg.jwt_secret, algorithm="HS256") return token, exp def verify_token(cfg: AuthConfig, token: str) -> UUID: """验签 + 取 sub。失败抛 HTTPException 401。""" try: payload = jwt.decode(token, cfg.jwt_secret, algorithms=["HS256"]) except jwt.ExpiredSignatureError: raise HTTPException(401, "token expired") except jwt.InvalidTokenError as e: raise HTTPException(401, f"invalid token: {e}") sub = payload.get("sub", "") try: return UUID(sub) except (ValueError, TypeError): raise HTTPException(401, f"invalid sub in token: {sub!r}") def ensure_user_row(user_id: UUID) -> None: """幂等 INSERT 一行 users 占位(`ON CONFLICT DO NOTHING`)。 dev 用 SENTINEL,platform 注入的 user_id 也走这条 — 无论是新用户首次登录还是 既有用户复登,都安全。真用户 profile(email/oidc_subject 等)在 D' OIDC 阶段 再走专门的 register/sync 路径。 """ from sqlalchemy.dialects.postgresql import insert stmt = insert(User).values(user_id=user_id).on_conflict_do_nothing( index_elements=["user_id"] ) with session_scope() as s: s.execute(stmt) # ──────────────── FastAPI Depends ──────────────── # auto_error=False 让我们自己出 401 文案,而不是 FastAPI 默认 "Not authenticated" _bearer = HTTPBearer(auto_error=False) def make_require_user(cfg: AuthConfig): """工厂:返回一个 Depends 函数,闭包持有 cfg(避免 app 启动后改 env)。 用法: require_user = make_require_user(cfg) @app.get("/v1/...", dependencies=[Depends(require_user)]) def route(user_id: UUID = Depends(require_user)): ... 实际使用建议直接 `user_id: UUID = Depends(require_user)`,既验签又拿到 user_id。 """ async def require_user( creds: Optional[HTTPAuthorizationCredentials] = Depends(_bearer), ) -> UUID: if creds is None or not creds.credentials: raise HTTPException(401, "missing Authorization: Bearer ") if creds.scheme.lower() != "bearer": raise HTTPException(401, f"unsupported auth scheme: {creds.scheme!r}") return verify_token(cfg, creds.credentials) return require_user __all__ = [ "AuthConfig", "SENTINEL_USER_ID", "ensure_user_row", "make_require_user", "mint_token", "verify_token", ]