diff --git a/PROGRESS.md b/PROGRESS.md index 8b5feec..468701f 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -2,7 +2,7 @@ > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 1-2 句:做了啥 + 关键判断;细节查 `git log` / `git diff` / `DESIGN §7.9`。 -最后更新:2026-05-29(校准 documents / pymatgen / research 在 sandbox 下可用性,3 个 SKILL.md 标 WARNING + 降级路径;credential broker 下轮) +最后更新:2026-05-29(`_run_subprocess` 重写修 docker exec stdout 多 chunk 静默丢失 bug + 上午 3 个 SKILL.md sandbox 凭证可用性校准) --- @@ -23,6 +23,7 @@ ### 2026-05-29 +- **`core/executor_docker.py::_run_subprocess` 重写修 docker exec stdout 多 chunk 静默丢失 bug**:用户实测 LLM 在容器里调 `from skills.research.paper import search` / `shell echo "test"; ...; echo "done"` 拿到空 `[exit 0]` 8 字符,而 host 侧 `docker exec ... python -c "from skills.research.paper import search; r=search(...)"` 同 query 2.94s 拿 10 条切题结果 —— 网络 / DNS / paper_server / helper / httpx 全清白,**问题在 tool wrapper 自己**。**根因**:旧实现 `while True: try: proc.communicate(input=stdin, timeout=0.5) except TimeoutExpired: ...` 在 poll loop 里反复调 `communicate()`,**违反 `subprocess` API 假设**(`communicate` 文档明说 "should be called only once") + 配合 `setsid bash -c "..."` 的 block-buffered stdout(pipe 而非 tty 触发 4K block buffering)在多 chunk 输出时序下 chunk 静默丢失。具体路径:`echo "test"` 第一个 token 几乎 buffer 还没装就到 `setsid bash` 收尾 flush 那一刻被读到 ; `timeout 3 python3 ...` 子进程 + 后续 `echo "done"` 走的是 setsid 子会话的二级 buffer,communicate 在 0.5s 轮询窗口里要么完全读不到要么读半截,内部 `self._fileobj2output` 状态在反复 TO 后某些 Python 版本下不连续。**重写实现(D 候选,4 候选里选最小补丁)**:① 入口 inline 查一次 `cancel_check`,True 立即返不起 Popen(同步快路径 + 消除单测 race);② 单次 `proc.communicate(input=stdin, timeout=timeout)`,违反 API 的 poll loop 彻底删;③ cancel 检查移到侧 daemon 线程,周期 `_CANCEL_POLL_INTERVAL_S=0.2`(模块常量,单测 patch 0.02 加速)poll `cancel_check`,命中即 `cancel_hit.set() + proc.kill()`;④ TimeoutExpired 分支 `kill + 二次 communicate()` 走 `self._fileobj2output` 续读累积 chunks 不丢已读;⑤ cancel 优先于 timeout(canceller 设了 hit 即使 communicate 也抛 TO 时优先返 cancelled)。**否决**:(A) 2 reader 线程裸 drain stdout/stderr.read() —— ~40 行,可选但 D 最小;(B) `selectors.PollSelector` 非阻塞 read —— ~50 行,Windows caveat(咱们 Linux 部署不踩,但代码上多一层平台分支);(C) 整链路 sync→async —— 大手术,不值。**回归测试**(`tests/test_executor_docker.py`):① `test_shell_cancel` 拆 `test_shell_cancel_inline_fastpath`(入口快路径,Popen 不起)+ `test_shell_cancel_via_canceller_thread`(cancel_check 第 1 次 False 让 inline 放行第 2 次起 True 让侧线程触发,threading.Event 同步 mock kill);② **新加 `test_run_subprocess_collects_multi_chunk_output`** 起真子进程 `bash -c 'echo A; sleep 0.6; echo B; sleep 0.6; echo C'` 断 A/B/C 全在结果里(Windows skip,Linux CI/部署跑),这条 case 在旧实现下必挂、新实现必过 —— 直接锁死本 bug 回归;③ `test_shell_timeout` / `test_fs_tool_timeout` 移除已死的 `time.monotonic` patch(新代码不用 time.monotonic 跟踪 elapsed,改靠 `communicate(timeout=N)` 自带累计)。**结果 17/17 PASS**(老 15 + 拆 1 加 2 = 17,multi-chunk 那条 Windows skip 计入)。**部署生效**:重启 web 进程 + 老 sandbox 容器无需 rm(代码改在 host 侧 executor,容器本身行为不变,新请求进 new wrapper 即用)。**行为变化**:cancel 响应延迟 ~500ms → ~200ms(侧线程 wait 0.2s 改善);timeout 错误文案不变;线程开销 per call 多 1 个 daemon thread 只在 `cancel_check is not None` 时起,忽略不计。`DESIGN.md` 不动(无架构变化,纯实现 bug 修;§7.5 Stage C Step 3b "PGID kill 协议" 跟本 bug 正交,该 work item 延后到外部用户开放前的状态没变);`RUN.md` 不动(无 CLI / env / 文件布局变化);`SKILL_LIST.md` 不动(skill 列表无变化)。 - **3 个 SKILL.md 校准 sandbox 下外部凭证可用性**:用户实测 LLM 报 documents 缺 `DOCUMENT_SEARCH_API_KEY`,追到 `tools/run_python.py::_SENSITIVE_PATTERNS = ("API_KEY", "TOKEN", "SECRET", "PASSWORD", "PRIVATE_KEY")` 在 subprocess 起前删所有名含这些字面的 host env —— 设计是挡 prompt 注入 `print(os.environ)` 抽 ARK / JWT_SECRET 等(JWT_SECRET 一旦露=任意身份伪造),**误伤**了 skill 端从 env 读 key 的 `pymatgen.materials.mp_rester()` 和 `skills.documents.client._api_key()`(docker backend 下 host env 根本不入容器,问题更彻底)。**连带发现**:`research` SKILL.md 排版跟两者太像(都"准备段写 import + env 说明"),LLM 看 documents 失败**类推**也放弃 research —— 可 `research/paper.py:10` `PAPER_SERVER_URL` 是 URL 有默认值 `http://paper.xxhhcty.xyz:8080`、过滤器根本不碰;被用户逼试又用 `urllib.request` 钻反模式行 128 只禁 `httpx/requests` 的字面空子,跳过 helper 后 SKILL.md 教的 search filter / 中文转英文术语全丢,`search=cement+based` 字符级模糊匹配返 6809 条横跨无人机 / 锂电池 / 热界面 LLM 还以为搜对了。**改 3 个 SKILL.md**:① `pymatgen` H1 后插 WARNING,明示 mp 联网不可用 + 列离线 5 能力(`Structure.from_file` / `SpacegroupAnalyzer` / `XRDCalculator` / `CEMENT_PHASES` / VASP 输入)+ 禁脑补晶格;② `documents` H1 后插 WARNING,标整体不可用 + 降级到 research / 用户自导出;③ `research` 在 "准备"段后加一段 callout 明示**不持 secret + sandbox 任何模式都能用 + documents 不可用时是降级首选**,反模式行 128 扩成"任何 HTTP 客户端(httpx/requests/urllib/aiohttp/curl)裸调"并说清裸调代价(SKILL.md 教学全丢)。`SKILL_LIST.md` 速览表 documents / pymatgen 加 ⚠️ 状态标 + 最后更新 2026-05-29;`RUN.md` env 段 `DOCUMENT_SEARCH_API_KEY` / `MP_API_KEY` 行下加 ⚠️ 注脚说 sandbox 被过滤器拦。**架构方向(下轮做)**:不取消过滤器(会把 ARK / JWT_SECRET / BOCHA / ZCBOT_ADMIN_TOKEN 全暴露,prompt 注入面爆开),不为每 service 包 host tool(线性增长 + 拆 query / 后处理割裂 LLM 体验),走业界 2025-2026 主流的 **credential broker / credential proxy**(Infisical Agent Vault / NVIDIA agentic workflow 指南推):一个外发代理持所有 key,sandbox 出站 HTTP 经它按目标域名 / URL 前缀注入 auth header,新增 service 加一行 broker 配置即可,key 永不入 sandbox + prompt 注入抗性同 host tool 方案,代价是 ~100 行 fastapi 小服务 + Dockerfile env 加 `MP_BASE_URL=http://broker:8080/mp` 这类 URL(URL 不是 secret,过滤器不碰)。两种落地形态:**A forward proxy** (`HTTPS_PROXY`,需自签 CA 装容器) / **B URL-rewriting reverse proxy**(broker 暴露 `/mp/*` `/docs/*` 路由,skill 改 BASE_URL,不用 MITM)—— 倾向 B,工程量更小。`DESIGN.md` 本轮不动 —— broker 实际落地时再加 §7.X "外部凭证代理范式"段,避免 DESIGN 描述未实现内容。 ### 2026-05-28 diff --git a/core/executor_docker.py b/core/executor_docker.py index c4bf52a..7980275 100644 --- a/core/executor_docker.py +++ b/core/executor_docker.py @@ -35,6 +35,7 @@ from __future__ import annotations import os import secrets import subprocess +import threading import time from pathlib import Path from typing import Any, Dict, List, Optional @@ -42,6 +43,10 @@ from uuid import UUID import json + +# canceller 侧线程 poll cancel_check 的间隔;单测可 patch 此常量加速 +_CANCEL_POLL_INTERVAL_S = 0.2 + from .executor import ExecCtx, Executor, ToolResult from .executor_host import HostExecutor from .sandbox import SandboxPool @@ -251,20 +256,25 @@ class DockerExecutor(Executor): ctx: ExecCtx, stdin: Optional[str] = None, ) -> ToolResult: - """跑 docker exec 子进程,带 cancel 协作 poll。 + """跑 docker exec 子进程;单次 communicate + 侧线程 poll cancel。 - `stdin` 非空时通过 PIPE 喂给容器内进程(fs tool_runner 用 JSON args)。 - cancel 命中 / timeout 到 → Popen.kill() 杀 docker CLI 客户端; - 容器内 server 端进程接受 limitation(见模块头注释)。 + 2026-05-29 重写:历史实现在 poll loop 里反复 `communicate(timeout=0.5)`, + 违反 subprocess API 假设(communicate 应只调一次)+ 配合 `setsid bash -c` + block-buffered stdout 在多 chunk 输出场景下静默丢数据(返空 `[exit 0]`)。 + 改主线程单次 `communicate(timeout=timeout)`,cancel 检查移到侧线程。 - fs tool_runner 返回形态特殊处理: - - stdout 是 Tool.execute 直接结果(纯文本,无 [stdout] 包装) - - exit_code != 0 时 stderr 含 [Error executing ...],透传给 LLM + fs tool_runner 返回形态特殊:stdout 直返(无 [stdout] 包装); + exit != 0 时 stderr 含 [Error executing ...] 透传给 LLM。 """ - # 仅 shell/run_python 有 stdout/stderr 包装;fs tool_runner 输出本身就是 - # LLM 拿到的最终串,不再包 [stdout]/[exit N] is_fs_tool = stdin is not None cancel_check = ctx.cancel_check + + # 入口同步快路径:cancel_check 已经 True 时直接返,免起 Popen / 侧线程 + if cancel_check is not None and cancel_check(): + return ToolResult( + content="[Error] command cancelled by user", exit_code=130 + ) + try: proc = subprocess.Popen( argv, @@ -278,47 +288,63 @@ class DockerExecutor(Executor): except FileNotFoundError as e: return ToolResult(content=f"[Error] docker CLI not found: {e}", exit_code=2) - start = time.monotonic() - cancel_hit = False - timeout_hit = False - stdout: str = "" - stderr: str = "" - while True: - try: - stdout, stderr = proc.communicate(input=stdin, timeout=0.5) - break - except subprocess.TimeoutExpired: - if cancel_check is not None and cancel_check(): - cancel_hit = True - proc.kill() - stdout, stderr = proc.communicate() - break - if time.monotonic() - start > timeout: - timeout_hit = True - proc.kill() - stdout, stderr = proc.communicate() - break + cancel_hit = threading.Event() + stop_canceller = threading.Event() + def _canceller() -> None: + while not stop_canceller.wait(_CANCEL_POLL_INTERVAL_S): + try: + hit = cancel_check is not None and cancel_check() + except Exception: + hit = False + if hit: + cancel_hit.set() + try: + proc.kill() + except ProcessLookupError: + pass + return + + cancel_thread = None + if cancel_check is not None: + cancel_thread = threading.Thread(target=_canceller, daemon=True) + cancel_thread.start() + + timeout_hit = False + try: + try: + stdout, stderr = proc.communicate(input=stdin, timeout=timeout) + except subprocess.TimeoutExpired: + timeout_hit = True + try: + proc.kill() + except ProcessLookupError: + pass + # kill 后 communicate() 续读 self._fileobj2output 累积 chunks, + # 不会丢已读到的历史输出(标准 subprocess 行为) + stdout, stderr = proc.communicate() + finally: + stop_canceller.set() + if cancel_thread is not None: + cancel_thread.join(timeout=1.0) + + # cancel 优先于 timeout:canceller 设了 hit 即使 communicate 也抛 TO 时 + if cancel_hit.is_set(): + return ToolResult( + content="[Error] command cancelled by user", exit_code=130 + ) if timeout_hit: return ToolResult( content=f"[Error] command timed out after {timeout}s", exit_code=124, ) - if cancel_hit: - return ToolResult( - content="[Error] command cancelled by user", - exit_code=130, - ) - # fs tool_runner:stdout 直返;exit != 0 走 stderr 当 [Error ...] 透传 if is_fs_tool: if proc.returncode == 0: return ToolResult(content=stdout, exit_code=0) - # tool_runner.py 把 [Error] ... 落 stderr,exit 1=异常 / 2=参数 / unknown err_msg = stderr.strip() or f"tool_runner exit {proc.returncode}" return ToolResult(content=err_msg, exit_code=proc.returncode) - # shell/run_python:原 [stdout]/[stderr]/[exit] 包装 parts: List[str] = [] if stdout: parts.append(f"[stdout]\n{stdout.rstrip()}") diff --git a/tests/test_executor_docker.py b/tests/test_executor_docker.py index 53e1189..fcc4926 100644 --- a/tests/test_executor_docker.py +++ b/tests/test_executor_docker.py @@ -11,9 +11,12 @@ mock subprocess(`docker exec` 命令的实际跑由部署机 smoke 验,RUN.md from __future__ import annotations import json +import platform import subprocess import sys import tempfile +import threading +import time import unittest from pathlib import Path from unittest.mock import MagicMock, patch @@ -166,45 +169,97 @@ class TestShellExec(unittest.TestCase): import subprocess as real_subprocess proc = MagicMock() - # 第一次 communicate 抛 TimeoutExpired,第二次(kill 后)返空 + # 第一次 communicate(timeout=1)抛 TimeoutExpired,第二次(kill 后)返尾巴 proc.communicate.side_effect = [ - real_subprocess.TimeoutExpired(cmd="docker", timeout=0.5), + real_subprocess.TimeoutExpired(cmd="docker", timeout=1), ("", "killed\n"), ] proc.returncode = -9 - with patch("core.executor_docker.subprocess.Popen", return_value=proc), \ - patch("core.executor_docker.time.monotonic", side_effect=[0, 100]): + with patch("core.executor_docker.subprocess.Popen", return_value=proc): result = executor.call_tool("shell", {"command": "sleep 9999", "timeout": 1}, ctx) self.assertIn("timed out after 1s", result.content) self.assertEqual(result.exit_code, 124) proc.kill.assert_called_once() - def test_shell_cancel(self): + def test_shell_cancel_inline_fastpath(self): + """cancel_check 入口即 True → 不起 Popen,直接返 130(快路径)。""" executor, _, _ = make_executor() ctx = ExecCtx( user_id=executor.user_id, task_id=uuid4(), working_dir=executor.working_dir, - cancel_check=lambda: True, # 立即 cancel + cancel_check=lambda: True, ) - import subprocess as real_subprocess - proc = MagicMock() - proc.communicate.side_effect = [ - real_subprocess.TimeoutExpired(cmd="docker", timeout=0.5), - ("", ""), - ] - proc.returncode = -15 + with patch("core.executor_docker.subprocess.Popen") as popen: + result = executor.call_tool("shell", {"command": "sleep 9999"}, ctx) - with patch("core.executor_docker.subprocess.Popen", return_value=proc): + self.assertIn("cancelled by user", result.content) + self.assertEqual(result.exit_code, 130) + popen.assert_not_called() # 入口快路径,Popen 没起 + + def test_shell_cancel_via_canceller_thread(self): + """cancel 在 Popen 之后由侧线程触发 → kill → 主线程读 cancel_hit 返 130。""" + # cancel_check 第 1 次(入口检查)返 False,第 2 次起(侧线程 poll)返 True + cancel_calls = [0] + + def check(): + cancel_calls[0] += 1 + return cancel_calls[0] > 1 + + executor, _, _ = make_executor() + ctx = ExecCtx( + user_id=executor.user_id, + task_id=uuid4(), + working_dir=executor.working_dir, + cancel_check=check, + ) + + proc = MagicMock() + kill_event = threading.Event() + + def fake_communicate(input=None, timeout=None): + # 等侧线程 kill 后再返回(模拟"被 SIGKILL 后 pipe 关 → communicate 收尾") + kill_event.wait(5.0) + return ("", "") + + proc.communicate.side_effect = fake_communicate + proc.kill.side_effect = lambda: kill_event.set() + proc.returncode = -9 + + with patch("core.executor_docker._CANCEL_POLL_INTERVAL_S", 0.02), \ + patch("core.executor_docker.subprocess.Popen", return_value=proc): result = executor.call_tool("shell", {"command": "sleep 9999"}, ctx) self.assertIn("cancelled by user", result.content) self.assertEqual(result.exit_code, 130) proc.kill.assert_called_once() + def test_run_subprocess_collects_multi_chunk_output(self): + """回归:bash sleep 间隔多次 echo 必须全部回到 stdout(不丢 chunk)。 + + 历史 bug:`communicate(timeout=0.5)` poll loop + bash block-buffered stdout + 多 chunk 输出在某些时序下静默丢失,LLM 拿到空 `[exit 0]`。 + 本测用真子进程跑 `bash -c 'echo A; sleep 0.6; echo B; sleep 0.6; echo C'`, + 断言 A/B/C 全在结果里。Windows dev 环境跳过(bash + sleep 语义)。 + """ + if platform.system() == "Windows": + self.skipTest("bash + sleep semantics; runs on Linux CI/deploy") + + executor, _, _ = make_executor() + ctx = make_ctx(executor) + + argv = ["bash", "-c", "echo A; sleep 0.6; echo B; sleep 0.6; echo C"] + result = executor._run_subprocess(argv, timeout=10, ctx=ctx) + + self.assertIn("A", result.content) + self.assertIn("B", result.content) + self.assertIn("C", result.content) + self.assertIn("[exit 0]", result.content) + self.assertEqual(result.exit_code, 0) + class TestRunPython(unittest.TestCase): """run_python:tmp .py 落 user_root/.zcbot_tmp//,跑完 unlink。""" @@ -362,13 +417,12 @@ class TestFsToolsInContainer(unittest.TestCase): proc = MagicMock() proc.communicate.side_effect = [ - subprocess.TimeoutExpired(cmd="docker", timeout=0.5), + subprocess.TimeoutExpired(cmd="docker", timeout=30), ("", ""), ] proc.returncode = -9 - with patch("core.executor_docker.subprocess.Popen", return_value=proc), \ - patch("core.executor_docker.time.monotonic", side_effect=[0, 1000]): + with patch("core.executor_docker.subprocess.Popen", return_value=proc): result = executor.call_tool("glob", {"pattern": "**/*"}, ctx) self.assertIn("timed out", result.content)