zcbot/cli.py

359 lines
14 KiB
Python

"""CLI 入口: 简单 REPL。
用法:
python cli.py chat # 新建一个 task
python cli.py chat --mode coding --desc "修一处 bug" # 带元数据建任务
python cli.py chat --resume last # 恢复最近一个 task
python cli.py chat --resume 20260506_141523 # 显式 task_id
python cli.py chat --model deepseek_v4.pro
python cli.py tasks # 列出 task
python cli.py probe # 实测对账 yaml 声称的能力
"""
from __future__ import annotations
import json
import shutil
import sys
import click
from rich.prompt import Prompt
from rich.table import Table
from core.task import TaskState
from core.ui import make_console
from main import (
ROOT,
build_agent,
load_config,
resolve_workspace,
sync_task_tokens,
tasks_dir,
)
@click.group()
def cli() -> None:
"""zcbot - 个人任务 agent"""
def _cleanup_if_empty(task_dir, session, console=None) -> bool:
"""切走前清理 task_dir。三条都满足才删:
1) session 没有 user 消息
2) task_dir 在磁盘上(懒创建后,没说话就没目录,直接 no-op)
3) 目录里只剩 messages.json(state.json 存在 = `/done /abandon /desc` 留下的显式痕迹,要保)
"""
if any(m.get("role") == "user" for m in session.messages):
return False
try:
entries = list(task_dir.iterdir())
except FileNotFoundError:
return False
if any(p.is_dir() for p in entries):
return False
if {p.name for p in entries if p.is_file()} - {"messages.json"}:
return False
shutil.rmtree(task_dir, ignore_errors=True)
if console is not None:
console.print(f"[muted]清理空 task {task_dir.name}[/muted]")
return True
def _list_task_rows(workspace_dir, limit=20, status=None):
"""返回 [(mtime, task_id, status, mode, model, tokens, n_msgs, desc), ...] mtime 降序。"""
tdir = tasks_dir(workspace_dir)
rows = []
for d in tdir.iterdir():
if not d.is_dir():
continue
msg_path = d / "messages.json"
if not msg_path.exists():
continue
st = TaskState.load(d)
if st is None:
continue
if status and st.status != status:
continue
try:
data = json.loads(msg_path.read_text(encoding="utf-8"))
n = len(data.get("messages", []))
except Exception:
n = -1
rows.append((
msg_path.stat().st_mtime, st.task_id, st.status, st.mode,
st.model_profile or st.model, st.tokens_total, n, st.description,
))
rows.sort(reverse=True)
return rows[:limit]
@cli.command()
@click.option("--model", default=None, help="模型档案,如 deepseek_v4.flash 或 deepseek_v4.pro")
@click.option("--workspace", default=None, help="工作目录(存 tasks/ 和 sessions/)")
@click.option("--resume", default=None, help="恢复 task: 'last' 或 task_id")
@click.option("--mode", default="", help="任务模式标签(coding/ppt/proposal/...自由形式)")
@click.option("--desc", default="", help="一句话任务描述,便于 tasks 列表识别")
def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None:
"""启动交互式 REPL。每次启动默认开新 task,用 --resume 接老的。"""
console = make_console()
try:
agent, session, sid, task_state, task_dir = build_agent(
model_name=model,
workspace=workspace,
console=console,
session_id=resume,
resume=bool(resume),
mode=mode,
description=desc,
)
except Exception as e:
console.print(f"[err]启动失败:[/err] {type(e).__name__}: {e}")
sys.exit(1)
if resume:
console.print(
f"[ok]恢复 task[/ok] [bold]{sid}[/bold] ({len(session.messages)} 条消息) "
f"model: [accent]{agent.caps.model_id}[/accent]"
)
else:
meta_tail = ""
if task_state.mode or task_state.description:
meta_tail = f" mode={task_state.mode!r} desc={task_state.description!r}"
console.print(
f"[ok]新 task[/ok] [bold]{sid}[/bold] "
f"model: [accent]{agent.caps.model_id}[/accent]{meta_tail}"
)
console.print(
"[info]/exit 退出 /reset 清空对话(保留 task) /new 开新 task "
"/resume [last|<id>] 切到已有 task /id /status 查看 "
"/done /abandon 改状态 /desc <文本> 设描述[/info]\n"
)
while True:
try:
user_input = Prompt.ask("[user]you[/user]", console=console)
except (EOFError, KeyboardInterrupt):
console.print("\n[muted]bye[/muted]")
_cleanup_if_empty(task_dir, session, console)
break
cmd = user_input.strip()
if cmd in ("/exit", "/quit"):
_cleanup_if_empty(task_dir, session, console)
break
if cmd == "/reset":
session.reset(keep_system=True)
console.print("[info]当前 task 对话已重置(保留 system 和 state)[/info]")
continue
if cmd == "/new":
_cleanup_if_empty(task_dir, session, console)
try:
agent, session, sid, task_state, task_dir = build_agent(
model_name=model, workspace=workspace, console=console,
mode=mode, description=desc,
)
except Exception as e:
console.print(f"[err]新建失败:[/err] {type(e).__name__}: {e}")
continue
console.print(f"[ok]新 task[/ok] [bold]{sid}[/bold]")
continue
if cmd.startswith("/resume"):
arg = cmd[len("/resume"):].strip()
ws_dir = resolve_workspace(workspace)
target_id = None
if arg == "last":
rs = _list_task_rows(ws_dir, limit=1)
if not rs:
console.print("[warn]没有可恢复的 task[/warn]")
continue
target_id = rs[0][1]
elif arg:
target_id = arg
else:
rs = _list_task_rows(ws_dir, limit=10)
if not rs:
console.print("[warn]没有可恢复的 task[/warn]")
continue
tbl = Table(show_lines=False)
tbl.add_column("#", style="bold")
tbl.add_column("task id")
tbl.add_column("status")
tbl.add_column("mode")
tbl.add_column("msgs", justify="right")
tbl.add_column("desc")
sc = {"active": "status.active", "completed": "status.completed", "abandoned": "status.abandoned"}
for i, (_, tid, st, md, _mdl, _tok, n, dsc) in enumerate(rs, 1):
c = sc.get(st, "info")
d_show = dsc if len(dsc) <= 50 else dsc[:47] + "..."
tbl.add_row(str(i), tid, f"[{c}]{st}[/{c}]", md, str(n), d_show)
console.print(tbl)
try:
sel = Prompt.ask("[user]选编号或输入 task_id (回车取消)[/user]", console=console, default="")
except (EOFError, KeyboardInterrupt):
continue
sel = sel.strip()
if not sel:
continue
if sel.isdigit():
idx = int(sel) - 1
if 0 <= idx < len(rs):
target_id = rs[idx][1]
else:
console.print(f"[err]编号超界: {sel}[/err]")
continue
else:
target_id = sel
if target_id == sid:
console.print(f"[info]已是当前 task: {sid}[/info]")
continue
_cleanup_if_empty(task_dir, session, console)
try:
agent, session, sid, task_state, task_dir = build_agent(
model_name=model, workspace=workspace, console=console,
session_id=target_id, resume=True,
)
except Exception as e:
console.print(f"[err]恢复失败:[/err] {type(e).__name__}: {e}")
continue
console.print(
f"[ok]切到 task[/ok] [bold]{sid}[/bold] ({len(session.messages)} 条消息) "
f"model: [accent]{agent.caps.model_id}[/accent]"
)
continue
if cmd == "/id":
cwd_disp = session.meta.get("cwd", "?")
model_disp = session.meta.get("model", agent.caps.model_id)
console.print(f"[info]task: {sid} model: {model_disp} cwd: {cwd_disp}[/info]")
continue
if cmd == "/status":
console.print(
f"[info]task {task_state.task_id} status={task_state.status} "
f"mode={task_state.mode!r} desc={task_state.description!r}\n"
f" model={task_state.model} tokens={task_state.tokens_total} "
f"(p={task_state.tokens_prompt}/c={task_state.tokens_completion}) "
f"created={task_state.created_at} updated={task_state.updated_at}[/info]"
)
continue
if cmd == "/done":
task_state.status = "completed"
task_state.save(task_dir)
console.print(f"[ok]task {sid} marked completed[/ok]")
break
if cmd == "/abandon":
task_state.status = "abandoned"
task_state.save(task_dir)
console.print(f"[warn]task {sid} marked abandoned[/warn]")
break
if cmd.startswith("/desc"):
new_desc = cmd[len("/desc"):].strip()
task_state.description = new_desc
task_state.save(task_dir)
console.print(f"[info]description set: {new_desc!r}[/info]")
continue
if not cmd:
continue
try:
agent.run(user_input)
except KeyboardInterrupt:
console.print("\n[warn]已中断本轮。下一条输入会继续这个 task。[/warn]")
except Exception as e:
console.print(f"[err]运行错误:[/err] {type(e).__name__}: {e}")
finally:
sync_task_tokens(task_state, task_dir, agent.llm)
@cli.command()
@click.option("--workspace", default=None, help="工作目录")
@click.option("--limit", default=20, help="显示最近 N 个")
@click.option("--status", default=None, help="只看某状态: active / completed / abandoned")
def tasks(workspace: str, limit: int, status: str) -> None:
"""列出已有 task(新格式,workspace/tasks/<id>/state.json)。"""
cfg = load_config()
ws = resolve_workspace(workspace, cfg)
rows = _list_task_rows(ws, limit=limit, status=status)
if not rows:
click.echo(f"(no tasks in {tasks_dir(ws)})")
return
tbl = Table(show_lines=False)
tbl.add_column("task id", style="bold")
tbl.add_column("status")
tbl.add_column("mode")
tbl.add_column("model")
tbl.add_column("msgs", justify="right")
tbl.add_column("tokens", justify="right")
tbl.add_column("desc")
sc = {"active": "status.active", "completed": "status.completed", "abandoned": "status.abandoned"}
for _, tid, st, mode, model, tok, n, desc in rows:
c = sc.get(st, "info")
d_show = desc if len(desc) <= 50 else desc[:47] + "..."
tbl.add_row(tid, f"[{c}]{st}[/{c}]", mode, model, str(n), str(tok), d_show)
make_console().print(tbl)
@cli.command()
@click.option("--model", default=None, help="模型档案,如 deepseek_v4.flash 或 deepseek_v4.pro")
@click.option("--long-context", is_flag=True, help="加跑 needle-in-haystack(费 token,默认关)")
def probe(model: str, long_context: bool) -> None:
"""实测对账模型 yaml 声称的能力。会调用 LLM,有 API 开销。"""
from core.capabilities import ModelCapabilities
from core.llm import LLM
from core.probe import probe_capabilities
cfg = load_config()
name = model or cfg["default_model"]
console = make_console()
try:
caps = ModelCapabilities.load(name, ROOT / cfg["models_dir"])
except Exception as e:
console.print(f"[err]档案加载失败:[/err] {type(e).__name__}: {e}")
sys.exit(1)
console.print(
f"[bold]probing[/bold] [accent]{caps.model_id}[/accent] (profile: {name}) "
f"[muted]long-context={long_context}[/muted]\n"
)
try:
llm = LLM(caps)
except Exception as e:
console.print(f"[err]LLM 构造失败:[/err] {type(e).__name__}: {e}")
sys.exit(1)
with console.status("[muted]running probes...[/muted]", spinner="dots"):
report = probe_capabilities(caps, llm, include_long_context=long_context)
tbl = Table(show_lines=False)
tbl.add_column("capability", style="bold")
tbl.add_column("declared")
tbl.add_column("observed")
tbl.add_column("status")
tbl.add_column("detail")
color = {"ok": "ok", "mismatch": "warn", "error": "err", "skip": "muted"}
for r in report.results:
c = color.get(r.status, "info")
tbl.add_row(
r.name,
str(r.declared),
str(r.observed),
f"[{c}]{r.status}[/{c}]",
r.detail,
)
console.print(tbl)
if report.has_mismatch:
console.print(
"\n[warn]存在能力对账差异 —— 看 detail,必要时改 "
f"config/models/{caps.family}.yaml[/warn]"
)
sys.exit(2)
if any(r.status == "error" for r in report.results):
console.print("\n[err]部分探测出错(见 detail)[/err]")
sys.exit(3)
console.print("\n[ok]全部能力声明与实测一致。[/ok]")
if __name__ == "__main__":
cli()