zcbot/tools/shell.py

95 lines
2.9 KiB
Python

"""Shell 执行: subprocess 跑命令,有黑名单拦明显危险操作。"""
from __future__ import annotations
import os
import re
import shlex
import subprocess
import sys
from .base import Tool
class ShellTool(Tool):
    """Tool that runs a shell command via ``subprocess`` and reports its result.

    A small substring blocklist rejects obviously destructive commands before
    anything is executed. Commands run with ``self.base_dir`` as the working
    directory.
    """

    name = "shell"
    description = (
        "Execute a shell command and return stdout/stderr/exit_code. "
        "Default 60s timeout. Working directory is the agent's base dir."
    )
    parameters = {
        "type": "object",
        "properties": {
            "command": {"type": "string"},
            "timeout": {"type": "integer", "default": 60, "description": "Seconds before kill"},
        },
        "required": ["command"],
    }

    # Substrings that cause a command to be rejected outright. Matching is
    # case-insensitive (both sides are lower-cased in ``execute``).
    BLOCKED_PATTERNS = (
        "rm -rf /",
        "rm -rf ~",
        "rm -rf $HOME",
        ":(){ :|:& };:",
        "mkfs",
        "dd if=/dev/zero",
        "> /dev/sda",
        "format c:",
    )

    # Windows cmd does not understand unix flags; common pitfalls are handled
    # here at the tool layer instead of being passed through to the shell.
    _MKDIR_P_RE = re.compile(r"^\s*mkdir\s+-p\s+(.+?)\s*$")

    # Characters that, when unquoted, mean the command is more than a plain
    # ``mkdir -p <paths>`` (chaining, piping, redirection).
    _SHELL_OPERATOR_CHARS = frozenset("&|<>")

    def _windows_compat(self, command: str) -> tuple[str, str | None]:
        """Translate unix-style commands into something that works on Windows cmd.

        Currently only a plain ``mkdir -p <paths>`` is handled: the directories
        are created in-process with ``os.makedirs`` and the command is replaced
        by a harmless ``echo``.

        Args:
            command: The command line as supplied by the caller.

        Returns:
            ``(command_to_run, note)``. ``note`` is ``None`` when the command
            was left untouched, otherwise a short explanation of the rewrite.

        Raises:
            OSError: If ``os.makedirs`` fails for one of the requested paths.
        """
        if sys.platform != "win32":
            return command, None

        m = self._MKDIR_P_RE.match(command)
        if m is None:
            return command, None

        try:
            tokens = shlex.split(m.group(1), posix=False)
        except ValueError:
            # Unbalanced quotes: hand the command to the shell unchanged and
            # let it report the problem.
            return command, None

        paths = []
        for token in tokens:
            quoted = len(token) >= 2 and token[0] == token[-1] and token[0] in "\"'"
            if not quoted and self._SHELL_OPERATOR_CHARS.intersection(token):
                # e.g. ``mkdir -p a && cd a``: not a plain mkdir, so do not
                # treat the operator and what follows as directory names.
                return command, None
            paths.append(token.strip('"').strip("'"))

        # The real command would run with cwd=base_dir, so relative paths must
        # resolve against it. os.path.join leaves absolute paths untouched.
        base = str(self.base_dir)
        for p in paths:
            os.makedirs(os.path.join(base, p), exist_ok=True)

        return (
            "echo [shell-tool] mkdir -p handled in-process (Windows cmd doesn't support -p)",
            f"intercepted `mkdir -p`: created {paths} via os.makedirs",
        )

    def execute(self, command: str, timeout: int = 60) -> str:
        """Run ``command`` in a shell and return a formatted report.

        Args:
            command: Shell command line to execute.
            timeout: Seconds to wait before the command is killed.

        Returns:
            A string with optional ``[stdout]`` and ``[stderr]`` sections
            followed by ``[exit <code>]``, or a single ``[Error] ...`` line
            when the command is blocked, times out, or cannot be started.
        """
        normalized = command.lower()
        for pat in self.BLOCKED_PATTERNS:
            # Lower-case the pattern too; otherwise entries containing
            # upper-case letters (e.g. "$HOME") could never match.
            if pat.lower() in normalized:
                return f"[Error] blocked dangerous command pattern: {pat!r}"

        try:
            command, note = self._windows_compat(command)
        except OSError as e:
            # In-process mkdir failed; report it like any other failure.
            return f"[Error] {e}"

        try:
            result = subprocess.run(
                command,
                shell=True,
                cwd=str(self.base_dir),
                capture_output=True,
                timeout=timeout,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
        except subprocess.TimeoutExpired:
            return f"[Error] command timed out after {timeout}s"
        except FileNotFoundError as e:
            return f"[Error] {e}"

        stdout = result.stdout
        if note:
            # Surface the rewrite so the caller knows the command was altered.
            stdout = (stdout or "") + f"\n[note] {note}"

        parts = []
        if stdout:
            parts.append(f"[stdout]\n{stdout.rstrip()}")
        if result.stderr:
            parts.append(f"[stderr]\n{result.stderr.rstrip()}")
        parts.append(f"[exit {result.returncode}]")
        return "\n".join(parts)