Phase 4 + 6: capability probe + task concept / state.json

- core/probe.py + cli.py probe: four live checks (basic_chat /
  parallel_tools / thinking_mode / long_context) reconciled against the
  yaml; not on the startup path
- core/task.py + main.py: workspace/tasks/<id>/{state.json, messages.json};
  TaskState tracks mode/desc/status/tokens/timestamps; build_agent
  returns a 5-tuple
- cli.py tasks subcommand + REPL /status /done /abandon /desc; chat gains
  --mode/--desc options; legacy workspace/sessions/ compatibility removed

Phase 5 evals: evaluated and dropped. For a personal tool, dogfooding
decides model upgrades and probe already covers health checks; contrived
cases do not discriminate and are not worth the maintenance cost.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
caoqianming 2026-05-06 16:21:17 +08:00
parent 235d43bc1d
commit dbb778fe10
5 changed files with 612 additions and 97 deletions


@ -2,7 +2,7 @@
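> Read alongside `DESIGN.md`. This file records what has been completed, key decisions, and deviations from the original design.
Last updated: 2026-05-06 (PPT skill v3: red hard constraints + ⛔ blocking + canvas merge + apply_brand brand bar + mandatory closing slide + Iconify icon library)
Last updated: 2026-05-06 (Phase 4 + Phase 6 task/state.json landed; legacy session compatibility removed; Phase 5 evals dropped)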
---
@ -13,9 +13,9 @@
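| 1 | Minimal working skeleton | ✅ Done | All acceptance checks pass |
| 2 | Skill system + three skills | ✅ Done | Anthropic format; coding/ppt/proposal |
| 3 | Hybrid paradigm (run_python) | ✅ Done | subprocess + sensitive env filtering |
| 4 | Evolvability capabilities | 🟡 Partial | Model Profile in place; capability probing not done; versioned prompts not done |
| 5 | Eval Suite | ❌ Not started | |
| 6 | Long-task engineering | 🟡 Partial | session interrupt/resume done; context compression and two-tier memory not done |
| 4 | Evolvability capabilities | 🟡 Partial | Model Profile in place; capability probing ✅; versioned prompts not done |
| 5 | Eval Suite | ⏸ Dropped | Personal tool; contrived cases do not discriminate; dogfooding decides when actually switching models |
| 6 | Long-task engineering | 🟡 Partial | task + state.json ✅; session interrupt/resume ✅; context compression and two-tier memory not done |
| 7 | Polish | ❌ Not started | Docker sandbox / more skills / Web UI |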
---
@ -86,6 +86,29 @@
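- `prompts/system/general_v1.md` (no versioned symlink; v1 is referenced directly)
- Assembly order at startup: general guidance → discovery block (skill list) → current working directory
### 8. Capability Probing (Phase 4)
- `core/probe.py`: four probes + ProbeReport
  - `probe_basic_chat`: connectivity; on failure the remaining probes are skipped
  - `probe_parallel_tools`: offers two independent tools and checks whether a single response carries ≥2 tool_calls
  - `probe_thinking_mode`: for models with declared=True, passes reasoning_effort and checks that the API accepts it and that reasoning_content/thinking is produced
  - `probe_long_context`: simplified needle-in-haystack; defaults to reliable_context * 4 / 8 characters, capped at 200K (opt-in, requires `--long-context`)
- `cli.py probe [--model X] [--long-context]`: rich Table output; exit code 0 = all ok / 2 = mismatch present / 3 = error present
- Does not modify the yaml; it only reports differences, and whether to adjust the profile is up to the user
- Not on the startup path (running it on every start would burn API credits); the user triggers it explicitly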
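
The exit-code convention above can be sketched as follows. This is a minimal illustration rather than the project's code: `probe_exit_code` is a hypothetical helper, and it assumes a mismatch takes precedence over an error when both occur, which is the order in which `cli.py probe` checks them.

```python
from typing import Iterable


def probe_exit_code(statuses: Iterable[str]) -> int:
    """Map probe statuses to a process exit code (hypothetical helper)."""
    seen = set(statuses)
    if "mismatch" in seen:
        return 2  # a declared capability disagrees with what was observed
    if "error" in seen:
        return 3  # at least one probe could not run to completion
    return 0  # only "ok" / "skip" results remain


if __name__ == "__main__":
    print(probe_exit_code(["ok", "mismatch", "skip"]))
```

A shell wrapper or CI job could branch on these codes without parsing the rich table.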
### 9. Task concept + state.json (Phase 6)
- `core/task.py`: `TaskState` dataclass (aligned with DESIGN.md §7.1)
  - Fields: task_id / mode / description / status (active/completed/abandoned) / model / model_profile / reasoning_effort / cwd / created_at / updated_at / tokens_prompt / tokens_completion / cost_usd
  - `save(task_dir)` / `load(task_dir)`; loading tolerates unknown fields (schema evolution)
- Storage layout: `workspace/tasks/<task_id>/{state.json, messages.json}`
- `main.build_agent` returns a 5-tuple `(agent, session, sid, task_state, task_dir)`
- `main.sync_task_tokens(state, dir, llm)`: called after each agent.run round; writes the LLM's cumulative tokens back to state.json
- CLI:
  - `chat --mode coding --desc "fix a bug"` (optional metadata)
  - REPL: `/status` shows the state, `/done` marks the task completed, `/abandon` marks it abandoned, `/desc <text>` edits the description
  - `cli.py tasks [--status active|completed|abandoned]`: lists tasks with mode/model/msgs/tokens/desc
- Known pitfall: `Session.save()` is not atomic. If write_text raises (e.g. stdin injected a lone UTF-8 surrogate half), a 0-byte file is left behind. This is fine for interactive use; a tmp+rename write can be added later
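
The tmp+rename fix mentioned in the last item could look roughly like this. It is a sketch under assumptions: `atomic_write_text` is a hypothetical helper, not part of `core/session.py`, and it relies on `os.replace` swapping the file in one step when the temporary file sits in the same directory as the target.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_text(path: Path, text: str, encoding: str = "utf-8") -> None:
    """Write text to a temp file next to the target, then swap it into place."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(
        dir=path.parent, prefix=path.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding=encoding) as f:
            f.write(text)  # an encoding error here leaves the target untouched
        os.replace(tmp_name, path)
    except BaseException:
        # Remove the partial temp file; the previous target content survives.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise
```

With this shape, a failed write (such as a lone surrogate that cannot be encoded) raises as before, but the existing `messages.json` keeps its previous content instead of being truncated.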
---
## Key decisions and deviations
@ -93,8 +116,9 @@
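| Item | Decision | Difference from design |
|---|------|-----------|
| Tool base directory | The user's current cwd, not workspace/ | Design did not specify; cwd was chosen because the agent should operate on the user's project |
| Workspace purpose | Stores only sessions/ (for now) | Design includes `tasks/ memory/ logs/`; to be added in Phase 6 |
| Session granularity | One file per session; no task concept | Design has task_id / state.json; to be added in Phase 6 |
| Workspace purpose | `tasks/<id>/{state.json, messages.json}`; memory/ awaits Phase 6 two-tier memory | Design includes `tasks/ memory/ logs/`; partially landed |
| Session granularity | One directory per task, including state.json | Landed in Phase 6; fields aligned with DESIGN.md §7.1 |
| Eval Suite | Dropped | DESIGN.md §6.3 targets team/product settings; a personal tool substitutes dogfooding, and probe covers health checks |
| Versioned prompts | general_v1.md used directly; no active.md symlink | Symlinks are awkward on Windows; to be done when a version switch is needed |
| run_python sandbox | subprocess + env filtering | Same as design stage 1; not upgraded to Docker |
| Tool count | 8 (read/write/edit/glob/grep/shell/run_python/load_skill) | Design caps simultaneously visible tools at ≤10; the current count fits |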
@ -106,10 +130,14 @@
- Whole-project `ast.parse` syntax check passes
- yaml config parses
- All import chains run inside the venv
- `cli.py --help` / `cli.py chat --help` / `cli.py sessions --help` work
- `cli.py --help` / `cli.py chat --help` / `cli.py tasks --help` / `cli.py probe --help` work
- `SkillRegistry` detects 3 skills; the discovery block assembles correctly
- A clear error is reported when `DEEPSEEK_API_KEY` is missing
- Live DeepSeek API call connects (model ID `deepseek-v4-flash` is recognised); it returns InsufficientBalance only because the account balance is too low. **The access path works**
- Live DeepSeek API call connects; both the `flash` and `pro` tiers respond
- **Real probe results** (2026-05-06):
  - `deepseek_v4.flash`: basic_chat ok / **parallel_tools mismatch** (yaml=false, observed=true, 2 tool_calls) / thinking_mode skip (declared false)
  - `deepseek_v4.pro`: basic_chat ok / parallel_tools ok / thinking_mode ok (reasoning_content returned)
  - The flash mismatch is not applied to the yaml automatically for now: `parallel_tools=true` affects every real call, so more scenarios need to be observed before deciding
---
@ -117,13 +145,11 @@
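Ordered by value for effort:
1. **Phase 4 capability probing** (~half a day): run needle-in-haystack / parallel-tool probes at startup and reconcile the capabilities the yaml claims
2. **Phase 5 Eval Suite** (~2 days): the basis for model-upgrade decisions. 3-5 cases per task type, scored both objectively and by an LLM judge
3. **Phase 6 task concept + state.json** (~1 day): promote session to task; add `tasks/<task_id>/` to workspace
4. **Phase 6 three-tier context compression** (~1 day): a fallback; V4's long context rarely needs it
5. **Phase 6 two-tier memory** (~half a day): `workspace/memory/core.md` injected into the prompt + `extended/` read on demand
6. **Phase 7 Docker sandbox** (~1 day): replaces subprocess; a security upgrade for run_python
7. **Phase 7 more skills / model profiles** (ongoing)
1. **Phase 6 two-tier memory** (~half a day): `workspace/memory/core.md` injected into the prompt + `extended/` read on demand
2. **Phase 6 three-tier context compression** (~1 day): a fallback; V4's long context rarely needs it
3. **Minor polish** (~half an hour): make `Session.save()` an atomic write (tmp + rename) so that surrogate errors and similar exceptions cannot truncate the file
4. **Phase 7 Docker sandbox** (~1 day): replaces subprocess; a security upgrade for run_python
5. **Phase 7 more skills / model profiles** (ongoing)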
---
@ -133,17 +159,19 @@
core/capabilities.py   71 lines
core/llm.py            89 lines
core/loop.py           99 lines
core/probe.py         243 lines  ← Phase 4
core/session.py        77 lines
core/skills.py         81 lines
core/task.py           63 lines  ← Phase 6
tools/base.py          34 lines
tools/fs.py           182 lines
tools/shell.py         63 lines
tools/run_python.py    84 lines
tools/skill_tool.py    45 lines
main.py               120 lines
cli.py                138 lines
main.py               175 lines  ← +tasks layout / TaskState assembly
cli.py                265 lines  ← +probe / +tasks subcommands
─────────────────────────────────
Total Python 1083
Total Python ~1571 lines
prompts/system/general_v1.md
skills/coding/SKILL.md
@ -154,4 +182,4 @@ config/models/deepseek_v4.yaml
requirements.txt
```
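The design estimated roughly 800-1000 lines for Phases 1-3; the actual count is 1083, slightly over but still readable
The design estimated roughly 800-1000 lines for Phases 1-3; the actual count is 1083, and with Phase 4 + Phase 6 added it comes to about 1571 lines of Python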

226
cli.py

@ -1,23 +1,33 @@
"""CLI entry point: a simple REPL.
Usage:
    python cli.py chat                      # start a new session
    python cli.py chat --resume last        # resume the most recent one
    python cli.py chat --resume 20260506_141523
    python cli.py chat                      # start a new task
    python cli.py chat --mode coding --desc "fix a bug"   # create a task with metadata
    python cli.py chat --resume last        # resume the most recent task
    python cli.py chat --resume 20260506_141523           # explicit task_id
    python cli.py chat --model deepseek_v4.pro
    python cli.py sessions                  # list past sessions
    python cli.py tasks                     # list tasks
    python cli.py probe                     # check the capabilities the yaml claims against live calls
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
import click
from rich.console import Console
from rich.prompt import Prompt
from rich.table import Table
from main import build_agent, load_config, resolve_workspace, sessions_dir
from core.task import TaskState
from main import (
    ROOT,
    build_agent,
    load_config,
    resolve_workspace,
    sync_task_tokens,
    tasks_dir,
)
@click.group()
@ -27,18 +37,22 @@ def cli() -> None:
@cli.command()
@click.option("--model", default=None, help="Model profile, e.g. deepseek_v4.flash or deepseek_v4.pro")
@click.option("--workspace", default=None, help="Workspace directory (stores sessions/)")
@click.option("--resume", default=None, help="Resume a session: 'last' or a session_id")
def chat(model: str, workspace: str, resume: str) -> None:
    """Start the interactive REPL. Each launch opens a new session by default; use --resume to continue an old one."""
@click.option("--workspace", default=None, help="Workspace directory (stores tasks/ and sessions/)")
@click.option("--resume", default=None, help="Resume a task: 'last' or a task_id")
@click.option("--mode", default="", help="Task mode label (coding/ppt/proposal/..., free-form)")
@click.option("--desc", default="", help="One-line task description, to make the task recognisable in the tasks list")
def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None:
    """Start the interactive REPL. Each launch opens a new task by default; use --resume to continue an old one."""
    console = Console()
    try:
        agent, session, sid = build_agent(
        agent, session, sid, task_state, task_dir = build_agent(
            model_name=model,
            workspace=workspace,
            console=console,
            session_id=resume,
            resume=bool(resume),
            mode=mode,
            description=desc,
        )
    except Exception as e:
        console.print(f"[red]Startup failed:[/red] {type(e).__name__}: {e}")
@ -46,15 +60,21 @@ def chat(model: str, workspace: str, resume: str) -> None:
    if resume:
        console.print(
            f"[green]Resumed session[/green] [bold]{sid}[/bold] ({len(session.messages)} messages) "
            f"[green]Resumed task[/green] [bold]{sid}[/bold] ({len(session.messages)} messages) "
            f"model: [bold]{agent.caps.model_id}[/bold]"
        )
    else:
        meta_tail = ""
        if task_state.mode or task_state.description:
            meta_tail = f" mode={task_state.mode!r} desc={task_state.description!r}"
        console.print(
            f"[green]New session[/green] [bold]{sid}[/bold] "
            f"model: [bold]{agent.caps.model_id}[/bold]"
            f"[green]New task[/green] [bold]{sid}[/bold] "
            f"model: [bold]{agent.caps.model_id}[/bold]{meta_tail}"
        )
    console.print("[dim]/exit quit /reset clear the current conversation /new start a new session /id show the session id[/dim]\n")
    console.print(
        "[dim]/exit quit /reset clear the conversation (keeps the task) /new start a new task /id /status inspect "
        "/done /abandon change status /desc <text> set the description[/dim]\n"
    )
    while True:
        try:
@ -68,22 +88,48 @@ def chat(model: str, workspace: str, resume: str) -> None:
            break
        if cmd == "/reset":
            session.reset(keep_system=True)
            console.print("[dim]Current session reset (system kept)[/dim]")
            console.print("[dim]Current task conversation reset (system and state kept)[/dim]")
            continue
        if cmd == "/new":
            try:
                agent, session, sid = build_agent(
                    model_name=model, workspace=workspace, console=console
                agent, session, sid, task_state, task_dir = build_agent(
                    model_name=model, workspace=workspace, console=console,
                    mode=mode, description=desc,
                )
            except Exception as e:
                console.print(f"[red]Creation failed:[/red] {type(e).__name__}: {e}")
                continue
            console.print(f"[green]New session[/green] [bold]{sid}[/bold]")
            console.print(f"[green]New task[/green] [bold]{sid}[/bold]")
            continue
        if cmd == "/id":
            cwd_disp = session.meta.get("cwd", "?")
            model_disp = session.meta.get("model", agent.caps.model_id)
            console.print(f"[dim]session: {sid} model: {model_disp} cwd: {cwd_disp}[/dim]")
            console.print(f"[dim]task: {sid} model: {model_disp} cwd: {cwd_disp}[/dim]")
            continue
        if cmd == "/status":
            console.print(
                f"[dim]task {task_state.task_id} status={task_state.status} "
                f"mode={task_state.mode!r} desc={task_state.description!r}\n"
                f" model={task_state.model} tokens={task_state.tokens_total} "
                f"(p={task_state.tokens_prompt}/c={task_state.tokens_completion}) "
                f"created={task_state.created_at} updated={task_state.updated_at}[/dim]"
            )
            continue
        if cmd == "/done":
            task_state.status = "completed"
            task_state.save(task_dir)
            console.print(f"[green]task {sid} marked completed[/green]")
            break
        if cmd == "/abandon":
            task_state.status = "abandoned"
            task_state.save(task_dir)
            console.print(f"[yellow]task {sid} marked abandoned[/yellow]")
            break
        if cmd.startswith("/desc"):
            new_desc = cmd[len("/desc"):].strip()
            task_state.description = new_desc
            task_state.save(task_dir)
            console.print(f"[dim]description set: {new_desc!r}[/dim]")
            continue
        if not cmd:
            continue
@ -91,47 +137,127 @@ def chat(model: str, workspace: str, resume: str) -> None:
        try:
            agent.run(user_input)
        except KeyboardInterrupt:
            console.print("\n[yellow]This round was interrupted. The next input continues this session.[/yellow]")
            console.print("\n[yellow]This round was interrupted. The next input continues this task.[/yellow]")
        except Exception as e:
            console.print(f"[red]Runtime error:[/red] {type(e).__name__}: {e}")
        finally:
            sync_task_tokens(task_state, task_dir, agent.llm)
@cli.command()
@click.option("--workspace", default=None, help="Workspace directory")
@click.option("--limit", default=20, help="Show the most recent N")
def sessions(workspace: str, limit: int) -> None:
    """List existing sessions."""
@click.option("--status", default=None, help="Filter by status: active / completed / abandoned")
def tasks(workspace: str, limit: int, status: str) -> None:
    """List existing tasks (new format, workspace/tasks/<id>/state.json)."""
    cfg = load_config()
    ws = resolve_workspace(workspace, cfg)
    sdir = sessions_dir(ws)
    tdir = tasks_dir(ws)
    items = sorted(sdir.glob("*.json"), reverse=True)[:limit]
    if not items:
        click.echo(f"(no sessions in {sdir})")
        return
    click.echo(f"{'session id':<18} {'msgs':>4} {'cwd':<32} preview")
    click.echo("-" * 100)
    for p in items:
    rows = []  # (mtime, task_id, status, mode, model, tokens, n_msgs, desc)
    for d in tdir.iterdir():
        if not d.is_dir():
            continue
        msg_path = d / "messages.json"
        if not msg_path.exists():
            continue
        st = TaskState.load(d)
        if st is None:
            continue
        if status and st.status != status:
            continue
        try:
            data = json.loads(p.read_text(encoding="utf-8"))
            if isinstance(data, list):
                messages, meta = data, {}
            else:
                messages = data.get("messages", []) or []
                meta = data.get("meta", {}) or {}
            n = len(messages)
            preview = ""
            for m in messages:
                if isinstance(m, dict) and m.get("role") == "user":
                    preview = (m.get("content") or "")[:50].replace("\n", " ")
                    break
            cwd = meta.get("cwd") or "?"
            if len(cwd) > 32:
                cwd = "..." + cwd[-29:]
        except Exception as e:
            n, preview, cwd = -1, f"[parse error: {e}]", "?"
        click.echo(f"{p.stem:<18} {n:>4} {cwd:<32} {preview}")
            data = json.loads(msg_path.read_text(encoding="utf-8"))
            n = len(data.get("messages", []))
        except Exception:
            n = -1
        rows.append((
            msg_path.stat().st_mtime, st.task_id, st.status, st.mode,
            st.model_profile or st.model, st.tokens_total, n, st.description,
        ))
    rows.sort(reverse=True)
    rows = rows[:limit]
    if not rows:
        click.echo(f"(no tasks in {tdir})")
        return
    tbl = Table(show_lines=False)
    tbl.add_column("task id", style="bold")
    tbl.add_column("status")
    tbl.add_column("mode")
    tbl.add_column("model")
    tbl.add_column("msgs", justify="right")
    tbl.add_column("tokens", justify="right")
    tbl.add_column("desc")
    sc = {"active": "cyan", "completed": "green", "abandoned": "dim"}
    for _, tid, st, mode, model, tok, n, desc in rows:
        c = sc.get(st, "white")
        d_show = desc if len(desc) <= 50 else desc[:47] + "..."
        tbl.add_row(tid, f"[{c}]{st}[/{c}]", mode, model, str(n), str(tok), d_show)
    Console().print(tbl)
@cli.command()
@click.option("--model", default=None, help="Model profile, e.g. deepseek_v4.flash or deepseek_v4.pro")
@click.option("--long-context", is_flag=True, help="Also run needle-in-haystack (token-heavy, off by default)")
def probe(model: str, long_context: bool) -> None:
    """Check the capabilities the model yaml claims against live calls. Calls the LLM and incurs API cost."""
    from core.capabilities import ModelCapabilities
    from core.llm import LLM
    from core.probe import probe_capabilities
    cfg = load_config()
    name = model or cfg["default_model"]
    console = Console()
    try:
        caps = ModelCapabilities.load(name, ROOT / cfg["models_dir"])
    except Exception as e:
        console.print(f"[red]Profile load failed:[/red] {type(e).__name__}: {e}")
        sys.exit(1)
    console.print(
        f"[bold]probing[/bold] [cyan]{caps.model_id}[/cyan] (profile: {name}) "
        f"[dim]long-context={long_context}[/dim]\n"
    )
    try:
        llm = LLM(caps)
    except Exception as e:
        console.print(f"[red]LLM construction failed:[/red] {type(e).__name__}: {e}")
        sys.exit(1)
    with console.status("[dim]running probes...[/dim]", spinner="dots"):
        report = probe_capabilities(caps, llm, include_long_context=long_context)
    tbl = Table(show_lines=False)
    tbl.add_column("capability", style="bold")
    tbl.add_column("declared")
    tbl.add_column("observed")
    tbl.add_column("status")
    tbl.add_column("detail")
    color = {"ok": "green", "mismatch": "yellow", "error": "red", "skip": "dim"}
    for r in report.results:
        c = color.get(r.status, "white")
        tbl.add_row(
            r.name,
            str(r.declared),
            str(r.observed),
            f"[{c}]{r.status}[/{c}]",
            r.detail,
        )
    console.print(tbl)
    if report.has_mismatch:
        console.print(
            "\n[yellow]Capability mismatches found. See detail; if needed, edit "
            f"config/models/{caps.family}.yaml[/yellow]"
        )
        sys.exit(2)
    if any(r.status == "error" for r in report.results):
        console.print("\n[red]Some probes errored (see detail)[/red]")
        sys.exit(3)
    console.print("\n[green]All declared capabilities match what was observed.[/green]")
if __name__ == "__main__":

243
core/probe.py Normal file

@ -0,0 +1,243 @@
"""Capability probing: reconcile the capabilities claimed in the yaml against real LLM calls.

Runs only when the user triggers it explicitly (`cli.py probe`), because it spends API credits; it is not on the startup path.
Does not modify the yaml; it only prints a comparison report and lets the user decide whether to change the profile.

Four probes:
- basic_chat: connectivity; on failure the rest are skipped
- parallel_tools: offers two independent tools and counts the tool_calls in a single response
- thinking_mode: for models with declared=True, passes reasoning_effort and checks whether the API accepts it and produces thinking
- long_context (opt-in): simplified needle-in-haystack; by default probes 1/8 of reliable_context
"""
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, List, Optional

from .capabilities import ModelCapabilities
from .llm import LLM


@dataclass
class ProbeResult:
    name: str
    declared: Any
    observed: Any
    status: str  # "ok" / "mismatch" / "skip" / "error"
    detail: str = ""


@dataclass
class ProbeReport:
    model: str
    results: List[ProbeResult] = field(default_factory=list)

    def add(self, r: ProbeResult) -> None:
        self.results.append(r)

    @property
    def has_mismatch(self) -> bool:
        return any(r.status == "mismatch" for r in self.results)


def _msg_dict(msg: Any) -> dict:
    if hasattr(msg, "model_dump"):
        return msg.model_dump()
    if hasattr(msg, "dict"):
        return msg.dict()
    return {}


# ----- individual probes -----
def probe_basic_chat(llm: LLM) -> ProbeResult:
    try:
        resp = llm.chat(
            messages=[{"role": "user", "content": "Reply with exactly the word: pong"}],
        )
        text = (resp.choices[0].message.content or "").strip()
        ok = "pong" in text.lower()
        return ProbeResult(
            name="basic_chat",
            declared="reachable",
            observed=text[:40] or "<empty>",
            status="ok" if ok else "mismatch",
            detail="" if ok else f"expected 'pong', got: {text[:80]!r}",
        )
    except Exception as e:
        return ProbeResult(
            name="basic_chat",
            declared="reachable",
            observed=None,
            status="error",
            detail=f"{type(e).__name__}: {e}",
        )


def probe_parallel_tools(llm: LLM, caps: ModelCapabilities) -> ProbeResult:
    declared = caps.parallel_tools
    tools = [
        {
            "type": "function",
            "function": {
                "name": "get_weather",
                "description": "Get current weather for a city.",
                "parameters": {
                    "type": "object",
                    "properties": {"city": {"type": "string"}},
                    "required": ["city"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "get_time",
                "description": "Get current time in a timezone.",
                "parameters": {
                    "type": "object",
                    "properties": {"tz": {"type": "string"}},
                    "required": ["tz"],
                },
            },
        },
    ]
    user_msg = (
        "I need two independent pieces of information at the same time: the weather "
        "in Beijing AND the current time in Tokyo. Please call BOTH tools in this "
        "single turn (in parallel)."
    )
    try:
        resp = llm.chat(
            messages=[{"role": "user", "content": user_msg}],
            tools=tools,
            parallel_tool_calls=True,
        )
        tool_calls = getattr(resp.choices[0].message, "tool_calls", None) or []
        n = len(tool_calls)
        observed = n >= 2
        return ProbeResult(
            name="parallel_tools",
            declared=declared,
            observed=observed,
            status="ok" if observed == bool(declared) else "mismatch",
            detail=f"{n} tool_calls in single response",
        )
    except Exception as e:
        return ProbeResult(
            name="parallel_tools",
            declared=declared,
            observed=None,
            status="error",
            detail=f"{type(e).__name__}: {e}",
        )


def probe_thinking_mode(llm: LLM, caps: ModelCapabilities) -> ProbeResult:
    declared = caps.thinking_mode
    if not declared:
        return ProbeResult(
            name="thinking_mode",
            declared=False,
            observed=None,
            status="skip",
            detail="declared false; skipping (cap-side flag controls API forwarding)",
        )
    effort = (
        caps.default_reasoning_effort
        or (caps.reasoning_effort_levels[0] if caps.reasoning_effort_levels else "medium")
    )
    try:
        resp = llm.chat(
            messages=[{"role": "user", "content": "Briefly: what is 17 * 23?"}],
            reasoning_effort=effort,
        )
        msg = resp.choices[0].message
        d = _msg_dict(msg)
        rc = (
            getattr(msg, "reasoning_content", None)
            or getattr(msg, "thinking", None)
            or d.get("reasoning_content")
            or d.get("thinking")
        )
        observed = bool(rc)
        return ProbeResult(
            name="thinking_mode",
            declared=True,
            observed=observed,
            status="ok" if observed else "mismatch",
            detail=(
                f"reasoning_effort={effort} accepted; "
                + ("thinking content returned" if observed else "no thinking content in response")
            ),
        )
    except Exception as e:
        return ProbeResult(
            name="thinking_mode",
            declared=True,
            observed=False,
            status="mismatch",
            detail=f"reasoning_effort rejected: {type(e).__name__}: {e}",
        )


def probe_long_context(
    llm: LLM, caps: ModelCapabilities, target_chars: Optional[int] = None
) -> ProbeResult:
    """Simplified needle-in-haystack. Defaults to reliable_context * 4 / 8 characters, capped at 200K."""
    if target_chars is None:
        target_chars = caps.reliable_context * 4 // 8
    target_chars = max(2_000, min(target_chars, 200_000))
    SECRET = "K7-ZULU-9213"
    pad = "The quick brown fox jumps over the lazy dog. " * 200
    n_blocks = max(1, target_chars // len(pad))
    middle = n_blocks // 2
    parts: List[str] = []
    for i in range(n_blocks):
        if i == middle:
            parts.append(f"\n>>> SECRET TOKEN: {SECRET} <<<\n")
        parts.append(pad)
    haystack = "".join(parts)
    prompt = (
        "Below is a long block of text. Somewhere in it a SECRET TOKEN is recorded "
        "after the marker '>>> SECRET TOKEN:'. Reply with ONLY the token value, "
        "nothing else.\n\n" + haystack
    )
    try:
        resp = llm.chat(messages=[{"role": "user", "content": prompt}])
        text = (resp.choices[0].message.content or "").strip()
        ok = SECRET in text
        return ProbeResult(
            name="long_context",
            declared=f"reliable_context={caps.reliable_context}",
            observed=f"{len(haystack)} chars sent; secret {'recovered' if ok else 'missed'}",
            status="ok" if ok else "mismatch",
            detail=f"reply head: {text[:80]!r}",
        )
    except Exception as e:
        return ProbeResult(
            name="long_context",
            declared=f"reliable_context={caps.reliable_context}",
            observed=None,
            status="error",
            detail=f"{type(e).__name__}: {e}",
        )


# ----- top-level entry point -----
def probe_capabilities(
    caps: ModelCapabilities,
    llm: LLM,
    *,
    include_long_context: bool = False,
) -> ProbeReport:
    report = ProbeReport(model=caps.model_id)
    report.add(probe_basic_chat(llm))
    if report.results[0].status == "error":
        return report
    report.add(probe_parallel_tools(llm, caps))
    report.add(probe_thinking_mode(llm, caps))
    if include_long_context:
        report.add(probe_long_context(llm, caps))
    return report

63
core/task.py Normal file

@ -0,0 +1,63 @@
"""Task state: follows the DESIGN.md §7.1 spec and is persisted to `<task_dir>/state.json`.

Task is the layer above Session. Session only manages conversation messages; Task also manages mode/description/
status/tokens/cost/timestamps, metadata that is shared across rounds and kept aligned with filesystem state.

File path convention (under workspace/):
    tasks/<task_id>/state.json      handled by this module
    tasks/<task_id>/messages.json   handled by Session
"""
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, fields
from datetime import datetime
from pathlib import Path
from typing import Optional


@dataclass
class TaskState:
    task_id: str
    mode: str = ""  # free-form: coding / ppt / proposal / general / custom
    description: str = ""  # one-line description, for recognition in listings
    status: str = "active"  # active / completed / abandoned
    model: str = ""  # caps.model_id
    model_profile: str = ""  # profile name, e.g. deepseek_v4.flash
    reasoning_effort: str = ""
    cwd: str = ""  # the task's working base directory
    created_at: str = ""  # ISO timestamp
    updated_at: str = ""
    tokens_prompt: int = 0
    tokens_completion: int = 0
    cost_usd: float = 0.0  # not computed yet; placeholder

    @property
    def tokens_total(self) -> int:
        return self.tokens_prompt + self.tokens_completion

    def save(self, task_dir: Path) -> None:
        task_dir.mkdir(parents=True, exist_ok=True)
        self.updated_at = datetime.now().isoformat(timespec="seconds")
        (task_dir / "state.json").write_text(
            json.dumps(asdict(self), ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    @classmethod
    def load(cls, task_dir: Path) -> Optional["TaskState"]:
        p = task_dir / "state.json"
        if not p.exists():
            return None
        try:
            data = json.loads(p.read_text(encoding="utf-8"))
        except Exception:
            return None
        if not isinstance(data, dict):
            return None
        # Tolerate schema evolution: take only known fields; missing fields fall back to dataclass defaults
        known = {f.name for f in fields(cls)}
        kwargs = {k: v for k, v in data.items() if k in known}
        if "task_id" not in kwargs:
            kwargs["task_id"] = task_dir.name
        return cls(**kwargs)

111
main.py

@ -1,4 +1,9 @@
"""Assembly entry point: read config → load capabilities/skills → construct LLM/tools/session/loop."""
"""Assembly entry point: read config → load capabilities/skills → construct LLM/tools/session/loop.
Storage layout:
    workspace/tasks/<task_id>/state.json      TaskState
    workspace/tasks/<task_id>/messages.json   Session messages
"""
from __future__ import annotations
from datetime import datetime
@ -13,6 +18,7 @@ from core.llm import LLM
from core.loop import AgentLoop
from core.session import Session
from core.skills import SkillRegistry
from core.task import TaskState
from tools.fs import EditTool, GlobTool, GrepTool, ReadTool, WriteTool
from tools.run_python import RunPythonTool
from tools.shell import ShellTool
@ -32,28 +38,38 @@ def resolve_workspace(workspace: Optional[str], cfg: Optional[dict] = None) -> P
return p
def sessions_dir(workspace_dir: Path) -> Path:
    d = workspace_dir / "sessions"
def tasks_dir(workspace_dir: Path) -> Path:
    d = workspace_dir / "tasks"
    d.mkdir(parents=True, exist_ok=True)
    return d
def resolve_session_path(workspace_dir: Path, session_id: Optional[str], resume: bool) -> Tuple[Path, str]:
    """Return (path, session_id). With resume=True, find an existing file; otherwise create a new timestamp id."""
    sdir = sessions_dir(workspace_dir)
def resolve_task_messages_path(
    workspace_dir: Path, task_id: Optional[str], resume: bool
) -> Tuple[Path, str]:
    """Return (messages_file_path, task_id).
    New: tasks/<id>/messages.json; resume: tasks/<id>/messages.json, with 'last' picking the most recent.
    """
    tdir = tasks_dir(workspace_dir)
    if resume:
        if session_id in (None, "", "last"):
            existing = sorted(sdir.glob("*.json"))
            if not existing:
                raise FileNotFoundError(f"no session to resume under {sdir}")
            path = existing[-1]
            return path, path.stem
        path = sdir / f"{session_id}.json"
        if not path.exists():
            raise FileNotFoundError(f"session does not exist: {path}")
        return path, session_id  # type: ignore[return-value]
    sid = session_id or datetime.now().strftime("%Y%m%d_%H%M%S")
    return sdir / f"{sid}.json", sid
        if task_id in (None, "", "last"):
            candidates = []
            for d in tdir.iterdir():
                mf = d / "messages.json"
                if mf.is_file():
                    candidates.append((mf.stat().st_mtime, mf, d.name))
            if not candidates:
                raise FileNotFoundError(f"no task to resume: no tasks under {tdir}")
            candidates.sort(key=lambda x: x[0], reverse=True)
            _, path, sid = candidates[0]
            return path, sid
        task_msg = tdir / task_id / "messages.json"
        if not task_msg.exists():
            raise FileNotFoundError(f"task does not exist: {task_msg}")
        return task_msg, task_id
    sid = task_id or datetime.now().strftime("%Y%m%d_%H%M%S")
    return tdir / sid / "messages.json", sid
def build_agent(
@ -62,7 +78,11 @@ def build_agent(
    console: Optional[Console] = None,
    session_id: Optional[str] = None,
    resume: bool = False,
) -> Tuple[AgentLoop, Session, str]:
    tool_base: Optional[Path] = None,
    mode: str = "",
    description: str = "",
) -> Tuple[AgentLoop, Session, str, TaskState, Path]:
    """Return (agent, session, task_id, task_state, task_dir)."""
    cfg = load_config()
    model = model_name or cfg["default_model"]
@ -70,38 +90,65 @@ def build_agent(
    llm = LLM(caps)
    workspace_dir = resolve_workspace(workspace, cfg)
    session_path, sid = resolve_session_path(workspace_dir, session_id, resume)
    session_path, sid = resolve_task_messages_path(workspace_dir, session_id, resume)
    # Tool base directory: the user's current cwd. The agent operates on the user's project, not the zcbot repo itself
    tool_base = Path.cwd()
    tool_base = Path(tool_base) if tool_base else Path.cwd()
    skills = SkillRegistry(ROOT / cfg.get("skills_dir", "skills"))
    task_dir = session_path.parent
    if resume:
        # Resume: load the old session as-is; do not inject a new system prompt
        session = Session.load(session_path)
        saved_cwd = session.meta.get("cwd")
        if saved_cwd and console is not None and saved_cwd != str(tool_base):
            console.print(
                f"[yellow]Note:[/yellow] the current cwd differs from the session record. "
                f"[yellow]Note:[/yellow] the current cwd differs from the task record. "
                f"Tools use the current cwd and will not switch back automatically.\n"
                f" session cwd: [dim]{saved_cwd}[/dim]\n"
                f" task cwd: [dim]{saved_cwd}[/dim]\n"
                f" current cwd: [dim]{tool_base}[/dim]"
            )
        task_state = TaskState.load(task_dir)
        if task_state is None:
            # messages.json exists but state.json is missing: rebuild from session.meta as a fallback
            task_state = TaskState(
                task_id=sid,
                mode=mode,
                description=description,
                status="active",
                model=session.meta.get("model", caps.model_id),
                model_profile=session.meta.get("model_profile", model),
                cwd=session.meta.get("cwd", str(tool_base)),
                created_at=session.meta.get("created_at", datetime.now().isoformat(timespec="seconds")),
            )
            task_state.save(task_dir)
    else:
        system_prompt = (ROOT / cfg["system_prompt"]).read_text(encoding="utf-8")
        if skills.skills:
            system_prompt += f"\n\n## Available skills (use load_skill to load the full guidance)\n{skills.discovery_block()}"
        system_prompt += f"\n\n## Current working directory\n{tool_base}"
        now_iso = datetime.now().isoformat(timespec="seconds")
        meta = {
            "id": sid,
            "created_at": datetime.now().isoformat(timespec="seconds"),
            "created_at": now_iso,
            "cwd": str(tool_base),
            "model": caps.model_id,
            "model_profile": model,
        }
        session = Session(system_prompt=system_prompt, path=session_path, meta=meta)
        session.save()  # write to disk immediately to claim the filename
        session.save()  # claim the filename
        task_state = TaskState(
            task_id=sid,
            mode=mode,
            description=description,
            status="active",
            model=caps.model_id,
            model_profile=model,
            reasoning_effort=caps.default_reasoning_effort or "",
            cwd=str(tool_base),
            created_at=now_iso,
        )
        task_state.save(task_dir)
    tools = {}
    for cls in (ReadTool, WriteTool, EditTool, GlobTool, GrepTool, ShellTool):
@ -117,4 +164,12 @@ def build_agent(
tools[rp.name] = rp
    agent = AgentLoop(llm, tools, session, caps, console=console)
    return agent, session, sid
    return agent, session, sid, task_state, task_dir
def sync_task_tokens(task_state: TaskState, task_dir: Path, llm: LLM) -> None:
    """Called after each agent.run round; writes the LLM's cumulative tokens back to state.json."""
    tc = llm.token_counter
    task_state.tokens_prompt = tc.prompt_tokens
    task_state.tokens_completion = tc.completion_tokens
    task_state.save(task_dir)