"""Filesystem tools: read / write / edit / glob / grep.

The edit tool adopts CoreCoder's "unique match" constraint: old_str must occur
in the file exactly once, otherwise the call fails. This is an industry best
practice that keeps an LLM from editing the wrong place.
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from pathlib import Path
|
|
|
|
from .base import Tool
|
|
|
|
|
|
class ReadTool(Tool):
    """Tool that returns a numbered slice of a UTF-8 text file.

    Failures are reported as ``[Error] ...`` strings rather than raised.
    ``self._resolve`` and ``self._display`` are not defined in this class;
    presumably they are inherited from ``Tool`` (confirm in ``.base``).
    """

    name = "read"
    description = (
        "Read a text file. Returns content with 1-indexed line numbers. "
        "Use offset/limit for large files."
    )
    parameters = {
        "type": "object",
        "properties": {
            "path": {"type": "string", "description": "Absolute or relative file path"},
            "offset": {"type": "integer", "description": "Start line (1-indexed)", "default": 1},
            "limit": {"type": "integer", "description": "Max lines", "default": 2000},
        },
        "required": ["path"],
    }

    def execute(self, path: str, offset: int = 1, limit: int = 2000) -> str:
        """Return up to ``limit`` lines of ``path`` starting at line ``offset``.

        Args:
            path: File to read; passed through ``self._resolve``.
            offset: 1-indexed first line. Values below 1 are clamped to 1.
            limit: Maximum number of lines to return; must be at least 1.

        Returns:
            A header ``[<path>] lines <start>-<end> of <total>`` followed by
            one ``<line number>\\t<text>`` row per line, or an ``[Error] ...``
            string when the file is missing, is not a regular file, cannot be
            read, is not valid UTF-8, or the requested window is empty.
        """
        p = self._resolve(path)
        disp = self._display(p)
        if not p.exists():
            return f"[Error] file not found: {disp}"
        if not p.is_file():
            return f"[Error] not a file: {disp}"
        if limit < 1:
            # A non-positive limit used to yield a header like "lines 1-0 of N".
            return f"[Error] limit must be >= 1, got {limit}"
        try:
            text = p.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            return f"[Error] not a UTF-8 text file: {disp}"
        except OSError as exc:
            # e.g. permission denied; report it the same way as other failures.
            return f"[Error] cannot read {disp}: {exc}"

        lines = text.split("\n")
        total = len(lines)
        start = max(1, offset)
        if start > total:
            # Previously produced a misleading header such as "lines 50-3 of 3".
            return f"[Error] offset {start} is past end of file ({total} lines): {disp}"
        end = min(total, start + limit - 1)

        numbered = [
            f"{number:6d}\t{line}"
            for number, line in enumerate(lines[start - 1:end], start)
        ]
        header = f"[{disp}] lines {start}-{end} of {total}\n"
        return header + "\n".join(numbered)
|
|
|
|
|
|
class WriteTool(Tool):
    """Tool that writes a string to a file, replacing any existing content.

    ``self._resolve`` and ``self._display`` are not defined in this class;
    presumably they are inherited from ``Tool`` (confirm in ``.base``).
    """

    name = "write"
    description = (
        "Write content to a file (creates parent dirs, overwrites if exists). "
        "Prefer 'edit' for modifying existing files."
    )
    parameters = {
        "type": "object",
        "properties": {
            "path": {"type": "string"},
            "content": {"type": "string"},
        },
        "required": ["path", "content"],
    }

    def execute(self, path: str, content: str) -> str:
        """Write ``content`` to ``path`` as UTF-8 and return a summary line.

        Missing parent directories are created first. Errors raised by the
        filesystem calls are not caught here.
        """
        target = self._resolve(path)
        parent_dir = target.parent
        parent_dir.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
        char_count = len(content)
        return f"[wrote {char_count} chars to {self._display(target)}]"
|
|
|
|
|
|
class EditTool(Tool):
    """Tool that replaces one uniquely-occurring substring in a file.

    The edit is refused unless ``old_str`` occurs exactly once, so an
    ambiguous snippet can never modify the wrong location. Failures are
    reported as ``[Error] ...`` strings.
    ``self._resolve`` and ``self._display`` are not defined in this class;
    presumably they are inherited from ``Tool`` (confirm in ``.base``).
    """

    name = "edit"
    description = (
        "Replace a unique string in a file. old_str MUST occur exactly once in the file, "
        "otherwise the call fails. Include enough surrounding context to make it unique."
    )
    parameters = {
        "type": "object",
        "properties": {
            "path": {"type": "string"},
            "old_str": {"type": "string", "description": "Exact substring to replace, must be unique"},
            "new_str": {"type": "string", "description": "Replacement string"},
        },
        "required": ["path", "old_str", "new_str"],
    }

    def execute(self, path: str, old_str: str, new_str: str) -> str:
        """Replace the single occurrence of ``old_str`` in ``path`` with ``new_str``.

        Args:
            path: File to edit; passed through ``self._resolve``.
            old_str: Non-empty substring that must occur exactly once.
            new_str: Replacement text.

        Returns:
            ``[edited <path>: 1 replacement]`` on success, otherwise an
            ``[Error] ...`` string; the file is left untouched on error.
        """
        p = self._resolve(path)
        disp = self._display(p)
        if not p.exists():
            return f"[Error] file not found: {disp}"
        if not p.is_file():
            # Without this, read_text() on a directory raises instead of reporting.
            return f"[Error] not a file: {disp}"
        if not old_str:
            # str.count("") is len(content) + 1, which gave a misleading
            # "appears N times" message, or a silent overwrite of an empty file.
            return "[Error] old_str must not be empty"
        try:
            content = p.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            return f"[Error] not a UTF-8 text file: {disp}"

        count = content.count(old_str)
        if count == 0:
            return f"[Error] old_str not found in {disp}"
        if count > 1:
            return f"[Error] old_str appears {count} times in {disp}, must be unique — add more context"

        # count == 1 here; the explicit limit documents the single-replacement intent.
        p.write_text(content.replace(old_str, new_str, 1), encoding="utf-8")
        return f"[edited {disp}: 1 replacement]"
|
|
|
|
|
|
class GlobTool(Tool):
    """Tool that lists paths under a base directory matching a glob pattern.

    ``self._resolve`` and ``self._display`` are not defined in this class;
    presumably they are inherited from ``Tool`` (confirm in ``.base``).
    """

    name = "glob"
    description = "Find files by glob pattern (e.g. '**/*.py', 'src/*.md'). Returns up to 200 paths."
    parameters = {
        "type": "object",
        "properties": {
            "pattern": {"type": "string"},
            "path": {"type": "string", "description": "Base directory (default: cwd)", "default": "."},
        },
        "required": ["pattern"],
    }

    def execute(self, pattern: str, path: str = ".") -> str:
        """Return the sorted display paths matching ``pattern`` under ``path``.

        Args:
            pattern: Glob pattern relative to the base directory; ``**``
                recursion is handled by ``Path.glob`` itself.
            path: Base directory; passed through ``self._resolve``.

        Returns:
            Up to 200 newline-separated paths, a ``[no matches ...]`` notice,
            or an ``[Error] ...`` string for a missing base path or a pattern
            that ``Path.glob`` rejects.
        """
        base = self._resolve(path)
        if not base.exists():
            return f"[Error] base path not found: {self._display(base)}"
        try:
            # Path.glob handles both plain and '**' patterns, so no separate
            # branch is needed. The generator is consumed inside the try
            # because pattern errors may only surface on iteration.
            matches = sorted(self._display(p) for p in base.glob(pattern))
        except (ValueError, NotImplementedError) as exc:
            # Empty patterns raise ValueError; absolute ones NotImplementedError.
            return f"[Error] invalid glob pattern '{pattern}': {exc}"
        if not matches:
            return f"[no matches for '{pattern}' under {self._display(base)}]"
        return "\n".join(matches[:200])
|
|
|
|
|
|
class GrepTool(Tool):
    """Tool that searches UTF-8 text files for a regular expression.

    ``self._resolve`` and ``self._display`` are not defined in this class;
    presumably they are inherited from ``Tool`` (confirm in ``.base``).
    """

    name = "grep"
    description = "Search a regex in files. Returns up to 200 'path:line:content' lines."
    parameters = {
        "type": "object",
        "properties": {
            "pattern": {"type": "string", "description": "Python regex"},
            "path": {"type": "string", "default": "."},
            "glob": {
                "type": "string",
                "description": "File glob filter, e.g. '*.py' or '**/*.md'",
                "default": "",
            },
            "ignore_case": {"type": "boolean", "default": False},
        },
        "required": ["pattern"],
    }

    # Directory names that are never searched when found below the base path.
    SKIP_DIRS = {".git", "node_modules", "__pycache__", ".venv", "venv", "dist", "build"}

    # Result cap; keep in sync with the number quoted in ``description``.
    _MAX_MATCHES = 200

    def execute(self, pattern: str, path: str = ".", glob: str = "", ignore_case: bool = False) -> str:
        """Search files under ``path`` for lines matching ``pattern``.

        Args:
            pattern: Python regular expression, applied to each line with
                ``re.search``.
            path: Directory to search recursively, or a single file to search.
            glob: Optional file filter. A filter containing ``**`` is used as
                given; any other filter is applied recursively. Ignored when
                ``path`` is a single file.
            ignore_case: Compile the regex with ``re.IGNORECASE`` when true.

        Returns:
            Up to 200 newline-separated ``path:line:content`` rows, a
            ``[no matches ...]`` notice, or an ``[Error] ...`` string for a
            missing base path, an invalid regex, or an invalid glob filter.
        """
        base = self._resolve(path)
        if not base.exists():
            return f"[Error] base path not found: {self._display(base)}"
        flags = re.IGNORECASE if ignore_case else 0
        try:
            regex = re.compile(pattern, flags)
        except re.error as e:
            return f"[Error] invalid regex: {e}"

        try:
            files = self._candidate_files(base, glob)
        except (ValueError, NotImplementedError) as e:
            # Path.glob / rglob reject empty-component and absolute patterns.
            return f"[Error] invalid glob: {e}"

        matches: list[str] = []
        for f in files:
            # Cheap name test first; is_file() costs a filesystem call.
            if self._in_skipped_dir(f, base) or not f.is_file():
                continue
            try:
                text = f.read_text(encoding="utf-8")
            except (UnicodeDecodeError, OSError):
                # Binary or unreadable files are skipped on purpose.
                continue
            disp = self._display(f)
            for i, line in enumerate(text.split("\n"), 1):
                if regex.search(line):
                    matches.append(f"{disp}:{i}:{line}")
                    if len(matches) >= self._MAX_MATCHES:
                        return "\n".join(matches)

        if not matches:
            return f"[no matches for /{pattern}/ in {self._display(base)}]"
        return "\n".join(matches)

    @staticmethod
    def _candidate_files(base: Path, glob: str) -> list[Path]:
        """Return the paths to consider, sorted so results are deterministic."""
        if base.is_file():
            # rglob() on a file yields nothing, so search the file itself.
            return [base]
        if not glob:
            return sorted(base.rglob("*"))
        found = base.glob(glob) if "**" in glob else base.rglob(glob)
        return sorted(found)

    def _in_skipped_dir(self, f: Path, base: Path) -> bool:
        """Tell whether ``f`` sits inside a ``SKIP_DIRS`` directory below ``base``."""
        try:
            # Only directories below the base count: a project that itself
            # lives under e.g. ".../build/" must still be searchable.
            dir_parts = f.relative_to(base).parent.parts
        except ValueError:
            dir_parts = f.parent.parts
        return any(part in self.SKIP_DIRS for part in dir_parts)
|