From 2e519ab8a66daeeaa7de63777580266b6a7b4dd6 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Mon, 18 May 2026 11:05:35 +0800 Subject: [PATCH] =?UTF-8?q?core(0004):=20=E5=88=A0=20runs=20/=20usage=5Fev?= =?UTF-8?q?ents=20=E8=A1=A8=20+=20cancel/SSE=20=E6=94=B9=20task-level?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit usage_events 全代码库零引用,纯死代码;runs 表实质就是"task 当前 in-flight 状态" 影子表,tokens_p/c 写但从未被读,run_id 唯二实用是 broker 频道键 + cancel 参数 — 单活 run 形态下完全冗余,客户端只需 task_id。按"开发期不写兼容层"心智一把切干净。 - alembic 0004:DROP runs / usage_events,tasks 加 run_status (idle/running/cancelling/error) + run_error 两列;ok/cancelled 终态都回 idle 不留持久标记,只有 error 持久 - ORM 删 Run / UsageEvent class - Broker 全 task_id 索引,加 start(tid) 在新 run 前清 _done - /v1/tasks/{tid}/runs/{rid}/{events,cancel} → /v1/tasks/{tid}/{events,cancel} - POST /messages 返 {events_url} 去掉 run_id - dev SPA: state.currentRunId → state.streaming(bool),路径去掉 /runs/{rid}/ - Smoke 18 case 全绿 Co-Authored-By: Claude Opus 4.7 (1M context) --- DESIGN.md | 47 +++--- PROGRESS.md | 17 +- RUN.md | 14 +- core/storage/__init__.py | 2 +- core/storage/models.py | 42 +---- ...260518_1200_0004_drop_runs_usage_events.py | 78 +++++++++ web/app.py | 153 +++++++++--------- web/broker.py | 76 +++++---- web/sinks.py | 11 +- web/static/dev.html | 16 +- 10 files changed, 264 insertions(+), 192 deletions(-) create mode 100644 db/migrations/versions/20260518_1200_0004_drop_runs_usage_events.py diff --git a/DESIGN.md b/DESIGN.md index f2be467..6a2511f 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -249,19 +249,21 @@ Tasks (同 working_dir 多 task 共享,文件由用户经 /files/delete 单独清) GET /v1/folders 列当前 user 的 working_dir(FS 是 source of truth + 关联 task 计数 + 最后使用时间) GET /v1/tasks/{id}/messages 历史(后续 ?search= 走 jsonb GIN / tsvector) - POST /v1/tasks/{id}/messages {content} 发消息 + 起 run,返 {run_id} - **同 task 单活 run**:已有 running → 409 - (`SELECT … FOR UPDATE` 锁 task 行,序列化并发 - POST 防 `messages.idx` UniqueConstraint race; - UI 应 disable send 按钮直到 SSE `done`) - GET /v1/tasks/{id}/runs/{rid}/events SSE 流(见下) - POST /v1/tasks/{id}/runs/{rid}/cancel 协作式 cancel(202):标 `cancelling` + 信号 + POST /v1/tasks/{id}/messages {content} 发消息 + 起 run,返 {events_url} + **单活 run**(0004 简化):tasks.run_status in + ('running','cancelling') → 409;'error' 起新 run + 时清(跟 ok 一样视为可重启)。`SELECT … FOR UPDATE` + 锁 task 行,序列化并发 POST 防 `messages.idx` race + GET /v1/tasks/{id}/events SSE 流(见下)— 订阅 task 当前活动事件, + 单活 run 形态下无歧义,客户端只需 task_id + POST /v1/tasks/{id}/cancel 协作式 cancel(202):标 `cancelling` + 信号 broker;BG loop 在工具调用之间 poll 看见即退, 给未执行 tool_call 补 `[cancelled by user]` tool result(保 LiteLLM 协议),emit `cancelled` - 事件;finally 写终态 `cancelled`(异常 `error`)。 - LLM 同步 call 本身不可中断 — 最坏等当前一轮跑完。 - run.status != `running` → 409(已结束 / 已 cancelling) + 事件;finally 写终态 — 正常 / cancel 都回 `idle` + (不留持久标记),异常才写 `error`。 + run_status != `running` → 409。 + LLM 同步 call 本身不可中断 — 最坏等当前一轮跑完 Files(user-rooted,不绑 task — `workspace/users//` 为根) GET /v1/files?path= 列子目录 {entries, crumbs, exists, root, current};留空 → user_root; @@ -283,7 +285,7 @@ Misc ``` run_start {} llm_start {} -text {"content":""} +text {"delta":""} tool_call {"name":"...","args":{...},"args_preview":"..."} tool_result {"name":"...","preview":"...","truncated":bool} # 完整 result 走 DB,SSE 只送预览给 UI llm_end {"prompt_tokens":N,"completion_tokens":N} @@ -314,6 +316,8 @@ users(user_id uuid pk, email null, password_hash | oidc_subject null, plan null, tasks(task_id uuid pk, user_id fk, name text not null, working_dir text not null, skill, description, status, model_profile, tokens_prompt, tokens_completion, cost_usd, + run_status text not null default 'idle', -- idle/running/cancelling/error(0004 合并 runs 表) + run_error text null, -- error 状态的错误文本,其他状态 NULL created_at, updated_at); create index on tasks (user_id, working_dir); -- working_dir 存储约定:本地 ROOT 内 → 相对 ROOT 的 posix 串 @@ -328,12 +332,13 @@ messages(message_id uuid pk, task_id fk, idx int not null, unique (task_id, idx)); create index on messages using gin (payload jsonb_path_ops); -- 全文搜按需加 tsvector + GIN(中文 simple + pg_trgm 起步) - -runs(run_id pk, task_id fk, status, started_at, finished_at, error, tokens_p, tokens_c) -usage_events(id, user_id, task_id uuid, run_id uuid, kind, value, ts) --- append-only。task_id/run_id 不 FK,task 硬删后审计仍存活 ``` +**0004 简化:删 `runs` / `usage_events` 表**(从未真用过 — 详 §7.9 取舍)。原 `runs` 表 +角色等价于"task 当前 in-flight 状态",合并到 `tasks.run_status` + `tasks.run_error` 两列; +`usage_events` 是为未来计费预付的架构成本,真要计费再加,DB schema 改动便宜。`run_id` +取消 —— 单活 run 形态下它对客户端 / broker / cancel 全是冗余字段。 + **No-subtask 校验**(`create_task`):查同 user 下是否存在 `new LIKE existing/%` 或 `existing LIKE new/%`,中一则拒;同 task_dir 允许。**两侧先用 `from_db_path` 归一到 absolute posix 再比前缀**(混合存储形态 [相对+绝对] 不会漏判),数量小直接 Python 端比对,不在 SQL 里拼分隔符。 **Folder rename**(`old → new`,FS rename 成功后):`UPDATE tasks SET task_dir = new || substring(task_dir from len(old)+1) WHERE user_id=? AND (task_dir = old OR task_dir LIKE old||'/%')`。**用 `old/%` 而非 `old%`**,避免 `project_a` 误中 `project_a_other`。running task 引用时禁 rename / delete。 @@ -410,8 +415,8 @@ usage_events(id, user_id, task_id uuid, run_id uuid, kind, value, ts) | 误删 folder | 二确认 + 输入 folder 名;真要再加 trash bin | | DB-then-FS 中断留孤儿目录 | 后台 GC 周期扫"FS 有但 DB 无引用" | | 同 folder 多 task 并发写同名 | 文件级悲观锁,冲突早失败 | -| 同 task 并发 POST messages 撞 `messages.idx` UniqueConstraint | `POST /v1/tasks/{id}/messages` 单活 run 检查:`SELECT … FOR UPDATE` 锁 task 行 + 查 `runs.status in ('running','cancelling')`,有 → 409;同事务插新 Run 行避 TOCTOU。配启动 lifespan reaper 把孤儿 `running`/`cancelling` 全标 error(进程 crash 残留)。未来真生产 multi-worker 换 heartbeat / lease | -| Run 跑太久 / 用户想中断 | `POST /v1/tasks/{id}/runs/{rid}/cancel` 协作式 cancel:标 `cancelling` + broker 信号;`AgentLoop.cancel_check` 回调在每轮 LLM 前、tool_calls 之间 poll;命中给未执行 tool_call 补 `[cancelled by user]` tool result 保 LiteLLM 协议,emit `cancelled` 事件,BG finally 写终态 `cancelled`。LLM 同步 call 本身不可中断 — 接受最坏等当前一轮跑完(几十秒内) | +| 同 task 并发 POST messages 撞 `messages.idx` UniqueConstraint | `POST /v1/tasks/{id}/messages` 单活 run 检查:`SELECT … FOR UPDATE` 锁 task 行 + 查 `tasks.run_status in ('running','cancelling')`,有 → 409;同事务标 running 避 TOCTOU。配启动 lifespan reaper 把孤儿 `running`/`cancelling` 全标 error(进程 crash 残留)。未来真生产 multi-worker 换 heartbeat / lease | +| Run 跑太久 / 用户想中断 | `POST /v1/tasks/{id}/cancel` 协作式 cancel:标 `cancelling` + broker 信号;`AgentLoop.cancel_check` 回调在每轮 LLM 前、tool_calls 之间 poll;命中给未执行 tool_call 补 `[cancelled by user]` tool result 保 LiteLLM 协议,emit `cancelled` 事件,BG finally `run_status` 回 `idle`(不留持久标记)。LLM 同步 call 本身不可中断 — 接受最坏等当前一轮跑完(几十秒内) | | Sandbox 出站越权 | egress allowlist 起步只放 LLM + PyPI | | 资源滥用 | BYO key 默认;月度配额;cold task LRU 清 | @@ -423,7 +428,13 @@ usage_events(id, user_id, task_id uuid, run_id uuid, kind, value, ts) **skill 产物全落 cwd 不引入 artifacts 表**:中间件是用户花 token 生成的资产,可下载可替换;artifacts 表是为不确定 UX 收益预付架构成本。真嫌乱 UI 加折叠视图。 -**hard cascade 而非 soft orphan**:`orphaned` 让 list / resume / UI 都多一种特殊 case,"删 folder = 删项目"比"留对话残骸"自然。`usage_events` append-only 不 FK,task 硬删后月账仍存活。 +**hard cascade 而非 soft orphan**:`orphaned` 让 list / resume / UI 都多一种特殊 case,"删 folder = 删项目"比"留对话残骸"自然。(原 `usage_events` 表 0004 删 — 见下条) + +**0004 删 `runs` + `usage_events` 表**(2026-05-18 决策): +- **`runs`**:实质是"task 当前 in-flight 状态"的影子表 —— `tokens_p/c` 写但从未被读(tokens 累计走 `tasks.tokens_prompt/_completion`),`started_at/finished_at/error` 也只写不读,`run_id` 唯二实用是 broker pub/sub 频道键 + cancel 参数。但 §7.1 选定单活 run 形态下 `run_id` 是冗余 —— 同 task 同时最多 1 个活 run,客户端只需要 task_id(永远有)就够。合并 `run_status` + `run_error` 两列入 `tasks`,删表;broker 改 task_id 索引;`/v1/tasks/{id}/runs/{rid}/{events,cancel}` 改 `/v1/tasks/{id}/{events,cancel}`。 +- **`usage_events`**:从未真写入(代码库零引用),纯死代码,为"未来计费"预付的架构成本。真要计费时改 DB schema 便宜,不预付。 +- **取舍代价**:失"历史 run 元数据"(每次 LLM 调用的独立时间戳 / token 切片)—— messages 表已记下每次对话产物,token 累计在 tasks,真要细粒度审计再补回 `usage_events`(届时是新需求,不是技术债)。 +- **`run_status` 终态语义**:`ok` / `cancelled` 收尾直接回 `idle`(用户视角"跑完了 / 停了"等价),只有 `error` 是持久终态(让用户能看到),起新 run 时由 `post_message` 清掉。 **本地也用 PG,不用 SQLite / JSON**:① dogfood ≡ 真实用户路径,bug 在 dogfood 就能复现;② Docker 已是必然依赖(§7.5),`docker compose up postgres` 零增量门槛;③ 双 adapter 维护税远高于 PG 一次性配置成本;④ 本地 dev 也能连远端测试服。 diff --git a/PROGRESS.md b/PROGRESS.md index c133d83..1906664 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -2,7 +2,7 @@ > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。 -最后更新:2026-05-18(`POST /v1/tasks/{id}/runs/{rid}/cancel` 协作式 cancel + `cancelled` SSE 事件 + dev SPA stop 按钮;gate 扩到 `cancelling`) +最后更新:2026-05-18(0004 大瘦身:删 runs / usage_events 表,run_status / run_error 合入 tasks;cancel / SSE 路由从 run_id 维度改 task 维度;broker 全 task_id 索引) --- @@ -15,12 +15,13 @@ | 5 | Eval Suite | ⏸ 不做 | dogfooding 替代,probe 覆盖健康检查 | | 6 | 长任务工程化 | 🟡 | task + 恢复 ✅;双层记忆 ✅;context 压缩未做 | | 7 | 打磨 | ❌ | Docker 沙盒 / 更多 skill | -| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 完工;**D `/v1` JSON API 完工 ✅**(原 Phase G Jinja2/HTMX UI 撤,改 platform 端实现);**D' 过渡 auth(PLATFORM_KEY → JWT)+ dev SPA ✅**;**同 task 单活 run 锁 ✅**;**run cancel + dev SPA stop 按钮 ✅**;真 OIDC 待;C(Executor)待;E(CLI 双模式)待。 | +| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 完工;**D `/v1` JSON API 完工 ✅**(原 Phase G Jinja2/HTMX UI 撤,改 platform 端实现);**D' 过渡 auth(PLATFORM_KEY → JWT)+ dev SPA ✅**;**同 task 单活 run 锁 ✅**;**task-level cancel + dev SPA stop 按钮 ✅**;**0004 schema 瘦身 ✅**(删 runs/usage_events);真 OIDC 待;C(Executor)待;E(CLI 双模式)待。 | --- ## 已完成关键能力 +- **05-18 / 0004 schema 大瘦身:删 runs / usage_events 表,run_status / run_error 合入 tasks;路由从 run_id 维度改 task 维度**:用户复盘"为什么 cancel 接口要带 run_id?现在不是一个 task 一个 run 吗",顺手把 runs / usage_events 表也重新审视 — `usage_events` 全代码库零引用、零写入、零读取,纯死代码(为未来计费预付的架构成本);`runs` 表 `tokens_p/c` 写但从未被读(tokens 累计走 tasks 列),`started_at / finished_at / error` 也只写不读,`run_id` 唯二实用是 broker 频道键 + cancel 参数 — 但 §7.1 已选定**单活 run** 形态,同 task 同时最多 1 个活 run,客户端只需要 task_id(永远有)就够,run_id 完全冗余。按"开发期不写兼容层"心智一把切干净。**alembic 0004**:`DROP TABLE usage_events / runs`,`tasks` 加 `run_status text not null default 'idle'`(idle / running / cancelling / error)+ `run_error text null`。**ORM** `models.py` 删 `Run` / `UsageEvent` 两 class + 删 `BigInteger` import;`Task` 加两列;`storage/__init__.py` 文档示例同步;`Task.run_status` 终态语义:`ok / cancelled` 收尾都回 `idle`(用户视角"跑完 / 停了"等价不留持久标记),只有 `error` 是持久终态,起新 run 时清。**Broker**(`web/broker.py`)全面 task_id 索引:`_subs / _done / _cancel_flags` 三个 dict key 从 run_id 换 task_id;加 `start(task_id)` 入口在新 run 起来前清 `_done` 标记(避免上一轮 done 让新订阅者立刻断流)。**Sink**(`web/sinks.py`)绑 task_id 替代 run_id。**`web/app.py`**:① `_run_agent_bg(task_id, user_id, content)` 去掉 run_id 参数;装 `agent.cancel_check = lambda tid=task_id: broker.is_cancelled(tid)`;终态写 `tasks.run_status = "idle"`(原 `Run.status = "ok"/"cancelled"`)或 `"error"`(`run_error = err`);finally `broker.clear_cancel(tid) + broker.close(tid)`。② `POST /v1/tasks/{tid}/messages` 改:`SELECT Task.run_status … FOR UPDATE` 替代 `select(Run.run_id) … running/cancelling`;同事务 `UPDATE Task SET run_status='running', run_error=NULL`(error 也算可重启视为清);commit 后 `broker.start(tid)` 清 done;返 `{"events_url": "/v1/tasks/{tid}/events"}` 去掉 `run_id`。③ `POST /v1/tasks/{tid}/cancel` 取代 `POST /v1/tasks/{tid}/runs/{rid}/cancel`,只校验 task 归属 user;`run_status != 'running'` → 409。④ `GET /v1/tasks/{tid}/events` 取代 `/runs/{rid}/events`,broker.subscribe(tid)。⑤ lifespan reaper `UPDATE Task SET run_status='error' WHERE run_status IN ('running','cancelling')`,文案不变。⑥ `_task_dict` 暴露 `run_status` / `run_error` 字段给前端。**dev SPA**(`web/static/dev.html`):`state.currentRunId` 改 `state.streaming` bool;`POST /messages` 拿到 `events_url` 直接订阅,不再保存 run_id;cancel 按钮 click → `POST /v1/tasks/{tid}/cancel`(去掉 `/runs/{rid}/`)。**Migration 跑通**:本地 PG `db upgrade 0003 → 0004 (head)` 一把过(用户授权清旧数据,无 backfill)。**Smoke 18 case 全绿**(in-process TestClient + BG mock):POST /messages 返 `events_url` 无 run_id / tasks.run_status='running' / gate when running 409 / POST /cancel 202 + run_status='cancelling' + broker flag set / double cancel 409(状态非 running)/ gate during cancelling 也 409 / cancel idle 409 / cancel error 409 / error 状态可发新消息(error 不挂 gate + 清 run_error) / ghost task 404 / invalid UUID 404 / cross-user 404 / no auth 401 / GET /events 路由注册(SSE 流式跑会挂 30s 心跳,smoke 只验路径 + headers) / GET /tasks 返回 run_status / run_error 字段 / stale reaper 扫 running+cancelling 标 error / broker.start 清 _done / broker.subscribe + emit + close + late subscriber 立刻收 done / broker.request_cancel + is_cancelled + clear_cancel。**净增量**:核心代码 -200 行(删表 ORM + 两路由层简化),broker 加 21 行 start/cancel API,dev.html 几行字段重命名;DB 表 5 → 3,路由 `/runs/{rid}/{events,cancel}` → `/{events,cancel}`,前端 SPA 不再需要先拿 run_id 才能 cancel / 订阅 — 客户端只需 task_id。**文档同步**:DESIGN §7.2 路由表 messages 路由返 `events_url`(去 `run_id`)+ cancel / events 改 task-level + lead-in 注 0004 简化 + SSE schema text event 字段 `delta`(实际就是 delta,文档原 `content` 笔误);§7.4 schema 块 tasks 加两列 + 注 0004 合并;§7.9 hard cascade 行注 "原 usage_events 0004 删" + 加专项取舍说明"0004 删 runs + usage_events 表";§7.7 风险表两行同步 / 改 task-level 路由名;RUN 路由表三路由全改 + 故障兜底 cancel 409 文案改 + db upgrade head 改 0004;PROGRESS 已完成 + 状态表 + 文件清单。 - **05-18 / cancel run endpoint + AgentLoop 协作式 cancel + dev SPA stop 按钮**:用户反馈"等待回复或 LLM 操作时没有停止接口"。落地 DESIGN §7.2 原标"待"的 `POST /v1/tasks/{id}/runs/{rid}/cancel`。**Broker**(`web/broker.py`):加 `request_cancel(rid)` / `is_cancelled(rid)` / `clear_cancel(rid)` 三方法,内部 `dict[UUID, threading.Event]` per-run;`setdefault` 保证 BG 还没 register 也能 set。**Loop**(`core/loop.py`):`AgentLoop` 加 `cancel_check: Optional[Callable[[], bool]]` 字段(CLI 路径不传 = None 永不 cancel),`_is_cancelled()` helper + `_fill_cancelled_tool_results(remaining)` 给未执行的 tool_call 全部 append `[cancelled by user]` tool message —— LiteLLM 协议要求每个 assistant tool_call 必须有匹配 tool result,否则 resume 时 LLM 报错。Check 点:每轮 LLM 前 + tool_calls 之间。命中 emit `cancelled` event + return `[cancelled]`。**LLM 同步 call 本身不可中断**(litellm 同步阻塞,无原生 cancel)—— 接受最坏等当前一轮跑完(通常几十秒),注释里讲清楚。**Endpoint**(`web/app.py::cancel_run`):校验 task 归属 user + run 归属 task(else 404),`run.status` 必须是 `running`(else 409 含具体 status);标 `cancelling`(过渡态)+ `broker.request_cancel(rid)`;202。`_run_agent_bg` 装配时 `agent.cancel_check = lambda rid=run_id: broker.is_cancelled(rid)`,run 完时判 `broker.is_cancelled` 写终态 `cancelled` vs `ok`;finally `broker.clear_cancel + broker.close`。**Gate 同步扩**:`post_message` 单活 run 检查从 `status == 'running'` 改 `status in ('running', 'cancelling')`,确保 cancel 后旧 BG 还没退出时新 POST 仍 409(避免新旧 run 撞 messages.idx)。**Reaper 同步扩**:lifespan 启动也扫 `cancelling`(进程 crash 时 BG 来不及写终态 cancelled,反正没线程在跑就清掉)。**dev SPA**(`web/static/dev.html`):chat 表单加 ``(常态 hidden);state 加 `currentRunId`;sendMessage 拿到 run_id 后 show stop,fetchSse `try/finally` 收尾时一并 hide stop / 清 currentRunId / 复原 send button(确保 SSE 失败路径 UI 也 reset 不卡死)。cancel 按钮 click → `POST /runs/{rid}/cancel`;409 静默忽略(并发 done 不算错)。`handleSseEvent` 加 `cancelled` case → 在当前 assistant 卡贴一个虚线红框 "已停止(stopped by user)" badge。CSS 加 `.cancelled-badge`。**Smoke 15 case 全绿**:HTTP 层 11 case(cancel happy + 双 cancel 409 + cancelling 期间 POST messages 409 + ghost run 404 + invalid UUID 404 + cross-task 404 + cross-user 404 + cancel 已 ok 409 + cancel 已 error 409 + no auth 401 + stale reaper 扫 cancelling);Loop 层 4 case(cancel before first iter 不调 LLM / cancel between tool_calls 补 cancelled placeholder 3 个 + 保协议 + emit cancelled / 正常 done 不 emit cancelled / CLI 路径 cancel_check=None 默永不 cancel)。**没动 SSE handler 的 break list**(`("done", "error")`):cancelled 在 SSE 里走流给前端看,broker.close 之后立即跟 done 收流。**文档同步**:DESIGN §7.2 路由表 cancel 行从"待"扩成完整描述 + SSE 事件加 `cancelled{}` 行 + §7.7 风险表加"Run 跑太久 / 用户想中断"行;RUN 路由表加 cancel 行 + POST /messages 409 文案改 "running / cancelling" + 故障兜底加三行(cancel 409 / 点 stop 没立刻停 / reaper 扫 cancelling);PROGRESS 已完成 + 下一步重排(去掉 cancel,留 OIDC / C Executor / E CLI 双模式)。 - **05-18 / `POST /v1/tasks/{id}/messages` 单活 run 锁 + 孤儿 reaper**:用户连点 send / 多 tab 同时发消息 → 两个 BG 线程争 `messages.idx`(UniqueConstraint 会 race-crash 第二个 INSERT)的旧 TODO 落地。**实现**:`web/app.py::post_message` 把所有权 + 活跃 Run 检查 + 新 Run INSERT 收进一个 `session_scope()` 事务,首行用 `select(Task.task_id).where(...).with_for_update()` 锁 task 行序列化并发 POST;事务内查 `Run.status='running'` 命中即 raise `HTTPException(409, "task already has a running run ({rid}); wait for it to finish")`;无活跃则同事务 `s.add(Run(...status="running"))` —— 三步原子完成,避免 TOCTOU。lifespan 加 **stale-run reaper**:启动时 `UPDATE runs SET status='error', error='server restarted before run finished' WHERE status='running'`,把进程 crash 留下的孤儿 running 全清掉(否则对应 task 永挂 409)。结果 rowcount > 0 时 print info 行 `[startup] reaped N stale running run(s)`。Cancel 路由(DESIGN §7.2 标 "待")没改:有了它 409 时用户可主动 cancel,不必等流式结束。**没动 `Session.append`**:gate 已在 HTTP 层挡住了,单写者前提下 idx 自递增不会冲;在 ORM 里再加锁是过度。**Smoke 10 case 全绿**(in-process TestClient + `_run_agent_bg` mock 不真起 LLM):happy(202 + Run INSERT running)/ gate(同 task 第二 POST 409 + detail 含 "running run" + "wait for it to finish")/ clear after Run.status=ok 解锁(202)/ clear after Run.status=error 同(202)/ ghost task 跨用户路径 404(锁前所有权检查)/ invalid UUID 404 / empty content 400 早于 lock / no auth 401 早于 lock / stale reaper 测试(强行 SET 全部 Run=running → 开新 TestClient 触发 lifespan → 所有 running 变 error + 之后 POST 还能 202)/ cross-user(other UID token 访 sentinel task → 404 不暴露存在性)。**采坑**:`@case` 每个用 `make_client()` 起新 app 会重复触发 reaper,把 case 1 留下的 running 清掉 → case 2 的 409 测不出来;改成全部 case 共享一个 SHARED_CLIENT 跑,仅 stale-reaper case 用 `fresh=True` 开第二个。**文档同步**:DESIGN §7.2 POST /messages 行注 409 行为 + cancel "待" 后注"做出来后 409 可主动 cancel" / §7.7 风险表加"同 task 并发 POST messages.idx race"行;RUN 路由表 POST /messages 注 409;故障兜底替过期 TODO 行 → 加 "POST 返 409" 处置 + "[startup] reaped N stale running" 解释。**未来 TODO**:multi-worker 部署形态下 reaper 不能简单全表清(会误清其他 worker 的真在跑 run),换 heartbeat + lease(注释里记了)。 - **05-17 / files API 全面 user-rooted(去掉 task_id 前置)**:用户反馈"web 页应该能看到 user 的所有目录,现在只能选 task 后右侧才刷新"——根因是原 files API 用 task_id 拐杖间接拿 working_dir,迫使前端必须先选 task。语义上 files 操作只关心"路径 + user 边界",task_id 是多余的;同时 §7.1 心智模型早就把 task 和 dir 定义为正交副视图,API 不该混。**后端**:删 `_load_working_dir(task_id, user_id)`,加 `_load_user_root(user_id)`(走 `main.user_root(ws, uid)` 自动 mkdir 拿 `workspace/users//`);4 路由全换:`GET /v1/files?path=` / `GET /v1/files/download?path=` / `POST /v1/files/upload` / `POST /v1/files/delete`。`_safe_join` 边界从 task_dir 改 user_root,安全性不降低;`_enumerate_files` 加 dotfile 过滤(`if p.name.startswith(".")` 跳过 `.memory/` 等,同 `/v1/folders` 约定);`_rel_to` 把 `Path(".")` 归一为空串(避免 root 时 current="." 这种 ugly 形态)。删 `from_db_path` import(只剩 `to_db_path`)。**dev SPA**:`loadFiles()` 不再 gate on `state.taskId`,enterApp 时直接调一次拉 user_root;`selectTask` 在拿到 task meta 后 `state.filesPath = wdName`(从 working_dir 末段抽出)再 loadFiles,选 task 自动跳到对应子目录但用户可点 crumb 回 root 看其他目录;crumbs root 标签 "/" → "我的"(user_root 直观);files-proj header 从"项目名(state.taskMeta 派生)"改"路径首段(数据驱动)",空时显示 `(user root)`。**新增 upload 按钮**(原来藏在外部页面里没暴露给 SPA):pane-head 加 `⬆` 按钮 + 隐藏 ``,onchange 走 FormData POST `/v1/files/upload`,path 取当前 `state.filesPath`(空 → user_root);上传完 loadFiles 刷新。`deleteCurrentTask` 不再重置 files 面板(task 删了但 FS 文件保留,继续浏览有意义),只 reload 当前路径。`btn-refresh-files` 移除 disabled 状态(任何时候可用)。**Smoke 68 case 全绿**(in-process TestClient,跑完即删 `_smoke_files.py`):列 user_root(包含 working_dir 目录,`.memory` 被过滤) / 列子目录 2 层 / 不存在路径 200+exists=False / 路径安全 6 case(`../` / 绝对 / Windows 绝对 / `\\` 起头)/ upload 单 / multi+nested mkdir / 上传到 root / 文件名攻击 4 case(`../` `..` `/` `\\`)/ download 文件 + 深度 + 目录 400 + ghost 404 + 越界 400 / delete 文件 / 空目录 / 非空 400 / user_root 拒 / ghost 404 / 越界 400 / 跨 user 隔离 4 case(A 不见 B,B 不见 A)/ 无 token 全 401(GET list / POST upload / POST delete / GET download)/ 子目录里 dotfile 也过滤 / 新 user 首访 user_root 自动 mkdir + 列表空。**文档**:DESIGN §7.2 路由表段 + lead-in 同步("Task 一等公民,files 是其副视图(经 task_dir 暴露)" → "Task 一等公民;files 与 task 正交,走 user-rooted /v1/files*,以 workspace/users// 为边界")。 @@ -88,7 +89,7 @@ core/memory.py 81 ← per-user `.memory/` dotfile core/export_docx.py 383 ← §7 B Step 2-4 + from_db_path 还原 + task_dir Optional core/storage/__init__.py 27 ← §7 B Step 1-3 core/storage/engine.py 80 ← §7 B Step 1 -core/storage/models.py 124 ← §7 B Step 1 +core/storage/models.py 99 ← 3 表(0004 删 runs/usage_events;Task + run_status/run_error) core/storage/utils.py 136 ← check_no_subtask 改 Python 端归一 tools/base.py 34 tools/fs.py 182 @@ -101,11 +102,15 @@ db/migrations/env.py 61 ← §7 B Step 1 db/migrations/versions/ 0001_initial_schema.py 125 ← §7 B Step 1 0002_task_dir_relative.py 61 ← 现有 ROOT-prefix 绝对 → 相对 + 0003_task_name_and_working_dir.py + 51 ← name 必填 + task_dir→working_dir + mode→skill + 0004_drop_runs_usage_events.py + 77 ← 删 runs/usage_events + tasks 加 run_status/run_error web/__init__.py 5 ← Phase G G1 -web/app.py 898 ← /v1/ JSON API + user_id 隔离 + run lock + cancel endpoint +web/app.py 889 ← /v1/ JSON API + user_id 隔离 + run lock + task-level cancel web/auth.py 115 ← D' 过渡:PLATFORM_KEY → JWT 兑换 -web/broker.py 109 ← in-process pub/sub + cancel signal per-run -web/sinks.py 20 ← Phase G G4: WebEventSink (§7 A sink 协议) +web/broker.py 121 ← in-process pub/sub + cancel signal,全 task_id 索引(0004) +web/sinks.py 21 ← WebEventSink 绑 task_id(0004) web/static/dev.html 1133 ← D' dev SPA + stop 按钮 + cancelled badge ───────────────────────────────── Python 合计 ~3800 行(+ dev.html 1133 静态) diff --git a/RUN.md b/RUN.md index c2090a6..e1b4781 100644 --- a/RUN.md +++ b/RUN.md @@ -2,7 +2,7 @@ > 怎么把 zcbot 跑起来。env / 常用命令 / 故障兜底。设计看 `DESIGN.md`,进度看 `PROGRESS.md`。 -最后更新:2026-05-18(`POST /v1/tasks/{id}/runs/{rid}/cancel` 协作式 cancel + `cancelled` SSE 事件 + dev SPA stop 按钮;gate 扩到 `cancelling`) +最后更新:2026-05-18(0004 schema 简化:删 runs / usage_events 表;cancel 改 task-level `POST /v1/tasks/{id}/cancel`;SSE 改 `GET /v1/tasks/{id}/events`;run_status / run_error 合并入 tasks) --- @@ -37,7 +37,7 @@ python -m venv .venv # 3) DB schema 上车 .venv/Scripts/python.exe cli.py db upgrade head -.venv/Scripts/python.exe cli.py db current # 应输出 0003 (head) +.venv/Scripts/python.exe cli.py db current # 应输出 0004 (head) ``` --- @@ -144,9 +144,9 @@ curl --noproxy '*' -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8765/v1/ta | `DELETE /v1/tasks/{id}` | 硬删 DB 行(messages CASCADE),FS working_dir 保留 | 必填 | | `GET /v1/folders` | 列当前 user 工作目录 + n_tasks + last_used(供创建 task 自动补全用) | 必填 | | `GET /v1/tasks/{id}/messages` | LiteLLM payload 透传 | 必填 | -| `POST /v1/tasks/{id}/messages` | `{content}` 发消息;返 `{run_id, events_url}`;**同 task 已有 running / cancelling run → 409**(单活 run 保护,UI 应 disable send 按钮直到 SSE `done`) | 必填 | -| `GET /v1/tasks/{id}/runs/{rid}/events` | SSE 流(`event: ` + `data: `) | 必填 | -| `POST /v1/tasks/{id}/runs/{rid}/cancel` | 协作式 cancel 当前 run;返 `{ok, run_id, status:"cancelling"}`;run.status != `running` → 409;LLM 同步 call 本身不可中断,最坏等当前一轮跑完 | 必填 | +| `POST /v1/tasks/{id}/messages` | `{content}` 发消息;返 `{events_url}`;**`tasks.run_status` 是 running / cancelling → 409**(单活 run 保护;error 状态视为可重启,起新 run 时清);UI 应 disable send 按钮直到 SSE `done` | 必填 | +| `GET /v1/tasks/{id}/events` | SSE 流(`event: ` + `data: `);订阅 task 当前活动 — 单活 run 形态下无歧义 | 必填 | +| `POST /v1/tasks/{id}/cancel` | 协作式 cancel 当前活跃 run;返 `{ok, task_id, run_status:"cancelling"}`;`run_status != "running"` → 409;LLM 同步 call 本身不可中断,最坏等当前一轮跑完 | 必填 | | `GET /v1/tasks/{id}/files?path=` | 列子目录条目 + 面包屑 | 必填 | | `GET /v1/tasks/{id}/files/download?path=` | 下单文件 | 必填 | | `POST /v1/tasks/{id}/files/upload` | multipart 上传,`path` 走 form | 必填 | @@ -176,8 +176,8 @@ curl --noproxy '*' -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8765/v1/ta | `cli.py web` 启动后 curl 连不上 | 检查 proxy(`HTTP_PROXY` / `HTTPS_PROXY`):本地服务在 127.0.0.1,系统 proxy 拦截会 502。临时 `unset HTTP_PROXY HTTPS_PROXY` 或加 `curl --noproxy '*'`。验通:`curl --noproxy '*' http://127.0.0.1:8765/healthz` → `{"status":"ok"}` | | SSE 卡住不流(经 nginx) | 反代要关 buffering — 后端响应头已带 `X-Accel-Buffering: no`,nginx ≥ 1.5.6 默认认。仍卡看 nginx 配 `proxy_buffering off; proxy_read_timeout 3600s;` | | platform 端 CORS preflight 失败 | 本地 dev `allow_origins=["*"]` 应该没事;部署后看是否按 platform 域名收紧过头(`access-control-allow-origin` 响应头要含 platform 域名 或 `*`)| -| `POST /v1/tasks/{id}/messages` 返 409 `task already has an active run` | 上一条消息的 BG run 还没跑完(SSE 没 `done`)。等流式跑完;或点 dev SPA 的 stop / `POST /runs/{rid}/cancel`;服务异常下 Run 行卡 `running`/`cancelling`,启动 reaper 会清 | -| `POST /v1/tasks/{id}/runs/{rid}/cancel` 返 409 `run not running` | run 已结束(ok/error/cancelled)或已被 cancel 进入 `cancelling`,不能重复 cancel;dev SPA 自动忽略不报错 | +| `POST /v1/tasks/{id}/messages` 返 409 `task already has an active run` | 上一条消息的 BG run 还没跑完(SSE 没 `done`)。等流式跑完;或点 dev SPA 的 stop / `POST /v1/tasks/{id}/cancel`;服务异常下 `tasks.run_status` 卡 `running`/`cancelling`,启动 reaper 会清 | +| `POST /v1/tasks/{id}/cancel` 返 409 `task not running` | `run_status` 不是 `running`(idle / cancelling / error 都不能 cancel,error 只能起新 run 顶掉);dev SPA 自动忽略不报错 | | 点 stop 后流式没立刻停 | LLM 同步调用本身不可中断,最坏等当前一轮跑完(通常几十秒)。loop 进入下个 check 点(每轮 LLM 前 / 每个 tool_call 前)就退,emit `cancelled` → SSE `done` → UI 收回 stop 按钮 | | `[startup] reaped N stale active run(s)` | 上次 `cli.py web` 进程未正常 finish 留下 N 个 `running` / `cancelling` Run 行,启动 lifespan 自动标 error。无需处理,info 级 | | `cli.py web` 启动报 `PLATFORM_KEY env not set` / `JWT_SECRET env not set` | D' 过渡 auth 强制双 env 必填。生成 `python -c "import secrets;print(secrets.token_urlsafe(48))"` 各填一,写进 `.env` 重起 | diff --git a/core/storage/__init__.py b/core/storage/__init__.py index 2aff2b9..f3f6ef8 100644 --- a/core/storage/__init__.py +++ b/core/storage/__init__.py @@ -2,7 +2,7 @@ 入口: from core.storage import get_engine, session_scope, ensure_local_sentinel - from core.storage.models import User, Task, Message, Run, UsageEvent + from core.storage.models import User, Task, Message ZCBOT_DB_URL 环境变量必填(本地连测试 / staging PG;SaaS 连生产 PG)。 未设置时 get_engine() 抛 RuntimeError 并指引设置。 diff --git a/core/storage/models.py b/core/storage/models.py index 036ddf9..6dec8e3 100644 --- a/core/storage/models.py +++ b/core/storage/models.py @@ -1,9 +1,10 @@ """SQLAlchemy 2.x ORM models,对应 DESIGN.md §7.4 schema。 -5 张表:users / tasks / messages / runs / usage_events。 +3 张表:users / tasks / messages。 - users 本地形态固定 INSERT sentinel(`00000000-...`) - messages.payload 用 jsonb,GIN 索引在 migration 里建 -- runs / usage_events 在 B 阶段先建表,真正写入要等 D 阶段(HTTP /v1 + run 生命周期) +- run 状态承载在 tasks.run_status / run_error 两列(0004 合并 runs 表); + 原 runs / usage_events 表 0004 删 — 详 DESIGN §7.4 取舍 / PROGRESS 05-18 """ from __future__ import annotations @@ -13,7 +14,6 @@ from typing import Any, Optional from uuid import UUID, uuid4 from sqlalchemy import ( - BigInteger, DateTime, ForeignKey, Integer, @@ -65,6 +65,11 @@ class Task(Base): tokens_prompt: Mapped[int] = mapped_column(Integer, nullable=False, default=0) tokens_completion: Mapped[int] = mapped_column(Integer, nullable=False, default=0) cost_usd: Mapped[Decimal] = mapped_column(Numeric(12, 6), nullable=False, default=0) + # 当前 in-flight 状态(原 runs 表合并入,DESIGN §7.4 简化 / 0004 migration): + # idle / running / cancelling / error;ok / cancelled 收尾直接回 idle, + # 只有 error 是持久终态(下次起新 run 时由 post_message 清掉) + run_status: Mapped[str] = mapped_column(Text, nullable=False, default="idle") + run_error: Mapped[Optional[str]] = mapped_column(Text, nullable=True) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), nullable=False ) @@ -92,34 +97,3 @@ class Message(Base): ) -class Run(Base): - __tablename__ = "runs" - - run_id: Mapped[UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True, default=uuid4) - task_id: Mapped[UUID] = mapped_column( - PG_UUID(as_uuid=True), - ForeignKey("tasks.task_id", ondelete="CASCADE"), - nullable=False, - ) - status: Mapped[str] = mapped_column(Text, nullable=False, default="pending") - started_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True) - finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True) - error: Mapped[Optional[str]] = mapped_column(Text, nullable=True) - tokens_p: Mapped[int] = mapped_column(Integer, nullable=False, default=0) - tokens_c: Mapped[int] = mapped_column(Integer, nullable=False, default=0) - - -class UsageEvent(Base): - """append-only 审计。task_id / run_id 不 FK,task 硬删后审计仍存活(§7.4)。""" - - __tablename__ = "usage_events" - - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) - user_id: Mapped[UUID] = mapped_column(PG_UUID(as_uuid=True), nullable=False) - task_id: Mapped[Optional[UUID]] = mapped_column(PG_UUID(as_uuid=True), nullable=True) - run_id: Mapped[Optional[UUID]] = mapped_column(PG_UUID(as_uuid=True), nullable=True) - kind: Mapped[str] = mapped_column(Text, nullable=False) - value: Mapped[Decimal] = mapped_column(Numeric(20, 8), nullable=False) - ts: Mapped[datetime] = mapped_column( - DateTime(timezone=True), server_default=func.now(), nullable=False - ) diff --git a/db/migrations/versions/20260518_1200_0004_drop_runs_usage_events.py b/db/migrations/versions/20260518_1200_0004_drop_runs_usage_events.py new file mode 100644 index 0000000..8d0442c --- /dev/null +++ b/db/migrations/versions/20260518_1200_0004_drop_runs_usage_events.py @@ -0,0 +1,78 @@ +"""drop runs / usage_events tables + tasks 加 run_status / run_error。 + +Revision ID: 0004 +Revises: 0003 +Create Date: 2026-05-18 + +按 DESIGN §7.4 / §7.7 简化: +- usage_events 表纯死代码(从未写入 / 读取),为"未来计费"预付的架构成本 — 删 +- runs 表实质就是"task 当前 in-flight 状态"的影子表:tokens_p/c 写但从未被读 + (tokens 真正走 tasks.tokens_prompt/_completion 累计版),started_at/finished_at + /error 也只写不读,run_id 只有 broker pub/sub 和 cancel 参数两个用途 — 现在 + cancel 改 task-level、broker 改 task_id 索引,run_id 完全不需要 — 删 +- tasks 加两列承接 run 当前状态: + `run_status text default 'idle'`(idle / running / cancelling / error) + `run_error text`(error 状态的错误文本,其他状态为 NULL) + +`ok / cancelled` 收尾直接回 `idle`(不留"上次跑挂"的持久标记);只有 `error` 是 +持久终态(下次起新 run 时清掉)。 +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +revision: str = "0004" +down_revision: Union[str, None] = "0003" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.drop_table("usage_events") + op.drop_table("runs") + + op.add_column( + "tasks", + sa.Column( + "run_status", sa.Text(), nullable=False, server_default="idle" + ), + ) + op.add_column("tasks", sa.Column("run_error", sa.Text(), nullable=True)) + + +def downgrade() -> None: + op.drop_column("tasks", "run_error") + op.drop_column("tasks", "run_status") + + op.create_table( + "runs", + sa.Column("run_id", sa.dialects.postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column( + "task_id", + sa.dialects.postgresql.UUID(as_uuid=True), + sa.ForeignKey("tasks.task_id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("status", sa.Text(), nullable=False, server_default="pending"), + sa.Column("started_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("finished_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("error", sa.Text(), nullable=True), + sa.Column("tokens_p", sa.Integer(), nullable=False, server_default="0"), + sa.Column("tokens_c", sa.Integer(), nullable=False, server_default="0"), + ) + op.create_table( + "usage_events", + sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True), + sa.Column("user_id", sa.dialects.postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("task_id", sa.dialects.postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("run_id", sa.dialects.postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("kind", sa.Text(), nullable=False), + sa.Column("value", sa.Numeric(20, 8), nullable=False), + sa.Column( + "ts", + sa.DateTime(timezone=True), + server_default=sa.func.now(), + nullable=False, + ), + ) diff --git a/web/app.py b/web/app.py index 559e405..a4cac6b 100644 --- a/web/app.py +++ b/web/app.py @@ -35,7 +35,7 @@ from core.storage import ( check_no_subtask, session_scope, ) -from core.storage.models import Message, Run, Task +from core.storage.models import Message, Task from core.storage.utils import ensure_local_task_row from .auth import AuthConfig, ensure_user_row, make_require_user, mint_token @@ -99,6 +99,9 @@ def _task_dict(row: Any, *, n_messages: Optional[int] = None) -> dict: "tokens_prompt": row.tokens_prompt or 0, "tokens_completion": row.tokens_completion or 0, "tokens": (row.tokens_prompt or 0) + (row.tokens_completion or 0), + # 当前 run 状态(0004 schema 简化:原 runs 表合并入 task) + "run_status": row.run_status or "idle", + "run_error": row.run_error or None, "created_at": _iso(getattr(row, "created_at", None)), "updated_at": _iso(getattr(row, "updated_at", None)), } @@ -180,51 +183,48 @@ def _enumerate_files(root: Path, current: Path) -> tuple[list[dict], list[dict], return entries, crumbs, exists -# ─────────────────── Run 启动 + SSE 帧格式 ─────────────────── +# ─────────────────── BG run + SSE 帧格式 ─────────────────── -def _run_agent_bg(task_id: UUID, run_id: UUID, user_id: UUID, user_message: str) -> None: - """工作线程:`build_agent(resume=True)` → 装 WebEventSink + cancel_check → `agent.run` → 写 runs 状态。 +def _run_agent_bg(task_id: UUID, user_id: UUID, user_message: str) -> None: + """工作线程:`build_agent(resume=True)` → 装 WebEventSink + cancel_check → `agent.run` → 写 tasks.run_status。 sink 通过 broker.emit 桥事件回 asyncio loop;agent.run 是 sync,所以在 to_thread 跑。 user_id 必须从 JWT 那侧透传过来 —— 决定 memory_block 读哪个 per-user 子树。 cancel_check 桥 broker.is_cancelled,loop 在工具调用之间 poll(LLM 同步调用本身不可中断)。 + `ok / cancelled` 收尾直接回 `idle`(不留持久标记);只有 error 是持久终态。 """ from main import build_agent, sync_task_tokens try: - broker.emit(run_id, {"type": "run_start"}) + broker.emit(task_id, {"type": "run_start"}) agent, session, sid, task_state, task_dir = build_agent( session_id=str(task_id), resume=True, user_id=user_id, ) - agent.sink = WebEventSink(broker, run_id) - agent.cancel_check = lambda rid=run_id: broker.is_cancelled(rid) + agent.sink = WebEventSink(broker, task_id) + agent.cancel_check = lambda tid=task_id: broker.is_cancelled(tid) agent.run(user_message) sync_task_tokens(task_state, agent.llm) - # cancel 命中时 agent.run 提前 return + 已 emit `cancelled`;终态写 "cancelled" - final_status = "cancelled" if broker.is_cancelled(run_id) else "ok" + # cancel 命中或正常完成 → run_status 回 idle(error 才持久) with session_scope() as s: s.execute( - update(Run).where(Run.run_id == run_id).values( - status=final_status, - finished_at=func.now(), - tokens_p=agent.llm.token_counter.prompt_tokens, - tokens_c=agent.llm.token_counter.completion_tokens, + update(Task).where(Task.task_id == task_id).values( + run_status="idle", run_error=None, ) ) except Exception as e: err = f"{type(e).__name__}: {e}" - broker.emit(run_id, {"type": "error", "msg": err}) + broker.emit(task_id, {"type": "error", "msg": err}) try: with session_scope() as s: s.execute( - update(Run).where(Run.run_id == run_id).values( - status="error", error=err, finished_at=func.now() + update(Task).where(Task.task_id == task_id).values( + run_status="error", run_error=err, ) ) except Exception: pass # 已 emit error 给前端,DB 写失败不放大噪声 finally: - broker.clear_cancel(run_id) - broker.close(run_id) + broker.clear_cancel(task_id) + broker.close(task_id) def _sse_event(event_type: str, payload: dict) -> bytes: @@ -276,17 +276,16 @@ def create_app() -> FastAPI: @asynccontextmanager async def lifespan(app: FastAPI): broker.bind_loop(asyncio.get_running_loop()) - # Stale-run reaper:上次进程 crash 留下的 "running" / "cancelling" 行已无 BG 线程 - # 继续,启动时标 error,让对应 task 重新可发消息(否则 409 gate 永挂)。 + # Stale-run reaper:上次进程 crash 留下的 "running" / "cancelling" 已无 BG 线程 + # 继续,启动时标 error,让对应 task 重新可发消息(否则 gate 永挂)。 # TODO 真生产 multi-worker:换 heartbeat / lease,只 reap 自家 worker 的孤儿。 with session_scope() as s: result = s.execute( - update(Run) - .where(Run.status.in_(("running", "cancelling"))) + update(Task) + .where(Task.run_status.in_(("running", "cancelling"))) .values( - status="error", - error="server restarted before run finished", - finished_at=func.now(), + run_status="error", + run_error="server restarted before run finished", ) ) if result.rowcount: @@ -633,11 +632,12 @@ def create_app() -> FastAPI: body: MessageRequest, user_id: UUID = Depends(require_user), ): - """发消息 + 起 BG run。返 `{run_id, events_url}`,客户端立刻订阅 SSE 拿流式。 + """发消息 + 起 BG run。返 `{events_url}`,客户端立刻订阅 SSE 拿流式。 - 同 task 单活 run:`SELECT … FOR UPDATE` 锁 task 行 + 活跃 Run 检查,把所有权 / - 活跃 / 插新 run 收进一个事务,挡住"用户连点 send 两条消息"导致两个 BG 线程 - 争 `messages.idx`(UniqueConstraint 会 race-crash)。已有 running run → 409。 + 单活 run:`SELECT … FOR UPDATE` 锁 task 行 + 活跃状态检查 + 标 running, + 全收进一个事务挡住"用户连点 send 两条消息"导致两个 BG 线程争 `messages.idx`。 + tasks.run_status in ('running','cancelling') → 409;'error' 走起新 run 时清掉 + (跟 ok / cancelled 一样视为可重启)。 """ try: tid = UUID(task_id) @@ -646,98 +646,89 @@ def create_app() -> FastAPI: content = (body.content or "").strip() if not content: raise HTTPException(400, "empty content") - run_id = uuid4() with session_scope() as s: - owned = s.execute( - select(Task.task_id) + row = s.execute( + select(Task.run_status) .where(Task.task_id == tid, Task.user_id == user_id) .with_for_update() ).first() - if owned is None: + if row is None: raise HTTPException(404, f"task not found: {tid}") - active = s.execute( - select(Run.run_id) - .where(Run.task_id == tid, Run.status.in_(("running", "cancelling"))) - .limit(1) - ).scalar_one_or_none() - if active is not None: + if row.run_status in ("running", "cancelling"): raise HTTPException( 409, - f"task already has an active run ({active}); wait for it to finish or cancel", + f"task already has an active run (status={row.run_status}); " + f"wait for it to finish or cancel", ) - s.add(Run(run_id=run_id, task_id=tid, status="running", started_at=func.now())) + s.execute( + update(Task).where(Task.task_id == tid).values( + run_status="running", run_error=None, + ) + ) + broker.start(tid) # 清上一轮 done 标记,新订阅者才能看到流式 # commit 后 lock 释放;BG 线程接管(sink 通过 broker 把 event 桥回 asyncio loop) - asyncio.create_task(asyncio.to_thread(_run_agent_bg, tid, run_id, user_id, content)) - return { - "run_id": str(run_id), - "events_url": f"/v1/tasks/{tid}/runs/{run_id}/events", - } + asyncio.create_task(asyncio.to_thread(_run_agent_bg, tid, user_id, content)) + return {"events_url": f"/v1/tasks/{tid}/events"} - # ───────────── Run cancel ───────────── + # ───────────── Cancel current run ───────────── - @app.post("/v1/tasks/{task_id}/runs/{run_id}/cancel", status_code=202, tags=["runs"]) - def cancel_run( + @app.post("/v1/tasks/{task_id}/cancel", status_code=202, tags=["tasks"]) + def cancel_task( task_id: str, - run_id: str, user_id: UUID = Depends(require_user), ): - """向当前 run 发协作式 cancel 信号。 - - 校验 task 归属 user + run 归属 task;否则 404 - - run.status 不是 `running` → 409(已结束 / 已 cancelling 不能重复 cancel) + """向当前 task 的活跃 run 发协作式 cancel 信号。 + - 单活 run 形态下"取消当前活动"语义无歧义;客户端只需 task_id + - 校验 task 归属 user;否则 404 + - tasks.run_status 不是 `running` → 409(idle / cancelling / error 都不能 cancel) - 标 `cancelling`(过渡态),BG 线程 loop 在工具调用之间 poll 看见即退; - 退出后 finally 写终态 `cancelled`(或异常路径 `error`) + 退出后 finally 写终态(正常→idle,异常→error) - LLM 同步调用本身不可中断,最坏要等当前 LLM call 跑完(通常几十秒内) """ try: tid = UUID(task_id) - rid = UUID(run_id) except ValueError: - raise HTTPException(404, "invalid id") + raise HTTPException(404, f"invalid task id: {task_id!r}") with session_scope() as s: - run = s.execute( - select(Run) - .join(Task, Task.task_id == Run.task_id) - .where( - Run.run_id == rid, - Run.task_id == tid, - Task.user_id == user_id, - ) + row = s.execute( + select(Task.run_status) + .where(Task.task_id == tid, Task.user_id == user_id) .with_for_update() - ).scalar_one_or_none() - if run is None: - raise HTTPException(404, f"run not found: {rid}") - if run.status != "running": + ).first() + if row is None: + raise HTTPException(404, f"task not found: {tid}") + if row.run_status != "running": raise HTTPException( 409, - f"run not running (status={run.status}); cannot cancel", + f"task not running (run_status={row.run_status}); cannot cancel", ) s.execute( - update(Run).where(Run.run_id == rid).values(status="cancelling") + update(Task).where(Task.task_id == tid).values(run_status="cancelling") ) - broker.request_cancel(rid) - return {"ok": True, "run_id": str(rid), "status": "cancelling"} + broker.request_cancel(tid) + return {"ok": True, "task_id": str(tid), "run_status": "cancelling"} # ───────────── SSE events ───────────── - @app.get("/v1/tasks/{task_id}/runs/{run_id}/events", tags=["runs"]) + @app.get("/v1/tasks/{task_id}/events", tags=["tasks"]) async def stream_events( task_id: str, - run_id: str, user_id: UUID = Depends(require_user), ): - """SSE 流。事件类型:run_start / llm_start / text / tool_call / tool_result / - llm_end / error / done。data 是 JSON dict(已剔除 `type` 字段,移到 event 名)。 + """SSE 流。订阅当前 task 的活动 event(单活 run 形态下无歧义)。 + 事件类型:run_start / llm_start / text / tool_call / tool_result / + llm_end / cancelled / error / done。data 是 JSON dict(已剔除 `type` 字段, + 移到 event 名)。 """ try: tid = UUID(task_id) - rid = UUID(run_id) except ValueError: - raise HTTPException(404, "invalid id") + raise HTTPException(404, f"invalid task id: {task_id!r}") with session_scope() as s: _assert_owns_task(s, tid, user_id) async def gen(): - q = broker.subscribe(rid) + q = broker.subscribe(tid) try: # 第一帧 retry 注释 + 心跳:让 EventSource 立即建立(不被 buffer 卡) yield b": connected\nretry: 3000\n\n" @@ -755,7 +746,7 @@ def create_app() -> FastAPI: except asyncio.CancelledError: pass # 客户端断开,静默退 finally: - broker.unsubscribe(rid, q) + broker.unsubscribe(tid, q) return StreamingResponse( gen(), diff --git a/web/broker.py b/web/broker.py index b26ad29..32fd692 100644 --- a/web/broker.py +++ b/web/broker.py @@ -1,20 +1,26 @@ """RunBroker:in-process pub/sub,把 agent run 产生的 event fan-out 给所有 SSE 订阅者。 +DESIGN §7 简化(0004 一并)—— 单活 run 形态下 run_id 是冗余的(同 task 同时 +最多 1 个活 run);broker 内部全用 task_id 索引,客户端只需要 task_id 即可 +订阅 / cancel,不再需要先拿 run_id。 + 设计: - emit() 从工作线程调(agent.run 在 to_thread 跑),用 loop.call_soon_threadsafe 桥到 asyncio queue;SSE generator await queue.get() 拉出来推流。 -- 同一 run_id 多个订阅者(刷新页面 / 多 tab / 桌面+移动)— 每个订阅 1 个独立 queue。 -- run 结束 → broker.close(run_id) 给所有订阅者派一条 done;新订阅者(在 done 后到的) - 立即收到 done 并断流(不漏不挂)。 +- 同一 task 多个订阅者(刷新页面 / 多 tab / 桌面+移动)— 每个订阅 1 个独立 queue。 +- run 结束 → broker.close(task_id) 给所有订阅者派一条 done;新订阅者(在 done + 后到的)立即收到 done 并断流(不漏不挂)。 +- 同 task 起新 run → broker.start(task_id) 清掉 _done 标记;否则上一轮 done + 会让新订阅者立刻断流看不到流式。 - 进程内单实例 / 多进程不共享 — 个人 SaaS 单 worker 够用;真要扩多 worker 再上 Redis。 -- 不持久化 event — messages 已落 PG,刷新页面走 G3 静态视图能看历史;真要"刷新继续看 - 实时流"未来加 event log 表 + backfill。 +- 不持久化 event — messages 已落 PG,刷新页面走 GET /v1/tasks/{id}/messages 看历史; + 真要"刷新继续看实时流"未来加 event log 表 + backfill。 线程模型: - broker.bind_loop(loop) 在 FastAPI startup 调一次,记录 asyncio loop 引用。 - emit() 调用方可能在任意线程;put_nowait 是 thread-unsafe(asyncio.Queue 设计前提 是单 loop),所以走 call_soon_threadsafe 跨回 loop 线程再 put。 -- subscribe / unsubscribe / close 也都用 call_soon_threadsafe 包,避免 race +- subscribe / unsubscribe / close / start 也都用 call_soon_threadsafe 包,避免 race (实测 SSE generator 在 finally 里 unsubscribe,这个就在 loop 线程,直接调也行)。 """ from __future__ import annotations @@ -29,9 +35,9 @@ from uuid import UUID class RunBroker: def __init__(self) -> None: self._subs: dict[UUID, set[asyncio.Queue]] = defaultdict(set) - # 已经发完 done 的 run — 后来订阅者直接收到 done,避免无限等 + # 已经发完 done 的 task — 后来订阅者直接收到 done,避免无限等 self._done: set[UUID] = set() - # cancel signal per-run。AgentLoop 在 BG 线程里 poll is_cancelled() 决定是否退; + # cancel signal per-task。AgentLoop 在 BG 线程里 poll is_cancelled() 决定是否退; # request_cancel 可在 BG 还没 register 时调用(setdefault),BG 启动后第一次 # check 即看到。run 完成在 finally 里 clear_cancel 回收。 self._cancel_flags: dict[UUID, threading.Event] = {} @@ -41,25 +47,31 @@ class RunBroker: """FastAPI startup 调一次。""" self._loop = loop - def subscribe(self, run_id: UUID) -> asyncio.Queue: - """订阅 run 的 event 流。已 done 的 run 立刻在 queue 放一条 done。 + def start(self, task_id: UUID) -> None: + """同 task 起新 run 时调:清 _done 标记,让新订阅者能看到流式。 + cancel flag 在 finally 里 clear_cancel 清,这里不动(避免擦掉刚刚 request_cancel 的请求)。 + """ + self._done.discard(task_id) + + def subscribe(self, task_id: UUID) -> asyncio.Queue: + """订阅 task 当前 run 的 event 流。已 done 的 task 立刻在 queue 放一条 done。 调用方:SSE handler(在 asyncio loop 线程内)。 """ q: asyncio.Queue = asyncio.Queue() - if run_id in self._done: + if task_id in self._done: q.put_nowait({"type": "done"}) else: - self._subs[run_id].add(q) + self._subs[task_id].add(q) return q - def unsubscribe(self, run_id: UUID, q: asyncio.Queue) -> None: + def unsubscribe(self, task_id: UUID, q: asyncio.Queue) -> None: """SSE generator finally 清理。""" - self._subs.get(run_id, set()).discard(q) - if run_id in self._subs and not self._subs[run_id]: - del self._subs[run_id] + self._subs.get(task_id, set()).discard(q) + if task_id in self._subs and not self._subs[task_id]: + del self._subs[task_id] - def emit(self, run_id: UUID, event: dict[str, Any]) -> None: + def emit(self, task_id: UUID, event: dict[str, Any]) -> None: """从工作线程调:把 event 推给所有订阅者。 如果没人订阅(run 在跑但没浏览器连上),event 丢弃 — 这是设计选择 @@ -68,41 +80,41 @@ class RunBroker: loop = self._loop if loop is None: return # 还没 bind,丢弃(测试 / 启动竞态) - for q in list(self._subs.get(run_id, [])): + for q in list(self._subs.get(task_id, [])): loop.call_soon_threadsafe(q.put_nowait, event) - def close(self, run_id: UUID) -> None: - """run 结束:派 done 给所有订阅者,标记 run_id 为已完成。 + def close(self, task_id: UUID) -> None: + """run 结束:派 done 给所有订阅者,标记 task 为已完成。 从工作线程调(agent.run 完成 / 抛异常 finally 清理)。 """ - self.emit(run_id, {"type": "done"}) - self._done.add(run_id) + self.emit(task_id, {"type": "done"}) + self._done.add(task_id) # subs 不在这里立即删 — SSE generator 会先收到 done、yield 它、走到 # finally unsubscribe;此处 emit 后立即删会让那次 emit 之后的清理无的放矢。 - def n_subscribers(self, run_id: UUID) -> int: + def n_subscribers(self, task_id: UUID) -> int: """供测试 / 监控用。""" - return len(self._subs.get(run_id, set())) + return len(self._subs.get(task_id, set())) - def is_done(self, run_id: UUID) -> bool: - return run_id in self._done + def is_done(self, task_id: UUID) -> bool: + return task_id in self._done # ─────────────── cancel signaling ─────────────── - def request_cancel(self, run_id: UUID) -> None: + def request_cancel(self, task_id: UUID) -> None: """主线程(HTTP handler)发的 cancel 信号 — BG 线程 poll is_cancelled() 看见即退。 setdefault:即便 BG 还没注册 flag 也能 set,BG 启动后第一次 check 立刻看见。 """ - self._cancel_flags.setdefault(run_id, threading.Event()).set() + self._cancel_flags.setdefault(task_id, threading.Event()).set() - def is_cancelled(self, run_id: UUID) -> bool: - ev = self._cancel_flags.get(run_id) + def is_cancelled(self, task_id: UUID) -> bool: + ev = self._cancel_flags.get(task_id) return bool(ev and ev.is_set()) - def clear_cancel(self, run_id: UUID) -> None: + def clear_cancel(self, task_id: UUID) -> None: """run 真正退出(BG finally)清掉 flag,避免 dict 无限增长。""" - self._cancel_flags.pop(run_id, None) + self._cancel_flags.pop(task_id, None) # 进程内单例 — FastAPI lifespan 里 bind_loop;agent / sink / SSE handler 共享。 diff --git a/web/sinks.py b/web/sinks.py index 970c1e9..4c581c4 100644 --- a/web/sinks.py +++ b/web/sinks.py @@ -1,7 +1,8 @@ """WebEventSink:实现 §7 A 的 sink 协议,把 AgentLoop.emit 桥到 RunBroker。 -每次 run 一个 sink 实例,绑死 run_id。`emit({type, ...})` 直接转 broker.emit(run_id, event)。 -sink 实例由 web 层在启 run 时创建,传进 AgentLoop;loop 完全不知 web 存在(§5 Less Scaffolding)。 +绑 task_id(0004 简化:run_id 已合并入 task 当前状态)。`emit({type, ...})` 直接转 +broker.emit(task_id, event)。sink 实例由 web 层在启 run 时创建,传进 AgentLoop; +loop 完全不知 web 存在(§5 Less Scaffolding)。 """ from __future__ import annotations @@ -12,9 +13,9 @@ from .broker import RunBroker class WebEventSink: - def __init__(self, broker: RunBroker, run_id: UUID) -> None: + def __init__(self, broker: RunBroker, task_id: UUID) -> None: self._broker = broker - self._run_id = run_id + self._task_id = task_id def emit(self, event: dict[str, Any]) -> None: - self._broker.emit(self._run_id, event) + self._broker.emit(self._task_id, event) diff --git a/web/static/dev.html b/web/static/dev.html index 3ad7d36..6bfb139 100644 --- a/web/static/dev.html +++ b/web/static/dev.html @@ -367,7 +367,7 @@ const state = { taskMeta: null, filesPath: "", evtSrc: null, - currentRunId: null, // 当前流式中的 run_id;用于 stop 按钮发 cancel + streaming: false, // 当前是否在流式中;true 时显示 stop 按钮 // task list 分页 + 筛选 taskPage: 1, taskPageSize: 20, @@ -729,7 +729,7 @@ async function sendMessage() { const r = await api("POST", `/v1/tasks/${state.taskId}/messages`, { content }); $("chat-input").value = ""; - state.currentRunId = r.run_id; + state.streaming = true; $("chat-cancel").style.display = ""; streamSse(r.events_url, asstCard); } catch (e) { @@ -740,14 +740,14 @@ async function sendMessage() { } } -async function cancelCurrentRun() { - if (!state.taskId || !state.currentRunId) return; +async function cancelCurrentTask() { + if (!state.taskId || !state.streaming) return; const btn = $("chat-cancel"); btn.disabled = true; $("chat-hint").textContent = "cancelling…"; try { - await api("POST", `/v1/tasks/${state.taskId}/runs/${state.currentRunId}/cancel`); - // 不重置 state.currentRunId / 按钮 — 等 SSE 的 cancelled / done 走完一并清 + await api("POST", `/v1/tasks/${state.taskId}/cancel`); + // 不重置 streaming / 按钮 — 等 SSE 的 cancelled / done 走完一并清 } catch (e) { if (e.status === 401) { logout(); return; } // 409 = 已结束 / 已 cancelling,不算错;其他贴 toast @@ -757,7 +757,7 @@ async function cancelCurrentRun() { } } -$("chat-cancel").addEventListener("click", cancelCurrentRun); +$("chat-cancel").addEventListener("click", cancelCurrentTask); function streamSse(url, asstCard) { // EventSource 不支持自定义 header,token 走 query string(?token=...) @@ -800,7 +800,7 @@ async function fetchSse(url, asstCard) { body.classList.remove("streaming"); $("chat-send").disabled = false; $("chat-hint").textContent = "ready"; - state.currentRunId = null; + state.streaming = false; const cb = $("chat-cancel"); cb.style.display = "none"; cb.disabled = false;