224 lines
8.3 KiB
Python
224 lines
8.3 KiB
Python
"""探测六:验证 ClawBot 能否发【文件附件】(照官方 @tencent-weixin/openclaw-weixin 协议复刻)。
|
|
|
|
流程(全诊断,每步打印):
|
|
绑定 -> 等你发一条消息(拿 to_user + context_token) -> 造个小 txt ->
|
|
md5/随机aeskey(16B)/随机filekey(16B hex) -> AES-128-ECB+PKCS7 加密 ->
|
|
POST /ilink/bot/getuploadurl(打印完整返回,字段名不对可据此改) ->
|
|
POST 密文到 CDN 拿 header x-encrypted-param ->
|
|
sendmessage 带 file_item(type=4) 引用 -> 看手机是否收到文件。
|
|
|
|
字段依据(源码):MessageItemType.FILE=4 / UploadMediaType.FILE=3 / MessageState.FINISH=2,
|
|
aes_key = base64(aeskey.hex() 的 ascii 字节)。ASCII-only,bot_token 不打印。
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import os
|
|
import random
|
|
import sys
|
|
import time
|
|
import uuid
|
|
from urllib.parse import quote
|
|
|
|
import httpx
|
|
import segno
|
|
from cryptography.hazmat.primitives import padding
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
|
|
BASE = "https://ilinkai.weixin.qq.com"
|
|
CDN_BASE_DEFAULT = "https://novac2c.cdn.weixin.qq.com/c2c"
|
|
QR_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
CHANNEL_VER = "1.0.2"
|
|
|
|
|
|
def _uin() -> str:
|
|
return base64.b64encode(str(random.randint(0, 2**32 - 1)).encode()).decode()
|
|
|
|
|
|
def _headers(token=None) -> dict:
|
|
h = {"Content-Type": "application/json",
|
|
"AuthorizationType": "ilink_bot_token", "X-WECHAT-UIN": _uin()}
|
|
if token:
|
|
h["Authorization"] = f"Bearer {token}"
|
|
return h
|
|
|
|
|
|
def _new_qr():
|
|
with httpx.Client(timeout=20) as c:
|
|
r = c.get(f"{BASE}/ilink/bot/get_bot_qrcode",
|
|
params={"bot_type": "3"}, headers=_headers())
|
|
if r.status_code != 200:
|
|
print(f"[FAIL] http {r.status_code}"); return None
|
|
d = r.json()
|
|
uniq = os.path.join(QR_DIR, f"clawbot_qr_{int(time.time())}.png")
|
|
segno.make(d.get("qrcode_img_content", ""), error="m").save(uniq, scale=8, border=3)
|
|
try:
|
|
os.startfile(uniq)
|
|
except Exception:
|
|
pass
|
|
print(f"[bind] FRESH QR -> {uniq}")
|
|
return d.get("qrcode", "")
|
|
|
|
|
|
def bind():
|
|
print("[bind] auto-refresh on expiry; scan whenever ready.")
|
|
qid = _new_qr()
|
|
if not qid:
|
|
return None
|
|
deadline = time.time() + 300
|
|
with httpx.Client(timeout=40) as c:
|
|
last = ""
|
|
while time.time() < deadline:
|
|
try:
|
|
j = c.get(f"{BASE}/ilink/bot/get_qrcode_status",
|
|
params={"qrcode": qid}, headers=_headers()).json()
|
|
st = j.get("status", "")
|
|
if st != last:
|
|
print(f"[bind] status={st!r}"); last = st
|
|
if st == "confirmed":
|
|
return j.get("bot_token", ""), (j.get("baseurl") or BASE)
|
|
if st == "expired":
|
|
nq = _new_qr()
|
|
if not nq:
|
|
return None
|
|
qid, last = nq, ""
|
|
except Exception as e:
|
|
print(f"[bind] err {e}")
|
|
time.sleep(2)
|
|
return None
|
|
|
|
|
|
def wait_msg(c, token):
|
|
deadline = time.time() + 150
|
|
buf = ""
|
|
while time.time() < deadline:
|
|
try:
|
|
j = c.post(f"{BASE}/ilink/bot/getupdates",
|
|
json={"get_updates_buf": buf,
|
|
"base_info": {"channel_version": CHANNEL_VER}},
|
|
headers=_headers(token)).json()
|
|
buf = j.get("get_updates_buf", buf)
|
|
for m in j.get("msgs", []) or []:
|
|
txt = "".join((it.get("text_item", {}) or {}).get("text", "")
|
|
for it in m.get("item_list", []) or [])
|
|
print(f"[recv] <- {txt!r}")
|
|
return m
|
|
except Exception as e:
|
|
print(f"[recv] err {e}"); time.sleep(2)
|
|
return None
|
|
|
|
|
|
def aes_ecb_pkcs7(plain: bytes, key: bytes) -> bytes:
|
|
padder = padding.PKCS7(128).padder()
|
|
padded = padder.update(plain) + padder.finalize()
|
|
enc = Cipher(algorithms.AES(key), modes.ECB()).encryptor()
|
|
return enc.update(padded) + enc.finalize()
|
|
|
|
|
|
def main() -> int:
|
|
b = bind()
|
|
if not b:
|
|
return 2
|
|
token, base_url = b
|
|
global BASE
|
|
BASE = base_url or BASE
|
|
print("[bind] confirmed.\n[file] SEND one message now (e.g. 'file') ...")
|
|
|
|
with httpx.Client(timeout=30) as c:
|
|
m = wait_msg(c, token)
|
|
if not m:
|
|
print("no msg; abort."); return 1
|
|
to_user, ctx = m.get("from_user_id", ""), m.get("context_token", "")
|
|
|
|
# 1) 造测试文件
|
|
fpath = os.path.join(QR_DIR, "zcbot_filetest.txt")
|
|
with open(fpath, "w", encoding="utf-8") as f:
|
|
f.write("zcbot 文件发送测试\nClawBot file attachment probe\n" + "x" * 200)
|
|
data = open(fpath, "rb").read()
|
|
fname = "zcbot_filetest.txt"
|
|
rawsize = len(data)
|
|
rawmd5 = hashlib.md5(data).hexdigest()
|
|
aeskey = random.randbytes(16)
|
|
filekey = random.randbytes(16).hex()
|
|
cipher = aes_ecb_pkcs7(data, aeskey)
|
|
filesize = len(cipher)
|
|
print(f"[file] {fname} rawsize={rawsize} md5={rawmd5} filesize(enc)={filesize}")
|
|
|
|
# 2) getuploadurl
|
|
up_body = {
|
|
"filekey": filekey, "media_type": 3, "to_user_id": to_user,
|
|
"rawsize": rawsize, "rawfilemd5": rawmd5, "filesize": filesize,
|
|
"no_need_thumb": True, "aeskey": aeskey.hex(),
|
|
"base_info": {"channel_version": CHANNEL_VER},
|
|
}
|
|
ru = c.post(f"{BASE}/ilink/bot/getuploadurl", json=up_body, headers=_headers(token))
|
|
print(f"[getuploadurl] http={ru.status_code}")
|
|
try:
|
|
uj = ru.json()
|
|
except Exception:
|
|
print(f"[getuploadurl] non-json: {ru.text[:300]}"); return 3
|
|
print(f"[getuploadurl] resp={uj}")
|
|
|
|
# 3) 解析上传 URL(字段名不确定,多名兜底)
|
|
full = (uj.get("upload_full_url") or uj.get("uploadFullUrl")
|
|
or uj.get("full_url") or uj.get("url"))
|
|
param = (uj.get("upload_param") or uj.get("uploadParam") or uj.get("param"))
|
|
cdn_base = uj.get("cdn_base_url") or uj.get("cdnBaseUrl") or CDN_BASE_DEFAULT
|
|
if full:
|
|
cdn_url = full
|
|
elif param:
|
|
# 源码模板:?encrypted_query_param=<urlencode(uploadParam)>&filekey=<urlencode(filekey)>
|
|
cdn_url = (f"{cdn_base}/upload?encrypted_query_param={quote(param)}"
|
|
f"&filekey={quote(filekey)}")
|
|
else:
|
|
print("[FAIL] no upload url/param in resp; inspect resp above to fix field names.")
|
|
return 4
|
|
print(f"[upload] POST ciphertext -> {cdn_url[:120]}...")
|
|
|
|
# 4) 上传密文到 CDN
|
|
rc = c.post(cdn_url, content=cipher,
|
|
headers={"Content-Type": "application/octet-stream"})
|
|
download_param = rc.headers.get("x-encrypted-param")
|
|
print(f"[upload] http={rc.status_code} x-encrypted-param={download_param!r}")
|
|
if not download_param:
|
|
print(f"[upload] resp headers={dict(rc.headers)} body={rc.text[:200]}")
|
|
print("[FAIL] no x-encrypted-param returned; upload likely rejected.")
|
|
return 5
|
|
|
|
# 5) sendmessage 带 file_item
|
|
msg_body = {
|
|
"msg": {
|
|
"from_user_id": "", "to_user_id": to_user,
|
|
"client_id": f"openclaw-weixin-{uuid.uuid4().hex}",
|
|
"message_type": 2, "message_state": 2, "context_token": ctx,
|
|
"item_list": [{
|
|
"type": 4,
|
|
"file_item": {
|
|
"media": {
|
|
"encrypt_query_param": download_param,
|
|
"aes_key": base64.b64encode(aeskey.hex().encode()).decode(),
|
|
"encrypt_type": 1,
|
|
},
|
|
"file_name": fname,
|
|
"len": str(rawsize),
|
|
},
|
|
}],
|
|
},
|
|
"base_info": {"channel_version": CHANNEL_VER},
|
|
}
|
|
rs = c.post(f"{BASE}/ilink/bot/sendmessage", json=msg_body, headers=_headers(token))
|
|
try:
|
|
sj = rs.json()
|
|
except Exception:
|
|
sj = rs.text[:200]
|
|
print(f"[sendmessage file] http={rs.status_code} body={sj}")
|
|
|
|
print("\n========== CHECK YOUR PHONE ==========")
|
|
print(f"Did a file '{fname}' arrive in the WeChat ClawBot chat (openable)?")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|