From bf416314372513347604188f22f773b5f8ed38c2 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Mon, 18 May 2026 08:42:45 +0800 Subject: [PATCH] =?UTF-8?q?core(run=20cancel):=20POST=20/runs/{rid}/cancel?= =?UTF-8?q?=20+=20AgentLoop=20=E5=8D=8F=E4=BD=9C=E5=BC=8F=20cancel=20+=20d?= =?UTF-8?q?ev=20SPA=20stop=20=E6=8C=89=E9=92=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 落地 DESIGN §7.2 原标"待"的 cancel 路由 — 等待回复 / LLM 操作时也能中断。 - broker 加 request_cancel / is_cancelled / clear_cancel(per-run threading.Event) - AgentLoop 加 cancel_check 回调,每轮 LLM 前 + tool_calls 之间 poll;命中给 未执行 tool_call 补 [cancelled by user] tool result 保 LiteLLM 协议,emit cancelled event - 单活 gate + 启动 reaper 扩到 running | cancelling - BG 装 cancel_check + 终态判 cancelled/ok + finally clear flag - dev SPA stop 按钮 + cancelled badge + fetchSse try/finally 失败路径复原 UI LLM 同步 call 本身不可中断,最坏等当前一轮跑完(通常几十秒)。 Co-Authored-By: Claude Opus 4.7 (1M context) --- DESIGN.md | 12 +++++- PROGRESS.md | 24 ++++++------ RUN.md | 13 ++++--- core/loop.py | 34 ++++++++++++++++- web/app.py | 65 ++++++++++++++++++++++++++++---- web/broker.py | 21 +++++++++++ web/static/dev.html | 92 +++++++++++++++++++++++++++++++-------------- 7 files changed, 203 insertions(+), 58 deletions(-) diff --git a/DESIGN.md b/DESIGN.md index a9c798c..f2be467 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -255,7 +255,13 @@ Tasks POST 防 `messages.idx` UniqueConstraint race; UI 应 disable send 按钮直到 SSE `done`) GET /v1/tasks/{id}/runs/{rid}/events SSE 流(见下) - POST /v1/tasks/{id}/runs/{rid}/cancel (待 — 做出来后 409 可主动 cancel,不必等流式结束) + POST /v1/tasks/{id}/runs/{rid}/cancel 协作式 cancel(202):标 `cancelling` + 信号 + broker;BG loop 在工具调用之间 poll 看见即退, + 给未执行 tool_call 补 `[cancelled by user]` + tool result(保 LiteLLM 协议),emit `cancelled` + 事件;finally 写终态 `cancelled`(异常 `error`)。 + LLM 同步 call 本身不可中断 — 最坏等当前一轮跑完。 + run.status != `running` → 409(已结束 / 已 cancelling) Files(user-rooted,不绑 task — `workspace/users//` 为根) GET /v1/files?path= 列子目录 {entries, crumbs, exists, root, current};留空 → user_root; @@ -281,6 +287,7 @@ text {"content":""} tool_call {"name":"...","args":{...},"args_preview":"..."} tool_result {"name":"...","preview":"...","truncated":bool} # 完整 result 走 DB,SSE 只送预览给 UI llm_end {"prompt_tokens":N,"completion_tokens":N} +cancelled {} # cancel 命中,后随 done 收流 error {"msg":": "} done {} ``` @@ -403,7 +410,8 @@ usage_events(id, user_id, task_id uuid, run_id uuid, kind, value, ts) | 误删 folder | 二确认 + 输入 folder 名;真要再加 trash bin | | DB-then-FS 中断留孤儿目录 | 后台 GC 周期扫"FS 有但 DB 无引用" | | 同 folder 多 task 并发写同名 | 文件级悲观锁,冲突早失败 | -| 同 task 并发 POST messages 撞 `messages.idx` UniqueConstraint | `POST /v1/tasks/{id}/messages` 单活 run 检查:`SELECT … FOR UPDATE` 锁 task 行 + 查 `runs.status='running'`,有 → 409;同事务插新 Run 行避 TOCTOU。配启动 lifespan reaper 把孤儿 running 标 error(进程 crash 残留)。未来真生产 multi-worker 换 heartbeat / lease | +| 同 task 并发 POST messages 撞 `messages.idx` UniqueConstraint | `POST /v1/tasks/{id}/messages` 单活 run 检查:`SELECT … FOR UPDATE` 锁 task 行 + 查 `runs.status in ('running','cancelling')`,有 → 409;同事务插新 Run 行避 TOCTOU。配启动 lifespan reaper 把孤儿 `running`/`cancelling` 全标 error(进程 crash 残留)。未来真生产 multi-worker 换 heartbeat / lease | +| Run 跑太久 / 用户想中断 | `POST /v1/tasks/{id}/runs/{rid}/cancel` 协作式 cancel:标 `cancelling` + broker 信号;`AgentLoop.cancel_check` 回调在每轮 LLM 前、tool_calls 之间 poll;命中给未执行 tool_call 补 `[cancelled by user]` tool result 保 LiteLLM 协议,emit `cancelled` 事件,BG finally 写终态 `cancelled`。LLM 同步 call 本身不可中断 — 接受最坏等当前一轮跑完(几十秒内) | | Sandbox 出站越权 | egress allowlist 起步只放 LLM + PyPI | | 资源滥用 | BYO key 默认;月度配额;cold task LRU 清 | diff --git a/PROGRESS.md b/PROGRESS.md index 23cba26..c133d83 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -2,7 +2,7 @@ > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。 -最后更新:2026-05-18(`POST /v1/tasks/{id}/messages` 同 task 单活 run:`SELECT … FOR UPDATE` 锁 task + 活跃 Run 检查,已有 running → 409;启动 lifespan reaper 把孤儿 running 标 error) +最后更新:2026-05-18(`POST /v1/tasks/{id}/runs/{rid}/cancel` 协作式 cancel + `cancelled` SSE 事件 + dev SPA stop 按钮;gate 扩到 `cancelling`) --- @@ -15,12 +15,13 @@ | 5 | Eval Suite | ⏸ 不做 | dogfooding 替代,probe 覆盖健康检查 | | 6 | 长任务工程化 | 🟡 | task + 恢复 ✅;双层记忆 ✅;context 压缩未做 | | 7 | 打磨 | ❌ | Docker 沙盒 / 更多 skill | -| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 完工;**D `/v1` JSON API 完工 ✅**(原 Phase G Jinja2/HTMX UI 撤,改 platform 端实现);**D' 过渡 auth(PLATFORM_KEY → JWT)+ dev SPA ✅**;**同 task 单活 run 锁 ✅**(POST /messages 409 + lifespan reaper);真 OIDC 待;C(Executor)待;E(CLI 双模式)待。 | +| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 完工;**D `/v1` JSON API 完工 ✅**(原 Phase G Jinja2/HTMX UI 撤,改 platform 端实现);**D' 过渡 auth(PLATFORM_KEY → JWT)+ dev SPA ✅**;**同 task 单活 run 锁 ✅**;**run cancel + dev SPA stop 按钮 ✅**;真 OIDC 待;C(Executor)待;E(CLI 双模式)待。 | --- ## 已完成关键能力 +- **05-18 / cancel run endpoint + AgentLoop 协作式 cancel + dev SPA stop 按钮**:用户反馈"等待回复或 LLM 操作时没有停止接口"。落地 DESIGN §7.2 原标"待"的 `POST /v1/tasks/{id}/runs/{rid}/cancel`。**Broker**(`web/broker.py`):加 `request_cancel(rid)` / `is_cancelled(rid)` / `clear_cancel(rid)` 三方法,内部 `dict[UUID, threading.Event]` per-run;`setdefault` 保证 BG 还没 register 也能 set。**Loop**(`core/loop.py`):`AgentLoop` 加 `cancel_check: Optional[Callable[[], bool]]` 字段(CLI 路径不传 = None 永不 cancel),`_is_cancelled()` helper + `_fill_cancelled_tool_results(remaining)` 给未执行的 tool_call 全部 append `[cancelled by user]` tool message —— LiteLLM 协议要求每个 assistant tool_call 必须有匹配 tool result,否则 resume 时 LLM 报错。Check 点:每轮 LLM 前 + tool_calls 之间。命中 emit `cancelled` event + return `[cancelled]`。**LLM 同步 call 本身不可中断**(litellm 同步阻塞,无原生 cancel)—— 接受最坏等当前一轮跑完(通常几十秒),注释里讲清楚。**Endpoint**(`web/app.py::cancel_run`):校验 task 归属 user + run 归属 task(else 404),`run.status` 必须是 `running`(else 409 含具体 status);标 `cancelling`(过渡态)+ `broker.request_cancel(rid)`;202。`_run_agent_bg` 装配时 `agent.cancel_check = lambda rid=run_id: broker.is_cancelled(rid)`,run 完时判 `broker.is_cancelled` 写终态 `cancelled` vs `ok`;finally `broker.clear_cancel + broker.close`。**Gate 同步扩**:`post_message` 单活 run 检查从 `status == 'running'` 改 `status in ('running', 'cancelling')`,确保 cancel 后旧 BG 还没退出时新 POST 仍 409(避免新旧 run 撞 messages.idx)。**Reaper 同步扩**:lifespan 启动也扫 `cancelling`(进程 crash 时 BG 来不及写终态 cancelled,反正没线程在跑就清掉)。**dev SPA**(`web/static/dev.html`):chat 表单加 ``(常态 hidden);state 加 `currentRunId`;sendMessage 拿到 run_id 后 show stop,fetchSse `try/finally` 收尾时一并 hide stop / 清 currentRunId / 复原 send button(确保 SSE 失败路径 UI 也 reset 不卡死)。cancel 按钮 click → `POST /runs/{rid}/cancel`;409 静默忽略(并发 done 不算错)。`handleSseEvent` 加 `cancelled` case → 在当前 assistant 卡贴一个虚线红框 "已停止(stopped by user)" badge。CSS 加 `.cancelled-badge`。**Smoke 15 case 全绿**:HTTP 层 11 case(cancel happy + 双 cancel 409 + cancelling 期间 POST messages 409 + ghost run 404 + invalid UUID 404 + cross-task 404 + cross-user 404 + cancel 已 ok 409 + cancel 已 error 409 + no auth 401 + stale reaper 扫 cancelling);Loop 层 4 case(cancel before first iter 不调 LLM / cancel between tool_calls 补 cancelled placeholder 3 个 + 保协议 + emit cancelled / 正常 done 不 emit cancelled / CLI 路径 cancel_check=None 默永不 cancel)。**没动 SSE handler 的 break list**(`("done", "error")`):cancelled 在 SSE 里走流给前端看,broker.close 之后立即跟 done 收流。**文档同步**:DESIGN §7.2 路由表 cancel 行从"待"扩成完整描述 + SSE 事件加 `cancelled{}` 行 + §7.7 风险表加"Run 跑太久 / 用户想中断"行;RUN 路由表加 cancel 行 + POST /messages 409 文案改 "running / cancelling" + 故障兜底加三行(cancel 409 / 点 stop 没立刻停 / reaper 扫 cancelling);PROGRESS 已完成 + 下一步重排(去掉 cancel,留 OIDC / C Executor / E CLI 双模式)。 - **05-18 / `POST /v1/tasks/{id}/messages` 单活 run 锁 + 孤儿 reaper**:用户连点 send / 多 tab 同时发消息 → 两个 BG 线程争 `messages.idx`(UniqueConstraint 会 race-crash 第二个 INSERT)的旧 TODO 落地。**实现**:`web/app.py::post_message` 把所有权 + 活跃 Run 检查 + 新 Run INSERT 收进一个 `session_scope()` 事务,首行用 `select(Task.task_id).where(...).with_for_update()` 锁 task 行序列化并发 POST;事务内查 `Run.status='running'` 命中即 raise `HTTPException(409, "task already has a running run ({rid}); wait for it to finish")`;无活跃则同事务 `s.add(Run(...status="running"))` —— 三步原子完成,避免 TOCTOU。lifespan 加 **stale-run reaper**:启动时 `UPDATE runs SET status='error', error='server restarted before run finished' WHERE status='running'`,把进程 crash 留下的孤儿 running 全清掉(否则对应 task 永挂 409)。结果 rowcount > 0 时 print info 行 `[startup] reaped N stale running run(s)`。Cancel 路由(DESIGN §7.2 标 "待")没改:有了它 409 时用户可主动 cancel,不必等流式结束。**没动 `Session.append`**:gate 已在 HTTP 层挡住了,单写者前提下 idx 自递增不会冲;在 ORM 里再加锁是过度。**Smoke 10 case 全绿**(in-process TestClient + `_run_agent_bg` mock 不真起 LLM):happy(202 + Run INSERT running)/ gate(同 task 第二 POST 409 + detail 含 "running run" + "wait for it to finish")/ clear after Run.status=ok 解锁(202)/ clear after Run.status=error 同(202)/ ghost task 跨用户路径 404(锁前所有权检查)/ invalid UUID 404 / empty content 400 早于 lock / no auth 401 早于 lock / stale reaper 测试(强行 SET 全部 Run=running → 开新 TestClient 触发 lifespan → 所有 running 变 error + 之后 POST 还能 202)/ cross-user(other UID token 访 sentinel task → 404 不暴露存在性)。**采坑**:`@case` 每个用 `make_client()` 起新 app 会重复触发 reaper,把 case 1 留下的 running 清掉 → case 2 的 409 测不出来;改成全部 case 共享一个 SHARED_CLIENT 跑,仅 stale-reaper case 用 `fresh=True` 开第二个。**文档同步**:DESIGN §7.2 POST /messages 行注 409 行为 + cancel "待" 后注"做出来后 409 可主动 cancel" / §7.7 风险表加"同 task 并发 POST messages.idx race"行;RUN 路由表 POST /messages 注 409;故障兜底替过期 TODO 行 → 加 "POST 返 409" 处置 + "[startup] reaped N stale running" 解释。**未来 TODO**:multi-worker 部署形态下 reaper 不能简单全表清(会误清其他 worker 的真在跑 run),换 heartbeat + lease(注释里记了)。 - **05-17 / files API 全面 user-rooted(去掉 task_id 前置)**:用户反馈"web 页应该能看到 user 的所有目录,现在只能选 task 后右侧才刷新"——根因是原 files API 用 task_id 拐杖间接拿 working_dir,迫使前端必须先选 task。语义上 files 操作只关心"路径 + user 边界",task_id 是多余的;同时 §7.1 心智模型早就把 task 和 dir 定义为正交副视图,API 不该混。**后端**:删 `_load_working_dir(task_id, user_id)`,加 `_load_user_root(user_id)`(走 `main.user_root(ws, uid)` 自动 mkdir 拿 `workspace/users//`);4 路由全换:`GET /v1/files?path=` / `GET /v1/files/download?path=` / `POST /v1/files/upload` / `POST /v1/files/delete`。`_safe_join` 边界从 task_dir 改 user_root,安全性不降低;`_enumerate_files` 加 dotfile 过滤(`if p.name.startswith(".")` 跳过 `.memory/` 等,同 `/v1/folders` 约定);`_rel_to` 把 `Path(".")` 归一为空串(避免 root 时 current="." 这种 ugly 形态)。删 `from_db_path` import(只剩 `to_db_path`)。**dev SPA**:`loadFiles()` 不再 gate on `state.taskId`,enterApp 时直接调一次拉 user_root;`selectTask` 在拿到 task meta 后 `state.filesPath = wdName`(从 working_dir 末段抽出)再 loadFiles,选 task 自动跳到对应子目录但用户可点 crumb 回 root 看其他目录;crumbs root 标签 "/" → "我的"(user_root 直观);files-proj header 从"项目名(state.taskMeta 派生)"改"路径首段(数据驱动)",空时显示 `(user root)`。**新增 upload 按钮**(原来藏在外部页面里没暴露给 SPA):pane-head 加 `⬆` 按钮 + 隐藏 ``,onchange 走 FormData POST `/v1/files/upload`,path 取当前 `state.filesPath`(空 → user_root);上传完 loadFiles 刷新。`deleteCurrentTask` 不再重置 files 面板(task 删了但 FS 文件保留,继续浏览有意义),只 reload 当前路径。`btn-refresh-files` 移除 disabled 状态(任何时候可用)。**Smoke 68 case 全绿**(in-process TestClient,跑完即删 `_smoke_files.py`):列 user_root(包含 working_dir 目录,`.memory` 被过滤) / 列子目录 2 层 / 不存在路径 200+exists=False / 路径安全 6 case(`../` / 绝对 / Windows 绝对 / `\\` 起头)/ upload 单 / multi+nested mkdir / 上传到 root / 文件名攻击 4 case(`../` `..` `/` `\\`)/ download 文件 + 深度 + 目录 400 + ghost 404 + 越界 400 / delete 文件 / 空目录 / 非空 400 / user_root 拒 / ghost 404 / 越界 400 / 跨 user 隔离 4 case(A 不见 B,B 不见 A)/ 无 token 全 401(GET list / POST upload / POST delete / GET download)/ 子目录里 dotfile 也过滤 / 新 user 首访 user_root 自动 mkdir + 列表空。**文档**:DESIGN §7.2 路由表段 + lead-in 同步("Task 一等公民,files 是其副视图(经 task_dir 暴露)" → "Task 一等公民;files 与 task 正交,走 user-rooted /v1/files*,以 workspace/users// 为边界")。 - **Q1 → 05-06 / Phase 1-4**:骨架 / 三 skill / run_python / Model Profile + Probing。ppt v3 加商务红 + apply_brand + Iconify;素材摄取改 markitdown CLI。 @@ -75,7 +76,7 @@ ``` core/capabilities.py 71 core/llm.py 93 ← +litellm 离线 cost map env -core/loop.py 152 ← §7 A: sink.emit +core/loop.py 182 ← §7 A sink.emit + cancel_check 协作式 cancel core/sinks.py 101 ← §7 A core/ui.py 38 core/paths.py 50 ← task_dir db form 归一(to_db_path / from_db_path) @@ -101,13 +102,13 @@ db/migrations/versions/ 0001_initial_schema.py 125 ← §7 B Step 1 0002_task_dir_relative.py 61 ← 现有 ROOT-prefix 绝对 → 相对 web/__init__.py 5 ← Phase G G1 -web/app.py 815 ← /v1/ JSON API + user_id 隔离 + files user-rooted +web/app.py 898 ← /v1/ JSON API + user_id 隔离 + run lock + cancel endpoint web/auth.py 115 ← D' 过渡:PLATFORM_KEY → JWT 兑换 -web/broker.py 88 ← Phase G G4: in-process pub/sub +web/broker.py 109 ← in-process pub/sub + cancel signal per-run web/sinks.py 20 ← Phase G G4: WebEventSink (§7 A sink 协议) -web/static/dev.html ~600 ← D' dev SPA(login + 3-pane,vanilla JS) +web/static/dev.html 1133 ← D' dev SPA + stop 按钮 + cancelled badge ───────────────────────────────── -Python 合计 ~3700 行(+ dev.html ~600 静态) +Python 合计 ~3800 行(+ dev.html 1133 静态) ``` 加 skills/ppt 脚本 ~600 行 + SKILL.md / references / config / prompts + alembic.ini,总仓库约 3500 行。 @@ -117,9 +118,8 @@ Python 合计 ~3700 行(+ dev.html ~600 静态) ## 下一步候选(性价比排序) 1. **真 OIDC 接入 + CORS 收紧**(~1 天)—— 把 `/v1/auth/login` 内部从 platform_key 校验换成 OIDC ID token 校验(路由层 Depends 不动);CORS 改成 platform 域名 allowlist。**真发布给真实用户前必做**。 -2. **`POST /v1/tasks/{id}/runs/{rid}/cancel`**(~1-2 小时)—— DESIGN §7.2 标 "待"。有了它 409 时用户可主动 cancel 当前 run 而非等流式跑完;BG 线程需要 cooperative cancel(check `Run.status` 已被改 `cancelling` 时 raise/break)。 -3. **§7 C Executor + sandbox**(~2-3 天)—— `run_python`/`shell` → `Executor.run(...)`,本地保留 subprocess、SaaS 走 docker;`api_key_env` → `KeyProvider` 运行时注入。多用户在线跑代码前置。 -4. **§7 E CLI transport 双模式**(~1.5 天)—— `cli.py chat --remote https://...` 走 HTTP 替代 in-process。dogfood ≡ 用户路径。 -5. **Phase 6 context 三层压缩**(~1 天)—— 兜底,V4 长上下文一般用不到。 +2. **§7 C Executor + sandbox**(~2-3 天)—— `run_python`/`shell` → `Executor.run(...)`,本地保留 subprocess、SaaS 走 docker;`api_key_env` → `KeyProvider` 运行时注入。多用户在线跑代码前置。 +3. **§7 E CLI transport 双模式**(~1.5 天)—— `cli.py chat --remote https://...` 走 HTTP 替代 in-process。dogfood ≡ 用户路径。 +4. **Phase 6 context 三层压缩**(~1 天)—— 兜底,V4 长上下文一般用不到。 -> §7 B + D + D'(过渡 auth)+ 单活 run 锁 主体已完工。剩余路线:真 OIDC → cancel 路由 → C(Executor)→ E(CLI 双模式)→ F(deploy / billing)。原 Phase G Web UI 路线撤(DESIGN §7.9),UI 改 platform 端实现;`web/static/dev.html` 是开发期单文件 SPA,跟 platform UI 并存不冲突。 +> §7 B + D + D'(过渡 auth)+ 单活 run 锁 + cancel 主体已完工。剩余路线:真 OIDC → C(Executor)→ E(CLI 双模式)→ F(deploy / billing)。原 Phase G Web UI 路线撤(DESIGN §7.9),UI 改 platform 端实现;`web/static/dev.html` 是开发期单文件 SPA,跟 platform UI 并存不冲突。 diff --git a/RUN.md b/RUN.md index b8dc7a2..c2090a6 100644 --- a/RUN.md +++ b/RUN.md @@ -2,7 +2,7 @@ > 怎么把 zcbot 跑起来。env / 常用命令 / 故障兜底。设计看 `DESIGN.md`,进度看 `PROGRESS.md`。 -最后更新:2026-05-18(`POST /v1/tasks/{id}/messages` 同 task 单活 run:已有 running → 409;启动 lifespan 把孤儿 running 标 error) +最后更新:2026-05-18(`POST /v1/tasks/{id}/runs/{rid}/cancel` 协作式 cancel + `cancelled` SSE 事件 + dev SPA stop 按钮;gate 扩到 `cancelling`) --- @@ -144,15 +144,16 @@ curl --noproxy '*' -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8765/v1/ta | `DELETE /v1/tasks/{id}` | 硬删 DB 行(messages CASCADE),FS working_dir 保留 | 必填 | | `GET /v1/folders` | 列当前 user 工作目录 + n_tasks + last_used(供创建 task 自动补全用) | 必填 | | `GET /v1/tasks/{id}/messages` | LiteLLM payload 透传 | 必填 | -| `POST /v1/tasks/{id}/messages` | `{content}` 发消息;返 `{run_id, events_url}`;**同 task 已有 running run → 409**(单活 run 保护,UI 应 disable send 按钮直到 SSE `done`) | 必填 | +| `POST /v1/tasks/{id}/messages` | `{content}` 发消息;返 `{run_id, events_url}`;**同 task 已有 running / cancelling run → 409**(单活 run 保护,UI 应 disable send 按钮直到 SSE `done`) | 必填 | | `GET /v1/tasks/{id}/runs/{rid}/events` | SSE 流(`event: ` + `data: `) | 必填 | +| `POST /v1/tasks/{id}/runs/{rid}/cancel` | 协作式 cancel 当前 run;返 `{ok, run_id, status:"cancelling"}`;run.status != `running` → 409;LLM 同步 call 本身不可中断,最坏等当前一轮跑完 | 必填 | | `GET /v1/tasks/{id}/files?path=` | 列子目录条目 + 面包屑 | 必填 | | `GET /v1/tasks/{id}/files/download?path=` | 下单文件 | 必填 | | `POST /v1/tasks/{id}/files/upload` | multipart 上传,`path` 走 form | 必填 | | `POST /v1/tasks/{id}/files/delete` | body `{path}`;文件或空目录 | 必填 | | `GET /v1/tasks/{id}/export` | 对话导出 .docx | 必填 | -**SSE 事件 schema**(每帧 `event: ` + `data: `):`run_start{}` → `llm_start{}` → `text{content}` / `tool_call{name,args,args_preview}` / `tool_result{name,preview,truncated}` → `llm_end{prompt_tokens,completion_tokens}` → `done{}`;异常路径走 `error{msg}`。30s 无 event 服务端发 `: ping` 注释心跳。SSE 经 nginx 反代记得关 buffering(响应头已带 `X-Accel-Buffering: no` 默认起效)。 +**SSE 事件 schema**(每帧 `event: ` + `data: `):`run_start{}` → `llm_start{}` → `text{content}` / `tool_call{name,args,args_preview}` / `tool_result{name,preview,truncated}` → `llm_end{prompt_tokens,completion_tokens}` → `done{}`;cancel 命中走 `cancelled{}` 后随 `done{}` 收流;异常路径走 `error{msg}`。30s 无 event 服务端发 `: ping` 注释心跳。SSE 经 nginx 反代记得关 buffering(响应头已带 `X-Accel-Buffering: no` 默认起效)。 **SSE 客户端注意**:浏览器原生 `EventSource` 不支持自定义 header,无法塞 Bearer token。要么走 `fetch + ReadableStream` 自解 SSE 帧(dev.html 走的就是这条),要么后端日后加 `?token=...` query 路径(目前不支持,避免 token 进 access log)。 @@ -175,8 +176,10 @@ curl --noproxy '*' -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8765/v1/ta | `cli.py web` 启动后 curl 连不上 | 检查 proxy(`HTTP_PROXY` / `HTTPS_PROXY`):本地服务在 127.0.0.1,系统 proxy 拦截会 502。临时 `unset HTTP_PROXY HTTPS_PROXY` 或加 `curl --noproxy '*'`。验通:`curl --noproxy '*' http://127.0.0.1:8765/healthz` → `{"status":"ok"}` | | SSE 卡住不流(经 nginx) | 反代要关 buffering — 后端响应头已带 `X-Accel-Buffering: no`,nginx ≥ 1.5.6 默认认。仍卡看 nginx 配 `proxy_buffering off; proxy_read_timeout 3600s;` | | platform 端 CORS preflight 失败 | 本地 dev `allow_origins=["*"]` 应该没事;部署后看是否按 platform 域名收紧过头(`access-control-allow-origin` 响应头要含 platform 域名 或 `*`)| -| `POST /v1/tasks/{id}/messages` 返 409 `task already has a running run` | 上一条消息的 BG run 还没跑完(SSE 没 `done`)。等流式跑完;或服务异常下 Run 行卡 `running`,启动 reaper 会清(crash 重启 / `cli.py web` 重启)。后续 `POST /v1/tasks/{id}/runs/{rid}/cancel`(DESIGN §7.2 待办)做出来后可主动 cancel | -| `[startup] reaped N stale running run(s)` | 上次 `cli.py web` 进程未正常 finish 留下 N 个 running Run 行,启动 lifespan 自动标 error。无需处理,info 级 | +| `POST /v1/tasks/{id}/messages` 返 409 `task already has an active run` | 上一条消息的 BG run 还没跑完(SSE 没 `done`)。等流式跑完;或点 dev SPA 的 stop / `POST /runs/{rid}/cancel`;服务异常下 Run 行卡 `running`/`cancelling`,启动 reaper 会清 | +| `POST /v1/tasks/{id}/runs/{rid}/cancel` 返 409 `run not running` | run 已结束(ok/error/cancelled)或已被 cancel 进入 `cancelling`,不能重复 cancel;dev SPA 自动忽略不报错 | +| 点 stop 后流式没立刻停 | LLM 同步调用本身不可中断,最坏等当前一轮跑完(通常几十秒)。loop 进入下个 check 点(每轮 LLM 前 / 每个 tool_call 前)就退,emit `cancelled` → SSE `done` → UI 收回 stop 按钮 | +| `[startup] reaped N stale active run(s)` | 上次 `cli.py web` 进程未正常 finish 留下 N 个 `running` / `cancelling` Run 行,启动 lifespan 自动标 error。无需处理,info 级 | | `cli.py web` 启动报 `PLATFORM_KEY env not set` / `JWT_SECRET env not set` | D' 过渡 auth 强制双 env 必填。生成 `python -c "import secrets;print(secrets.token_urlsafe(48))"` 各填一,写进 `.env` 重起 | | `/v1/*` 全返 401 `missing Authorization: Bearer` | 没拿 token 或没带 header。先 `POST /v1/auth/login` 拿 token,curl 加 `-H "Authorization: Bearer $TOKEN"` | | `/v1/*` 返 401 `token expired` | JWT 默 7d TTL 到期,重 login。要更长改 `ZCBOT_JWT_TTL_SECONDS` env | diff --git a/core/loop.py b/core/loop.py index 9c7502a..478bbf9 100644 --- a/core/loop.py +++ b/core/loop.py @@ -7,13 +7,16 @@ from __future__ import annotations import json import time -from typing import Any, Dict, Optional, Tuple +from typing import Any, Callable, Dict, Optional, Tuple from .capabilities import ModelCapabilities from .llm import LLM from .session import Session +_CANCELLED_TOOL_PLACEHOLDER = "[cancelled by user]" + + def _extract_usage(usage: Any) -> Tuple[int, int]: """从 litellm response.usage 提 (prompt_tokens, completion_tokens)。""" if not usage: @@ -36,6 +39,7 @@ class AgentLoop: capabilities: ModelCapabilities, sink: Optional[Any] = None, max_iterations: Optional[int] = None, + cancel_check: Optional[Callable[[], bool]] = None, ) -> None: self.llm = llm self.tools = tools @@ -43,15 +47,37 @@ class AgentLoop: self.caps = capabilities self.max_iterations = max_iterations or capabilities.max_iterations self.sink = sink + # 协作式 cancel:web 层注入 `lambda: broker.is_cancelled(run_id)`; + # CLI 路径不设(None → 永不 cancel)。LLM 调用本身是 litellm 同步阻塞、不可中断, + # check 点放在每轮 LLM 前、tool_calls 之间;一次 LLM call 最坏卡几十秒。 + self.cancel_check = cancel_check def _emit(self, event: dict) -> None: if self.sink is not None: self.sink.emit(event) + def _is_cancelled(self) -> bool: + return bool(self.cancel_check and self.cancel_check()) + + def _fill_cancelled_tool_results(self, remaining: list) -> None: + """给未执行的 tool_call 补 cancelled tool result,保 LiteLLM 协议完整。 + 每个 assistant tool_call 必须有对应的 tool message,否则 resume 时 LLM 报错。 + """ + for tc in remaining: + self.session.append({ + "role": "tool", + "tool_call_id": tc.id, + "content": _CANCELLED_TOOL_PLACEHOLDER, + }) + def run(self, user_message: str) -> str: self.session.append({"role": "user", "content": user_message}) for _ in range(self.max_iterations): + if self._is_cancelled(): + self._emit({"type": "cancelled"}) + return "[cancelled]" + self._emit({"type": "llm_start"}) start = time.monotonic() response = self.llm.chat( @@ -80,7 +106,11 @@ class AgentLoop: self._emit({"type": "done"}) return content or "" - for tc in tool_calls: + for i, tc in enumerate(tool_calls): + if self._is_cancelled(): + self._fill_cancelled_tool_results(tool_calls[i:]) + self._emit({"type": "cancelled"}) + return "[cancelled]" result = self._execute_tool_call(tc) self.session.append( { diff --git a/web/app.py b/web/app.py index 458e585..559e405 100644 --- a/web/app.py +++ b/web/app.py @@ -183,10 +183,11 @@ def _enumerate_files(root: Path, current: Path) -> tuple[list[dict], list[dict], # ─────────────────── Run 启动 + SSE 帧格式 ─────────────────── def _run_agent_bg(task_id: UUID, run_id: UUID, user_id: UUID, user_message: str) -> None: - """工作线程:`build_agent(resume=True)` → 装 WebEventSink → `agent.run` → 写 runs 状态。 + """工作线程:`build_agent(resume=True)` → 装 WebEventSink + cancel_check → `agent.run` → 写 runs 状态。 sink 通过 broker.emit 桥事件回 asyncio loop;agent.run 是 sync,所以在 to_thread 跑。 user_id 必须从 JWT 那侧透传过来 —— 决定 memory_block 读哪个 per-user 子树。 + cancel_check 桥 broker.is_cancelled,loop 在工具调用之间 poll(LLM 同步调用本身不可中断)。 """ from main import build_agent, sync_task_tokens try: @@ -195,12 +196,15 @@ def _run_agent_bg(task_id: UUID, run_id: UUID, user_id: UUID, user_message: str) session_id=str(task_id), resume=True, user_id=user_id, ) agent.sink = WebEventSink(broker, run_id) + agent.cancel_check = lambda rid=run_id: broker.is_cancelled(rid) agent.run(user_message) sync_task_tokens(task_state, agent.llm) + # cancel 命中时 agent.run 提前 return + 已 emit `cancelled`;终态写 "cancelled" + final_status = "cancelled" if broker.is_cancelled(run_id) else "ok" with session_scope() as s: s.execute( update(Run).where(Run.run_id == run_id).values( - status="ok", + status=final_status, finished_at=func.now(), tokens_p=agent.llm.token_counter.prompt_tokens, tokens_c=agent.llm.token_counter.completion_tokens, @@ -219,6 +223,7 @@ def _run_agent_bg(task_id: UUID, run_id: UUID, user_id: UUID, user_message: str) except Exception: pass # 已 emit error 给前端,DB 写失败不放大噪声 finally: + broker.clear_cancel(run_id) broker.close(run_id) @@ -271,13 +276,13 @@ def create_app() -> FastAPI: @asynccontextmanager async def lifespan(app: FastAPI): broker.bind_loop(asyncio.get_running_loop()) - # Stale-run reaper:上次进程 crash 留下的 "running" 行已无 BG 线程继续, - # 启动时标 error,让对应 task 重新可发消息(否则 409 gate 永挂)。 + # Stale-run reaper:上次进程 crash 留下的 "running" / "cancelling" 行已无 BG 线程 + # 继续,启动时标 error,让对应 task 重新可发消息(否则 409 gate 永挂)。 # TODO 真生产 multi-worker:换 heartbeat / lease,只 reap 自家 worker 的孤儿。 with session_scope() as s: result = s.execute( update(Run) - .where(Run.status == "running") + .where(Run.status.in_(("running", "cancelling"))) .values( status="error", error="server restarted before run finished", @@ -285,7 +290,7 @@ def create_app() -> FastAPI: ) ) if result.rowcount: - print(f"[startup] reaped {result.rowcount} stale running run(s)") + print(f"[startup] reaped {result.rowcount} stale active run(s)") yield app = FastAPI( @@ -652,13 +657,13 @@ def create_app() -> FastAPI: raise HTTPException(404, f"task not found: {tid}") active = s.execute( select(Run.run_id) - .where(Run.task_id == tid, Run.status == "running") + .where(Run.task_id == tid, Run.status.in_(("running", "cancelling"))) .limit(1) ).scalar_one_or_none() if active is not None: raise HTTPException( 409, - f"task already has a running run ({active}); wait for it to finish", + f"task already has an active run ({active}); wait for it to finish or cancel", ) s.add(Run(run_id=run_id, task_id=tid, status="running", started_at=func.now())) # commit 后 lock 释放;BG 线程接管(sink 通过 broker 把 event 桥回 asyncio loop) @@ -668,6 +673,50 @@ def create_app() -> FastAPI: "events_url": f"/v1/tasks/{tid}/runs/{run_id}/events", } + # ───────────── Run cancel ───────────── + + @app.post("/v1/tasks/{task_id}/runs/{run_id}/cancel", status_code=202, tags=["runs"]) + def cancel_run( + task_id: str, + run_id: str, + user_id: UUID = Depends(require_user), + ): + """向当前 run 发协作式 cancel 信号。 + - 校验 task 归属 user + run 归属 task;否则 404 + - run.status 不是 `running` → 409(已结束 / 已 cancelling 不能重复 cancel) + - 标 `cancelling`(过渡态),BG 线程 loop 在工具调用之间 poll 看见即退; + 退出后 finally 写终态 `cancelled`(或异常路径 `error`) + - LLM 同步调用本身不可中断,最坏要等当前 LLM call 跑完(通常几十秒内) + """ + try: + tid = UUID(task_id) + rid = UUID(run_id) + except ValueError: + raise HTTPException(404, "invalid id") + with session_scope() as s: + run = s.execute( + select(Run) + .join(Task, Task.task_id == Run.task_id) + .where( + Run.run_id == rid, + Run.task_id == tid, + Task.user_id == user_id, + ) + .with_for_update() + ).scalar_one_or_none() + if run is None: + raise HTTPException(404, f"run not found: {rid}") + if run.status != "running": + raise HTTPException( + 409, + f"run not running (status={run.status}); cannot cancel", + ) + s.execute( + update(Run).where(Run.run_id == rid).values(status="cancelling") + ) + broker.request_cancel(rid) + return {"ok": True, "run_id": str(rid), "status": "cancelling"} + # ───────────── SSE events ───────────── @app.get("/v1/tasks/{task_id}/runs/{run_id}/events", tags=["runs"]) diff --git a/web/broker.py b/web/broker.py index 4015e52..b26ad29 100644 --- a/web/broker.py +++ b/web/broker.py @@ -20,6 +20,7 @@ from __future__ import annotations import asyncio +import threading from collections import defaultdict from typing import Any, Optional from uuid import UUID @@ -30,6 +31,10 @@ class RunBroker: self._subs: dict[UUID, set[asyncio.Queue]] = defaultdict(set) # 已经发完 done 的 run — 后来订阅者直接收到 done,避免无限等 self._done: set[UUID] = set() + # cancel signal per-run。AgentLoop 在 BG 线程里 poll is_cancelled() 决定是否退; + # request_cancel 可在 BG 还没 register 时调用(setdefault),BG 启动后第一次 + # check 即看到。run 完成在 finally 里 clear_cancel 回收。 + self._cancel_flags: dict[UUID, threading.Event] = {} self._loop: Optional[asyncio.AbstractEventLoop] = None def bind_loop(self, loop: asyncio.AbstractEventLoop) -> None: @@ -83,6 +88,22 @@ class RunBroker: def is_done(self, run_id: UUID) -> bool: return run_id in self._done + # ─────────────── cancel signaling ─────────────── + + def request_cancel(self, run_id: UUID) -> None: + """主线程(HTTP handler)发的 cancel 信号 — BG 线程 poll is_cancelled() 看见即退。 + setdefault:即便 BG 还没注册 flag 也能 set,BG 启动后第一次 check 立刻看见。 + """ + self._cancel_flags.setdefault(run_id, threading.Event()).set() + + def is_cancelled(self, run_id: UUID) -> bool: + ev = self._cancel_flags.get(run_id) + return bool(ev and ev.is_set()) + + def clear_cancel(self, run_id: UUID) -> None: + """run 真正退出(BG finally)清掉 flag,避免 dict 无限增长。""" + self._cancel_flags.pop(run_id, None) + # 进程内单例 — FastAPI lifespan 里 bind_loop;agent / sink / SSE handler 共享。 broker = RunBroker() diff --git a/web/static/dev.html b/web/static/dev.html index 2b9443c..3ad7d36 100644 --- a/web/static/dev.html +++ b/web/static/dev.html @@ -126,6 +126,7 @@ .msg.user { background: var(--user-bg); align-self: flex-end; } .msg.assistant, .msg.system, .msg.tool, .msg.error { background: var(--asst-bg); align-self: flex-start; } .msg.error { border-color: var(--accent); background: var(--accent-soft); color: var(--accent); } + .cancelled-badge { margin-top: 8px; padding: 4px 10px; font-size: 12px; color: var(--accent); background: var(--accent-soft); border: 1px dashed var(--accent); border-radius: 4px; display: inline-block; } .msg .role { font-size: 11px; color: var(--muted); margin-bottom: 2px; font-family: monospace; } .msg .body { word-wrap: break-word; font-size: 14px; line-height: 1.55; } .msg .body.streaming::after { content: "▌"; color: var(--accent); animation: blink 1s infinite; } @@ -311,6 +312,7 @@
ready +
@@ -365,6 +367,7 @@ const state = { taskMeta: null, filesPath: "", evtSrc: null, + currentRunId: null, // 当前流式中的 run_id;用于 stop 按钮发 cancel // task list 分页 + 筛选 taskPage: 1, taskPageSize: 20, @@ -726,6 +729,8 @@ async function sendMessage() { const r = await api("POST", `/v1/tasks/${state.taskId}/messages`, { content }); $("chat-input").value = ""; + state.currentRunId = r.run_id; + $("chat-cancel").style.display = ""; streamSse(r.events_url, asstCard); } catch (e) { if (e.status === 401) { logout(); return; } @@ -735,6 +740,25 @@ async function sendMessage() { } } +async function cancelCurrentRun() { + if (!state.taskId || !state.currentRunId) return; + const btn = $("chat-cancel"); + btn.disabled = true; + $("chat-hint").textContent = "cancelling…"; + try { + await api("POST", `/v1/tasks/${state.taskId}/runs/${state.currentRunId}/cancel`); + // 不重置 state.currentRunId / 按钮 — 等 SSE 的 cancelled / done 走完一并清 + } catch (e) { + if (e.status === 401) { logout(); return; } + // 409 = 已结束 / 已 cancelling,不算错;其他贴 toast + if (e.status !== 409) appendErrorCard("cancel: " + e.message); + btn.disabled = false; + $("chat-hint").textContent = "ready"; + } +} + +$("chat-cancel").addEventListener("click", cancelCurrentRun); + function streamSse(url, asstCard) { // EventSource 不支持自定义 header,token 走 query string(?token=...) // 这里 SSE 走 same-origin,token 经 URL 传给后端 — 但当前后端只读 Authorization 头 @@ -744,39 +768,44 @@ function streamSse(url, asstCard) { async function fetchSse(url, asstCard) { const body = asstCard.querySelector(".body"); - const r = await fetch(url, { - headers: { "Authorization": "Bearer " + state.token, "Accept": "text/event-stream" }, - }); - if (!r.ok) throw new Error(r.status + " " + r.statusText); - const reader = r.body.getReader(); - const dec = new TextDecoder(); - let buf = ""; - // 流式 markdown:累积 raw 文本 → rAF 节流重渲染整段 body const ctx = { acc: "", body, pending: false }; - $("chat-hint").textContent = "streaming…"; - - while (true) { - const { value, done } = await reader.read(); - if (done) break; - buf += dec.decode(value, { stream: true }); + try { + const r = await fetch(url, { + headers: { "Authorization": "Bearer " + state.token, "Accept": "text/event-stream" }, + }); + if (!r.ok) throw new Error(r.status + " " + r.statusText); + const reader = r.body.getReader(); + const dec = new TextDecoder(); + let buf = ""; + $("chat-hint").textContent = "streaming…"; while (true) { - const idx = buf.indexOf("\n\n"); - if (idx < 0) break; - const frame = buf.slice(0, idx); - buf = buf.slice(idx + 2); - const ev = parseSseFrame(frame); - if (!ev) continue; - handleSseEvent(ev, asstCard, ctx); - if (ev.event === "done" || ev.event === "error") break; + const { value, done } = await reader.read(); + if (done) break; + buf += dec.decode(value, { stream: true }); + while (true) { + const idx = buf.indexOf("\n\n"); + if (idx < 0) break; + const frame = buf.slice(0, idx); + buf = buf.slice(idx + 2); + const ev = parseSseFrame(frame); + if (!ev) continue; + handleSseEvent(ev, asstCard, ctx); + if (ev.event === "done" || ev.event === "error") break; + } } + // 最终定稿 + 代码高亮(流式中不 highlight,省 CPU) + body.innerHTML = renderMd(ctx.acc); + highlightIn(asstCard); + } finally { + body.classList.remove("streaming"); + $("chat-send").disabled = false; + $("chat-hint").textContent = "ready"; + state.currentRunId = null; + const cb = $("chat-cancel"); + cb.style.display = "none"; + cb.disabled = false; } - // 最终定稿 + 代码高亮(流式中不 highlight,省 CPU) - body.innerHTML = renderMd(ctx.acc); - body.classList.remove("streaming"); - highlightIn(asstCard); - $("chat-send").disabled = false; - $("chat-hint").textContent = "ready"; - // 刷新 task meta + messages(拿真实持久化的) + // 刷新 task meta + messages(拿真实持久化的);失败路径已退出,这里不再跑 loadTaskList(); await loadMessages(); } @@ -825,6 +854,11 @@ function handleSseEvent(ev, asstCard, ctx) { det.className = "tool-call"; det.innerHTML = `tool_result
${escapeHtml(typeof txt === "string" ? txt : JSON.stringify(txt, null, 2))}
`; asstCard.appendChild(det); + } else if (t === "cancelled") { + const badge = document.createElement("div"); + badge.className = "cancelled-badge"; + badge.textContent = "已停止(stopped by user)"; + asstCard.appendChild(badge); } else if (t === "error") { const msg = (ev.data && (ev.data.msg || ev.data.error)) || JSON.stringify(ev.data); appendErrorCard(msg);