159 lines
5.4 KiB
Python
159 lines
5.4 KiB
Python
"""探测五(决定性):补上 client_id(每条唯一)+ base_info,重验两件事。
|
|
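A. Streaming, multiple messages: send 3 chunks back-to-back on the same context_token
   (distinct client_id each, state 1/1/2, 300 ms apart)
   -> all three arrive = multi-message / long briefings are feasible
B. Reuse after finish: after sending FINISH, wait 30 s, then send one more message
   (state=2) with the SAME context_token + a new client_id
   -> arrives = context_token is reusable within 24 h -> "proactive push once the user
      has spoken first" holds (briefing push is revived)

Prime suspect for the earlier failure: missing client_id (later chunks could not be
routed and were dropped). You need to send ONE message to trigger the run.
ASCII-only; bot_token is never printed.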
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import os
|
|
import random
|
|
import sys
|
|
import time
|
|
import uuid
|
|
|
|
import httpx
|
|
import segno
|
|
|
|
# Default API base URL; main() overwrites it with the base URL returned by bind(), if any.
BASE = "https://ilinkai.weixin.qq.com"
# Directory containing this script; _new_qr() writes the QR PNG files here.
QR_DIR = os.path.dirname(os.path.abspath(__file__))
# Value sent as base_info.channel_version in the sendmessage / getupdates bodies.
CHANNEL_VER = "1.0.2"
|
|
|
|
|
|
def _uin() -> str:
    """Return a fresh random token: a base64-encoded decimal integer.

    A random integer in ``[0, 2**32 - 1]`` is rendered as decimal text and that
    text is base64-encoded. A new value is drawn on every call.
    """
    number = random.randint(0, 2**32 - 1)
    decimal_bytes = str(number).encode()
    return base64.b64encode(decimal_bytes).decode()
|
|
|
|
|
|
def _headers(token=None) -> dict:
    """Build the HTTP headers for one API request.

    Args:
        token: Optional bot token. When truthy, an ``Authorization: Bearer``
            header is added; otherwise the header is omitted.

    Returns:
        A new dict of headers. ``X-WECHAT-UIN`` is regenerated on every call.
    """
    headers = {
        "Content-Type": "application/json",
        "AuthorizationType": "ilink_bot_token",
        "X-WECHAT-UIN": _uin(),
    }
    if not token:
        return headers
    headers["Authorization"] = f"Bearer {token}"
    return headers
|
|
|
|
|
|
def _new_qr():
    """Request a new bind QR code, save it as a PNG, and return its identifier.

    The PNG is written next to this script under a timestamped name and then
    opened with ``os.startfile`` on a best-effort basis.

    Returns:
        The ``qrcode`` field of the response (``""`` if absent), or ``None``
        when the HTTP status is not 200.
    """
    with httpx.Client(timeout=20) as client:
        resp = client.get(
            f"{BASE}/ilink/bot/get_bot_qrcode",
            params={"bot_type": "3"},
            headers=_headers(),
        )
    if resp.status_code != 200:
        print(f"[FAIL] http {resp.status_code}")
        return None

    payload = resp.json()
    png_path = os.path.join(QR_DIR, f"clawbot_qr_{int(time.time())}.png")
    qr_image = segno.make(payload.get("qrcode_img_content", ""), error="m")
    qr_image.save(png_path, scale=8, border=3)

    # Opening the image is a convenience only; any failure here is ignored on
    # purpose so the path printed below can still be opened by hand.
    try:
        os.startfile(png_path)
    except Exception:
        pass

    print(f"[bind] FRESH QR -> {png_path}")
    return payload.get("qrcode", "")
|
|
|
|
|
|
def bind():
    """Show a QR code and poll its status until it is confirmed.

    Polls ``get_qrcode_status`` for up to 300 seconds, pausing 2 seconds
    between polls. An ``"expired"`` status triggers a fresh QR code; request
    or parsing errors are printed and polling continues.

    Returns:
        ``(bot_token, base_url)`` once the status is ``"confirmed"`` (the base
        URL falls back to ``BASE`` when the response has none), or ``None`` if
        a QR code could not be obtained or the time limit ran out.
    """
    print("[bind] auto-refresh on expiry; scan whenever ready.")
    qr_id = _new_qr()
    if not qr_id:
        return None

    stop_at = time.time() + 300
    with httpx.Client(timeout=40) as client:
        previous = ""
        while time.time() < stop_at:
            try:
                reply = client.get(
                    f"{BASE}/ilink/bot/get_qrcode_status",
                    params={"qrcode": qr_id},
                    headers=_headers(),
                ).json()
                status = reply.get("status", "")
                # Print only on transitions so the console is not flooded.
                if status != previous:
                    print(f"[bind] status={status!r}")
                    previous = status
                if status == "confirmed":
                    return reply.get("bot_token", ""), (reply.get("baseurl") or BASE)
                if status == "expired":
                    replacement = _new_qr()
                    if not replacement:
                        return None
                    # Start over with the new code and re-announce its status.
                    qr_id, previous = replacement, ""
            except Exception as err:
                print(f"[bind] err {err}")
            time.sleep(2)
    return None
|
|
|
|
|
|
def send(c, token, to_user, text, ctx, state, tag):
    """POST one text message to ``sendmessage`` and print the outcome.

    Args:
        c: HTTP client used for the request (must provide ``post``).
        token: Bot token placed in the Authorization header.
        to_user: Recipient id, sent as ``to_user_id``.
        text: Message text, sent as a single text item.
        ctx: Value sent as ``context_token``.
        state: Value sent as ``message_state``.
        tag: Short label used only in the printed log line.

    A new ``client_id`` (uuid4 hex) is generated for every call. Nothing is
    returned; the HTTP status and response body are printed.
    """
    cid = uuid.uuid4().hex
    message = {
        "to_user_id": to_user,
        "client_id": cid,
        "message_type": 2,
        "message_state": state,
        "context_token": ctx,
        "item_list": [{"type": 1, "text_item": {"text": text}}],
    }
    payload = {
        "msg": message,
        "base_info": {"channel_version": CHANNEL_VER},
    }
    resp = c.post(f"{BASE}/ilink/bot/sendmessage", json=payload, headers=_headers(token))

    # Prefer the parsed JSON; fall back to a short slice of the raw text.
    try:
        shown = resp.json()
    except Exception:
        shown = resp.text[:160]
    print(f"[send {tag}] state={state} client_id={cid[:8]} -> http={resp.status_code} body={shown}")
|
|
|
|
|
|
def wait_msg(c, token):
    """Poll ``getupdates`` until one inbound message arrives or 150 s elapse.

    Args:
        c: HTTP client used for the requests (must provide ``post``).
        token: Bot token placed in the Authorization header.

    Returns:
        The first message dict found in a response's ``msgs`` list, or
        ``None`` if none arrived before the deadline.

    The concatenated text of the message's items is printed before returning.
    Request or parsing errors are printed and followed by a 2 second pause.
    """
    # Monotonic clock: the timeout must not be affected by wall-clock jumps.
    deadline = time.monotonic() + 150
    buf = ""
    while time.monotonic() < deadline:
        started = time.monotonic()
        try:
            j = c.post(
                f"{BASE}/ilink/bot/getupdates",
                json={
                    "get_updates_buf": buf,
                    "base_info": {"channel_version": CHANNEL_VER},
                },
                headers=_headers(token),
            ).json()
            # Carry the cursor forward; keep the old one if the reply has none.
            buf = j.get("get_updates_buf", buf)
            for m in j.get("msgs", []) or []:
                txt = "".join(
                    (it.get("text_item", {}) or {}).get("text", "")
                    for it in m.get("item_list", []) or []
                )
                print(f"[recv] <- {txt!r}")
                return m
        except Exception as e:
            print(f"[recv] err {e}")
            time.sleep(2)
            continue
        # No message in this reply. If the server answered almost immediately
        # (e.g. an error payload, or it did not hold the request open), pause
        # briefly so this loop cannot hammer the endpoint until the deadline.
        # A reply that was held open for a while is re-polled right away.
        if time.monotonic() - started < 1.0:
            time.sleep(1.0)
    return None
|
|
|
|
|
|
def main() -> int:
    """Run the probe: bind, wait for one inbound message, then run tests A and B.

    Test A sends three chunks on the received context token (states 1, 1, 2)
    with 300 ms between them. Test B waits 30 s and sends one more message on
    the same context token.

    Returns:
        0 after all sends were attempted, 1 if no inbound message arrived,
        2 if binding failed.
    """
    global BASE

    bound = bind()
    if not bound:
        return 2
    token, base_url = bound
    # Later requests use the base URL handed back by bind(), when present.
    BASE = base_url or BASE
    print("[bind] confirmed.\n[A] SEND one message now (e.g. 'go') ...")

    # (text, message_state, log tag) for the three chunks of test A.
    stream_chunks = (
        ("[A1] client_id+流式第一段(state=1)", 1, "A1"),
        ("[A2] client_id+流式第二段(state=1)", 1, "A2"),
        ("[A3] client_id+末段(state=2 FINISH)", 2, "A3"),
    )

    with httpx.Client(timeout=30) as client:
        inbound = wait_msg(client, token)
        if not inbound:
            print("no msg; abort.")
            return 1
        to_user = inbound.get("from_user_id", "")
        ctx = inbound.get("context_token", "")

        print("[A] streaming 3 chunks WITH client_id (state 1,1,2, 300ms apart)...")
        for index, (text, state, tag) in enumerate(stream_chunks):
            # Pause between chunks, but not before the first one.
            if index:
                time.sleep(0.3)
            send(client, token, to_user, text, ctx, state, tag)

        print("\n[B] wait 30s, then reuse SAME context_token + new client_id (state=2)...")
        time.sleep(30)
        send(client, token, to_user, "[B] finish后30秒,复用同token主动推(若到=24h可复用)", ctx, 2, "B")

    print("\n========== CHECK YOUR PHONE ==========")
    print("Report which arrived:")
    print("  [A1]/[A2]/[A3] -> all three = multi-message/streaming OK (need client_id)")
    print("  [B]            -> arrived = token reusable after finish => PROACTIVE PUSH revives")
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|