core(POST /v1/tasks/{id}/messages): 同 task 单活 run 锁 + 启动 reaper

挡住"用户连点 send 两条 → 两个 BG 线程争 messages.idx UniqueConstraint
race"的旧 TODO。POST /messages 把所有权 + 活跃 Run 检查 + 新 Run INSERT
收进一个事务,首步 SELECT Task … FOR UPDATE 锁 task 行,命中 running 已
存在则 409。lifespan 加 stale-run reaper,把进程 crash 留下的孤儿 running
标 error,避免对应 task 被 409 永挂。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-05-18 08:27:48 +08:00
parent 9a7620f704
commit 976ef45e87
4 changed files with 58 additions and 19 deletions

View File

@ -250,8 +250,12 @@ Tasks
GET /v1/folders 列当前 user 的 working_dir(FS 是 source of truth + 关联 task 计数 + 最后使用时间)
GET /v1/tasks/{id}/messages 历史(后续 ?search= 走 jsonb GIN / tsvector)
POST /v1/tasks/{id}/messages {content} 发消息 + 起 run,返 {run_id}
**同 task 单活 run**:已有 running → 409
(`SELECT … FOR UPDATE` 锁 task 行,序列化并发
POST 防 `messages.idx` UniqueConstraint race;
UI 应 disable send 按钮直到 SSE `done`)
GET /v1/tasks/{id}/runs/{rid}/events SSE 流(见下)
POST /v1/tasks/{id}/runs/{rid}/cancel (待)
POST /v1/tasks/{id}/runs/{rid}/cancel (待 — 做出来后 409 可主动 cancel,不必等流式结束)
Files(user-rooted,不绑 task — `workspace/users/<uid>/` 为根)
GET /v1/files?path= 列子目录 {entries, crumbs, exists, root, current};留空 → user_root;
@ -399,6 +403,7 @@ usage_events(id, user_id, task_id uuid, run_id uuid, kind, value, ts)
| 误删 folder | 二确认 + 输入 folder 名;真要再加 trash bin |
| DB-then-FS 中断留孤儿目录 | 后台 GC 周期扫"FS 有但 DB 无引用" |
| 同 folder 多 task 并发写同名 | 文件级悲观锁,冲突早失败 |
| 同 task 并发 POST messages 撞 `messages.idx` UniqueConstraint | `POST /v1/tasks/{id}/messages` 单活 run 检查:`SELECT … FOR UPDATE` 锁 task 行 + 查 `runs.status='running'`,有 → 409;同事务插新 Run 行避 TOCTOU。配启动 lifespan reaper 把孤儿 running 标 error(进程 crash 残留)。未来真生产 multi-worker 换 heartbeat / lease |
| Sandbox 出站越权 | egress allowlist 起步只放 LLM + PyPI |
| 资源滥用 | BYO key 默认;月度配额;cold task LRU 清 |

View File

@ -2,7 +2,7 @@
> 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。
最后更新:2026-05-17(files API 全面 user-rooted:4 个 `/v1/tasks/{id}/files*` 路由 → `/v1/files*`,以 `workspace/users/<uid>/` 为边界,task_id 不再是 files 访问的前置条件;dotfile `.memory/` 一律隐藏;dev SPA 文件面板登录即拉 user_root + 选 task 自动跳到 working_dir + 加 upload 按钮)
最后更新:2026-05-18(`POST /v1/tasks/{id}/messages` 同 task 单活 run:`SELECT … FOR UPDATE` 锁 task + 活跃 Run 检查,已有 running → 409;启动 lifespan reaper 把孤儿 running 标 error)
---
@ -15,12 +15,13 @@
| 5 | Eval Suite | ⏸ 不做 | dogfooding 替代,probe 覆盖健康检查 |
| 6 | 长任务工程化 | 🟡 | task + 恢复 ✅;双层记忆 ✅;context 压缩未做 |
| 7 | 打磨 | ❌ | Docker 沙盒 / 更多 skill |
| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 完工;**D `/v1` JSON API 完工 ✅**(原 Phase G Jinja2/HTMX UI 撤,改 platform 端实现);**D' 过渡 auth(PLATFORM_KEY → JWT)+ dev SPA ✅**;真 OIDC 待;C(Executor)待;E(CLI 双模式)待。 |
| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 完工;**D `/v1` JSON API 完工 ✅**(原 Phase G Jinja2/HTMX UI 撤,改 platform 端实现);**D' 过渡 auth(PLATFORM_KEY → JWT)+ dev SPA ✅**;**同 task 单活 run 锁 ✅**(POST /messages 409 + lifespan reaper);真 OIDC 待;C(Executor)待;E(CLI 双模式)待。 |
---
## 已完成关键能力
- **05-18 / `POST /v1/tasks/{id}/messages` 单活 run 锁 + 孤儿 reaper**:用户连点 send / 多 tab 同时发消息 → 两个 BG 线程争 `messages.idx`(UniqueConstraint 会 race-crash 第二个 INSERT)的旧 TODO 落地。**实现**:`web/app.py::post_message` 把所有权 + 活跃 Run 检查 + 新 Run INSERT 收进一个 `session_scope()` 事务,首行用 `select(Task.task_id).where(...).with_for_update()` 锁 task 行序列化并发 POST;事务内查 `Run.status='running'` 命中即 raise `HTTPException(409, "task already has a running run ({rid}); wait for it to finish")`;无活跃则同事务 `s.add(Run(...status="running"))` —— 三步原子完成,避免 TOCTOU。lifespan 加 **stale-run reaper**:启动时 `UPDATE runs SET status='error', error='server restarted before run finished' WHERE status='running'`,把进程 crash 留下的孤儿 running 全清掉(否则对应 task 永挂 409)。结果 rowcount > 0 时 print info 行 `[startup] reaped N stale running run(s)`。Cancel 路由(DESIGN §7.2 标 "待")没改:有了它 409 时用户可主动 cancel,不必等流式结束。**没动 `Session.append`**:gate 已在 HTTP 层挡住了,单写者前提下 idx 自递增不会冲;在 ORM 里再加锁是过度。**Smoke 10 case 全绿**(in-process TestClient + `_run_agent_bg` mock 不真起 LLM):happy(202 + Run INSERT running)/ gate(同 task 第二 POST 409 + detail 含 "running run" + "wait for it to finish")/ clear after Run.status=ok 解锁(202)/ clear after Run.status=error 同(202)/ ghost task 跨用户路径 404(锁前所有权检查)/ invalid UUID 404 / empty content 400 早于 lock / no auth 401 早于 lock / stale reaper 测试(强行 SET 全部 Run=running → 开新 TestClient 触发 lifespan → 所有 running 变 error + 之后 POST 还能 202)/ cross-user(other UID token 访 sentinel task → 404 不暴露存在性)。**采坑**:`@case` 每个用 `make_client()` 起新 app 会重复触发 reaper,把 case 1 留下的 running 清掉 → case 2 的 409 测不出来;改成全部 case 共享一个 SHARED_CLIENT 跑,仅 stale-reaper case 用 `fresh=True` 开第二个。**文档同步**:DESIGN §7.2 POST /messages 行注 409 行为 + cancel "待" 后注"做出来后 409 可主动 cancel" / §7.7 风险表加"同 task 并发 POST messages.idx race"行;RUN 路由表 POST /messages 注 409;故障兜底替过期 TODO 行 → 加 "POST 返 409" 处置 + "[startup] reaped N stale running" 解释。**未来 TODO**:multi-worker 部署形态下 reaper 不能简单全表清(会误清其他 worker 的真在跑 run),换 heartbeat + lease(注释里记了)。
- **05-17 / files API 全面 user-rooted(去掉 task_id 前置)**:用户反馈"web 页应该能看到 user 的所有目录,现在只能选 task 后右侧才刷新"——根因是原 files API 用 task_id 拐杖间接拿 working_dir,迫使前端必须先选 task。语义上 files 操作只关心"路径 + user 边界",task_id 是多余的;同时 §7.1 心智模型早就把 task 和 dir 定义为正交副视图,API 不该混。**后端**:删 `_load_working_dir(task_id, user_id)`,加 `_load_user_root(user_id)`(走 `main.user_root(ws, uid)` 自动 mkdir 拿 `workspace/users/<uid>/`);4 路由全换:`GET /v1/files?path=` / `GET /v1/files/download?path=` / `POST /v1/files/upload` / `POST /v1/files/delete`。`_safe_join` 边界从 task_dir 改 user_root,安全性不降低;`_enumerate_files` 加 dotfile 过滤(`if p.name.startswith(".")` 跳过 `.memory/` 等,同 `/v1/folders` 约定);`_rel_to` 把 `Path(".")` 归一为空串(避免 root 时 current="." 这种 ugly 形态)。删 `from_db_path` import(只剩 `to_db_path`)。**dev SPA**:`loadFiles()` 不再 gate on `state.taskId`,enterApp 时直接调一次拉 user_root;`selectTask` 在拿到 task meta 后 `state.filesPath = wdName`(从 working_dir 末段抽出)再 loadFiles,选 task 自动跳到对应子目录但用户可点 crumb 回 root 看其他目录;crumbs root 标签 "/" → "我的"(user_root 直观);files-proj header 从"项目名(state.taskMeta 派生)"改"路径首段(数据驱动)",空时显示 `(user root)`。**新增 upload 按钮**(原来藏在外部页面里没暴露给 SPA):pane-head 加 `⬆` 按钮 + 隐藏 `<input type=file multiple>`,onchange 走 FormData POST `/v1/files/upload`,path 取当前 `state.filesPath`(空 → user_root);上传完 loadFiles 刷新。`deleteCurrentTask` 不再重置 files 面板(task 删了但 FS 文件保留,继续浏览有意义),只 reload 当前路径。`btn-refresh-files` 移除 disabled 状态(任何时候可用)。**Smoke 68 case 全绿**(in-process TestClient,跑完即删 `_smoke_files.py`):列 user_root(包含 working_dir 目录,`.memory` 被过滤) / 列子目录 2 层 / 不存在路径 200+exists=False / 路径安全 6 case(`../` / 绝对 / Windows 绝对 / `\\` 起头)/ upload 单 / multi+nested mkdir / 上传到 root / 文件名攻击 4 case(`../` `..` `/` `\\`)/ download 文件 + 深度 + 目录 400 + ghost 404 + 越界 400 / delete 文件 / 空目录 / 非空 400 / user_root 拒 / ghost 404 / 越界 400 / 跨 user 隔离 4 case(A 不见 B,B 不见 A)/ 无 token 全 401(GET list / POST upload / POST delete / GET download)/ 子目录里 dotfile 也过滤 / 新 user 首访 user_root 自动 mkdir + 列表空。**文档**:DESIGN §7.2 路由表段 + lead-in 同步("Task 一等公民,files 是其副视图(经 task_dir 暴露)" → "Task 一等公民;files 与 task 正交,走 user-rooted /v1/files*,以 workspace/users/<uid>/ 为边界")。
- **Q1 → 05-06 / Phase 1-4**:骨架 / 三 skill / run_python / Model Profile + Probing。ppt v3 加商务红 + apply_brand + Iconify;素材摄取改 markitdown CLI。
- **05-06 / Phase 6 部分**:task + state.json + tokens 累计;CLI `tasks` + REPL `/status /done /abandon /desc`;移除 legacy `workspace/sessions/`
@ -115,12 +116,10 @@ Python 合计 ~3700 行(+ dev.html ~600 静态)
## 下一步候选(性价比排序)
1. **platform 端起 API 联调**(~?)—— platform 服务端持 `PLATFORM_KEY``POST /v1/auth/login {user_id, platform_key}` 拿 token,后续走 `Authorization: Bearer <jwt>`。Swagger UI(`http://127.0.0.1:8765/docs`)生成 client stub。
2. **dev.html 浏览器手验**(~10 分钟)—— `cli.py web` 起后访问 `http://127.0.0.1:8765/`,login(填 sentinel UUID + PLATFORM_KEY)→ 看 3 栏布局 + 新建 task + 发消息 SSE 流式 + 文件浏览。
3. **真 OIDC 接入 + CORS 收紧**(~1 天)—— 把 `/v1/auth/login` 内部从 platform_key 校验换成 OIDC ID token 校验(路由层 Depends 不动);CORS 改成 platform 域名 allowlist。
4. **§7 C Executor + sandbox**(~2-3 天)—— D 完工,继续 C。
5. **并发 run 互锁**(~2 小时)—— 用户连发两条消息 messages idx UniqueConstraint 在 race 下会冲;PG `SELECT ... FOR UPDATE` 锁 tasks 行,或 advisory lock。
6. **§7 E CLI transport 双模式**(~1.5 天)—— `cli.py chat --remote https://...` 走 HTTP 替代 in-process。
7. **Phase 6 context 三层压缩**(~1 天)—— 兜底,V4 长上下文一般用不到。
1. **真 OIDC 接入 + CORS 收紧**(~1 天)—— 把 `/v1/auth/login` 内部从 platform_key 校验换成 OIDC ID token 校验(路由层 Depends 不动);CORS 改成 platform 域名 allowlist。**真发布给真实用户前必做**。
2. **`POST /v1/tasks/{id}/runs/{rid}/cancel`**(~1-2 小时)—— DESIGN §7.2 标 "待"。有了它 409 时用户可主动 cancel 当前 run 而非等流式跑完;BG 线程需要 cooperative cancel(check `Run.status` 已被改 `cancelling` 时 raise/break)。
3. **§7 C Executor + sandbox**(~2-3 天)—— `run_python`/`shell` → `Executor.run(...)`,本地保留 subprocess、SaaS 走 docker;`api_key_env` → `KeyProvider` 运行时注入。多用户在线跑代码前置。
4. **§7 E CLI transport 双模式**(~1.5 天)—— `cli.py chat --remote https://...` 走 HTTP 替代 in-process。dogfood ≡ 用户路径。
5. **Phase 6 context 三层压缩**(~1 天)—— 兜底,V4 长上下文一般用不到。
> §7 B + D + D'(过渡 auth)主体已完工。剩余路线:真 OIDC → C(Executor)→ E(CLI 双模式)→ F(deploy / billing)。原 Phase G Web UI 路线撤(DESIGN §7.9),UI 改 platform 端实现;`web/static/dev.html` 是开发期单文件 SPA,跟 platform UI 并存不冲突。
> §7 B + D + D'(过渡 auth)+ 单活 run 锁 主体已完工。剩余路线:真 OIDC → cancel 路由 → C(Executor)→ E(CLI 双模式)→ F(deploy / billing)。原 Phase G Web UI 路线撤(DESIGN §7.9),UI 改 platform 端实现;`web/static/dev.html` 是开发期单文件 SPA,跟 platform UI 并存不冲突。

7
RUN.md
View File

@ -2,7 +2,7 @@
> 怎么把 zcbot 跑起来。env / 常用命令 / 故障兜底。设计看 `DESIGN.md`,进度看 `PROGRESS.md`
最后更新:2026-05-17(task 拆 `--name`(必填,任务名)+ `--working-dir`(可选,目录名);`--mode → --skill`;`/v1/folders` 列已有目录;0003 migration)
最后更新:2026-05-18(`POST /v1/tasks/{id}/messages` 同 task 单活 run:已有 running → 409;启动 lifespan 把孤儿 running 标 error)
---
@ -144,7 +144,7 @@ curl --noproxy '*' -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8765/v1/ta
| `DELETE /v1/tasks/{id}` | 硬删 DB 行(messages CASCADE),FS working_dir 保留 | 必填 |
| `GET /v1/folders` | 列当前 user 工作目录 + n_tasks + last_used(供创建 task 自动补全用) | 必填 |
| `GET /v1/tasks/{id}/messages` | LiteLLM payload 透传 | 必填 |
| `POST /v1/tasks/{id}/messages` | `{content}` 发消息;返 `{run_id, events_url}` | 必填 |
| `POST /v1/tasks/{id}/messages` | `{content}` 发消息;返 `{run_id, events_url}`;**同 task 已有 running run → 409**(单活 run 保护,UI 应 disable send 按钮直到 SSE `done`) | 必填 |
| `GET /v1/tasks/{id}/runs/{rid}/events` | SSE 流(`event: <type>` + `data: <json>`) | 必填 |
| `GET /v1/tasks/{id}/files?path=` | 列子目录条目 + 面包屑 | 必填 |
| `GET /v1/tasks/{id}/files/download?path=` | 下单文件 | 必填 |
@ -175,7 +175,8 @@ curl --noproxy '*' -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8765/v1/ta
| `cli.py web` 启动后 curl 连不上 | 检查 proxy(`HTTP_PROXY` / `HTTPS_PROXY`):本地服务在 127.0.0.1,系统 proxy 拦截会 502。临时 `unset HTTP_PROXY HTTPS_PROXY` 或加 `curl --noproxy '*'`。验通:`curl --noproxy '*' http://127.0.0.1:8765/healthz` → `{"status":"ok"}` |
| SSE 卡住不流(经 nginx) | 反代要关 buffering — 后端响应头已带 `X-Accel-Buffering: no`,nginx ≥ 1.5.6 默认认。仍卡看 nginx 配 `proxy_buffering off; proxy_read_timeout 3600s;` |
| platform 端 CORS preflight 失败 | 本地 dev `allow_origins=["*"]` 应该没事;部署后看是否按 platform 域名收紧过头(`access-control-allow-origin` 响应头要含 platform 域名 或 `*`)|
| `UniqueViolation idx already exists` from messages | 同 task 并发 POST messages,idx 冲突。**已知 TODO**:加 task 级 `SELECT ... FOR UPDATE` 或 advisory lock(留到 D' / 真发布前) |
| `POST /v1/tasks/{id}/messages` 返 409 `task already has a running run` | 上一条消息的 BG run 还没跑完(SSE 没 `done`)。等流式跑完;或服务异常下 Run 行卡 `running`,启动 reaper 会清(crash 重启 / `cli.py web` 重启)。后续 `POST /v1/tasks/{id}/runs/{rid}/cancel`(DESIGN §7.2 待办)做出来后可主动 cancel |
| `[startup] reaped N stale running run(s)` | 上次 `cli.py web` 进程未正常 finish 留下 N 个 running Run 行,启动 lifespan 自动标 error。无需处理,info 级 |
| `cli.py web` 启动报 `PLATFORM_KEY env not set` / `JWT_SECRET env not set` | D' 过渡 auth 强制双 env 必填。生成 `python -c "import secrets;print(secrets.token_urlsafe(48))"` 各填一,写进 `.env` 重起 |
| `/v1/*` 全返 401 `missing Authorization: Bearer` | 没拿 token 或没带 header。先 `POST /v1/auth/login` 拿 token,curl 加 `-H "Authorization: Bearer $TOKEN"` |
| `/v1/*` 返 401 `token expired` | JWT 默 7d TTL 到期,重 login。要更长改 `ZCBOT_JWT_TTL_SECONDS` env |

View File

@ -271,6 +271,21 @@ def create_app() -> FastAPI:
@asynccontextmanager
async def lifespan(app: FastAPI):
broker.bind_loop(asyncio.get_running_loop())
# Stale-run reaper:上次进程 crash 留下的 "running" 行已无 BG 线程继续,
# 启动时标 error,让对应 task 重新可发消息(否则 409 gate 永挂)。
# TODO 真生产 multi-worker:换 heartbeat / lease,只 reap 自家 worker 的孤儿。
with session_scope() as s:
result = s.execute(
update(Run)
.where(Run.status == "running")
.values(
status="error",
error="server restarted before run finished",
finished_at=func.now(),
)
)
if result.rowcount:
print(f"[startup] reaped {result.rowcount} stale running run(s)")
yield
app = FastAPI(
@ -613,7 +628,12 @@ def create_app() -> FastAPI:
body: MessageRequest,
user_id: UUID = Depends(require_user),
):
"""发消息 + 起 BG run。返 `{run_id, events_url}`,客户端立刻订阅 SSE 拿流式。"""
"""发消息 + 起 BG run。返 `{run_id, events_url}`,客户端立刻订阅 SSE 拿流式。
task 单活 run:`SELECT FOR UPDATE` task + 活跃 Run 检查,把所有权 /
活跃 / 插新 run 收进一个事务,挡住"用户连点 send 两条消息"导致两个 BG 线程
`messages.idx`(UniqueConstraint race-crash)已有 running run 409
"""
try:
tid = UUID(task_id)
except ValueError:
@ -621,13 +641,27 @@ def create_app() -> FastAPI:
content = (body.content or "").strip()
if not content:
raise HTTPException(400, "empty content")
with session_scope() as s:
_assert_owns_task(s, tid, user_id)
run_id = uuid4()
with session_scope() as s:
owned = s.execute(
select(Task.task_id)
.where(Task.task_id == tid, Task.user_id == user_id)
.with_for_update()
).first()
if owned is None:
raise HTTPException(404, f"task not found: {tid}")
active = s.execute(
select(Run.run_id)
.where(Run.task_id == tid, Run.status == "running")
.limit(1)
).scalar_one_or_none()
if active is not None:
raise HTTPException(
409,
f"task already has a running run ({active}); wait for it to finish",
)
s.add(Run(run_id=run_id, task_id=tid, status="running", started_at=func.now()))
# to_thread 跑 sync agent.run;sink 通过 broker 把 event 桥回 asyncio
# commit 后 lock 释放;BG 线程接管(sink 通过 broker 把 event 桥回 asyncio loop)
asyncio.create_task(asyncio.to_thread(_run_agent_bg, tid, run_id, user_id, content))
return {
"run_id": str(run_id),