core(0004): 删 runs / usage_events 表 + cancel/SSE 改 task-level

usage_events 全代码库零引用,纯死代码;runs 表实质就是"task 当前 in-flight 状态"
影子表,tokens_p/c 写但从未被读,run_id 唯二实用是 broker 频道键 + cancel 参数 —
单活 run 形态下完全冗余,客户端只需 task_id。按"开发期不写兼容层"心智一把切干净。

- alembic 0004:DROP runs / usage_events,tasks 加 run_status (idle/running/cancelling/error) +
  run_error 两列;ok/cancelled 终态都回 idle 不留持久标记,只有 error 持久
- ORM 删 Run / UsageEvent class
- Broker 全 task_id 索引,加 start(tid) 在新 run 前清 _done
- /v1/tasks/{tid}/runs/{rid}/{events,cancel} → /v1/tasks/{tid}/{events,cancel}
- POST /messages 返 {events_url} 去掉 run_id
- dev SPA: state.currentRunId → state.streaming(bool),路径去掉 /runs/{rid}/
- Smoke 18 case 全绿

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-05-18 11:05:35 +08:00
parent bf41631437
commit 2e519ab8a6
10 changed files with 264 additions and 192 deletions

View File

@ -249,19 +249,21 @@ Tasks
(同 working_dir 多 task 共享,文件由用户经 /files/delete 单独清) (同 working_dir 多 task 共享,文件由用户经 /files/delete 单独清)
GET /v1/folders 列当前 user 的 working_dir(FS 是 source of truth + 关联 task 计数 + 最后使用时间) GET /v1/folders 列当前 user 的 working_dir(FS 是 source of truth + 关联 task 计数 + 最后使用时间)
GET /v1/tasks/{id}/messages 历史(后续 ?search= 走 jsonb GIN / tsvector) GET /v1/tasks/{id}/messages 历史(后续 ?search= 走 jsonb GIN / tsvector)
POST /v1/tasks/{id}/messages {content} 发消息 + 起 run,返 {run_id} POST /v1/tasks/{id}/messages {content} 发消息 + 起 run,返 {events_url}
**同 task 单活 run**:已有 running → 409 **单活 run**(0004 简化):tasks.run_status in
(`SELECT … FOR UPDATE` 锁 task 行,序列化并发 ('running','cancelling') → 409;'error' 起新 run
POST 防 `messages.idx` UniqueConstraint race; 时清(跟 ok 一样视为可重启)。`SELECT … FOR UPDATE`
UI 应 disable send 按钮直到 SSE `done`) 锁 task 行,序列化并发 POST 防 `messages.idx` race
GET /v1/tasks/{id}/runs/{rid}/events SSE 流(见下) GET /v1/tasks/{id}/events SSE 流(见下)— 订阅 task 当前活动事件,
POST /v1/tasks/{id}/runs/{rid}/cancel 协作式 cancel(202):标 `cancelling` + 信号 单活 run 形态下无歧义,客户端只需 task_id
POST /v1/tasks/{id}/cancel 协作式 cancel(202):标 `cancelling` + 信号
broker;BG loop 在工具调用之间 poll 看见即退, broker;BG loop 在工具调用之间 poll 看见即退,
给未执行 tool_call 补 `[cancelled by user]` 给未执行 tool_call 补 `[cancelled by user]`
tool result(保 LiteLLM 协议),emit `cancelled` tool result(保 LiteLLM 协议),emit `cancelled`
事件;finally 写终态 `cancelled`(异常 `error`)。 事件;finally 写终态 — 正常 / cancel 都回 `idle`
LLM 同步 call 本身不可中断 — 最坏等当前一轮跑完。 (不留持久标记),异常才写 `error`
run.status != `running` → 409(已结束 / 已 cancelling) run_status != `running` → 409。
LLM 同步 call 本身不可中断 — 最坏等当前一轮跑完
Files(user-rooted,不绑 task — `workspace/users/<uid>/` 为根) Files(user-rooted,不绑 task — `workspace/users/<uid>/` 为根)
GET /v1/files?path= 列子目录 {entries, crumbs, exists, root, current};留空 → user_root; GET /v1/files?path= 列子目录 {entries, crumbs, exists, root, current};留空 → user_root;
@ -283,7 +285,7 @@ Misc
``` ```
run_start {} run_start {}
llm_start {} llm_start {}
text {"content":"<delta 或全量,取决于 model streaming 配置>"} text {"delta":"<delta 文本>"}
tool_call {"name":"...","args":{...},"args_preview":"..."} tool_call {"name":"...","args":{...},"args_preview":"..."}
tool_result {"name":"...","preview":"...","truncated":bool} # 完整 result 走 DB,SSE 只送预览给 UI tool_result {"name":"...","preview":"...","truncated":bool} # 完整 result 走 DB,SSE 只送预览给 UI
llm_end {"prompt_tokens":N,"completion_tokens":N} llm_end {"prompt_tokens":N,"completion_tokens":N}
@ -314,6 +316,8 @@ users(user_id uuid pk, email null, password_hash | oidc_subject null, plan null,
tasks(task_id uuid pk, user_id fk, name text not null, working_dir text not null, skill, description, tasks(task_id uuid pk, user_id fk, name text not null, working_dir text not null, skill, description,
status, model_profile, tokens_prompt, tokens_completion, cost_usd, status, model_profile, tokens_prompt, tokens_completion, cost_usd,
run_status text not null default 'idle', -- idle/running/cancelling/error(0004 合并 runs 表)
run_error text null, -- error 状态的错误文本,其他状态 NULL
created_at, updated_at); created_at, updated_at);
create index on tasks (user_id, working_dir); create index on tasks (user_id, working_dir);
-- working_dir 存储约定:本地 ROOT 内 → 相对 ROOT 的 posix 串 -- working_dir 存储约定:本地 ROOT 内 → 相对 ROOT 的 posix 串
@ -328,12 +332,13 @@ messages(message_id uuid pk, task_id fk, idx int not null,
unique (task_id, idx)); unique (task_id, idx));
create index on messages using gin (payload jsonb_path_ops); create index on messages using gin (payload jsonb_path_ops);
-- 全文搜按需加 tsvector + GIN(中文 simple + pg_trgm 起步) -- 全文搜按需加 tsvector + GIN(中文 simple + pg_trgm 起步)
runs(run_id pk, task_id fk, status, started_at, finished_at, error, tokens_p, tokens_c)
usage_events(id, user_id, task_id uuid, run_id uuid, kind, value, ts)
-- append-only。task_id/run_id 不 FK,task 硬删后审计仍存活
``` ```
**0004 简化:删 `runs` / `usage_events` 表**(从未真用过 — 详 §7.9 取舍)。原 `runs`
角色等价于"task 当前 in-flight 状态",合并到 `tasks.run_status` + `tasks.run_error` 两列;
`usage_events` 是为未来计费预付的架构成本,真要计费再加,DB schema 改动便宜。`run_id`
取消 —— 单活 run 形态下它对客户端 / broker / cancel 全是冗余字段。
**No-subtask 校验**(`create_task`):查同 user 下是否存在 `new LIKE existing/%``existing LIKE new/%`,中一则拒;同 task_dir 允许。**两侧先用 `from_db_path` 归一到 absolute posix 再比前缀**(混合存储形态 [相对+绝对] 不会漏判),数量小直接 Python 端比对,不在 SQL 里拼分隔符。 **No-subtask 校验**(`create_task`):查同 user 下是否存在 `new LIKE existing/%``existing LIKE new/%`,中一则拒;同 task_dir 允许。**两侧先用 `from_db_path` 归一到 absolute posix 再比前缀**(混合存储形态 [相对+绝对] 不会漏判),数量小直接 Python 端比对,不在 SQL 里拼分隔符。
**Folder rename**(`old → new`,FS rename 成功后):`UPDATE tasks SET task_dir = new || substring(task_dir from len(old)+1) WHERE user_id=? AND (task_dir = old OR task_dir LIKE old||'/%')`。**用 `old/%` 而非 `old%`**,避免 `project_a` 误中 `project_a_other`。running task 引用时禁 rename / delete。 **Folder rename**(`old → new`,FS rename 成功后):`UPDATE tasks SET task_dir = new || substring(task_dir from len(old)+1) WHERE user_id=? AND (task_dir = old OR task_dir LIKE old||'/%')`。**用 `old/%` 而非 `old%`**,避免 `project_a` 误中 `project_a_other`。running task 引用时禁 rename / delete。
@ -410,8 +415,8 @@ usage_events(id, user_id, task_id uuid, run_id uuid, kind, value, ts)
| 误删 folder | 二确认 + 输入 folder 名;真要再加 trash bin | | 误删 folder | 二确认 + 输入 folder 名;真要再加 trash bin |
| DB-then-FS 中断留孤儿目录 | 后台 GC 周期扫"FS 有但 DB 无引用" | | DB-then-FS 中断留孤儿目录 | 后台 GC 周期扫"FS 有但 DB 无引用" |
| 同 folder 多 task 并发写同名 | 文件级悲观锁,冲突早失败 | | 同 folder 多 task 并发写同名 | 文件级悲观锁,冲突早失败 |
| 同 task 并发 POST messages 撞 `messages.idx` UniqueConstraint | `POST /v1/tasks/{id}/messages` 单活 run 检查:`SELECT … FOR UPDATE` 锁 task 行 + 查 `runs.status in ('running','cancelling')`,有 → 409;同事务插新 Run 行避 TOCTOU。配启动 lifespan reaper 把孤儿 `running`/`cancelling` 全标 error(进程 crash 残留)。未来真生产 multi-worker 换 heartbeat / lease | | 同 task 并发 POST messages 撞 `messages.idx` UniqueConstraint | `POST /v1/tasks/{id}/messages` 单活 run 检查:`SELECT … FOR UPDATE` 锁 task 行 + 查 `tasks.run_status in ('running','cancelling')`,有 → 409;同事务标 running 避 TOCTOU。配启动 lifespan reaper 把孤儿 `running`/`cancelling` 全标 error(进程 crash 残留)。未来真生产 multi-worker 换 heartbeat / lease |
| Run 跑太久 / 用户想中断 | `POST /v1/tasks/{id}/runs/{rid}/cancel` 协作式 cancel:标 `cancelling` + broker 信号;`AgentLoop.cancel_check` 回调在每轮 LLM 前、tool_calls 之间 poll;命中给未执行 tool_call 补 `[cancelled by user]` tool result 保 LiteLLM 协议,emit `cancelled` 事件,BG finally 写终态 `cancelled`。LLM 同步 call 本身不可中断 — 接受最坏等当前一轮跑完(几十秒内) | | Run 跑太久 / 用户想中断 | `POST /v1/tasks/{id}/cancel` 协作式 cancel:标 `cancelling` + broker 信号;`AgentLoop.cancel_check` 回调在每轮 LLM 前、tool_calls 之间 poll;命中给未执行 tool_call 补 `[cancelled by user]` tool result 保 LiteLLM 协议,emit `cancelled` 事件,BG finally `run_status``idle`(不留持久标记)。LLM 同步 call 本身不可中断 — 接受最坏等当前一轮跑完(几十秒内) |
| Sandbox 出站越权 | egress allowlist 起步只放 LLM + PyPI | | Sandbox 出站越权 | egress allowlist 起步只放 LLM + PyPI |
| 资源滥用 | BYO key 默认;月度配额;cold task LRU 清 | | 资源滥用 | BYO key 默认;月度配额;cold task LRU 清 |
@ -423,7 +428,13 @@ usage_events(id, user_id, task_id uuid, run_id uuid, kind, value, ts)
**skill 产物全落 cwd 不引入 artifacts 表**:中间件是用户花 token 生成的资产,可下载可替换;artifacts 表是为不确定 UX 收益预付架构成本。真嫌乱 UI 加折叠视图。 **skill 产物全落 cwd 不引入 artifacts 表**:中间件是用户花 token 生成的资产,可下载可替换;artifacts 表是为不确定 UX 收益预付架构成本。真嫌乱 UI 加折叠视图。
**hard cascade 而非 soft orphan**:`orphaned` 让 list / resume / UI 都多一种特殊 case,"删 folder = 删项目"比"留对话残骸"自然。`usage_events` append-only 不 FK,task 硬删后月账仍存活。 **hard cascade 而非 soft orphan**:`orphaned` 让 list / resume / UI 都多一种特殊 case,"删 folder = 删项目"比"留对话残骸"自然。(原 `usage_events` 表 0004 删 — 见下条)
**0004 删 `runs` + `usage_events` 表**(2026-05-18 决策):
- **`runs`**:实质是"task 当前 in-flight 状态"的影子表 —— `tokens_p/c` 写但从未被读(tokens 累计走 `tasks.tokens_prompt/_completion`),`started_at/finished_at/error` 也只写不读,`run_id` 唯二实用是 broker pub/sub 频道键 + cancel 参数。但 §7.1 选定单活 run 形态下 `run_id` 是冗余 —— 同 task 同时最多 1 个活 run,客户端只需要 task_id(永远有)就够。合并 `run_status` + `run_error` 两列入 `tasks`,删表;broker 改 task_id 索引;`/v1/tasks/{id}/runs/{rid}/{events,cancel}` 改 `/v1/tasks/{id}/{events,cancel}`
- **`usage_events`**:从未真写入(代码库零引用),纯死代码,为"未来计费"预付的架构成本。真要计费时改 DB schema 便宜,不预付。
- **取舍代价**:失"历史 run 元数据"(每次 LLM 调用的独立时间戳 / token 切片)—— messages 表已记下每次对话产物,token 累计在 tasks,真要细粒度审计再补回 `usage_events`(届时是新需求,不是技术债)。
- **`run_status` 终态语义**:`ok` / `cancelled` 收尾直接回 `idle`(用户视角"跑完了 / 停了"等价),只有 `error` 是持久终态(让用户能看到),起新 run 时由 `post_message` 清掉。
**本地也用 PG,不用 SQLite / JSON**:① dogfood ≡ 真实用户路径,bug 在 dogfood 就能复现;② Docker 已是必然依赖(§7.5),`docker compose up postgres` 零增量门槛;③ 双 adapter 维护税远高于 PG 一次性配置成本;④ 本地 dev 也能连远端测试服。 **本地也用 PG,不用 SQLite / JSON**:① dogfood ≡ 真实用户路径,bug 在 dogfood 就能复现;② Docker 已是必然依赖(§7.5),`docker compose up postgres` 零增量门槛;③ 双 adapter 维护税远高于 PG 一次性配置成本;④ 本地 dev 也能连远端测试服。

View File

@ -2,7 +2,7 @@
> 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。 > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。
最后更新:2026-05-18(`POST /v1/tasks/{id}/runs/{rid}/cancel` 协作式 cancel + `cancelled` SSE 事件 + dev SPA stop 按钮;gate 扩到 `cancelling`) 最后更新:2026-05-18(0004 大瘦身:删 runs / usage_events 表,run_status / run_error 合入 tasks;cancel / SSE 路由从 run_id 维度改 task 维度;broker 全 task_id 索引)
--- ---
@ -15,12 +15,13 @@
| 5 | Eval Suite | ⏸ 不做 | dogfooding 替代,probe 覆盖健康检查 | | 5 | Eval Suite | ⏸ 不做 | dogfooding 替代,probe 覆盖健康检查 |
| 6 | 长任务工程化 | 🟡 | task + 恢复 ✅;双层记忆 ✅;context 压缩未做 | | 6 | 长任务工程化 | 🟡 | task + 恢复 ✅;双层记忆 ✅;context 压缩未做 |
| 7 | 打磨 | ❌ | Docker 沙盒 / 更多 skill | | 7 | 打磨 | ❌ | Docker 沙盒 / 更多 skill |
| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 完工;**D `/v1` JSON API 完工 ✅**(原 Phase G Jinja2/HTMX UI 撤,改 platform 端实现);**D' 过渡 auth(PLATFORM_KEY → JWT)+ dev SPA ✅**;**同 task 单活 run 锁 ✅**;**run cancel + dev SPA stop 按钮 ✅**;真 OIDC 待;C(Executor)待;E(CLI 双模式)待。 | | §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 完工;**D `/v1` JSON API 完工 ✅**(原 Phase G Jinja2/HTMX UI 撤,改 platform 端实现);**D' 过渡 auth(PLATFORM_KEY → JWT)+ dev SPA ✅**;**同 task 单活 run 锁 ✅**;**task-level cancel + dev SPA stop 按钮 ✅**;**0004 schema 瘦身 ✅**(删 runs/usage_events);真 OIDC 待;C(Executor)待;E(CLI 双模式)待。 |
--- ---
## 已完成关键能力 ## 已完成关键能力
- **05-18 / 0004 schema 大瘦身:删 runs / usage_events 表,run_status / run_error 合入 tasks;路由从 run_id 维度改 task 维度**:用户复盘"为什么 cancel 接口要带 run_id?现在不是一个 task 一个 run 吗",顺手把 runs / usage_events 表也重新审视 — `usage_events` 全代码库零引用、零写入、零读取,纯死代码(为未来计费预付的架构成本);`runs` 表 `tokens_p/c` 写但从未被读(tokens 累计走 tasks 列),`started_at / finished_at / error` 也只写不读,`run_id` 唯二实用是 broker 频道键 + cancel 参数 — 但 §7.1 已选定**单活 run** 形态,同 task 同时最多 1 个活 run,客户端只需要 task_id(永远有)就够,run_id 完全冗余。按"开发期不写兼容层"心智一把切干净。**alembic 0004**:`DROP TABLE usage_events / runs`,`tasks` 加 `run_status text not null default 'idle'`(idle / running / cancelling / error)+ `run_error text null`。**ORM** `models.py``Run` / `UsageEvent` 两 class + 删 `BigInteger` import;`Task` 加两列;`storage/__init__.py` 文档示例同步;`Task.run_status` 终态语义:`ok / cancelled` 收尾都回 `idle`(用户视角"跑完 / 停了"等价不留持久标记),只有 `error` 是持久终态,起新 run 时清。**Broker**(`web/broker.py`)全面 task_id 索引:`_subs / _done / _cancel_flags` 三个 dict key 从 run_id 换 task_id;加 `start(task_id)` 入口在新 run 起来前清 `_done` 标记(避免上一轮 done 让新订阅者立刻断流)。**Sink**(`web/sinks.py`)绑 task_id 替代 run_id。**`web/app.py`**:① `_run_agent_bg(task_id, user_id, content)` 去掉 run_id 参数;装 `agent.cancel_check = lambda tid=task_id: broker.is_cancelled(tid)`;终态写 `tasks.run_status = "idle"`(原 `Run.status = "ok"/"cancelled"`)或 `"error"`(`run_error = err`);finally `broker.clear_cancel(tid) + broker.close(tid)`。② `POST /v1/tasks/{tid}/messages` 改:`SELECT Task.run_status … FOR UPDATE` 替代 `select(Run.run_id) … running/cancelling`;同事务 `UPDATE Task SET run_status='running', run_error=NULL`(error 也算可重启视为清);commit 后 `broker.start(tid)` 清 done;返 `{"events_url": "/v1/tasks/{tid}/events"}` 去掉 `run_id`。③ `POST /v1/tasks/{tid}/cancel` 取代 `POST /v1/tasks/{tid}/runs/{rid}/cancel`,只校验 task 归属 user;`run_status != 'running'` → 409。④ `GET /v1/tasks/{tid}/events` 取代 `/runs/{rid}/events`,broker.subscribe(tid)。⑤ lifespan reaper `UPDATE Task SET run_status='error' WHERE run_status IN ('running','cancelling')`,文案不变。⑥ `_task_dict` 暴露 `run_status` / `run_error` 字段给前端。**dev SPA**(`web/static/dev.html`):`state.currentRunId` 改 `state.streaming` bool;`POST /messages` 拿到 `events_url` 直接订阅,不再保存 run_id;cancel 按钮 click → `POST /v1/tasks/{tid}/cancel`(去掉 `/runs/{rid}/`)。**Migration 跑通**:本地 PG `db upgrade 0003 → 0004 (head)` 一把过(用户授权清旧数据,无 backfill)。**Smoke 18 case 全绿**(in-process TestClient + BG mock):POST /messages 返 `events_url` 无 run_id / tasks.run_status='running' / gate when running 409 / POST /cancel 202 + run_status='cancelling' + broker flag set / double cancel 409(状态非 running)/ gate during cancelling 也 409 / cancel idle 409 / cancel error 409 / error 状态可发新消息(error 不挂 gate + 清 run_error) / ghost task 404 / invalid UUID 404 / cross-user 404 / no auth 401 / GET /events 路由注册(SSE 流式跑会挂 30s 心跳,smoke 只验路径 + headers) / GET /tasks 返回 run_status / run_error 字段 / stale reaper 扫 running+cancelling 标 error / broker.start 清 _done / broker.subscribe + emit + close + late subscriber 立刻收 done / broker.request_cancel + is_cancelled + clear_cancel。**净增量**:核心代码 -200 行(删表 ORM + 两路由层简化),broker 加 21 行 start/cancel API,dev.html 几行字段重命名;DB 表 5 → 3,路由 `/runs/{rid}/{events,cancel}``/{events,cancel}`,前端 SPA 不再需要先拿 run_id 才能 cancel / 订阅 — 客户端只需 task_id。**文档同步**:DESIGN §7.2 路由表 messages 路由返 `events_url`(去 `run_id`)+ cancel / events 改 task-level + lead-in 注 0004 简化 + SSE schema text event 字段 `delta`(实际就是 delta,文档原 `content` 笔误);§7.4 schema 块 tasks 加两列 + 注 0004 合并;§7.9 hard cascade 行注 "原 usage_events 0004 删" + 加专项取舍说明"0004 删 runs + usage_events 表";§7.7 风险表两行同步 / 改 task-level 路由名;RUN 路由表三路由全改 + 故障兜底 cancel 409 文案改 + db upgrade head 改 0004;PROGRESS 已完成 + 状态表 + 文件清单。
- **05-18 / cancel run endpoint + AgentLoop 协作式 cancel + dev SPA stop 按钮**:用户反馈"等待回复或 LLM 操作时没有停止接口"。落地 DESIGN §7.2 原标"待"的 `POST /v1/tasks/{id}/runs/{rid}/cancel`。**Broker**(`web/broker.py`):加 `request_cancel(rid)` / `is_cancelled(rid)` / `clear_cancel(rid)` 三方法,内部 `dict[UUID, threading.Event]` per-run;`setdefault` 保证 BG 还没 register 也能 set。**Loop**(`core/loop.py`):`AgentLoop` 加 `cancel_check: Optional[Callable[[], bool]]` 字段(CLI 路径不传 = None 永不 cancel),`_is_cancelled()` helper + `_fill_cancelled_tool_results(remaining)` 给未执行的 tool_call 全部 append `[cancelled by user]` tool message —— LiteLLM 协议要求每个 assistant tool_call 必须有匹配 tool result,否则 resume 时 LLM 报错。Check 点:每轮 LLM 前 + tool_calls 之间。命中 emit `cancelled` event + return `[cancelled]`。**LLM 同步 call 本身不可中断**(litellm 同步阻塞,无原生 cancel)—— 接受最坏等当前一轮跑完(通常几十秒),注释里讲清楚。**Endpoint**(`web/app.py::cancel_run`):校验 task 归属 user + run 归属 task(else 404),`run.status` 必须是 `running`(else 409 含具体 status);标 `cancelling`(过渡态)+ `broker.request_cancel(rid)`;202。`_run_agent_bg` 装配时 `agent.cancel_check = lambda rid=run_id: broker.is_cancelled(rid)`,run 完时判 `broker.is_cancelled` 写终态 `cancelled` vs `ok`;finally `broker.clear_cancel + broker.close`。**Gate 同步扩**:`post_message` 单活 run 检查从 `status == 'running'``status in ('running', 'cancelling')`,确保 cancel 后旧 BG 还没退出时新 POST 仍 409(避免新旧 run 撞 messages.idx)。**Reaper 同步扩**:lifespan 启动也扫 `cancelling`(进程 crash 时 BG 来不及写终态 cancelled,反正没线程在跑就清掉)。**dev SPA**(`web/static/dev.html`):chat 表单加 `<button id="chat-cancel" class="small danger">stop</button>`(常态 hidden);state 加 `currentRunId`;sendMessage 拿到 run_id 后 show stop,fetchSse `try/finally` 收尾时一并 hide stop / 清 currentRunId / 复原 send button(确保 SSE 失败路径 UI 也 reset 不卡死)。cancel 按钮 click → `POST /runs/{rid}/cancel`;409 静默忽略(并发 done 不算错)。`handleSseEvent` 加 `cancelled` case → 在当前 assistant 卡贴一个虚线红框 "已停止(stopped by user)" badge。CSS 加 `.cancelled-badge`。**Smoke 15 case 全绿**:HTTP 层 11 case(cancel happy + 双 cancel 409 + cancelling 期间 POST messages 409 + ghost run 404 + invalid UUID 404 + cross-task 404 + cross-user 404 + cancel 已 ok 409 + cancel 已 error 409 + no auth 401 + stale reaper 扫 cancelling);Loop 层 4 case(cancel before first iter 不调 LLM / cancel between tool_calls 补 cancelled placeholder 3 个 + 保协议 + emit cancelled / 正常 done 不 emit cancelled / CLI 路径 cancel_check=None 默永不 cancel)。**没动 SSE handler 的 break list**(`("done", "error")`):cancelled 在 SSE 里走流给前端看,broker.close 之后立即跟 done 收流。**文档同步**:DESIGN §7.2 路由表 cancel 行从"待"扩成完整描述 + SSE 事件加 `cancelled{}` 行 + §7.7 风险表加"Run 跑太久 / 用户想中断"行;RUN 路由表加 cancel 行 + POST /messages 409 文案改 "running / cancelling" + 故障兜底加三行(cancel 409 / 点 stop 没立刻停 / reaper 扫 cancelling);PROGRESS 已完成 + 下一步重排(去掉 cancel,留 OIDC / C Executor / E CLI 双模式)。 - **05-18 / cancel run endpoint + AgentLoop 协作式 cancel + dev SPA stop 按钮**:用户反馈"等待回复或 LLM 操作时没有停止接口"。落地 DESIGN §7.2 原标"待"的 `POST /v1/tasks/{id}/runs/{rid}/cancel`。**Broker**(`web/broker.py`):加 `request_cancel(rid)` / `is_cancelled(rid)` / `clear_cancel(rid)` 三方法,内部 `dict[UUID, threading.Event]` per-run;`setdefault` 保证 BG 还没 register 也能 set。**Loop**(`core/loop.py`):`AgentLoop` 加 `cancel_check: Optional[Callable[[], bool]]` 字段(CLI 路径不传 = None 永不 cancel),`_is_cancelled()` helper + `_fill_cancelled_tool_results(remaining)` 给未执行的 tool_call 全部 append `[cancelled by user]` tool message —— LiteLLM 协议要求每个 assistant tool_call 必须有匹配 tool result,否则 resume 时 LLM 报错。Check 点:每轮 LLM 前 + tool_calls 之间。命中 emit `cancelled` event + return `[cancelled]`。**LLM 同步 call 本身不可中断**(litellm 同步阻塞,无原生 cancel)—— 接受最坏等当前一轮跑完(通常几十秒),注释里讲清楚。**Endpoint**(`web/app.py::cancel_run`):校验 task 归属 user + run 归属 task(else 404),`run.status` 必须是 `running`(else 409 含具体 status);标 `cancelling`(过渡态)+ `broker.request_cancel(rid)`;202。`_run_agent_bg` 装配时 `agent.cancel_check = lambda rid=run_id: broker.is_cancelled(rid)`,run 完时判 `broker.is_cancelled` 写终态 `cancelled` vs `ok`;finally `broker.clear_cancel + broker.close`。**Gate 同步扩**:`post_message` 单活 run 检查从 `status == 'running'``status in ('running', 'cancelling')`,确保 cancel 后旧 BG 还没退出时新 POST 仍 409(避免新旧 run 撞 messages.idx)。**Reaper 同步扩**:lifespan 启动也扫 `cancelling`(进程 crash 时 BG 来不及写终态 cancelled,反正没线程在跑就清掉)。**dev SPA**(`web/static/dev.html`):chat 表单加 `<button id="chat-cancel" class="small danger">stop</button>`(常态 hidden);state 加 `currentRunId`;sendMessage 拿到 run_id 后 show stop,fetchSse `try/finally` 收尾时一并 hide stop / 清 currentRunId / 复原 send button(确保 SSE 失败路径 UI 也 reset 不卡死)。cancel 按钮 click → `POST /runs/{rid}/cancel`;409 静默忽略(并发 done 不算错)。`handleSseEvent` 加 `cancelled` case → 在当前 assistant 卡贴一个虚线红框 "已停止(stopped by user)" badge。CSS 加 `.cancelled-badge`。**Smoke 15 case 全绿**:HTTP 层 11 case(cancel happy + 双 cancel 409 + cancelling 期间 POST messages 409 + ghost run 404 + invalid UUID 404 + cross-task 404 + cross-user 404 + cancel 已 ok 409 + cancel 已 error 409 + no auth 401 + stale reaper 扫 cancelling);Loop 层 4 case(cancel before first iter 不调 LLM / cancel between tool_calls 补 cancelled placeholder 3 个 + 保协议 + emit cancelled / 正常 done 不 emit cancelled / CLI 路径 cancel_check=None 默永不 cancel)。**没动 SSE handler 的 break list**(`("done", "error")`):cancelled 在 SSE 里走流给前端看,broker.close 之后立即跟 done 收流。**文档同步**:DESIGN §7.2 路由表 cancel 行从"待"扩成完整描述 + SSE 事件加 `cancelled{}` 行 + §7.7 风险表加"Run 跑太久 / 用户想中断"行;RUN 路由表加 cancel 行 + POST /messages 409 文案改 "running / cancelling" + 故障兜底加三行(cancel 409 / 点 stop 没立刻停 / reaper 扫 cancelling);PROGRESS 已完成 + 下一步重排(去掉 cancel,留 OIDC / C Executor / E CLI 双模式)。
- **05-18 / `POST /v1/tasks/{id}/messages` 单活 run 锁 + 孤儿 reaper**:用户连点 send / 多 tab 同时发消息 → 两个 BG 线程争 `messages.idx`(UniqueConstraint 会 race-crash 第二个 INSERT)的旧 TODO 落地。**实现**:`web/app.py::post_message` 把所有权 + 活跃 Run 检查 + 新 Run INSERT 收进一个 `session_scope()` 事务,首行用 `select(Task.task_id).where(...).with_for_update()` 锁 task 行序列化并发 POST;事务内查 `Run.status='running'` 命中即 raise `HTTPException(409, "task already has a running run ({rid}); wait for it to finish")`;无活跃则同事务 `s.add(Run(...status="running"))` —— 三步原子完成,避免 TOCTOU。lifespan 加 **stale-run reaper**:启动时 `UPDATE runs SET status='error', error='server restarted before run finished' WHERE status='running'`,把进程 crash 留下的孤儿 running 全清掉(否则对应 task 永挂 409)。结果 rowcount > 0 时 print info 行 `[startup] reaped N stale running run(s)`。Cancel 路由(DESIGN §7.2 标 "待")没改:有了它 409 时用户可主动 cancel,不必等流式结束。**没动 `Session.append`**:gate 已在 HTTP 层挡住了,单写者前提下 idx 自递增不会冲;在 ORM 里再加锁是过度。**Smoke 10 case 全绿**(in-process TestClient + `_run_agent_bg` mock 不真起 LLM):happy(202 + Run INSERT running)/ gate(同 task 第二 POST 409 + detail 含 "running run" + "wait for it to finish")/ clear after Run.status=ok 解锁(202)/ clear after Run.status=error 同(202)/ ghost task 跨用户路径 404(锁前所有权检查)/ invalid UUID 404 / empty content 400 早于 lock / no auth 401 早于 lock / stale reaper 测试(强行 SET 全部 Run=running → 开新 TestClient 触发 lifespan → 所有 running 变 error + 之后 POST 还能 202)/ cross-user(other UID token 访 sentinel task → 404 不暴露存在性)。**采坑**:`@case` 每个用 `make_client()` 起新 app 会重复触发 reaper,把 case 1 留下的 running 清掉 → case 2 的 409 测不出来;改成全部 case 共享一个 SHARED_CLIENT 跑,仅 stale-reaper case 用 `fresh=True` 开第二个。**文档同步**:DESIGN §7.2 POST /messages 行注 409 行为 + cancel "待" 后注"做出来后 409 可主动 cancel" / §7.7 风险表加"同 task 并发 POST messages.idx race"行;RUN 路由表 POST /messages 注 409;故障兜底替过期 TODO 行 → 加 "POST 返 409" 处置 + "[startup] reaped N stale running" 解释。**未来 TODO**:multi-worker 部署形态下 reaper 不能简单全表清(会误清其他 worker 的真在跑 run),换 heartbeat + lease(注释里记了)。 - **05-18 / `POST /v1/tasks/{id}/messages` 单活 run 锁 + 孤儿 reaper**:用户连点 send / 多 tab 同时发消息 → 两个 BG 线程争 `messages.idx`(UniqueConstraint 会 race-crash 第二个 INSERT)的旧 TODO 落地。**实现**:`web/app.py::post_message` 把所有权 + 活跃 Run 检查 + 新 Run INSERT 收进一个 `session_scope()` 事务,首行用 `select(Task.task_id).where(...).with_for_update()` 锁 task 行序列化并发 POST;事务内查 `Run.status='running'` 命中即 raise `HTTPException(409, "task already has a running run ({rid}); wait for it to finish")`;无活跃则同事务 `s.add(Run(...status="running"))` —— 三步原子完成,避免 TOCTOU。lifespan 加 **stale-run reaper**:启动时 `UPDATE runs SET status='error', error='server restarted before run finished' WHERE status='running'`,把进程 crash 留下的孤儿 running 全清掉(否则对应 task 永挂 409)。结果 rowcount > 0 时 print info 行 `[startup] reaped N stale running run(s)`。Cancel 路由(DESIGN §7.2 标 "待")没改:有了它 409 时用户可主动 cancel,不必等流式结束。**没动 `Session.append`**:gate 已在 HTTP 层挡住了,单写者前提下 idx 自递增不会冲;在 ORM 里再加锁是过度。**Smoke 10 case 全绿**(in-process TestClient + `_run_agent_bg` mock 不真起 LLM):happy(202 + Run INSERT running)/ gate(同 task 第二 POST 409 + detail 含 "running run" + "wait for it to finish")/ clear after Run.status=ok 解锁(202)/ clear after Run.status=error 同(202)/ ghost task 跨用户路径 404(锁前所有权检查)/ invalid UUID 404 / empty content 400 早于 lock / no auth 401 早于 lock / stale reaper 测试(强行 SET 全部 Run=running → 开新 TestClient 触发 lifespan → 所有 running 变 error + 之后 POST 还能 202)/ cross-user(other UID token 访 sentinel task → 404 不暴露存在性)。**采坑**:`@case` 每个用 `make_client()` 起新 app 会重复触发 reaper,把 case 1 留下的 running 清掉 → case 2 的 409 测不出来;改成全部 case 共享一个 SHARED_CLIENT 跑,仅 stale-reaper case 用 `fresh=True` 开第二个。**文档同步**:DESIGN §7.2 POST /messages 行注 409 行为 + cancel "待" 后注"做出来后 409 可主动 cancel" / §7.7 风险表加"同 task 并发 POST messages.idx race"行;RUN 路由表 POST /messages 注 409;故障兜底替过期 TODO 行 → 加 "POST 返 409" 处置 + "[startup] reaped N stale running" 解释。**未来 TODO**:multi-worker 部署形态下 reaper 不能简单全表清(会误清其他 worker 的真在跑 run),换 heartbeat + lease(注释里记了)。
- **05-17 / files API 全面 user-rooted(去掉 task_id 前置)**:用户反馈"web 页应该能看到 user 的所有目录,现在只能选 task 后右侧才刷新"——根因是原 files API 用 task_id 拐杖间接拿 working_dir,迫使前端必须先选 task。语义上 files 操作只关心"路径 + user 边界",task_id 是多余的;同时 §7.1 心智模型早就把 task 和 dir 定义为正交副视图,API 不该混。**后端**:删 `_load_working_dir(task_id, user_id)`,加 `_load_user_root(user_id)`(走 `main.user_root(ws, uid)` 自动 mkdir 拿 `workspace/users/<uid>/`);4 路由全换:`GET /v1/files?path=` / `GET /v1/files/download?path=` / `POST /v1/files/upload` / `POST /v1/files/delete`。`_safe_join` 边界从 task_dir 改 user_root,安全性不降低;`_enumerate_files` 加 dotfile 过滤(`if p.name.startswith(".")` 跳过 `.memory/` 等,同 `/v1/folders` 约定);`_rel_to` 把 `Path(".")` 归一为空串(避免 root 时 current="." 这种 ugly 形态)。删 `from_db_path` import(只剩 `to_db_path`)。**dev SPA**:`loadFiles()` 不再 gate on `state.taskId`,enterApp 时直接调一次拉 user_root;`selectTask` 在拿到 task meta 后 `state.filesPath = wdName`(从 working_dir 末段抽出)再 loadFiles,选 task 自动跳到对应子目录但用户可点 crumb 回 root 看其他目录;crumbs root 标签 "/" → "我的"(user_root 直观);files-proj header 从"项目名(state.taskMeta 派生)"改"路径首段(数据驱动)",空时显示 `(user root)`。**新增 upload 按钮**(原来藏在外部页面里没暴露给 SPA):pane-head 加 `⬆` 按钮 + 隐藏 `<input type=file multiple>`,onchange 走 FormData POST `/v1/files/upload`,path 取当前 `state.filesPath`(空 → user_root);上传完 loadFiles 刷新。`deleteCurrentTask` 不再重置 files 面板(task 删了但 FS 文件保留,继续浏览有意义),只 reload 当前路径。`btn-refresh-files` 移除 disabled 状态(任何时候可用)。**Smoke 68 case 全绿**(in-process TestClient,跑完即删 `_smoke_files.py`):列 user_root(包含 working_dir 目录,`.memory` 被过滤) / 列子目录 2 层 / 不存在路径 200+exists=False / 路径安全 6 case(`../` / 绝对 / Windows 绝对 / `\\` 起头)/ upload 单 / multi+nested mkdir / 上传到 root / 文件名攻击 4 case(`../` `..` `/` `\\`)/ download 文件 + 深度 + 目录 400 + ghost 404 + 越界 400 / delete 文件 / 空目录 / 非空 400 / user_root 拒 / ghost 404 / 越界 400 / 跨 user 隔离 4 case(A 不见 B,B 不见 A)/ 无 token 全 401(GET list / POST upload / POST delete / GET download)/ 子目录里 dotfile 也过滤 / 新 user 首访 user_root 自动 mkdir + 列表空。**文档**:DESIGN §7.2 路由表段 + lead-in 同步("Task 一等公民,files 是其副视图(经 task_dir 暴露)" → "Task 一等公民;files 与 task 正交,走 user-rooted /v1/files*,以 workspace/users/<uid>/ 为边界")。 - **05-17 / files API 全面 user-rooted(去掉 task_id 前置)**:用户反馈"web 页应该能看到 user 的所有目录,现在只能选 task 后右侧才刷新"——根因是原 files API 用 task_id 拐杖间接拿 working_dir,迫使前端必须先选 task。语义上 files 操作只关心"路径 + user 边界",task_id 是多余的;同时 §7.1 心智模型早就把 task 和 dir 定义为正交副视图,API 不该混。**后端**:删 `_load_working_dir(task_id, user_id)`,加 `_load_user_root(user_id)`(走 `main.user_root(ws, uid)` 自动 mkdir 拿 `workspace/users/<uid>/`);4 路由全换:`GET /v1/files?path=` / `GET /v1/files/download?path=` / `POST /v1/files/upload` / `POST /v1/files/delete`。`_safe_join` 边界从 task_dir 改 user_root,安全性不降低;`_enumerate_files` 加 dotfile 过滤(`if p.name.startswith(".")` 跳过 `.memory/` 等,同 `/v1/folders` 约定);`_rel_to` 把 `Path(".")` 归一为空串(避免 root 时 current="." 这种 ugly 形态)。删 `from_db_path` import(只剩 `to_db_path`)。**dev SPA**:`loadFiles()` 不再 gate on `state.taskId`,enterApp 时直接调一次拉 user_root;`selectTask` 在拿到 task meta 后 `state.filesPath = wdName`(从 working_dir 末段抽出)再 loadFiles,选 task 自动跳到对应子目录但用户可点 crumb 回 root 看其他目录;crumbs root 标签 "/" → "我的"(user_root 直观);files-proj header 从"项目名(state.taskMeta 派生)"改"路径首段(数据驱动)",空时显示 `(user root)`。**新增 upload 按钮**(原来藏在外部页面里没暴露给 SPA):pane-head 加 `⬆` 按钮 + 隐藏 `<input type=file multiple>`,onchange 走 FormData POST `/v1/files/upload`,path 取当前 `state.filesPath`(空 → user_root);上传完 loadFiles 刷新。`deleteCurrentTask` 不再重置 files 面板(task 删了但 FS 文件保留,继续浏览有意义),只 reload 当前路径。`btn-refresh-files` 移除 disabled 状态(任何时候可用)。**Smoke 68 case 全绿**(in-process TestClient,跑完即删 `_smoke_files.py`):列 user_root(包含 working_dir 目录,`.memory` 被过滤) / 列子目录 2 层 / 不存在路径 200+exists=False / 路径安全 6 case(`../` / 绝对 / Windows 绝对 / `\\` 起头)/ upload 单 / multi+nested mkdir / 上传到 root / 文件名攻击 4 case(`../` `..` `/` `\\`)/ download 文件 + 深度 + 目录 400 + ghost 404 + 越界 400 / delete 文件 / 空目录 / 非空 400 / user_root 拒 / ghost 404 / 越界 400 / 跨 user 隔离 4 case(A 不见 B,B 不见 A)/ 无 token 全 401(GET list / POST upload / POST delete / GET download)/ 子目录里 dotfile 也过滤 / 新 user 首访 user_root 自动 mkdir + 列表空。**文档**:DESIGN §7.2 路由表段 + lead-in 同步("Task 一等公民,files 是其副视图(经 task_dir 暴露)" → "Task 一等公民;files 与 task 正交,走 user-rooted /v1/files*,以 workspace/users/<uid>/ 为边界")。
@ -88,7 +89,7 @@ core/memory.py 81 ← per-user `.memory/` dotfile
core/export_docx.py 383 ← §7 B Step 2-4 + from_db_path 还原 + task_dir Optional core/export_docx.py 383 ← §7 B Step 2-4 + from_db_path 还原 + task_dir Optional
core/storage/__init__.py 27 ← §7 B Step 1-3 core/storage/__init__.py 27 ← §7 B Step 1-3
core/storage/engine.py 80 ← §7 B Step 1 core/storage/engine.py 80 ← §7 B Step 1
core/storage/models.py 124 ← §7 B Step 1 core/storage/models.py 99 ← 3 表(0004 删 runs/usage_events;Task + run_status/run_error)
core/storage/utils.py 136 ← check_no_subtask 改 Python 端归一 core/storage/utils.py 136 ← check_no_subtask 改 Python 端归一
tools/base.py 34 tools/base.py 34
tools/fs.py 182 tools/fs.py 182
@ -101,11 +102,15 @@ db/migrations/env.py 61 ← §7 B Step 1
db/migrations/versions/ db/migrations/versions/
0001_initial_schema.py 125 ← §7 B Step 1 0001_initial_schema.py 125 ← §7 B Step 1
0002_task_dir_relative.py 61 ← 现有 ROOT-prefix 绝对 → 相对 0002_task_dir_relative.py 61 ← 现有 ROOT-prefix 绝对 → 相对
0003_task_name_and_working_dir.py
51 ← name 必填 + task_dir→working_dir + mode→skill
0004_drop_runs_usage_events.py
77 ← 删 runs/usage_events + tasks 加 run_status/run_error
web/__init__.py 5 ← Phase G G1 web/__init__.py 5 ← Phase G G1
web/app.py 898 ← /v1/ JSON API + user_id 隔离 + run lock + cancel endpoint web/app.py 889 ← /v1/ JSON API + user_id 隔离 + run lock + task-level cancel
web/auth.py 115 ← D' 过渡:PLATFORM_KEY → JWT 兑换 web/auth.py 115 ← D' 过渡:PLATFORM_KEY → JWT 兑换
web/broker.py 109 ← in-process pub/sub + cancel signal per-run web/broker.py 121 ← in-process pub/sub + cancel signal,全 task_id 索引(0004)
web/sinks.py 20 ← Phase G G4: WebEventSink (§7 A sink 协议) web/sinks.py 21 ← WebEventSink 绑 task_id(0004)
web/static/dev.html 1133 ← D' dev SPA + stop 按钮 + cancelled badge web/static/dev.html 1133 ← D' dev SPA + stop 按钮 + cancelled badge
───────────────────────────────── ─────────────────────────────────
Python 合计 ~3800 行(+ dev.html 1133 静态) Python 合计 ~3800 行(+ dev.html 1133 静态)

14
RUN.md
View File

@ -2,7 +2,7 @@
> 怎么把 zcbot 跑起来。env / 常用命令 / 故障兜底。设计看 `DESIGN.md`,进度看 `PROGRESS.md` > 怎么把 zcbot 跑起来。env / 常用命令 / 故障兜底。设计看 `DESIGN.md`,进度看 `PROGRESS.md`
最后更新:2026-05-18(`POST /v1/tasks/{id}/runs/{rid}/cancel` 协作式 cancel + `cancelled` SSE 事件 + dev SPA stop 按钮;gate 扩到 `cancelling`) 最后更新:2026-05-18(0004 schema 简化:删 runs / usage_events 表;cancel 改 task-level `POST /v1/tasks/{id}/cancel`;SSE 改 `GET /v1/tasks/{id}/events`;run_status / run_error 合并入 tasks)
--- ---
@ -37,7 +37,7 @@ python -m venv .venv
# 3) DB schema 上车 # 3) DB schema 上车
.venv/Scripts/python.exe cli.py db upgrade head .venv/Scripts/python.exe cli.py db upgrade head
.venv/Scripts/python.exe cli.py db current # 应输出 0003 (head) .venv/Scripts/python.exe cli.py db current # 应输出 0004 (head)
``` ```
--- ---
@ -144,9 +144,9 @@ curl --noproxy '*' -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8765/v1/ta
| `DELETE /v1/tasks/{id}` | 硬删 DB 行(messages CASCADE),FS working_dir 保留 | 必填 | | `DELETE /v1/tasks/{id}` | 硬删 DB 行(messages CASCADE),FS working_dir 保留 | 必填 |
| `GET /v1/folders` | 列当前 user 工作目录 + n_tasks + last_used(供创建 task 自动补全用) | 必填 | | `GET /v1/folders` | 列当前 user 工作目录 + n_tasks + last_used(供创建 task 自动补全用) | 必填 |
| `GET /v1/tasks/{id}/messages` | LiteLLM payload 透传 | 必填 | | `GET /v1/tasks/{id}/messages` | LiteLLM payload 透传 | 必填 |
| `POST /v1/tasks/{id}/messages` | `{content}` 发消息;返 `{run_id, events_url}`;**同 task 已有 running / cancelling run → 409**(单活 run 保护,UI 应 disable send 按钮直到 SSE `done`) | 必填 | | `POST /v1/tasks/{id}/messages` | `{content}` 发消息;返 `{events_url}`;**`tasks.run_status` 是 running / cancelling → 409**(单活 run 保护;error 状态视为可重启,起新 run 时清);UI 应 disable send 按钮直到 SSE `done` | 必填 |
| `GET /v1/tasks/{id}/runs/{rid}/events` | SSE 流(`event: <type>` + `data: <json>`) | 必填 | | `GET /v1/tasks/{id}/events` | SSE 流(`event: <type>` + `data: <json>`);订阅 task 当前活动 — 单活 run 形态下无歧义 | 必填 |
| `POST /v1/tasks/{id}/runs/{rid}/cancel` | 协作式 cancel 当前 run;返 `{ok, run_id, status:"cancelling"}`;run.status != `running` → 409;LLM 同步 call 本身不可中断,最坏等当前一轮跑完 | 必填 | | `POST /v1/tasks/{id}/cancel` | 协作式 cancel 当前活跃 run;返 `{ok, task_id, run_status:"cancelling"}`;`run_status != "running"` → 409;LLM 同步 call 本身不可中断,最坏等当前一轮跑完 | 必填 |
| `GET /v1/tasks/{id}/files?path=` | 列子目录条目 + 面包屑 | 必填 | | `GET /v1/tasks/{id}/files?path=` | 列子目录条目 + 面包屑 | 必填 |
| `GET /v1/tasks/{id}/files/download?path=` | 下单文件 | 必填 | | `GET /v1/tasks/{id}/files/download?path=` | 下单文件 | 必填 |
| `POST /v1/tasks/{id}/files/upload` | multipart 上传,`path` 走 form | 必填 | | `POST /v1/tasks/{id}/files/upload` | multipart 上传,`path` 走 form | 必填 |
@ -176,8 +176,8 @@ curl --noproxy '*' -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8765/v1/ta
| `cli.py web` 启动后 curl 连不上 | 检查 proxy(`HTTP_PROXY` / `HTTPS_PROXY`):本地服务在 127.0.0.1,系统 proxy 拦截会 502。临时 `unset HTTP_PROXY HTTPS_PROXY` 或加 `curl --noproxy '*'`。验通:`curl --noproxy '*' http://127.0.0.1:8765/healthz` → `{"status":"ok"}` | | `cli.py web` 启动后 curl 连不上 | 检查 proxy(`HTTP_PROXY` / `HTTPS_PROXY`):本地服务在 127.0.0.1,系统 proxy 拦截会 502。临时 `unset HTTP_PROXY HTTPS_PROXY` 或加 `curl --noproxy '*'`。验通:`curl --noproxy '*' http://127.0.0.1:8765/healthz` → `{"status":"ok"}` |
| SSE 卡住不流(经 nginx) | 反代要关 buffering — 后端响应头已带 `X-Accel-Buffering: no`,nginx ≥ 1.5.6 默认认。仍卡看 nginx 配 `proxy_buffering off; proxy_read_timeout 3600s;` | | SSE 卡住不流(经 nginx) | 反代要关 buffering — 后端响应头已带 `X-Accel-Buffering: no`,nginx ≥ 1.5.6 默认认。仍卡看 nginx 配 `proxy_buffering off; proxy_read_timeout 3600s;` |
| platform 端 CORS preflight 失败 | 本地 dev `allow_origins=["*"]` 应该没事;部署后看是否按 platform 域名收紧过头(`access-control-allow-origin` 响应头要含 platform 域名 或 `*`)| | platform 端 CORS preflight 失败 | 本地 dev `allow_origins=["*"]` 应该没事;部署后看是否按 platform 域名收紧过头(`access-control-allow-origin` 响应头要含 platform 域名 或 `*`)|
| `POST /v1/tasks/{id}/messages` 返 409 `task already has an active run` | 上一条消息的 BG run 还没跑完(SSE 没 `done`)。等流式跑完;或点 dev SPA 的 stop / `POST /runs/{rid}/cancel`;服务异常下 Run 行`running`/`cancelling`,启动 reaper 会清 | | `POST /v1/tasks/{id}/messages` 返 409 `task already has an active run` | 上一条消息的 BG run 还没跑完(SSE 没 `done`)。等流式跑完;或点 dev SPA 的 stop / `POST /v1/tasks/{id}/cancel`;服务异常下 `tasks.run_status` `running`/`cancelling`,启动 reaper 会清 |
| `POST /v1/tasks/{id}/runs/{rid}/cancel` 返 409 `run not running` | run 已结束(ok/error/cancelled)或已被 cancel 进入 `cancelling`,不能重复 cancel;dev SPA 自动忽略不报错 | | `POST /v1/tasks/{id}/cancel` 返 409 `task not running` | `run_status` 不是 `running`(idle / cancelling / error 都不能 cancel,error 只能起新 run 顶掉);dev SPA 自动忽略不报错 |
| 点 stop 后流式没立刻停 | LLM 同步调用本身不可中断,最坏等当前一轮跑完(通常几十秒)。loop 进入下个 check 点(每轮 LLM 前 / 每个 tool_call 前)就退,emit `cancelled` → SSE `done` → UI 收回 stop 按钮 | | 点 stop 后流式没立刻停 | LLM 同步调用本身不可中断,最坏等当前一轮跑完(通常几十秒)。loop 进入下个 check 点(每轮 LLM 前 / 每个 tool_call 前)就退,emit `cancelled` → SSE `done` → UI 收回 stop 按钮 |
| `[startup] reaped N stale active run(s)` | 上次 `cli.py web` 进程未正常 finish 留下 N 个 `running` / `cancelling` Run 行,启动 lifespan 自动标 error。无需处理,info 级 | | `[startup] reaped N stale active run(s)` | 上次 `cli.py web` 进程未正常 finish 留下 N 个 `running` / `cancelling` Run 行,启动 lifespan 自动标 error。无需处理,info 级 |
| `cli.py web` 启动报 `PLATFORM_KEY env not set` / `JWT_SECRET env not set` | D' 过渡 auth 强制双 env 必填。生成 `python -c "import secrets;print(secrets.token_urlsafe(48))"` 各填一,写进 `.env` 重起 | | `cli.py web` 启动报 `PLATFORM_KEY env not set` / `JWT_SECRET env not set` | D' 过渡 auth 强制双 env 必填。生成 `python -c "import secrets;print(secrets.token_urlsafe(48))"` 各填一,写进 `.env` 重起 |

View File

@ -2,7 +2,7 @@
入口: 入口:
from core.storage import get_engine, session_scope, ensure_local_sentinel from core.storage import get_engine, session_scope, ensure_local_sentinel
from core.storage.models import User, Task, Message, Run, UsageEvent from core.storage.models import User, Task, Message
ZCBOT_DB_URL 环境变量必填(本地连测试 / staging PG;SaaS 连生产 PG) ZCBOT_DB_URL 环境变量必填(本地连测试 / staging PG;SaaS 连生产 PG)
未设置时 get_engine() RuntimeError 并指引设置 未设置时 get_engine() RuntimeError 并指引设置

View File

@ -1,9 +1,10 @@
"""SQLAlchemy 2.x ORM models,对应 DESIGN.md §7.4 schema。 """SQLAlchemy 2.x ORM models,对应 DESIGN.md §7.4 schema。
5 张表:users / tasks / messages / runs / usage_events 3 张表:users / tasks / messages
- users 本地形态固定 INSERT sentinel(`00000000-...`) - users 本地形态固定 INSERT sentinel(`00000000-...`)
- messages.payload jsonb,GIN 索引在 migration 里建 - messages.payload jsonb,GIN 索引在 migration 里建
- runs / usage_events B 阶段先建表,真正写入要等 D 阶段(HTTP /v1 + run 生命周期) - run 状态承载在 tasks.run_status / run_error 两列(0004 合并 runs );
runs / usage_events 0004 DESIGN §7.4 取舍 / PROGRESS 05-18
""" """
from __future__ import annotations from __future__ import annotations
@ -13,7 +14,6 @@ from typing import Any, Optional
from uuid import UUID, uuid4 from uuid import UUID, uuid4
from sqlalchemy import ( from sqlalchemy import (
BigInteger,
DateTime, DateTime,
ForeignKey, ForeignKey,
Integer, Integer,
@ -65,6 +65,11 @@ class Task(Base):
tokens_prompt: Mapped[int] = mapped_column(Integer, nullable=False, default=0) tokens_prompt: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
tokens_completion: Mapped[int] = mapped_column(Integer, nullable=False, default=0) tokens_completion: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
cost_usd: Mapped[Decimal] = mapped_column(Numeric(12, 6), nullable=False, default=0) cost_usd: Mapped[Decimal] = mapped_column(Numeric(12, 6), nullable=False, default=0)
# 当前 in-flight 状态(原 runs 表合并入,DESIGN §7.4 简化 / 0004 migration):
# idle / running / cancelling / error;ok / cancelled 收尾直接回 idle,
# 只有 error 是持久终态(下次起新 run 时由 post_message 清掉)
run_status: Mapped[str] = mapped_column(Text, nullable=False, default="idle")
run_error: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False DateTime(timezone=True), server_default=func.now(), nullable=False
) )
@ -92,34 +97,3 @@ class Message(Base):
) )
class Run(Base):
__tablename__ = "runs"
run_id: Mapped[UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True, default=uuid4)
task_id: Mapped[UUID] = mapped_column(
PG_UUID(as_uuid=True),
ForeignKey("tasks.task_id", ondelete="CASCADE"),
nullable=False,
)
status: Mapped[str] = mapped_column(Text, nullable=False, default="pending")
started_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
error: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
tokens_p: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
tokens_c: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
class UsageEvent(Base):
"""append-only 审计。task_id / run_id 不 FK,task 硬删后审计仍存活(§7.4)。"""
__tablename__ = "usage_events"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
user_id: Mapped[UUID] = mapped_column(PG_UUID(as_uuid=True), nullable=False)
task_id: Mapped[Optional[UUID]] = mapped_column(PG_UUID(as_uuid=True), nullable=True)
run_id: Mapped[Optional[UUID]] = mapped_column(PG_UUID(as_uuid=True), nullable=True)
kind: Mapped[str] = mapped_column(Text, nullable=False)
value: Mapped[Decimal] = mapped_column(Numeric(20, 8), nullable=False)
ts: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)

View File

@ -0,0 +1,78 @@
"""drop runs / usage_events tables + tasks 加 run_status / run_error。
Revision ID: 0004
Revises: 0003
Create Date: 2026-05-18
DESIGN §7.4 / §7.7 简化:
- usage_events 表纯死代码(从未写入 / 读取),"未来计费"预付的架构成本
- runs 表实质就是"task 当前 in-flight 状态"的影子表:tokens_p/c 写但从未被读
(tokens 真正走 tasks.tokens_prompt/_completion 累计版),started_at/finished_at
/error 也只写不读,run_id 只有 broker pub/sub cancel 参数两个用途 现在
cancel task-levelbroker task_id 索引,run_id 完全不需要
- tasks 加两列承接 run 当前状态:
`run_status text default 'idle'`(idle / running / cancelling / error)
`run_error text`(error 状态的错误文本,其他状态为 NULL)
`ok / cancelled` 收尾直接回 `idle`(不留"上次跑挂"的持久标记);只有 `error`
持久终态(下次起新 run 时清掉)
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = "0004"
down_revision: Union[str, None] = "0003"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.drop_table("usage_events")
op.drop_table("runs")
op.add_column(
"tasks",
sa.Column(
"run_status", sa.Text(), nullable=False, server_default="idle"
),
)
op.add_column("tasks", sa.Column("run_error", sa.Text(), nullable=True))
def downgrade() -> None:
op.drop_column("tasks", "run_error")
op.drop_column("tasks", "run_status")
op.create_table(
"runs",
sa.Column("run_id", sa.dialects.postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column(
"task_id",
sa.dialects.postgresql.UUID(as_uuid=True),
sa.ForeignKey("tasks.task_id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("status", sa.Text(), nullable=False, server_default="pending"),
sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("finished_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("error", sa.Text(), nullable=True),
sa.Column("tokens_p", sa.Integer(), nullable=False, server_default="0"),
sa.Column("tokens_c", sa.Integer(), nullable=False, server_default="0"),
)
op.create_table(
"usage_events",
sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
sa.Column("user_id", sa.dialects.postgresql.UUID(as_uuid=True), nullable=False),
sa.Column("task_id", sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
sa.Column("run_id", sa.dialects.postgresql.UUID(as_uuid=True), nullable=True),
sa.Column("kind", sa.Text(), nullable=False),
sa.Column("value", sa.Numeric(20, 8), nullable=False),
sa.Column(
"ts",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
)

View File

@ -35,7 +35,7 @@ from core.storage import (
check_no_subtask, check_no_subtask,
session_scope, session_scope,
) )
from core.storage.models import Message, Run, Task from core.storage.models import Message, Task
from core.storage.utils import ensure_local_task_row from core.storage.utils import ensure_local_task_row
from .auth import AuthConfig, ensure_user_row, make_require_user, mint_token from .auth import AuthConfig, ensure_user_row, make_require_user, mint_token
@ -99,6 +99,9 @@ def _task_dict(row: Any, *, n_messages: Optional[int] = None) -> dict:
"tokens_prompt": row.tokens_prompt or 0, "tokens_prompt": row.tokens_prompt or 0,
"tokens_completion": row.tokens_completion or 0, "tokens_completion": row.tokens_completion or 0,
"tokens": (row.tokens_prompt or 0) + (row.tokens_completion or 0), "tokens": (row.tokens_prompt or 0) + (row.tokens_completion or 0),
# 当前 run 状态(0004 schema 简化:原 runs 表合并入 task)
"run_status": row.run_status or "idle",
"run_error": row.run_error or None,
"created_at": _iso(getattr(row, "created_at", None)), "created_at": _iso(getattr(row, "created_at", None)),
"updated_at": _iso(getattr(row, "updated_at", None)), "updated_at": _iso(getattr(row, "updated_at", None)),
} }
@ -180,51 +183,48 @@ def _enumerate_files(root: Path, current: Path) -> tuple[list[dict], list[dict],
return entries, crumbs, exists return entries, crumbs, exists
# ─────────────────── Run 启动 + SSE 帧格式 ─────────────────── # ─────────────────── BG run + SSE 帧格式 ───────────────────
def _run_agent_bg(task_id: UUID, run_id: UUID, user_id: UUID, user_message: str) -> None: def _run_agent_bg(task_id: UUID, user_id: UUID, user_message: str) -> None:
"""工作线程:`build_agent(resume=True)` → 装 WebEventSink + cancel_check → `agent.run` → 写 runs 状态 """工作线程:`build_agent(resume=True)` → 装 WebEventSink + cancel_check → `agent.run` → 写 tasks.run_status
sink 通过 broker.emit 桥事件回 asyncio loop;agent.run sync,所以在 to_thread sink 通过 broker.emit 桥事件回 asyncio loop;agent.run sync,所以在 to_thread
user_id 必须从 JWT 那侧透传过来 决定 memory_block 读哪个 per-user 子树 user_id 必须从 JWT 那侧透传过来 决定 memory_block 读哪个 per-user 子树
cancel_check broker.is_cancelled,loop 在工具调用之间 poll(LLM 同步调用本身不可中断) cancel_check broker.is_cancelled,loop 在工具调用之间 poll(LLM 同步调用本身不可中断)
`ok / cancelled` 收尾直接回 `idle`(不留持久标记);只有 error 是持久终态
""" """
from main import build_agent, sync_task_tokens from main import build_agent, sync_task_tokens
try: try:
broker.emit(run_id, {"type": "run_start"}) broker.emit(task_id, {"type": "run_start"})
agent, session, sid, task_state, task_dir = build_agent( agent, session, sid, task_state, task_dir = build_agent(
session_id=str(task_id), resume=True, user_id=user_id, session_id=str(task_id), resume=True, user_id=user_id,
) )
agent.sink = WebEventSink(broker, run_id) agent.sink = WebEventSink(broker, task_id)
agent.cancel_check = lambda rid=run_id: broker.is_cancelled(rid) agent.cancel_check = lambda tid=task_id: broker.is_cancelled(tid)
agent.run(user_message) agent.run(user_message)
sync_task_tokens(task_state, agent.llm) sync_task_tokens(task_state, agent.llm)
# cancel 命中时 agent.run 提前 return + 已 emit `cancelled`;终态写 "cancelled" # cancel 命中或正常完成 → run_status 回 idle(error 才持久)
final_status = "cancelled" if broker.is_cancelled(run_id) else "ok"
with session_scope() as s: with session_scope() as s:
s.execute( s.execute(
update(Run).where(Run.run_id == run_id).values( update(Task).where(Task.task_id == task_id).values(
status=final_status, run_status="idle", run_error=None,
finished_at=func.now(),
tokens_p=agent.llm.token_counter.prompt_tokens,
tokens_c=agent.llm.token_counter.completion_tokens,
) )
) )
except Exception as e: except Exception as e:
err = f"{type(e).__name__}: {e}" err = f"{type(e).__name__}: {e}"
broker.emit(run_id, {"type": "error", "msg": err}) broker.emit(task_id, {"type": "error", "msg": err})
try: try:
with session_scope() as s: with session_scope() as s:
s.execute( s.execute(
update(Run).where(Run.run_id == run_id).values( update(Task).where(Task.task_id == task_id).values(
status="error", error=err, finished_at=func.now() run_status="error", run_error=err,
) )
) )
except Exception: except Exception:
pass # 已 emit error 给前端,DB 写失败不放大噪声 pass # 已 emit error 给前端,DB 写失败不放大噪声
finally: finally:
broker.clear_cancel(run_id) broker.clear_cancel(task_id)
broker.close(run_id) broker.close(task_id)
def _sse_event(event_type: str, payload: dict) -> bytes: def _sse_event(event_type: str, payload: dict) -> bytes:
@ -276,17 +276,16 @@ def create_app() -> FastAPI:
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
broker.bind_loop(asyncio.get_running_loop()) broker.bind_loop(asyncio.get_running_loop())
# Stale-run reaper:上次进程 crash 留下的 "running" / "cancelling" 已无 BG 线程 # Stale-run reaper:上次进程 crash 留下的 "running" / "cancelling" 已无 BG 线程
# 继续,启动时标 error,让对应 task 重新可发消息(否则 409 gate 永挂)。 # 继续,启动时标 error,让对应 task 重新可发消息(否则 gate 永挂)。
# TODO 真生产 multi-worker:换 heartbeat / lease,只 reap 自家 worker 的孤儿。 # TODO 真生产 multi-worker:换 heartbeat / lease,只 reap 自家 worker 的孤儿。
with session_scope() as s: with session_scope() as s:
result = s.execute( result = s.execute(
update(Run) update(Task)
.where(Run.status.in_(("running", "cancelling"))) .where(Task.run_status.in_(("running", "cancelling")))
.values( .values(
status="error", run_status="error",
error="server restarted before run finished", run_error="server restarted before run finished",
finished_at=func.now(),
) )
) )
if result.rowcount: if result.rowcount:
@ -633,11 +632,12 @@ def create_app() -> FastAPI:
body: MessageRequest, body: MessageRequest,
user_id: UUID = Depends(require_user), user_id: UUID = Depends(require_user),
): ):
"""发消息 + 起 BG run。返 `{run_id, events_url}`,客户端立刻订阅 SSE 拿流式。 """发消息 + 起 BG run。返 `{events_url}`,客户端立刻订阅 SSE 拿流式。
task 单活 run:`SELECT FOR UPDATE` task + 活跃 Run 检查,把所有权 / 单活 run:`SELECT FOR UPDATE` task + 活跃状态检查 + running,
活跃 / 插新 run 收进一个事务,挡住"用户连点 send 两条消息"导致两个 BG 线程 全收进一个事务挡住"用户连点 send 两条消息"导致两个 BG 线程争 `messages.idx`
`messages.idx`(UniqueConstraint race-crash)已有 running run 409 tasks.run_status in ('running','cancelling') 409;'error' 走起新 run 时清掉
( ok / cancelled 一样视为可重启)
""" """
try: try:
tid = UUID(task_id) tid = UUID(task_id)
@ -646,98 +646,89 @@ def create_app() -> FastAPI:
content = (body.content or "").strip() content = (body.content or "").strip()
if not content: if not content:
raise HTTPException(400, "empty content") raise HTTPException(400, "empty content")
run_id = uuid4()
with session_scope() as s: with session_scope() as s:
owned = s.execute( row = s.execute(
select(Task.task_id) select(Task.run_status)
.where(Task.task_id == tid, Task.user_id == user_id) .where(Task.task_id == tid, Task.user_id == user_id)
.with_for_update() .with_for_update()
).first() ).first()
if owned is None: if row is None:
raise HTTPException(404, f"task not found: {tid}") raise HTTPException(404, f"task not found: {tid}")
active = s.execute( if row.run_status in ("running", "cancelling"):
select(Run.run_id)
.where(Run.task_id == tid, Run.status.in_(("running", "cancelling")))
.limit(1)
).scalar_one_or_none()
if active is not None:
raise HTTPException( raise HTTPException(
409, 409,
f"task already has an active run ({active}); wait for it to finish or cancel", f"task already has an active run (status={row.run_status}); "
f"wait for it to finish or cancel",
) )
s.add(Run(run_id=run_id, task_id=tid, status="running", started_at=func.now())) s.execute(
update(Task).where(Task.task_id == tid).values(
run_status="running", run_error=None,
)
)
broker.start(tid) # 清上一轮 done 标记,新订阅者才能看到流式
# commit 后 lock 释放;BG 线程接管(sink 通过 broker 把 event 桥回 asyncio loop) # commit 后 lock 释放;BG 线程接管(sink 通过 broker 把 event 桥回 asyncio loop)
asyncio.create_task(asyncio.to_thread(_run_agent_bg, tid, run_id, user_id, content)) asyncio.create_task(asyncio.to_thread(_run_agent_bg, tid, user_id, content))
return { return {"events_url": f"/v1/tasks/{tid}/events"}
"run_id": str(run_id),
"events_url": f"/v1/tasks/{tid}/runs/{run_id}/events",
}
# ───────────── Run cancel ───────────── # ───────────── Cancel current run ─────────────
@app.post("/v1/tasks/{task_id}/runs/{run_id}/cancel", status_code=202, tags=["runs"]) @app.post("/v1/tasks/{task_id}/cancel", status_code=202, tags=["tasks"])
def cancel_run( def cancel_task(
task_id: str, task_id: str,
run_id: str,
user_id: UUID = Depends(require_user), user_id: UUID = Depends(require_user),
): ):
"""向当前 run 发协作式 cancel 信号。 """向当前 task 的活跃 run 发协作式 cancel 信号。
- 校验 task 归属 user + run 归属 task;否则 404 - 单活 run 形态下"取消当前活动"语义无歧义;客户端只需 task_id
- run.status 不是 `running` 409(已结束 / cancelling 不能重复 cancel) - 校验 task 归属 user;否则 404
- tasks.run_status 不是 `running` 409(idle / cancelling / error 都不能 cancel)
- `cancelling`(过渡态),BG 线程 loop 在工具调用之间 poll 看见即退; - `cancelling`(过渡态),BG 线程 loop 在工具调用之间 poll 看见即退;
退出后 finally 写终态 `cancelled`(或异常路径 `error`) 退出后 finally 写终态(正常idle,异常error)
- LLM 同步调用本身不可中断,最坏要等当前 LLM call 跑完(通常几十秒内) - LLM 同步调用本身不可中断,最坏要等当前 LLM call 跑完(通常几十秒内)
""" """
try: try:
tid = UUID(task_id) tid = UUID(task_id)
rid = UUID(run_id)
except ValueError: except ValueError:
raise HTTPException(404, "invalid id") raise HTTPException(404, f"invalid task id: {task_id!r}")
with session_scope() as s: with session_scope() as s:
run = s.execute( row = s.execute(
select(Run) select(Task.run_status)
.join(Task, Task.task_id == Run.task_id) .where(Task.task_id == tid, Task.user_id == user_id)
.where(
Run.run_id == rid,
Run.task_id == tid,
Task.user_id == user_id,
)
.with_for_update() .with_for_update()
).scalar_one_or_none() ).first()
if run is None: if row is None:
raise HTTPException(404, f"run not found: {rid}") raise HTTPException(404, f"task not found: {tid}")
if run.status != "running": if row.run_status != "running":
raise HTTPException( raise HTTPException(
409, 409,
f"run not running (status={run.status}); cannot cancel", f"task not running (run_status={row.run_status}); cannot cancel",
) )
s.execute( s.execute(
update(Run).where(Run.run_id == rid).values(status="cancelling") update(Task).where(Task.task_id == tid).values(run_status="cancelling")
) )
broker.request_cancel(rid) broker.request_cancel(tid)
return {"ok": True, "run_id": str(rid), "status": "cancelling"} return {"ok": True, "task_id": str(tid), "run_status": "cancelling"}
# ───────────── SSE events ───────────── # ───────────── SSE events ─────────────
@app.get("/v1/tasks/{task_id}/runs/{run_id}/events", tags=["runs"]) @app.get("/v1/tasks/{task_id}/events", tags=["tasks"])
async def stream_events( async def stream_events(
task_id: str, task_id: str,
run_id: str,
user_id: UUID = Depends(require_user), user_id: UUID = Depends(require_user),
): ):
"""SSE 流。事件类型:run_start / llm_start / text / tool_call / tool_result / """SSE 流。订阅当前 task 的活动 event(单活 run 形态下无歧义)。
llm_end / error / donedata JSON dict(已剔除 `type` 字段,移到 event ) 事件类型:run_start / llm_start / text / tool_call / tool_result /
llm_end / cancelled / error / donedata JSON dict(已剔除 `type` 字段,
移到 event )
""" """
try: try:
tid = UUID(task_id) tid = UUID(task_id)
rid = UUID(run_id)
except ValueError: except ValueError:
raise HTTPException(404, "invalid id") raise HTTPException(404, f"invalid task id: {task_id!r}")
with session_scope() as s: with session_scope() as s:
_assert_owns_task(s, tid, user_id) _assert_owns_task(s, tid, user_id)
async def gen(): async def gen():
q = broker.subscribe(rid) q = broker.subscribe(tid)
try: try:
# 第一帧 retry 注释 + 心跳:让 EventSource 立即建立(不被 buffer 卡) # 第一帧 retry 注释 + 心跳:让 EventSource 立即建立(不被 buffer 卡)
yield b": connected\nretry: 3000\n\n" yield b": connected\nretry: 3000\n\n"
@ -755,7 +746,7 @@ def create_app() -> FastAPI:
except asyncio.CancelledError: except asyncio.CancelledError:
pass # 客户端断开,静默退 pass # 客户端断开,静默退
finally: finally:
broker.unsubscribe(rid, q) broker.unsubscribe(tid, q)
return StreamingResponse( return StreamingResponse(
gen(), gen(),

View File

@ -1,20 +1,26 @@
"""RunBroker:in-process pub/sub,把 agent run 产生的 event fan-out 给所有 SSE 订阅者。 """RunBroker:in-process pub/sub,把 agent run 产生的 event fan-out 给所有 SSE 订阅者。
DESIGN §7 简化(0004 一并) 单活 run 形态下 run_id 是冗余的( task 同时
最多 1 个活 run);broker 内部全用 task_id 索引,客户端只需要 task_id 即可
订阅 / cancel,不再需要先拿 run_id
设计: 设计:
- emit() 从工作线程调(agent.run to_thread ), loop.call_soon_threadsafe - emit() 从工作线程调(agent.run to_thread ), loop.call_soon_threadsafe
桥到 asyncio queue;SSE generator await queue.get() 拉出来推流 桥到 asyncio queue;SSE generator await queue.get() 拉出来推流
- 同一 run_id 多个订阅者(刷新页面 / tab / 桌面+移动) 每个订阅 1 个独立 queue - 同一 task 多个订阅者(刷新页面 / tab / 桌面+移动) 每个订阅 1 个独立 queue
- run 结束 broker.close(run_id) 给所有订阅者派一条 done;新订阅者( done 后到的) - run 结束 broker.close(task_id) 给所有订阅者派一条 done;新订阅者( done
立即收到 done 并断流(不漏不挂) 后到的)立即收到 done 并断流(不漏不挂)
- task 起新 run broker.start(task_id) 清掉 _done 标记;否则上一轮 done
会让新订阅者立刻断流看不到流式
- 进程内单实例 / 多进程不共享 个人 SaaS worker 够用;真要扩多 worker 再上 Redis - 进程内单实例 / 多进程不共享 个人 SaaS worker 够用;真要扩多 worker 再上 Redis
- 不持久化 event messages 已落 PG,刷新页面走 G3 静态视图能看历史;真要"刷新继续看 - 不持久化 event messages 已落 PG,刷新页面走 GET /v1/tasks/{id}/messages 看历史;
实时流"未来加 event log 表 + backfill 真要"刷新继续看实时流"未来加 event log + backfill
线程模型: 线程模型:
- broker.bind_loop(loop) FastAPI startup 调一次,记录 asyncio loop 引用 - broker.bind_loop(loop) FastAPI startup 调一次,记录 asyncio loop 引用
- emit() 调用方可能在任意线程;put_nowait thread-unsafe(asyncio.Queue 设计前提 - emit() 调用方可能在任意线程;put_nowait thread-unsafe(asyncio.Queue 设计前提
是单 loop),所以走 call_soon_threadsafe 跨回 loop 线程再 put 是单 loop),所以走 call_soon_threadsafe 跨回 loop 线程再 put
- subscribe / unsubscribe / close 也都用 call_soon_threadsafe ,避免 race - subscribe / unsubscribe / close / start 也都用 call_soon_threadsafe ,避免 race
(实测 SSE generator finally unsubscribe,这个就在 loop 线程,直接调也行) (实测 SSE generator finally unsubscribe,这个就在 loop 线程,直接调也行)
""" """
from __future__ import annotations from __future__ import annotations
@ -29,9 +35,9 @@ from uuid import UUID
class RunBroker: class RunBroker:
def __init__(self) -> None: def __init__(self) -> None:
self._subs: dict[UUID, set[asyncio.Queue]] = defaultdict(set) self._subs: dict[UUID, set[asyncio.Queue]] = defaultdict(set)
# 已经发完 done 的 run — 后来订阅者直接收到 done,避免无限等 # 已经发完 done 的 task — 后来订阅者直接收到 done,避免无限等
self._done: set[UUID] = set() self._done: set[UUID] = set()
# cancel signal per-run。AgentLoop 在 BG 线程里 poll is_cancelled() 决定是否退; # cancel signal per-task。AgentLoop 在 BG 线程里 poll is_cancelled() 决定是否退;
# request_cancel 可在 BG 还没 register 时调用(setdefault),BG 启动后第一次 # request_cancel 可在 BG 还没 register 时调用(setdefault),BG 启动后第一次
# check 即看到。run 完成在 finally 里 clear_cancel 回收。 # check 即看到。run 完成在 finally 里 clear_cancel 回收。
self._cancel_flags: dict[UUID, threading.Event] = {} self._cancel_flags: dict[UUID, threading.Event] = {}
@ -41,25 +47,31 @@ class RunBroker:
"""FastAPI startup 调一次。""" """FastAPI startup 调一次。"""
self._loop = loop self._loop = loop
def subscribe(self, run_id: UUID) -> asyncio.Queue: def start(self, task_id: UUID) -> None:
"""订阅 run 的 event 流。已 done 的 run 立刻在 queue 放一条 done。 """同 task 起新 run 时调:清 _done 标记,让新订阅者能看到流式。
cancel flag finally clear_cancel ,这里不动(避免擦掉刚刚 request_cancel 的请求)
"""
self._done.discard(task_id)
def subscribe(self, task_id: UUID) -> asyncio.Queue:
"""订阅 task 当前 run 的 event 流。已 done 的 task 立刻在 queue 放一条 done。
调用方:SSE handler( asyncio loop 线程内) 调用方:SSE handler( asyncio loop 线程内)
""" """
q: asyncio.Queue = asyncio.Queue() q: asyncio.Queue = asyncio.Queue()
if run_id in self._done: if task_id in self._done:
q.put_nowait({"type": "done"}) q.put_nowait({"type": "done"})
else: else:
self._subs[run_id].add(q) self._subs[task_id].add(q)
return q return q
def unsubscribe(self, run_id: UUID, q: asyncio.Queue) -> None: def unsubscribe(self, task_id: UUID, q: asyncio.Queue) -> None:
"""SSE generator finally 清理。""" """SSE generator finally 清理。"""
self._subs.get(run_id, set()).discard(q) self._subs.get(task_id, set()).discard(q)
if run_id in self._subs and not self._subs[run_id]: if task_id in self._subs and not self._subs[task_id]:
del self._subs[run_id] del self._subs[task_id]
def emit(self, run_id: UUID, event: dict[str, Any]) -> None: def emit(self, task_id: UUID, event: dict[str, Any]) -> None:
"""从工作线程调:把 event 推给所有订阅者。 """从工作线程调:把 event 推给所有订阅者。
如果没人订阅(run 在跑但没浏览器连上),event 丢弃 这是设计选择 如果没人订阅(run 在跑但没浏览器连上),event 丢弃 这是设计选择
@ -68,41 +80,41 @@ class RunBroker:
loop = self._loop loop = self._loop
if loop is None: if loop is None:
return # 还没 bind,丢弃(测试 / 启动竞态) return # 还没 bind,丢弃(测试 / 启动竞态)
for q in list(self._subs.get(run_id, [])): for q in list(self._subs.get(task_id, [])):
loop.call_soon_threadsafe(q.put_nowait, event) loop.call_soon_threadsafe(q.put_nowait, event)
def close(self, run_id: UUID) -> None: def close(self, task_id: UUID) -> None:
"""run 结束:派 done 给所有订阅者,标记 run_id 为已完成。 """run 结束:派 done 给所有订阅者,标记 task 为已完成。
从工作线程调(agent.run 完成 / 抛异常 finally 清理) 从工作线程调(agent.run 完成 / 抛异常 finally 清理)
""" """
self.emit(run_id, {"type": "done"}) self.emit(task_id, {"type": "done"})
self._done.add(run_id) self._done.add(task_id)
# subs 不在这里立即删 — SSE generator 会先收到 done、yield 它、走到 # subs 不在这里立即删 — SSE generator 会先收到 done、yield 它、走到
# finally unsubscribe;此处 emit 后立即删会让那次 emit 之后的清理无的放矢。 # finally unsubscribe;此处 emit 后立即删会让那次 emit 之后的清理无的放矢。
def n_subscribers(self, run_id: UUID) -> int: def n_subscribers(self, task_id: UUID) -> int:
"""供测试 / 监控用。""" """供测试 / 监控用。"""
return len(self._subs.get(run_id, set())) return len(self._subs.get(task_id, set()))
def is_done(self, run_id: UUID) -> bool: def is_done(self, task_id: UUID) -> bool:
return run_id in self._done return task_id in self._done
# ─────────────── cancel signaling ─────────────── # ─────────────── cancel signaling ───────────────
def request_cancel(self, run_id: UUID) -> None: def request_cancel(self, task_id: UUID) -> None:
"""主线程(HTTP handler)发的 cancel 信号 — BG 线程 poll is_cancelled() 看见即退。 """主线程(HTTP handler)发的 cancel 信号 — BG 线程 poll is_cancelled() 看见即退。
setdefault:即便 BG 还没注册 flag 也能 set,BG 启动后第一次 check 立刻看见 setdefault:即便 BG 还没注册 flag 也能 set,BG 启动后第一次 check 立刻看见
""" """
self._cancel_flags.setdefault(run_id, threading.Event()).set() self._cancel_flags.setdefault(task_id, threading.Event()).set()
def is_cancelled(self, run_id: UUID) -> bool: def is_cancelled(self, task_id: UUID) -> bool:
ev = self._cancel_flags.get(run_id) ev = self._cancel_flags.get(task_id)
return bool(ev and ev.is_set()) return bool(ev and ev.is_set())
def clear_cancel(self, run_id: UUID) -> None: def clear_cancel(self, task_id: UUID) -> None:
"""run 真正退出(BG finally)清掉 flag,避免 dict 无限增长。""" """run 真正退出(BG finally)清掉 flag,避免 dict 无限增长。"""
self._cancel_flags.pop(run_id, None) self._cancel_flags.pop(task_id, None)
# 进程内单例 — FastAPI lifespan 里 bind_loop;agent / sink / SSE handler 共享。 # 进程内单例 — FastAPI lifespan 里 bind_loop;agent / sink / SSE handler 共享。

View File

@ -1,7 +1,8 @@
"""WebEventSink:实现 §7 A 的 sink 协议,把 AgentLoop.emit 桥到 RunBroker。 """WebEventSink:实现 §7 A 的 sink 协议,把 AgentLoop.emit 桥到 RunBroker。
每次 run 一个 sink 实例,绑死 run_id`emit({type, ...})` 直接转 broker.emit(run_id, event) task_id(0004 简化:run_id 已合并入 task 当前状态)`emit({type, ...})` 直接转
sink 实例由 web 层在启 run 时创建,传进 AgentLoop;loop 完全不知 web 存在(§5 Less Scaffolding) broker.emit(task_id, event)sink 实例由 web 层在启 run 时创建,传进 AgentLoop;
loop 完全不知 web 存在(§5 Less Scaffolding)
""" """
from __future__ import annotations from __future__ import annotations
@ -12,9 +13,9 @@ from .broker import RunBroker
class WebEventSink: class WebEventSink:
def __init__(self, broker: RunBroker, run_id: UUID) -> None: def __init__(self, broker: RunBroker, task_id: UUID) -> None:
self._broker = broker self._broker = broker
self._run_id = run_id self._task_id = task_id
def emit(self, event: dict[str, Any]) -> None: def emit(self, event: dict[str, Any]) -> None:
self._broker.emit(self._run_id, event) self._broker.emit(self._task_id, event)

View File

@ -367,7 +367,7 @@ const state = {
taskMeta: null, taskMeta: null,
filesPath: "", filesPath: "",
evtSrc: null, evtSrc: null,
currentRunId: null, // 当前流式中的 run_id;用于 stop 按钮发 cancel streaming: false, // 当前是否在流式中;true 时显示 stop 按钮
// task list 分页 + 筛选 // task list 分页 + 筛选
taskPage: 1, taskPage: 1,
taskPageSize: 20, taskPageSize: 20,
@ -729,7 +729,7 @@ async function sendMessage() {
const r = await api("POST", `/v1/tasks/${state.taskId}/messages`, { content }); const r = await api("POST", `/v1/tasks/${state.taskId}/messages`, { content });
$("chat-input").value = ""; $("chat-input").value = "";
state.currentRunId = r.run_id; state.streaming = true;
$("chat-cancel").style.display = ""; $("chat-cancel").style.display = "";
streamSse(r.events_url, asstCard); streamSse(r.events_url, asstCard);
} catch (e) { } catch (e) {
@ -740,14 +740,14 @@ async function sendMessage() {
} }
} }
async function cancelCurrentRun() { async function cancelCurrentTask() {
if (!state.taskId || !state.currentRunId) return; if (!state.taskId || !state.streaming) return;
const btn = $("chat-cancel"); const btn = $("chat-cancel");
btn.disabled = true; btn.disabled = true;
$("chat-hint").textContent = "cancelling…"; $("chat-hint").textContent = "cancelling…";
try { try {
await api("POST", `/v1/tasks/${state.taskId}/runs/${state.currentRunId}/cancel`); await api("POST", `/v1/tasks/${state.taskId}/cancel`);
// 不重置 state.currentRunId / 按钮 — 等 SSE 的 cancelled / done 走完一并清 // 不重置 streaming / 按钮 — 等 SSE 的 cancelled / done 走完一并清
} catch (e) { } catch (e) {
if (e.status === 401) { logout(); return; } if (e.status === 401) { logout(); return; }
// 409 = 已结束 / 已 cancelling,不算错;其他贴 toast // 409 = 已结束 / 已 cancelling,不算错;其他贴 toast
@ -757,7 +757,7 @@ async function cancelCurrentRun() {
} }
} }
$("chat-cancel").addEventListener("click", cancelCurrentRun); $("chat-cancel").addEventListener("click", cancelCurrentTask);
function streamSse(url, asstCard) { function streamSse(url, asstCard) {
// EventSource 不支持自定义 header,token 走 query string(?token=...) // EventSource 不支持自定义 header,token 走 query string(?token=...)
@ -800,7 +800,7 @@ async function fetchSse(url, asstCard) {
body.classList.remove("streaming"); body.classList.remove("streaming");
$("chat-send").disabled = false; $("chat-send").disabled = false;
$("chat-hint").textContent = "ready"; $("chat-hint").textContent = "ready";
state.currentRunId = null; state.streaming = false;
const cb = $("chat-cancel"); const cb = $("chat-cancel");
cb.style.display = "none"; cb.style.display = "none";
cb.disabled = false; cb.disabled = false;