"""Probe 3: pin down the semantics of ClawBot's context_token.

The outcome decides whether pull-style briefings and long replies are feasible.
Two questions are answered:

  T1 (multi-send): after one user message arrives, send two replies back to
      back with the SAME fresh token. Whether the second one arrives tells us
      if segmented / multi-message replies are possible (key for long
      briefings).
  T2 (delay): after the second user message arrives, do NOT reply, wait 25s,
      then reply once with that still-unused token. Whether it arrives tells
      us if the token is time-limited (i.e. whether a reply can be deferred).

You must send two messages, one after the other, to "WeChat ClawBot"
(e.g. send 1, then 2).

Judge the result by what the phone actually receives (an empty response body
from the API is not trustworthy). bot_token is never printed. ASCII-only.
"""

from __future__ import annotations

import base64
import os
import random
import sys
import time
from collections import deque
from pathlib import Path

import httpx
import segno

BASE = "https://ilinkai.weixin.qq.com"
QR_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "clawbot_qr.png")
CHANNEL_VER = "1.0.2"

# Messages that arrived in the same getupdates batch as one already handed to
# the caller. The update cursor has moved past them, so they would be lost
# unless we keep them here for the next wait_msg() call.
_PENDING_MSGS: deque[dict] = deque()


def _uin() -> str:
    """Return a base64-encoded random 32-bit integer for the X-WECHAT-UIN header."""
    return base64.b64encode(str(random.randint(0, 2**32 - 1)).encode()).decode()


def _headers(token: str | None = None) -> dict:
    """Build request headers; add the Bearer Authorization header when *token* is given."""
    h = {
        "Content-Type": "application/json",
        "AuthorizationType": "ilink_bot_token",
        "X-WECHAT-UIN": _uin(),
    }
    if token:
        h["Authorization"] = f"Bearer {token}"
    return h


def _open_image(path: str) -> None:
    """Best-effort: show *path* in a viewer. Never raises."""
    try:
        os.startfile(path)  # only exists on Windows -> AttributeError elsewhere
        return
    except (AttributeError, OSError):
        pass  # fall through to the browser
    try:
        import webbrowser

        # as_uri() yields a well-formed file:/// URL (also for drive-letter paths).
        webbrowser.open(Path(path).as_uri())
    except Exception as e:  # best-effort only; the path is printed by the caller
        print(f"[bind] could not auto-open QR ({type(e).__name__}); open the file manually")


def _new_qr() -> str | None:
    """Fetch a fresh QR code, save and display it, and return its qrcode id.

    Returns None (after printing a [FAIL] line) when the endpoint answers with
    a non-200 status, a non-JSON body, or a body lacking the needed fields.
    Network-level httpx errors propagate to the caller.
    """
    with httpx.Client(timeout=20) as c:
        r = c.get(
            f"{BASE}/ilink/bot/get_bot_qrcode",
            params={"bot_type": "3"},
            headers=_headers(),
        )
    if r.status_code != 200:
        print(f"[FAIL] get_bot_qrcode http {r.status_code}: {r.text[:200]}")
        return None
    try:
        d = r.json()
    except ValueError:
        print(f"[FAIL] get_bot_qrcode non-JSON body: {r.text[:200]}")
        return None
    if not isinstance(d, dict):
        print("[FAIL] get_bot_qrcode unexpected JSON shape")
        return None
    qid = d.get("qrcode", "")
    img_content = d.get("qrcode_img_content", "")
    if not qid or not img_content:
        print("[FAIL] get_bot_qrcode response lacks qrcode / qrcode_img_content")
        return None

    # A unique file name per QR so a stale viewer window never shows the new code.
    uniq = os.path.join(os.path.dirname(QR_PATH), f"clawbot_qr_{int(time.time())}.png")
    segno.make(img_content, error="m").save(uniq, scale=8, border=3)
    _open_image(uniq)
    print(f"[bind] FRESH QR -> {uniq} (older windows are stale, ignore them)")
    return qid


def bind() -> tuple[str, str] | None:
    """Run the QR binding flow and return (bot_token, base_url), or None on failure.

    An expired QR is replaced automatically until the scan succeeds or the
    overall 5-minute budget runs out, which removes the scan-timing race.
    """
    print("[bind] GET get_bot_qrcode ... (auto-refresh on expiry; scan whenever ready)")
    qid = _new_qr()
    if not qid:
        return None
    deadline = time.time() + 300
    with httpx.Client(timeout=40) as c:
        last = ""
        while time.time() < deadline:
            try:
                j = c.get(
                    f"{BASE}/ilink/bot/get_qrcode_status",
                    params={"qrcode": qid},
                    headers=_headers(),
                ).json()
                st = j.get("status", "")
                if st != last:  # only log status transitions
                    print(f"[bind] status={st!r}")
                    last = st
                if st == "confirmed":
                    bot_token = j.get("bot_token", "")
                    if not bot_token:
                        # A truthy ("", base) tuple would let main() continue
                        # without credentials; report it as a failure instead.
                        print("[FAIL] status confirmed but response carries no bot_token")
                        return None
                    return bot_token, (j.get("baseurl") or BASE)
                if st == "expired":
                    print("[bind] QR expired -> generating a new one ...")
                    nq = _new_qr()
                    if not nq:
                        return None
                    qid, last = nq, ""
                    continue  # poll the new code right away, skipping the sleep
            except Exception as e:
                print(f"[bind] err {type(e).__name__}: {e}")
            time.sleep(2)
    print("[bind] overall timeout (5min).")
    return None


def send(c, token, to_user, text, ctx):
    """POST one text message to *to_user* using context token *ctx*.

    Returns {"http": status, "json": parsed_body} or, when the body is not
    JSON, {"http": status, "text": first_200_chars}.
    """
    body = {
        "msg": {
            "to_user_id": to_user,
            "message_type": 2,
            "message_state": 2,
            "context_token": ctx,
            "item_list": [{"type": 1, "text_item": {"text": text}}],
        }
    }
    r = c.post(f"{BASE}/ilink/bot/sendmessage", json=body, headers=_headers(token))
    try:
        return {"http": r.status_code, "json": r.json()}
    except ValueError:  # empty / non-JSON body
        return {"http": r.status_code, "text": r.text[:200]}


def _msg_text(m) -> str:
    """Concatenate the text of every item in message *m* (missing/None text -> '')."""
    return "".join(
        ((it.get("text_item", {}) or {}).get("text", "") or "")
        for it in m.get("item_list", []) or []
    )


def wait_msg(c, token, buf):
    """Block until the next user message and return (msg, new_buf).

    Polls getupdates for up to 150s; returns (None, buf) on timeout. When one
    poll yields several messages, the first is returned and the rest are
    queued so the following call returns them instead of dropping them.
    """
    if _PENDING_MSGS:
        m = _PENDING_MSGS.popleft()
        print(f"[recv] <- {_msg_text(m)!r}")
        return m, buf

    deadline = time.time() + 150
    while time.time() < deadline:
        try:
            j = c.post(
                f"{BASE}/ilink/bot/getupdates",
                json={
                    "get_updates_buf": buf,
                    "base_info": {"channel_version": CHANNEL_VER},
                },
                headers=_headers(token),
            ).json()
            buf = j.get("get_updates_buf", buf)
            msgs = j.get("msgs", []) or []
            if msgs:
                first, *rest = msgs
                txt = _msg_text(first)
                _PENDING_MSGS.extend(rest)  # keep what the cursor already skipped
                print(f"[recv] <- {txt!r}")
                return first, buf
        except Exception as e:
            print(f"[recv] err {type(e).__name__}: {e}")
            time.sleep(2)
    return None, buf


def main() -> int:
    """Run bind, then probes T1 and T2. Exit code: 0 ok, 1 no T1 message, 2 bind failed."""
    global BASE
    b = bind()
    if not b:
        return 2
    token, base_url = b
    BASE = base_url or BASE  # later requests go to the server-provided base URL
    print("[bind] confirmed.\n")

    with httpx.Client(timeout=40) as c:
        # ---- T1: two sends with the same token ----
        print("[T1] SEND your 1st message now (e.g. '1') ...")
        m, buf = wait_msg(c, token, "")
        if not m:
            print("[T1] no msg; abort.")
            return 1
        to_user, ctx = m.get("from_user_id", ""), m.get("context_token", "")
        r1a = send(c, token, to_user, "[T1-a] 同token第一条(立即)", ctx)
        r1b = send(c, token, to_user, "[T1-b] 同token第二条(紧接)", ctx)
        print(f"[T1] sent two with same token. http: a={r1a.get('http')} b={r1b.get('http')}")

        # ---- T2: do not reply on receipt; reply 25s later with the unused token ----
        print("\n[T2] SEND your 2nd message now (e.g. '2') ...")
        m2, buf = wait_msg(c, token, buf)
        if not m2:
            print("[T2] no msg; skip.")
        else:
            to_user2, ctx2 = m2.get("from_user_id", ""), m2.get("context_token", "")
            print("[T2] received; NOT replying; waiting 25s...")
            time.sleep(25)
            r2 = send(c, token, to_user2, "[T2] 延迟25秒,未用过的token回复", ctx2)
            print(f"[T2] sent after delay. http={r2.get('http')}")

    print("\n========== CHECK YOUR PHONE ==========")
    print("Report which of these arrived in the WeChat ClawBot chat:")
    print(" [T1-a] 同token第一条(立即)")
    print(" [T1-b] 同token第二条(紧接) <- if arrives: multi-message per turn OK")
    print(" [T2] 延迟25秒,未用过的token回复 <- if arrives: token is time-windowed, deferred reply OK")
    return 0


if __name__ == "__main__":
    sys.exit(main())