"""装配入口: 读 config → 加载 capabilities/skills → 构造 LLM/tools/session/loop。""" from __future__ import annotations from datetime import datetime from pathlib import Path from typing import Optional, Tuple import yaml from rich.console import Console from core.capabilities import ModelCapabilities from core.llm import LLM from core.loop import AgentLoop from core.session import Session from core.skills import SkillRegistry from tools.fs import EditTool, GlobTool, GrepTool, ReadTool, WriteTool from tools.run_python import RunPythonTool from tools.shell import ShellTool from tools.skill_tool import LoadSkillTool ROOT = Path(__file__).resolve().parent def load_config() -> dict: return yaml.safe_load((ROOT / "config" / "agent.yaml").read_text(encoding="utf-8")) or {} def resolve_workspace(workspace: Optional[str], cfg: Optional[dict] = None) -> Path: cfg = cfg or load_config() p = Path(workspace) if workspace else ROOT / cfg.get("workspace_dir", "workspace") p.mkdir(parents=True, exist_ok=True) return p def sessions_dir(workspace_dir: Path) -> Path: d = workspace_dir / "sessions" d.mkdir(parents=True, exist_ok=True) return d def resolve_session_path(workspace_dir: Path, session_id: Optional[str], resume: bool) -> Tuple[Path, str]: """返回 (path, session_id)。resume=True 时找现有文件,否则新建一个时间戳 id。""" sdir = sessions_dir(workspace_dir) if resume: if session_id in (None, "", "last"): existing = sorted(sdir.glob("*.json")) if not existing: raise FileNotFoundError(f"{sdir} 下没有任何 session 可恢复") path = existing[-1] return path, path.stem path = sdir / f"{session_id}.json" if not path.exists(): raise FileNotFoundError(f"session 不存在: {path}") return path, session_id # type: ignore[return-value] sid = session_id or datetime.now().strftime("%Y%m%d_%H%M%S") return sdir / f"{sid}.json", sid def build_agent( model_name: Optional[str] = None, workspace: Optional[str] = None, console: Optional[Console] = None, session_id: Optional[str] = None, resume: bool = False, ) -> Tuple[AgentLoop, Session, str]: cfg = load_config() model = model_name or cfg["default_model"] caps = ModelCapabilities.load(model, ROOT / cfg["models_dir"]) llm = LLM(caps) workspace_dir = resolve_workspace(workspace, cfg) session_path, sid = resolve_session_path(workspace_dir, session_id, resume) # 工具基目录: 用户当前 cwd —— agent 操作的是用户项目,不是 zcbot 仓库本身 tool_base = Path.cwd() skills = SkillRegistry(ROOT / cfg.get("skills_dir", "skills")) if resume: # 恢复: 直接加载老 session,不再注入新的 system prompt session = Session.load(session_path) saved_cwd = session.meta.get("cwd") if saved_cwd and console is not None and saved_cwd != str(tool_base): console.print( f"[yellow]提示:[/yellow] 当前 cwd 与 session 记录不同 —— " f"工具基于 current cwd,不会自动切回。\n" f" session cwd: [dim]{saved_cwd}[/dim]\n" f" current cwd: [dim]{tool_base}[/dim]" ) else: system_prompt = (ROOT / cfg["system_prompt"]).read_text(encoding="utf-8") if skills.skills: system_prompt += f"\n\n## 可用 skill (用 load_skill 加载完整指引)\n{skills.discovery_block()}" system_prompt += f"\n\n## 当前工作目录\n{tool_base}" meta = { "id": sid, "created_at": datetime.now().isoformat(timespec="seconds"), "cwd": str(tool_base), "model": caps.model_id, "model_profile": model, } session = Session(system_prompt=system_prompt, path=session_path, meta=meta) session.save() # 立刻落盘,占住文件名 tools = {} for cls in (ReadTool, WriteTool, EditTool, GlobTool, GrepTool, ShellTool): t = cls(base_dir=tool_base) tools[t.name] = t if skills.skills: ls = LoadSkillTool(registry=skills, base_dir=tool_base) tools[ls.name] = ls if caps.enable_run_python: rp = RunPythonTool(base_dir=tool_base) tools[rp.name] = rp agent = AgentLoop(llm, tools, session, caps, console=console) return agent, session, sid