diff --git a/PROGRESS.md b/PROGRESS.md index bcbacb6..9931318 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -2,7 +2,7 @@ > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。 -最后更新:2026-05-14 +最后更新:2026-05-14(Step 2) --- @@ -15,7 +15,7 @@ | 5 | Eval Suite | ⏸ 不做 | dogfooding 替代,probe 覆盖健康检查 | | 6 | 长任务工程化 | 🟡 | task + 恢复 ✅;双层记忆 ✅;context 压缩未做 | | 7 | 打磨 | ❌ | Docker 沙盒 / 更多 skill | -| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 进行中(Step 1 基建 ✅;Step 2-6 待) | +| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 进行中(Step 1 基建 ✅;Step 2 Session ORM ✅;Step 3-4/6 待;Step 5 migrate-from-fs 取消) | --- @@ -30,6 +30,7 @@ - **05-12 / §7 改写**:platform/core 多租户方案废弃,改 user-direct(folder-centric、task/messages 入 PG、no-subtask、hard cascade)。 - **05-14 / §7.1 心智模型修正**:`Folder-centric` → **Task 一等公民 + Dir 文件副视图**(双视图正交,dir 不是 task 父容器);task_dir 留空=一次性对话 / 指定=项目化二分语义入文。 - **05-14 / §7 B Step 1 基建**:`core/storage/{engine,models}.py` SQLAlchemy 2.x ORM(users/tasks/messages/runs/usage_events 5 表)+ alembic(初版 migration `0001_initial_schema`,GIN/复合索引)+ `cli db {upgrade,downgrade,current}` 子命令组 + 本地 sentinel user(`00000000-...`)+ `ZCBOT_DB_URL` 必填(未设给清晰报错,不引导 docker)。已在远端测试 PG 跑通 `db upgrade head`。 +- **05-14 / §7 B Step 2 Session ORM**:`core/session.py` 重写,messages 走 PG(append-only,jsonb,idx 严格递增);system prompt 不入库(每次 build_agent 重建);`Session.load(task_id, system_prompt=...)` resume 接口;`ensure_local_task_row` idempotent UPSERT(`INSERT ... ON CONFLICT DO NOTHING`)在首条非 system 消息前打底 tasks 行。task_id 切换为 UUID(原时间戳格式废弃,旧 workspace **不做兼容**)。main.py/cli.py 适配:`resolve_task_id`(UUID 前缀解析)、`_cleanup_if_empty` 双检查(DB messages + FS 产物)、`_list_task_rows` 改读 PG。`core/export_docx.py` 改从 PG 读 messages。端到端 build/append/resume/cleanup smoke 全绿。**取消 Step 5 migrate-from-fs**(用户决定不兼容旧 workspace)。 --- @@ -54,21 +55,22 @@ core/loop.py 152 ← §7 A: sink.emit core/sinks.py 101 ← §7 A core/ui.py 38 core/probe.py 243 -core/session.py 93 ← +atomic_write_text +core/session.py 148 ← §7 B Step 2: ORM 改写,task_id anchored core/skills.py 81 core/task.py 64 core/memory.py 76 -core/export_docx.py 372 -core/storage/__init__.py 22 ← §7 B Step 1 +core/export_docx.py 379 ← §7 B Step 2: 改从 PG 读 messages +core/storage/__init__.py 25 ← §7 B Step 1-2 core/storage/engine.py 80 ← §7 B Step 1 core/storage/models.py 124 ← §7 B Step 1 +core/storage/utils.py 55 ← §7 B Step 2: ensure_local_task_row tools/base.py 34 tools/fs.py 182 tools/shell.py 94 tools/run_python.py 84 tools/skill_tool.py 45 -main.py 210 -cli.py 484 ← §7 B Step 1: +db 子命令组 +main.py 228 ← §7 B Step 2: resolve_task_id (UUID) +cli.py 526 ← §7 B Step 1-2: +db 子命令组 / +_task_has_messages db/migrations/env.py 61 ← §7 B Step 1 db/migrations/versions/ 0001_initial_schema.py 125 ← §7 B Step 1 @@ -82,12 +84,11 @@ Python 合计 ~2841 行 ## 下一步候选(性价比排序) -1. **§7 B 剩余 Step 2-6**(~4 天) - - Step 2 Session ORM 改造(append/load → PG messages 表) - - Step 3 TaskState ORM 改造(state.json → PG tasks 表;加 task_dir 字段语义) - - Step 4 main.py / cli.py 适配 ORM(`resolve_task_messages_path` 重构、`_cleanup_if_empty` 新双检查、UUID task_id 处理) - - Step 5 `cli migrate-from-fs`(导旧 workspace/tasks/*/ → PG,idempotent) +1. **§7 B 剩余 Step 3-4 / 6**(~2 天) + - Step 3 TaskState ORM 改造(state.json → PG tasks 表;tasks.cwd 字段去掉,改读 task_dir;sync_task_tokens 走 UPDATE) + - Step 4 main.py / cli.py 收尾(`_list_task_rows` 全 DB;清掉 state.json 路径剩余分支) - Step 6 no-subtask SQL 校验(`new LIKE existing/%` cascade) + - ~~Step 5 migrate-from-fs~~(取消,不兼容旧 workspace) 2. **Phase 6 context 三层压缩**(~1 天)—— 兜底,V4 长上下文一般用不到。 3. **Phase 7 更多 skill / 模型档案**(持续)。 4. **Proposal mermaid 预渲染**(~半天)—— ASCII 透传不够用时再上 `mmdc`。 diff --git a/cli.py b/cli.py index 3ebf8f3..484c164 100644 --- a/cli.py +++ b/cli.py @@ -83,57 +83,99 @@ def db_current() -> None: def _cleanup_if_empty(task_dir, session, console=None) -> bool: - """切走前清理 task_dir。三条都满足才删: - 1) session 没有 user 消息 - 2) task_dir 在磁盘上(懒创建后,没说话就没目录,直接 no-op) - 3) 目录里只剩 messages.json(state.json 存在 = `/done /abandon /desc` 留下的显式痕迹,要保) - 原子写留下的 `*.tmp` 孤儿不算痕迹,放过。 + """切走前清理空 task。三条都满足才删: + 1) session 在内存没有 user 消息 + 2) task_dir 在 FS 上无产物(懒创建后没说话就没目录,直接 no-op) + 3) PG tasks 行如果有也一并删(messages 走 CASCADE) + + 注:state.json 还在 task_dir 下(Step 3 前),它是 task 元数据痕迹 → 留着不删。 """ - if any(m.get("role") == "user" for m in session.messages): + if session.n_user_msgs() > 0: return False try: entries = list(task_dir.iterdir()) except FileNotFoundError: + # 目录都没建,只清 DB 占位行(若 Session 早调过 ensure_local_task_row) + _delete_task_db_row(session.task_id) return False if any(p.is_dir() for p in entries): return False - meaningful = { - p.name for p in entries - if p.is_file() and not p.name.endswith(".tmp") - } - if meaningful - {"messages.json"}: + meaningful = {p.name for p in entries if p.is_file() and not p.name.endswith(".tmp")} + if meaningful - {"state.json"}: + # 还有其他文件(产物)→ 保留 return False + # state.json 单独存在不算实质内容(/done /abandon /desc 也会留下,但 n_user_msgs=0 + # 时这些状态变更没意义,允许删) shutil.rmtree(task_dir, ignore_errors=True) + _delete_task_db_row(session.task_id) if console is not None: - console.print(f"[muted]清理空 task {task_dir.name}[/muted]") + console.print(f"[muted]cleaned empty task {str(session.task_id)[:8]}[/muted]") return True +def _delete_task_db_row(task_id) -> None: + """删 PG tasks 行(messages 走 CASCADE)。task_id 可能从未入库,DELETE 0 行无副作用。""" + from sqlalchemy import delete + from core.storage import session_scope + from core.storage.models import Task + with session_scope() as s: + s.execute(delete(Task).where(Task.task_id == task_id)) + + +def _task_has_messages(task_id_str: str) -> bool: + """PG 里该 task_id 有至少一条 message。task_id 字符串(UUID 完整形式)。""" + from uuid import UUID + from sqlalchemy import select + from core.storage import session_scope + from core.storage.models import Message + try: + tid = UUID(task_id_str) + except ValueError: + return False + with session_scope() as s: + row = s.execute( + select(Message.message_id).where(Message.task_id == tid).limit(1) + ).scalar_one_or_none() + return row is not None + + def _list_task_rows(workspace_dir, limit=20, status=None): - """返回 [(mtime, task_id, status, mode, model, tokens, n_msgs, desc), ...] mtime 降序。""" + """返回 [(updated_at, task_id_str, status, mode, model, tokens, n_msgs, desc), ...] 时间降序。 + + Step 2 阶段:tasks 元字段(mode/desc/status/model/tokens)仍从 state.json 读, + 时间排序和 task_id 列表从 PG tasks 表读(messages 数也从 PG 数)。Step 3 后统一走 DB。 + """ + from sqlalchemy import func, select + from core.storage import session_scope + from core.storage.models import Message, Task + tdir = tasks_dir(workspace_dir) + with session_scope() as s: + rows_db = s.execute( + select(Task.task_id, Task.updated_at).order_by(Task.updated_at.desc()).limit(limit * 3) + ).all() + msg_counts = dict(s.execute( + select(Message.task_id, func.count()).group_by(Message.task_id) + ).all()) + rows = [] - for d in tdir.iterdir(): - if not d.is_dir(): - continue - msg_path = d / "messages.json" - if not msg_path.exists(): - continue + for tid, updated_at in rows_db: + d = tdir / str(tid) st = TaskState.load(d) if st is None: + # 仅 DB 有行(lazy 占位但 state.json 没写过),展示最小信息 + n = msg_counts.get(tid, 0) + if n == 0: + continue + rows.append((updated_at, str(tid), "active", "", "", 0, n, "")) continue if status and st.status != status: continue - try: - data = json.loads(msg_path.read_text(encoding="utf-8")) - n = len(data.get("messages", [])) - except Exception: - n = -1 + n = msg_counts.get(tid, 0) rows.append(( - msg_path.stat().st_mtime, st.task_id, st.status, st.mode, + updated_at, str(tid), st.status, st.mode, st.model_profile or st.model, st.tokens_total, n, st.description, )) - rows.sort(reverse=True) return rows[:limit] @@ -162,7 +204,7 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None: if resume: console.print( - f"[ok]恢复 task[/ok] [bold]{sid}[/bold] ({len(session.messages)} 条消息) " + f"[ok]恢复 task[/ok] [bold]{sid[:8]}[/bold] ({len(session.messages)} 条消息) " f"model: [accent]{agent.caps.model_id}[/accent]" ) else: @@ -170,7 +212,7 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None: if task_state.mode or task_state.description: meta_tail = f" mode={task_state.mode!r} desc={task_state.description!r}" console.print( - f"[ok]新 task[/ok] [bold]{sid}[/bold] " + f"[ok]新 task[/ok] [bold]{sid[:8]}[/bold] " f"model: [accent]{agent.caps.model_id}[/accent]{meta_tail}" ) console.print( @@ -206,7 +248,7 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None: except Exception as e: console.print(f"[err]新建失败:[/err] {type(e).__name__}: {e}") continue - console.print(f"[ok]新 task[/ok] [bold]{sid}[/bold]") + console.print(f"[ok]新 task[/ok] [bold]{sid[:8]}[/bold]") continue if cmd.startswith("/resume"): arg = cmd[len("/resume"):].strip() @@ -236,7 +278,7 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None: for i, (_, tid, st, md, _mdl, _tok, n, dsc) in enumerate(rs, 1): c = sc.get(st, "info") d_show = dsc if len(dsc) <= 50 else dsc[:47] + "..." - tbl.add_row(str(i), tid, f"[{c}]{st}[/{c}]", md, str(n), d_show) + tbl.add_row(str(i), tid[:8], f"[{c}]{st}[/{c}]", md, str(n), d_show) console.print(tbl) try: sel = Prompt.ask("[user]选编号或输入 task_id (回车取消)[/user]", console=console, default="") @@ -267,7 +309,7 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None: console.print(f"[err]恢复失败:[/err] {type(e).__name__}: {e}") continue console.print( - f"[ok]切到 task[/ok] [bold]{sid}[/bold] ({len(session.messages)} 条消息) " + f"[ok]切到 task[/ok] [bold]{sid[:8]}[/bold] ({len(session.messages)} 条消息) " f"model: [accent]{agent.caps.model_id}[/accent]" ) continue @@ -313,9 +355,9 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None: continue arg = rs[0][1] target_dir = tasks_dir(ws_dir) / arg - if not (target_dir / "messages.json").exists(): + if not _task_has_messages(target_dir.name): console.print( - f"[warn]无可导出内容: {target_dir.name} 还没有消息[/warn]" + f"[warn]无可导出内容: {target_dir.name[:8]} 还没有消息[/warn]" ) continue try: @@ -364,7 +406,7 @@ def tasks(workspace: str, limit: int, status: str) -> None: for _, tid, st, mode, model, tok, n, desc in rows: c = sc.get(st, "info") d_show = desc if len(desc) <= 50 else desc[:47] + "..." - tbl.add_row(tid, f"[{c}]{st}[/{c}]", mode, model, str(n), str(tok), d_show) + tbl.add_row(tid[:8], f"[{c}]{st}[/{c}]", mode, model, str(n), str(tok), d_show) make_console().print(tbl) @@ -398,8 +440,8 @@ def export(task_id: str, workspace: str, output: str, include_system: bool, task_id = rs[0][1] td = tasks_dir(ws) / task_id - if not (td / "messages.json").exists(): - console.print(f"[err]task 不存在或无 messages.json:[/err] {td}") + if not _task_has_messages(task_id): + console.print(f"[err]task 不存在或无 messages:[/err] {task_id}") sys.exit(1) out = Path(output).resolve() if output else None diff --git a/core/export_docx.py b/core/export_docx.py index a64e4e7..37979a5 100644 --- a/core/export_docx.py +++ b/core/export_docx.py @@ -1,4 +1,4 @@ -"""把 task 的 messages.json 渲染为 .docx 对话稿。 +"""把 task 的 PG messages 表 + state.json 渲染为 .docx 对话稿。 布局: - 文档开头 meta 表(task_id / 模式 / 描述 / 模型 / 创建时间 / 消息数 / tokens / 导出时间) @@ -10,6 +10,8 @@ 调用入口: - 顶层函数 export_chat_to_docx(task_dir, out_path=None, ...) - CLI 子命令 `python cli.py export ` 与 REPL `/export []` 都走它 + +§7 B Step 2 后:messages 从 PG 读(按 task_id);state.json 还在 task_dir 下(Step 3 删)。 """ from __future__ import annotations @@ -17,6 +19,7 @@ import json from datetime import datetime from pathlib import Path from typing import Optional +from uuid import UUID from docx import Document from docx.enum.text import WD_ALIGN_PARAGRAPH @@ -163,7 +166,7 @@ def _format_args(args_str: str) -> str: # ───────────────────────── Meta 区块 ───────────────────────── def _add_meta_block( - doc: Document, meta: dict, task_state: dict, n_msgs: int, source_path: Path + doc: Document, meta: dict, task_state: dict, n_msgs: int, task_dir: Path ) -> None: p = doc.add_paragraph() p.alignment = WD_ALIGN_PARAGRAPH.LEFT @@ -199,7 +202,7 @@ def _add_meta_block( ("更新时间", updated), ("消息数", str(n_msgs)), ("Tokens", f"{tp} prompt / {tc} completion / {tp + tc} total"), - ("源文件", str(source_path)), + ("Task dir", str(task_dir)), ("导出时间", datetime.now().isoformat(timespec="seconds")), ] @@ -320,26 +323,26 @@ def export_chat_to_docx( tool_head: int = 1000, tool_tail: int = 500, ) -> Path: - """渲染 task_dir 下的 messages.json 为 .docx,返回写入路径。 + """渲染 task 对话为 .docx,返回写入路径。 - out_path 缺省落到 task_dir/chat_.docx。 - include_system 默认 False(system prompt 信息密度低,默认跳过)。 - include_reasoning 默认 True(模型思考过程,有观察价值)。 - tool 结果默认前 1000 + 后 500,中间省略。 + task_dir 名字必须是 UUID(messages 从 PG 按 task_id 读)。state.json 仍在 + task_dir 下(Step 3 前)提供 mode/desc/tokens 等 meta。 """ - msg_path = task_dir / "messages.json" - if not msg_path.exists(): - raise FileNotFoundError(f"messages.json 不存在: {msg_path}") + try: + tid = UUID(task_dir.name) + except ValueError: + raise ValueError(f"task_dir name 不是有效 UUID: {task_dir.name}") - data = json.loads(msg_path.read_text(encoding="utf-8")) - if isinstance(data, list): - meta = {} - messages = data - elif isinstance(data, dict): - meta = data.get("meta") or {} - messages = data.get("messages") or [] - else: - raise ValueError(f"messages.json 格式不识别: {type(data).__name__}") + # 从 PG 读 messages,按 idx 排序 + from sqlalchemy import select + from core.storage import session_scope + from core.storage.models import Message as MessageRow + + with session_scope() as s: + rows = s.execute( + select(MessageRow).where(MessageRow.task_id == tid).order_by(MessageRow.idx) + ).scalars().all() + messages = [dict(r.payload) for r in rows] state_path = task_dir / "state.json" task_state: dict = {} @@ -350,11 +353,15 @@ def export_chat_to_docx( task_state = {} if out_path is None: - tid = meta.get("id") or task_state.get("task_id") or task_dir.name out_path = task_dir / f"chat_{tid}.docx" + meta = {"id": str(tid), "model": task_state.get("model", ""), + "model_profile": task_state.get("model_profile", ""), + "cwd": task_state.get("cwd", ""), + "created_at": task_state.get("created_at", "")} + doc = _init_doc() - _add_meta_block(doc, meta, task_state, len(messages), msg_path) + _add_meta_block(doc, meta, task_state, len(messages), task_dir) doc.add_paragraph() # 与 meta 表保持一行间距 for msg in messages: diff --git a/core/session.py b/core/session.py index d5ed433..b14ad00 100644 --- a/core/session.py +++ b/core/session.py @@ -1,19 +1,24 @@ -"""会话: 内存中的消息列表 + meta(cwd / model / created_at) + 落盘 json。 +"""会话: 内存中的消息列表 + meta + 落 PG `messages` 表。 -文件格式: -{ - "meta": {"id": "...", "created_at": "...", "cwd": "...", "model": "..."}, - "messages": [...] -} +§7 B Step 2:消息走 ORM(append-only, idx 严格递增,payload jsonb)。 -兼容老格式: 如果文件根是 list,就当 messages 处理,meta 为空。 +system prompt **不入库** —— 每次 build_agent 重建拼到 messages[0](§3.7 +"memory 演化即时生效")。Session 内存里仍维持 [system, user_1, assistant_1, ...] +全列表;DB idx 从 0 开始数第一条非 system 消息。 + +保留 `atomic_write_text` 给 skill 产物 / 其他 .md 文件写入使用。 """ from __future__ import annotations -import json import os from pathlib import Path from typing import Any, Dict, List, Optional +from uuid import UUID + +from sqlalchemy import delete, select + +from .storage import session_scope +from .storage.models import Message, Task def _to_dict(msg: Any) -> Any: @@ -30,8 +35,7 @@ def atomic_write_text(path: Path, text: str, encoding: str = "utf-8") -> None: """原子写: 先写到 path.tmp 再 os.replace 到 path。 防止写中途异常(磁盘满 / surrogate 编码错 / 进程被杀)留下 0 字节或半文件。 - 单 REPL 单 task 假设下 .tmp 名固定;若上次写崩留下孤儿,本次写会覆盖它。 - `_cleanup_if_empty` 已配合放过 `*.tmp` 文件。 + skill 产物(spec_lock.md / sections/*.md 等)走这里,messages 已改走 PG。 """ path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") @@ -43,51 +47,102 @@ def atomic_write_text(path: Path, text: str, encoding: str = "utf-8") -> None: class Session: + """消息列表 anchored on task_id。 + + Lazy-persist: 构造时不动 DB,第一条非 system 消息 append 时: + 1) 调 ensure_task_row 保证 tasks 行存在(Step 2 用占位值,Step 3 由 TaskState 提供完整值) + 2) INSERT 一行 messages + + 系统 reset 走 DB DELETE 该 task 全部 messages。 + """ + def __init__( self, + task_id: UUID, system_prompt: str = "", - path: Optional[Path] = None, meta: Optional[dict] = None, ) -> None: + self.task_id: UUID = task_id self.messages: List[dict] = [] - self.path = path self.meta: Dict[str, Any] = dict(meta or {}) + self._db_idx: int = 0 # 下一条要写 DB 的 idx if system_prompt: self.messages.append({"role": "system", "content": system_prompt}) def append(self, msg: Any) -> None: - self.messages.append(_to_dict(msg)) - if self.path is not None: - self.save() + """追加消息;非 system 落 DB,system 仅内存。""" + msg_dict = _to_dict(msg) + self.messages.append(msg_dict) + if msg_dict.get("role") == "system": + return + + # 首次写入前,让 tasks 行就位。`ensure_local_task_row` 在 storage 层 idempotent。 + from .storage.utils import ensure_local_task_row + ensure_local_task_row( + task_id=self.task_id, + task_dir=self.meta.get("task_dir", ""), + model=self.meta.get("model", ""), + model_profile=self.meta.get("model_profile", ""), + ) + + with session_scope() as s: + s.add(Message( + task_id=self.task_id, + idx=self._db_idx, + payload=msg_dict, + )) + self._db_idx += 1 def reset(self, keep_system: bool = True) -> None: + """清空消息。keep_system 仅影响内存(system 本来就不在 DB)。""" if keep_system and self.messages and self.messages[0].get("role") == "system": self.messages = [self.messages[0]] else: self.messages = [] - if self.path is not None: - self.save() - - def save(self) -> None: - if self.path is None: - return - payload = {"meta": self.meta, "messages": self.messages} - atomic_write_text( - self.path, - json.dumps(payload, ensure_ascii=False, indent=2), - ) + with session_scope() as s: + s.execute(delete(Message).where(Message.task_id == self.task_id)) + self._db_idx = 0 @classmethod - def load(cls, path: Path) -> "Session": - s = cls(path=path) - if not path.exists(): - return s - data = json.loads(path.read_text(encoding="utf-8")) - if isinstance(data, list): - # 老格式: 纯消息列表 - s.messages = data - s.meta = {} - elif isinstance(data, dict): - s.messages = data.get("messages", []) or [] - s.meta = data.get("meta", {}) or {} - return s + def load( + cls, + task_id: UUID, + system_prompt: str = "", + meta: Optional[dict] = None, + ) -> "Session": + """从 DB 读历史 messages。system_prompt 由调用方注入(memory 演化即时生效)。 + + 若 task_id 在 DB 不存在,返回空 Session(messages 只含 system,_db_idx=0); + 调用方判断该不该报错。 + """ + sess = cls(task_id=task_id, system_prompt=system_prompt, meta=meta) + with session_scope() as s: + rows = s.execute( + select(Message) + .where(Message.task_id == task_id) + .order_by(Message.idx) + ).scalars().all() + for row in rows: + sess.messages.append(dict(row.payload)) + sess._db_idx = len(rows) + return sess + + @classmethod + def task_exists(cls, task_id: UUID) -> bool: + """tasks 行 + messages 至少 1 条 → 该 task 真存在(不是 lazy 占位)。""" + with session_scope() as s: + row = s.execute( + select(Task.task_id).where(Task.task_id == task_id) + ).scalar_one_or_none() + if row is None: + return False + cnt = s.execute( + select(Message.message_id) + .where(Message.task_id == task_id) + .limit(1) + ).scalar_one_or_none() + return cnt is not None + + def n_user_msgs(self) -> int: + """内存里 user 消息数,用于 _cleanup_if_empty 守门(避免回 DB)。""" + return sum(1 for m in self.messages if m.get("role") == "user") diff --git a/core/storage/__init__.py b/core/storage/__init__.py index f88421d..c068e93 100644 --- a/core/storage/__init__.py +++ b/core/storage/__init__.py @@ -8,15 +8,18 @@ ZCBOT_DB_URL 环境变量必填(本地连测试 / staging PG;SaaS 连生产 PG) 未设置时 get_engine() 抛 RuntimeError 并指引设置。 """ from .engine import ( - SENTINEL_USER_ID, ensure_local_sentinel, get_engine, session_scope, ) +from .models import SENTINEL_USER_ID +from .utils import ensure_local_task_row, get_task __all__ = [ "SENTINEL_USER_ID", "ensure_local_sentinel", + "ensure_local_task_row", "get_engine", + "get_task", "session_scope", ] diff --git a/core/storage/utils.py b/core/storage/utils.py new file mode 100644 index 0000000..1ac99df --- /dev/null +++ b/core/storage/utils.py @@ -0,0 +1,55 @@ +"""Storage 辅助工具:idempotent task 行创建、本地形态简化封装。""" +from __future__ import annotations + +from typing import Optional +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.dialects.postgresql import insert + +from .engine import session_scope +from .models import SENTINEL_USER_ID, Task + + +def ensure_local_task_row( + task_id: UUID, + task_dir: str = "", + mode: str = "", + description: str = "", + model: str = "", + model_profile: str = "", + reasoning_effort: str = "", + user_id: UUID = SENTINEL_USER_ID, +) -> None: + """本地形态 idempotent INSERT tasks 行。 + + 用于 Session.append 首次写消息前打底,Step 2 阶段字段都是占位值; + Step 3 引入 TaskState ORM 后,TaskState.save 会把字段更新成真实值。 + + PG `INSERT ... ON CONFLICT DO NOTHING` 保证幂等且单 SQL,无 SELECT-then-INSERT + 竞态。 + """ + stmt = ( + insert(Task) + .values( + task_id=task_id, + user_id=user_id, + task_dir=task_dir, + mode=mode, + description=description, + model=model, + model_profile=model_profile, + reasoning_effort=reasoning_effort, + ) + .on_conflict_do_nothing(index_elements=["task_id"]) + ) + with session_scope() as s: + s.execute(stmt) + + +def get_task(task_id: UUID) -> Optional[Task]: + """读 tasks 行,不存在返回 None。""" + with session_scope() as s: + return s.execute( + select(Task).where(Task.task_id == task_id) + ).scalar_one_or_none() diff --git a/main.py b/main.py index 345af18..92718a7 100644 --- a/main.py +++ b/main.py @@ -1,14 +1,16 @@ """装配入口: 读 config → 加载 capabilities/skills → 构造 LLM/tools/session/loop。 -存储布局: - workspace/tasks//state.json ← TaskState - workspace/tasks//messages.json ← Session 消息 +存储布局(§7 B Step 2 后): + workspace/tasks//state.json ← TaskState(Step 3 前还在,Step 3 删) + PG messages ← Session 消息(Step 2 切换) +task_id 用 UUID,task_dir = workspace/tasks//。 """ from __future__ import annotations from datetime import datetime from pathlib import Path from typing import Optional, Tuple +from uuid import UUID, uuid4 import yaml from rich.console import Console @@ -20,6 +22,7 @@ from core.memory import memory_block from core.session import Session from core.sinks import ConsoleEventSink from core.skills import SkillRegistry +from core.storage import ensure_local_sentinel from core.task import TaskState from tools.fs import EditTool, GlobTool, GrepTool, ReadTool, WriteTool from tools.run_python import RunPythonTool @@ -46,32 +49,56 @@ def tasks_dir(workspace_dir: Path) -> Path: return d -def resolve_task_messages_path( - workspace_dir: Path, task_id: Optional[str], resume: bool -) -> Tuple[Path, str]: - """返回 (messages_file_path, task_id)。 - 新建:tasks//messages.json;Resume:tasks//messages.json,'last' 取最新。 +def resolve_task_id( + workspace_dir: Path, task_id_arg: Optional[str], resume: bool +) -> Tuple[UUID, Path]: + """返回 (task_id, task_dir)。 + + 新建:UUID + workspace/tasks//(懒创建,目录不预占) + Resume:解析 task_id_arg 为 UUID(支持前缀匹配);'last' 取最近(按 PG tasks.updated_at) """ tdir = tasks_dir(workspace_dir) if resume: - if task_id in (None, "", "last"): - candidates = [] - for d in tdir.iterdir(): - mf = d / "messages.json" - if mf.is_file(): - candidates.append((mf.stat().st_mtime, mf, d.name)) - if not candidates: - raise FileNotFoundError(f"无可恢复的 task: {tdir} 下无 task") - candidates.sort(key=lambda x: x[0], reverse=True) - _, path, sid = candidates[0] - return path, sid - task_msg = tdir / task_id / "messages.json" - if not task_msg.exists(): - raise FileNotFoundError(f"task 不存在: {task_msg}") - return task_msg, task_id + from sqlalchemy import select + from core.storage import session_scope + from core.storage.models import Task - sid = task_id or datetime.now().strftime("%Y%m%d_%H%M%S") - return tdir / sid / "messages.json", sid + if task_id_arg in (None, "", "last"): + with session_scope() as s: + row = s.execute( + select(Task.task_id).order_by(Task.updated_at.desc()).limit(1) + ).scalar_one_or_none() + if row is None: + raise FileNotFoundError("no recoverable task: PG tasks 表为空") + return row, tdir / str(row) + + # 接受完整 UUID 或前缀(8 字符够辨识本机量级) + tid = _resolve_uuid_or_prefix(task_id_arg) + return tid, tdir / str(tid) + + tid = uuid4() + return tid, tdir / str(tid) + + +def _resolve_uuid_or_prefix(s: str) -> UUID: + """完整 UUID 字符串直接解析;否则当前缀,从 tasks 表精确匹配一个。""" + try: + return UUID(s) + except ValueError: + pass + from sqlalchemy import cast, String, select + from core.storage import session_scope + from core.storage.models import Task + + with session_scope() as sess: + matches = sess.execute( + select(Task.task_id).where(cast(Task.task_id, String).like(f"{s}%")) + ).scalars().all() + if not matches: + raise FileNotFoundError(f"no task matching prefix: {s}") + if len(matches) > 1: + raise ValueError(f"ambiguous prefix {s!r}, matched {len(matches)} tasks") + return matches[0] def _build_system_prompt( @@ -113,75 +140,66 @@ def build_agent( mode: str = "", description: str = "", ) -> Tuple[AgentLoop, Session, str, TaskState, Path]: - """返回 (agent, session, task_id, task_state, task_dir)。""" + """返回 (agent, session, task_id_str, task_state, task_dir)。""" cfg = load_config() model = model_name or cfg["default_model"] + # 本地 sentinel user 入库(idempotent);build_agent 是所有 task 操作的入口 + ensure_local_sentinel() + caps = ModelCapabilities.load(model, ROOT / cfg["models_dir"]) llm = LLM(caps) workspace_dir = resolve_workspace(workspace, cfg) - session_path, sid = resolve_task_messages_path(workspace_dir, session_id, resume) + task_id, task_dir = resolve_task_id(workspace_dir, session_id, resume) + sid = str(task_id) tool_base = Path(tool_base) if tool_base else Path.cwd() skills = SkillRegistry(ROOT / cfg.get("skills_dir", "skills")) - task_dir = session_path.parent - system_prompt = _build_system_prompt(cfg, skills, workspace_dir, tool_base, task_dir) + now_iso = datetime.now().isoformat(timespec="seconds") + meta = { + "id": sid, + "created_at": now_iso, + "cwd": str(tool_base), + "task_dir": str(task_dir), + "model": caps.model_id, + "model_profile": model, + } + if resume: - session = Session.load(session_path) - # 用最新 memory + skill 列表刷新 system prompt(messages[0]),memory 演化即时生效 - if session.messages and session.messages[0].get("role") == "system": - session.messages[0]["content"] = system_prompt - else: - session.messages.insert(0, {"role": "system", "content": system_prompt}) - saved_cwd = session.meta.get("cwd") - if saved_cwd and console is not None and saved_cwd != str(tool_base): - console.print( - f"[warn]提示:[/warn] 当前 cwd 与 task 记录不同 —— " - f"工具基于 current cwd,不会自动切回。\n" - f" task cwd: [info]{saved_cwd}[/info]\n" - f" current cwd: [info]{tool_base}[/info]" - ) + session = Session.load(task_id, system_prompt=system_prompt, meta=meta) task_state = TaskState.load(task_dir) if task_state is None: - # messages.json 存在但 state.json 缺失:用 session.meta 兜底重建 + # tasks 行存在但 state.json 缺失:兜底重建(Step 3 后该分支会消失) task_state = TaskState( - task_id=sid, - mode=mode, - description=description, - status="active", - model=session.meta.get("model", caps.model_id), - model_profile=session.meta.get("model_profile", model), - cwd=session.meta.get("cwd", str(tool_base)), - created_at=session.meta.get("created_at", datetime.now().isoformat(timespec="seconds")), + task_id=sid, mode=mode, description=description, status="active", + model=caps.model_id, model_profile=model, + cwd=str(tool_base), created_at=now_iso, ) task_state.save(task_dir) + else: + # 提示 cwd 漂移(老 state.json 保留过启动时 cwd) + saved_cwd = task_state.cwd + if saved_cwd and console is not None and saved_cwd != str(tool_base): + console.print( + f"[warn]提示:[/warn] 当前 cwd 与 task 记录不同 —— " + f"工具基于 current cwd,不会自动切回。\n" + f" task cwd: [info]{saved_cwd}[/info]\n" + f" current cwd: [info]{tool_base}[/info]" + ) else: - now_iso = datetime.now().isoformat(timespec="seconds") - meta = { - "id": sid, - "created_at": now_iso, - "cwd": str(tool_base), - "model": caps.model_id, - "model_profile": model, - } - session = Session(system_prompt=system_prompt, path=session_path, meta=meta) - # 懒创建:不预占文件。首条 user 消息触发 Session.append → save() 才会 mkdir + 落盘。 - # task_state 同步推迟到首轮 sync_task_tokens。直到那一刻为止,task_dir 在磁盘上不存在。 + session = Session(task_id=task_id, system_prompt=system_prompt, meta=meta) + # 懒创建:Session 不触发 DB 写,Task 行在首条 user 消息 append 时由 + # ensure_local_task_row 插入;state.json 在 task_state.save 第一次调时落地。 task_state = TaskState( - task_id=sid, - mode=mode, - description=description, - status="active", - model=caps.model_id, - model_profile=model, + task_id=sid, mode=mode, description=description, status="active", + model=caps.model_id, model_profile=model, reasoning_effort=caps.default_reasoning_effort or "", - cwd=str(tool_base), - created_at=now_iso, + cwd=str(tool_base), created_at=now_iso, ) tools = {}