# zcbot/scripts/probe_clawbot_token.py
#
# 169 lines
# 6.5 KiB
# Python

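"""Probe 3: pin down the semantics of ClawBot's context_token.

The answer decides whether pull-style briefings and long replies are feasible.

Two questions to answer:
T1 multi-send: after one user message is received, send two replies in a row
   using the SAME fresh token
   -> whether the second one arrives = whether split / multiple replies work
      (the key requirement for long briefings)
T2 delay: after the second user message is received, do NOT reply at first;
   wait 25s, then reply once with that UNUSED token
   -> whether it arrives = whether the token is time-limited (i.e. whether a
      reply can be deferred for a while)

You need to send TWO messages, one after the other, to "WeChat ClawBot"
(e.g. send 1 first, then 2).
Judge the result by what the phone actually receives (an empty response body
from the API is not trustworthy). bot_token is never printed. ASCII-only.
"""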
from __future__ import annotations
import base64
import os
import random
import sys
import time
import httpx
import segno
# Default API origin; main() rebinds it to the `baseurl` reported by a successful bind.
BASE = "https://ilinkai.weixin.qq.com"

# Directory of this script; generated QR images are written next to it.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
QR_PATH = os.path.join(_SCRIPT_DIR, "clawbot_qr.png")

# Reported as base_info.channel_version in getupdates requests.
CHANNEL_VER = "1.0.2"
def _uin() -> str:
    """Return a random 32-bit unsigned integer as base64 of its decimal digits.

    The integer is drawn uniformly from ``[0, 2**32 - 1]``, written in
    decimal, and that ASCII text is base64-encoded.
    """
    number = random.randint(0, 2**32 - 1)
    decimal_bytes = str(number).encode()
    return base64.b64encode(decimal_bytes).decode()
def _headers(token: str | None = None) -> dict:
    """Build the request headers shared by every API call in this script.

    Args:
        token: Bot token. When truthy it is sent as ``Authorization: Bearer
            <token>``; otherwise no ``Authorization`` header is added.

    Returns:
        A new dict with the JSON content type, the ``ilink_bot_token``
        authorization type and a freshly generated ``X-WECHAT-UIN`` value.
    """
    headers = {
        "Content-Type": "application/json",
        "AuthorizationType": "ilink_bot_token",
        "X-WECHAT-UIN": _uin(),
    }
    if not token:
        return headers
    headers["Authorization"] = f"Bearer {token}"
    return headers
def _new_qr() -> str | None:
    """Fetch a fresh login QR code, save it as a PNG and try to display it.

    Calls ``/ilink/bot/get_bot_qrcode``, renders the returned
    ``qrcode_img_content`` with segno into a timestamped PNG in the directory
    of ``QR_PATH``, then opens the image (``os.startfile`` first, web browser
    as a fallback).

    Returns:
        The ``qrcode`` id to poll for scan status, or ``None`` when the request
        is not answered with HTTP 200, the body is not JSON, or the response
        lacks ``qrcode`` / ``qrcode_img_content``.

    Raises:
        Network-level errors from httpx are not caught here; the caller
        decides whether to retry.
    """
    with httpx.Client(timeout=20) as client:
        resp = client.get(f"{BASE}/ilink/bot/get_bot_qrcode",
                          params={"bot_type": "3"}, headers=_headers())
    if resp.status_code != 200:
        print(f"[FAIL] get_bot_qrcode http {resp.status_code}: {resp.text[:200]}")
        return None

    try:
        data = resp.json()
    except ValueError:
        print(f"[FAIL] get_bot_qrcode returned a non-JSON body: {resp.text[:200]}")
        return None
    if not isinstance(data, dict):
        data = {}

    qid = data.get("qrcode") or ""
    content = data.get("qrcode_img_content") or ""
    if not qid or not content:
        # Without both fields there is nothing to scan and nothing to poll, so
        # report failure instead of rendering an empty QR and returning "".
        # Only the key names are printed, never the values.
        print(f"[FAIL] get_bot_qrcode response lacks qrcode/qrcode_img_content "
              f"(keys={sorted(data)})")
        return None

    # A timestamped file name keeps image viewers from showing a cached, stale code.
    uniq = os.path.join(os.path.dirname(QR_PATH), f"clawbot_qr_{int(time.time())}.png")
    segno.make(content, error="m").save(uniq, scale=8, border=3)

    # Opening the image is best-effort: the path is printed below either way.
    try:
        os.startfile(uniq)  # Windows-only API; absent on other platforms.
    except Exception:
        try:
            import webbrowser
            from pathlib import Path

            # as_uri() yields a well-formed file:/// URI, including for
            # Windows drive-letter paths.
            webbrowser.open(Path(uniq).as_uri())
        except Exception as exc:
            print(f"[bind] could not open the QR image automatically "
                  f"({type(exc).__name__}); open it manually.")
    print(f"[bind] FRESH QR -> {uniq} (older windows are stale, ignore them)")
    return qid
def bind() -> tuple[str, str] | None:
    """Run the QR login flow until it is confirmed or five minutes have passed.

    A QR code is shown via ``_new_qr()`` and ``/ilink/bot/get_qrcode_status``
    is polled.  When the server reports the code as expired a new one is
    generated automatically, so the user can scan whenever ready.

    Returns:
        ``(bot_token, base_url)`` once the status is ``"confirmed"``;
        ``base_url`` falls back to ``BASE`` when the response has no
        ``baseurl``.  ``None`` when no QR code could be obtained, when the
        confirmation carries no token, or on overall timeout.
    """
    print("[bind] GET get_bot_qrcode ... (auto-refresh on expiry; scan whenever ready)")
    qid = _new_qr()
    if not qid:
        return None

    # Monotonic clock: the overall timeout must not be skewed by wall-clock changes.
    deadline = time.monotonic() + 300
    last = ""
    with httpx.Client(timeout=40) as client:
        while time.monotonic() < deadline:
            try:
                j = client.get(f"{BASE}/ilink/bot/get_qrcode_status",
                               params={"qrcode": qid}, headers=_headers()).json()
                st = j.get("status", "")
                if st != last:
                    # Log transitions only, not every poll.
                    print(f"[bind] status={st!r}")
                    last = st
                if st == "confirmed":
                    token = j.get("bot_token") or ""
                    if not token:
                        # An empty token would make the returned tuple truthy
                        # and let callers continue unauthenticated.
                        print("[bind] confirmed but no bot_token in response; abort.")
                        return None
                    return token, (j.get("baseurl") or BASE)
                if st == "expired":
                    print("[bind] QR expired -> generating a new one ...")
                    nq = _new_qr()
                    if not nq:
                        return None
                    qid, last = nq, ""
                    # Poll the new code right away, skipping the pause below.
                    continue
            except Exception as e:
                # Transient failure (network, bad JSON, ...): report and keep polling.
                print(f"[bind] err {type(e).__name__}: {e}")
            time.sleep(2)
    print("[bind] overall timeout (5min).")
    return None
def send(c, token, to_user, text, ctx):
    """POST a single text message to ``/ilink/bot/sendmessage``.

    Args:
        c: HTTP client exposing ``post(url, json=..., headers=...)``.
        token: Bot token used for the ``Authorization`` header.
        to_user: Recipient id, sent as ``to_user_id``.
        text: Message text, sent as the only entry of ``item_list``.
        ctx: Value sent as the message's ``context_token``.

    Returns:
        ``{"http": <status>, "json": <parsed body>}`` when the body parses as
        JSON, otherwise ``{"http": <status>, "text": <first 200 chars>}``.
    """
    text_item = {"type": 1, "text_item": {"text": text}}
    message = {
        "to_user_id": to_user,
        "message_type": 2,
        "message_state": 2,
        "context_token": ctx,
        "item_list": [text_item],
    }
    resp = c.post(f"{BASE}/ilink/bot/sendmessage",
                  json={"msg": message}, headers=_headers(token))
    outcome = {"http": resp.status_code}
    try:
        outcome["json"] = resp.json()
    except Exception:
        # Body is not JSON (possibly empty): keep a short raw excerpt instead.
        outcome["text"] = resp.text[:200]
    return outcome
def wait_msg(c, token, buf):
    """Block until the next user message arrives or about 150 s have passed.

    Repeatedly POSTs to ``/ilink/bot/getupdates`` with the current cursor and
    returns as soon as a response contains at least one message.

    Args:
        c: HTTP client exposing ``post(url, json=..., headers=...)``.
        token: Bot token used for the ``Authorization`` header.
        buf: Current ``get_updates_buf`` cursor (``""`` for the first call).

    Returns:
        ``(msg, new_buf)``.  ``msg`` is the first message of the first
        non-empty batch, or ``None`` on timeout.  ``new_buf`` is the latest
        cursor reported by the server (unchanged if none was reported).
    """
    # Monotonic clock: the wait budget must not be skewed by wall-clock changes.
    deadline = time.monotonic() + 150
    while time.monotonic() < deadline:
        poll_started = time.monotonic()
        try:
            j = c.post(f"{BASE}/ilink/bot/getupdates",
                       json={"get_updates_buf": buf,
                             "base_info": {"channel_version": CHANNEL_VER}},
                       headers=_headers(token)).json()
            buf = j.get("get_updates_buf", buf)
            for m in j.get("msgs", []) or []:
                # `or ""` / `or {}`: a null text or text_item must not raise
                # here, because the cursor has already advanced and the
                # message would be lost.
                txt = "".join((it.get("text_item") or {}).get("text") or ""
                              for it in m.get("item_list") or [])
                print(f"[recv] <- {txt!r}")
                # NOTE(review): only the first message of a batch is returned;
                # any further messages in the same batch are dropped although
                # the cursor has moved past them.
                return m, buf
        except Exception as e:
            print(f"[recv] err {type(e).__name__}: {e}")
            time.sleep(2)
            continue
        # The request presumably long-polls.  If the server instead answered
        # right away with no messages, pause briefly so the loop does not
        # hammer the endpoint for the whole wait budget.
        if time.monotonic() - poll_started < 1.0:
            time.sleep(1)
    return None, buf
def main() -> int:
    """Bind the bot, run probes T1 and T2, and print what to check on the phone.

    T1 replies twice to the first user message with the same context_token.
    T2 waits 25 seconds after the second user message before replying once
    with that message's context_token.

    Returns:
        2 when binding fails, 1 when no first message arrives, 0 otherwise.
    """
    global BASE

    bound = bind()
    if not bound:
        return 2
    token, base_url = bound
    # Use the server-provided base URL for all later calls, if there is one.
    BASE = base_url or BASE
    print("[bind] confirmed.\n")

    with httpx.Client(timeout=40) as client:
        # ---- T1: two replies in a row with the same token ----
        print("[T1] SEND your 1st message now (e.g. '1') ...")
        first, buf = wait_msg(client, token, "")
        if not first:
            print("[T1] no msg; abort.")
            return 1
        to_user = first.get("from_user_id", "")
        ctx = first.get("context_token", "")
        r1a = send(client, token, to_user, "[T1-a] 同token第一条(立即)", ctx)
        r1b = send(client, token, to_user, "[T1-b] 同token第二条(紧接)", ctx)
        print(f"[T1] sent two with same token. http: a={r1a.get('http')} b={r1b.get('http')}")

        # ---- T2: do not reply on receipt; reply after 25s with the unused token ----
        print("\n[T2] SEND your 2nd message now (e.g. '2') ...")
        second, _ = wait_msg(client, token, buf)
        if second:
            to_user2 = second.get("from_user_id", "")
            ctx2 = second.get("context_token", "")
            print("[T2] received; NOT replying; waiting 25s...")
            time.sleep(25)
            r2 = send(client, token, to_user2, "[T2] 延迟25秒,未用过的token回复", ctx2)
            print(f"[T2] sent after delay. http={r2.get('http')}")
        else:
            print("[T2] no msg; skip.")

    # Only HTTP statuses are printed above; the verdict is what the phone shows.
    print("\n========== CHECK YOUR PHONE ==========")
    print("Report which of these arrived in the WeChat ClawBot chat:")
    print(" [T1-a] 同token第一条(立即)")
    print(" [T1-b] 同token第二条(紧接) <- if arrives: multi-message per turn OK")
    print(" [T2] 延迟25秒,未用过的token回复 <- if arrives: token is time-windowed, deferred reply OK")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)