349 lines
15 KiB
Python
349 lines
15 KiB
Python
"""DockerExecutor:fs / shell / run_python 全走 docker exec,持 key 工具留 host(§7.5 #6)。
|
|
|
|
Backend 二分(§7.5 #6 信任域,2026-05-26 修正:`paths.py::resolve_user_path` 校验
|
|
原本是 DESIGN 假命题 ── 实际 host 工具 base_dir = Path.cwd() 无校验,模型能 read
|
|
host 整个 fs。改物理边界替代代码护栏):
|
|
- **container exec**:`shell` / `run_python` / `read` / `write` / `edit` / `glob` /
|
|
`grep` —— 全走 docker exec,容器内 user_root=/workspace 物理边界
|
|
- **host in-process**:`load_skill` / `web_*` / `seedream` / `seedance` —— 持
|
|
Bocha/ARK API key 不能入容器 env(SaaS 时 key 泄漏面);load_skill 是 SKILL 注册表
|
|
内存查找无 fs 访问越界
|
|
|
|
容器准入(per call):
|
|
1. `pool.ensure(user_id)` —— 拿到 / 起 `zcbot-sandbox-<uid>` 容器(per-user lock 已串行化)
|
|
2. 命令分两类:
|
|
- shell/run_python:`docker exec --user zcbot --workdir /workspace/<wd> -e ... setsid bash -c '<cmd>'`
|
|
- read/write/edit/glob/grep:`docker exec --user zcbot --workdir /workspace/<wd>
|
|
<container> python /sandbox/tool_runner.py <tool_name>`,JSON args 走 stdin
|
|
(不被 shell metachar 切,CJK 路径透明传)
|
|
3. timeout 到 → 杀 docker CLI 客户端(Popen.kill())
|
|
4. 完成 → `pool.mark_active(user_id)` 刷 idle 计时
|
|
|
|
run_python tmp .py 落 host 侧 `<user_root>/.zcbot_tmp/<task_id>/<rand>.py`(bind mount
|
|
自动可见于容器 `/workspace/.zcbot_tmp/<task_id>/`),执行完 unlink。dotfile 起头让
|
|
`/v1/files` API 天然过滤(`web/app.py:169` startswith(".")),用户视野不污染。
|
|
|
|
Cancel limitation(第一版接受):
|
|
- docker exec 客户端断开后,容器内 server 端进程**不会**因此终止 —— 这是 docker 设计
|
|
- 第一版只杀 docker CLI(Popen.kill());容器内残留进程靠 idle 5min reaper / 下次
|
|
ensure 时 rm -f 兜底
|
|
- 升级触发(§7.5 #3 PGID 协议):用户反馈"取消了但还在烧 CPU" / 多次 cancel 后
|
|
容器内进程堆积 → 启用「ZCBOT_EXEC_ID env + PGID 写文件 + 二次 exec kill」协议
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import secrets
|
|
import subprocess
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
from uuid import UUID
|
|
|
|
import json
|
|
|
|
from .executor import ExecCtx, Executor, ToolResult
|
|
from .executor_host import HostExecutor
|
|
from .sandbox import SandboxPool
|
|
|
|
|
|
# Trust-domain split (§7.5 #6, revised 2026-05-26). Both groups run through
# `docker exec`, but they are invoked differently:
# - SHELL_LIKE_TOOLS execute arbitrary code: the command / script path is
#   handed straight to `setsid bash -c ...` or `setsid python ...`.
# - FS_TOOLS are filesystem operations: `python /sandbox/tool_runner.py <name>`
#   with the JSON arguments fed on stdin.
SHELL_LIKE_TOOLS = frozenset(("shell", "run_python"))
FS_TOOLS = frozenset(("read", "write", "edit", "glob", "grep"))
CONTAINER_TOOLS = SHELL_LIKE_TOOLS.union(FS_TOOLS)

# fs tools that go through the disk-quota gate: write/edit can grow disk
# usage; read/glob/grep consume no disk and are let through unchecked.
_FS_TOOLS_WRITE = frozenset(("write", "edit"))

# Non-root user inside the container. Passing the *name* lets docker resolve
# the uid from the container's /etc/passwd. The Dockerfile already aligns it
# with the host uid (`useradd -u ${HOST_UID} zcbot`), so images built for
# different HOST_UID values (1000 / 1001 / ...) need no code or env change.
# A hard-coded "uid:gid" such as "1000:1000" could mismatch the bind-mount
# owner and cause EACCES.
DEFAULT_EXEC_USER = "zcbot"

# Host-side directory for temporary run_python scripts; a dotfile inside
# user_root, so the /v1/files API hides it.
TMP_SUBDIR = ".zcbot_tmp"
|
|
|
|
|
|
class DockerExecutor(Executor):
    """Route sandboxed tools through ``docker exec``; delegate the rest to the host.

    The wrapped HostExecutor still supplies the tool schema list and runs
    every tool outside ``CONTAINER_TOOLS``. Calls to ``shell`` /
    ``run_python`` and to the fs tools (``read`` / ``write`` / ``edit`` /
    ``glob`` / ``grep``) are intercepted and executed inside the user's
    container obtained from the ``SandboxPool``.
    """

    def __init__(
        self,
        host: HostExecutor,
        pool: SandboxPool,
        user_id: UUID,
        user_root: Path,
        working_dir: Path,
    ) -> None:
        """
        Args:
            host: In-process executor; owns the schemas and all non-container tools.
            pool: Sandbox pool used to obtain the user's container and to
                refresh its idle timer after each call.
            user_id: User whose container is used.
            user_root: Host directory visible in the container as ``/workspace``.
            working_dir: Host working directory; container commands run in the
                matching directory under ``/workspace``.
        """
        self.host = host
        self.pool = pool
        self.user_id = user_id
        self.user_root = user_root.resolve()
        self.working_dir = working_dir.resolve()
        try:
            wd_rel = self.working_dir.relative_to(self.user_root).as_posix()
        except ValueError:
            # working_dir is not under user_root -- defensive fallback; the
            # normal construction path never lands here.
            wd_rel = "."
        # relative_to() yields "." when working_dir == user_root; map that to
        # the mount point itself rather than "/workspace/.".
        self.container_workdir = (
            "/workspace" if wd_rel == "." else f"/workspace/{wd_rel}"
        )
        # Env override for the in-container user; defaults to DEFAULT_EXEC_USER.
        self.exec_user = os.getenv("ZCBOT_SANDBOX_EXEC_USER", DEFAULT_EXEC_USER)

    # ── Executor interface ───────────────────────────────────

    def has_tool(self, name: str) -> bool:
        """Tool availability is decided entirely by the host executor."""
        return self.host.has_tool(name)

    def schemas(self) -> List[Dict[str, Any]]:
        """Tool schemas come unchanged from the host executor."""
        return self.host.schemas()

    def call_tool(self, name: str, args: Dict[str, Any], ctx: ExecCtx) -> ToolResult:
        """Run tool *name*: container tools via docker exec, all others on the host.

        Any exception raised while running a container tool is converted into
        an ``exit_code=1`` ToolResult instead of propagating.
        """
        if name not in CONTAINER_TOOLS:
            return self.host.call_tool(name, args, ctx)
        if not self.host.has_tool(name):
            # The host did not register this tool (e.g. caps.enable_run_python=False),
            # so its schema was never exposed -- treat it as unknown.
            return ToolResult(content=f"[Error] unknown tool: {name}", exit_code=2)
        try:
            if name == "shell":
                return self._exec_shell(args, ctx)
            if name == "run_python":
                return self._exec_python(args, ctx)
            if name in FS_TOOLS:
                return self._exec_fs_tool(name, args, ctx)
        except Exception as e:
            return ToolResult(
                content=f"[Error executing {name} via docker] {type(e).__name__}: {e}",
                exit_code=1,
            )
        # Unreachable while CONTAINER_TOOLS == SHELL_LIKE_TOOLS | FS_TOOLS.
        return ToolResult(content=f"[Error] unhandled container tool: {name}", exit_code=2)

    # ── shell ────────────────────────────────────────────────

    def _exec_shell(self, args: Dict[str, Any], ctx: ExecCtx) -> ToolResult:
        """Run ``args["command"]`` via ``setsid bash -c`` in the container.

        ``args["timeout"]`` (seconds) defaults to 60.
        """
        cmd = args.get("command")
        if not isinstance(cmd, str) or not cmd.strip():
            return ToolResult(
                content="[Error] bad arguments to shell: command must be non-empty string",
                exit_code=2,
            )
        timeout = int(args.get("timeout") or 60)

        container = self.pool.ensure(self.user_id)
        argv = self._docker_exec_argv(container) + ["setsid", "bash", "-c", cmd]
        result = self._run_subprocess(argv, timeout=timeout, ctx=ctx)
        self.pool.mark_active(self.user_id)  # refresh the container's idle timer
        return result

    # ── run_python ───────────────────────────────────────────

    def _exec_python(self, args: Dict[str, Any], ctx: ExecCtx) -> ToolResult:
        """Run ``args["code"]`` as a Python script inside the container.

        The code is written on the host to
        ``<user_root>/.zcbot_tmp/<task_id>/<millis>-<hex>.py``. The bind mount
        exposes it at ``/workspace/.zcbot_tmp/<task_id>/...``. The file is
        removed afterwards (best effort). ``args["timeout"]`` (seconds)
        defaults to 120.
        """
        code = args.get("code")
        if not isinstance(code, str):
            return ToolResult(
                content="[Error] bad arguments to run_python: code must be string",
                exit_code=2,
            )
        timeout = int(args.get("timeout") or 120)

        tmp_root = self.user_root / TMP_SUBDIR / str(ctx.task_id)
        tmp_root.mkdir(parents=True, exist_ok=True)
        rand_name = f"{int(time.time() * 1000)}-{secrets.token_hex(4)}.py"
        host_script = tmp_root / rand_name
        container_script = f"/workspace/{TMP_SUBDIR}/{ctx.task_id}/{rand_name}"

        try:
            # Written inside the try so a failed / partial write is cleaned up too.
            host_script.write_text(code, encoding="utf-8")
            container = self.pool.ensure(self.user_id)
            argv = self._docker_exec_argv(
                container,
                extra_env={
                    "PYTHONIOENCODING": "utf-8",
                    # /sandbox first so `from skills.xxx.helper import ...` works
                    # (skills/ is bind-mounted read-only at /sandbox/skills and
                    # SKILL.md teaches the model that import path); /workspace
                    # second for local scripts in the user's task directory.
                    "PYTHONPATH": "/sandbox:/workspace",
                },
            ) + ["setsid", "python", container_script]
            result = self._run_subprocess(argv, timeout=timeout, ctx=ctx)
            self.pool.mark_active(self.user_id)
            return result
        finally:
            try:
                host_script.unlink()
            except OSError:
                # Best-effort cleanup; a leftover script stays under the hidden dot-dir.
                pass

    # ── fs tools (read/write/edit/glob/grep) ─────────────────

    def _exec_fs_tool(
        self, name: str, args: Dict[str, Any], ctx: ExecCtx
    ) -> ToolResult:
        """Run fs tool *name* via ``python /sandbox/tool_runner.py <name>``.

        The JSON-encoded *args* are fed on stdin, so paths containing CJK
        characters or quotes pass through without any shell quoting.

        Timeout is a fixed 30s; only ``grep`` honours ``args["timeout"]``.
        fs operations should be quick, and a hang usually means a stuck mount
        or a huge directory scan. Cancellation is still polled, so e.g. a grep
        over the whole user_root stops promptly when the user cancels.

        ``write`` / ``edit`` are first gated by the per-user disk quota
        (§7.5 #4). When the gate reports an error, it is returned with
        exit_code 2 and the container is not called. ``read`` / ``glob`` /
        ``grep`` consume no disk and skip the check.
        """
        if name in _FS_TOOLS_WRITE:
            err = _check_user_disk_quota(self.user_id)
            if err is not None:
                return ToolResult(content=err, exit_code=2)

        timeout = int(args.get("timeout") or 30) if name == "grep" else 30

        container = self.pool.ensure(self.user_id)
        argv = self._docker_exec_argv(
            container,
            extra_env={"PYTHONIOENCODING": "utf-8"},
            stdin_open=True,
        ) + ["python", "/sandbox/tool_runner.py", name]

        stdin_payload = json.dumps(args, ensure_ascii=False)
        result = self._run_subprocess(
            argv, timeout=timeout, ctx=ctx, stdin=stdin_payload
        )
        self.pool.mark_active(self.user_id)
        return result

    # ── helpers ──────────────────────────────────────────────

    def _docker_exec_argv(
        self,
        container: str,
        extra_env: Optional[Dict[str, str]] = None,
        stdin_open: bool = False,
    ) -> List[str]:
        """Build the ``docker exec`` prefix (user, workdir, env) for *container*.

        ``stdin_open=True`` adds ``-i`` so stdin reaches the container process
        (used by the fs tool_runner). The caller appends the command to run.
        """
        argv = [
            "docker", "exec",
            "--user", self.exec_user,
            "--workdir", self.container_workdir,
        ]
        if stdin_open:
            argv.append("-i")
        for key, value in (extra_env or {}).items():
            argv.extend(["-e", f"{key}={value}"])
        argv.append(container)
        return argv

    def _run_subprocess(
        self,
        argv: List[str],
        timeout: int,
        ctx: ExecCtx,
        stdin: Optional[str] = None,
    ) -> ToolResult:
        """Run a ``docker exec`` client process, polling for cancel and timeout.

        When *stdin* is not None it is piped to the container process (the fs
        tool_runner reads its JSON args from it), and the result is returned
        unwrapped:
        - exit 0 returns stdout as-is;
        - a non-zero exit returns the stripped stderr (the runner writes its
          ``[Error ...]`` there), or ``tool_runner exit N`` if stderr is empty.

        Otherwise (shell / run_python) the output is wrapped in ``[stdout]`` /
        ``[stderr]`` / ``[exit N]`` sections.

        On cancel or timeout only the local docker CLI client is killed
        (``Popen.kill()``). The process inside the container may keep running
        (accepted limitation, see module docstring).

        Returns:
            ToolResult with exit_code 124 on timeout, 130 on cancel, 2 if the
            docker CLI is missing, otherwise the client's return code.
        """
        # Only fs tool_runner calls pass stdin; their stdout is already the
        # final text for the model, so it is not wrapped in [stdout]/[exit N].
        is_fs_tool = stdin is not None
        cancel_check = ctx.cancel_check
        try:
            proc = subprocess.Popen(
                argv,
                stdin=subprocess.PIPE if stdin is not None else None,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
        except FileNotFoundError as e:
            return ToolResult(content=f"[Error] docker CLI not found: {e}", exit_code=2)

        start = time.monotonic()
        cancel_hit = False
        timeout_hit = False
        stdout: str = ""
        stderr: str = ""
        try:
            while True:
                # communicate() closes proc.stdin once the whole payload has
                # been written. Re-passing the input on a retry after that can
                # make the POSIX implementation register the closed pipe with
                # its selector and raise ValueError. So pass it only while the
                # pipe is still open. A large payload may need several rounds,
                # and communicate() resumes from its saved offset.
                pending_input = (
                    stdin if proc.stdin is not None and not proc.stdin.closed else None
                )
                try:
                    # Short slices so cancel / timeout are checked every 0.5s.
                    stdout, stderr = proc.communicate(input=pending_input, timeout=0.5)
                    break
                except subprocess.TimeoutExpired:
                    if cancel_check is not None and cancel_check():
                        cancel_hit = True
                    elif time.monotonic() - start > timeout:
                        timeout_hit = True
                    else:
                        continue
                # Cancelled or timed out: kill the docker CLI client and reap it.
                proc.kill()
                stdout, stderr = proc.communicate()
                break
        except BaseException:
            # Anything unexpected (including KeyboardInterrupt) must not leave
            # a live, unreaped docker CLI client behind.
            proc.kill()
            proc.wait()
            raise

        if timeout_hit:
            return ToolResult(
                content=f"[Error] command timed out after {timeout}s",
                exit_code=124,
            )
        if cancel_hit:
            return ToolResult(
                content="[Error] command cancelled by user",
                exit_code=130,
            )

        if is_fs_tool:
            if proc.returncode == 0:
                return ToolResult(content=stdout, exit_code=0)
            # tool_runner.py writes "[Error] ..." to stderr; pass it through.
            err_msg = stderr.strip() or f"tool_runner exit {proc.returncode}"
            return ToolResult(content=err_msg, exit_code=proc.returncode)

        # shell / run_python: [stdout] / [stderr] / [exit N] sections.
        parts: List[str] = []
        if stdout:
            parts.append(f"[stdout]\n{stdout.rstrip()}")
        if stderr:
            parts.append(f"[stderr]\n{stderr.rstrip()}")
        parts.append(f"[exit {proc.returncode}]")
        return ToolResult(content="\n".join(parts), exit_code=proc.returncode)
|
|
|
|
|
def _check_user_disk_quota(user_id: UUID) -> Optional[str]:
    """Disk-quota gate run before write/edit.

    Reads ``quotas.disk_bytes_per_user`` from the config returned by
    ``load_config`` and, when a positive limit is set, delegates to
    ``check_disk_quota(user_id, limit)``.

    Kept as a module-level helper rather than a DockerExecutor method because
    the host-executor path (/v1/files/upload) reuses the same gate.

    Returns:
        ``None`` when no positive limit is configured. Otherwise the result of
        ``check_disk_quota``: presumably ``None`` when under quota, else an
        error message that callers return verbatim as the tool result.

    Fails open: any error while importing, loading config, or querying usage
    is logged as a warning, and ``None`` (write allowed) is returned.
    """
    try:
        from core.agent_builder import load_config
        from core.storage.disk_quota import check_disk_quota, parse_bytes

        cfg = load_config() or {}
        quotas = cfg.get("quotas") or {}
        limit = parse_bytes(quotas.get("disk_bytes_per_user"))
        if limit is None or limit <= 0:
            return None
        return check_disk_quota(user_id, limit)
    except Exception:
        # Best-effort gate: a quota-lookup failure must not block writes, but it
        # should not vanish either -- the DockerExecutor caller only sees None
        # and does not log anything itself.
        import logging

        logging.getLogger(__name__).warning(
            "disk quota check failed for user %s; allowing write",
            user_id,
            exc_info=True,
        )
        return None
|