zcbot/tests/test_executor_docker.py

499 lines
19 KiB
Python

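"""Unit tests for DockerExecutor.

subprocess is mocked; actually running the `docker exec` commands is verified
by a smoke test on the deployment machine (RUN.md lists the 5 commands).

Key paths covered:
- Trust-domain dispatch: host tools pass straight through / container tools
  go through docker exec
- argv shape: --user / --workdir / bash -c / python <script>
  (setsid was removed; a test asserts it stays out of argv)
- tmp .py: written under the host-side `.zcbot_tmp/<task_id>/`, unlinked after
  execution, nothing left behind
- timeout / cancel: Popen.kill() as the backstop
- schemas() / has_tool() are forwarded to the host
"""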
from __future__ import annotations
import json
import platform
import subprocess
import sys
import tempfile
import threading
import time
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
from uuid import uuid4
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from core.executor import ExecCtx, ToolResult
from core.executor_docker import DockerExecutor, TMP_SUBDIR
from core.executor_host import HostExecutor
class FakePool:
    """Stand-in for SandboxPool.

    ``ensure`` hands back a deterministic container name derived from the
    user id, and both methods record the user ids they were called with so
    tests can assert on the call history.
    """

    def __init__(self):
        # Call logs, in invocation order.
        self.ensure_calls = []
        self.mark_active_calls = []

    def ensure(self, user_id):
        """Record the call and return ``zcbot-sandbox-<user_id>``."""
        self.ensure_calls.append(user_id)
        return f"zcbot-sandbox-{user_id}"

    def mark_active(self, user_id):
        """Record that the sandbox for ``user_id`` was marked active."""
        self.mark_active_calls.append(user_id)
class FakeTool:
    """Stand-in for a ``tools.base.Tool``.

    ``execute`` returns a canned string and logs the keyword arguments it
    received; ``schema`` is a function-type entry carrying only the tool name.
    """

    def __init__(self, name, output="ok"):
        self.name = name
        self._output = output
        # One kwargs dict per execute() call, in invocation order.
        self.execute_calls = []

    @property
    def schema(self):
        """Return the minimal function schema for this tool."""
        function_entry = {"name": self.name}
        return {"type": "function", "function": function_entry}

    def execute(self, **kwargs):
        """Log ``kwargs`` and return the canned output."""
        self.execute_calls.append(kwargs)
        return self._output
def make_executor(tools_dict=None):
    """Build a DockerExecutor backed by a FakePool and a throwaway user root.

    The directory layout created is ``<tmp>/users/u1/demo``; ``users/u1`` is
    passed as ``user_root`` and ``demo`` as ``working_dir``.

    Args:
        tools_dict: mapping of tool name -> tool object handed to
            HostExecutor. When ``None`` (and only then; an empty dict is
            respected) a default set of FakeTools is used: ``read``,
            ``shell`` and ``run_python``.

    Returns:
        A ``(executor, pool, tmp_dir)`` tuple, where ``tmp_dir`` is the
        ``Path`` of the temporary root directory.
    """
    # Local imports: only this helper needs them.
    import atexit
    import shutil

    tmp = Path(tempfile.mkdtemp())
    # mkdtemp() never deletes what it creates; without this every call leaks
    # a directory tree. Clean up at interpreter exit so the directory stays
    # available for the whole lifetime of the test that requested it.
    atexit.register(shutil.rmtree, tmp, ignore_errors=True)

    user_root = tmp / "users" / "u1"
    user_root.mkdir(parents=True)
    working_dir = user_root / "demo"
    working_dir.mkdir()

    if tools_dict is None:
        tools_dict = {
            "read": FakeTool("read", "READ_OUT"),
            "shell": FakeTool("shell"),  # the host-side shell must not be invoked
            "run_python": FakeTool("run_python"),
        }

    pool = FakePool()
    executor = DockerExecutor(
        host=HostExecutor(tools_dict),
        pool=pool,
        user_id=uuid4(),
        user_root=user_root,
        working_dir=working_dir,
    )
    return executor, pool, tmp
def make_ctx(executor):
    """Return an ExecCtx for ``executor`` with a fresh task id and no cancel hook.

    ``user_id`` and ``working_dir`` are copied from the executor.
    """
    ctx = ExecCtx(
        user_id=executor.user_id,
        task_id=uuid4(),
        working_dir=executor.working_dir,
        cancel_check=None,
    )
    return ctx
class TestHostPassthrough(unittest.TestCase):
    """Non-container tools go straight to the host backend.

    Neither the pool nor subprocess is touched on this path.

    2026-05-26 correction: the fs tools (read/write/edit/glob/grep) moved into
    the container as well, so host passthrough is left with load_skill /
    web_* / seedream / seedance (the key-holding tools). load_skill is used
    here to exercise the passthrough.
    """

    def test_load_skill_passthrough_to_host(self):
        host_tools = {"load_skill": FakeTool("load_skill", "LOAD_OUT")}
        executor, pool, _ = make_executor(tools_dict=host_tools)

        outcome = executor.call_tool("load_skill", {"name": "x"}, make_ctx(executor))

        self.assertEqual(outcome.content, "LOAD_OUT")
        self.assertEqual(outcome.exit_code, 0)
        # The sandbox pool must stay untouched for host-side tools.
        self.assertEqual(pool.ensure_calls, [])
        self.assertEqual(pool.mark_active_calls, [])

    def test_schemas_and_has_tool_from_host(self):
        executor, _, _ = make_executor()

        exposed = [entry["function"]["name"] for entry in executor.schemas()]
        for expected in ("read", "shell"):
            self.assertIn(expected, exposed)

        self.assertTrue(executor.has_tool("shell"))
        self.assertFalse(executor.has_tool("nope"))
class TestShellExec(unittest.TestCase):
    """Shell calls go through a `docker exec` subprocess with the right argv shape."""

    # Patch target shared by every test that replaces Popen.
    _POPEN = "core.executor_docker.subprocess.Popen"

    @staticmethod
    def _finished_proc(stdout, stderr, returncode):
        """Return a Popen stand-in whose communicate() yields the given output."""
        fake = MagicMock()
        fake.communicate.return_value = (stdout, stderr)
        fake.returncode = returncode
        return fake

    @staticmethod
    def _ctx_with_cancel(executor, cancel_check):
        """Build an ExecCtx like make_ctx does, but with a cancel hook installed."""
        return ExecCtx(
            user_id=executor.user_id,
            task_id=uuid4(),
            working_dir=executor.working_dir,
            cancel_check=cancel_check,
        )

    def test_shell_invokes_docker_exec(self):
        executor, pool, _ = make_executor()
        ctx = make_ctx(executor)
        fake = self._finished_proc("hello\n", "", 0)

        with patch(self._POPEN, return_value=fake) as popen:
            outcome = executor.call_tool("shell", {"command": "echo hello"}, ctx)

        self.assertIn("[stdout]\nhello", outcome.content)
        self.assertIn("[exit 0]", outcome.content)
        self.assertEqual(outcome.exit_code, 0)

        argv = popen.call_args[0][0]
        self.assertEqual(argv[:2], ["docker", "exec"])
        for flag in ("--user", "--workdir"):
            self.assertIn(flag, argv)
        # workdir should be /workspace/demo (working_dir relative to user_root)
        workdir_value = argv[argv.index("--workdir") + 1]
        self.assertEqual(workdir_value, "/workspace/demo")
        # container name = zcbot-sandbox-<uid>
        container_pos = argv.index(f"zcbot-sandbox-{executor.user_id}")
        # bash -c follows the container name directly (setsid was removed on
        # 2026-05-29: incompatible with docker exec stdio)
        self.assertEqual(argv[container_pos + 1:], ["bash", "-c", "echo hello"])

        self.assertEqual(pool.ensure_calls, [executor.user_id])
        self.assertEqual(pool.mark_active_calls, [executor.user_id])

    def test_shell_bad_args(self):
        executor, _, _ = make_executor()

        outcome = executor.call_tool("shell", {"command": ""}, make_ctx(executor))

        self.assertIn("[Error]", outcome.content)
        self.assertEqual(outcome.exit_code, 2)

    def test_shell_timeout(self):
        executor, _, _ = make_executor()
        ctx = make_ctx(executor)

        fake = MagicMock()
        # First communicate(timeout=1) raises TimeoutExpired; the second one
        # (after kill) returns the trailing output.
        expired = subprocess.TimeoutExpired(cmd="docker", timeout=1)
        fake.communicate.side_effect = [expired, ("", "killed\n")]
        fake.returncode = -9

        with patch(self._POPEN, return_value=fake):
            outcome = executor.call_tool(
                "shell", {"command": "sleep 9999", "timeout": 1}, ctx
            )

        self.assertIn("timed out after 1s", outcome.content)
        self.assertEqual(outcome.exit_code, 124)
        fake.kill.assert_called_once()

    def test_shell_cancel_inline_fastpath(self):
        """cancel_check is already True on entry: no Popen, 130 returned (fast path)."""
        executor, _, _ = make_executor()
        ctx = self._ctx_with_cancel(executor, lambda: True)

        with patch(self._POPEN) as popen:
            outcome = executor.call_tool("shell", {"command": "sleep 9999"}, ctx)

        self.assertIn("cancelled by user", outcome.content)
        self.assertEqual(outcome.exit_code, 130)
        popen.assert_not_called()  # entry fast path: Popen never started

    def test_shell_cancel_via_canceller_thread(self):
        """Cancel fires after Popen from the side thread: kill, then 130 is returned."""
        # cancel_check answers False the 1st time (entry check) and True from
        # the 2nd call on (side-thread polling).
        polls = 0

        def check():
            nonlocal polls
            polls += 1
            return polls > 1

        executor, _, _ = make_executor()
        ctx = self._ctx_with_cancel(executor, check)

        killed = threading.Event()

        def fake_communicate(input=None, timeout=None):
            # Return only after the side thread has killed the process
            # (simulates "pipes close after SIGKILL, communicate wraps up").
            killed.wait(5.0)
            return ("", "")

        fake = MagicMock()
        fake.communicate.side_effect = fake_communicate
        fake.kill.side_effect = lambda: killed.set()
        fake.returncode = -9

        with patch("core.executor_docker._CANCEL_POLL_INTERVAL_S", 0.02), \
                patch(self._POPEN, return_value=fake):
            outcome = executor.call_tool("shell", {"command": "sleep 9999"}, ctx)

        self.assertIn("cancelled by user", outcome.content)
        self.assertEqual(outcome.exit_code, 130)
        fake.kill.assert_called_once()

    @unittest.skipIf(
        platform.system() == "Windows",
        "bash + sleep semantics; runs on Linux CI/deploy",
    )
    def test_run_subprocess_collects_multi_chunk_output(self):
        """Regression: several echoes separated by sleeps must all reach stdout.

        Historical bug: a `communicate(timeout=0.5)` poll loop combined with
        bash's block-buffered stdout silently dropped multi-chunk output under
        some timings, and the LLM received an empty `[exit 0]`.

        This test runs a real child process,
        `bash -c 'echo A; sleep 0.6; echo B; sleep 0.6; echo C'`, and asserts
        that A/B/C are all in the result. Skipped on Windows dev machines
        (bash + sleep semantics).
        """
        executor, _, _ = make_executor()
        ctx = make_ctx(executor)
        script = "echo A; sleep 0.6; echo B; sleep 0.6; echo C"

        outcome = executor._run_subprocess(["bash", "-c", script], timeout=10, ctx=ctx)

        for chunk in ("A", "B", "C"):
            self.assertIn(chunk, outcome.content)
        self.assertIn("[exit 0]", outcome.content)
        self.assertEqual(outcome.exit_code, 0)

    @unittest.skipIf(platform.system() == "Windows", "Linux only (sleep semantics)")
    def test_run_subprocess_delayed_output_not_lost(self):
        """Regression: output printed after a sleep must be captured in full.

        Historical bug: with `docker exec ... setsid python script`, once
        setsid entered a new session the docker exec / runc stdio attach
        misbehaved, delayed output was lost and the LLM got an empty [exit 0].

        This layer tests _run_subprocess's own stdout collection (docker exec
        is not involved); together with the "argv has no setsid" assertion it
        forms a two-layer guard against regressions. Skipped on Windows
        (sleep semantics).
        """
        executor, _, _ = make_executor()
        ctx = make_ctx(executor)
        child_code = "import time; time.sleep(1); print('LATE')"

        outcome = executor._run_subprocess(
            [sys.executable, "-c", child_code], timeout=10, ctx=ctx
        )

        self.assertIn("LATE", outcome.content)
        self.assertIn("[exit 0]", outcome.content)
        self.assertEqual(outcome.exit_code, 0)

    def test_argv_does_not_contain_setsid(self):
        """setsid is gone (docker exec + setsid loses delayed stdout; see PROGRESS 2026-05-29).

        If a PGID kill protocol is added later, use `setsid --wait` or a
        wrapper instead of a bare setsid.
        """
        executor, _, _ = make_executor()
        ctx = make_ctx(executor)
        fake = self._finished_proc("ok", "", 0)
        seen_argvs = []

        def record_popen(argv, **kw):
            seen_argvs.append(argv)
            return fake

        with patch(self._POPEN, side_effect=record_popen):
            executor.call_tool("shell", {"command": "true"}, ctx)
            executor.call_tool("run_python", {"code": "pass"}, ctx)

        self.assertEqual(len(seen_argvs), 2, "expected 2 Popen invocations")
        for argv in seen_argvs:
            self.assertNotIn("setsid", argv, f"setsid should not be in argv: {argv}")
class TestRunPython(unittest.TestCase):
    """run_python: the tmp .py lands in user_root/.zcbot_tmp/<task_id>/ and is unlinked afterwards."""

    @staticmethod
    def _leftover_scripts(executor, ctx):
        """List the .py files still present in this task's host-side tmp directory."""
        task_tmp = executor.user_root / TMP_SUBDIR / str(ctx.task_id)
        if not task_tmp.exists():
            return []
        return list(task_tmp.glob("*.py"))

    def test_run_python_tmp_script(self):
        executor, pool, _ = make_executor()
        ctx = make_ctx(executor)

        fake = MagicMock()
        fake.communicate.return_value = ("42\n", "")
        fake.returncode = 0
        seen_argvs = []

        def record_popen(argv, **kwargs):
            seen_argvs.append(argv)
            return fake

        with patch("core.executor_docker.subprocess.Popen", side_effect=record_popen):
            outcome = executor.call_tool("run_python", {"code": "print(42)"}, ctx)

        self.assertIn("[stdout]\n42", outcome.content)
        self.assertEqual(outcome.exit_code, 0)

        argv = seen_argvs[0]
        # Tail shape: python /workspace/.zcbot_tmp/<task_id>/<rand>.py
        # (setsid removed 2026-05-29; see the comment on _exec_python)
        interpreter, script_path = argv[-2], argv[-1]
        self.assertEqual(interpreter, "python")
        self.assertTrue(script_path.startswith(f"/workspace/{TMP_SUBDIR}/{ctx.task_id}/"))
        self.assertTrue(script_path.endswith(".py"))

        # PYTHONIOENCODING / PYTHONPATH injection: collect the value after each -e.
        env_kvs = [value for flag, value in zip(argv, argv[1:]) if flag == "-e"]
        self.assertIn("PYTHONIOENCODING=utf-8", env_kvs)
        # PYTHONPATH must contain /sandbox (so the `from skills.xxx import`
        # taught by SKILL.md works; skills/ is bind-mounted at
        # /sandbox/skills:ro) plus /workspace (the user's task directory).
        self.assertIn("PYTHONPATH=/sandbox:/workspace", env_kvs)

        # The host-side tmp file is already unlinked (the directory may still
        # exist, which is fine: it gets re-created when the container is ensured).
        leftover = self._leftover_scripts(executor, ctx)
        self.assertEqual(leftover, [], f"tmp .py not cleaned up: {leftover}")

    def test_run_python_bad_code_type(self):
        executor, _, _ = make_executor()

        outcome = executor.call_tool("run_python", {"code": 123}, make_ctx(executor))

        self.assertIn("[Error]", outcome.content)
        self.assertEqual(outcome.exit_code, 2)

    def test_run_python_cleans_tmp_on_exception(self):
        """The tmp .py must still be cleaned up when Popen raises (finally backstop)."""
        executor, _, _ = make_executor()
        ctx = make_ctx(executor)
        failing_popen = patch(
            "core.executor_docker.subprocess.Popen",
            side_effect=RuntimeError("boom"),
        )

        with failing_popen:
            outcome = executor.call_tool("run_python", {"code": "x"}, ctx)

        self.assertIn("[Error executing run_python via docker]", outcome.content)
        self.assertEqual(outcome.exit_code, 1)
        self.assertEqual(self._leftover_scripts(executor, ctx), [])
class TestFsToolsInContainer(unittest.TestCase):
    """fs tools (read/write/edit/glob/grep) run via docker exec + tool_runner.py (§7.5 #6)."""

    _FS_TOOL_NAMES = ("read", "write", "edit", "glob", "grep")

    def _setup_fs_executor(self):
        """Return make_executor() output with one FakeTool registered per fs tool."""
        fs_tools = {name: FakeTool(name) for name in self._FS_TOOL_NAMES}
        return make_executor(tools_dict=fs_tools)

    @staticmethod
    def _finished_proc(stdout, stderr, returncode):
        """Return a Popen stand-in whose communicate() yields the given output."""
        fake = MagicMock()
        fake.communicate.return_value = (stdout, stderr)
        fake.returncode = returncode
        return fake

    def test_read_invokes_tool_runner(self):
        executor, pool, _ = self._setup_fs_executor()
        ctx = make_ctx(executor)
        fake = self._finished_proc("file content here", "", 0)

        with patch("core.executor_docker.subprocess.Popen", return_value=fake) as popen:
            outcome = executor.call_tool("read", {"path": "foo.txt"}, ctx)

        # fs tools: stdout is returned as-is, without [stdout]/[exit] wrapping
        self.assertEqual(outcome.content, "file content here")
        self.assertEqual(outcome.exit_code, 0)

        argv = popen.call_args[0][0]
        # Last three argv items: python /sandbox/tool_runner.py read
        self.assertEqual(argv[-3:], ["python", "/sandbox/tool_runner.py", "read"])
        # -i is required (stdin is piped into the container)
        self.assertIn("-i", argv)
        # workdir / user as usual
        workdir_value = argv[argv.index("--workdir") + 1]
        self.assertEqual(workdir_value, "/workspace/demo")

        # The JSON args are fed through stdin
        popen_kwargs = popen.call_args[1]
        self.assertEqual(popen_kwargs.get("stdin"), subprocess.PIPE)
        stdin_payload = fake.communicate.call_args[1].get("input")
        self.assertEqual(json.loads(stdin_payload), {"path": "foo.txt"})

        # The pool was used
        self.assertEqual(pool.ensure_calls, [executor.user_id])
        self.assertEqual(pool.mark_active_calls, [executor.user_id])

    def test_write_with_cjk_path(self):
        """CJK paths are not split by shell metachars (the core argument for JSON over stdin)."""
        executor, _, _ = self._setup_fs_executor()
        ctx = make_ctx(executor)
        fake = self._finished_proc("[wrote 100 chars to 测试.md]", "", 0)
        tool_args = {"path": "测试目录/中文文件.md", "content": "你好"}

        with patch("core.executor_docker.subprocess.Popen", return_value=fake):
            outcome = executor.call_tool("write", tool_args, ctx)

        self.assertIn("[wrote", outcome.content)
        stdin_payload = fake.communicate.call_args[1].get("input")
        decoded = json.loads(stdin_payload)
        self.assertEqual(decoded["path"], "测试目录/中文文件.md")
        self.assertEqual(decoded["content"], "你好")

    def test_grep_error_to_stderr(self):
        """When tool_runner.py exits != 0, stderr is passed through as the ToolResult content."""
        executor, _, _ = self._setup_fs_executor()
        ctx = make_ctx(executor)
        fake = self._finished_proc("", "[Error] invalid regex: ...\n", 1)

        with patch("core.executor_docker.subprocess.Popen", return_value=fake):
            outcome = executor.call_tool("grep", {"pattern": "["}, ctx)

        self.assertIn("[Error]", outcome.content)
        self.assertEqual(outcome.exit_code, 1)

    def test_fs_tool_timeout(self):
        executor, _, _ = self._setup_fs_executor()
        ctx = make_ctx(executor)

        fake = MagicMock()
        expired = subprocess.TimeoutExpired(cmd="docker", timeout=30)
        fake.communicate.side_effect = [expired, ("", "")]
        fake.returncode = -9

        with patch("core.executor_docker.subprocess.Popen", return_value=fake):
            outcome = executor.call_tool("glob", {"pattern": "**/*"}, ctx)

        self.assertIn("timed out", outcome.content)
        self.assertEqual(outcome.exit_code, 124)
        fake.kill.assert_called_once()
class TestUnknownTool(unittest.TestCase):
    """Tool names the host does not know about are rejected with exit code 2."""

    def _assert_unknown(self, outcome):
        """Check the 'unknown tool' rejection shape shared by both tests."""
        self.assertIn("unknown tool", outcome.content)
        self.assertEqual(outcome.exit_code, 2)

    def test_unknown_tool_goes_to_host(self):
        # Empty host registry: nothing is available at all.
        executor, _, _ = make_executor(tools_dict={})

        outcome = executor.call_tool("nope", {}, make_ctx(executor))

        self._assert_unknown(outcome)

    def test_container_tool_not_registered_on_host(self):
        """caps.enable_run_python=False: host has no run_python, so docker must refuse too."""
        only_read = {"read": FakeTool("read")}
        executor, _, _ = make_executor(tools_dict=only_read)

        outcome = executor.call_tool("run_python", {"code": "x"}, make_ctx(executor))

        self._assert_unknown(outcome)
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()