"""Auth: 两条 login 路径,签同款 JWT(§7 D' 过渡形态)。 模型: - `PLATFORM_KEY` env(必填):platform 服务端 / zcbot 间机器对机器共享密钥 - `JWT_SECRET` env(必填):HS256 签 token;泄漏 = 任意伪造,与 PLATFORM_KEY 同级保护 - `POST /v1/auth/login {user_id, platform_key}` → JWT(platform 服务端用,自带 user_id 注入) - `POST /v1/auth/login_password {email, password}` → JWT (dev SPA 用,users.email UNIQUE + users.password_hash bcrypt 校验;0005 加 UNIQUE) - 后续 `/v1/*`(除 /healthz、/docs、/openapi.json、/、/v1/auth/login*)走 `Authorization: Bearer ` - Token TTL: `ZCBOT_JWT_TTL_SECONDS` env 覆盖,默 7 天 发用户:`.venv/Scripts/python.exe main.py user add --email X --password Y`,后台直接 bcrypt + INSERT users;撤用户 `DELETE FROM users WHERE email=...`(messages CASCADE, tasks 通过 FK 拦,要先 DELETE 该 user 的 tasks)。 OIDC(D')替换:只动 `/v1/auth/login` 实现(校验 ID token 代替 key);**邮箱密码路径 长期保留,与 OIDC 并存**(自有账号 + 同事试用不依赖外部 IdP)。 """ from __future__ import annotations import os import time from typing import Optional from uuid import UUID import bcrypt import jwt from fastapi import Depends, HTTPException from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from sqlalchemy import func, select from core.storage import session_scope from core.storage.models import User _DEFAULT_TTL_SECONDS = 7 * 24 * 3600 # 7d class AuthConfig: """App 启动时一次性读 env + 校验存在性;create_app 调 `AuthConfig.from_env()` 拿到。""" def __init__( self, platform_key: str, jwt_secret: str, ttl_seconds: int, admin_token: Optional[str] = None, ): self.platform_key = platform_key self.jwt_secret = jwt_secret self.ttl_seconds = ttl_seconds # ZCBOT_ADMIN_TOKEN 未设 → None;此时 /v1/auth/admin/create_user 返 503(功能关闭)。 # 这是个独立的共享口令,跟 PLATFORM_KEY / JWT_SECRET 分开 —— 它是"管理员发用户" # 的单一钥匙,不参与 platform 机器对机器 / token 签名路径。 self.admin_token = admin_token @classmethod def from_env(cls) -> "AuthConfig": key = os.environ.get("PLATFORM_KEY", "").strip() secret = os.environ.get("JWT_SECRET", "").strip() missing = [] if not key: missing.append("PLATFORM_KEY") if not secret: missing.append("JWT_SECRET") if missing: raise RuntimeError( f"{', '.join(missing)} env not set. zcbot web requires both:\n" " PLATFORM_KEY=\n" " JWT_SECRET=" ) ttl_raw = os.environ.get("ZCBOT_JWT_TTL_SECONDS", "").strip() try: ttl = int(ttl_raw) if ttl_raw else _DEFAULT_TTL_SECONDS except ValueError: raise RuntimeError( f"ZCBOT_JWT_TTL_SECONDS must be int seconds, got {ttl_raw!r}" ) if ttl <= 0: raise RuntimeError(f"ZCBOT_JWT_TTL_SECONDS must be > 0, got {ttl}") admin = os.environ.get("ZCBOT_ADMIN_TOKEN", "").strip() or None return cls( platform_key=key, jwt_secret=secret, ttl_seconds=ttl, admin_token=admin, ) def hash_password(password: str) -> str: """bcrypt 哈希(默认 cost=12)。返 ASCII str(bcrypt 标准格式 `$2b$12$...`),直接落 DB。""" return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("ascii") def verify_password(password: str, stored_hash: str) -> bool: """常数时间比对。stored_hash 是 DB 里 users.password_hash 列。""" try: return bcrypt.checkpw(password.encode("utf-8"), stored_hash.encode("ascii")) except (ValueError, TypeError): # stored_hash 格式坏(手工 INSERT 乱写)/ 不是 ASCII → 视作不匹配,别 500 return False def resolve_user_by_email(email: str, password: str) -> Optional[tuple[UUID, str]]: """email + password → `(user_id, email)`;不匹配返 None(空表 / 邮箱不存在 / 密码错都走这条)。 单次 SELECT + bcrypt verify;不缓存,改密码 / 删账号下次 login 立即生效。 bcrypt.checkpw 本身是 constant-time;查不到也要跑一次 dummy hash 防 timing oracle (5 人级别用户无所谓,但顺手做)。 """ e = (email or "").strip().lower() if not e or not password: return None with session_scope() as s: row = s.execute( select(User.user_id, User.email, User.password_hash).where(User.email == e) ).first() if row is None: # 避免 timing oracle:用户不存在时也跑一次同等开销的 verify bcrypt.checkpw(b"x", b"$2b$12$" + b"." * 53) return None if not row.password_hash: return None # 用户存在但没设密码(platform_key 入口建的) if not verify_password(password, row.password_hash): return None return row.user_id, row.email def mint_token(cfg: AuthConfig, user_id: UUID) -> tuple[str, int]: """签 JWT。返回 `(token, exp_unix_seconds)`。""" now = int(time.time()) exp = now + cfg.ttl_seconds payload = {"sub": str(user_id), "iat": now, "exp": exp} token = jwt.encode(payload, cfg.jwt_secret, algorithm="HS256") return token, exp def verify_token(cfg: AuthConfig, token: str) -> UUID: """验签 + 取 sub。失败抛 HTTPException 401。""" try: payload = jwt.decode(token, cfg.jwt_secret, algorithms=["HS256"]) except jwt.ExpiredSignatureError: raise HTTPException(401, "token expired") except jwt.InvalidTokenError as e: raise HTTPException(401, f"invalid token: {e}") sub = payload.get("sub", "") try: return UUID(sub) except (ValueError, TypeError): raise HTTPException(401, f"invalid sub in token: {sub!r}") class UserCreateError(Exception): """create_user 失败:`code` 是 HTTP-style 语义码('invalid_email' / 'weak_password' / 'email_taken' / 'db_error'),`message` 是面向操作者的简短描述。 web 路由 / CLI 都用同一份;调用方决定是 raise HTTPException 还是 click.echo + exit。 """ def __init__(self, code: str, message: str): super().__init__(message) self.code = code self.message = message def create_user( email: str, password: str, user_id: Optional[UUID] = None, role: str = "user" ) -> tuple[UUID, str]: """新建用户:bcrypt(password) + INSERT users。 校验:email 含 @ + 非空;password ≥ 6 字符;role ∈ {'user','admin'}。`user_id` 不传 → 随机 UUID4。冲突:email UNIQUE / user_id PK 撞 → `UserCreateError('email_taken' | 'db_error')`。返回 `(user_id, normalized_email)`,供调用方记 log / 提示。 """ from uuid import uuid4 as _uuid4 from sqlalchemy.exc import IntegrityError e = (email or "").strip().lower() if not e or "@" not in e: raise UserCreateError("invalid_email", f"email 不合法: {email!r}") if not password or len(password) < 6: raise UserCreateError("weak_password", "password 至少 6 字符") r = (role or "user").strip().lower() if r not in ("user", "admin"): raise UserCreateError("invalid_role", f"role 必须是 user / admin,收到 {role!r}") uid = user_id or _uuid4() try: with session_scope() as s: s.add(User(user_id=uid, email=e, password_hash=hash_password(password), role=r)) except IntegrityError as ex: # email UNIQUE 撞最常见;user_id PK 撞理论上几乎不可能(uuid4)但也归一到 email_taken # 之外 — 这里只对 email 报 409 语义,其他 DB 异常归 db_error msg = str(getattr(ex, "orig", ex)) code = "email_taken" if "users_email" in msg or "email" in msg.lower() else "db_error" raise UserCreateError(code, f"INSERT 失败: {msg}") except Exception as ex: raise UserCreateError("db_error", f"INSERT 失败: {type(ex).__name__}: {ex}") return uid, e def change_password(user_id: UUID, old_password: str, new_password: str) -> None: """改密码:验旧密码 → 校验新密码 → bcrypt 重哈希写回 users.password_hash。 错误归一到 `UserCreateError.code`(复用同一份 code 载体,web 路由映射成 HTTP): - 'user_not_found' — user_id 无对应行(JWT 有效却查不到,极少见) - 'no_password' — 该用户从没设过密码(platform_key 入口建的占位行),无从校验旧密码 - 'wrong_password' — 旧密码不匹配 - 'weak_password' — 新密码 < 6 字符 成功返 None;写在 session_scope 内,退出时一次 commit,下次 login 立即生效。 """ if not new_password or len(new_password) < 6: raise UserCreateError("weak_password", "新密码至少 6 字符") with session_scope() as s: user = s.execute(select(User).where(User.user_id == user_id)).scalar_one_or_none() if user is None: raise UserCreateError("user_not_found", "用户不存在") if not user.password_hash: raise UserCreateError("no_password", "该账号未设置密码,无法修改") if not verify_password(old_password, user.password_hash): raise UserCreateError("wrong_password", "旧密码不正确") user.password_hash = hash_password(new_password) def get_user_role(user_id: UUID) -> Optional[str]: """查 users.role。user_id 无对应行返 None;有行返 'user' / 'admin' 等。 单次 SELECT,不缓存 —— 改 role 下次请求立即生效(make_require_admin 每请求查一次, 管理员端点流量低,无需放进 JWT,也避免老 token 带过期 role)。 """ with session_scope() as s: row = s.execute( select(User.role).where(User.user_id == user_id) ).first() return row[0] if row is not None else None def get_user_profile(user_id: UUID) -> Optional[dict]: """查当前用户档案 → `{role, name, user_name, email}`;无对应行返 None。 /v1/me + login 响应用,单次 SELECT。name / user_name 是平台登录注入的(0016), 可能为 None(邮箱密码 / 历史行)。 """ with session_scope() as s: row = s.execute( select(User.role, User.name, User.user_name, User.email).where( User.user_id == user_id ) ).first() if row is None: return None return { "role": row.role or "user", "name": row.name, "user_name": row.user_name, "email": row.email, } def set_user_role(email: str, role: str) -> tuple[UUID, str]: """按 email 改 users.role。返 `(user_id, normalized_email)`。 错误归一到 `UserCreateError.code`: - 'invalid_role' — role 不在 {'user', 'admin'} - 'user_not_found' — email 查无此人 成功在 session_scope 内一次 commit,下次请求立即生效。 """ r = (role or "").strip().lower() if r not in ("user", "admin"): raise UserCreateError("invalid_role", f"role 必须是 user / admin,收到 {role!r}") e = (email or "").strip().lower() with session_scope() as s: user = s.execute(select(User).where(User.email == e)).scalar_one_or_none() if user is None: raise UserCreateError("user_not_found", f"email 查无此人: {email!r}") user.role = r return user.user_id, e def ensure_user_row( user_id: UUID, name: Optional[str] = None, user_name: Optional[str] = None, ) -> None: """幂等 upsert 一行 users 占位,并同步平台注入的用户档案(name / user_name)。 platform_key 登录入口用 — 平台直传的 user_id 可能是 zcbot 没见过的,首次登录建行 避免下游 FK 失败。邮箱密码登录走 `main.py user add` 已经写好 users 行,不走这条。 name / user_name 由平台在 login body 注入(0016): - 首次建行:有值就一并写入 - 已存在行:`COALESCE(EXCLUDED.x, users.x)` —— 平台传了非空值就刷新(同步平台侧改名), 传 None / 空则保留旧值,不会被覆盖清空。空串归一到 None,不当有效值落库。 """ from sqlalchemy.dialects.postgresql import insert nm = (name or "").strip() or None un = (user_name or "").strip() or None stmt = insert(User).values(user_id=user_id, name=nm, user_name=un) stmt = stmt.on_conflict_do_update( index_elements=["user_id"], set_={ "name": func.coalesce(stmt.excluded.name, User.name), "user_name": func.coalesce(stmt.excluded.user_name, User.user_name), }, ) with session_scope() as s: s.execute(stmt) # ──────────────── FastAPI Depends ──────────────── # auto_error=False 让我们自己出 401 文案,而不是 FastAPI 默认 "Not authenticated" _bearer = HTTPBearer(auto_error=False) def make_require_user(cfg: AuthConfig): """工厂:返回一个 Depends 函数,闭包持有 cfg(避免 app 启动后改 env)。 用法: require_user = make_require_user(cfg) @app.get("/v1/...", dependencies=[Depends(require_user)]) def route(user_id: UUID = Depends(require_user)): ... 实际使用建议直接 `user_id: UUID = Depends(require_user)`,既验签又拿到 user_id。 """ async def require_user( creds: Optional[HTTPAuthorizationCredentials] = Depends(_bearer), ) -> UUID: if creds is None or not creds.credentials: raise HTTPException(401, "missing Authorization: Bearer ") if creds.scheme.lower() != "bearer": raise HTTPException(401, f"unsupported auth scheme: {creds.scheme!r}") return verify_token(cfg, creds.credentials) return require_user def make_require_admin(cfg: AuthConfig): """工厂:返回一个 Depends 函数,先验 JWT(同 require_user)再查 users.role=='admin'。 用于 /v1/admin/* 管理端点: require_admin = make_require_admin(cfg) @app.get("/v1/admin/...", ) def route(user_id: UUID = Depends(require_admin)): ... 非 admin → 403(token 有效但无权限);DB 查 role(不放进 JWT),改 role 立即生效。 """ _require_user = make_require_user(cfg) async def require_admin( creds: Optional[HTTPAuthorizationCredentials] = Depends(_bearer), ) -> UUID: user_id = await _require_user(creds) if get_user_role(user_id) != "admin": raise HTTPException(403, "admin role required") return user_id return require_admin __all__ = [ "AuthConfig", "UserCreateError", "change_password", "create_user", "ensure_user_row", "get_user_profile", "get_user_role", "hash_password", "make_require_admin", "make_require_user", "mint_token", "resolve_user_by_email", "set_user_role", "verify_password", "verify_token", ]