"""装配入口: 读 config → 加载 capabilities/skills → 构造 LLM/tools/session/loop。 存储布局: workspace/tasks//state.json ← TaskState workspace/tasks//messages.json ← Session 消息 """ from __future__ import annotations from datetime import datetime from pathlib import Path from typing import Optional, Tuple import yaml from rich.console import Console from core.capabilities import ModelCapabilities from core.llm import LLM from core.loop import AgentLoop from core.session import Session from core.skills import SkillRegistry from core.task import TaskState from tools.fs import EditTool, GlobTool, GrepTool, ReadTool, WriteTool from tools.run_python import RunPythonTool from tools.shell import ShellTool from tools.skill_tool import LoadSkillTool ROOT = Path(__file__).resolve().parent def load_config() -> dict: return yaml.safe_load((ROOT / "config" / "agent.yaml").read_text(encoding="utf-8")) or {} def resolve_workspace(workspace: Optional[str], cfg: Optional[dict] = None) -> Path: cfg = cfg or load_config() p = Path(workspace) if workspace else ROOT / cfg.get("workspace_dir", "workspace") p.mkdir(parents=True, exist_ok=True) return p def tasks_dir(workspace_dir: Path) -> Path: d = workspace_dir / "tasks" d.mkdir(parents=True, exist_ok=True) return d def resolve_task_messages_path( workspace_dir: Path, task_id: Optional[str], resume: bool ) -> Tuple[Path, str]: """返回 (messages_file_path, task_id)。 新建:tasks//messages.json;Resume:tasks//messages.json,'last' 取最新。 """ tdir = tasks_dir(workspace_dir) if resume: if task_id in (None, "", "last"): candidates = [] for d in tdir.iterdir(): mf = d / "messages.json" if mf.is_file(): candidates.append((mf.stat().st_mtime, mf, d.name)) if not candidates: raise FileNotFoundError(f"无可恢复的 task: {tdir} 下无 task") candidates.sort(key=lambda x: x[0], reverse=True) _, path, sid = candidates[0] return path, sid task_msg = tdir / task_id / "messages.json" if not task_msg.exists(): raise FileNotFoundError(f"task 不存在: {task_msg}") return task_msg, task_id sid = task_id or datetime.now().strftime("%Y%m%d_%H%M%S") return tdir / sid / "messages.json", sid def build_agent( model_name: Optional[str] = None, workspace: Optional[str] = None, console: Optional[Console] = None, session_id: Optional[str] = None, resume: bool = False, tool_base: Optional[Path] = None, mode: str = "", description: str = "", ) -> Tuple[AgentLoop, Session, str, TaskState, Path]: """返回 (agent, session, task_id, task_state, task_dir)。""" cfg = load_config() model = model_name or cfg["default_model"] caps = ModelCapabilities.load(model, ROOT / cfg["models_dir"]) llm = LLM(caps) workspace_dir = resolve_workspace(workspace, cfg) session_path, sid = resolve_task_messages_path(workspace_dir, session_id, resume) tool_base = Path(tool_base) if tool_base else Path.cwd() skills = SkillRegistry(ROOT / cfg.get("skills_dir", "skills")) task_dir = session_path.parent if resume: session = Session.load(session_path) saved_cwd = session.meta.get("cwd") if saved_cwd and console is not None and saved_cwd != str(tool_base): console.print( f"[yellow]提示:[/yellow] 当前 cwd 与 task 记录不同 —— " f"工具基于 current cwd,不会自动切回。\n" f" task cwd: [dim]{saved_cwd}[/dim]\n" f" current cwd: [dim]{tool_base}[/dim]" ) task_state = TaskState.load(task_dir) if task_state is None: # messages.json 存在但 state.json 缺失:用 session.meta 兜底重建 task_state = TaskState( task_id=sid, mode=mode, description=description, status="active", model=session.meta.get("model", caps.model_id), model_profile=session.meta.get("model_profile", model), cwd=session.meta.get("cwd", str(tool_base)), created_at=session.meta.get("created_at", datetime.now().isoformat(timespec="seconds")), ) task_state.save(task_dir) else: system_prompt = (ROOT / cfg["system_prompt"]).read_text(encoding="utf-8") if skills.skills: system_prompt += f"\n\n## 可用 skill (用 load_skill 加载完整指引)\n{skills.discovery_block()}" system_prompt += f"\n\n## 当前工作目录\n{tool_base}" now_iso = datetime.now().isoformat(timespec="seconds") meta = { "id": sid, "created_at": now_iso, "cwd": str(tool_base), "model": caps.model_id, "model_profile": model, } session = Session(system_prompt=system_prompt, path=session_path, meta=meta) session.save() # 占住文件名 task_state = TaskState( task_id=sid, mode=mode, description=description, status="active", model=caps.model_id, model_profile=model, reasoning_effort=caps.default_reasoning_effort or "", cwd=str(tool_base), created_at=now_iso, ) task_state.save(task_dir) tools = {} for cls in (ReadTool, WriteTool, EditTool, GlobTool, GrepTool, ShellTool): t = cls(base_dir=tool_base) tools[t.name] = t if skills.skills: ls = LoadSkillTool(registry=skills, base_dir=tool_base) tools[ls.name] = ls if caps.enable_run_python: rp = RunPythonTool(base_dir=tool_base) tools[rp.name] = rp agent = AgentLoop(llm, tools, session, caps, console=console) return agent, session, sid, task_state, task_dir def sync_task_tokens(task_state: TaskState, task_dir: Path, llm: LLM) -> None: """每轮 agent.run 后调,把 LLM 累计 tokens 写回 state.json。""" tc = llm.token_counter task_state.tokens_prompt = tc.prompt_tokens task_state.tokens_completion = tc.completion_tokens task_state.save(task_dir)