core(§7 B Step 2): Session ORM: messages move to PG, task_id switches to UUID

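Session rewrite
- messages are persisted to the PG `messages` table (append-only, strictly increasing idx, jsonb payload)
- the system prompt is not stored (build_agent rebuilds it into messages[0] every time, so memory changes take effect immediately)
- Session.load(task_id, system_prompt=...) reads history from the DB
- Session.task_exists / n_user_msgs helpers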
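
A minimal sketch of the append-only scheme in the list above, assuming an in-memory list in place of the PG `messages` table. The class name `InMemorySession` and its `rows` attribute are hypothetical; they only illustrate how the DB idx counts non-system messages while the system prompt stays in memory and is re-injected on load.

```python
from typing import Dict, List


class InMemorySession:
    """Toy stand-in for Session; `rows` plays the role of the PG `messages` table."""

    def __init__(self, system_prompt: str = "") -> None:
        self.messages: List[dict] = []
        self.rows: List[Dict] = []  # hypothetical substitute for DB rows
        self._db_idx = 0  # idx of the next row to persist
        if system_prompt:
            self.messages.append({"role": "system", "content": system_prompt})

    def append(self, msg: dict) -> None:
        self.messages.append(msg)
        if msg.get("role") == "system":
            return  # system messages are kept in memory only
        self.rows.append({"idx": self._db_idx, "payload": msg})
        self._db_idx += 1

    @classmethod
    def load(cls, rows: List[Dict], system_prompt: str = "") -> "InMemorySession":
        sess = cls(system_prompt)  # the caller supplies the current prompt
        ordered = sorted(rows, key=lambda r: r["idx"])
        sess.rows = [dict(r) for r in ordered]
        sess.messages.extend(dict(r["payload"]) for r in ordered)
        sess._db_idx = len(ordered)
        return sess


first = InMemorySession("prompt v1")
first.append({"role": "user", "content": "hi"})
first.append({"role": "assistant", "content": "hello"})
resumed = InMemorySession.load(first.rows, system_prompt="prompt v2")
```

Because the prompt never reaches the stored rows, a resumed session picks up whatever prompt the caller builds at that moment.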

Storage utils
- ensure_local_task_row: INSERT ... ON CONFLICT DO NOTHING before the first message
  to seed the tasks row (after Step 3, TaskState.save takes over field updates)
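
A small sketch of the idempotent seed insert, assuming SQLite (via the standard-library `sqlite3` module) as a stand-in for PG so the example runs without a server. The two-column `tasks` table and the helper name `ensure_task_row` are illustrative assumptions, not the project's schema.

```python
import sqlite3
import uuid


def ensure_task_row(conn: sqlite3.Connection, task_id: str, model: str = "") -> None:
    """Insert the row if it is missing; do nothing when task_id already exists."""
    conn.execute(
        "INSERT INTO tasks (task_id, model) VALUES (?, ?) "
        "ON CONFLICT(task_id) DO NOTHING",
        (task_id, model),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (task_id TEXT PRIMARY KEY, model TEXT NOT NULL)")

tid = str(uuid.uuid4())
ensure_task_row(conn, tid, "model-a")
ensure_task_row(conn, tid, "model-b")  # second call is a no-op, the first values win

rows = conn.execute("SELECT task_id, model FROM tasks").fetchall()
```

The second call leaves the placeholder values untouched, which is why a later step has to take over field updates explicitly.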

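task_id switches to UUID
- resolve_task_id(workspace, arg, resume): UUID + prefix matching; 'last' picks the
  most recent task from PG by updated_at
- display always truncates to the first 8 characters; the full UUID is kept in /id and /status
- old tasks in existing workspaces (timestamp format) are **not supported**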
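
A rough sketch of the prefix resolution and 8-character display described above, assuming the known task ids are passed in as a plain list instead of being queried from PG. The names `resolve_uuid_or_prefix` and `short_id` are hypothetical helpers for illustration.

```python
from typing import Iterable
from uuid import UUID


def resolve_uuid_or_prefix(arg: str, known: Iterable[UUID]) -> UUID:
    """Parse a full UUID directly; otherwise require exactly one prefix match."""
    try:
        return UUID(arg)
    except ValueError:
        pass
    matches = [t for t in known if str(t).startswith(arg.lower())]
    if not matches:
        raise FileNotFoundError(f"no task matching prefix: {arg}")
    if len(matches) > 1:
        raise ValueError(f"ambiguous prefix {arg!r}, matched {len(matches)} tasks")
    return matches[0]


def short_id(task_id: UUID) -> str:
    """Display form: the first 8 characters of the canonical UUID string."""
    return str(task_id)[:8]


a = UUID("11111111-2222-3333-4444-555555555555")
b = UUID("11111111-aaaa-3333-4444-555555555555")
c = UUID("abcdef00-0000-0000-0000-000000000000")
known = [a, b, c]

unique = resolve_uuid_or_prefix("abcd", known)
try:
    resolve_uuid_or_prefix("1111", known)
    ambiguous = False
except ValueError:
    ambiguous = True
```

An ambiguous prefix is rejected instead of silently picking one task, so the short display form stays safe to paste back as input.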

CLI adaptation
- _cleanup_if_empty double check: DB messages count + FS artifacts
- _list_task_rows: PG tasks ORDER BY updated_at, with state.json as fallback for fields
- _task_has_messages: the /export check now queries the DB
- core/export_docx.py: messages are read from PG; state.json stays as meta
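
A hedged sketch of the filesystem half of the `_cleanup_if_empty` double check, assuming the message count is passed in as a number and leaving out the DB row deletion. The function name `should_remove_task_dir` is hypothetical.

```python
import tempfile
from pathlib import Path


def should_remove_task_dir(task_dir: Path, n_user_msgs: int) -> bool:
    """True only if nothing was said and the directory holds no artifacts."""
    if n_user_msgs > 0:
        return False
    if not task_dir.exists():
        return False  # nothing on disk to remove
    entries = list(task_dir.iterdir())
    if any(p.is_dir() for p in entries):
        return False
    # Orphaned *.tmp files from atomic writes are ignored.
    meaningful = {p.name for p in entries if p.is_file() and not p.name.endswith(".tmp")}
    # state.json alone does not count as an artifact.
    return not (meaningful - {"state.json"})


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    never_created = should_remove_task_dir(root / "missing", 0)

    only_state = root / "only_state"
    only_state.mkdir()
    (only_state / "state.json").write_text("{}", encoding="utf-8")
    (only_state / "notes.md.tmp").write_text("", encoding="utf-8")
    state_only_result = should_remove_task_dir(only_state, 0)
    talked_result = should_remove_task_dir(only_state, 1)

    with_artifact = root / "with_artifact"
    with_artifact.mkdir()
    (with_artifact / "report.md").write_text("draft", encoding="utf-8")
    artifact_result = should_remove_task_dir(with_artifact, 0)
```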

Step 5 (migrate-from-fs) is cancelled.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
caoqianming 2026-05-14 10:55:50 +08:00
parent 5fbf3746be
commit 4f87bf14ee
7 changed files with 362 additions and 181 deletions

@ -2,7 +2,7 @@
> Companion to `DESIGN.md`. This file only records phase status, deviations from decisions, file sizes, and next steps.
Last updated: 2026-05-14
Last updated: 2026-05-14 (Step 2)
---
@ -15,7 +15,7 @@
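| 5 | Eval Suite | ⏸ Not planned | replaced by dogfooding; probe covers health checks |
| 6 | Long-task engineering | 🟡 | task + resume ✅; two-layer memory ✅; context compression not done |
| 7 | Polish | ❌ | Docker sandbox / more skills |
| §7 SaaS | DESIGN §7 roadmap | 🟡 | A event streaming ✅; B in progress (Step 1 infrastructure ✅; Steps 2-6 pending) |
| §7 SaaS | DESIGN §7 roadmap | 🟡 | A event streaming ✅; B in progress (Step 1 infrastructure ✅; Step 2 Session ORM ✅; Steps 3-4/6 pending; Step 5 migrate-from-fs cancelled) |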
---
@ -30,6 +30,7 @@
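- **05-12 / §7 rewrite**: the platform/core multi-tenant design is dropped in favor of user-direct (folder-centric, task/messages stored in PG, no-subtask, hard cascade).
- **05-14 / §7.1 mental model correction**: `Folder-centric` → **Task as first-class citizen + Dir as a secondary file view** (the two views are orthogonal; a dir is not the parent container of a task); the two-way semantics of task_dir (empty = one-off conversation, set = project-style) are now written into the doc.
- **05-14 / §7 B Step 1 infrastructure**: `core/storage/{engine,models}.py` SQLAlchemy 2.x ORM (5 tables: users/tasks/messages/runs/usage_events) + alembic (initial migration `0001_initial_schema`, GIN/composite indexes) + `cli db {upgrade,downgrade,current}` subcommand group + local sentinel user (`00000000-...`) + `ZCBOT_DB_URL` is required (a clear error is raised when unset, without steering the user to docker). `db upgrade head` has been run successfully against the remote test PG.
- **05-14 / §7 B Step 2 Session ORM**: `core/session.py` rewritten; messages go to PG (append-only, jsonb, strictly increasing idx); the system prompt is not stored (rebuilt on every build_agent); `Session.load(task_id, system_prompt=...)` is the resume interface; `ensure_local_task_row` is an idempotent UPSERT (`INSERT ... ON CONFLICT DO NOTHING`) that seeds the tasks row before the first non-system message. task_id switches to UUID (the old timestamp format is dropped; old workspaces are **not supported**). main.py/cli.py adaptation: `resolve_task_id` (UUID prefix resolution), `_cleanup_if_empty` double check (DB messages + FS artifacts), `_list_task_rows` now reads PG. `core/export_docx.py` now reads messages from PG. End-to-end build/append/resume/cleanup smoke tests all pass. **Step 5 migrate-from-fs is cancelled** (the user decided not to support old workspaces).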
---
@ -54,21 +55,22 @@ core/loop.py 152 ← §7 A: sink.emit
core/sinks.py 101 ← §7 A
core/ui.py 38
core/probe.py 243
core/session.py 93 ← +atomic_write_text
core/session.py 148 ← §7 B Step 2: ORM rewrite, task_id anchored
core/skills.py 81
core/task.py 64
core/memory.py 76
core/export_docx.py 372
core/storage/__init__.py 22 ← §7 B Step 1
core/export_docx.py 379 ← §7 B Step 2: messages now read from PG
core/storage/__init__.py 25 ← §7 B Step 1-2
core/storage/engine.py 80 ← §7 B Step 1
core/storage/models.py 124 ← §7 B Step 1
core/storage/utils.py 55 ← §7 B Step 2: ensure_local_task_row
tools/base.py 34
tools/fs.py 182
tools/shell.py 94
tools/run_python.py 84
tools/skill_tool.py 45
main.py 210
cli.py 484 ← §7 B Step 1: +db subcommand group
main.py 228 ← §7 B Step 2: resolve_task_id (UUID)
cli.py 526 ← §7 B Step 1-2: +db subcommand group / +_task_has_messages
db/migrations/env.py 61 ← §7 B Step 1
db/migrations/versions/
0001_initial_schema.py 125 ← §7 B Step 1
@ -82,12 +84,11 @@ Python total ~2841 lines
## Next-step candidates (ranked by cost-effectiveness)
1. **§7 B remaining Steps 2-6** (~4 days)
- Step 2 Session ORM rework (append/load → PG messages table)
- Step 3 TaskState ORM rework (state.json → PG tasks table; add task_dir field semantics)
- Step 4 adapt main.py / cli.py to the ORM (refactor `resolve_task_messages_path`, new `_cleanup_if_empty` double check, UUID task_id handling)
- Step 5 `cli migrate-from-fs` (import old workspace/tasks/*/ → PG, idempotent)
1. **§7 B remaining Steps 3-4 / 6** (~2 days)
- Step 3 TaskState ORM rework (state.json → PG tasks table; drop the tasks.cwd field and read task_dir instead; sync_task_tokens goes through UPDATE)
- Step 4 main.py / cli.py wrap-up (`_list_task_rows` fully on the DB; remove the remaining state.json code paths)
- Step 6 no-subtask SQL validation (`new LIKE existing/%` cascade)
- ~~Step 5 migrate-from-fs~~ (cancelled; old workspaces are not supported)
2. **Phase 6 three-layer context compression** (~1 day): a fallback; with V4's long context it is rarely needed.
3. **Phase 7 more skills / model profiles** (ongoing).
4. **Proposal mermaid pre-rendering** (~half a day): add `mmdc` only once ASCII passthrough is no longer enough.

cli.py
@ -83,57 +83,99 @@ def db_current() -> None:
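def _cleanup_if_empty(task_dir, session, console=None) -> bool:
"""Clean up task_dir before switching away. Delete only if all three hold:
1) the session has no user messages
2) task_dir exists on disk (with lazy creation there is no directory until something is said, so this is a no-op)
3) the directory contains only messages.json (state.json present = an explicit trace left by `/done /abandon /desc`, which must be kept)
Orphaned `*.tmp` files left by atomic writes do not count as traces and are ignored.
"""Clean up an empty task before switching away. Delete only if all three hold:
1) the session has no user messages in memory
2) task_dir has no artifacts on the FS (with lazy creation there is no directory until something is said, so this is a no-op)
3) the PG tasks row, if present, is deleted as well (messages CASCADE)
Note: state.json still lives under task_dir (until Step 3); it is a trace of task metadata and is kept, not deleted.
"""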
if any(m.get("role") == "user" for m in session.messages):
if session.n_user_msgs() > 0:
return False
try:
entries = list(task_dir.iterdir())
except FileNotFoundError:
# The directory was never created; only clear the DB placeholder row (in case Session already called ensure_local_task_row)
_delete_task_db_row(session.task_id)
return False
if any(p.is_dir() for p in entries):
return False
meaningful = {
p.name for p in entries
if p.is_file() and not p.name.endswith(".tmp")
}
if meaningful - {"messages.json"}:
meaningful = {p.name for p in entries if p.is_file() and not p.name.endswith(".tmp")}
if meaningful - {"state.json"}:
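# Other files (artifacts) remain → keep the task
return False
# state.json on its own does not count as substantive content (/done /abandon /desc also leave it behind,
# but with n_user_msgs=0 those state changes are meaningless, so deletion is allowed)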
shutil.rmtree(task_dir, ignore_errors=True)
_delete_task_db_row(session.task_id)
if console is not None:
console.print(f"[muted]cleaned empty task {task_dir.name}[/muted]")
console.print(f"[muted]cleaned empty task {str(session.task_id)[:8]}[/muted]")
return True
def _delete_task_db_row(task_id) -> None:
"""Delete the PG tasks row (messages go via CASCADE). task_id may never have been stored; a DELETE of 0 rows has no side effects."""
from sqlalchemy import delete
from core.storage import session_scope
from core.storage.models import Task
with session_scope() as s:
s.execute(delete(Task).where(Task.task_id == task_id))
def _task_has_messages(task_id_str: str) -> bool:
"""True if PG has at least one message for this task_id. task_id is a string (full UUID form)."""
from uuid import UUID
from sqlalchemy import select
from core.storage import session_scope
from core.storage.models import Message
try:
tid = UUID(task_id_str)
except ValueError:
return False
with session_scope() as s:
row = s.execute(
select(Message.message_id).where(Message.task_id == tid).limit(1)
).scalar_one_or_none()
return row is not None
def _list_task_rows(workspace_dir, limit=20, status=None):
"""Return [(mtime, task_id, status, mode, model, tokens, n_msgs, desc), ...], sorted by mtime descending."""
"""Return [(updated_at, task_id_str, status, mode, model, tokens, n_msgs, desc), ...], newest first.
During Step 2: task meta fields (mode/desc/status/model/tokens) are still read from state.json;
the time ordering and task_id list come from the PG tasks table (message counts also come from PG). After Step 3 everything goes through the DB.
"""
from sqlalchemy import func, select
from core.storage import session_scope
from core.storage.models import Message, Task
tdir = tasks_dir(workspace_dir)
with session_scope() as s:
rows_db = s.execute(
select(Task.task_id, Task.updated_at).order_by(Task.updated_at.desc()).limit(limit * 3)
).all()
msg_counts = dict(s.execute(
select(Message.task_id, func.count()).group_by(Message.task_id)
).all())
rows = []
for d in tdir.iterdir():
if not d.is_dir():
continue
msg_path = d / "messages.json"
if not msg_path.exists():
continue
for tid, updated_at in rows_db:
d = tdir / str(tid)
st = TaskState.load(d)
if st is None:
# Only the DB has a row (lazy placeholder, state.json never written); show minimal info
n = msg_counts.get(tid, 0)
if n == 0:
continue
rows.append((updated_at, str(tid), "active", "", "", 0, n, ""))
continue
if status and st.status != status:
continue
try:
data = json.loads(msg_path.read_text(encoding="utf-8"))
n = len(data.get("messages", []))
except Exception:
n = -1
n = msg_counts.get(tid, 0)
rows.append((
msg_path.stat().st_mtime, st.task_id, st.status, st.mode,
updated_at, str(tid), st.status, st.mode,
st.model_profile or st.model, st.tokens_total, n, st.description,
))
rows.sort(reverse=True)
return rows[:limit]
@ -162,7 +204,7 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None:
if resume:
console.print(
f"[ok]resumed task[/ok] [bold]{sid}[/bold] ({len(session.messages)} messages) "
f"[ok]resumed task[/ok] [bold]{sid[:8]}[/bold] ({len(session.messages)} messages) "
f"model: [accent]{agent.caps.model_id}[/accent]"
)
else:
@ -170,7 +212,7 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None:
if task_state.mode or task_state.description:
meta_tail = f" mode={task_state.mode!r} desc={task_state.description!r}"
console.print(
f"[ok]new task[/ok] [bold]{sid}[/bold] "
f"[ok]new task[/ok] [bold]{sid[:8]}[/bold] "
f"model: [accent]{agent.caps.model_id}[/accent]{meta_tail}"
)
console.print(
@ -206,7 +248,7 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None:
except Exception as e:
console.print(f"[err]create failed:[/err] {type(e).__name__}: {e}")
continue
console.print(f"[ok]new task[/ok] [bold]{sid}[/bold]")
console.print(f"[ok]new task[/ok] [bold]{sid[:8]}[/bold]")
continue
if cmd.startswith("/resume"):
arg = cmd[len("/resume"):].strip()
@ -236,7 +278,7 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None:
for i, (_, tid, st, md, _mdl, _tok, n, dsc) in enumerate(rs, 1):
c = sc.get(st, "info")
d_show = dsc if len(dsc) <= 50 else dsc[:47] + "..."
tbl.add_row(str(i), tid, f"[{c}]{st}[/{c}]", md, str(n), d_show)
tbl.add_row(str(i), tid[:8], f"[{c}]{st}[/{c}]", md, str(n), d_show)
console.print(tbl)
try:
sel = Prompt.ask("[user]pick a number or enter a task_id (Enter to cancel)[/user]", console=console, default="")
@ -267,7 +309,7 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None:
console.print(f"[err]resume failed:[/err] {type(e).__name__}: {e}")
continue
console.print(
f"[ok]switched to task[/ok] [bold]{sid}[/bold] ({len(session.messages)} messages) "
f"[ok]switched to task[/ok] [bold]{sid[:8]}[/bold] ({len(session.messages)} messages) "
f"model: [accent]{agent.caps.model_id}[/accent]"
)
continue
@ -313,9 +355,9 @@ def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None:
continue
arg = rs[0][1]
target_dir = tasks_dir(ws_dir) / arg
if not (target_dir / "messages.json").exists():
if not _task_has_messages(target_dir.name):
console.print(
f"[warn]nothing to export: {target_dir.name} has no messages yet[/warn]"
f"[warn]nothing to export: {target_dir.name[:8]} has no messages yet[/warn]"
)
continue
try:
@ -364,7 +406,7 @@ def tasks(workspace: str, limit: int, status: str) -> None:
for _, tid, st, mode, model, tok, n, desc in rows:
c = sc.get(st, "info")
d_show = desc if len(desc) <= 50 else desc[:47] + "..."
tbl.add_row(tid, f"[{c}]{st}[/{c}]", mode, model, str(n), str(tok), d_show)
tbl.add_row(tid[:8], f"[{c}]{st}[/{c}]", mode, model, str(n), str(tok), d_show)
make_console().print(tbl)
@ -398,8 +440,8 @@ def export(task_id: str, workspace: str, output: str, include_system: bool,
task_id = rs[0][1]
td = tasks_dir(ws) / task_id
if not (td / "messages.json").exists():
console.print(f"[err]task does not exist or has no messages.json:[/err] {td}")
if not _task_has_messages(task_id):
console.print(f"[err]task does not exist or has no messages:[/err] {task_id}")
sys.exit(1)
out = Path(output).resolve() if output else None

@ -1,4 +1,4 @@
"""Render a task's messages.json as a .docx transcript.
"""Render a task's PG messages table + state.json as a .docx transcript.
Layout:
- meta at the top of the document (task_id / mode / description / model / created at / message count / tokens / exported at)
@ -10,6 +10,8 @@
Entry points:
- top-level function export_chat_to_docx(task_dir, out_path=None, ...)
- the CLI subcommand `python cli.py export <task_id>` and the REPL `/export [<task_id>]` both go through it
Since §7 B Step 2: messages are read from PG (by task_id); state.json is still under task_dir (until Step 3).
"""
from __future__ import annotations
@ -17,6 +19,7 @@ import json
from datetime import datetime
from pathlib import Path
from typing import Optional
from uuid import UUID
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
@ -163,7 +166,7 @@ def _format_args(args_str: str) -> str:
# ───────────────────────── Meta block ─────────────────────────
def _add_meta_block(
doc: Document, meta: dict, task_state: dict, n_msgs: int, source_path: Path
doc: Document, meta: dict, task_state: dict, n_msgs: int, task_dir: Path
) -> None:
p = doc.add_paragraph()
p.alignment = WD_ALIGN_PARAGRAPH.LEFT
@ -199,7 +202,7 @@ def _add_meta_block(
("Updated at", updated),
("Messages", str(n_msgs)),
("Tokens", f"{tp} prompt / {tc} completion / {tp + tc} total"),
("Source file", str(source_path)),
("Task dir", str(task_dir)),
("Exported at", datetime.now().isoformat(timespec="seconds")),
]
@ -320,26 +323,26 @@ def export_chat_to_docx(
tool_head: int = 1000,
tool_tail: int = 500,
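) -> Path:
"""Render messages.json under task_dir as a .docx and return the output path.
"""Render a task conversation as a .docx and return the output path.
out_path defaults to task_dir/chat_<task_id>.docx
include_system defaults to False (the system prompt has low information density, so it is skipped by default)
include_reasoning defaults to True (the model's reasoning is worth inspecting)
tool results default to the first 1000 + last 500 (tool_head / tool_tail), with the middle elided
The task_dir name must be a UUID (messages are looked up in PG by task_id). state.json is still under
task_dir (until Step 3) and provides the mode/desc/tokens meta.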
"""
msg_path = task_dir / "messages.json"
if not msg_path.exists():
raise FileNotFoundError(f"messages.json not found: {msg_path}")
try:
tid = UUID(task_dir.name)
except ValueError:
raise ValueError(f"task_dir name is not a valid UUID: {task_dir.name}")
data = json.loads(msg_path.read_text(encoding="utf-8"))
if isinstance(data, list):
meta = {}
messages = data
elif isinstance(data, dict):
meta = data.get("meta") or {}
messages = data.get("messages") or []
else:
raise ValueError(f"unrecognized messages.json format: {type(data).__name__}")
# Read messages from PG, ordered by idx
from sqlalchemy import select
from core.storage import session_scope
from core.storage.models import Message as MessageRow
with session_scope() as s:
rows = s.execute(
select(MessageRow).where(MessageRow.task_id == tid).order_by(MessageRow.idx)
).scalars().all()
messages = [dict(r.payload) for r in rows]
state_path = task_dir / "state.json"
task_state: dict = {}
@ -350,11 +353,15 @@ def export_chat_to_docx(
task_state = {}
if out_path is None:
tid = meta.get("id") or task_state.get("task_id") or task_dir.name
out_path = task_dir / f"chat_{tid}.docx"
meta = {"id": str(tid), "model": task_state.get("model", ""),
"model_profile": task_state.get("model_profile", ""),
"cwd": task_state.get("cwd", ""),
"created_at": task_state.get("created_at", "")}
doc = _init_doc()
_add_meta_block(doc, meta, task_state, len(messages), msg_path)
_add_meta_block(doc, meta, task_state, len(messages), task_dir)
doc.add_paragraph() # keep one blank line after the meta table
for msg in messages:

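@ -1,19 +1,24 @@
"""Session: in-memory message list + meta (cwd / model / created_at) + JSON persisted to disk.
"""Session: in-memory message list + meta + persistence to the PG `messages` table.
File format:
{
"meta": {"id": "...", "created_at": "...", "cwd": "...", "model": "..."},
"messages": [...]
}
§7 B Step 2: messages go through the ORM (append-only, strictly increasing idx, jsonb payload).
Legacy format compatibility: if the file root is a list, treat it as messages with empty meta.
The system prompt is **not stored**: every build_agent rebuilds it and places it at messages[0] (§3.7
"memory changes take effect immediately"). In memory, Session still keeps the full
[system, user_1, assistant_1, ...] list; DB idx starts at 0 with the first non-system message.
`atomic_write_text` is kept for writing skill artifacts / other .md files.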
"""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from uuid import UUID
from sqlalchemy import delete, select
from .storage import session_scope
from .storage.models import Message, Task
def _to_dict(msg: Any) -> Any:
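@ -30,8 +35,7 @@ def atomic_write_text(path: Path, text: str, encoding: str = "utf-8") -> None:
"""Atomic write: write to path.tmp first, then os.replace onto path.
Prevents an exception mid-write (disk full / surrogate encoding error / process killed) from leaving a 0-byte or partial file.
Under the single-REPL, single-task assumption the .tmp name is fixed; if a previous write crashed and left an orphan, this write overwrites it.
`_cleanup_if_empty` already cooperates by ignoring `*.tmp` files.
Skill artifacts (spec_lock.md / sections/*.md, etc.) go through here; messages now go to PG.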
"""
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
@ -43,51 +47,102 @@ def atomic_write_text(path: Path, text: str, encoding: str = "utf-8") -> None:
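class Session:
"""Message list anchored on task_id.
Lazy-persist: the constructor does not touch the DB. When the first non-system message is appended:
1) ensure_local_task_row guarantees the tasks row exists (Step 2 uses placeholder values; from Step 3 TaskState provides the full values)
2) one messages row is INSERTed
A reset DELETEs all messages of this task from the DB.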
"""
def __init__(
self,
task_id: UUID,
system_prompt: str = "",
path: Optional[Path] = None,
meta: Optional[dict] = None,
) -> None:
self.task_id: UUID = task_id
self.messages: List[dict] = []
self.path = path
self.meta: Dict[str, Any] = dict(meta or {})
self._db_idx: int = 0 # idx of the next row to write to the DB
if system_prompt:
self.messages.append({"role": "system", "content": system_prompt})
def append(self, msg: Any) -> None:
self.messages.append(_to_dict(msg))
if self.path is not None:
self.save()
"""Append a message; non-system messages are written to the DB, system messages stay in memory only."""
msg_dict = _to_dict(msg)
self.messages.append(msg_dict)
if msg_dict.get("role") == "system":
return
# Make sure the tasks row exists before the insert. `ensure_local_task_row` is idempotent at the storage layer, so calling it on every append is safe.
from .storage.utils import ensure_local_task_row
ensure_local_task_row(
task_id=self.task_id,
task_dir=self.meta.get("task_dir", ""),
model=self.meta.get("model", ""),
model_profile=self.meta.get("model_profile", ""),
)
with session_scope() as s:
s.add(Message(
task_id=self.task_id,
idx=self._db_idx,
payload=msg_dict,
))
self._db_idx += 1
def reset(self, keep_system: bool = True) -> None:
"""Clear messages. keep_system only affects memory (the system message was never in the DB)."""
if keep_system and self.messages and self.messages[0].get("role") == "system":
self.messages = [self.messages[0]]
else:
self.messages = []
if self.path is not None:
self.save()
def save(self) -> None:
if self.path is None:
return
payload = {"meta": self.meta, "messages": self.messages}
atomic_write_text(
self.path,
json.dumps(payload, ensure_ascii=False, indent=2),
)
with session_scope() as s:
s.execute(delete(Message).where(Message.task_id == self.task_id))
self._db_idx = 0
@classmethod
def load(cls, path: Path) -> "Session":
s = cls(path=path)
if not path.exists():
return s
data = json.loads(path.read_text(encoding="utf-8"))
if isinstance(data, list):
# Legacy format: plain message list
s.messages = data
s.meta = {}
elif isinstance(data, dict):
s.messages = data.get("messages", []) or []
s.meta = data.get("meta", {}) or {}
return s
def load(
cls,
task_id: UUID,
system_prompt: str = "",
meta: Optional[dict] = None,
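) -> "Session":
"""Read message history from the DB. system_prompt is injected by the caller (memory changes take effect immediately).
If task_id does not exist in the DB, return an empty Session (messages holds only system, _db_idx=0);
the caller decides whether that is an error.
"""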
sess = cls(task_id=task_id, system_prompt=system_prompt, meta=meta)
with session_scope() as s:
rows = s.execute(
select(Message)
.where(Message.task_id == task_id)
.order_by(Message.idx)
).scalars().all()
for row in rows:
sess.messages.append(dict(row.payload))
sess._db_idx = len(rows)
return sess
@classmethod
def task_exists(cls, task_id: UUID) -> bool:
"""A tasks row plus at least one message → the task really exists (not a lazy placeholder)."""
with session_scope() as s:
row = s.execute(
select(Task.task_id).where(Task.task_id == task_id)
).scalar_one_or_none()
if row is None:
return False
cnt = s.execute(
select(Message.message_id)
.where(Message.task_id == task_id)
.limit(1)
).scalar_one_or_none()
return cnt is not None
def n_user_msgs(self) -> int:
"""Number of user messages in memory; used as the guard in _cleanup_if_empty (avoids a DB round trip)."""
return sum(1 for m in self.messages if m.get("role") == "user")

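@ -8,15 +8,18 @@ The ZCBOT_DB_URL environment variable is required (local connects to test / staging PG; SaaS connects to production PG).
When unset, get_engine() raises RuntimeError with instructions for setting it.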
"""
from .engine import (
SENTINEL_USER_ID,
ensure_local_sentinel,
get_engine,
session_scope,
)
from .models import SENTINEL_USER_ID
from .utils import ensure_local_task_row, get_task
__all__ = [
"SENTINEL_USER_ID",
"ensure_local_sentinel",
"ensure_local_task_row",
"get_engine",
"get_task",
"session_scope",
]

core/storage/utils.py Normal file
@ -0,0 +1,55 @@
"""Storage helpers: idempotent task-row creation and simplified wrappers for local mode."""
from __future__ import annotations
from typing import Optional
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
from .engine import session_scope
from .models import SENTINEL_USER_ID, Task
def ensure_local_task_row(
task_id: UUID,
task_dir: str = "",
mode: str = "",
description: str = "",
model: str = "",
model_profile: str = "",
reasoning_effort: str = "",
user_id: UUID = SENTINEL_USER_ID,
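) -> None:
"""Idempotently INSERT a tasks row in local mode.
Used by Session.append to seed the row before the first message is written; during Step 2 all fields are placeholders.
Once Step 3 introduces the TaskState ORM, TaskState.save updates the fields to real values.
PG `INSERT ... ON CONFLICT DO NOTHING` guarantees idempotency in a single SQL statement, avoiding the
SELECT-then-INSERT race.
"""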
stmt = (
insert(Task)
.values(
task_id=task_id,
user_id=user_id,
task_dir=task_dir,
mode=mode,
description=description,
model=model,
model_profile=model_profile,
reasoning_effort=reasoning_effort,
)
.on_conflict_do_nothing(index_elements=["task_id"])
)
with session_scope() as s:
s.execute(stmt)
def get_task(task_id: UUID) -> Optional[Task]:
"""Read the tasks row; return None if it does not exist."""
with session_scope() as s:
return s.execute(
select(Task).where(Task.task_id == task_id)
).scalar_one_or_none()

main.py
@ -1,14 +1,16 @@
"""Assembly entry point: read config → load capabilities/skills → build LLM/tools/session/loop.
Storage layout:
workspace/tasks/<task_id>/state.json TaskState
workspace/tasks/<task_id>/messages.json Session messages
Storage layout (since §7 B Step 2):
workspace/tasks/<task_id>/state.json TaskState (still here before Step 3; moves to PG in Step 3)
PG messages table Session messages (switched in Step 2)
task_id is a UUID; task_dir = workspace/tasks/<task_id>/
"""
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple
from uuid import UUID, uuid4
import yaml
from rich.console import Console
@ -20,6 +22,7 @@ from core.memory import memory_block
from core.session import Session
from core.sinks import ConsoleEventSink
from core.skills import SkillRegistry
from core.storage import ensure_local_sentinel
from core.task import TaskState
from tools.fs import EditTool, GlobTool, GrepTool, ReadTool, WriteTool
from tools.run_python import RunPythonTool
@ -46,32 +49,56 @@ def tasks_dir(workspace_dir: Path) -> Path:
return d
def resolve_task_messages_path(
workspace_dir: Path, task_id: Optional[str], resume: bool
) -> Tuple[Path, str]:
"""Return (messages_file_path, task_id).
New: tasks/<id>/messages.json; Resume: tasks/<id>/messages.json, 'last' picks the newest.
def resolve_task_id(
workspace_dir: Path, task_id_arg: Optional[str], resume: bool
) -> Tuple[UUID, Path]:
"""Return (task_id, task_dir).
New: a fresh UUID + workspace/tasks/<uuid>/ (lazily created; the directory is not reserved up front).
Resume: parse task_id_arg as a UUID (prefix matching supported); 'last' picks the most recent (by PG tasks.updated_at).
"""
tdir = tasks_dir(workspace_dir)
if resume:
if task_id in (None, "", "last"):
candidates = []
for d in tdir.iterdir():
mf = d / "messages.json"
if mf.is_file():
candidates.append((mf.stat().st_mtime, mf, d.name))
if not candidates:
raise FileNotFoundError(f"no task to resume: no tasks under {tdir}")
candidates.sort(key=lambda x: x[0], reverse=True)
_, path, sid = candidates[0]
return path, sid
task_msg = tdir / task_id / "messages.json"
if not task_msg.exists():
raise FileNotFoundError(f"task not found: {task_msg}")
return task_msg, task_id
from sqlalchemy import select
from core.storage import session_scope
from core.storage.models import Task
sid = task_id or datetime.now().strftime("%Y%m%d_%H%M%S")
return tdir / sid / "messages.json", sid
if task_id_arg in (None, "", "last"):
with session_scope() as s:
row = s.execute(
select(Task.task_id).order_by(Task.updated_at.desc()).limit(1)
).scalar_one_or_none()
if row is None:
raise FileNotFoundError("no recoverable task: the PG tasks table is empty")
return row, tdir / str(row)
# Accept a full UUID or a prefix (8 characters is enough to disambiguate at local scale)
tid = _resolve_uuid_or_prefix(task_id_arg)
return tid, tdir / str(tid)
tid = uuid4()
return tid, tdir / str(tid)
def _resolve_uuid_or_prefix(s: str) -> UUID:
"""Parse a full UUID string directly; otherwise treat it as a prefix and match exactly one row in the tasks table."""
try:
return UUID(s)
except ValueError:
pass
from sqlalchemy import cast, String, select
from core.storage import session_scope
from core.storage.models import Task
with session_scope() as sess:
matches = sess.execute(
select(Task.task_id).where(cast(Task.task_id, String).like(f"{s}%"))
).scalars().all()
if not matches:
raise FileNotFoundError(f"no task matching prefix: {s}")
if len(matches) > 1:
raise ValueError(f"ambiguous prefix {s!r}, matched {len(matches)} tasks")
return matches[0]
def _build_system_prompt(
@ -113,75 +140,66 @@ def build_agent(
mode: str = "",
description: str = "",
) -> Tuple[AgentLoop, Session, str, TaskState, Path]:
"""Return (agent, session, task_id, task_state, task_dir)."""
"""Return (agent, session, task_id_str, task_state, task_dir)."""
cfg = load_config()
model = model_name or cfg["default_model"]
# Insert the local sentinel user (idempotent); build_agent is the entry point for all task operations
ensure_local_sentinel()
caps = ModelCapabilities.load(model, ROOT / cfg["models_dir"])
llm = LLM(caps)
workspace_dir = resolve_workspace(workspace, cfg)
session_path, sid = resolve_task_messages_path(workspace_dir, session_id, resume)
task_id, task_dir = resolve_task_id(workspace_dir, session_id, resume)
sid = str(task_id)
tool_base = Path(tool_base) if tool_base else Path.cwd()
skills = SkillRegistry(ROOT / cfg.get("skills_dir", "skills"))
task_dir = session_path.parent
system_prompt = _build_system_prompt(cfg, skills, workspace_dir, tool_base, task_dir)
now_iso = datetime.now().isoformat(timespec="seconds")
meta = {
"id": sid,
"created_at": now_iso,
"cwd": str(tool_base),
"task_dir": str(task_dir),
"model": caps.model_id,
"model_profile": model,
}
if resume:
session = Session.load(session_path)
# Refresh the system prompt (messages[0]) with the latest memory + skill list so memory changes take effect immediately
if session.messages and session.messages[0].get("role") == "system":
session.messages[0]["content"] = system_prompt
else:
session.messages.insert(0, {"role": "system", "content": system_prompt})
saved_cwd = session.meta.get("cwd")
if saved_cwd and console is not None and saved_cwd != str(tool_base):
console.print(
f"[warn]Note:[/warn] the current cwd differs from the one recorded for this task: "
f"tools run against the current cwd and will not switch back automatically.\n"
f" task cwd: [info]{saved_cwd}[/info]\n"
f" current cwd: [info]{tool_base}[/info]"
)
session = Session.load(task_id, system_prompt=system_prompt, meta=meta)
task_state = TaskState.load(task_dir)
if task_state is None:
# messages.json exists but state.json is missing: rebuild from session.meta as a fallback
# The tasks row exists but state.json is missing: rebuild as a fallback (this branch disappears after Step 3)
task_state = TaskState(
task_id=sid,
mode=mode,
description=description,
status="active",
model=session.meta.get("model", caps.model_id),
model_profile=session.meta.get("model_profile", model),
cwd=session.meta.get("cwd", str(tool_base)),
created_at=session.meta.get("created_at", datetime.now().isoformat(timespec="seconds")),
task_id=sid, mode=mode, description=description, status="active",
model=caps.model_id, model_profile=model,
cwd=str(tool_base), created_at=now_iso,
)
task_state.save(task_dir)
else:
# Warn about cwd drift (the old state.json recorded the cwd at startup)
saved_cwd = task_state.cwd
if saved_cwd and console is not None and saved_cwd != str(tool_base):
console.print(
f"[warn]Note:[/warn] the current cwd differs from the one recorded for this task: "
f"tools run against the current cwd and will not switch back automatically.\n"
f" task cwd: [info]{saved_cwd}[/info]\n"
f" current cwd: [info]{tool_base}[/info]"
)
else:
now_iso = datetime.now().isoformat(timespec="seconds")
meta = {
"id": sid,
"created_at": now_iso,
"cwd": str(tool_base),
"model": caps.model_id,
"model_profile": model,
}
session = Session(system_prompt=system_prompt, path=session_path, meta=meta)
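# Lazy creation: no file is reserved up front. The first user message triggers Session.append → save(), which does the mkdir + write.
# task_state syncing is deferred to the first sync_task_tokens. Until then, task_dir does not exist on disk.
session = Session(task_id=task_id, system_prompt=system_prompt, meta=meta)
# Lazy creation: constructing a Session triggers no DB write; the Task row is inserted by
# ensure_local_task_row when the first user message is appended; state.json lands on the first task_state.save call.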
task_state = TaskState(
task_id=sid,
mode=mode,
description=description,
status="active",
model=caps.model_id,
model_profile=model,
task_id=sid, mode=mode, description=description, status="active",
model=caps.model_id, model_profile=model,
reasoning_effort=caps.default_reasoning_effort or "",
cwd=str(tool_base),
created_at=now_iso,
cwd=str(tool_base), created_at=now_iso,
)
tools = {}