zcbot/core/wechat/wecom_crypto.py

94 lines
3.8 KiB
Python

"""企业微信「接收消息」回调加解密(WXBizMsgCrypt 等价实现,DESIGN §8.7 渠道 B 入站)。
企业微信自建应用配「接收消息」回调后,服务器**主动 POST 加密 XML** 到回调 URL,
配 URL 时还会先 GET 一次 echostr 验有效性。这套加密**与 wecom.py 的 access_token /
出站 API 无关,也与 crypto.py 的 Fernet 列加密无关** —— 是企业微信专用方案:
- key = base64decode(EncodingAESKey + "="),32B;IV = key[:16](AES-256-CBC)
- 明文密文体 = random(16) || msg_len(4B 大端) || msg || receiveid(自建应用为 corpid)
- 签名 = sha1(sorted([Token, timestamp, nonce, encrypt]) 拼接) 的 hexdigest
只做**解密 + 验签**(入站);回复走 wecom.send_text 主动推(agent 跑 >5s 无法被动同步回),
故不实现加密。凭据 Token / EncodingAESKey 同 secret —— 只在 host 进程读,绝不进沙箱。
"""
from __future__ import annotations
import base64
import hashlib
import os
import struct
import xml.etree.ElementTree as ET
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
def callback_token() -> str:
return os.getenv("WECOM_CALLBACK_TOKEN", "").strip()
def callback_aeskey() -> str:
return os.getenv("WECOM_CALLBACK_AESKEY", "").strip()
def callback_configured() -> bool:
"""Token + EncodingAESKey 都在才算配好回调(沿用「有 key 才挂」§3.4)。"""
return bool(callback_token() and callback_aeskey())
def _aes_key() -> bytes:
"""EncodingAESKey(43 字符)→ +'=' → base64 解码 → 32B AES 密钥。"""
return base64.b64decode(callback_aeskey() + "=")
def _signature(timestamp: str, nonce: str, encrypt: str) -> str:
arr = sorted([callback_token(), timestamp, nonce, encrypt])
return hashlib.sha1("".join(arr).encode("utf-8")).hexdigest()
def _aes_decrypt(encrypt_b64: str) -> bytes:
key = _aes_key()
cipher = Cipher(algorithms.AES(key), modes.CBC(key[:16]))
dec = cipher.decryptor()
raw = dec.update(base64.b64decode(encrypt_b64)) + dec.finalize()
pad = raw[-1] # PKCS7(企业微信 block=32,按末字节剥即可)
if not 1 <= pad <= 32:
raise ValueError("PKCS7 padding 非法")
return raw[:-pad]
def _extract_plain(encrypt_b64: str, *, expect_receiveid: str = "") -> str:
"""解密 → 剥 16B 随机前缀 + 4B 长度,取 msg;尾部 receiveid 校验 corpid。"""
raw = _aes_decrypt(encrypt_b64)
body = raw[16:]
msg_len = struct.unpack(">I", body[:4])[0]
msg = body[4:4 + msg_len]
receiveid = body[4 + msg_len:].decode("utf-8", "ignore")
if expect_receiveid and receiveid != expect_receiveid:
raise ValueError("receiveid 不匹配(corpid 校验失败)")
return msg.decode("utf-8")
def verify_url(
msg_signature: str, timestamp: str, nonce: str, echostr: str, *, corpid: str = ""
) -> str:
"""配回调 URL 时企业微信 GET 验有效性:验签 + 解密 echostr,原样回明文。"""
if _signature(timestamp, nonce, echostr) != msg_signature:
raise ValueError("签名校验失败")
return _extract_plain(echostr, expect_receiveid=corpid)
def parse_message(plain_xml: str) -> dict:
"""解密后的明文 XML → dict(FromUserName / MsgType / Content / MsgId / ...)。"""
root = ET.fromstring(plain_xml)
return {child.tag: (child.text or "") for child in root}
def decrypt_message(
msg_signature: str, timestamp: str, nonce: str, body: str, *, corpid: str = ""
) -> dict:
"""收消息 POST:从信封 XML 取 Encrypt → 验签 → 解密 → parse_message。"""
encrypt = ET.fromstring(body).findtext("Encrypt") or ""
if _signature(timestamp, nonce, encrypt) != msg_signature:
raise ValueError("签名校验失败")
return parse_message(_extract_plain(encrypt, expect_receiveid=corpid))