"""ClawBot personal-WeChat iLink Bot API client (DESIGN §8.7, channel A).

The whole protocol was verified on a real device
(`scripts/probe_clawbot*.py`, 2026-06-23):

- Binding: `get_bot_qrcode` (no credentials; returns a deep link that the
  caller renders as a QR code) → poll `get_qrcode_status` (TTL ~1 min; fetch a
  new code once it expires) → `confirmed` yields `bot_token` + `baseurl`.
- Receive: `getupdates` long poll (held ≤35 s); messages carry
  `from_user_id` + `context_token`.
- Send: `sendmessage`; **every message must carry a unique `client_id`**
  (if it is missing, later messages on the same token are dropped). Multiple
  or long texts are split into ~1000-character chunks; intermediate chunks use
  `message_state=GENERATING(1)`, the last one `FINISH(2)`, ~300 ms apart.
- `context_token` is valid for ~24 h and reusable → proactive pushes rely on
  it (the user has to speak first so that a token exists).
- Files: `getuploadurl` → AES-128-ECB (PKCS7) encryption → POST the ciphertext
  to the CDN to obtain `x-encrypted-param` → `sendmessage` with a `file_item`.

Pure protocol client: it does not touch the DB or agent orchestration.
Blocking IO (synchronous httpx); callers should run it via to_thread / an
executor.
"""
from __future__ import annotations

import base64
import hashlib
import os
import time
import uuid
from dataclasses import dataclass
from typing import Any, Optional
from urllib.parse import quote

import httpx
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

DEFAULT_BASE = "https://ilinkai.weixin.qq.com"
CDN_BASE = "https://novac2c.cdn.weixin.qq.com/c2c"
CHANNEL_VERSION = "1.0.2"
BOT_TYPE_PERSONAL = 3

# Protocol enums (from @tencent-weixin/openclaw-weixin src/api/types.ts;
# verified on a real device).
MSG_TYPE_BOT = 2
STATE_GENERATING = 1
STATE_FINISH = 2
ITEM_TEXT = 1
ITEM_IMAGE = 2
ITEM_FILE = 4
UPLOAD_MEDIA_FILE = 3
UPLOAD_MEDIA_IMAGE = 1

# Chunking: long text is cut at ~1000 characters, with a delay between chunks
# so that none of them gets dropped.
CHUNK_CHARS = 1000
CHUNK_DELAY_S = 0.3

MAX_FILE_BYTES = 20 * 1024 * 1024


def _uin_header() -> str:
    """Return a fresh X-WECHAT-UIN header value.

    The value is base64(decimal string of a random uint32). It is an
    anti-replay token and therefore differs on every request.
    """
    n = int.from_bytes(os.urandom(4), "big")
    return base64.b64encode(str(n).encode()).decode()


def _headers(bot_token: Optional[str] = None) -> dict[str, str]:
    """Build the request headers; add `Authorization` only when a token is given."""
    h = {
        "Content-Type": "application/json",
        "AuthorizationType": "ilink_bot_token",
        "X-WECHAT-UIN": _uin_header(),
    }
    if bot_token:
        h["Authorization"] = f"Bearer {bot_token}"
    return h


def _base_info() -> dict[str, str]:
    """Return the `base_info` object that is attached to every JSON body."""
    return {"channel_version": CHANNEL_VERSION}


def _new_client_id() -> str:
    """Return a unique `client_id` (the protocol requires one per message)."""
    return f"openclaw-weixin-{uuid.uuid4().hex}"


def _aes_ecb_pkcs7(plaintext: bytes, key: bytes) -> bytes:
    """Encrypt `plaintext` with AES in ECB mode after PKCS7 padding.

    ECB is what the upload protocol prescribes (see the module docstring);
    the key length selects the AES variant (16 bytes → AES-128).
    """
    padder = padding.PKCS7(128).padder()
    padded = padder.update(plaintext) + padder.finalize()
    enc = Cipher(algorithms.AES(key), modes.ECB()).encryptor()
    return enc.update(padded) + enc.finalize()


# ─────────────────────────── Binding (no token) ───────────────────────────


@dataclass
class QrCode:
    """A binding QR code as returned by `get_bot_qrcode`."""

    qrcode_id: str
    deeplink: str  # liteapp.weixin.qq.com/q/...; the caller renders it as a QR image


def get_bot_qrcode(base_url: str = DEFAULT_BASE, *, timeout: float = 20.0) -> QrCode:
    """Fetch one binding QR code. No pre-provisioned credentials are needed.

    The returned `deeplink` has to be rendered as a QR code by the caller so
    that the user can scan it.

    Raises:
        httpx.HTTPStatusError: the server answered with an error status.
    """
    with httpx.Client(timeout=timeout) as c:
        r = c.get(
            f"{base_url}/ilink/bot/get_bot_qrcode",
            params={"bot_type": BOT_TYPE_PERSONAL},
            headers=_headers(),
        )
        r.raise_for_status()
        d = r.json()
    return QrCode(qrcode_id=d.get("qrcode", ""), deeplink=d.get("qrcode_img_content", ""))


@dataclass
class BindResult:
    """Outcome of one `get_qrcode_status` poll."""

    status: str  # wait | confirmed | expired
    bot_token: Optional[str] = None
    base_url: Optional[str] = None


def poll_qrcode_status(
    qrcode_id: str, base_url: str = DEFAULT_BASE, *, timeout: float = 40.0
) -> BindResult:
    """Poll the scan status once (server-side long poll, held for tens of seconds).

    The caller invokes this in a loop; on `expired` it fetches a new code via
    `get_bot_qrcode`. On `confirmed` the result carries bot_token + base_url.

    Raises:
        httpx.HTTPStatusError: the server answered with an error status.
    """
    with httpx.Client(timeout=timeout) as c:
        r = c.get(
            f"{base_url}/ilink/bot/get_qrcode_status",
            params={"qrcode": qrcode_id},
            headers=_headers(),
        )
        r.raise_for_status()
        d = r.json()
    return BindResult(
        status=d.get("status", ""),
        bot_token=d.get("bot_token"),
        # Both spellings of the key are accepted.
        base_url=d.get("baseurl") or d.get("base_url"),
    )


# ─────────────────────────── Send / receive (with token) ───────────────────────────


@dataclass
class InboundMessage:
    """One message received through `getupdates`."""

    from_user_id: str  # xxx@im.wechat
    context_token: str  # must be sent back when replying / pushing within 24 h
    text: str
    raw: dict[str, Any]


class ILinkClient:
    """Per-user client: after binding it holds `bot_token` + `base_url` and
    sends/receives that user's messages."""

    def __init__(self, bot_token: str, base_url: str = DEFAULT_BASE) -> None:
        self.bot_token = bot_token
        # An empty/None base_url falls back to the default endpoint.
        self.base_url = base_url or DEFAULT_BASE

    # —— receive ——
    def get_updates(
        self, cursor: str = "", *, timeout: float = 45.0
    ) -> tuple[list[InboundMessage], str]:
        """Long-poll for new messages.

        Args:
            cursor: the cursor returned by the previous call ("" on first use).
            timeout: HTTP timeout in seconds for the long poll.

        Returns:
            (messages, new_cursor); pass `new_cursor` to the next call. When
            the response carries no usable cursor (missing, null or empty),
            the incoming `cursor` is returned unchanged so that the caller's
            position is never reset.

        Raises:
            httpx.HTTPStatusError: the server answered with an error status.
        """
        with httpx.Client(timeout=timeout) as c:
            r = c.post(
                f"{self.base_url}/ilink/bot/getupdates",
                json={"get_updates_buf": cursor, "base_info": _base_info()},
                headers=_headers(self.bot_token),
            )
            r.raise_for_status()
            d = r.json()

        msgs: list[InboundMessage] = []
        for m in d.get("msgs") or []:
            # Concatenate the text of every item; items without text (or with
            # a null text) contribute nothing instead of breaking the join.
            text = "".join(
                (it.get("text_item") or {}).get("text") or ""
                for it in m.get("item_list") or []
            )
            msgs.append(InboundMessage(
                from_user_id=m.get("from_user_id") or "",
                context_token=m.get("context_token") or "",
                text=text,
                raw=m,
            ))
        return msgs, d.get("get_updates_buf") or cursor

    # —— send (low level, single message) ——
    def _send(
        self, to_user_id: str, context_token: str, item: dict, *, state: int
    ) -> None:
        """Send one `sendmessage` request carrying a single item.

        A fresh `client_id` is generated for every call.

        Raises:
            httpx.HTTPStatusError: the server answered with an error status.
        """
        body = {
            "msg": {
                "from_user_id": "",
                "to_user_id": to_user_id,
                "client_id": _new_client_id(),
                "message_type": MSG_TYPE_BOT,
                "message_state": state,
                "context_token": context_token,
                "item_list": [item],
            },
            "base_info": _base_info(),
        }
        with httpx.Client(timeout=30.0) as c:
            r = c.post(
                f"{self.base_url}/ilink/bot/sendmessage",
                json=body,
                headers=_headers(self.bot_token),
            )
            # Success is HTTP 200 with an empty body {}; an error status
            # raises (an empty body does not indicate failure).
            r.raise_for_status()

    # —— send text (auto-chunked so long texts are not lost) ——
    def send_text(self, to_user_id: str, context_token: str, text: str) -> None:
        """Send `text`, split into chunks of at most `CHUNK_CHARS` characters.

        Every chunk but the last is sent with `STATE_GENERATING`, the last one
        with `STATE_FINISH`; the call sleeps `CHUNK_DELAY_S` between chunks.
        An empty or None text is sent as a single empty message.
        """
        text = text or ""
        chunks = [text[i:i + CHUNK_CHARS] for i in range(0, len(text), CHUNK_CHARS)] or [""]
        last = len(chunks) - 1
        for i, chunk in enumerate(chunks):
            self._send(
                to_user_id,
                context_token,
                {"type": ITEM_TEXT, "text_item": {"text": chunk}},
                state=STATE_FINISH if i == last else STATE_GENERATING,
            )
            if i != last:
                time.sleep(CHUNK_DELAY_S)

    # —— send file (getuploadurl → AES-128-ECB → CDN → file_item) ——
    def _upload_file(self, to_user_id: str, data: bytes) -> dict[str, Any]:
        """Encrypt `data` and upload it to the CDN.

        Returns:
            A dict with `encrypt_query_param` (the CDN's `x-encrypted-param`
            response header), `aes_key` (base64 of the hex-encoded key) and
            `rawsize` (plaintext length in bytes).

        Raises:
            httpx.HTTPStatusError: `getuploadurl` answered with an error status.
            RuntimeError: `getuploadurl` returned neither a URL nor a param,
                or the CDN upload did not return HTTP 200 with the
                `x-encrypted-param` header.
        """
        rawsize = len(data)
        rawmd5 = hashlib.md5(data).hexdigest()
        aeskey = os.urandom(16)
        filekey = os.urandom(16).hex()
        ciphertext = _aes_ecb_pkcs7(data, aeskey)
        filesize = len(ciphertext)

        with httpx.Client(timeout=30.0) as c:
            ru = c.post(
                f"{self.base_url}/ilink/bot/getuploadurl",
                json={
                    "filekey": filekey,
                    "media_type": UPLOAD_MEDIA_FILE,
                    "to_user_id": to_user_id,
                    "rawsize": rawsize,
                    "rawfilemd5": rawmd5,
                    "filesize": filesize,
                    "no_need_thumb": True,
                    "aeskey": aeskey.hex(),
                    "base_info": _base_info(),
                },
                headers=_headers(self.bot_token),
            )
            ru.raise_for_status()
            uj = ru.json()

            # Several spellings of the response keys are accepted.
            full = (uj.get("upload_full_url") or uj.get("uploadFullUrl")
                    or uj.get("full_url") or uj.get("url"))
            param = (uj.get("upload_param") or uj.get("uploadParam")
                     or uj.get("param"))
            if full:
                cdn_url = full
            elif param:
                # Only a param was returned: build the CDN upload URL ourselves.
                cdn_url = (f"{CDN_BASE}/upload?encrypted_query_param={quote(param)}"
                           f"&filekey={quote(filekey)}")
            else:
                raise RuntimeError(f"getuploadurl 无 upload url/param: {uj}")

            rc = c.post(cdn_url, content=ciphertext,
                        headers={"Content-Type": "application/octet-stream"})

        download_param = rc.headers.get("x-encrypted-param")
        if rc.status_code != 200 or not download_param:
            raise RuntimeError(
                f"CDN 上传失败 http={rc.status_code} "
                f"err={rc.headers.get('x-error-message')}"
            )
        return {
            "encrypt_query_param": download_param,
            # The key travels as base64 of its hex string, not of the raw bytes.
            "aes_key": base64.b64encode(aeskey.hex().encode()).decode(),
            "rawsize": rawsize,
        }

    def send_file(
        self,
        to_user_id: str,
        context_token: str,
        file_path: str | os.PathLike,
        *,
        file_name: Optional[str] = None,
    ) -> None:
        """Upload the file at `file_path` and send it as a file message.

        Args:
            file_name: name shown to the recipient; defaults to the base name
                of `file_path`.

        Raises:
            ValueError: the file is larger than `MAX_FILE_BYTES`.
            OSError: the file cannot be read.
            RuntimeError / httpx.HTTPStatusError: upload or send failed.
        """
        data = _read_file_capped(file_path)
        name = file_name or os.path.basename(str(file_path))
        up = self._upload_file(to_user_id, data)
        item = {
            "type": ITEM_FILE,
            "file_item": {
                "media": {
                    "encrypt_query_param": up["encrypt_query_param"],
                    "aes_key": up["aes_key"],
                    "encrypt_type": 1,
                },
                "file_name": name,
                "len": str(up["rawsize"]),
            },
        }
        self._send(to_user_id, context_token, item, state=STATE_FINISH)


def _read_file_capped(file_path: str | os.PathLike) -> bytes:
    """Read a whole file, refusing anything larger than `MAX_FILE_BYTES`.

    Raises:
        ValueError: the file exceeds the size cap.
        OSError: the file cannot be stat'ed or opened.
    """
    too_large = f"文件超过 {MAX_FILE_BYTES // (1024*1024)}MB 上限"
    # Cheap early rejection based on the reported size.
    if os.path.getsize(file_path) > MAX_FILE_BYTES:
        raise ValueError(too_large)
    with open(file_path, "rb") as f:
        # Bounded read: the cap still holds if the file grew after the size
        # check or if the reported size was not reliable.
        data = f.read(MAX_FILE_BYTES + 1)
    if len(data) > MAX_FILE_BYTES:
        raise ValueError(too_large)
    return data