zcbot/core/wechat/crypto.py

60 lines
2.2 KiB
Python

"""敏感凭据的列加密(DESIGN §8.7:bot_token / latest_context_token 加密入库)。
- env `ZCBOT_WECHAT_SECRET_KEY` 在 → 用其派生的 Fernet 密钥加密,密文带 `v1:` 前缀。
- env 不在 → 退「明文标记」`plain:`(公测兜底,日志/沙箱/API 仍绝不带这两列;
正式部署应配 key)。`enc()`/`dec()` 对两种前缀都可逆,换 key 不影响存量明文行。
只在 host 进程(绑定服务 / 入站管理器 / push)用;绝不进沙箱 / run_python。
"""
from __future__ import annotations
import base64
import hashlib
import os
from typing import Optional
from cryptography.fernet import Fernet, InvalidToken
_PREFIX_ENC = "v1:"
_PREFIX_PLAIN = "plain:"
def _fernet() -> Optional[Fernet]:
key = os.getenv("ZCBOT_WECHAT_SECRET_KEY", "").strip()
if not key:
return None
# 任意口令 → 32B → urlsafe-base64 Fernet 密钥(确定性,免单独管 Fernet key)
digest = hashlib.sha256(key.encode("utf-8")).digest()
return Fernet(base64.urlsafe_b64encode(digest))
def enc(plaintext: Optional[str]) -> Optional[str]:
"""明文 → 入库串。配了 key 走密文(v1:),否则明文标记(plain:)。None 透传。"""
if plaintext is None:
return None
f = _fernet()
if f is None:
return _PREFIX_PLAIN + plaintext
token = f.encrypt(plaintext.encode("utf-8")).decode("ascii")
return _PREFIX_ENC + token
def dec(stored: Optional[str]) -> Optional[str]:
"""入库串 → 明文。识别 v1:/plain: 前缀;v1: 需 key 且匹配。None 透传。"""
if stored is None:
return None
if stored.startswith(_PREFIX_PLAIN):
return stored[len(_PREFIX_PLAIN):]
if stored.startswith(_PREFIX_ENC):
f = _fernet()
if f is None:
raise RuntimeError(
"密文需要 ZCBOT_WECHAT_SECRET_KEY 才能解密,但 env 未配置"
)
try:
return f.decrypt(stored[len(_PREFIX_ENC):].encode("ascii")).decode("utf-8")
except InvalidToken as e:
raise RuntimeError("ZCBOT_WECHAT_SECRET_KEY 与密文不匹配(key 变了?)") from e
# 无前缀:历史/手填的裸明文,容错原样返回
return stored