"""DockerExecutor: fs / shell / run_python all go through docker exec; key-holding tools stay on the host (§7.5 #6).

Backend split (§7.5 #6 trust domains, corrected 2026-05-26: the
`paths.py::resolve_user_path` check was a false premise in DESIGN -- in reality
the host tools used base_dir = Path.cwd() without validation, so the model could
read the host's entire fs. A physical boundary now replaces the code guard):

- **container exec**: `shell` / `run_python` / `read` / `write` / `edit` /
  `glob` / `grep` -- all go through docker exec; inside the container
  user_root=/workspace is the physical boundary
- **host in-process**: `load_skill` / `web_*` / `seedream` / `seedance` -- they
  hold the Bocha/ARK API keys, which must not enter the container env (key-leak
  surface under SaaS); load_skill is an in-memory lookup in the SKILL registry
  and performs no out-of-bounds fs access

Container admission (per call):

1. `pool.ensure(user_id)` -- get / start the per-user `zcbot-sandbox-...`
   container (already serialized by a per-user lock)
2. Commands come in two shapes:
   - shell/run_python: `docker exec --user zcbot --workdir /workspace/{wd_rel}
     -e ... setsid bash -c '{cmd}'`
   - read/write/edit/glob/grep: `docker exec --user zcbot --workdir
     /workspace/{wd_rel} python /sandbox/tool_runner.py {tool}`, with the JSON
     args sent over stdin (not split by shell metacharacters; CJK paths pass
     through transparently)
3. On timeout -> kill the docker CLI client (Popen.kill())
4. On completion -> `pool.mark_active(user_id)` refreshes the idle timer

The run_python tmp .py file is written on the host side at
`{user_root}/.zcbot_tmp/{task_id}/{name}.py` (the bind mount makes it visible in
the container under `/workspace/.zcbot_tmp/{task_id}/`) and is unlinked after
execution. The leading dot lets the `/v1/files` API filter it out naturally
(`web/app.py:169` startswith(".")), so the user's view is not polluted.

Cancel limitation (accepted for the first version):

- After the docker exec client disconnects, the server-side process inside the
  container is **not** terminated -- this is by docker's design
- The first version only kills the docker CLI (Popen.kill()); leftover
  processes inside the container are cleaned up by the idle 5min reaper / the
  `rm -f` on the next ensure
- Upgrade trigger (§7.5 #3 PGID protocol): users report "cancelled but still
  burning CPU" / processes pile up in the container after repeated cancels ->
  enable the "ZCBOT_EXEC_ID env + PGID written to a file + second exec kill"
  protocol
"""
from __future__ import annotations

import json
import os
import secrets
import subprocess
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from uuid import UUID

from .executor import ExecCtx, Executor, ToolResult
from .executor_host import HostExecutor
from .sandbox import SandboxPool

# write/edit go through the quota gate; read/glob/grep consume no disk and pass.
_FS_TOOLS_WRITE = frozenset({"write", "edit"})

# Trust-domain classification (§7.5 #6, corrected 2026-05-26):
# - SHELL_LIKE: runs arbitrary code; Popen is fed the cmd / script directly,
#   wrapped in setsid
# - FS_TOOLS: fs operations; docker exec -> /sandbox/tool_runner.py with the
#   JSON args fed over stdin
# Both go through docker exec, but the invocation shape differs
# (setsid bash vs python tool_runner).
SHELL_LIKE_TOOLS = frozenset({"shell", "run_python"})
FS_TOOLS = frozenset({"read", "write", "edit", "glob", "grep"})
CONTAINER_TOOLS = SHELL_LIKE_TOOLS | FS_TOOLS

# Non-root user inside the container: a username lets docker resolve the uid
# from the container's /etc/passwd. The Dockerfile's
# `useradd -u ${HOST_UID} zcbot` already aligns it with the host uid, so
# hard-coding "zcbot" here means deployments built with different HOST_UID
# values (1000 / 1001 / other) need no code or env change.
# A hard-coded uid:gid form ("1000:1000") would mismatch the bind-mount owner
# and cause EACCES.
DEFAULT_EXEC_USER = "zcbot"

# Host-side tmp script directory (a dotfile inside user_root, hidden by the
# /v1/files API).
TMP_SUBDIR = ".zcbot_tmp"


class DockerExecutor(Executor):
    """Compose a HostExecutor with docker-exec dispatch for container tools.

    The host backend still provides the schema list and executes every tool
    that is not in CONTAINER_TOOLS. This class intercepts shell / run_python
    and the fs tools (read / write / edit / glob / grep) and runs them through
    `docker exec` in the per-user container.
    """

    def __init__(
        self,
        host: HostExecutor,
        pool: SandboxPool,
        user_id: UUID,
        user_root: Path,
        working_dir: Path,
    ) -> None:
        """Store collaborators and derive the container-side working directory.

        Args:
            host: Backend used for schemas and for non-container tools.
            pool: Sandbox pool queried via `ensure` / `mark_active`.
            user_id: Identifier passed to the pool and to the quota check.
            user_root: Host directory that corresponds to /workspace.
            working_dir: Host working directory; mapped below /workspace when
                it lies inside `user_root`.
        """
        self.host = host
        self.pool = pool
        self.user_id = user_id
        self.user_root = user_root.resolve()
        self.working_dir = working_dir.resolve()
        # Matching path inside the container: /workspace/{relative working dir}
        try:
            wd_rel = self.working_dir.relative_to(self.user_root)
            self.container_workdir = "/workspace/" + wd_rel.as_posix()
        except ValueError:
            # working_dir is not under user_root -- defensive fallback; the
            # normal path never gets here.
            self.container_workdir = "/workspace"
        self.exec_user = os.getenv("ZCBOT_SANDBOX_EXEC_USER", DEFAULT_EXEC_USER)

    # ── Executor interface ───────────────────────────────────

    def has_tool(self, name: str) -> bool:
        """Return whether the host backend exposes the tool `name`."""
        return self.host.has_tool(name)

    def schemas(self) -> List[Dict[str, Any]]:
        """Return the tool schemas as reported by the host backend."""
        return self.host.schemas()

    def call_tool(self, name: str, args: Dict[str, Any], ctx: ExecCtx) -> ToolResult:
        """Dispatch a tool call to the container or to the host backend.

        Tools outside CONTAINER_TOOLS are delegated to the host unchanged.
        Container tools the host does not expose yield an exit_code=2 error.
        Any exception raised by a container handler is converted into an
        exit_code=1 ToolResult instead of propagating.
        """
        if name not in CONTAINER_TOOLS:
            return self.host.call_tool(name, args, ctx)
        if not self.host.has_tool(name):
            # E.g. caps.enable_run_python=False: the host did not install the
            # tool, so its schema is not exposed either.
            return ToolResult(content=f"[Error] unknown tool: {name}", exit_code=2)
        try:
            if name == "shell":
                return self._exec_shell(args, ctx)
            if name == "run_python":
                return self._exec_python(args, ctx)
            if name in FS_TOOLS:
                return self._exec_fs_tool(name, args, ctx)
        except Exception as e:
            return ToolResult(
                content=f"[Error executing {name} via docker] {type(e).__name__}: {e}",
                exit_code=1,
            )
        return ToolResult(content=f"[Error] unhandled container tool: {name}", exit_code=2)

    # ── shell ────────────────────────────────────────────────

    def _exec_shell(self, args: Dict[str, Any], ctx: ExecCtx) -> ToolResult:
        """Run `args["command"]` via `setsid bash -c` inside the container.

        The timeout comes from `args["timeout"]` and defaults to 60 seconds.
        A missing, non-string or blank command yields an exit_code=2 error.
        """
        cmd = args.get("command")
        if not isinstance(cmd, str) or not cmd.strip():
            return ToolResult(
                content="[Error] bad arguments to shell: command must be non-empty string",
                exit_code=2,
            )
        timeout = int(args.get("timeout") or 60)

        container = self.pool.ensure(self.user_id)
        argv = self._docker_exec_argv(container) + ["setsid", "bash", "-c", cmd]
        result = self._run_subprocess(argv, timeout=timeout, ctx=ctx)
        self.pool.mark_active(self.user_id)
        return result

    # ── run_python ───────────────────────────────────────────

    def _exec_python(self, args: Dict[str, Any], ctx: ExecCtx) -> ToolResult:
        """Write `args["code"]` to a tmp script and run it in the container.

        The timeout comes from `args["timeout"]` and defaults to 120 seconds.
        The script file is removed afterwards whether or not execution
        succeeded; its parent directory is left in place.
        """
        code = args.get("code")
        if not isinstance(code, str):
            return ToolResult(
                content="[Error] bad arguments to run_python: code must be string",
                exit_code=2,
            )
        timeout = int(args.get("timeout") or 120)

        # The tmp .py lives on the host at
        # {user_root}/.zcbot_tmp/{task_id}/{rand_name}; inside the container
        # the same file is /workspace/.zcbot_tmp/{task_id}/{rand_name}.
        tmp_root = self.user_root / TMP_SUBDIR / str(ctx.task_id)
        tmp_root.mkdir(parents=True, exist_ok=True)
        rand_name = f"{int(time.time() * 1000)}-{secrets.token_hex(4)}.py"
        host_script = tmp_root / rand_name
        container_script = f"/workspace/{TMP_SUBDIR}/{ctx.task_id}/{rand_name}"
        host_script.write_text(code, encoding="utf-8")

        try:
            container = self.pool.ensure(self.user_id)
            argv = self._docker_exec_argv(
                container,
                extra_env={
                    "PYTHONIOENCODING": "utf-8",
                    "PYTHONPATH": "/workspace",
                },
            ) + ["setsid", "python", container_script]
            result = self._run_subprocess(argv, timeout=timeout, ctx=ctx)
            self.pool.mark_active(self.user_id)
            return result
        finally:
            # Best-effort cleanup: a script that is already gone is not an error.
            try:
                host_script.unlink()
            except OSError:
                pass

    # ── fs tools (read/write/edit/glob/grep) ─────────────────

    def _exec_fs_tool(
        self, name: str, args: Dict[str, Any], ctx: ExecCtx
    ) -> ToolResult:
        """Run an fs tool via `python /sandbox/tool_runner.py {name}` with JSON args on stdin.

        fs tools use different defaults from shell/run_python:

        - the timeout is short (30s): fs operations should not run long, and a
          hang suggests a stuck mount or a huge directory scan; only `grep`
          honours `args["timeout"]`
        - cancellation is still polled (the model may grep the whole user_root
          and the user then stops it, so the response must be prompt)

        write/edit first check the disk quota (§7.5 #4); when the check
        returns an error, it is returned with exit_code=2 without touching the
        container. read/glob/grep consume no disk and pass through.
        """
        if name in _FS_TOOLS_WRITE:
            err = _check_user_disk_quota(self.user_id)
            if err is not None:
                return ToolResult(content=err, exit_code=2)

        timeout = int(args.get("timeout") or 30) if name == "grep" else 30
        container = self.pool.ensure(self.user_id)
        argv = self._docker_exec_argv(
            container,
            extra_env={"PYTHONIOENCODING": "utf-8"},
            stdin_open=True,
        ) + ["python", "/sandbox/tool_runner.py", name]

        # tool_runner.py reads its args (JSON) from stdin, so paths containing
        # CJK characters or quotes are passed through untouched.
        stdin_payload = json.dumps(args, ensure_ascii=False)
        result = self._run_subprocess(
            argv, timeout=timeout, ctx=ctx, stdin=stdin_payload
        )
        self.pool.mark_active(self.user_id)
        return result

    # ── helpers ──────────────────────────────────────────────

    def _docker_exec_argv(
        self,
        container: str,
        extra_env: Optional[Dict[str, str]] = None,
        stdin_open: bool = False,
    ) -> List[str]:
        """Build the `docker exec ...` argv prefix ending with the container name.

        `stdin_open=True` adds `-i` so stdin reaches the container (used by the
        fs tool_runner). Each `extra_env` item becomes a `-e KEY=VALUE` pair.
        The caller appends the command to run.
        """
        argv = [
            "docker", "exec",
            "--user", self.exec_user,
            "--workdir", self.container_workdir,
        ]
        if stdin_open:
            argv.append("-i")
        for k, v in (extra_env or {}).items():
            argv.extend(["-e", f"{k}={v}"])
        argv.append(container)
        return argv

    def _run_subprocess(
        self,
        argv: List[str],
        timeout: int,
        ctx: ExecCtx,
        stdin: Optional[str] = None,
    ) -> ToolResult:
        """Run the docker exec subprocess with cooperative cancel polling.

        When `stdin` is not None it is fed to the process through a PIPE (the
        fs tool_runner receives its JSON args this way). A cancel hit or an
        expired timeout kills the docker CLI client with Popen.kill(); the
        server-side process inside the container is an accepted limitation
        (see the module docstring).

        Result shapes:

        - timeout -> exit_code 124; cancel -> exit_code 130
        - fs tool (stdin given): stdout is returned as-is on exit 0; otherwise
          the stripped stderr (or a fallback message) is passed through with
          the process exit code
        - shell/run_python: output is wrapped in [stdout] / [stderr] / [exit N]
        """
        # Only shell/run_python get the stdout/stderr wrapping; the fs
        # tool_runner output is already the final string the LLM receives, so
        # it is not wrapped in [stdout]/[exit N].
        is_fs_tool = stdin is not None
        cancel_check = ctx.cancel_check

        try:
            proc = subprocess.Popen(
                argv,
                stdin=subprocess.PIPE if stdin is not None else None,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
        except FileNotFoundError as e:
            return ToolResult(content=f"[Error] docker CLI not found: {e}", exit_code=2)

        start = time.monotonic()
        cancel_hit = False
        timeout_hit = False
        stdout: str = ""
        stderr: str = ""
        # The payload may only be handed to communicate() once: passing
        # non-empty input again after communication has started raises
        # ValueError. The first call stores the data, so retries pass None and
        # lose neither input nor output.
        pending_input = stdin
        while True:
            try:
                stdout, stderr = proc.communicate(input=pending_input, timeout=0.5)
                break
            except subprocess.TimeoutExpired:
                pending_input = None
                if cancel_check is not None and cancel_check():
                    cancel_hit = True
                    proc.kill()
                    stdout, stderr = proc.communicate()
                    break
                if time.monotonic() - start > timeout:
                    timeout_hit = True
                    proc.kill()
                    stdout, stderr = proc.communicate()
                    break

        if timeout_hit:
            return ToolResult(
                content=f"[Error] command timed out after {timeout}s",
                exit_code=124,
            )
        if cancel_hit:
            return ToolResult(
                content="[Error] command cancelled by user",
                exit_code=130,
            )

        # fs tool_runner: stdout is returned directly; on exit != 0 stderr is
        # passed through as the [Error ...] text.
        if is_fs_tool:
            if proc.returncode == 0:
                return ToolResult(content=stdout, exit_code=0)
            # tool_runner.py writes "[Error] ..." to stderr;
            # exit 1 = exception / 2 = bad arguments / unknown.
            err_msg = stderr.strip() or f"tool_runner exit {proc.returncode}"
            return ToolResult(content=err_msg, exit_code=proc.returncode)

        # shell/run_python: the original [stdout]/[stderr]/[exit] wrapping.
        parts: List[str] = []
        if stdout:
            parts.append(f"[stdout]\n{stdout.rstrip()}")
        if stderr:
            parts.append(f"[stderr]\n{stderr.rstrip()}")
        parts.append(f"[exit {proc.returncode}]")
        return ToolResult(content="\n".join(parts), exit_code=proc.returncode)


def _check_user_disk_quota(user_id: UUID):
    """Gate run before write/edit: read the configured quota and check usage.

    Reads `quotas.disk_bytes_per_user` from the loaded config and delegates to
    `check_disk_quota`. Returns None when no positive limit is configured or
    when the lookup fails; otherwise returns whatever `check_disk_quota`
    returns (the caller treats a non-None value as the error text --
    presumably a message string, confirm against core.storage.disk_quota).

    This is a module-level helper rather than a DockerExecutor method because
    the host_executor path (/v1/files/upload) reuses the same gate: implemented
    once, used in both places.
    """
    try:
        from core.agent_builder import load_config
        from core.storage.disk_quota import check_disk_quota, parse_bytes

        cfg = load_config() or {}
        quotas = cfg.get("quotas") or {}
        limit = parse_bytes(quotas.get("disk_bytes_per_user"))
        if limit is None or limit <= 0:
            return None
        return check_disk_quota(user_id, limit)
    except Exception:
        # A failed quota lookup must not block the main path (the write is
        # still allowed; logging is left to the caller).
        return None