fix(executor_docker): 重写 _run_subprocess 修 docker exec stdout 多 chunk 静默丢失

旧实现在 poll loop 里反复 communicate(timeout=0.5) 违反 subprocess API 假设,
配合 setsid bash -c block-buffered stdout 在多 chunk 输出时 chunk 静默丢失,
LLM 调 run_python / shell 拿到空 [exit 0] 8 字符,误判 paper_server 不可用。
改单次 communicate(timeout=full) + 侧线程 daemon poll cancel + 入口 inline
快路径。回归测试用真子进程 bash sleep 多次 echo 锁死,17/17 PASS。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-05-29 10:26:59 +08:00
parent f1a42ef13f
commit 91cc14278c
3 changed files with 136 additions and 55 deletions

View File

@ -2,7 +2,7 @@
> 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 1-2 句:做了啥 + 关键判断;细节查 `git log` / `git diff` / `DESIGN §7.9`
最后更新:2026-05-29(校准 documents / pymatgen / research 在 sandbox 下可用性,3 个 SKILL.md 标 WARNING + 降级路径;credential broker 下轮)
最后更新:2026-05-29(`_run_subprocess` 重写修 docker exec stdout 多 chunk 静默丢失 bug + 上午 3 个 SKILL.md sandbox 凭证可用性校准)
---
@ -23,6 +23,7 @@
### 2026-05-29
- **`core/executor_docker.py::_run_subprocess` 重写修 docker exec stdout 多 chunk 静默丢失 bug**:用户实测 LLM 在容器里调 `from skills.research.paper import search` / `shell echo "test"; ...; echo "done"` 拿到空 `[exit 0]` 8 字符,而 host 侧 `docker exec ... python -c "from skills.research.paper import search; r=search(...)"` 同 query 2.94s 拿 10 条切题结果 —— 网络 / DNS / paper_server / helper / httpx 全清白,**问题在 tool wrapper 自己**。**根因**:旧实现 `while True: try: proc.communicate(input=stdin, timeout=0.5) except TimeoutExpired: ...` 在 poll loop 里反复调 `communicate()`,**违反 `subprocess` API 假设**(`communicate` 文档明说 "should be called only once") + 配合 `setsid bash -c "..."` 的 block-buffered stdout(pipe 而非 tty 触发 4K block buffering)在多 chunk 输出时序下 chunk 静默丢失。具体路径:`echo "test"` 第一个 token 几乎 buffer 还没装就到 `setsid bash` 收尾 flush 那一刻被读到 ; `timeout 3 python3 ...` 子进程 + 后续 `echo "done"` 走的是 setsid 子会话的二级 buffer,communicate 在 0.5s 轮询窗口里要么完全读不到要么读半截,内部 `self._fileobj2output` 状态在反复 TO 后某些 Python 版本下不连续。**重写实现(D 候选,4 候选里选最小补丁)**:① 入口 inline 查一次 `cancel_check`,True 立即返不起 Popen(同步快路径 + 消除单测 race);② 单次 `proc.communicate(input=stdin, timeout=timeout)`,违反 API 的 poll loop 彻底删;③ cancel 检查移到侧 daemon 线程,周期 `_CANCEL_POLL_INTERVAL_S=0.2`(模块常量,单测 patch 0.02 加速)poll `cancel_check`,命中即 `cancel_hit.set() + proc.kill()`;④ TimeoutExpired 分支 `kill + 二次 communicate()``self._fileobj2output` 续读累积 chunks 不丢已读;⑤ cancel 优先于 timeout(canceller 设了 hit 即使 communicate 也抛 TO 时优先返 cancelled)。**否决**:(A) 2 reader 线程裸 drain stdout/stderr.read() —— ~40 行,可选但 D 最小;(B) `selectors.PollSelector` 非阻塞 read —— ~50 行,Windows caveat(咱们 Linux 部署不踩,但代码上多一层平台分支);(C) 整链路 sync→async —— 大手术,不值。**回归测试**(`tests/test_executor_docker.py`):① `test_shell_cancel``test_shell_cancel_inline_fastpath`(入口快路径,Popen 不起)+ `test_shell_cancel_via_canceller_thread`(cancel_check 第 1 次 False 让 inline 放行第 2 次起 True 让侧线程触发,threading.Event 同步 mock kill);② **新加 `test_run_subprocess_collects_multi_chunk_output`** 起真子进程 `bash -c 'echo A; sleep 0.6; echo B; sleep 0.6; echo C'` 断 A/B/C 全在结果里(Windows skip,Linux CI/部署跑),这条 case 在旧实现下必挂、新实现必过 —— 直接锁死本 bug 回归;③ `test_shell_timeout` / `test_fs_tool_timeout` 移除已死的 `time.monotonic` patch(新代码不用 time.monotonic 跟踪 elapsed,改靠 `communicate(timeout=N)` 自带累计)。**结果 17/17 PASS**(老 15 + 拆 1 加 2 = 17,multi-chunk 那条 Windows skip 计入)。**部署生效**:重启 web 进程 + 老 sandbox 容器无需 rm(代码改在 host 侧 executor,容器本身行为不变,新请求进 new wrapper 即用)。**行为变化**:cancel 响应延迟 ~500ms → ~200ms(侧线程 wait 0.2s 改善);timeout 错误文案不变;线程开销 per call 多 1 个 daemon thread 只在 `cancel_check is not None` 时起,忽略不计。`DESIGN.md` 不动(无架构变化,纯实现 bug 修;§7.5 Stage C Step 3b "PGID kill 协议" 跟本 bug 正交,该 work item 延后到外部用户开放前的状态没变);`RUN.md` 不动(无 CLI / env / 文件布局变化);`SKILL_LIST.md` 不动(skill 列表无变化)。
- **3 个 SKILL.md 校准 sandbox 下外部凭证可用性**:用户实测 LLM 报 documents 缺 `DOCUMENT_SEARCH_API_KEY`,追到 `tools/run_python.py::_SENSITIVE_PATTERNS = ("API_KEY", "TOKEN", "SECRET", "PASSWORD", "PRIVATE_KEY")` 在 subprocess 起前删所有名含这些字面的 host env —— 设计是挡 prompt 注入 `print(os.environ)` 抽 ARK / JWT_SECRET 等(JWT_SECRET 一旦露=任意身份伪造),**误伤**了 skill 端从 env 读 key 的 `pymatgen.materials.mp_rester()``skills.documents.client._api_key()`(docker backend 下 host env 根本不入容器,问题更彻底)。**连带发现**:`research` SKILL.md 排版跟两者太像(都"准备段写 import + env 说明"),LLM 看 documents 失败**类推**也放弃 research —— 可 `research/paper.py:10` `PAPER_SERVER_URL` 是 URL 有默认值 `http://paper.xxhhcty.xyz:8080`、过滤器根本不碰;被用户逼试又用 `urllib.request` 钻反模式行 128 只禁 `httpx/requests` 的字面空子,跳过 helper 后 SKILL.md 教的 search filter / 中文转英文术语全丢,`search=cement+based` 字符级模糊匹配返 6809 条横跨无人机 / 锂电池 / 热界面 LLM 还以为搜对了。**改 3 个 SKILL.md**:① `pymatgen` H1 后插 WARNING,明示 mp 联网不可用 + 列离线 5 能力(`Structure.from_file` / `SpacegroupAnalyzer` / `XRDCalculator` / `CEMENT_PHASES` / VASP 输入)+ 禁脑补晶格;② `documents` H1 后插 WARNING,标整体不可用 + 降级到 research / 用户自导出;③ `research` 在 "准备"段后加一段 callout 明示**不持 secret + sandbox 任何模式都能用 + documents 不可用时是降级首选**,反模式行 128 扩成"任何 HTTP 客户端(httpx/requests/urllib/aiohttp/curl)裸调"并说清裸调代价(SKILL.md 教学全丢)。`SKILL_LIST.md` 速览表 documents / pymatgen 加 ⚠️ 状态标 + 最后更新 2026-05-29;`RUN.md` env 段 `DOCUMENT_SEARCH_API_KEY` / `MP_API_KEY` 行下加 ⚠️ 注脚说 sandbox 被过滤器拦。**架构方向(下轮做)**:不取消过滤器(会把 ARK / JWT_SECRET / BOCHA / ZCBOT_ADMIN_TOKEN 全暴露,prompt 注入面爆开),不为每 service 包 host tool(线性增长 + 拆 query / 后处理割裂 LLM 体验),走业界 2025-2026 主流的 **credential broker / credential proxy**(Infisical Agent Vault / NVIDIA agentic workflow 指南推):一个外发代理持所有 key,sandbox 出站 HTTP 经它按目标域名 / URL 前缀注入 auth header,新增 service 加一行 broker 配置即可,key 永不入 sandbox + prompt 注入抗性同 host tool 方案,代价是 ~100 行 fastapi 小服务 + Dockerfile env 加 `MP_BASE_URL=http://broker:8080/mp` 这类 URL(URL 不是 secret,过滤器不碰)。两种落地形态:**A forward proxy** (`HTTPS_PROXY`,需自签 CA 装容器) / **B URL-rewriting reverse proxy**(broker 暴露 `/mp/*` `/docs/*` 路由,skill 改 BASE_URL,不用 MITM)—— 倾向 B,工程量更小。`DESIGN.md` 本轮不动 —— broker 实际落地时再加 §7.X "外部凭证代理范式"段,避免 DESIGN 描述未实现内容。
### 2026-05-28

View File

@ -35,6 +35,7 @@ from __future__ import annotations
import os
import secrets
import subprocess
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
@ -42,6 +43,10 @@ from uuid import UUID
import json
# canceller 侧线程 poll cancel_check 的间隔;单测可 patch 此常量加速
_CANCEL_POLL_INTERVAL_S = 0.2
from .executor import ExecCtx, Executor, ToolResult
from .executor_host import HostExecutor
from .sandbox import SandboxPool
@ -251,20 +256,25 @@ class DockerExecutor(Executor):
ctx: ExecCtx,
stdin: Optional[str] = None,
) -> ToolResult:
"""跑 docker exec 子进程,带 cancel 协作 poll。
"""跑 docker exec 子进程;单次 communicate + 侧线程 poll cancel。
`stdin` 非空时通过 PIPE 喂给容器内进程(fs tool_runner JSON args)
cancel 命中 / timeout Popen.kill() docker CLI 客户端;
容器内 server 端进程接受 limitation(见模块头注释)
2026-05-29 重写:历史实现在 poll loop 里反复 `communicate(timeout=0.5)`,
违反 subprocess API 假设(communicate 应只调一次)+ 配合 `setsid bash -c`
block-buffered stdout 在多 chunk 输出场景下静默丢数据(返空 `[exit 0]`)
改主线程单次 `communicate(timeout=timeout)`,cancel 检查移到侧线程
fs tool_runner 返回形态特殊处理:
- stdout Tool.execute 直接结果(纯文本, [stdout] 包装)
- exit_code != 0 stderr [Error executing ...],透传给 LLM
fs tool_runner 返回形态特殊:stdout 直返( [stdout] 包装);
exit != 0 stderr [Error executing ...] 透传给 LLM
"""
# 仅 shell/run_python 有 stdout/stderr 包装;fs tool_runner 输出本身就是
# LLM 拿到的最终串,不再包 [stdout]/[exit N]
is_fs_tool = stdin is not None
cancel_check = ctx.cancel_check
# 入口同步快路径:cancel_check 已经 True 时直接返,免起 Popen / 侧线程
if cancel_check is not None and cancel_check():
return ToolResult(
content="[Error] command cancelled by user", exit_code=130
)
try:
proc = subprocess.Popen(
argv,
@ -278,47 +288,63 @@ class DockerExecutor(Executor):
except FileNotFoundError as e:
return ToolResult(content=f"[Error] docker CLI not found: {e}", exit_code=2)
start = time.monotonic()
cancel_hit = False
timeout_hit = False
stdout: str = ""
stderr: str = ""
while True:
try:
stdout, stderr = proc.communicate(input=stdin, timeout=0.5)
break
except subprocess.TimeoutExpired:
if cancel_check is not None and cancel_check():
cancel_hit = True
proc.kill()
stdout, stderr = proc.communicate()
break
if time.monotonic() - start > timeout:
timeout_hit = True
proc.kill()
stdout, stderr = proc.communicate()
break
cancel_hit = threading.Event()
stop_canceller = threading.Event()
def _canceller() -> None:
while not stop_canceller.wait(_CANCEL_POLL_INTERVAL_S):
try:
hit = cancel_check is not None and cancel_check()
except Exception:
hit = False
if hit:
cancel_hit.set()
try:
proc.kill()
except ProcessLookupError:
pass
return
cancel_thread = None
if cancel_check is not None:
cancel_thread = threading.Thread(target=_canceller, daemon=True)
cancel_thread.start()
timeout_hit = False
try:
try:
stdout, stderr = proc.communicate(input=stdin, timeout=timeout)
except subprocess.TimeoutExpired:
timeout_hit = True
try:
proc.kill()
except ProcessLookupError:
pass
# kill 后 communicate() 续读 self._fileobj2output 累积 chunks,
# 不会丢已读到的历史输出(标准 subprocess 行为)
stdout, stderr = proc.communicate()
finally:
stop_canceller.set()
if cancel_thread is not None:
cancel_thread.join(timeout=1.0)
# cancel 优先于 timeout:canceller 设了 hit 即使 communicate 也抛 TO 时
if cancel_hit.is_set():
return ToolResult(
content="[Error] command cancelled by user", exit_code=130
)
if timeout_hit:
return ToolResult(
content=f"[Error] command timed out after {timeout}s",
exit_code=124,
)
if cancel_hit:
return ToolResult(
content="[Error] command cancelled by user",
exit_code=130,
)
# fs tool_runner:stdout 直返;exit != 0 走 stderr 当 [Error ...] 透传
if is_fs_tool:
if proc.returncode == 0:
return ToolResult(content=stdout, exit_code=0)
# tool_runner.py 把 [Error] ... 落 stderr,exit 1=异常 / 2=参数 / unknown
err_msg = stderr.strip() or f"tool_runner exit {proc.returncode}"
return ToolResult(content=err_msg, exit_code=proc.returncode)
# shell/run_python:原 [stdout]/[stderr]/[exit] 包装
parts: List[str] = []
if stdout:
parts.append(f"[stdout]\n{stdout.rstrip()}")

View File

@ -11,9 +11,12 @@ mock subprocess(`docker exec` 命令的实际跑由部署机 smoke 验,RUN.md
from __future__ import annotations
import json
import platform
import subprocess
import sys
import tempfile
import threading
import time
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
@ -166,45 +169,97 @@ class TestShellExec(unittest.TestCase):
import subprocess as real_subprocess
proc = MagicMock()
# 第一次 communicate 抛 TimeoutExpired,第二次(kill 后)返空
# 第一次 communicate(timeout=1)抛 TimeoutExpired,第二次(kill 后)返尾巴
proc.communicate.side_effect = [
real_subprocess.TimeoutExpired(cmd="docker", timeout=0.5),
real_subprocess.TimeoutExpired(cmd="docker", timeout=1),
("", "killed\n"),
]
proc.returncode = -9
with patch("core.executor_docker.subprocess.Popen", return_value=proc), \
patch("core.executor_docker.time.monotonic", side_effect=[0, 100]):
with patch("core.executor_docker.subprocess.Popen", return_value=proc):
result = executor.call_tool("shell", {"command": "sleep 9999", "timeout": 1}, ctx)
self.assertIn("timed out after 1s", result.content)
self.assertEqual(result.exit_code, 124)
proc.kill.assert_called_once()
def test_shell_cancel(self):
def test_shell_cancel_inline_fastpath(self):
"""cancel_check 入口即 True → 不起 Popen,直接返 130(快路径)。"""
executor, _, _ = make_executor()
ctx = ExecCtx(
user_id=executor.user_id,
task_id=uuid4(),
working_dir=executor.working_dir,
cancel_check=lambda: True, # 立即 cancel
cancel_check=lambda: True,
)
import subprocess as real_subprocess
proc = MagicMock()
proc.communicate.side_effect = [
real_subprocess.TimeoutExpired(cmd="docker", timeout=0.5),
("", ""),
]
proc.returncode = -15
with patch("core.executor_docker.subprocess.Popen") as popen:
result = executor.call_tool("shell", {"command": "sleep 9999"}, ctx)
with patch("core.executor_docker.subprocess.Popen", return_value=proc):
self.assertIn("cancelled by user", result.content)
self.assertEqual(result.exit_code, 130)
popen.assert_not_called() # 入口快路径,Popen 没起
def test_shell_cancel_via_canceller_thread(self):
"""cancel 在 Popen 之后由侧线程触发 → kill → 主线程读 cancel_hit 返 130。"""
# cancel_check 第 1 次(入口检查)返 False,第 2 次起(侧线程 poll)返 True
cancel_calls = [0]
def check():
cancel_calls[0] += 1
return cancel_calls[0] > 1
executor, _, _ = make_executor()
ctx = ExecCtx(
user_id=executor.user_id,
task_id=uuid4(),
working_dir=executor.working_dir,
cancel_check=check,
)
proc = MagicMock()
kill_event = threading.Event()
def fake_communicate(input=None, timeout=None):
# 等侧线程 kill 后再返回(模拟"被 SIGKILL 后 pipe 关 → communicate 收尾")
kill_event.wait(5.0)
return ("", "")
proc.communicate.side_effect = fake_communicate
proc.kill.side_effect = lambda: kill_event.set()
proc.returncode = -9
with patch("core.executor_docker._CANCEL_POLL_INTERVAL_S", 0.02), \
patch("core.executor_docker.subprocess.Popen", return_value=proc):
result = executor.call_tool("shell", {"command": "sleep 9999"}, ctx)
self.assertIn("cancelled by user", result.content)
self.assertEqual(result.exit_code, 130)
proc.kill.assert_called_once()
def test_run_subprocess_collects_multi_chunk_output(self):
"""回归:bash sleep 间隔多次 echo 必须全部回到 stdout(不丢 chunk)。
历史 bug:`communicate(timeout=0.5)` poll loop + bash block-buffered stdout
chunk 输出在某些时序下静默丢失,LLM 拿到空 `[exit 0]`
本测用真子进程跑 `bash -c 'echo A; sleep 0.6; echo B; sleep 0.6; echo C'`,
断言 A/B/C 全在结果里Windows dev 环境跳过(bash + sleep 语义)
"""
if platform.system() == "Windows":
self.skipTest("bash + sleep semantics; runs on Linux CI/deploy")
executor, _, _ = make_executor()
ctx = make_ctx(executor)
argv = ["bash", "-c", "echo A; sleep 0.6; echo B; sleep 0.6; echo C"]
result = executor._run_subprocess(argv, timeout=10, ctx=ctx)
self.assertIn("A", result.content)
self.assertIn("B", result.content)
self.assertIn("C", result.content)
self.assertIn("[exit 0]", result.content)
self.assertEqual(result.exit_code, 0)
class TestRunPython(unittest.TestCase):
"""run_python:tmp .py 落 user_root/.zcbot_tmp/<task_id>/,跑完 unlink。"""
@ -362,13 +417,12 @@ class TestFsToolsInContainer(unittest.TestCase):
proc = MagicMock()
proc.communicate.side_effect = [
subprocess.TimeoutExpired(cmd="docker", timeout=0.5),
subprocess.TimeoutExpired(cmd="docker", timeout=30),
("", ""),
]
proc.returncode = -9
with patch("core.executor_docker.subprocess.Popen", return_value=proc), \
patch("core.executor_docker.time.monotonic", side_effect=[0, 1000]):
with patch("core.executor_docker.subprocess.Popen", return_value=proc):
result = executor.call_tool("glob", {"pattern": "**/*"}, ctx)
self.assertIn("timed out", result.content)