"""探测六:验证 ClawBot 能否发【文件附件】(照官方 @tencent-weixin/openclaw-weixin 协议复刻)。 流程(全诊断,每步打印): 绑定 -> 等你发一条消息(拿 to_user + context_token) -> 造个小 txt -> md5/随机aeskey(16B)/随机filekey(16B hex) -> AES-128-ECB+PKCS7 加密 -> POST /ilink/bot/getuploadurl(打印完整返回,字段名不对可据此改) -> POST 密文到 CDN 拿 header x-encrypted-param -> sendmessage 带 file_item(type=4) 引用 -> 看手机是否收到文件。 字段依据(源码):MessageItemType.FILE=4 / UploadMediaType.FILE=3 / MessageState.FINISH=2, aes_key = base64(aeskey.hex() 的 ascii 字节)。ASCII-only,bot_token 不打印。 """ from __future__ import annotations import base64 import hashlib import os import random import sys import time import uuid from urllib.parse import quote import httpx import segno from cryptography.hazmat.primitives import padding from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes BASE = "https://ilinkai.weixin.qq.com" CDN_BASE_DEFAULT = "https://novac2c.cdn.weixin.qq.com/c2c" QR_DIR = os.path.dirname(os.path.abspath(__file__)) CHANNEL_VER = "1.0.2" def _uin() -> str: return base64.b64encode(str(random.randint(0, 2**32 - 1)).encode()).decode() def _headers(token=None) -> dict: h = {"Content-Type": "application/json", "AuthorizationType": "ilink_bot_token", "X-WECHAT-UIN": _uin()} if token: h["Authorization"] = f"Bearer {token}" return h def _new_qr(): with httpx.Client(timeout=20) as c: r = c.get(f"{BASE}/ilink/bot/get_bot_qrcode", params={"bot_type": "3"}, headers=_headers()) if r.status_code != 200: print(f"[FAIL] http {r.status_code}"); return None d = r.json() uniq = os.path.join(QR_DIR, f"clawbot_qr_{int(time.time())}.png") segno.make(d.get("qrcode_img_content", ""), error="m").save(uniq, scale=8, border=3) try: os.startfile(uniq) except Exception: pass print(f"[bind] FRESH QR -> {uniq}") return d.get("qrcode", "") def bind(): print("[bind] auto-refresh on expiry; scan whenever ready.") qid = _new_qr() if not qid: return None deadline = time.time() + 300 with httpx.Client(timeout=40) as c: last = "" while time.time() < deadline: try: j = c.get(f"{BASE}/ilink/bot/get_qrcode_status", params={"qrcode": qid}, headers=_headers()).json() st = j.get("status", "") if st != last: print(f"[bind] status={st!r}"); last = st if st == "confirmed": return j.get("bot_token", ""), (j.get("baseurl") or BASE) if st == "expired": nq = _new_qr() if not nq: return None qid, last = nq, "" except Exception as e: print(f"[bind] err {e}") time.sleep(2) return None def wait_msg(c, token): deadline = time.time() + 150 buf = "" while time.time() < deadline: try: j = c.post(f"{BASE}/ilink/bot/getupdates", json={"get_updates_buf": buf, "base_info": {"channel_version": CHANNEL_VER}}, headers=_headers(token)).json() buf = j.get("get_updates_buf", buf) for m in j.get("msgs", []) or []: txt = "".join((it.get("text_item", {}) or {}).get("text", "") for it in m.get("item_list", []) or []) print(f"[recv] <- {txt!r}") return m except Exception as e: print(f"[recv] err {e}"); time.sleep(2) return None def aes_ecb_pkcs7(plain: bytes, key: bytes) -> bytes: padder = padding.PKCS7(128).padder() padded = padder.update(plain) + padder.finalize() enc = Cipher(algorithms.AES(key), modes.ECB()).encryptor() return enc.update(padded) + enc.finalize() def main() -> int: b = bind() if not b: return 2 token, base_url = b global BASE BASE = base_url or BASE print("[bind] confirmed.\n[file] SEND one message now (e.g. 'file') ...") with httpx.Client(timeout=30) as c: m = wait_msg(c, token) if not m: print("no msg; abort."); return 1 to_user, ctx = m.get("from_user_id", ""), m.get("context_token", "") # 1) 造测试文件 fpath = os.path.join(QR_DIR, "zcbot_filetest.txt") with open(fpath, "w", encoding="utf-8") as f: f.write("zcbot 文件发送测试\nClawBot file attachment probe\n" + "x" * 200) data = open(fpath, "rb").read() fname = "zcbot_filetest.txt" rawsize = len(data) rawmd5 = hashlib.md5(data).hexdigest() aeskey = random.randbytes(16) filekey = random.randbytes(16).hex() cipher = aes_ecb_pkcs7(data, aeskey) filesize = len(cipher) print(f"[file] {fname} rawsize={rawsize} md5={rawmd5} filesize(enc)={filesize}") # 2) getuploadurl up_body = { "filekey": filekey, "media_type": 3, "to_user_id": to_user, "rawsize": rawsize, "rawfilemd5": rawmd5, "filesize": filesize, "no_need_thumb": True, "aeskey": aeskey.hex(), "base_info": {"channel_version": CHANNEL_VER}, } ru = c.post(f"{BASE}/ilink/bot/getuploadurl", json=up_body, headers=_headers(token)) print(f"[getuploadurl] http={ru.status_code}") try: uj = ru.json() except Exception: print(f"[getuploadurl] non-json: {ru.text[:300]}"); return 3 print(f"[getuploadurl] resp={uj}") # 3) 解析上传 URL(字段名不确定,多名兜底) full = (uj.get("upload_full_url") or uj.get("uploadFullUrl") or uj.get("full_url") or uj.get("url")) param = (uj.get("upload_param") or uj.get("uploadParam") or uj.get("param")) cdn_base = uj.get("cdn_base_url") or uj.get("cdnBaseUrl") or CDN_BASE_DEFAULT if full: cdn_url = full elif param: # 源码模板:?encrypted_query_param=&filekey= cdn_url = (f"{cdn_base}/upload?encrypted_query_param={quote(param)}" f"&filekey={quote(filekey)}") else: print("[FAIL] no upload url/param in resp; inspect resp above to fix field names.") return 4 print(f"[upload] POST ciphertext -> {cdn_url[:120]}...") # 4) 上传密文到 CDN rc = c.post(cdn_url, content=cipher, headers={"Content-Type": "application/octet-stream"}) download_param = rc.headers.get("x-encrypted-param") print(f"[upload] http={rc.status_code} x-encrypted-param={download_param!r}") if not download_param: print(f"[upload] resp headers={dict(rc.headers)} body={rc.text[:200]}") print("[FAIL] no x-encrypted-param returned; upload likely rejected.") return 5 # 5) sendmessage 带 file_item msg_body = { "msg": { "from_user_id": "", "to_user_id": to_user, "client_id": f"openclaw-weixin-{uuid.uuid4().hex}", "message_type": 2, "message_state": 2, "context_token": ctx, "item_list": [{ "type": 4, "file_item": { "media": { "encrypt_query_param": download_param, "aes_key": base64.b64encode(aeskey.hex().encode()).decode(), "encrypt_type": 1, }, "file_name": fname, "len": str(rawsize), }, }], }, "base_info": {"channel_version": CHANNEL_VER}, } rs = c.post(f"{BASE}/ilink/bot/sendmessage", json=msg_body, headers=_headers(token)) try: sj = rs.json() except Exception: sj = rs.text[:200] print(f"[sendmessage file] http={rs.status_code} body={sj}") print("\n========== CHECK YOUR PHONE ==========") print(f"Did a file '{fname}' arrive in the WeChat ClawBot chat (openable)?") return 0 if __name__ == "__main__": sys.exit(main())