139 lines
5.1 KiB
Python
139 lines
5.1 KiB
Python
"""Auth: PLATFORM_KEY → JWT token 兑换(§7 D' 过渡形态)。
|
|
|
|
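Model:
- `PLATFORM_KEY` env (required) is the secret shared between the platform and this repository; the platform server / dev page holds it
- `JWT_SECRET` env (required) is used to sign tokens with HS256; a leak means arbitrary forgery, so protect it at the same level as PLATFORM_KEY
- `POST /v1/auth/login {user_id, platform_key}` → `{token, expires_at}` (the backend verifies the key, then signs a JWT)
- Subsequent `/v1/*` requests (except /healthz, /docs, /openapi.json, /, /v1/auth/login) use `Authorization: Bearer <jwt>`
- Token TTL: overridden via the `ZCBOT_JWT_TTL_SECONDS` env, default 7 days

OIDC (D') replacement: only the `/v1/auth/login` implementation changes (verify an ID token instead of the key); the route-level Depends stay unchanged.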
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
from typing import Optional
|
|
from uuid import UUID
|
|
|
|
import jwt
|
|
from fastapi import Depends, HTTPException, Request
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
|
|
from core.storage import session_scope
|
|
from core.storage.models import SENTINEL_USER_ID, User
|
|
|
|
|
|
# Default token lifetime in seconds (7 days); used when the
# ZCBOT_JWT_TTL_SECONDS environment variable is unset or empty.
_DEFAULT_TTL_SECONDS = 7 * 24 * 3600 # 7d
|
|
|
|
|
|
class AuthConfig:
    """Authentication settings read from the environment and validated once.

    Intended to be built at app start-up via `AuthConfig.from_env()`.

    Attributes are plain values:
      platform_key -- shared secret checked at login
      jwt_secret   -- HMAC secret used to sign/verify session tokens
      ttl_seconds  -- token lifetime in seconds
    """

    def __init__(self, platform_key: str, jwt_secret: str, ttl_seconds: int):
        self.platform_key = platform_key
        self.jwt_secret = jwt_secret
        self.ttl_seconds = ttl_seconds

    @classmethod
    def from_env(cls) -> "AuthConfig":
        """Build a config from `PLATFORM_KEY`, `JWT_SECRET` and the optional
        `ZCBOT_JWT_TTL_SECONDS` environment variables.

        Values are whitespace-stripped; an empty value counts as missing.

        Raises:
            RuntimeError: a required variable is missing, or the TTL override
                is not an integer or is not strictly positive.
        """
        # Read both required secrets up front so that one error message can
        # report every missing variable at once.
        required = {
            name: os.environ.get(name, "").strip()
            for name in ("PLATFORM_KEY", "JWT_SECRET")
        }
        absent = [name for name, value in required.items() if not value]
        if absent:
            raise RuntimeError(
                f"{', '.join(absent)} env not set. zcbot web requires both:\n"
                " PLATFORM_KEY=<shared secret between platform and zcbot>\n"
                " JWT_SECRET=<HMAC secret used to sign session tokens>"
            )

        raw_ttl = os.environ.get("ZCBOT_JWT_TTL_SECONDS", "").strip()
        if raw_ttl:
            try:
                lifetime = int(raw_ttl)
            except ValueError:
                raise RuntimeError(
                    f"ZCBOT_JWT_TTL_SECONDS must be int seconds, got {raw_ttl!r}"
                )
        else:
            # No override supplied: fall back to the module default.
            lifetime = _DEFAULT_TTL_SECONDS

        if lifetime <= 0:
            raise RuntimeError(f"ZCBOT_JWT_TTL_SECONDS must be > 0, got {lifetime}")

        return cls(
            platform_key=required["PLATFORM_KEY"],
            jwt_secret=required["JWT_SECRET"],
            ttl_seconds=lifetime,
        )
|
|
|
|
|
|
def mint_token(cfg: AuthConfig, user_id: UUID) -> tuple[str, int]:
    """Sign an HS256 JWT for *user_id*.

    The token carries `sub` (the user id as a string), `iat` (current Unix
    time) and `exp` (`iat` plus `cfg.ttl_seconds`).

    Returns:
        `(token, exp_unix_seconds)`.
    """
    issued_at = int(time.time())
    expires_at = issued_at + cfg.ttl_seconds
    claims = {"sub": str(user_id), "iat": issued_at, "exp": expires_at}
    return jwt.encode(claims, cfg.jwt_secret, algorithm="HS256"), expires_at
|
|
|
|
|
|
def verify_token(cfg: AuthConfig, token: str) -> UUID:
    """Verify the signature of *token* and return its `sub` claim as a UUID.

    Args:
        cfg: auth configuration holding the HS256 secret.
        token: the encoded JWT taken from the Authorization header.

    Returns:
        The user id parsed from the `sub` claim.

    Raises:
        HTTPException: status 401 when the token is expired, otherwise
            invalid, or when `sub` is missing, not a string, or not a
            well-formed UUID.
    """
    try:
        payload = jwt.decode(token, cfg.jwt_secret, algorithms=["HS256"])
    except jwt.ExpiredSignatureError as exc:
        raise HTTPException(401, "token expired") from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(401, f"invalid token: {exc}") from exc

    sub = payload.get("sub", "")
    # uuid.UUID(x) calls x.replace(...) on its argument, so a non-string
    # `sub` (e.g. an int or a list) raises AttributeError rather than
    # ValueError/TypeError. Reject such values explicitly so they yield the
    # documented 401 instead of escaping as an unhandled error.
    if not isinstance(sub, str):
        raise HTTPException(401, f"invalid sub in token: {sub!r}")
    try:
        return UUID(sub)
    except ValueError as exc:
        raise HTTPException(401, f"invalid sub in token: {sub!r}") from exc
|
|
|
|
|
|
def ensure_user_row(user_id: UUID) -> None:
    """Insert a placeholder `users` row for *user_id* unless one already exists.

    The statement is an `INSERT ... ON CONFLICT DO NOTHING` keyed on
    `user_id`, so calling this repeatedly for the same id is harmless.

    Per the original author's notes: dev uses the SENTINEL id and user ids
    injected by the platform take this same path, so it is safe for both a
    first login and a repeat login. Real user profile data (email,
    oidc_subject, ...) is expected to go through a dedicated register/sync
    path in the D' OIDC phase.
    """
    # Imported locally, as in the original, so the PostgreSQL dialect is only
    # loaded when this function is actually called.
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    placeholder = pg_insert(User).values(user_id=user_id)
    statement = placeholder.on_conflict_do_nothing(index_elements=["user_id"])
    with session_scope() as session:
        session.execute(statement)
|
|
|
|
|
|
# ──────────────── FastAPI Depends ────────────────
# auto_error=False lets this module produce its own 401 message instead of
# FastAPI's default "Not authenticated".
_bearer = HTTPBearer(auto_error=False)
|
|
|
|
|
|
def make_require_user(cfg: AuthConfig):
    """Factory: build a FastAPI dependency that authenticates the caller.

    The returned coroutine closes over *cfg*, so later changes to the
    environment after app start-up do not affect it.

    Usage:
        require_user = make_require_user(cfg)

        @app.get("/v1/...", dependencies=[Depends(require_user)])
        def route(user_id: UUID = Depends(require_user)):
            ...

    In practice, prefer `user_id: UUID = Depends(require_user)` directly: it
    both verifies the token and hands the user id to the route.
    """

    async def require_user(
        creds: Optional[HTTPAuthorizationCredentials] = Depends(_bearer),
    ) -> UUID:
        # A missing header and an empty credential string are reported alike.
        bearer_token = creds.credentials if creds is not None else ""
        if not bearer_token:
            raise HTTPException(401, "missing Authorization: Bearer <token>")

        scheme = creds.scheme
        if scheme.lower() != "bearer":
            raise HTTPException(401, f"unsupported auth scheme: {scheme!r}")

        return verify_token(cfg, bearer_token)

    return require_user
|
|
|
|
|
|
# Public API of this module. SENTINEL_USER_ID is not defined here; it is
# imported from core.storage.models and re-exported.
__all__ = [
    "AuthConfig",
    "SENTINEL_USER_ID",
    "ensure_user_row",
    "make_require_user",
    "mint_token",
    "verify_token",
]
|