zcbot/cli.py

485 lines
18 KiB
Python

"""CLI 入口: 简单 REPL。
用法:
python cli.py chat # 新建一个 task
python cli.py chat --mode coding --desc "修一处 bug" # 带元数据建任务
python cli.py chat --resume last # 恢复最近一个 task
python cli.py chat --resume 20260506_141523 # 显式 task_id
python cli.py chat --model deepseek_v4.pro
python cli.py tasks # 列出 task
python cli.py probe # 实测对账 yaml 声称的能力
"""
from __future__ import annotations
import json
import shutil
import sys
from pathlib import Path
import click
from rich.prompt import Prompt
from rich.table import Table
from core.task import TaskState
from core.ui import make_console
from main import (
ROOT,
build_agent,
load_config,
resolve_workspace,
sync_task_tokens,
tasks_dir,
)
@click.group()
def cli() -> None:
"""zcbot - 个人任务 agent"""
@cli.group()
def db() -> None:
"""数据库管理 (alembic upgrade/downgrade/current)。需先 export ZCBOT_DB_URL。"""
def _alembic_cfg():
from alembic.config import Config
return Config(str(ROOT / "alembic.ini"))
def _run_alembic(fn, *args) -> None:
"""统一包一层友好出错(ZCBOT_DB_URL 未设置 / 连不上 → 简洁报错,不打 traceback)。"""
try:
fn(_alembic_cfg(), *args)
except RuntimeError as e:
click.echo(f"[err] {e}", err=True)
sys.exit(2)
except Exception as e:
click.echo(f"[err] {type(e).__name__}: {e}", err=True)
sys.exit(3)
@db.command("upgrade")
@click.argument("revision", default="head")
def db_upgrade(revision: str) -> None:
"""alembic upgrade <revision> (default head)."""
from alembic import command
_run_alembic(command.upgrade, revision)
@db.command("downgrade")
@click.argument("revision")
def db_downgrade(revision: str) -> None:
"""alembic downgrade <revision> (use -1 for one step, base for all)."""
from alembic import command
_run_alembic(command.downgrade, revision)
@db.command("current")
def db_current() -> None:
"""alembic current -- show currently applied revision."""
from alembic import command
_run_alembic(command.current)
def _cleanup_if_empty(task_dir, session, console=None) -> bool:
"""切走前清理 task_dir。三条都满足才删:
1) session 没有 user 消息
2) task_dir 在磁盘上(懒创建后,没说话就没目录,直接 no-op)
3) 目录里只剩 messages.json(state.json 存在 = `/done /abandon /desc` 留下的显式痕迹,要保)
原子写留下的 `*.tmp` 孤儿不算痕迹,放过。
"""
if any(m.get("role") == "user" for m in session.messages):
return False
try:
entries = list(task_dir.iterdir())
except FileNotFoundError:
return False
if any(p.is_dir() for p in entries):
return False
meaningful = {
p.name for p in entries
if p.is_file() and not p.name.endswith(".tmp")
}
if meaningful - {"messages.json"}:
return False
shutil.rmtree(task_dir, ignore_errors=True)
if console is not None:
console.print(f"[muted]清理空 task {task_dir.name}[/muted]")
return True
def _list_task_rows(workspace_dir, limit=20, status=None):
"""返回 [(mtime, task_id, status, mode, model, tokens, n_msgs, desc), ...] mtime 降序。"""
tdir = tasks_dir(workspace_dir)
rows = []
for d in tdir.iterdir():
if not d.is_dir():
continue
msg_path = d / "messages.json"
if not msg_path.exists():
continue
st = TaskState.load(d)
if st is None:
continue
if status and st.status != status:
continue
try:
data = json.loads(msg_path.read_text(encoding="utf-8"))
n = len(data.get("messages", []))
except Exception:
n = -1
rows.append((
msg_path.stat().st_mtime, st.task_id, st.status, st.mode,
st.model_profile or st.model, st.tokens_total, n, st.description,
))
rows.sort(reverse=True)
return rows[:limit]
@cli.command()
@click.option("--model", default=None, help="模型档案,如 deepseek_v4.flash 或 deepseek_v4.pro")
@click.option("--workspace", default=None, help="工作目录(存 tasks/ 和 sessions/)")
@click.option("--resume", default=None, help="恢复 task: 'last' 或 task_id")
@click.option("--mode", default="", help="任务模式标签(coding/ppt/proposal/...自由形式)")
@click.option("--desc", default="", help="一句话任务描述,便于 tasks 列表识别")
def chat(model: str, workspace: str, resume: str, mode: str, desc: str) -> None:
"""启动交互式 REPL。每次启动默认开新 task,用 --resume 接老的。"""
console = make_console()
try:
agent, session, sid, task_state, task_dir = build_agent(
model_name=model,
workspace=workspace,
console=console,
session_id=resume,
resume=bool(resume),
mode=mode,
description=desc,
)
except Exception as e:
console.print(f"[err]启动失败:[/err] {type(e).__name__}: {e}")
sys.exit(1)
if resume:
console.print(
f"[ok]恢复 task[/ok] [bold]{sid}[/bold] ({len(session.messages)} 条消息) "
f"model: [accent]{agent.caps.model_id}[/accent]"
)
else:
meta_tail = ""
if task_state.mode or task_state.description:
meta_tail = f" mode={task_state.mode!r} desc={task_state.description!r}"
console.print(
f"[ok]新 task[/ok] [bold]{sid}[/bold] "
f"model: [accent]{agent.caps.model_id}[/accent]{meta_tail}"
)
console.print(
"[info]/exit 退出 /reset 清空对话(保留 task) /new 开新 task "
"/resume [last|<id>] 切到已有 task /id /status 查看 "
"/done /abandon 改状态 /desc <文本> 设描述 "
"/export [<id>] 导出对话为 .docx[/info]\n"
)
while True:
try:
user_input = Prompt.ask("[user]you[/user]", console=console)
except (EOFError, KeyboardInterrupt):
console.print("\n[muted]bye[/muted]")
_cleanup_if_empty(task_dir, session, console)
break
cmd = user_input.strip()
if cmd in ("/exit", "/quit"):
_cleanup_if_empty(task_dir, session, console)
break
if cmd == "/reset":
session.reset(keep_system=True)
console.print("[info]当前 task 对话已重置(保留 system 和 state)[/info]")
continue
if cmd == "/new":
_cleanup_if_empty(task_dir, session, console)
try:
agent, session, sid, task_state, task_dir = build_agent(
model_name=model, workspace=workspace, console=console,
mode=mode, description=desc,
)
except Exception as e:
console.print(f"[err]新建失败:[/err] {type(e).__name__}: {e}")
continue
console.print(f"[ok]新 task[/ok] [bold]{sid}[/bold]")
continue
if cmd.startswith("/resume"):
arg = cmd[len("/resume"):].strip()
ws_dir = resolve_workspace(workspace)
target_id = None
if arg == "last":
rs = _list_task_rows(ws_dir, limit=1)
if not rs:
console.print("[warn]没有可恢复的 task[/warn]")
continue
target_id = rs[0][1]
elif arg:
target_id = arg
else:
rs = _list_task_rows(ws_dir, limit=10)
if not rs:
console.print("[warn]没有可恢复的 task[/warn]")
continue
tbl = Table(show_lines=False)
tbl.add_column("#", style="bold")
tbl.add_column("task id")
tbl.add_column("status")
tbl.add_column("mode")
tbl.add_column("msgs", justify="right")
tbl.add_column("desc")
sc = {"active": "status.active", "completed": "status.completed", "abandoned": "status.abandoned"}
for i, (_, tid, st, md, _mdl, _tok, n, dsc) in enumerate(rs, 1):
c = sc.get(st, "info")
d_show = dsc if len(dsc) <= 50 else dsc[:47] + "..."
tbl.add_row(str(i), tid, f"[{c}]{st}[/{c}]", md, str(n), d_show)
console.print(tbl)
try:
sel = Prompt.ask("[user]选编号或输入 task_id (回车取消)[/user]", console=console, default="")
except (EOFError, KeyboardInterrupt):
continue
sel = sel.strip()
if not sel:
continue
if sel.isdigit():
idx = int(sel) - 1
if 0 <= idx < len(rs):
target_id = rs[idx][1]
else:
console.print(f"[err]编号超界: {sel}[/err]")
continue
else:
target_id = sel
if target_id == sid:
console.print(f"[info]已是当前 task: {sid}[/info]")
continue
_cleanup_if_empty(task_dir, session, console)
try:
agent, session, sid, task_state, task_dir = build_agent(
model_name=model, workspace=workspace, console=console,
session_id=target_id, resume=True,
)
except Exception as e:
console.print(f"[err]恢复失败:[/err] {type(e).__name__}: {e}")
continue
console.print(
f"[ok]切到 task[/ok] [bold]{sid}[/bold] ({len(session.messages)} 条消息) "
f"model: [accent]{agent.caps.model_id}[/accent]"
)
continue
if cmd == "/id":
cwd_disp = session.meta.get("cwd", "?")
model_disp = session.meta.get("model", agent.caps.model_id)
console.print(f"[info]task: {sid} model: {model_disp} cwd: {cwd_disp}[/info]")
continue
if cmd == "/status":
console.print(
f"[info]task {task_state.task_id} status={task_state.status} "
f"mode={task_state.mode!r} desc={task_state.description!r}\n"
f" model={task_state.model} tokens={task_state.tokens_total} "
f"(p={task_state.tokens_prompt}/c={task_state.tokens_completion}) "
f"created={task_state.created_at} updated={task_state.updated_at}[/info]"
)
continue
if cmd == "/done":
task_state.status = "completed"
task_state.save(task_dir)
console.print(f"[ok]task {sid} marked completed[/ok]")
break
if cmd == "/abandon":
task_state.status = "abandoned"
task_state.save(task_dir)
console.print(f"[warn]task {sid} marked abandoned[/warn]")
break
if cmd.startswith("/desc"):
new_desc = cmd[len("/desc"):].strip()
task_state.description = new_desc
task_state.save(task_dir)
console.print(f"[info]description set: {new_desc!r}[/info]")
continue
if cmd.startswith("/export"):
arg = cmd[len("/export"):].strip()
target_dir = task_dir
if arg:
ws_dir = resolve_workspace(workspace)
if arg == "last":
rs = _list_task_rows(ws_dir, limit=1)
if not rs:
console.print("[warn]没有 task 可导出[/warn]")
continue
arg = rs[0][1]
target_dir = tasks_dir(ws_dir) / arg
if not (target_dir / "messages.json").exists():
console.print(
f"[warn]无可导出内容: {target_dir.name} 还没有消息[/warn]"
)
continue
try:
from core.export_docx import export_chat_to_docx
out = export_chat_to_docx(target_dir)
except Exception as e:
console.print(f"[err]导出失败:[/err] {type(e).__name__}: {e}")
continue
console.print(f"[ok]已导出[/ok] -> {out}")
continue
if not cmd:
continue
try:
agent.run(user_input)
except KeyboardInterrupt:
console.print("\n[warn]已中断本轮。下一条输入会继续这个 task。[/warn]")
except Exception as e:
console.print(f"[err]运行错误:[/err] {type(e).__name__}: {e}")
finally:
sync_task_tokens(task_state, task_dir, agent.llm)
@cli.command()
@click.option("--workspace", default=None, help="工作目录")
@click.option("--limit", default=20, help="显示最近 N 个")
@click.option("--status", default=None, help="只看某状态: active / completed / abandoned")
def tasks(workspace: str, limit: int, status: str) -> None:
"""列出已有 task(新格式,workspace/tasks/<id>/state.json)。"""
cfg = load_config()
ws = resolve_workspace(workspace, cfg)
rows = _list_task_rows(ws, limit=limit, status=status)
if not rows:
click.echo(f"(no tasks in {tasks_dir(ws)})")
return
tbl = Table(show_lines=False)
tbl.add_column("task id", style="bold")
tbl.add_column("status")
tbl.add_column("mode")
tbl.add_column("model")
tbl.add_column("msgs", justify="right")
tbl.add_column("tokens", justify="right")
tbl.add_column("desc")
sc = {"active": "status.active", "completed": "status.completed", "abandoned": "status.abandoned"}
for _, tid, st, mode, model, tok, n, desc in rows:
c = sc.get(st, "info")
d_show = desc if len(desc) <= 50 else desc[:47] + "..."
tbl.add_row(tid, f"[{c}]{st}[/{c}]", mode, model, str(n), str(tok), d_show)
make_console().print(tbl)
@cli.command()
@click.argument("task_id")
@click.option("--workspace", default=None, help="工作目录")
@click.option("-o", "--output", default=None,
help="输出 .docx 路径,默认 <task_dir>/chat_<task_id>.docx")
@click.option("--include-system", is_flag=True,
help="包含 system prompt(默认跳过,信息密度低)")
@click.option("--no-reasoning", is_flag=True,
help="不包含 reasoning_content(默认带)")
@click.option("--tool-head", default=1000, type=int,
help="tool 结果保留前 N 字符(默认 1000)")
@click.option("--tool-tail", default=500, type=int,
help="tool 结果保留后 N 字符(默认 500)")
def export(task_id: str, workspace: str, output: str, include_system: bool,
no_reasoning: bool, tool_head: int, tool_tail: int) -> None:
"""把指定 task 的对话导出为 .docx。task_id 用 'last' 取最近一个。"""
from core.export_docx import export_chat_to_docx
console = make_console()
cfg = load_config()
ws = resolve_workspace(workspace, cfg)
if task_id == "last":
rs = _list_task_rows(ws, limit=1)
if not rs:
console.print("[err]没有 task 可导出[/err]")
sys.exit(1)
task_id = rs[0][1]
td = tasks_dir(ws) / task_id
if not (td / "messages.json").exists():
console.print(f"[err]task 不存在或无 messages.json:[/err] {td}")
sys.exit(1)
out = Path(output).resolve() if output else None
try:
path = export_chat_to_docx(
td, out,
include_system=include_system,
include_reasoning=not no_reasoning,
tool_head=tool_head,
tool_tail=tool_tail,
)
except Exception as e:
console.print(f"[err]导出失败:[/err] {type(e).__name__}: {e}")
sys.exit(1)
console.print(f"[ok]导出完成[/ok] -> {path}")
@cli.command()
@click.option("--model", default=None, help="模型档案,如 deepseek_v4.flash 或 deepseek_v4.pro")
@click.option("--long-context", is_flag=True, help="加跑 needle-in-haystack(费 token,默认关)")
def probe(model: str, long_context: bool) -> None:
"""实测对账模型 yaml 声称的能力。会调用 LLM,有 API 开销。"""
from core.capabilities import ModelCapabilities
from core.llm import LLM
from core.probe import probe_capabilities
cfg = load_config()
name = model or cfg["default_model"]
console = make_console()
try:
caps = ModelCapabilities.load(name, ROOT / cfg["models_dir"])
except Exception as e:
console.print(f"[err]档案加载失败:[/err] {type(e).__name__}: {e}")
sys.exit(1)
console.print(
f"[bold]probing[/bold] [accent]{caps.model_id}[/accent] (profile: {name}) "
f"[muted]long-context={long_context}[/muted]\n"
)
try:
llm = LLM(caps)
except Exception as e:
console.print(f"[err]LLM 构造失败:[/err] {type(e).__name__}: {e}")
sys.exit(1)
with console.status("[muted]running probes...[/muted]", spinner="dots"):
report = probe_capabilities(caps, llm, include_long_context=long_context)
tbl = Table(show_lines=False)
tbl.add_column("capability", style="bold")
tbl.add_column("declared")
tbl.add_column("observed")
tbl.add_column("status")
tbl.add_column("detail")
color = {"ok": "ok", "mismatch": "warn", "error": "err", "skip": "muted"}
for r in report.results:
c = color.get(r.status, "info")
tbl.add_row(
r.name,
str(r.declared),
str(r.observed),
f"[{c}]{r.status}[/{c}]",
r.detail,
)
console.print(tbl)
if report.has_mismatch:
console.print(
"\n[warn]存在能力对账差异 —— 看 detail,必要时改 "
f"config/models/{caps.family}.yaml[/warn]"
)
sys.exit(2)
if any(r.status == "error" for r in report.results):
console.print("\n[err]部分探测出错(见 detail)[/err]")
sys.exit(3)
console.print("\n[ok]全部能力声明与实测一致。[/ok]")
if __name__ == "__main__":
cli()