"""文件系统工具: read / write / edit / glob / grep。 edit 工具采用 CoreCoder 的"唯一匹配"约束: old_str 必须在文件中出现且仅出现一次, 否则报错——这是防止 LLM 改错地方的业界最佳实践。 """ from __future__ import annotations import re from pathlib import Path from .base import Tool class ReadTool(Tool): name = "read" description = ( "Read a text file. Returns content with 1-indexed line numbers. " "Use offset/limit for large files." ) parameters = { "type": "object", "properties": { "path": {"type": "string", "description": "Absolute or relative file path"}, "offset": {"type": "integer", "description": "Start line (1-indexed)", "default": 1}, "limit": {"type": "integer", "description": "Max lines", "default": 2000}, }, "required": ["path"], } def execute(self, path: str, offset: int = 1, limit: int = 2000) -> str: p = self._resolve(path) disp = self._display(p) if not p.exists(): return f"[Error] file not found: {disp}" if not p.is_file(): return f"[Error] not a file: {disp}" try: text = p.read_text(encoding="utf-8") except UnicodeDecodeError: return f"[Error] not a UTF-8 text file: {disp}" lines = text.split("\n") start = max(1, offset) end = min(len(lines), start + limit - 1) out = [f"{i+1:6d}\t{lines[i]}" for i in range(start - 1, end)] header = f"[{disp}] lines {start}-{end} of {len(lines)}\n" return header + "\n".join(out) class WriteTool(Tool): name = "write" description = ( "Write content to a file (creates parent dirs, overwrites if exists). " "Prefer 'edit' for modifying existing files." ) parameters = { "type": "object", "properties": { "path": {"type": "string"}, "content": {"type": "string"}, }, "required": ["path", "content"], } def execute(self, path: str, content: str) -> str: p = self._resolve(path) p.parent.mkdir(parents=True, exist_ok=True) p.write_text(content, encoding="utf-8") return f"[wrote {len(content)} chars to {self._display(p)}]" class EditTool(Tool): name = "edit" description = ( "Replace a unique string in a file. old_str MUST occur exactly once in the file, " "otherwise the call fails. Include enough surrounding context to make it unique." ) parameters = { "type": "object", "properties": { "path": {"type": "string"}, "old_str": {"type": "string", "description": "Exact substring to replace, must be unique"}, "new_str": {"type": "string", "description": "Replacement string"}, }, "required": ["path", "old_str", "new_str"], } def execute(self, path: str, old_str: str, new_str: str) -> str: p = self._resolve(path) disp = self._display(p) if not p.exists(): return f"[Error] file not found: {disp}" content = p.read_text(encoding="utf-8") count = content.count(old_str) if count == 0: return f"[Error] old_str not found in {disp}" if count > 1: return f"[Error] old_str appears {count} times in {disp}, must be unique — add more context" p.write_text(content.replace(old_str, new_str), encoding="utf-8") return f"[edited {disp}: 1 replacement]" class GlobTool(Tool): name = "glob" description = "Find files by glob pattern (e.g. '**/*.py', 'src/*.md'). Returns up to 200 paths." parameters = { "type": "object", "properties": { "pattern": {"type": "string"}, "path": {"type": "string", "description": "Base directory (default: cwd)", "default": "."}, }, "required": ["pattern"], } def execute(self, pattern: str, path: str = ".") -> str: base = self._resolve(path) if not base.exists(): return f"[Error] base path not found: {self._display(base)}" # 把 '**/' 前缀的递归交给 rglob,其他用 glob if "**" in pattern: matches = sorted(self._display(p) for p in base.glob(pattern)) else: matches = sorted(self._display(p) for p in base.glob(pattern)) if not matches: return f"[no matches for '{pattern}' under {self._display(base)}]" return "\n".join(matches[:200]) class GrepTool(Tool): name = "grep" description = "Search a regex in files. Returns up to 200 'path:line:content' lines." parameters = { "type": "object", "properties": { "pattern": {"type": "string", "description": "Python regex"}, "path": {"type": "string", "default": "."}, "glob": { "type": "string", "description": "File glob filter, e.g. '*.py' or '**/*.md'", "default": "", }, "ignore_case": {"type": "boolean", "default": False}, }, "required": ["pattern"], } SKIP_DIRS = {".git", "node_modules", "__pycache__", ".venv", "venv", "dist", "build"} def execute(self, pattern: str, path: str = ".", glob: str = "", ignore_case: bool = False) -> str: base = self._resolve(path) if not base.exists(): return f"[Error] base path not found: {self._display(base)}" flags = re.IGNORECASE if ignore_case else 0 try: regex = re.compile(pattern, flags) except re.error as e: return f"[Error] invalid regex: {e}" if glob: files = list(base.glob(glob)) if "**" in glob else list(base.rglob(glob)) else: files = list(base.rglob("*")) matches: list[str] = [] for f in files: if not f.is_file(): continue if any(part in self.SKIP_DIRS for part in f.parts): continue try: text = f.read_text(encoding="utf-8") except (UnicodeDecodeError, OSError): continue disp = self._display(f) for i, line in enumerate(text.split("\n"), 1): if regex.search(line): matches.append(f"{disp}:{i}:{line}") if len(matches) >= 200: break if len(matches) >= 200: break if not matches: return f"[no matches for /{pattern}/ in {self._display(base)}]" return "\n".join(matches)