121 lines
4.4 KiB
Python
121 lines
4.4 KiB
Python
"""装配入口: 读 config → 加载 capabilities/skills → 构造 LLM/tools/session/loop。"""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, Tuple
|
|
|
|
import yaml
|
|
from rich.console import Console
|
|
|
|
from core.capabilities import ModelCapabilities
|
|
from core.llm import LLM
|
|
from core.loop import AgentLoop
|
|
from core.session import Session
|
|
from core.skills import SkillRegistry
|
|
from tools.fs import EditTool, GlobTool, GrepTool, ReadTool, WriteTool
|
|
from tools.run_python import RunPythonTool
|
|
from tools.shell import ShellTool
|
|
from tools.skill_tool import LoadSkillTool
|
|
|
|
# Directory that contains this module. Config, model profiles, skills, the
# system prompt and the default workspace are all resolved relative to it.
ROOT = Path(__file__).resolve().parents[0]
|
|
|
|
|
|
def load_config() -> dict:
    """Parse ``config/agent.yaml`` (located under ``ROOT``) and return it.

    The file is read as UTF-8 and parsed with ``yaml.safe_load``. A falsy
    parse result (for example an empty file, which parses to ``None``) is
    replaced by an empty dict.
    """
    config_file = ROOT / "config" / "agent.yaml"
    parsed = yaml.safe_load(config_file.read_text(encoding="utf-8"))
    return parsed or {}
|
|
|
|
|
|
def resolve_workspace(workspace: Optional[str], cfg: Optional[dict] = None) -> Path:
    """Return the workspace directory, creating it when it does not exist.

    Args:
        workspace: Explicit workspace path. When non-empty it is used as given
            and the configuration is not consulted at all.
        cfg: Already-loaded configuration. Only read when ``workspace`` is not
            given. ``None`` means "load it with ``load_config()``"; an empty
            dict is honoured as-is and yields the default ``"workspace"``.

    Returns:
        The workspace directory; it and any missing parents are created.
    """
    if workspace:
        target = Path(workspace)
    else:
        # Load lazily: a call with an explicit workspace must not fail (or pay
        # for disk I/O) just because the config file is missing or unreadable.
        if cfg is None:
            cfg = load_config()
        target = ROOT / cfg.get("workspace_dir", "workspace")

    target.mkdir(parents=True, exist_ok=True)
    return target
|
|
|
|
|
|
def sessions_dir(workspace_dir: Path) -> Path:
    """Return ``<workspace_dir>/sessions``, creating the directory if missing."""
    target = workspace_dir.joinpath("sessions")
    target.mkdir(parents=True, exist_ok=True)
    return target
|
|
|
|
|
|
def resolve_session_path(workspace_dir: Path, session_id: Optional[str], resume: bool) -> Tuple[Path, str]:
    """Return ``(path, session_id)`` for the session file to use.

    Args:
        workspace_dir: Workspace whose ``sessions`` sub-directory holds the
            session files.
        session_id: Requested id. ``None`` and ``""`` mean "no specific id";
            when resuming, ``"last"`` means the same.
        resume: When true, locate an existing session file; otherwise choose
            the path for a new session.

    Returns:
        The session file path ``<sessions dir>/<id>.json`` and the id.

    Raises:
        FileNotFoundError: When resuming and no matching session file exists.
    """
    sdir = sessions_dir(workspace_dir)

    if resume:
        if not session_id or session_id == "last":
            # Candidates are ordered by path; the greatest one is taken as the
            # latest, which holds for the timestamp-style ids generated below.
            existing = sorted(sdir.glob("*.json"))
            if not existing:
                raise FileNotFoundError(f"{sdir} 下没有任何 session 可恢复")
            latest = existing[-1]
            return latest, latest.stem

        path = sdir / f"{session_id}.json"
        if not path.exists():
            raise FileNotFoundError(f"session 不存在: {path}")
        return path, session_id

    if session_id:
        return sdir / f"{session_id}.json", session_id

    # Generated ids only have one-second resolution. If a file for this second
    # already exists, append a zero-padded counter so that a second session
    # started within the same second does not reuse the first one's file.
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    sid = stamp
    attempt = 0
    while (sdir / f"{sid}.json").exists():
        attempt += 1
        sid = f"{stamp}_{attempt:02d}"
    return sdir / f"{sid}.json", sid
|
|
|
|
|
|
def build_agent(
    model_name: Optional[str] = None,
    workspace: Optional[str] = None,
    console: Optional[Console] = None,
    session_id: Optional[str] = None,
    resume: bool = False,
) -> Tuple[AgentLoop, Session, str]:
    """Assemble an agent and return ``(agent, session, session_id)``.

    Steps: load the config, load the model capabilities and build the LLM,
    resolve the workspace and the session file, restore or create the session,
    instantiate the tools, and wire everything into an ``AgentLoop``.

    Args:
        model_name: Model profile to load; falls back to the config's
            ``default_model`` when empty.
        workspace: Explicit workspace path, forwarded to ``resolve_workspace``.
        console: Optional console; used for the cwd-mismatch hint on resume
            and passed on to the ``AgentLoop``.
        session_id: Requested session id, forwarded to
            ``resolve_session_path``.
        resume: When true, load the existing session instead of creating one.

    Returns:
        The agent loop, the session, and the resolved session id.
    """
    config = load_config()
    profile = model_name or config["default_model"]

    caps = ModelCapabilities.load(profile, ROOT / config["models_dir"])
    llm = LLM(caps)

    workspace_dir = resolve_workspace(workspace, config)
    session_path, sid = resolve_session_path(workspace_dir, session_id, resume)

    # Tool base directory is the user's current cwd: the agent operates on the
    # user's project, not on the zcbot repository itself.
    tool_base = Path.cwd()
    cwd_text = str(tool_base)

    skills = SkillRegistry(ROOT / config.get("skills_dir", "skills"))

    if not resume:
        # New session: build the system prompt from the base prompt file, the
        # optional skill listing, and the current working directory.
        prompt_parts = [(ROOT / config["system_prompt"]).read_text(encoding="utf-8")]
        if skills.skills:
            prompt_parts.append(
                f"\n\n## 可用 skill (用 load_skill 加载完整指引)\n{skills.discovery_block()}"
            )
        prompt_parts.append(f"\n\n## 当前工作目录\n{tool_base}")

        session = Session(
            system_prompt="".join(prompt_parts),
            path=session_path,
            meta={
                "id": sid,
                "created_at": datetime.now().isoformat(timespec="seconds"),
                "cwd": cwd_text,
                "model": caps.model_id,
                "model_profile": profile,
            },
        )
        # Write to disk right away so the file name is claimed.
        session.save()
    else:
        # Resume: load the old session as-is; no new system prompt is injected.
        session = Session.load(session_path)
        saved_cwd = session.meta.get("cwd")
        cwd_differs = bool(saved_cwd) and saved_cwd != cwd_text
        if cwd_differs and console is not None:
            console.print(
                f"[yellow]提示:[/yellow] 当前 cwd 与 session 记录不同 —— "
                f"工具基于 current cwd,不会自动切回。\n"
                f"  session cwd: [dim]{saved_cwd}[/dim]\n"
                f"  current cwd: [dim]{tool_base}[/dim]"
            )

    # Always-available tools, keyed by the name each tool instance reports.
    builtin_tool_classes = (ReadTool, WriteTool, EditTool, GlobTool, GrepTool, ShellTool)
    tools = {
        tool.name: tool
        for tool in (tool_cls(base_dir=tool_base) for tool_cls in builtin_tool_classes)
    }

    # The skill loader is only registered when at least one skill was found.
    if skills.skills:
        skill_loader = LoadSkillTool(registry=skills, base_dir=tool_base)
        tools[skill_loader.name] = skill_loader

    # The Python runner is gated by the model's capability profile.
    if caps.enable_run_python:
        python_runner = RunPythonTool(base_dir=tool_base)
        tools[python_runner.name] = python_runner

    return AgentLoop(llm, tools, session, caps, console=console), session, sid
|