diff --git a/PROGRESS.md b/PROGRESS.md index 18addfb..77c4853 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -2,7 +2,7 @@ > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 1-2 句:做了啥 + 关键判断;细节查 `git log` / `git diff` / `DESIGN §7.9`。 -最后更新:2026-05-21(research skill 三次迭代:fetch_pdf 改走静态直链跟 fetch_xml 对齐 → 5/5 fetch 100%) +最后更新:2026-05-21(dev SPA SSE 客户端重连 + 后端 stream_events 非活跃态立即吐 done) --- @@ -23,6 +23,7 @@ ### 2026-05-21 +- **dev SPA SSE 客户端重连(覆盖 --reload 抖动)**:`fetchSse` 拆出 `consumeSseStream` + 包重连壳(1s/2s/4s 退避,最多 3 次);reader EOF 未见 done/error 算异常关流触发重连;后端 `stream_events` 入口检 `tasks.run_status`,非 running/cancelling 立即吐 done 关流(否则进程重启后新 broker 内存空,客户端会无限挂 ping)。3 次仍失败 → 卡片末尾红色"连接已断开,请重发"。断开期间 LLM delta 丢失,接受。 - **research skill 三次迭代 fetch_pdf 改走静态直链**:`fetch_pdf` 跟 `fetch_xml` 同范式,从 `paper["pdf_url"]` 流式下载,绕开 paper_pdf_view 路径 bug(disk 路径计算错);smoke 5/5 PASS。 - **research skill 二次迭代 list 端点加 pdf_url / xml_url 直链 + 新增 fetch_xml + pg_trgm GIN 索引**:serializer 后端拼直链(避免 LLM 拿 stale URL),`0006_pg_trgm` 给 title/first_author/institution 加 GIN 把 `?search=xxx` 从 30s timeout 降到几十 ms;SKILL.md 加"XML 优先 PDF"原则(XML 已结构化免 OCR)。 - **顶栏 token 累计修(sync_task_tokens 改走 messages SUM)**:5/20 切 streaming 后 `LLM.TokenCounter` 内存计数器永不更新;删 TokenCounter 整个类,`sync_task_tokens` 改 `SELECT SUM(tokens_in/out) FROM messages WHERE task_id=?` 现算;backfill 4 个 task。 diff --git a/RUN.md b/RUN.md index 1675ed1..e096fa4 100644 --- a/RUN.md +++ b/RUN.md @@ -2,7 +2,7 @@ > 怎么把 zcbot 跑起来。env / 常用命令 / 故障兜底。设计看 `DESIGN.md`,进度看 `PROGRESS.md`。 -最后更新:2026-05-20(加 `POST /v1/tasks/{id}/optimize_prompt` 辅助 LLM 草稿润色;`usage_events.kind` 新增 `prompt_optimize`) +最后更新:2026-05-21(dev SPA SSE 客户端重连 3 次退避;`/v1/tasks/{id}/events` 非活跃 task 立即吐 done) --- @@ -206,7 +206,7 @@ sudo systemctl stop zcbot zcbot 现在 5 人级 + SSE 长连接,**严格"零中断"**(蓝绿 + nginx + SSE 客户端 reconnect 设计)代价高,不值得。有性价比的两挡: **A. 简易档:`--reload`**(推荐当前规模) -ExecStart 加 `--reload`,`git pull` 后 uvicorn 监听到文件变动自动重起子进程,REST 抖动 < 1s。**代价**:SSE 连接被切断(浏览器看到 "load failed",dev.html 自动跳登录页或同事重发一次消息;DB 里被切的 task 走启动 reaper 标 `run_status=error`)。 +ExecStart 加 `--reload`,`git pull` 后 uvicorn 监听到文件变动自动重起子进程,REST 抖动 < 1s。**代价**:SSE 连接被切断,**dev.html 自动重连 3 次(1s/2s/4s 退避)**;若新进程已被启动 reaper 标 `run_status=error`,重连立即收 done,卡片末尾追加红色"连接已断开,请重发"。期间 LLM 吐的 delta 丢失(broker 不持久化 event,接受)。3 次仍失败 → 同上提示,用户重发即可。 ```ini ExecStart=/opt/zcbot/.venv/bin/python main.py web --host 0.0.0.0 --port 8765 --reload @@ -275,6 +275,7 @@ sudo journalctl -u zcbot -n 50 # 看新进程起没起干 | `/v1/*` 返 401 `token expired` | JWT 默 7d TTL 到期,重 login。要更长改 `ZCBOT_JWT_TTL_SECONDS` env | | dev.html SSE 收不到流(消息发出去 UI 没动) | EventSource 不支持 header,dev.html 走 `fetch + ReadableStream`。devtools Network 看 POST /messages 是否 202 + events_url GET 是否 200 + Content-Type 是 text/event-stream;401 → token 过期,logout 重 login | | dev.html 显示 "load failed" 立刻回登录页 | token 过期或 JWT_SECRET 服务端变了。已自动跳登录页,按上次 tab 重登 | +| dev.html 顶栏出现"连接断开,重连中…(N/3)" | SSE 流被切(`--reload` 重启 / nginx 切换 / 网络抖)。客户端自动重连,1s/2s/4s 退避;新进程已 reaper 标 error 则立即收 done + 卡片末尾"请重发"提示;若服务端还活着会继续看后续 delta(断开期间的丢失,broker 不持久化) | --- diff --git a/web/app.py b/web/app.py index 0fbf31d..44464db 100644 --- a/web/app.py +++ b/web/app.py @@ -1208,12 +1208,21 @@ def create_app() -> FastAPI: raise HTTPException(404, f"invalid task id: {task_id!r}") with session_scope() as s: _assert_owns_task(s, tid, user_id) + run_status = s.execute( + select(Task.run_status).where(Task.task_id == tid) + ).scalar_one() + + # 重连保护:若 task 不在活跃态(进程重启 / reaper 已收尾 / 自然结束), + # 直接吐 done 关流。否则 broker 进程内队列空,客户端会无限挂在 ping 上。 + is_active = run_status in ("running", "cancelling") async def gen(): + yield b": connected\nretry: 3000\n\n" + if not is_active: + yield _sse_event("done", {}) + return q = broker.subscribe(tid) try: - # 第一帧 retry 注释 + 心跳:让 EventSource 立即建立(不被 buffer 卡) - yield b": connected\nretry: 3000\n\n" while True: try: ev = await asyncio.wait_for(q.get(), timeout=30.0) diff --git a/web/static/dev.html b/web/static/dev.html index c7acb29..95d3c45 100644 --- a/web/static/dev.html +++ b/web/static/dev.html @@ -1668,37 +1668,45 @@ function streamSse(url, asstCard) { async function fetchSse(url, asstCard) { const body = asstCard.querySelector(".body"); - const ctx = { acc: "", body, pending: false, seenRels: new Set() }; + const ctx = { acc: "", body, pending: false, seenRels: new Set(), terminal: false }; + const hint = $("chat-hint"); + // 重连:reader 异常 / 自然 EOF 但未收到 done/error 时,GET events 重订阅。 + // 后端 stream_events 重连入口会校验 run_status,task 已不活跃直接吐 done 关流; + // 这里 3 次失败再放弃,覆盖 systemctl restart 的 1~2s 抖动 + reaper 跑完的窗口。 + const backoffs = [1000, 2000, 4000]; + let attempt = 0; try { - const r = await fetch(url, { - headers: { "Authorization": "Bearer " + state.token, "Accept": "text/event-stream" }, - }); - if (!r.ok) throw new Error(r.status + " " + r.statusText); - const reader = r.body.getReader(); - const dec = new TextDecoder(); - let buf = ""; - $("chat-hint").textContent = "接收中…"; while (true) { - const { value, done } = await reader.read(); - if (done) break; - buf += dec.decode(value, { stream: true }); - while (true) { - const idx = buf.indexOf("\n\n"); - if (idx < 0) break; - const frame = buf.slice(0, idx); - buf = buf.slice(idx + 2); - const ev = parseSseFrame(frame); - if (!ev) continue; - handleSseEvent(ev, asstCard, ctx); - if (ev.event === "done" || ev.event === "error") break; + try { + await consumeSseStream(url, asstCard, ctx); + } catch (e) { + if (ctx.terminal) break; // 已收到 done/error,不重连 + if (attempt >= backoffs.length) { + appendErrorCard("连接已断开,刚才的回复可能未完成,请重发消息。"); + break; + } + hint.textContent = `连接断开,重连中…(${attempt + 1}/${backoffs.length})`; + await new Promise(r => setTimeout(r, backoffs[attempt])); + attempt++; + continue; } + // consumeSseStream 正常返回:reader 收到 EOF + if (ctx.terminal) break; // 正常收尾(看到 done/error) + // 未见 done/error 就 EOF → 服务端中途关流(进程被杀 / nginx 切),重连 + if (attempt >= backoffs.length) { + appendErrorCard("连接已断开,刚才的回复可能未完成,请重发消息。"); + break; + } + hint.textContent = `连接断开,重连中…(${attempt + 1}/${backoffs.length})`; + await new Promise(r => setTimeout(r, backoffs[attempt])); + attempt++; } // 最终定稿 + 代码高亮(流式中不 highlight,省 CPU) body.innerHTML = renderMd(ctx.acc); highlightIn(asstCard); } finally { body.classList.remove("streaming"); - $("chat-hint").textContent = "就绪"; + hint.textContent = "就绪"; state.streaming = false; setActionMode("idle"); } @@ -1709,6 +1717,35 @@ async function fetchSse(url, asstCard) { refreshConcurrentWarnings(); // 自己 task 收尾,顺便清/更新 banner(同 wd 邻居可能也变了) } +async function consumeSseStream(url, asstCard, ctx) { + const r = await fetch(url, { + headers: { "Authorization": "Bearer " + state.token, "Accept": "text/event-stream" }, + }); + if (!r.ok) throw new Error(r.status + " " + r.statusText); + const reader = r.body.getReader(); + const dec = new TextDecoder(); + let buf = ""; + $("chat-hint").textContent = "接收中…"; + while (true) { + const { value, done } = await reader.read(); + if (done) return; + buf += dec.decode(value, { stream: true }); + while (true) { + const idx = buf.indexOf("\n\n"); + if (idx < 0) break; + const frame = buf.slice(0, idx); + buf = buf.slice(idx + 2); + const ev = parseSseFrame(frame); + if (!ev) continue; + handleSseEvent(ev, asstCard, ctx); + if (ev.event === "done" || ev.event === "error") { + ctx.terminal = true; + return; + } + } + } +} + function parseSseFrame(frame) { const lines = frame.split("\n"); let event = "msg"; let dataLines = [];