"""敏感凭据的列加密(DESIGN §8.7:bot_token / latest_context_token 加密入库)。 - env `ZCBOT_WECHAT_SECRET_KEY` 在 → 用其派生的 Fernet 密钥加密,密文带 `v1:` 前缀。 - env 不在 → 退「明文标记」`plain:`(公测兜底,日志/沙箱/API 仍绝不带这两列; 正式部署应配 key)。`enc()`/`dec()` 对两种前缀都可逆,换 key 不影响存量明文行。 只在 host 进程(绑定服务 / 入站管理器 / push)用;绝不进沙箱 / run_python。 """ from __future__ import annotations import base64 import hashlib import os from typing import Optional from cryptography.fernet import Fernet, InvalidToken _PREFIX_ENC = "v1:" _PREFIX_PLAIN = "plain:" def _fernet() -> Optional[Fernet]: key = os.getenv("ZCBOT_WECHAT_SECRET_KEY", "").strip() if not key: return None # 任意口令 → 32B → urlsafe-base64 Fernet 密钥(确定性,免单独管 Fernet key) digest = hashlib.sha256(key.encode("utf-8")).digest() return Fernet(base64.urlsafe_b64encode(digest)) def enc(plaintext: Optional[str]) -> Optional[str]: """明文 → 入库串。配了 key 走密文(v1:),否则明文标记(plain:)。None 透传。""" if plaintext is None: return None f = _fernet() if f is None: return _PREFIX_PLAIN + plaintext token = f.encrypt(plaintext.encode("utf-8")).decode("ascii") return _PREFIX_ENC + token def dec(stored: Optional[str]) -> Optional[str]: """入库串 → 明文。识别 v1:/plain: 前缀;v1: 需 key 且匹配。None 透传。""" if stored is None: return None if stored.startswith(_PREFIX_PLAIN): return stored[len(_PREFIX_PLAIN):] if stored.startswith(_PREFIX_ENC): f = _fernet() if f is None: raise RuntimeError( "密文需要 ZCBOT_WECHAT_SECRET_KEY 才能解密,但 env 未配置" ) try: return f.decrypt(stored[len(_PREFIX_ENC):].encode("ascii")).decode("utf-8") except InvalidToken as e: raise RuntimeError("ZCBOT_WECHAT_SECRET_KEY 与密文不匹配(key 变了?)") from e # 无前缀:历史/手填的裸明文,容错原样返回 return stored