zcbot/core/executor_docker.py

349 lines
15 KiB
Python

"""DockerExecutor:fs / shell / run_python 全走 docker exec,持 key 工具留 host(§7.5 #6)。
Backend 二分(§7.5 #6 信任域,2026-05-26 修正:`paths.py::resolve_user_path` 校验
原本是 DESIGN 假命题 ── 实际 host 工具 base_dir = Path.cwd() 无校验,模型能 read
host 整个 fs。改物理边界替代代码护栏):
- **container exec**:`shell` / `run_python` / `read` / `write` / `edit` / `glob` /
`grep` —— 全走 docker exec,容器内 user_root=/workspace 物理边界
- **host in-process**:`load_skill` / `web_*` / `seedream` / `seedance` —— 持
Bocha/ARK API key 不能入容器 env(SaaS 时 key 泄漏面);load_skill 是 SKILL 注册表
内存查找无 fs 访问越界
容器准入(per call):
1. `pool.ensure(user_id)` —— 拿到 / 起 `zcbot-sandbox-<uid>` 容器(per-user lock 已串行化)
2. 命令分两类:
- shell/run_python:`docker exec --user zcbot --workdir /workspace/<wd> -e ... setsid bash -c '<cmd>'`
- read/write/edit/glob/grep:`docker exec --user zcbot --workdir /workspace/<wd>
<c> python /sandbox/tool_runner.py <tool_name>`,JSON args 走 stdin
(不被 shell metachar 切,CJK 路径透明传)
3. timeout 到 → 杀 docker CLI 客户端(Popen.kill())
4. 完成 → `pool.mark_active(user_id)` 刷 idle 计时
run_python tmp .py 落 host 侧 `<user_root>/.zcbot_tmp/<task_id>/<rand>.py`(bind mount
自动可见于容器 `/workspace/.zcbot_tmp/<task_id>/`),执行完 unlink。dotfile 起头让
`/v1/files` API 天然过滤(`web/app.py:169` startswith(".")),用户视野不污染。
Cancel limitation(第一版接受):
- docker exec 客户端断开后,容器内 server 端进程**不会**因此终止 —— 这是 docker 设计
- 第一版只杀 docker CLI(Popen.kill());容器内残留进程靠 idle 5min reaper / 下次
ensure 时 rm -f 兜底
- 升级触发(§7.5 #3 PGID 协议):用户反馈"取消了但还在烧 CPU" / 多次 cancel 后
容器内进程堆积 → 启用「ZCBOT_EXEC_ID env + PGID 写文件 + 二次 exec kill」协议
"""
from __future__ import annotations

import json
import logging
import os
import secrets
import subprocess
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from uuid import UUID

from .executor import ExecCtx, Executor, ToolResult
from .executor_host import HostExecutor
from .sandbox import SandboxPool
# Trust-domain classification (§7.5 #6, revised 2026-05-26). Every tool in
# CONTAINER_TOOLS runs through `docker exec`, in one of two shapes:
#   - SHELL_LIKE_TOOLS: arbitrary code; the command / script is handed to
#     `setsid bash -c` / `setsid python` inside the container.
#   - FS_TOOLS: filesystem operations; `python /sandbox/tool_runner.py <tool>`
#     with the JSON-encoded args fed on stdin.
SHELL_LIKE_TOOLS = frozenset(("shell", "run_python"))
FS_TOOLS = frozenset(("read", "write", "edit", "glob", "grep"))
CONTAINER_TOOLS = SHELL_LIKE_TOOLS.union(FS_TOOLS)

# Subset of FS_TOOLS that consumes disk and therefore goes through the quota
# gate first; read / glob / grep consume no disk and are let through.
_FS_TOOLS_WRITE = frozenset(("write", "edit"))

# Non-root container user, given by *name* so docker resolves the uid from the
# container's /etc/passwd. The Dockerfile creates it with
# `useradd -u ${HOST_UID} zcbot`, aligning it with the host uid, so images built
# for different HOST_UID values (1000 / 1001 / ...) need no code or env change.
# Hard-coding a "uid:gid" pair instead ("1000:1000") can mismatch the
# bind-mount owner and cause EACCES.
DEFAULT_EXEC_USER = "zcbot"

# Host-side directory (inside user_root) for run_python temp scripts; the
# leading dot keeps it out of the /v1/files API listing.
TMP_SUBDIR = ".zcbot_tmp"
class DockerExecutor(Executor):
    """Run sandboxed tools inside a per-user Docker container via ``docker exec``.

    Wraps a ``HostExecutor`` and dispatches by tool name:

    - ``shell`` / ``run_python`` run as ``docker exec ... setsid <cmd>`` in the
      user's container.
    - ``read`` / ``write`` / ``edit`` / ``glob`` / ``grep`` run
      ``python /sandbox/tool_runner.py <name>`` in the container, with the
      JSON-encoded args written to stdin.
    - Every other tool is delegated to the host backend unchanged.

    The host backend remains the source of truth for which tools exist
    (``has_tool``) and for the schemas exposed to the model (``schemas``); a
    container tool the host does not register is reported as unknown.
    """

    def __init__(
        self,
        host: HostExecutor,
        pool: SandboxPool,
        user_id: UUID,
        user_root: Path,
        working_dir: Path,
    ) -> None:
        """Bind the executor to one user's container and working directory.

        Args:
            host: in-process backend used for schemas, the tool registry and
                all non-container tools.
            pool: sandbox pool; ``ensure`` yields the user's container and
                ``mark_active`` is called after each completed container call.
            user_id: owner of the container and of the disk quota.
            user_root: host directory exposed as ``/workspace`` in the container.
            working_dir: host working directory; translated to the matching
                path under ``/workspace`` (falls back to ``/workspace`` when it
                is not inside ``user_root``).
        """
        self.host = host
        self.pool = pool
        self.user_id = user_id
        self.user_root = user_root.resolve()
        self.working_dir = working_dir.resolve()
        # Container-side working dir: /workspace/<working_dir relative to user_root>.
        try:
            wd_rel = self.working_dir.relative_to(self.user_root)
            self.container_workdir = "/workspace/" + wd_rel.as_posix()
        except ValueError:
            # working_dir is not under user_root -- defensive fallback; the
            # normal code path never reaches this.
            self.container_workdir = "/workspace"
        # Container user passed to `docker exec --user`; overridable via env.
        self.exec_user = os.getenv("ZCBOT_SANDBOX_EXEC_USER", DEFAULT_EXEC_USER)

    # -- Executor interface ------------------------------------------------

    def has_tool(self, name: str) -> bool:
        """Return whether the host backend registers tool ``name``."""
        return self.host.has_tool(name)

    def schemas(self) -> List[Dict[str, Any]]:
        """Return the host backend's tool schemas unchanged."""
        return self.host.schemas()

    def call_tool(self, name: str, args: Dict[str, Any], ctx: ExecCtx) -> ToolResult:
        """Execute tool ``name`` in the container or on the host.

        Non-container tools are delegated to the host backend as-is. For
        container tools, an ``Exception`` raised while running the tool is
        converted into an ``exit_code=1`` error result instead of propagating.
        """
        if name not in CONTAINER_TOOLS:
            return self.host.call_tool(name, args, ctx)
        if not self.host.has_tool(name):
            # e.g. caps.enable_run_python=False: the host did not install the
            # tool, so its schema was never exposed either.
            return ToolResult(content=f"[Error] unknown tool: {name}", exit_code=2)
        try:
            if name == "shell":
                return self._exec_shell(args, ctx)
            if name == "run_python":
                return self._exec_python(args, ctx)
            if name in FS_TOOLS:
                return self._exec_fs_tool(name, args, ctx)
        except Exception as e:
            return ToolResult(
                content=f"[Error executing {name} via docker] {type(e).__name__}: {e}",
                exit_code=1,
            )
        return ToolResult(content=f"[Error] unhandled container tool: {name}", exit_code=2)

    # -- shell -------------------------------------------------------------

    def _exec_shell(self, args: Dict[str, Any], ctx: ExecCtx) -> ToolResult:
        """Run ``args["command"]`` with ``setsid bash -c`` in the container.

        ``args["timeout"]`` (seconds) defaults to 60 when missing or falsy.
        """
        cmd = args.get("command")
        if not isinstance(cmd, str) or not cmd.strip():
            return ToolResult(
                content="[Error] bad arguments to shell: command must be non-empty string",
                exit_code=2,
            )
        timeout = int(args.get("timeout") or 60)
        container = self.pool.ensure(self.user_id)
        argv = self._docker_exec_argv(container) + ["setsid", "bash", "-c", cmd]
        result = self._run_subprocess(argv, timeout=timeout, ctx=ctx)
        self.pool.mark_active(self.user_id)
        return result

    # -- run_python --------------------------------------------------------

    def _exec_python(self, args: Dict[str, Any], ctx: ExecCtx) -> ToolResult:
        """Run ``args["code"]`` as a Python script inside the container.

        The code is written to ``<user_root>/.zcbot_tmp/<task_id>/<rand>.py`` on
        the host, which the bind mount exposes at
        ``/workspace/.zcbot_tmp/<task_id>/<rand>.py`` in the container; the host
        file is removed afterwards regardless of the outcome.
        ``args["timeout"]`` (seconds) defaults to 120 when missing or falsy.
        """
        code = args.get("code")
        if not isinstance(code, str):
            return ToolResult(
                content="[Error] bad arguments to run_python: code must be string",
                exit_code=2,
            )
        timeout = int(args.get("timeout") or 120)
        tmp_root = self.user_root / TMP_SUBDIR / str(ctx.task_id)
        tmp_root.mkdir(parents=True, exist_ok=True)
        # Millisecond timestamp + 8 random hex chars to avoid name collisions.
        rand_name = f"{int(time.time() * 1000)}-{secrets.token_hex(4)}.py"
        host_script = tmp_root / rand_name
        container_script = f"/workspace/{TMP_SUBDIR}/{ctx.task_id}/{rand_name}"
        host_script.write_text(code, encoding="utf-8")
        try:
            container = self.pool.ensure(self.user_id)
            argv = self._docker_exec_argv(
                container,
                extra_env={
                    "PYTHONIOENCODING": "utf-8",
                    # /sandbox first so `from skills.xxx.helper import ...`
                    # resolves (skills/ is bind-mounted read-only at
                    # /sandbox/skills); /workspace second for the user's own
                    # local modules.
                    "PYTHONPATH": "/sandbox:/workspace",
                },
            ) + ["setsid", "python", container_script]
            result = self._run_subprocess(argv, timeout=timeout, ctx=ctx)
            self.pool.mark_active(self.user_id)
            return result
        finally:
            try:
                host_script.unlink()
            except OSError:
                # Best-effort cleanup; a leftover temp script is harmless.
                pass

    # -- fs tools (read / write / edit / glob / grep) ----------------------

    def _exec_fs_tool(
        self, name: str, args: Dict[str, Any], ctx: ExecCtx
    ) -> ToolResult:
        """Run fs tool ``name`` via ``python /sandbox/tool_runner.py <name>``.

        The tool args are sent as JSON on stdin, so paths containing CJK
        characters or quotes reach the runner without any shell quoting.

        - Timeout is 30s (only ``grep`` honours ``args["timeout"]``): fs
          operations should be quick, and a hang usually means a slow mount
          or a huge directory scan.
        - Cancellation is still polled, so stopping e.g. a grep over the whole
          user_root takes effect promptly.
        - ``write`` / ``edit`` run the disk-quota gate first and return its
          error without touching the container; ``read`` / ``glob`` / ``grep``
          consume no disk and skip it.
        """
        if name in _FS_TOOLS_WRITE:
            err = _check_user_disk_quota(self.user_id)
            if err is not None:
                return ToolResult(content=err, exit_code=2)
        timeout = int(args.get("timeout") or 30) if name == "grep" else 30
        container = self.pool.ensure(self.user_id)
        argv = self._docker_exec_argv(
            container,
            extra_env={"PYTHONIOENCODING": "utf-8"},
            stdin_open=True,
        ) + ["python", "/sandbox/tool_runner.py", name]
        # ensure_ascii=False keeps non-ASCII paths as-is; the pipe is UTF-8.
        stdin_payload = json.dumps(args, ensure_ascii=False)
        result = self._run_subprocess(
            argv, timeout=timeout, ctx=ctx, stdin=stdin_payload
        )
        self.pool.mark_active(self.user_id)
        return result

    # -- helpers -----------------------------------------------------------

    def _docker_exec_argv(
        self,
        container: str,
        extra_env: Optional[Dict[str, str]] = None,
        stdin_open: bool = False,
    ) -> List[str]:
        """Build the ``docker exec`` prefix for ``container`` (command excluded).

        Adds ``-i`` when ``stdin_open`` is true so stdin reaches the container
        process (used by the fs tool_runner), and one ``-e KEY=VALUE`` per
        ``extra_env`` entry.
        """
        argv = [
            "docker", "exec",
            "--user", self.exec_user,
            "--workdir", self.container_workdir,
        ]
        if stdin_open:
            argv.append("-i")
        for key, value in (extra_env or {}).items():
            argv.extend(["-e", f"{key}={value}"])
        argv.append(container)
        return argv

    def _run_subprocess(
        self,
        argv: List[str],
        timeout: int,
        ctx: ExecCtx,
        stdin: Optional[str] = None,
    ) -> ToolResult:
        """Run a ``docker exec`` client process, polling for cancel / timeout.

        Args:
            argv: full command line (docker exec prefix + container command).
            timeout: seconds before the client is killed (exit code 124).
            ctx: execution context; ``ctx.cancel_check`` (if set) is polled
                every 0.5s and a true result kills the client (exit code 130).
            stdin: optional payload piped to the container process; only the
                fs tool_runner passes one (JSON args).

        Returns:
            For fs tools, tool_runner's stdout verbatim on success, else its
            stripped stderr (or ``tool_runner exit N``). For shell/run_python,
            the ``[stdout]`` / ``[stderr]`` / ``[exit N]`` wrapped output.

        Only the local docker CLI client is killed; the process inside the
        container may keep running (accepted limitation, see module docstring).
        """
        # Only shell/run_python output gets the [stdout]/[stderr]/[exit N]
        # wrapping; tool_runner output already is the final LLM-facing text.
        # fs tools are the only callers that pass a stdin payload.
        is_fs_tool = stdin is not None
        cancel_check = ctx.cancel_check
        try:
            proc = subprocess.Popen(
                argv,
                stdin=subprocess.PIPE if stdin is not None else None,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
        except FileNotFoundError as e:
            return ToolResult(content=f"[Error] docker CLI not found: {e}", exit_code=2)

        start = time.monotonic()
        cancel_hit = False
        timeout_hit = False
        stdout: str = ""
        stderr: str = ""
        # The stdin payload may be passed to communicate() only on the first
        # call: after a TimeoutExpired, Popen keeps the unsent remainder
        # internally and raises ValueError("Cannot send input after starting
        # communication") if input is passed again.
        pending_input = stdin
        try:
            while True:
                try:
                    stdout, stderr = proc.communicate(input=pending_input, timeout=0.5)
                    break
                except subprocess.TimeoutExpired:
                    pending_input = None
                    if cancel_check is not None and cancel_check():
                        cancel_hit = True
                        proc.kill()
                        stdout, stderr = proc.communicate()
                        break
                    if time.monotonic() - start > timeout:
                        timeout_hit = True
                        proc.kill()
                        stdout, stderr = proc.communicate()
                        break
        except BaseException:
            # Never leak the docker CLI client if polling itself fails
            # (e.g. cancel_check raises, or KeyboardInterrupt).
            proc.kill()
            proc.wait()
            raise

        if timeout_hit:
            return ToolResult(
                content=f"[Error] command timed out after {timeout}s",
                exit_code=124,
            )
        if cancel_hit:
            return ToolResult(
                content="[Error] command cancelled by user",
                exit_code=130,
            )
        # fs tool_runner: stdout is returned as-is; on a non-zero exit its
        # stderr (an "[Error] ..." message) is passed through instead.
        if is_fs_tool:
            if proc.returncode == 0:
                return ToolResult(content=stdout, exit_code=0)
            err_msg = stderr.strip() or f"tool_runner exit {proc.returncode}"
            return ToolResult(content=err_msg, exit_code=proc.returncode)
        # shell/run_python: [stdout] / [stderr] / [exit N] wrapping.
        parts: List[str] = []
        if stdout:
            parts.append(f"[stdout]\n{stdout.rstrip()}")
        if stderr:
            parts.append(f"[stderr]\n{stderr.rstrip()}")
        parts.append(f"[exit {proc.returncode}]")
        return ToolResult(content="\n".join(parts), exit_code=proc.returncode)
def _check_user_disk_quota(user_id: UUID) -> Optional[str]:
    """Disk-quota gate run before ``write`` / ``edit``.

    Reads ``quotas.disk_bytes_per_user`` from the config returned by
    ``load_config`` and, when a positive limit is configured, delegates to
    ``check_disk_quota(user_id, limit)``.

    Kept as a module-level helper (not a ``DockerExecutor`` method) so other
    call sites (e.g. the /v1/files/upload path) can reuse the same gate.

    Args:
        user_id: user whose disk usage is checked.

    Returns:
        Whatever ``check_disk_quota`` returns -- callers treat a non-``None``
        value as the error text to report. ``None`` when no positive limit is
        configured, or when the lookup itself fails (fail-open: the write
        proceeds, but the failure is logged with its traceback).
    """
    try:
        # NOTE(review): these are absolute `core.` imports, unlike this
        # module's relative `.executor` imports; they only resolve when the
        # project root is on sys.path. If they fail, the gate fails open
        # (logged below) -- confirm this matches how the package is deployed.
        from core.agent_builder import load_config
        from core.storage.disk_quota import check_disk_quota, parse_bytes

        cfg = load_config() or {}
        quotas = cfg.get("quotas") or {}
        limit = parse_bytes(quotas.get("disk_bytes_per_user"))
        if limit is None or limit <= 0:
            return None
        return check_disk_quota(user_id, limit)
    except Exception:
        # Quota lookup failures must not block the main path (fail open).
        # The caller only ever sees ``None``, so record the reason here.
        logging.getLogger(__name__).warning(
            "disk quota check failed for user %s; allowing write",
            user_id,
            exc_info=True,
        )
        return None