ui+api: dev SPA SSE 客户端 3 次退避重连 + stream_events 非活跃 task 立即吐 done

--reload 重启 / 网络抖时 fetchSse 拆出 consumeSseStream + 包重连壳
(1s/2s/4s,EOF 未见 done/error 触发重连);后端 stream_events 入口检
tasks.run_status,非 running/cancelling 直接关流,避免重连卡在空 broker
无限挂 ping。3 次仍失败 → 卡片末尾红色"请重发"。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-05-21 15:14:51 +08:00
parent c5dcbb7e24
commit 52c25b9404
4 changed files with 75 additions and 27 deletions

View File

@ -2,7 +2,7 @@
> 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 1-2 句:做了啥 + 关键判断;细节查 `git log` / `git diff` / `DESIGN §7.9`
最后更新:2026-05-21(research skill 三次迭代:fetch_pdf 改走静态直链跟 fetch_xml 对齐 → 5/5 fetch 100%)
最后更新:2026-05-21(dev SPA SSE 客户端重连 + 后端 stream_events 非活跃态立即吐 done)
---
@ -23,6 +23,7 @@
### 2026-05-21
- **dev SPA SSE 客户端重连(覆盖 --reload 抖动)**:`fetchSse` 拆出 `consumeSseStream` + 包重连壳(1s/2s/4s 退避,最多 3 次);reader EOF 未见 done/error 算异常关流触发重连;后端 `stream_events` 入口检 `tasks.run_status`,非 running/cancelling 立即吐 done 关流(否则进程重启后新 broker 内存空,客户端会无限挂 ping)。3 次仍失败 → 卡片末尾红色"连接已断开,请重发"。断开期间 LLM delta 丢失,接受。
- **research skill 三次迭代 fetch_pdf 改走静态直链**:`fetch_pdf` 跟 `fetch_xml` 同范式,从 `paper["pdf_url"]` 流式下载,绕开 paper_pdf_view 路径 bug(disk 路径计算错);smoke 5/5 PASS。
- **research skill 二次迭代 list 端点加 pdf_url / xml_url 直链 + 新增 fetch_xml + pg_trgm GIN 索引**:serializer 后端拼直链(避免 LLM 拿 stale URL),`0006_pg_trgm` 给 title/first_author/institution 加 GIN 把 `?search=xxx` 从 30s timeout 降到几十 ms;SKILL.md 加"XML 优先 PDF"原则(XML 已结构化免 OCR)。
- **顶栏 token 累计修(sync_task_tokens 改走 messages SUM)**:5/20 切 streaming 后 `LLM.TokenCounter` 内存计数器永不更新;删 TokenCounter 整个类,`sync_task_tokens` 改 `SELECT SUM(tokens_in/out) FROM messages WHERE task_id=?` 现算;backfill 4 个 task。

5
RUN.md
View File

@ -2,7 +2,7 @@
> 怎么把 zcbot 跑起来。env / 常用命令 / 故障兜底。设计看 `DESIGN.md`,进度看 `PROGRESS.md`
最后更新:2026-05-20(加 `POST /v1/tasks/{id}/optimize_prompt` 辅助 LLM 草稿润色;`usage_events.kind` 新增 `prompt_optimize`)
最后更新:2026-05-21(dev SPA SSE 客户端重连 3 次退避;`/v1/tasks/{id}/events` 非活跃 task 立即吐 done)
---
@ -206,7 +206,7 @@ sudo systemctl stop zcbot
zcbot 现在 5 人级 + SSE 长连接,**严格"零中断"**(蓝绿 + nginx + SSE 客户端 reconnect 设计)代价高,不值得。有性价比的两挡:
**A. 简易档:`--reload`**(推荐当前规模)
ExecStart 加 `--reload`,`git pull` 后 uvicorn 监听到文件变动自动重起子进程,REST 抖动 < 1s。**代价**:SSE 连接被切断(浏览器看到 "load failed",dev.html 自动跳登录页或同事重发一次消息;DB 里被切的 task 启动 reaper `run_status=error`)。
ExecStart 加 `--reload`,`git pull` 后 uvicorn 监听到文件变动自动重起子进程,REST 抖动 < 1s。**代价**:SSE 连接被切断,**dev.html 自动重连 3 (1s/2s/4s 退避)**;若新进程已被启动 reaper `run_status=error`,重连立即收 done,卡片末尾追加红色"连接已断开,请重发"。期间 LLM 吐的 delta 丢失(broker 不持久化 event,接受)。3 次仍失败 同上提示,用户重发即可
```ini
ExecStart=/opt/zcbot/.venv/bin/python main.py web --host 0.0.0.0 --port 8765 --reload
@ -275,6 +275,7 @@ sudo journalctl -u zcbot -n 50 # 看新进程起没起干
| `/v1/*` 返 401 `token expired` | JWT 默 7d TTL 到期,重 login。要更长改 `ZCBOT_JWT_TTL_SECONDS` env |
| dev.html SSE 收不到流(消息发出去 UI 没动) | EventSource 不支持 header,dev.html 走 `fetch + ReadableStream`。devtools Network 看 POST /messages 是否 202 + events_url GET 是否 200 + Content-Type 是 text/event-stream;401 → token 过期,logout 重 login |
| dev.html 显示 "load failed" 立刻回登录页 | token 过期或 JWT_SECRET 服务端变了。已自动跳登录页,按上次 tab 重登 |
| dev.html 顶栏出现"连接断开,重连中…(N/3)" | SSE 流被切(`--reload` 重启 / nginx 切换 / 网络抖)。客户端自动重连,1s/2s/4s 退避;新进程已 reaper 标 error 则立即收 done + 卡片末尾"请重发"提示;若服务端还活着会继续看后续 delta(断开期间的丢失,broker 不持久化) |
---

View File

@ -1208,12 +1208,21 @@ def create_app() -> FastAPI:
raise HTTPException(404, f"invalid task id: {task_id!r}")
with session_scope() as s:
_assert_owns_task(s, tid, user_id)
run_status = s.execute(
select(Task.run_status).where(Task.task_id == tid)
).scalar_one()
# 重连保护:若 task 不在活跃态(进程重启 / reaper 已收尾 / 自然结束),
# 直接吐 done 关流。否则 broker 进程内队列空,客户端会无限挂在 ping 上。
is_active = run_status in ("running", "cancelling")
async def gen():
yield b": connected\nretry: 3000\n\n"
if not is_active:
yield _sse_event("done", {})
return
q = broker.subscribe(tid)
try:
# 第一帧 retry 注释 + 心跳:让 EventSource 立即建立(不被 buffer 卡)
yield b": connected\nretry: 3000\n\n"
while True:
try:
ev = await asyncio.wait_for(q.get(), timeout=30.0)

View File

@ -1668,37 +1668,45 @@ function streamSse(url, asstCard) {
async function fetchSse(url, asstCard) {
const body = asstCard.querySelector(".body");
const ctx = { acc: "", body, pending: false, seenRels: new Set() };
const ctx = { acc: "", body, pending: false, seenRels: new Set(), terminal: false };
const hint = $("chat-hint");
// 重连:reader 异常 / 自然 EOF 但未收到 done/error 时,GET events 重订阅。
// 后端 stream_events 重连入口会校验 run_status,task 已不活跃直接吐 done 关流;
// 这里 3 次失败再放弃,覆盖 systemctl restart 的 1~2s 抖动 + reaper 跑完的窗口。
const backoffs = [1000, 2000, 4000];
let attempt = 0;
try {
const r = await fetch(url, {
headers: { "Authorization": "Bearer " + state.token, "Accept": "text/event-stream" },
});
if (!r.ok) throw new Error(r.status + " " + r.statusText);
const reader = r.body.getReader();
const dec = new TextDecoder();
let buf = "";
$("chat-hint").textContent = "接收中…";
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += dec.decode(value, { stream: true });
while (true) {
const idx = buf.indexOf("\n\n");
if (idx < 0) break;
const frame = buf.slice(0, idx);
buf = buf.slice(idx + 2);
const ev = parseSseFrame(frame);
if (!ev) continue;
handleSseEvent(ev, asstCard, ctx);
if (ev.event === "done" || ev.event === "error") break;
try {
await consumeSseStream(url, asstCard, ctx);
} catch (e) {
if (ctx.terminal) break; // 已收到 done/error,不重连
if (attempt >= backoffs.length) {
appendErrorCard("连接已断开,刚才的回复可能未完成,请重发消息。");
break;
}
hint.textContent = `连接断开,重连中…(${attempt + 1}/${backoffs.length})`;
await new Promise(r => setTimeout(r, backoffs[attempt]));
attempt++;
continue;
}
// consumeSseStream 正常返回:reader 收到 EOF
if (ctx.terminal) break; // 正常收尾(看到 done/error)
// 未见 done/error 就 EOF → 服务端中途关流(进程被杀 / nginx 切),重连
if (attempt >= backoffs.length) {
appendErrorCard("连接已断开,刚才的回复可能未完成,请重发消息。");
break;
}
hint.textContent = `连接断开,重连中…(${attempt + 1}/${backoffs.length})`;
await new Promise(r => setTimeout(r, backoffs[attempt]));
attempt++;
}
// 最终定稿 + 代码高亮(流式中不 highlight,省 CPU)
body.innerHTML = renderMd(ctx.acc);
highlightIn(asstCard);
} finally {
body.classList.remove("streaming");
$("chat-hint").textContent = "就绪";
hint.textContent = "就绪";
state.streaming = false;
setActionMode("idle");
}
@ -1709,6 +1717,35 @@ async function fetchSse(url, asstCard) {
refreshConcurrentWarnings(); // 自己 task 收尾,顺便清/更新 banner(同 wd 邻居可能也变了)
}
async function consumeSseStream(url, asstCard, ctx) {
const r = await fetch(url, {
headers: { "Authorization": "Bearer " + state.token, "Accept": "text/event-stream" },
});
if (!r.ok) throw new Error(r.status + " " + r.statusText);
const reader = r.body.getReader();
const dec = new TextDecoder();
let buf = "";
$("chat-hint").textContent = "接收中…";
while (true) {
const { value, done } = await reader.read();
if (done) return;
buf += dec.decode(value, { stream: true });
while (true) {
const idx = buf.indexOf("\n\n");
if (idx < 0) break;
const frame = buf.slice(0, idx);
buf = buf.slice(idx + 2);
const ev = parseSseFrame(frame);
if (!ev) continue;
handleSseEvent(ev, asstCard, ctx);
if (ev.event === "done" || ev.event === "error") {
ctx.terminal = true;
return;
}
}
}
}
function parseSseFrame(frame) {
const lines = frame.split("\n");
let event = "msg"; let dataLines = [];