412 lines
16 KiB
Python
412 lines
16 KiB
Python
"""ClawBot 个人微信 iLink Bot API 客户端(DESIGN §8.7 渠道 A)。
|
|
|
|
协议全部经真机实测(`scripts/probe_clawbot*.py`,2026-06-23):
|
|
- 绑定:`get_bot_qrcode`(无凭据,出深链 → 自渲二维码)→ 轮询 `get_qrcode_status`
|
|
(TTL ~1min,过期换码)→ `confirmed` 得 `bot_token` + `baseurl`。
|
|
- 收:`getupdates` 长轮询(hold ≤35s),消息带 `from_user_id` + `context_token`。
|
|
- 发:`sendmessage`,**每条 `client_id` 必唯一**(漏则同 token 后续被丢);多条/长文
|
|
按 ~1000 字分块,中间 `message_state=GENERATING(1)`、末块 `FINISH(2)`,间隔 ~300ms。
|
|
- `context_token` 有效期 ~24h、可复用 → 主动推送靠它(用户须先开口拿到 token)。
|
|
- 文件:`getuploadurl` → AES-128-ECB(PKCS7)加密 → POST 密文到 CDN 拿 `x-encrypted-param`
|
|
→ `sendmessage` 带 `file_item`。
|
|
|
|
纯协议客户端,不碰 DB / agent 编排。阻塞 IO(httpx 同步),调用方放 to_thread / executor。
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import os
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Optional
|
|
from urllib.parse import quote
|
|
|
|
import httpx
|
|
from cryptography.hazmat.primitives import padding
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
|
|
# Default API host; used as the default ``base_url`` throughout this module.
DEFAULT_BASE = "https://ilinkai.weixin.qq.com"
# CDN prefix used to build media upload/download URLs.
CDN_BASE = "https://novac2c.cdn.weixin.qq.com/c2c"
# Sent as ``base_info.channel_version`` in request bodies.
CHANNEL_VERSION = "1.0.2"
# Sent as the ``bot_type`` query parameter when requesting a binding QR code.
BOT_TYPE_PERSONAL = 3

# Protocol enums (per the original note: taken from @tencent-weixin/openclaw-weixin
# src/api/types.ts and verified against the live service).
MSG_TYPE_BOT = 2
STATE_GENERATING = 1
STATE_FINISH = 2
ITEM_TEXT = 1
ITEM_IMAGE = 2
ITEM_FILE = 4
UPLOAD_MEDIA_FILE = 3
UPLOAD_MEDIA_IMAGE = 1

# Chunking: long text is split at ~1000 characters, with a pause between chunks
# so that none get dropped.
CHUNK_CHARS = 1000
CHUNK_DELAY_S = 0.3
# Upper bound (20 MiB) on the size of a file accepted for sending.
MAX_FILE_BYTES = 20 * 1024 * 1024
|
|
|
|
|
|
def _uin_header() -> str:
    """Build a fresh value for the ``X-WECHAT-UIN`` request header.

    The value is the base64 encoding of the decimal string of a random
    unsigned 32-bit integer, so it differs on every call. The original note
    describes this as an anti-replay measure.
    """
    random_uint32 = int.from_bytes(os.urandom(4), byteorder="big")
    decimal_text = f"{random_uint32}"
    return base64.b64encode(decimal_text.encode()).decode()
|
|
|
|
|
|
def _headers(bot_token: Optional[str] = None) -> dict[str, str]:
    """Assemble the HTTP headers for an iLink API request.

    Args:
        bot_token: Bearer token obtained at bind time. When falsy (the
            binding endpoints are called without one) no ``Authorization``
            header is added.

    Returns:
        A new header dict; ``X-WECHAT-UIN`` is regenerated on every call.
    """
    headers = {
        "Content-Type": "application/json",
        "AuthorizationType": "ilink_bot_token",
        "X-WECHAT-UIN": _uin_header(),
    }
    if not bot_token:
        return headers
    headers["Authorization"] = f"Bearer {bot_token}"
    return headers
|
|
|
|
|
|
def _base_info() -> dict[str, str]:
    """Return the ``base_info`` object embedded in request bodies.

    It currently carries only the channel version string.
    """
    return dict(channel_version=CHANNEL_VERSION)
|
|
|
|
|
|
def _new_client_id() -> str:
    """Return a unique ``client_id`` for one outgoing message.

    The id is the fixed ``openclaw-weixin-`` prefix followed by the 32 hex
    digits of a random UUID4.
    """
    unique_suffix = uuid.uuid4().hex
    return "openclaw-weixin-" + unique_suffix
|
|
|
|
|
|
def _aes_ecb_pkcs7(plaintext: bytes, key: bytes) -> bytes:
    """Encrypt *plaintext* with AES in ECB mode after PKCS7 padding.

    The data is padded to the 128-bit AES block size. The key is passed to
    the cipher as given; callers in this module supply 16-byte keys.
    """
    pkcs7 = padding.PKCS7(128).padder()
    padded_plaintext = b"".join((pkcs7.update(plaintext), pkcs7.finalize()))
    encryptor = Cipher(algorithms.AES(key), modes.ECB()).encryptor()
    return b"".join((encryptor.update(padded_plaintext), encryptor.finalize()))
|
|
|
|
|
|
def _aes_ecb_unpkcs7(ciphertext: bytes, key: bytes) -> bytes:
    """Decrypt inbound media: AES-ECB decryption followed by PKCS7 unpadding.

    This is the inverse of ``_aes_ecb_pkcs7`` used on the sending side.
    """
    decryptor = Cipher(algorithms.AES(key), modes.ECB()).decryptor()
    padded_plaintext = b"".join((decryptor.update(ciphertext), decryptor.finalize()))
    pkcs7 = padding.PKCS7(128).unpadder()
    return b"".join((pkcs7.update(padded_plaintext), pkcs7.finalize()))
|
|
|
|
|
|
def _decode_media_aes_key(raw: str) -> bytes:
    """Decode a media ``aes_key`` field into a 16-byte AES key.

    Two encodings are handled (both noted in the original as observed in
    practice):

    - ``base64(raw 16 bytes)`` (common for images): the decoded bytes are
      used directly.
    - ``base64(hex string)`` (files/voice/video; the sending side in this
      module produces this form): the decoded value is 32 ASCII hex
      characters, which are converted to 16 bytes.

    Any other decoded value of at least 16 bytes falls back, best-effort, to
    its first 16 bytes.

    Args:
        raw: The base64 text from ``media.aes_key``.

    Returns:
        Exactly 16 bytes of key material.

    Raises:
        ValueError: If *raw* is not valid base64, or decodes to fewer than
            16 bytes (including the empty string). Previously a short key was
            returned and only failed later inside the cipher.
    """
    decoded = base64.b64decode(raw)
    if len(decoded) == 32:
        try:
            key = bytes.fromhex(decoded.decode("ascii"))
        except ValueError:  # also covers UnicodeDecodeError (a subclass)
            key = b""
        # fromhex skips whitespace, so insist on a full 16-byte result
        # before trusting the hex interpretation.
        if len(key) == 16:
            return key
    if len(decoded) < 16:
        raise ValueError(
            f"media aes_key decodes to {len(decoded)} bytes; need at least 16"
        )
    return decoded[:16]
|
|
|
|
|
|
def _guess_image_ext(data: bytes) -> str:
    """Guess an image file extension from the leading magic bytes.

    Inbound WeChat images carry no original file name, so the extension is
    derived from the content. Unrecognised data falls back to ``.jpg``.
    """
    prefix_table = (
        (b"\xff\xd8\xff", ".jpg"),
        (b"\x89PNG\r\n\x1a\n", ".png"),
        (b"GIF87a", ".gif"),
        (b"GIF89a", ".gif"),
    )
    for magic, extension in prefix_table:
        if data[:len(magic)] == magic:
            return extension
    # WebP is a RIFF container whose form type sits at bytes 8..11.
    is_webp = data[:4] == b"RIFF" and data[8:12] == b"WEBP"
    if is_webp:
        return ".webp"
    return ".bmp" if data[:2] == b"BM" else ".jpg"
|
|
|
|
|
|
# ─────────────────────────── 绑定(无 token)───────────────────────────
|
|
|
|
@dataclass
class QrCode:
    """A binding QR code as produced by ``get_bot_qrcode``."""

    # Identifier to pass back when polling the scan status.
    qrcode_id: str
    # Deep link (liteapp.weixin.qq.com/q/...); the caller renders it as a
    # QR image itself.
    deeplink: str
|
|
|
|
|
|
def get_bot_qrcode(base_url: str = DEFAULT_BASE, *, timeout: float = 20.0) -> QrCode:
    """Fetch one binding QR code; no credentials are required.

    Args:
        base_url: API host to query.
        timeout: httpx timeout in seconds.

    Returns:
        A ``QrCode`` whose ``deeplink`` must be rendered as a QR image for
        the user to scan. Fields missing from the response become ``""``.

    Raises:
        httpx.HTTPStatusError: On a non-success HTTP status.
    """
    endpoint = f"{base_url}/ilink/bot/get_bot_qrcode"
    query = {"bot_type": BOT_TYPE_PERSONAL}
    with httpx.Client(timeout=timeout) as client:
        response = client.get(endpoint, params=query, headers=_headers())
        response.raise_for_status()
        payload = response.json()
    return QrCode(
        qrcode_id=payload.get("qrcode", ""),
        deeplink=payload.get("qrcode_img_content", ""),
    )
|
|
|
|
|
|
@dataclass
class BindResult:
    """Outcome of one QR-code status poll."""

    # One of: wait | confirmed | expired
    status: str
    # Bot token from the response, when present.
    bot_token: Optional[str] = None
    # API base URL from the response, when present.
    base_url: Optional[str] = None
|
|
|
|
|
|
def poll_qrcode_status(
    qrcode_id: str, base_url: str = DEFAULT_BASE, *, timeout: float = 40.0
) -> BindResult:
    """Poll the scan status of a binding QR code once.

    Per the original note the server long-polls (holding the request for
    tens of seconds), so callers invoke this in a loop and, on ``expired``,
    request a new code via ``get_bot_qrcode``.

    Args:
        qrcode_id: The ``QrCode.qrcode_id`` being polled.
        base_url: API host to query.
        timeout: httpx timeout in seconds.

    Returns:
        A ``BindResult``; ``bot_token`` and ``base_url`` are taken from the
        response when present (the base URL is read from ``baseurl``, then
        ``base_url``).

    Raises:
        httpx.HTTPStatusError: On a non-success HTTP status.
    """
    status_url = f"{base_url}/ilink/bot/get_qrcode_status"
    with httpx.Client(timeout=timeout) as client:
        response = client.get(
            status_url,
            params={"qrcode": qrcode_id},
            headers=_headers(),
        )
        response.raise_for_status()
        payload = response.json()
    resolved_base = payload.get("baseurl") or payload.get("base_url")
    return BindResult(
        status=payload.get("status", ""),
        bot_token=payload.get("bot_token"),
        base_url=resolved_base,
    )
|
|
|
|
|
|
# ─────────────────────────── 收发(带 token)───────────────────────────
|
|
|
|
@dataclass
class InboundAttachment:
    """CDN reference to an inbound attachment (image or file), plus the
    plaintext bytes filled in after download.

    Protocol structure of the ``item_list`` entries returned by getupdates
    (per the original note: observed in testing and from reverse-engineering
    photon-hq/wechat-ilink-client):

    - Image ``image_item`` (type=2): ``media{encrypt_query_param, aes_key,
      encrypt_type}`` plus a preferred ``aeskey`` (32 hex characters). There
      is no file name; an extension is derived from magic bytes after
      download.
    - File ``file_item`` (type=4): ``media{...}`` + ``file_name`` + ``len``
      (plaintext size).
    """

    # "image" | "file"
    kind: str
    # {encrypt_query_param, aes_key, encrypt_type}
    media: dict[str, Any]
    # Original file name (images have none; one is generated when saving).
    file_name: str = ""
    # Preferred key for images: image_item.aeskey (32 hex chars).
    aeskey_hex: str = ""
    # Plaintext size (file_item.len / image mid_size); informational only.
    size: int = 0
    # Downloaded + decrypted plaintext, filled in by the caller.
    data: Optional[bytes] = None
|
|
|
|
|
|
@dataclass
class InboundMessage:
    """One message received via getupdates."""

    # Sender id, of the form xxx@im.wechat
    from_user_id: str
    # Must be echoed back when replying, or when pushing proactively within
    # the ~24h validity window noted in the module docstring.
    context_token: str
    # Concatenated text of the message's text items.
    text: str
    # The raw message object as received.
    raw: dict[str, Any]
    # Image/file attachments referenced by the message.
    attachments: list[InboundAttachment] = field(default_factory=list)
|
|
|
|
|
|
class ILinkClient:
    """Per-user iLink client.

    Holds the ``bot_token`` and ``base_url`` obtained at bind time and sends /
    receives that user's messages. Every method is blocking (synchronous
    httpx) and opens a short-lived ``httpx.Client`` per call.
    """

    def __init__(self, bot_token: str, base_url: str = DEFAULT_BASE) -> None:
        self.bot_token = bot_token
        # A falsy base_url (e.g. "" or None from storage) uses the default host.
        self.base_url = base_url or DEFAULT_BASE

    # -- receive --
    def get_updates(
        self, cursor: str = "", *, timeout: float = 45.0
    ) -> tuple[list[InboundMessage], str]:
        """Long-poll for new messages.

        Args:
            cursor: The cursor returned by the previous call (``""`` initially).
            timeout: httpx timeout in seconds; it should exceed the server's
                long-poll hold time (the module docstring notes <= 35s).

        Returns:
            ``(messages, new_cursor)``. When the response carries no cursor,
            or an empty/null one, the incoming *cursor* is returned unchanged
            so the caller's poll position is not reset.

        Raises:
            httpx.HTTPStatusError: On a non-success HTTP status.
        """
        with httpx.Client(timeout=timeout) as c:
            r = c.post(
                f"{self.base_url}/ilink/bot/getupdates",
                json={"get_updates_buf": cursor, "base_info": _base_info()},
                headers=_headers(self.bot_token),
            )
            r.raise_for_status()
            d = r.json()
        msgs = [self._parse_message(m) for m in d.get("msgs") or []]
        # `or cursor` (rather than a .get default) also covers an explicit
        # null / "" value, which would otherwise overwrite the cursor.
        return msgs, d.get("get_updates_buf") or cursor

    @staticmethod
    def _parse_message(m: dict[str, Any]) -> InboundMessage:
        """Convert one raw getupdates message object into an ``InboundMessage``.

        Text items are concatenated in order; image and file items become
        ``InboundAttachment`` references (not yet downloaded). JSON nulls for
        string fields are normalised to ``""``.
        """
        text_parts: list[str] = []
        attachments: list[InboundAttachment] = []
        for it in m.get("item_list") or []:
            text_item = it.get("text_item")
            if text_item:
                # `or ""` guards against {"text": null}, which would break join.
                text_parts.append(text_item.get("text") or "")
            img = it.get("image_item")
            if img:
                attachments.append(InboundAttachment(
                    kind="image",
                    media=img.get("media") or {},
                    aeskey_hex=(img.get("aeskey") or ""),
                    size=int(img.get("mid_size") or 0),
                ))
            fil = it.get("file_item")
            if fil:
                attachments.append(InboundAttachment(
                    kind="file",
                    media=fil.get("media") or {},
                    file_name=(fil.get("file_name") or "file"),
                    size=int(fil.get("len") or 0),
                ))
        return InboundMessage(
            from_user_id=m.get("from_user_id") or "",
            context_token=m.get("context_token") or "",
            text="".join(text_parts),
            raw=m,
            attachments=attachments,
        )

    # -- receive attachments (CDN download -> AES-ECB decrypt -> plaintext) --
    def download_media(self, att: InboundAttachment, *, timeout: float = 60.0) -> bytes:
        """Download and decrypt one inbound attachment; return the plaintext.

        This is the inverse of the upload path used when sending.

        URL: ``{CDN_BASE}/download?encrypted_query_param=<media.encrypt_query_param>``.
        Key priority: the image's ``aeskey`` (32 hex chars) first, otherwise
        ``media.aes_key`` decoded via ``_decode_media_aes_key``.

        Raises:
            RuntimeError: If the attachment carries no query parameter.
            httpx.HTTPStatusError: On a non-success HTTP status.
            ValueError: If the key or ciphertext is malformed.
        """
        media = att.media or {}
        qp = media.get("encrypt_query_param") or media.get("encrypted_query_param") or ""
        if not qp:
            raise RuntimeError(f"附件无 encrypt_query_param: kind={att.kind} media={media}")
        url = f"{CDN_BASE}/download?encrypted_query_param={quote(qp)}"
        with httpx.Client(timeout=timeout) as c:
            # Download is a GET per the reverse-engineered docs; if the CDN
            # only accepts POST, retry once (downloads have no side effects).
            r = c.get(url)
            if r.status_code == 405 or (400 <= r.status_code < 500 and not r.content):
                r = c.post(url, content=b"")
            r.raise_for_status()
            ciphertext = r.content
        if att.aeskey_hex and len(att.aeskey_hex) == 32:
            key = bytes.fromhex(att.aeskey_hex)
        else:
            key = _decode_media_aes_key(media.get("aes_key") or "")
        return _aes_ecb_unpkcs7(ciphertext, key)

    # -- send (low level, single message) --
    def _send(
        self, to_user_id: str, context_token: str, item: dict, *, state: int
    ) -> None:
        """POST a single-item message to ``sendmessage``.

        A fresh ``client_id`` is generated for every call; the module
        docstring notes the protocol requires it to be unique per message.

        Raises:
            httpx.HTTPStatusError: On a non-success HTTP status.
        """
        body = {
            "msg": {
                "from_user_id": "",
                "to_user_id": to_user_id,
                "client_id": _new_client_id(),
                "message_type": MSG_TYPE_BOT,
                "message_state": state,
                "context_token": context_token,
                "item_list": [item],
            },
            "base_info": _base_info(),
        }
        with httpx.Client(timeout=30.0) as c:
            r = c.post(
                f"{self.base_url}/ilink/bot/sendmessage",
                json=body,
                headers=_headers(self.bot_token),
            )
            # Success is HTTP 200 with an empty body {}; only a non-success
            # status is treated as failure (an empty body is not an error).
            r.raise_for_status()

    # -- send text (auto-chunked so long text is not dropped) --
    def send_text(self, to_user_id: str, context_token: str, text: str) -> None:
        """Send *text*, split into chunks of ``CHUNK_CHARS`` characters.

        Intermediate chunks are sent with ``STATE_GENERATING`` and the last
        with ``STATE_FINISH``, sleeping ``CHUNK_DELAY_S`` between chunks.
        Empty or ``None`` text is sent as a single empty message.
        """
        text = text or ""
        chunks = [text[i:i + CHUNK_CHARS] for i in range(0, len(text), CHUNK_CHARS)] or [""]
        last = len(chunks) - 1
        for i, chunk in enumerate(chunks):
            self._send(
                to_user_id, context_token,
                {"type": ITEM_TEXT, "text_item": {"text": chunk}},
                state=STATE_FINISH if i == last else STATE_GENERATING,
            )
            if i != last:
                time.sleep(CHUNK_DELAY_S)

    # -- send file (getuploadurl -> AES-ECB -> CDN -> file_item) --
    def _upload_file(self, to_user_id: str, data: bytes) -> dict[str, Any]:
        """Encrypt *data* with a fresh random key and upload it to the CDN.

        Returns:
            A dict with ``encrypt_query_param`` (from the CDN's
            ``x-encrypted-param`` response header), ``aes_key``
            (base64 of the key's hex string) and ``rawsize``.

        Raises:
            httpx.HTTPStatusError: If ``getuploadurl`` returns a non-success
                status.
            RuntimeError: If no upload URL/param is returned, or the CDN
                upload does not return HTTP 200 with the download parameter.
        """
        rawsize = len(data)
        rawmd5 = hashlib.md5(data).hexdigest()
        aeskey = os.urandom(16)
        filekey = os.urandom(16).hex()
        ciphertext = _aes_ecb_pkcs7(data, aeskey)
        filesize = len(ciphertext)

        with httpx.Client(timeout=30.0) as c:
            ru = c.post(
                f"{self.base_url}/ilink/bot/getuploadurl",
                json={
                    "filekey": filekey,
                    "media_type": UPLOAD_MEDIA_FILE,
                    "to_user_id": to_user_id,
                    "rawsize": rawsize,
                    "rawfilemd5": rawmd5,
                    "filesize": filesize,
                    "no_need_thumb": True,
                    "aeskey": aeskey.hex(),
                    "base_info": _base_info(),
                },
                headers=_headers(self.bot_token),
            )
            ru.raise_for_status()
            uj = ru.json()
            # Accept several spellings of the upload URL / parameter fields.
            full = (uj.get("upload_full_url") or uj.get("uploadFullUrl")
                    or uj.get("full_url") or uj.get("url"))
            param = (uj.get("upload_param") or uj.get("uploadParam") or uj.get("param"))
            if full:
                cdn_url = full
            elif param:
                cdn_url = (f"{CDN_BASE}/upload?encrypted_query_param={quote(param)}"
                           f"&filekey={quote(filekey)}")
            else:
                raise RuntimeError(f"getuploadurl 无 upload url/param: {uj}")

            rc = c.post(cdn_url, content=ciphertext,
                        headers={"Content-Type": "application/octet-stream"})
            download_param = rc.headers.get("x-encrypted-param")
            if rc.status_code != 200 or not download_param:
                raise RuntimeError(
                    f"CDN 上传失败 http={rc.status_code} "
                    f"err={rc.headers.get('x-error-message')}"
                )
        return {
            "encrypt_query_param": download_param,
            "aes_key": base64.b64encode(aeskey.hex().encode()).decode(),
            "rawsize": rawsize,
        }

    def send_file(
        self,
        to_user_id: str,
        context_token: str,
        file_path: str | os.PathLike,
        *,
        file_name: Optional[str] = None,
    ) -> None:
        """Upload a local file and send it as a ``file_item`` message.

        Args:
            to_user_id: Recipient id.
            context_token: Token from the user's inbound message.
            file_path: Path of the file to send (size-capped on read).
            file_name: Name shown to the recipient; defaults to the basename
                of *file_path*.

        Raises:
            ValueError: If the file exceeds the size cap.
            RuntimeError / httpx.HTTPStatusError: On upload or send failure.
        """
        data = _read_file_capped(file_path)
        name = file_name or os.path.basename(str(file_path))
        up = self._upload_file(to_user_id, data)
        item = {
            "type": ITEM_FILE,
            "file_item": {
                "media": {
                    "encrypt_query_param": up["encrypt_query_param"],
                    "aes_key": up["aes_key"],
                    "encrypt_type": 1,
                },
                "file_name": name,
                "len": str(up["rawsize"]),
            },
        }
        self._send(to_user_id, context_token, item, state=STATE_FINISH)
|
|
|
|
|
|
def attachment_basename(att: InboundAttachment) -> str:
    """Return a safe on-disk file name (no directory part) for an attachment.

    Path separators (both ``/`` and ``\\``) are stripped to prevent directory
    traversal. Images have no original name, so they are named ``image`` plus
    an extension guessed from the magic bytes of ``att.data``.

    The result is only a basename: the caller is responsible for adding a
    prefix (timestamp / random) to avoid collisions and for joining it to
    the inbound directory.

    Returns:
        The sanitised name, or ``"file"`` when nothing usable remains. This
        includes the special names ``"."`` and ``".."``, which contain no
        separator and would otherwise survive ``basename`` unchanged.
    """
    if att.kind == "image":
        ext = _guess_image_ext(att.data or b"")
        return f"image{ext}"
    normalized = (att.file_name or "file").replace("\\", "/")
    name = os.path.basename(normalized).strip()
    # "." / ".." still refer to directories once joined onto a path, so they
    # are rejected just like an empty name.
    if name in ("", ".", ".."):
        return "file"
    return name
|
|
|
|
|
|
def _read_file_capped(file_path: str | os.PathLike) -> bytes:
    """Read a whole file, refusing anything larger than ``MAX_FILE_BYTES``.

    The reported size is checked first so oversized files fail fast without
    being read. The read itself is also bounded and re-checked, so a file
    that grows after the size check (or a path whose reported size is not
    its real length) cannot load more than the cap into memory.

    Raises:
        ValueError: If the file is larger than ``MAX_FILE_BYTES``.
        OSError: If the file cannot be stat'ed or opened.
    """
    if os.path.getsize(file_path) > MAX_FILE_BYTES:
        raise ValueError(f"文件超过 {MAX_FILE_BYTES // (1024*1024)}MB 上限")
    with open(file_path, "rb") as f:
        # One byte beyond the cap is enough to detect an oversized file.
        data = f.read(MAX_FILE_BYTES + 1)
    if len(data) > MAX_FILE_BYTES:
        raise ValueError(f"文件超过 {MAX_FILE_BYTES // (1024*1024)}MB 上限")
    return data
|