95 lines
2.9 KiB
Python
95 lines
2.9 KiB
Python
"""Shell 执行: subprocess 跑命令,有黑名单拦明显危险操作。"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
|
|
from .base import Tool
|
|
|
|
|
|
class ShellTool(Tool):
    """Tool that runs a shell command through ``subprocess`` and reports the result.

    Before running anything, the command is checked against a small substring
    blocklist of obviously destructive operations. The blocklist is a
    best-effort guard against accidents, not a sandbox.

    NOTE(review): ``self.base_dir`` is read but not defined in this class; it is
    presumably provided by ``Tool`` -- confirm against the base class.
    """

    name = "shell"
    description = (
        "Execute a shell command and return stdout/stderr/exit_code. "
        "Default 60s timeout. Working directory is the agent's base dir."
    )
    parameters = {
        "type": "object",
        "properties": {
            "command": {"type": "string"},
            "timeout": {"type": "integer", "default": 60, "description": "Seconds before kill"},
        },
        "required": ["command"],
    }

    # Substrings that cause a command to be rejected. Matching is
    # case-insensitive and ignores differences in whitespace width.
    BLOCKED_PATTERNS = (
        "rm -rf /",
        "rm -rf ~",
        "rm -rf $HOME",
        ":(){ :|:& };:",
        "mkfs",
        "dd if=/dev/zero",
        "> /dev/sda",
        "format c:",
    )

    # Windows cmd does not understand unix flags; the most common pitfall
    # (`mkdir -p`) is handled here at the tool layer.
    _MKDIR_P_RE = re.compile(r"^\s*mkdir\s+-p\s+(.+?)\s*$")

    # Any of these characters means the command is more than a plain
    # `mkdir -p <paths>` (chaining, piping, redirection), so it must not be
    # intercepted as a whole.
    _SHELL_OPERATOR_CHARS = "&|;<>"

    @staticmethod
    def _normalize(text: str) -> str:
        """Lowercase *text* and collapse every whitespace run to one space."""
        return " ".join(text.lower().split())

    def _windows_compat(self, command: str) -> tuple[str, str | None]:
        """Translate unix-style commands into something Windows cmd can run.

        Only a standalone ``mkdir -p <paths>`` is handled: the directories are
        created in-process with ``os.makedirs`` (relative paths are resolved
        against ``self.base_dir``, the same directory the shell would run in)
        and the command is replaced by a harmless ``echo``.

        Returns:
            ``(command_to_run, note)``. ``note`` describes the translation, or
            is ``None`` when the command is passed through unchanged.
        """
        if sys.platform != "win32":
            return command, None

        m = self._MKDIR_P_RE.match(command)
        if m is None:
            return command, None

        args = m.group(1)
        # Compound commands (`mkdir -p a && cd a`) would otherwise have their
        # operators and trailing words turned into directory names.
        if any(ch in args for ch in self._SHELL_OPERATOR_CHARS):
            return command, None

        try:
            tokens = shlex.split(args, posix=False)
        except ValueError:
            # Unbalanced quotes: let the shell report the error itself.
            return command, None

        # posix=False keeps the surrounding quotes on each token; drop them.
        paths = [tok.strip('"').strip("'") for tok in tokens]
        base = str(self.base_dir)
        for p in paths:
            # os.path.join leaves absolute paths untouched.
            os.makedirs(os.path.join(base, p), exist_ok=True)
        return (
            "echo [shell-tool] mkdir -p handled in-process (Windows cmd doesn't support -p)",
            f"intercepted `mkdir -p`: created {paths} via os.makedirs",
        )

    def execute(self, command: str, timeout: int = 60) -> str:
        """Run *command* in a shell and return a text report.

        Args:
            command: Shell command line to execute.
            timeout: Seconds to wait before the command is killed.

        Returns:
            A string with optional ``[stdout]`` / ``[stderr]`` sections followed
            by ``[exit N]``, or a single ``[Error] ...`` line when the command
            is blocked, times out, or raises ``FileNotFoundError``.
        """
        normalized = self._normalize(command)
        for pat in self.BLOCKED_PATTERNS:
            # Normalize the pattern too: the command is lowercased, so a
            # pattern such as "rm -rf $HOME" could otherwise never match.
            if self._normalize(pat) in normalized:
                return f"[Error] blocked dangerous command pattern: {pat!r}"

        command, note = self._windows_compat(command)

        try:
            result = subprocess.run(
                command,
                shell=True,
                cwd=str(self.base_dir),
                capture_output=True,
                timeout=timeout,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
        except subprocess.TimeoutExpired:
            return f"[Error] command timed out after {timeout}s"
        except FileNotFoundError as e:
            return f"[Error] {e}"

        stdout = result.stdout or ""
        if note:
            stdout += f"\n[note] {note}"

        parts = []
        if stdout:
            parts.append(f"[stdout]\n{stdout.rstrip()}")
        if result.stderr:
            parts.append(f"[stderr]\n{result.stderr.rstrip()}")
        parts.append(f"[exit {result.returncode}]")
        return "\n".join(parts)
|