zcbot/tools/fs.py

183 lines
6.3 KiB
Python

"""文件系统工具: read / write / edit / glob / grep。
edit 工具采用 CoreCoder 的"唯一匹配"约束: old_str 必须在文件中出现且仅出现一次,
否则报错——这是防止 LLM 改错地方的业界最佳实践。
"""
from __future__ import annotations
import re
from pathlib import Path
from .base import Tool
class ReadTool(Tool):
    """Tool that returns a numbered window of lines from a UTF-8 text file."""

    name = "read"
    description = (
        "Read a text file. Returns content with 1-indexed line numbers. "
        "Use offset/limit for large files."
    )
    parameters = {
        "type": "object",
        "properties": {
            "path": {"type": "string", "description": "Absolute or relative file path"},
            "offset": {"type": "integer", "description": "Start line (1-indexed)", "default": 1},
            "limit": {"type": "integer", "description": "Max lines", "default": 2000},
        },
        "required": ["path"],
    }

    def execute(self, path: str, offset: int = 1, limit: int = 2000) -> str:
        """Return up to ``limit`` numbered lines of ``path`` starting at ``offset``.

        Args:
            path: File to read; passed through ``self._resolve`` (provided by
                the ``Tool`` base class, defined outside this file).
            offset: 1-indexed first line to return. Values below 1 are
                clamped to 1.
            limit: Maximum number of lines to return; must be at least 1.

        Returns:
            A header line ``[<path>] lines <start>-<end> of <total>`` followed
            by the selected lines, each prefixed with its right-aligned line
            number and a tab. On failure, a string starting with ``[Error]``.
        """
        p = self._resolve(path)
        if not p.exists():
            return f"[Error] file not found: {p}"
        if not p.is_file():
            return f"[Error] not a file: {p}"
        if limit < 1:
            # A non-positive limit used to produce an inverted "lines 1-0" range.
            return f"[Error] limit must be >= 1, got {limit}"
        try:
            text = p.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            return f"[Error] not a UTF-8 text file: {p}"

        lines = text.split("\n")
        # A trailing newline terminates the last line rather than starting a
        # new one; drop the phantom empty entry so the total is not off by one.
        # An empty file is still reported as a single empty line.
        if len(lines) > 1 and lines[-1] == "":
            lines.pop()
        total = len(lines)

        start = max(1, offset)
        if start > total:
            # Previously this produced a header such as "lines 50-10 of 10".
            return f"[Error] offset {start} is past the end of {p} ({total} lines)"
        end = min(total, start + limit - 1)

        out = [f"{n:6d}\t{line}" for n, line in enumerate(lines[start - 1:end], start)]
        header = f"[{p}] lines {start}-{end} of {total}\n"
        return header + "\n".join(out)
class WriteTool(Tool):
    """Tool that writes a string to a file, creating parent directories."""

    name = "write"
    description = (
        "Write content to a file (creates parent dirs, overwrites if exists). "
        "Prefer 'edit' for modifying existing files."
    )
    parameters = {
        "type": "object",
        "properties": {
            "path": {"type": "string"},
            "content": {"type": "string"},
        },
        "required": ["path", "content"],
    }

    def execute(self, path: str, content: str) -> str:
        """Write ``content`` to ``path`` as UTF-8 and report how much was written.

        Args:
            path: Destination file; passed through ``self._resolve`` (provided
                by the ``Tool`` base class, defined outside this file).
            content: Full text to store; any existing file is overwritten.

        Returns:
            A confirmation string with the character count and resolved path.
        """
        target = self._resolve(path)

        # Make sure every missing ancestor directory exists before writing.
        parent_dir = target.parent
        parent_dir.mkdir(parents=True, exist_ok=True)

        target.write_text(content, encoding="utf-8")
        char_count = len(content)
        return f"[wrote {char_count} chars to {target}]"
class EditTool(Tool):
    """Tool that replaces a substring which must occur exactly once in a file.

    The uniqueness requirement keeps an automated caller from editing the
    wrong location when the same text appears more than once.
    """

    name = "edit"
    description = (
        "Replace a unique string in a file. old_str MUST occur exactly once in the file, "
        "otherwise the call fails. Include enough surrounding context to make it unique."
    )
    parameters = {
        "type": "object",
        "properties": {
            "path": {"type": "string"},
            "old_str": {"type": "string", "description": "Exact substring to replace, must be unique"},
            "new_str": {"type": "string", "description": "Replacement string"},
        },
        "required": ["path", "old_str", "new_str"],
    }

    def execute(self, path: str, old_str: str, new_str: str) -> str:
        """Replace the single occurrence of ``old_str`` in ``path`` with ``new_str``.

        Args:
            path: File to edit; passed through ``self._resolve`` (provided by
                the ``Tool`` base class, defined outside this file).
            old_str: Non-empty text that must appear exactly once in the file.
            new_str: Text that takes its place (may be empty to delete).

        Returns:
            ``[edited <path>: 1 replacement]`` on success, otherwise a string
            starting with ``[Error]``. The file is left untouched on error.
        """
        p = self._resolve(path)
        if not p.exists():
            return f"[Error] file not found: {p}"
        if not p.is_file():
            # Same guard as the read tool: a directory cannot be edited.
            return f"[Error] not a file: {p}"
        if not old_str:
            # str.count("") is len(text) + 1, which used to yield a misleading
            # "appears N times" error, or a silent prepend on an empty file.
            return "[Error] old_str must not be empty"
        try:
            content = p.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            return f"[Error] not a UTF-8 text file: {p}"

        count = content.count(old_str)
        if count == 0:
            return f"[Error] old_str not found in {p}"
        if count > 1:
            return f"[Error] old_str appears {count} times in {p}, must be unique — add more context"

        # NOTE(review): read_text/write_text work in text mode, so the file's
        # line endings may be normalised when it is written back — confirm
        # whether CRLF files need to be preserved.
        # Exactly one occurrence exists; cap the replacement at one to make
        # that invariant explicit.
        p.write_text(content.replace(old_str, new_str, 1), encoding="utf-8")
        return f"[edited {p}: 1 replacement]"
class GlobTool(Tool):
    """Tool that lists paths under a base directory matching a glob pattern."""

    name = "glob"
    description = "Find files by glob pattern (e.g. '**/*.py', 'src/*.md'). Returns up to 200 paths."
    parameters = {
        "type": "object",
        "properties": {
            "pattern": {"type": "string"},
            "path": {"type": "string", "description": "Base directory (default: cwd)", "default": "."},
        },
        "required": ["pattern"],
    }

    # Upper bound on the number of paths returned in one call.
    MAX_RESULTS = 200

    def execute(self, pattern: str, path: str = ".") -> str:
        """Return the sorted paths under ``path`` that match ``pattern``.

        Args:
            pattern: Glob pattern relative to the base directory; ``**``
                components recurse into subdirectories.
            path: Base directory; passed through ``self._resolve`` (provided
                by the ``Tool`` base class, defined outside this file).

        Returns:
            Up to ``MAX_RESULTS`` matching paths, one per line, in sorted
            order; a bracketed notice when nothing matches; or a string
            starting with ``[Error]`` for a missing base path or a pattern
            that pathlib rejects.
        """
        base = self._resolve(path)
        if not base.exists():
            return f"[Error] base path not found: {base}"

        # Path.glob already handles recursive '**' patterns, so no separate
        # rglob branch is needed. Pattern validation happens while iterating,
        # hence the try block wraps the whole consumption.
        try:
            matches = sorted(str(p) for p in base.glob(pattern))
        except (ValueError, NotImplementedError) as e:
            # pathlib raises ValueError for empty/malformed patterns and
            # NotImplementedError for absolute (non-relative) ones.
            return f"[Error] invalid glob pattern '{pattern}': {e}"

        if not matches:
            return f"[no matches for '{pattern}' under {base}]"
        return "\n".join(matches[:self.MAX_RESULTS])
class GrepTool(Tool):
    """Tool that searches UTF-8 text files line by line with a Python regex."""

    name = "grep"
    description = "Search a regex in files. Returns up to 200 'path:line:content' lines."
    parameters = {
        "type": "object",
        "properties": {
            "pattern": {"type": "string", "description": "Python regex"},
            "path": {"type": "string", "default": "."},
            "glob": {
                "type": "string",
                "description": "File glob filter, e.g. '*.py' or '**/*.md'",
                "default": "",
            },
            "ignore_case": {"type": "boolean", "default": False},
        },
        "required": ["pattern"],
    }

    # Directory names that are never searched when found below the base path.
    SKIP_DIRS = {".git", "node_modules", "__pycache__", ".venv", "venv", "dist", "build"}
    # Upper bound on the number of matching lines returned in one call.
    MAX_MATCHES = 200

    def execute(self, pattern: str, path: str = ".", glob: str = "", ignore_case: bool = False) -> str:
        """Search files under ``path`` for lines matching ``pattern``.

        Args:
            pattern: Python regular expression applied to each line with
                ``re.search``.
            path: Directory to walk, or a single file to search; passed
                through ``self._resolve`` (provided by the ``Tool`` base
                class, defined outside this file).
            glob: Optional file filter. A filter containing ``**`` is used
                as-is relative to ``path``; any other filter is matched
                recursively. Ignored when ``path`` is a single file.
            ignore_case: Match case-insensitively when true.

        Returns:
            Up to ``MAX_MATCHES`` lines of ``<file>:<line number>:<line>``;
            a bracketed notice when nothing matches; or a string starting
            with ``[Error]`` for a missing path, invalid regex or invalid
            glob filter.
        """
        base = self._resolve(path)
        if not base.exists():
            return f"[Error] base path not found: {base}"
        flags = re.IGNORECASE if ignore_case else 0
        try:
            regex = re.compile(pattern, flags)
        except re.error as e:
            return f"[Error] invalid regex: {e}"

        matches: list[str] = []
        try:
            # Stream hits so the directory walk stops as soon as the cap is
            # reached instead of listing the whole tree up front.
            for hit in self._iter_hits(base, glob, regex):
                matches.append(hit)
                if len(matches) >= self.MAX_MATCHES:
                    break
        except (ValueError, NotImplementedError) as e:
            # pathlib validates the glob lazily, on first iteration: ValueError
            # for malformed patterns, NotImplementedError for absolute ones.
            return f"[Error] invalid glob '{glob}': {e}"

        if not matches:
            return f"[no matches for /{pattern}/ in {base}]"
        return "\n".join(matches)

    def _iter_files(self, base: Path, glob: str):
        """Yield the files to search, in directory-walk order."""
        if base.is_file():
            # An explicitly named file is searched directly; rglob on a file
            # yields nothing, which used to report "no matches" every time.
            yield base
            return

        if not glob:
            walker = base.rglob("*")
        elif "**" in glob:
            walker = base.glob(glob)
        else:
            walker = base.rglob(glob)

        for f in walker:
            if not f.is_file():
                continue
            # Only directory components *below* the base are checked, so a
            # project that lives under e.g. ".../build/" is still searchable
            # and a file that merely shares a skipped name is not dropped.
            try:
                dir_parts = f.relative_to(base).parts[:-1]
            except ValueError:
                dir_parts = f.parts[:-1]
            if any(part in self.SKIP_DIRS for part in dir_parts):
                continue
            yield f

    def _iter_hits(self, base: Path, glob: str, regex: re.Pattern):
        """Yield one 'path:line:content' string per matching line."""
        for f in self._iter_files(base, glob):
            try:
                text = f.read_text(encoding="utf-8")
            except (UnicodeDecodeError, OSError):
                # Best effort: binary or unreadable files are skipped.
                continue
            for lineno, line in enumerate(text.split("\n"), 1):
                if regex.search(line):
                    yield f"{f}:{lineno}:{line}"