"""企业微信「接收消息」回调加解密(WXBizMsgCrypt 等价实现,DESIGN §8.7 渠道 B 入站)。 企业微信自建应用配「接收消息」回调后,服务器**主动 POST 加密 XML** 到回调 URL, 配 URL 时还会先 GET 一次 echostr 验有效性。这套加密**与 wecom.py 的 access_token / 出站 API 无关,也与 crypto.py 的 Fernet 列加密无关** —— 是企业微信专用方案: - key = base64decode(EncodingAESKey + "="),32B;IV = key[:16](AES-256-CBC) - 明文密文体 = random(16) || msg_len(4B 大端) || msg || receiveid(自建应用为 corpid) - 签名 = sha1(sorted([Token, timestamp, nonce, encrypt]) 拼接) 的 hexdigest 只做**解密 + 验签**(入站);回复走 wecom.send_text 主动推(agent 跑 >5s 无法被动同步回), 故不实现加密。凭据 Token / EncodingAESKey 同 secret —— 只在 host 进程读,绝不进沙箱。 """ from __future__ import annotations import base64 import hashlib import os import struct import xml.etree.ElementTree as ET from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes def callback_token() -> str: return os.getenv("WECOM_CALLBACK_TOKEN", "").strip() def callback_aeskey() -> str: return os.getenv("WECOM_CALLBACK_AESKEY", "").strip() def callback_configured() -> bool: """Token + EncodingAESKey 都在才算配好回调(沿用「有 key 才挂」§3.4)。""" return bool(callback_token() and callback_aeskey()) def _aes_key() -> bytes: """EncodingAESKey(43 字符)→ +'=' → base64 解码 → 32B AES 密钥。""" return base64.b64decode(callback_aeskey() + "=") def _signature(timestamp: str, nonce: str, encrypt: str) -> str: arr = sorted([callback_token(), timestamp, nonce, encrypt]) return hashlib.sha1("".join(arr).encode("utf-8")).hexdigest() def _aes_decrypt(encrypt_b64: str) -> bytes: key = _aes_key() cipher = Cipher(algorithms.AES(key), modes.CBC(key[:16])) dec = cipher.decryptor() raw = dec.update(base64.b64decode(encrypt_b64)) + dec.finalize() pad = raw[-1] # PKCS7(企业微信 block=32,按末字节剥即可) if not 1 <= pad <= 32: raise ValueError("PKCS7 padding 非法") return raw[:-pad] def _extract_plain(encrypt_b64: str, *, expect_receiveid: str = "") -> str: """解密 → 剥 16B 随机前缀 + 4B 长度,取 msg;尾部 receiveid 校验 corpid。""" raw = _aes_decrypt(encrypt_b64) body = raw[16:] msg_len = struct.unpack(">I", body[:4])[0] msg = body[4:4 + msg_len] receiveid = body[4 + msg_len:].decode("utf-8", "ignore") if expect_receiveid and receiveid != expect_receiveid: raise ValueError("receiveid 不匹配(corpid 校验失败)") return msg.decode("utf-8") def verify_url( msg_signature: str, timestamp: str, nonce: str, echostr: str, *, corpid: str = "" ) -> str: """配回调 URL 时企业微信 GET 验有效性:验签 + 解密 echostr,原样回明文。""" if _signature(timestamp, nonce, echostr) != msg_signature: raise ValueError("签名校验失败") return _extract_plain(echostr, expect_receiveid=corpid) def parse_message(plain_xml: str) -> dict: """解密后的明文 XML → dict(FromUserName / MsgType / Content / MsgId / ...)。""" root = ET.fromstring(plain_xml) return {child.tag: (child.text or "") for child in root} def decrypt_message( msg_signature: str, timestamp: str, nonce: str, body: str, *, corpid: str = "" ) -> dict: """收消息 POST:从信封 XML 取 Encrypt → 验签 → 解密 → parse_message。""" encrypt = ET.fromstring(body).findtext("Encrypt") or "" if _signature(timestamp, nonce, encrypt) != msg_signature: raise ValueError("签名校验失败") return parse_message(_extract_plain(encrypt, expect_receiveid=corpid))