# zcbot/core/wechat/ilink.py
#
# 298 lines
# 11 KiB
# Python

"""ClawBot 个人微信 iLink Bot API 客户端(DESIGN §8.7 渠道 A)。
协议全部经真机实测(`scripts/probe_clawbot*.py`,2026-06-23):
- 绑定:`get_bot_qrcode`(无凭据,出深链 → 自渲二维码)→ 轮询 `get_qrcode_status`
(TTL ~1min,过期换码)→ `confirmed` 得 `bot_token` + `baseurl`。
- 收:`getupdates` 长轮询(hold ≤35s),消息带 `from_user_id` + `context_token`。
- 发:`sendmessage`,**每条 `client_id` 必唯一**(漏则同 token 后续被丢);多条/长文
按 ~1000 字分块,中间 `message_state=GENERATING(1)`、末块 `FINISH(2)`,间隔 ~300ms。
- `context_token` 有效期 ~24h、可复用 → 主动推送靠它(用户须先开口拿到 token)。
- 文件:`getuploadurl` → AES-128-ECB(PKCS7)加密 → POST 密文到 CDN 拿 `x-encrypted-param`
→ `sendmessage` 带 `file_item`。
纯协议客户端,不碰 DB / agent 编排。阻塞 IO(httpx 同步),调用方放 to_thread / executor。
"""
from __future__ import annotations
import base64
import hashlib
import os
import time
import uuid
from dataclasses import dataclass
from typing import Any, Optional
from urllib.parse import quote
import httpx
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# Default API host; used as the default `base_url` of the functions and client below.
DEFAULT_BASE = "https://ilinkai.weixin.qq.com"
# CDN prefix used to build the upload URL when getuploadurl returns only an upload param.
CDN_BASE = "https://novac2c.cdn.weixin.qq.com/c2c"
# Sent as `channel_version` inside every request's `base_info`.
CHANNEL_VERSION = "1.0.2"
# `bot_type` query value sent when requesting a binding QR code.
BOT_TYPE_PERSONAL = 3
# Protocol enums (source: @tencent-weixin/openclaw-weixin src/api/types.ts; verified in practice)
MSG_TYPE_BOT = 2
STATE_GENERATING = 1
STATE_FINISH = 2
ITEM_TEXT = 1
ITEM_IMAGE = 2
ITEM_FILE = 4
UPLOAD_MEDIA_FILE = 3
UPLOAD_MEDIA_IMAGE = 1
# Chunking: long text is cut into ~1000-character pieces, with a delay between
# pieces so that none get dropped
CHUNK_CHARS = 1000
CHUNK_DELAY_S = 0.3
# Upper bound (20 MiB) on the size of a file accepted for sending.
MAX_FILE_BYTES = 20 * 1024 * 1024
def _uin_header() -> str:
    """Build a fresh ``X-WECHAT-UIN`` header value.

    The value is base64 of the decimal string of a random unsigned 32-bit
    integer. It is regenerated for every request (anti-replay).
    """
    random_u32 = int.from_bytes(os.urandom(4), byteorder="big")
    decimal_text = f"{random_u32}"
    return base64.b64encode(decimal_text.encode()).decode()
def _headers(bot_token: Optional[str] = None) -> dict[str, str]:
    """Assemble the HTTP headers for one iLink request.

    Every call gets a newly generated ``X-WECHAT-UIN``. The ``Authorization``
    bearer header is included only when ``bot_token`` is truthy, so the
    unauthenticated binding endpoints can use the same helper.
    """
    auth = {"Authorization": f"Bearer {bot_token}"} if bot_token else {}
    return {
        "Content-Type": "application/json",
        "AuthorizationType": "ilink_bot_token",
        "X-WECHAT-UIN": _uin_header(),
        **auth,
    }
def _base_info() -> dict[str, str]:
    """Return the ``base_info`` object attached to request bodies.

    A new dict is built on every call, carrying only the channel version.
    """
    return dict(channel_version=CHANNEL_VERSION)
def _new_client_id() -> str:
    """Generate a ``client_id`` for one outgoing message.

    The id is a fixed prefix followed by the 32 hex digits of a random UUID4,
    so each call yields a different value.
    """
    unique_suffix = uuid.uuid4().hex
    return "openclaw-weixin-" + unique_suffix
def _aes_ecb_pkcs7(plaintext: bytes, key: bytes) -> bytes:
    """Encrypt ``plaintext`` with AES in ECB mode after PKCS7 padding.

    The data is padded to the 128-bit block size, then encrypted with ``key``.
    Returns the raw ciphertext bytes.
    """
    pkcs7 = padding.PKCS7(128).padder()
    padded = b"".join((pkcs7.update(plaintext), pkcs7.finalize()))
    encryptor = Cipher(algorithms.AES(key), modes.ECB()).encryptor()
    return b"".join((encryptor.update(padded), encryptor.finalize()))
# ─────────────────────────── 绑定(无 token)───────────────────────────
@dataclass
class QrCode:
    """A binding QR code as returned by ``get_bot_qrcode``."""

    qrcode_id: str
    # Deep link (liteapp.weixin.qq.com/q/...); the caller renders it into a
    # QR-code image itself.
    deeplink: str
def get_bot_qrcode(base_url: str = DEFAULT_BASE, *, timeout: float = 20.0) -> QrCode:
    """Fetch one binding QR code; no pre-provisioned credentials are needed.

    The returned ``deeplink`` has to be rendered into a QR-code image by the
    caller for the user to scan.

    Args:
        base_url: API host to query.
        timeout: httpx timeout in seconds for the request.

    Returns:
        A ``QrCode``; a field is ``""`` when the response lacks it.

    Raises:
        httpx.HTTPStatusError: on a 4xx/5xx response.
    """
    endpoint = f"{base_url}/ilink/bot/get_bot_qrcode"
    query = {"bot_type": BOT_TYPE_PERSONAL}
    with httpx.Client(timeout=timeout) as client:
        response = client.get(endpoint, params=query, headers=_headers())
        response.raise_for_status()
        payload = response.json()
    return QrCode(
        qrcode_id=payload.get("qrcode", ""),
        deeplink=payload.get("qrcode_img_content", ""),
    )
@dataclass
class BindResult:
    """Outcome of a single QR-code status poll."""

    # One of: wait | confirmed | expired
    status: str
    bot_token: Optional[str] = None
    base_url: Optional[str] = None
def poll_qrcode_status(
    qrcode_id: str, base_url: str = DEFAULT_BASE, *, timeout: float = 40.0
) -> BindResult:
    """Poll the scan status of a binding QR code once.

    The server long-polls (it holds the request for tens of seconds), so the
    caller is expected to invoke this in a loop. On ``expired`` the caller
    should call ``get_bot_qrcode`` again for a new code. On ``confirmed`` the
    result carries ``bot_token`` and ``base_url``.

    Args:
        qrcode_id: Id of the QR code being polled.
        base_url: API host to query.
        timeout: httpx timeout in seconds for the request.

    Raises:
        httpx.HTTPStatusError: on a 4xx/5xx response.
    """
    endpoint = f"{base_url}/ilink/bot/get_qrcode_status"
    with httpx.Client(timeout=timeout) as client:
        response = client.get(
            endpoint, params={"qrcode": qrcode_id}, headers=_headers()
        )
        response.raise_for_status()
        payload = response.json()
    # The host is read from `baseurl` first, then from `base_url` as a fallback.
    resolved_base = payload.get("baseurl") or payload.get("base_url")
    return BindResult(
        status=payload.get("status", ""),
        bot_token=payload.get("bot_token"),
        base_url=resolved_base,
    )
# ─────────────────────────── 收发(带 token)───────────────────────────
@dataclass
class InboundMessage:
    """One message received through ``getupdates``."""

    # Sender id, of the form xxx@im.wechat
    from_user_id: str
    # Must be sent back when replying, or when pushing proactively within 24h
    context_token: str
    # Concatenation of the text of all items in the message
    text: str
    # The undecoded message object as received
    raw: dict[str, Any]
class ILinkClient:
    """Per-user client: after binding, holds that user's ``bot_token`` and
    ``base_url`` and sends/receives that user's messages.

    Every method performs blocking HTTP through synchronous ``httpx`` and
    opens a fresh ``httpx.Client`` per call.
    """

    def __init__(self, bot_token: str, base_url: str = DEFAULT_BASE) -> None:
        self.bot_token = bot_token
        # A falsy base_url falls back to the default host.
        self.base_url = base_url or DEFAULT_BASE

    # -- Receive --
    def get_updates(
        self, cursor: str = "", *, timeout: float = 45.0
    ) -> tuple[list[InboundMessage], str]:
        """Long-poll for new messages.

        Args:
            cursor: Cursor returned by the previous call ("" on the first call).
            timeout: httpx timeout in seconds for the request.

        Returns:
            ``(messages, new_cursor)``; pass ``new_cursor`` to the next call.
            When the response carries no cursor, or a null/empty one, the
            incoming ``cursor`` is returned unchanged so the caller's position
            is not reset.

        Raises:
            httpx.HTTPStatusError: on a 4xx/5xx response.
        """
        with httpx.Client(timeout=timeout) as c:
            r = c.post(
                f"{self.base_url}/ilink/bot/getupdates",
                json={"get_updates_buf": cursor, "base_info": _base_info()},
                headers=_headers(self.bot_token),
            )
            r.raise_for_status()
            d = r.json()
        msgs: list[InboundMessage] = []
        for m in d.get("msgs") or []:
            # Non-text items contribute nothing. A null `text_item` or a null
            # `text` is treated as empty rather than breaking the join.
            text = "".join(
                (it.get("text_item") or {}).get("text") or ""
                for it in m.get("item_list") or []
            )
            msgs.append(InboundMessage(
                from_user_id=m.get("from_user_id", ""),
                context_token=m.get("context_token", ""),
                text=text,
                raw=m,
            ))
        # `or cursor` also covers a key that is present but null/empty.
        return msgs, d.get("get_updates_buf") or cursor

    # -- Send (low-level, single message) --
    def _send(
        self, to_user_id: str, context_token: str, item: dict, *, state: int
    ) -> None:
        """POST one ``sendmessage`` request carrying a single item.

        A new ``client_id`` is generated for every call.

        Args:
            to_user_id: Recipient id.
            context_token: Token taken from an inbound message of that user.
            item: The single entry placed in ``item_list``.
            state: ``message_state`` value (``STATE_GENERATING`` / ``STATE_FINISH``).

        Raises:
            httpx.HTTPStatusError: on a 4xx/5xx response.
        """
        body = {
            "msg": {
                "from_user_id": "",
                "to_user_id": to_user_id,
                "client_id": _new_client_id(),
                "message_type": MSG_TYPE_BOT,
                "message_state": state,
                "context_token": context_token,
                "item_list": [item],
            },
            "base_info": _base_info(),
        }
        with httpx.Client(timeout=30.0) as c:
            r = c.post(
                f"{self.base_url}/ilink/bot/sendmessage",
                json=body,
                headers=_headers(self.bot_token),
            )
            # Success is HTTP 200 with an empty body {}; a non-200 raises
            # (an empty body does not indicate failure).
            r.raise_for_status()

    # -- Send text (auto-chunked so long text is not dropped) --
    def send_text(self, to_user_id: str, context_token: str, text: str) -> None:
        """Send ``text``, split into pieces of at most ``CHUNK_CHARS`` characters.

        All pieces but the last are sent with ``STATE_GENERATING``; the last
        one with ``STATE_FINISH``. The call sleeps ``CHUNK_DELAY_S`` seconds
        between pieces. Empty or ``None`` text is sent as one empty message.
        """
        text = text or ""
        chunks = [text[i:i + CHUNK_CHARS] for i in range(0, len(text), CHUNK_CHARS)] or [""]
        last = len(chunks) - 1
        for i, chunk in enumerate(chunks):
            self._send(
                to_user_id, context_token,
                {"type": ITEM_TEXT, "text_item": {"text": chunk}},
                state=STATE_FINISH if i == last else STATE_GENERATING,
            )
            if i != last:
                time.sleep(CHUNK_DELAY_S)

    # -- Send file (getuploadurl -> AES-128-ECB -> CDN -> file_item) --
    def _upload_file(self, to_user_id: str, data: bytes) -> dict[str, Any]:
        """Encrypt ``data`` and upload it to the CDN.

        Steps: generate a random 16-byte AES key and a random file key,
        encrypt the data, request an upload location via ``getuploadurl``,
        POST the ciphertext there, and read the download parameter from the
        ``x-encrypted-param`` response header.

        Returns:
            A dict with ``encrypt_query_param`` (the download parameter),
            ``aes_key`` (base64 of the hex-encoded key) and ``rawsize``
            (length of the unencrypted data).

        Raises:
            httpx.HTTPStatusError: when ``getuploadurl`` answers 4xx/5xx.
            RuntimeError: when ``getuploadurl`` yields neither a URL nor an
                upload param, or when the CDN upload does not return HTTP 200
                together with an ``x-encrypted-param`` header.
        """
        rawsize = len(data)
        rawmd5 = hashlib.md5(data).hexdigest()
        aeskey = os.urandom(16)
        filekey = os.urandom(16).hex()
        ciphertext = _aes_ecb_pkcs7(data, aeskey)
        filesize = len(ciphertext)
        with httpx.Client(timeout=30.0) as c:
            ru = c.post(
                f"{self.base_url}/ilink/bot/getuploadurl",
                json={
                    "filekey": filekey,
                    "media_type": UPLOAD_MEDIA_FILE,
                    "to_user_id": to_user_id,
                    "rawsize": rawsize,
                    "rawfilemd5": rawmd5,
                    "filesize": filesize,
                    "no_need_thumb": True,
                    "aeskey": aeskey.hex(),
                    "base_info": _base_info(),
                },
                headers=_headers(self.bot_token),
            )
            ru.raise_for_status()
            uj = ru.json()
            # Several spellings of the response keys are accepted.
            full = (uj.get("upload_full_url") or uj.get("uploadFullUrl")
                    or uj.get("full_url") or uj.get("url"))
            param = (uj.get("upload_param") or uj.get("uploadParam") or uj.get("param"))
            if full:
                cdn_url = full
            elif param:
                # Only a param was returned: build the CDN upload URL ourselves.
                cdn_url = (f"{CDN_BASE}/upload?encrypted_query_param={quote(param)}"
                           f"&filekey={quote(filekey)}")
            else:
                raise RuntimeError(f"getuploadurl 无 upload url/param: {uj}")
            rc = c.post(cdn_url, content=ciphertext,
                        headers={"Content-Type": "application/octet-stream"})
            download_param = rc.headers.get("x-encrypted-param")
            if rc.status_code != 200 or not download_param:
                raise RuntimeError(
                    f"CDN 上传失败 http={rc.status_code} "
                    f"err={rc.headers.get('x-error-message')}"
                )
        return {
            "encrypt_query_param": download_param,
            "aes_key": base64.b64encode(aeskey.hex().encode()).decode(),
            "rawsize": rawsize,
        }

    def send_file(
        self,
        to_user_id: str,
        context_token: str,
        file_path: str | os.PathLike,
        *,
        file_name: Optional[str] = None,
    ) -> None:
        """Upload a local file and send it as a file message.

        Args:
            to_user_id: Recipient id.
            context_token: Token taken from an inbound message of that user.
            file_path: Path of the file to send.
            file_name: Name shown to the recipient; defaults to the base name
                of ``file_path``.

        Raises:
            ValueError: when the file exceeds the size cap.
            RuntimeError, httpx.HTTPStatusError: on upload or send failure.
        """
        data = _read_file_capped(file_path)
        name = file_name or os.path.basename(str(file_path))
        up = self._upload_file(to_user_id, data)
        item = {
            "type": ITEM_FILE,
            "file_item": {
                "media": {
                    "encrypt_query_param": up["encrypt_query_param"],
                    "aes_key": up["aes_key"],
                    "encrypt_type": 1,
                },
                "file_name": name,
                # The length is sent as a decimal string.
                "len": str(up["rawsize"]),
            },
        }
        self._send(to_user_id, context_token, item, state=STATE_FINISH)
def _read_file_capped(file_path: str | os.PathLike) -> bytes:
size = os.path.getsize(file_path)
if size > MAX_FILE_BYTES:
raise ValueError(f"文件超过 {MAX_FILE_BYTES // (1024*1024)}MB 上限")
with open(file_path, "rb") as f:
return f.read()