From aeecc7f0f3e43c0dd31244d19c48265d2d0e6aa4 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 14 May 2026 11:25:53 +0800 Subject: [PATCH] =?UTF-8?q?core(=C2=A77=20B=20Step=203):=20TaskState=20ORM?= =?UTF-8?q?=20+=20Web=20UI=20=E8=AE=BE=E8=AE=A1=20(Phase=20G)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - TaskState dataclass 改 PG-backed:save() → upsert_task (INSERT ON CONFLICT DO UPDATE,显式刷 updated_at);load(task_id) → SELECT。state.json 全面 废除,task_dir 只承担 skill 产物。 - TaskState 字段去 cwd / 加 task_dir(对齐 §7 SaaS task_dir-as-identity); cwd 只在 session.meta 内存视图保留(展示用)。 - core/storage/utils.py 新增 upsert_task / update_task;ORM-level UPDATE 自带 onupdate=func.now(),DO UPDATE 需显式 set。 - session.py Session.append 的 ensure 调用补传 mode/description/ reasoning_effort,避免首次 INSERT 后 _list_task_rows 看到空 meta。 - sync_task_tokens 改成 update_task 单字段 UPDATE,避免无谓全字段 UPSERT。 - cli.py _list_task_rows 全字段从 PG 读,status 过滤走 SQL WHERE; _cleanup_if_empty 去 state.json 特例(任何 FS 文件/子目录都算实质痕迹)。 - core/export_docx.py meta 走 TaskState.load(tid),CWD 字段从 meta 表移除。 - DESIGN.md 追加 Phase G(Web UI 简洁版,FastAPI + Jinja2 + HTMX + SSE), 排在 §7.7 D 后;§7.9 补 server-render 不上 SPA 的取舍 4 条。 Co-Authored-By: Claude Opus 4.7 (1M context) --- DESIGN.md | 6 ++- PROGRESS.md | 33 +++++++------- cli.py | 68 +++++++++++++---------------- core/export_docx.py | 33 +++++++------- core/session.py | 5 +++ core/storage/__init__.py | 4 +- core/storage/utils.py | 58 +++++++++++++++++++++---- core/task.py | 92 ++++++++++++++++++++++++---------------- main.py | 55 ++++++++++++------------ 9 files changed, 207 insertions(+), 147 deletions(-) diff --git a/DESIGN.md b/DESIGN.md index 68310be..04effcd 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -310,8 +310,9 @@ usage_events(id, user_id, task_id uuid, run_id uuid, kind, value, ts) | 6 | **Executor + sandbox**:`run_python`/`shell` → `Executor.run(...)`;本地保留 subprocess executor,SaaS 走 docker;`api_key_env` → `KeyProvider` 运行时注入 | 2-3 天 | | 7 | **HTTP /v1**:FastAPI + SSE + OIDC | 4 天 | | 8 | **CLI 双模式**:transport 层抽象,默认 in-process;`--remote` 走 HTTP;**本地直跑不删** | 1.5 天 | +| 9 | **Web UI(简洁版)**:FastAPI + Jinja2 + HTMX + 原生 SSE,task list / chat / folder tree / 文件上传下载;无 React/Vue 构建链 | 2-3 天 | -代码量增量:**+1000~1500 行**(单一 PG 比双 adapter 省 500-800 行)。 +代码量增量:**+1000~1500 行**(单一 PG 比双 adapter 省 500-800 行;Web UI 加 600-1000 行 HTML/CSS/JS 不计入 Python 主仓库)。 ### 7.7 分阶段落地 @@ -322,6 +323,7 @@ usage_events(id, user_id, task_id uuid, run_id uuid, kind, value, ts) | C | #6(Executor + sandbox) | 3 天 | 两本地账号互不可见对方 folder,本地 subprocess executor 仍可用 | | D | #7(HTTP /v1 + auth) | 4 天 | curl / Postman 跑通主流程 | | E | #8(CLI transport 双模式) | 1.5 天 | 默认本地直跑保留,`--remote` 走 HTTP 跑通 | +| G | #9(Web UI 简洁版) | 2-3 天 | 浏览器跑通:列 task → 进 chat → 流式回复 → 文件上传下载;与 D / E 无强序,但需 D 的 SSE 端点 | | F | 上线打磨(限流 / 监控 / 告警 / HA) | 持续 | SLO 99.5% | **B 阶段一次性切换** —— 切到 PG 后本地与 SaaS 走相同代码路径,无回退、无双轨。**dogfood 即生效**(messages 进 DB → 全文搜、jsonb 查询立刻可用)。 @@ -357,6 +359,8 @@ usage_events(id, user_id, task_id uuid, run_id uuid, kind, value, ts) **Memory 不入 DB**:跨 task 共享靠"同一 user 同一 FS 目录"自动达成。md 用户直接编辑器改,DB 化反而要造 UI、违反 §3.7"事实由用户判断"。 +**Web UI 走 server-render + HTMX 不上 SPA**:① 与 §5 "Less Scaffolding" 一致,不引入 React/Vue 构建链 / node_modules / 双语言双 lint;② chat 主交互是 SSE 流式追加 + 表单提交,HTMX `hx-swap` / `sse-swap` 原生覆盖,无需客户端状态管理;③ FastAPI 单进程既出 `/v1` JSON 也出 HTML 模板,部署单容器;④ 上限低(协作 / 实时多光标 / 复杂表单态做不动),真要做重前端再换栈,届时 `/v1` 已稳定可直接对接 SPA。 + **Tasks/Messages 在 PG 但 skill 产物在 FS**:tasks / messages 需要查询、过滤、全文搜、跨 task 统计 —— DB 强项;skill 产物(`*.pptx` / `*.docx` / `sections/*.md`)终用户拿走,期望文件管理器看到、Office 打开、邮件发出 —— 进 DB 要做"导出"多余操作。**FS 是产物天然存储,DB 是元数据 / 状态 / 索引天然存储**。同理 §7.5 bind mount = user root,容器里 ≡ 用户在 Web UI 看到的目录,无中间层翻译。 --- diff --git a/PROGRESS.md b/PROGRESS.md index 9931318..4cdf483 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -2,7 +2,7 @@ > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。 -最后更新:2026-05-14(Step 2) +最后更新:2026-05-14(Step 3) --- @@ -15,7 +15,7 @@ | 5 | Eval Suite | ⏸ 不做 | dogfooding 替代,probe 覆盖健康检查 | | 6 | 长任务工程化 | 🟡 | task + 恢复 ✅;双层记忆 ✅;context 压缩未做 | | 7 | 打磨 | ❌ | Docker 沙盒 / 更多 skill | -| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 进行中(Step 1 基建 ✅;Step 2 Session ORM ✅;Step 3-4/6 待;Step 5 migrate-from-fs 取消) | +| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 进行中(Step 1 基建 ✅;Step 2 Session ORM ✅;Step 3 TaskState ORM ✅;Step 4/6 待;Step 5 migrate-from-fs 取消)。Phase G(Web UI 简洁版)已上设计,排在 D 后。 | --- @@ -31,6 +31,7 @@ - **05-14 / §7.1 心智模型修正**:`Folder-centric` → **Task 一等公民 + Dir 文件副视图**(双视图正交,dir 不是 task 父容器);task_dir 留空=一次性对话 / 指定=项目化二分语义入文。 - **05-14 / §7 B Step 1 基建**:`core/storage/{engine,models}.py` SQLAlchemy 2.x ORM(users/tasks/messages/runs/usage_events 5 表)+ alembic(初版 migration `0001_initial_schema`,GIN/复合索引)+ `cli db {upgrade,downgrade,current}` 子命令组 + 本地 sentinel user(`00000000-...`)+ `ZCBOT_DB_URL` 必填(未设给清晰报错,不引导 docker)。已在远端测试 PG 跑通 `db upgrade head`。 - **05-14 / §7 B Step 2 Session ORM**:`core/session.py` 重写,messages 走 PG(append-only,jsonb,idx 严格递增);system prompt 不入库(每次 build_agent 重建);`Session.load(task_id, system_prompt=...)` resume 接口;`ensure_local_task_row` idempotent UPSERT(`INSERT ... ON CONFLICT DO NOTHING`)在首条非 system 消息前打底 tasks 行。task_id 切换为 UUID(原时间戳格式废弃,旧 workspace **不做兼容**)。main.py/cli.py 适配:`resolve_task_id`(UUID 前缀解析)、`_cleanup_if_empty` 双检查(DB messages + FS 产物)、`_list_task_rows` 改读 PG。`core/export_docx.py` 改从 PG 读 messages。端到端 build/append/resume/cleanup smoke 全绿。**取消 Step 5 migrate-from-fs**(用户决定不兼容旧 workspace)。 +- **05-14 / §7 B Step 3 TaskState ORM**:`core/task.py` 重写,TaskState dataclass 保留为内存 DTO 但落地走 PG —— `save()` 调 `upsert_task`(INSERT ON CONFLICT DO UPDATE,显式 set `updated_at=func.now()`),`load(task_id)` 走 SELECT;**字段去掉 `cwd`**(改读 task_dir,§7 SaaS task_dir-as-identity)。`state.json` 文件**全面废除**,task_dir 只承担 skill 产物。`core/storage/utils.py` 加 `upsert_task` / `update_task` 工具。`main.py::sync_task_tokens` 改 `update_task(tokens_p,tokens_c)` 单字段 UPDATE(ORM-level update 自带 onupdate=func.now())。`core/session.py::Session.append` 的 ensure 调用补传 `mode/description/reasoning_effort`,避免首次 INSERT 后 _list_task_rows 看到空 meta。`cli.py` 全字段从 ORM Task 列读;`_cleanup_if_empty` 去 state.json 特例(任何 FS 文件 / 子目录都算实质痕迹);`/done /abandon /desc` 走 PG。`core/export_docx.py` meta 改从 `TaskState.load(tid)` 读(asdict 拿到 dict),去 CWD 字段。端到端 smoke:storage UPSERT/UPDATE round-trip + build_agent 懒创建 + Session.append 自动 INSERT 完整 meta + sync_task_tokens 局部 UPDATE + task_state.save UPSERT 保留 task_dir/tokens + export → .docx 37KB 全绿。 --- @@ -55,27 +56,27 @@ core/loop.py 152 ← §7 A: sink.emit core/sinks.py 101 ← §7 A core/ui.py 38 core/probe.py 243 -core/session.py 148 ← §7 B Step 2: ORM 改写,task_id anchored +core/session.py 153 ← §7 B Step 2-3: ORM + ensure 补 meta core/skills.py 81 -core/task.py 64 +core/task.py 82 ← §7 B Step 3: PG-backed TaskState,去 cwd core/memory.py 76 -core/export_docx.py 379 ← §7 B Step 2: 改从 PG 读 messages -core/storage/__init__.py 25 ← §7 B Step 1-2 +core/export_docx.py 376 ← §7 B Step 2-3: meta 也走 PG +core/storage/__init__.py 27 ← §7 B Step 1-3 core/storage/engine.py 80 ← §7 B Step 1 core/storage/models.py 124 ← §7 B Step 1 -core/storage/utils.py 55 ← §7 B Step 2: ensure_local_task_row +core/storage/utils.py 95 ← §7 B Step 3: +upsert_task/update_task tools/base.py 34 tools/fs.py 182 tools/shell.py 94 tools/run_python.py 84 tools/skill_tool.py 45 -main.py 228 ← §7 B Step 2: resolve_task_id (UUID) -cli.py 526 ← §7 B Step 1-2: +db 子命令组 / +_task_has_messages +main.py 231 ← §7 B Step 3: sync_task_tokens UPDATE +cli.py 516 ← §7 B Step 3: _list_task_rows 全 DB db/migrations/env.py 61 ← §7 B Step 1 db/migrations/versions/ 0001_initial_schema.py 125 ← §7 B Step 1 ───────────────────────────────── -Python 合计 ~2841 行 +Python 合计 ~3035 行 ``` 加 skills/ppt 脚本 ~600 行 + SKILL.md / references / config / prompts + alembic.ini,总仓库约 3500 行。 @@ -84,11 +85,11 @@ Python 合计 ~2841 行 ## 下一步候选(性价比排序) -1. **§7 B 剩余 Step 3-4 / 6**(~2 天) - - Step 3 TaskState ORM 改造(state.json → PG tasks 表;tasks.cwd 字段去掉,改读 task_dir;sync_task_tokens 走 UPDATE) - - Step 4 main.py / cli.py 收尾(`_list_task_rows` 全 DB;清掉 state.json 路径剩余分支) +1. **§7 B 剩余 Step 4 / 6**(~0.5 天) + - Step 4 main.py / cli.py 已 Step 3 收尾(`_list_task_rows` 全 DB / state.json 路径已删);剩 task_dir 字段语义在 §7.6 #3 还要补:留空时默认派生(目前已默认为 `workspace/tasks//`,但用户显式指定还没上) - Step 6 no-subtask SQL 校验(`new LIKE existing/%` cascade) - ~~Step 5 migrate-from-fs~~(取消,不兼容旧 workspace) -2. **Phase 6 context 三层压缩**(~1 天)—— 兜底,V4 长上下文一般用不到。 -3. **Phase 7 更多 skill / 模型档案**(持续)。 -4. **Proposal mermaid 预渲染**(~半天)—— ASCII 透传不够用时再上 `mmdc`。 +2. **§7 Phase G Web UI 简洁版**(~2-3 天)—— FastAPI + Jinja2 + HTMX + SSE,task list / chat / folder tree / 文件上传下载;依赖 D(HTTP /v1)的 SSE 端点,与 E 无强序。 +3. **Phase 6 context 三层压缩**(~1 天)—— 兜底,V4 长上下文一般用不到。 +4. **Phase 7 更多 skill / 模型档案**(持续)。 +5. **Proposal mermaid 预渲染**(~半天)—— ASCII 透传不够用时再上 `mmdc`。 diff --git a/cli.py b/cli.py index 484c164..bfa4f73 100644 --- a/cli.py +++ b/cli.py @@ -4,14 +4,13 @@ python cli.py chat # 新建一个 task python cli.py chat --mode coding --desc "修一处 bug" # 带元数据建任务 python cli.py chat --resume last # 恢复最近一个 task - python cli.py chat --resume 20260506_141523 # 显式 task_id + python cli.py chat --resume # 显式 task_id(前缀 ≥8 字符) python cli.py chat --model deepseek_v4.pro python cli.py tasks # 列出 task python cli.py probe # 实测对账 yaml 声称的能力 """ from __future__ import annotations -import json import shutil import sys from pathlib import Path @@ -20,7 +19,6 @@ import click from rich.prompt import Prompt from rich.table import Table -from core.task import TaskState from core.ui import make_console from main import ( ROOT, @@ -83,12 +81,12 @@ def db_current() -> None: def _cleanup_if_empty(task_dir, session, console=None) -> bool: - """切走前清理空 task。三条都满足才删: + """切走前清理空 task。两条都满足才删: 1) session 在内存没有 user 消息 2) task_dir 在 FS 上无产物(懒创建后没说话就没目录,直接 no-op) - 3) PG tasks 行如果有也一并删(messages 走 CASCADE) - 注:state.json 还在 task_dir 下(Step 3 前),它是 task 元数据痕迹 → 留着不删。 + Step 3 后 state.json 已废除,task_dir 只承担 skill 产物。任何文件 / 子目录 + 都算实质痕迹,保留 task。DB tasks 行随之 DELETE(messages 走 CASCADE)。 """ if session.n_user_msgs() > 0: return False @@ -98,14 +96,12 @@ def _cleanup_if_empty(task_dir, session, console=None) -> bool: # 目录都没建,只清 DB 占位行(若 Session 早调过 ensure_local_task_row) _delete_task_db_row(session.task_id) return False - if any(p.is_dir() for p in entries): + meaningful = [ + p for p in entries + if not (p.is_file() and p.name.endswith(".tmp")) + ] + if meaningful: return False - meaningful = {p.name for p in entries if p.is_file() and not p.name.endswith(".tmp")} - if meaningful - {"state.json"}: - # 还有其他文件(产物)→ 保留 - return False - # state.json 单独存在不算实质内容(/done /abandon /desc 也会留下,但 n_user_msgs=0 - # 时这些状态变更没意义,允许删) shutil.rmtree(task_dir, ignore_errors=True) _delete_task_db_row(session.task_id) if console is not None: @@ -142,41 +138,35 @@ def _task_has_messages(task_id_str: str) -> bool: def _list_task_rows(workspace_dir, limit=20, status=None): """返回 [(updated_at, task_id_str, status, mode, model, tokens, n_msgs, desc), ...] 时间降序。 - Step 2 阶段:tasks 元字段(mode/desc/status/model/tokens)仍从 state.json 读, - 时间排序和 task_id 列表从 PG tasks 表读(messages 数也从 PG 数)。Step 3 后统一走 DB。 + Step 3 后:全字段从 PG tasks 表读,messages 数从 PG 数;workspace_dir 仅用于 + 保持签名向后兼容(不再读 state.json)。status 过滤走 SQL WHERE。 """ from sqlalchemy import func, select from core.storage import session_scope from core.storage.models import Message, Task - tdir = tasks_dir(workspace_dir) + _ = workspace_dir # 签名占位,Step 3 后已不需要 with session_scope() as s: - rows_db = s.execute( - select(Task.task_id, Task.updated_at).order_by(Task.updated_at.desc()).limit(limit * 3) - ).all() + q = select( + Task.task_id, Task.updated_at, Task.status, Task.mode, + Task.model, Task.model_profile, Task.tokens_prompt, + Task.tokens_completion, Task.description, + ).order_by(Task.updated_at.desc()) + if status: + q = q.where(Task.status == status) + rows_db = s.execute(q.limit(limit)).all() msg_counts = dict(s.execute( select(Message.task_id, func.count()).group_by(Message.task_id) ).all()) rows = [] - for tid, updated_at in rows_db: - d = tdir / str(tid) - st = TaskState.load(d) - if st is None: - # 仅 DB 有行(lazy 占位但 state.json 没写过),展示最小信息 - n = msg_counts.get(tid, 0) - if n == 0: - continue - rows.append((updated_at, str(tid), "active", "", "", 0, n, "")) - continue - if status and st.status != status: - continue + for tid, updated_at, st_, md, mdl, prof, tp, tc, desc in rows_db: n = msg_counts.get(tid, 0) rows.append(( - updated_at, str(tid), st.status, st.mode, - st.model_profile or st.model, st.tokens_total, n, st.description, + updated_at, str(tid), st_, md, + prof or mdl, (tp or 0) + (tc or 0), n, desc, )) - return rows[:limit] + return rows @cli.command() @@ -329,18 +319,18 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None: continue if cmd == "/done": task_state.status = "completed" - task_state.save(task_dir) + task_state.save() console.print(f"[ok]task {sid} marked completed[/ok]") break if cmd == "/abandon": task_state.status = "abandoned" - task_state.save(task_dir) + task_state.save() console.print(f"[warn]task {sid} marked abandoned[/warn]") break if cmd.startswith("/desc"): new_desc = cmd[len("/desc"):].strip() task_state.description = new_desc - task_state.save(task_dir) + task_state.save() console.print(f"[info]description set: {new_desc!r}[/info]") continue if cmd.startswith("/export"): @@ -378,7 +368,7 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None: except Exception as e: console.print(f"[err]运行错误:[/err] {type(e).__name__}: {e}") finally: - sync_task_tokens(task_state, task_dir, agent.llm) + sync_task_tokens(task_state, agent.llm) @cli.command() @@ -386,7 +376,7 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None: @click.option("--limit", default=20, help="显示最近 N 个") @click.option("--status", default=None, help="只看某状态: active / completed / abandoned") def tasks(workspace: str, limit: int, status: str) -> None: - """列出已有 task(新格式,workspace/tasks//state.json)。""" + """列出已有 task(从 PG tasks 表读,按 updated_at 降序)。""" cfg = load_config() ws = resolve_workspace(workspace, cfg) rows = _list_task_rows(ws, limit=limit, status=status) diff --git a/core/export_docx.py b/core/export_docx.py index 37979a5..0cb1448 100644 --- a/core/export_docx.py +++ b/core/export_docx.py @@ -1,4 +1,4 @@ -"""把 task 的 PG messages 表 + state.json 渲染为 .docx 对话稿。 +"""把 task 的 PG messages 表 + tasks 元数据 渲染为 .docx 对话稿。 布局: - 文档开头 meta 表(task_id / 模式 / 描述 / 模型 / 创建时间 / 消息数 / tokens / 导出时间) @@ -11,7 +11,7 @@ - 顶层函数 export_chat_to_docx(task_dir, out_path=None, ...) - CLI 子命令 `python cli.py export ` 与 REPL `/export []` 都走它 -§7 B Step 2 后:messages 从 PG 读(按 task_id);state.json 还在 task_dir 下(Step 3 删)。 +§7 B Step 3 后:meta 和 messages 都从 PG 读(state.json 已废除)。 """ from __future__ import annotations @@ -21,6 +21,8 @@ from pathlib import Path from typing import Optional from uuid import UUID +from core.task import TaskState + from docx import Document from docx.enum.text import WD_ALIGN_PARAGRAPH from docx.oxml import OxmlElement @@ -184,7 +186,6 @@ def _add_meta_block( status = task_state.get("status") or "" model = meta.get("model") or task_state.get("model") or "" profile = meta.get("model_profile") or task_state.get("model_profile") or "" - cwd = meta.get("cwd") or task_state.get("cwd") or "" created = meta.get("created_at") or task_state.get("created_at") or "" updated = task_state.get("updated_at") or "" tp = task_state.get("tokens_prompt", 0) @@ -197,7 +198,6 @@ def _add_meta_block( ("状态", status), ("模型", model), ("Profile", profile), - ("CWD", cwd), ("创建时间", created), ("更新时间", updated), ("消息数", str(n_msgs)), @@ -325,15 +325,15 @@ def export_chat_to_docx( ) -> Path: """渲染 task 对话为 .docx,返回写入路径。 - task_dir 名字必须是 UUID(messages 从 PG 按 task_id 读)。state.json 仍在 - task_dir 下(Step 3 前)提供 mode/desc/tokens 等 meta。 + task_dir 目录名必须是 UUID(messages / tasks 元数据都按该 task_id 从 PG 读)。 """ try: tid = UUID(task_dir.name) except ValueError: raise ValueError(f"task_dir name 不是有效 UUID: {task_dir.name}") - # 从 PG 读 messages,按 idx 排序 + # 从 PG 读 messages 与 tasks 元数据 + from dataclasses import asdict from sqlalchemy import select from core.storage import session_scope from core.storage.models import Message as MessageRow @@ -344,21 +344,18 @@ def export_chat_to_docx( ).scalars().all() messages = [dict(r.payload) for r in rows] - state_path = task_dir / "state.json" - task_state: dict = {} - if state_path.exists(): - try: - task_state = json.loads(state_path.read_text(encoding="utf-8")) or {} - except Exception: - task_state = {} + st = TaskState.load(tid) + task_state: dict = asdict(st) if st is not None else {} if out_path is None: out_path = task_dir / f"chat_{tid}.docx" - meta = {"id": str(tid), "model": task_state.get("model", ""), - "model_profile": task_state.get("model_profile", ""), - "cwd": task_state.get("cwd", ""), - "created_at": task_state.get("created_at", "")} + meta = { + "id": str(tid), + "model": task_state.get("model", ""), + "model_profile": task_state.get("model_profile", ""), + "created_at": task_state.get("created_at", ""), + } doc = _init_doc() _add_meta_block(doc, meta, task_state, len(messages), task_dir) diff --git a/core/session.py b/core/session.py index b14ad00..0c04e52 100644 --- a/core/session.py +++ b/core/session.py @@ -77,12 +77,17 @@ class Session: return # 首次写入前,让 tasks 行就位。`ensure_local_task_row` 在 storage 层 idempotent。 + # meta 字段(mode/description/reasoning_effort)走 INSERT 一次性带入,避免 + # 首次 append 后 _list_task_rows 看到空 meta;后续 task_state.save() 走 UPSERT 覆盖。 from .storage.utils import ensure_local_task_row ensure_local_task_row( task_id=self.task_id, task_dir=self.meta.get("task_dir", ""), + mode=self.meta.get("mode", ""), + description=self.meta.get("description", ""), model=self.meta.get("model", ""), model_profile=self.meta.get("model_profile", ""), + reasoning_effort=self.meta.get("reasoning_effort", ""), ) with session_scope() as s: diff --git a/core/storage/__init__.py b/core/storage/__init__.py index c068e93..a5f4c2b 100644 --- a/core/storage/__init__.py +++ b/core/storage/__init__.py @@ -13,7 +13,7 @@ from .engine import ( session_scope, ) from .models import SENTINEL_USER_ID -from .utils import ensure_local_task_row, get_task +from .utils import ensure_local_task_row, get_task, update_task, upsert_task __all__ = [ "SENTINEL_USER_ID", @@ -22,4 +22,6 @@ __all__ = [ "get_engine", "get_task", "session_scope", + "update_task", + "upsert_task", ] diff --git a/core/storage/utils.py b/core/storage/utils.py index 1ac99df..ec861fa 100644 --- a/core/storage/utils.py +++ b/core/storage/utils.py @@ -1,10 +1,10 @@ -"""Storage 辅助工具:idempotent task 行创建、本地形态简化封装。""" +"""Storage 辅助:tasks 表的 idempotent 创建 / UPSERT / UPDATE。""" from __future__ import annotations -from typing import Optional +from typing import Any, Optional from uuid import UUID -from sqlalchemy import select +from sqlalchemy import func, select, update from sqlalchemy.dialects.postgresql import insert from .engine import session_scope @@ -21,13 +21,11 @@ def ensure_local_task_row( reasoning_effort: str = "", user_id: UUID = SENTINEL_USER_ID, ) -> None: - """本地形态 idempotent INSERT tasks 行。 + """占位 INSERT(ON CONFLICT DO NOTHING)—— 不覆盖已有字段。 - 用于 Session.append 首次写消息前打底,Step 2 阶段字段都是占位值; - Step 3 引入 TaskState ORM 后,TaskState.save 会把字段更新成真实值。 - - PG `INSERT ... ON CONFLICT DO NOTHING` 保证幂等且单 SQL,无 SELECT-then-INSERT - 竞态。 + 用于 `Session.append` 在首条非 system 消息前打底 tasks 行,避免 messages + FK 违反。字段是 build_agent 阶段已知的最小集;TaskState.save 之后会通过 + `upsert_task` 把真实字段(desc/status/tokens 等)写进去。 """ stmt = ( insert(Task) @@ -47,6 +45,48 @@ def ensure_local_task_row( s.execute(stmt) +def upsert_task( + task_id: UUID, + *, + user_id: UUID = SENTINEL_USER_ID, + **fields: Any, +) -> None: + """INSERT ... ON CONFLICT DO UPDATE —— TaskState.save 的落地点。 + + fields 可包含 tasks 表任意可写列(task_dir/mode/description/status/model/ + model_profile/reasoning_effort/tokens_prompt/tokens_completion/cost_usd)。 + 不传的字段在 INSERT 时走 ORM 默认值,UPDATE 时不动。 + """ + values = {"task_id": task_id, "user_id": user_id, **fields} + stmt = insert(Task).values(**values) + update_cols = {k: stmt.excluded[k] for k in fields} + if update_cols: + # ORM 的 onupdate=func.now() 只在 ORM-level UPDATE 触发,DO UPDATE 是 raw DML + # 不会自动刷 updated_at —— 这里显式追加。 + update_cols["updated_at"] = func.now() + stmt = stmt.on_conflict_do_update( + index_elements=["task_id"], set_=update_cols + ) + else: + stmt = stmt.on_conflict_do_nothing(index_elements=["task_id"]) + with session_scope() as s: + s.execute(stmt) + + +def update_task(task_id: UUID, **fields: Any) -> int: + """UPDATE 已有 tasks 行;不存在则 no-op(返回 0)。 + + ORM-level update 会带 onupdate=func.now() 自动刷 updated_at,无需显式传。 + """ + if not fields: + return 0 + with session_scope() as s: + result = s.execute( + update(Task).where(Task.task_id == task_id).values(**fields) + ) + return result.rowcount or 0 + + def get_task(task_id: UUID) -> Optional[Task]: """读 tasks 行,不存在返回 None。""" with session_scope() as s: diff --git a/core/task.py b/core/task.py index 5383a30..79a6dde 100644 --- a/core/task.py +++ b/core/task.py @@ -1,64 +1,82 @@ -"""任务状态: DESIGN.md §7.1 规约,落 `/state.json`。 +"""任务元数据: Session 上层,落 PG `tasks` 表(§7 B Step 3)。 -Task 是 Session 的上层概念 —— Session 只管对话消息,Task 还管 mode/description/ -status/tokens/cost/timestamps,这些是跨轮次共享、和文件系统状态对齐的元数据。 +Session 只管对话消息;Task 管 mode/description/status/model/tokens/cost/时间戳 +—— 跨轮次共享的元数据,DESIGN.md §7.1 / §7.4 规约。 -文件路径约定(workspace/ 下): - tasks//state.json ← 此模块负责 - tasks//messages.json ← Session 落 +state.json 已废除;字段从 PG 读出,save() 走 INSERT ... ON CONFLICT DO UPDATE。 +created_at / updated_at 由 PG server_default / onupdate 管,Python 侧只读。 """ from __future__ import annotations -import json -from dataclasses import asdict, dataclass, fields +from dataclasses import dataclass from datetime import datetime -from pathlib import Path from typing import Optional +from uuid import UUID -from .session import atomic_write_text +from .storage import upsert_task +from .storage.models import Task as TaskRow +from .storage.utils import get_task + + +def _iso(dt: Optional[datetime]) -> str: + return dt.isoformat(timespec="seconds") if dt else "" @dataclass class TaskState: - task_id: str - mode: str = "" # 自由形式: coding / ppt / proposal / general / 自定 + task_id: str # UUID 字符串形式(对外展示用,DB 仍是 UUID) + task_dir: str = "" # 绝对路径或留空(留空= ChatGPT thread 默认派生,§7.1) + mode: str = "" # coding / ppt / proposal / general / 自由形式 description: str = "" # 一句话描述,便于列表识别 status: str = "active" # active / completed / abandoned model: str = "" # caps.model_id model_profile: str = "" # 档案名,如 deepseek_v4.flash reasoning_effort: str = "" - cwd: str = "" # 任务的工作基目录 - created_at: str = "" # ISO 时间戳 - updated_at: str = "" tokens_prompt: int = 0 tokens_completion: int = 0 - cost_usd: float = 0.0 # 暂不算,留位 + cost_usd: float = 0.0 + created_at: str = "" # PG server_default 填,Python 侧只读 + updated_at: str = "" @property def tokens_total(self) -> int: return self.tokens_prompt + self.tokens_completion - def save(self, task_dir: Path) -> None: - self.updated_at = datetime.now().isoformat(timespec="seconds") - atomic_write_text( - task_dir / "state.json", - json.dumps(asdict(self), ensure_ascii=False, indent=2), + def save(self) -> None: + """UPSERT 到 PG。created_at / updated_at 不参与写入(PG 自动管)。""" + upsert_task( + UUID(self.task_id), + task_dir=self.task_dir, + mode=self.mode, + description=self.description, + status=self.status, + model=self.model, + model_profile=self.model_profile, + reasoning_effort=self.reasoning_effort, + tokens_prompt=self.tokens_prompt, + tokens_completion=self.tokens_completion, ) @classmethod - def load(cls, task_dir: Path) -> Optional["TaskState"]: - p = task_dir / "state.json" - if not p.exists(): - return None - try: - data = json.loads(p.read_text(encoding="utf-8")) - except Exception: - return None - if not isinstance(data, dict): - return None - # 容忍 schema 演化:只取已知字段,缺失字段用 dataclass 默认 - known = {f.name for f in fields(cls)} - kwargs = {k: v for k, v in data.items() if k in known} - if "task_id" not in kwargs: - kwargs["task_id"] = task_dir.name - return cls(**kwargs) + def from_row(cls, row: TaskRow) -> "TaskState": + return cls( + task_id=str(row.task_id), + task_dir=row.task_dir, + mode=row.mode, + description=row.description, + status=row.status, + model=row.model, + model_profile=row.model_profile, + reasoning_effort=row.reasoning_effort, + tokens_prompt=row.tokens_prompt, + tokens_completion=row.tokens_completion, + cost_usd=float(row.cost_usd or 0), + created_at=_iso(row.created_at), + updated_at=_iso(row.updated_at), + ) + + @classmethod + def load(cls, task_id: UUID) -> Optional["TaskState"]: + """从 PG 读;不存在返回 None。""" + row = get_task(task_id) + return cls.from_row(row) if row is not None else None diff --git a/main.py b/main.py index 92718a7..cd3eb51 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,9 @@ """装配入口: 读 config → 加载 capabilities/skills → 构造 LLM/tools/session/loop。 -存储布局(§7 B Step 2 后): - workspace/tasks//state.json ← TaskState(Step 3 前还在,Step 3 删) - PG messages ← Session 消息(Step 2 切换) -task_id 用 UUID,task_dir = workspace/tasks//。 +存储布局(§7 B Step 3 后): + PG tasks / messages ← Task 元数据 + Session 消息 + workspace/tasks// ← task_dir,只承担 skill 产物 +task_id 用 UUID,state.json 已删除(元数据全在 PG)。 """ from __future__ import annotations @@ -168,38 +168,31 @@ def build_agent( "task_dir": str(task_dir), "model": caps.model_id, "model_profile": model, + "mode": mode, + "description": description, + "reasoning_effort": caps.default_reasoning_effort or "", } if resume: session = Session.load(task_id, system_prompt=system_prompt, meta=meta) - task_state = TaskState.load(task_dir) + task_state = TaskState.load(task_id) if task_state is None: - # tasks 行存在但 state.json 缺失:兜底重建(Step 3 后该分支会消失) + # tasks 行不存在 —— 理论上 resolve_task_id 已经定位到 DB 行了,走到这里 + # 说明被并发删了,兜底构造空 state(不主动 save,等下条 append / 命令) task_state = TaskState( - task_id=sid, mode=mode, description=description, status="active", + task_id=sid, task_dir=str(task_dir), + mode=mode, description=description, status="active", model=caps.model_id, model_profile=model, - cwd=str(tool_base), created_at=now_iso, ) - task_state.save(task_dir) - else: - # 提示 cwd 漂移(老 state.json 保留过启动时 cwd) - saved_cwd = task_state.cwd - if saved_cwd and console is not None and saved_cwd != str(tool_base): - console.print( - f"[warn]提示:[/warn] 当前 cwd 与 task 记录不同 —— " - f"工具基于 current cwd,不会自动切回。\n" - f" task cwd: [info]{saved_cwd}[/info]\n" - f" current cwd: [info]{tool_base}[/info]" - ) else: session = Session(task_id=task_id, system_prompt=system_prompt, meta=meta) - # 懒创建:Session 不触发 DB 写,Task 行在首条 user 消息 append 时由 - # ensure_local_task_row 插入;state.json 在 task_state.save 第一次调时落地。 + # 懒创建:TaskState 仅内存。tasks 行在首条 user 消息 append 时由 + # ensure_local_task_row 占位 INSERT;首次 sync_task_tokens 或 /done /desc 走 upsert 覆盖。 task_state = TaskState( - task_id=sid, mode=mode, description=description, status="active", + task_id=sid, task_dir=str(task_dir), + mode=mode, description=description, status="active", model=caps.model_id, model_profile=model, reasoning_effort=caps.default_reasoning_effort or "", - cwd=str(tool_base), created_at=now_iso, ) tools = {} @@ -220,9 +213,19 @@ def build_agent( return agent, session, sid, task_state, task_dir -def sync_task_tokens(task_state: TaskState, task_dir: Path, llm: LLM) -> None: - """每轮 agent.run 后调,把 LLM 累计 tokens 写回 state.json。""" +def sync_task_tokens(task_state: TaskState, llm: LLM) -> None: + """每轮 agent.run 后调,把 LLM 累计 tokens UPDATE 到 PG tasks 表。 + + 走 update_task 而非 task_state.save() —— 只更 tokens 两列,避免无谓全字段 UPSERT + 且 ORM-level update 自动刷 updated_at。 + """ + from uuid import UUID + from core.storage import update_task tc = llm.token_counter task_state.tokens_prompt = tc.prompt_tokens task_state.tokens_completion = tc.completion_tokens - task_state.save(task_dir) + update_task( + UUID(task_state.task_id), + tokens_prompt=tc.prompt_tokens, + tokens_completion=tc.completion_tokens, + )