fix(wechat,email): host-side 文件工具翻译容器路径,修复附件发不出 + bump 0.24.1

docker 模式下 fs 工具在容器跑,文件落宿主 users/<uid>/<wd>/,但 send_email /
wechat_push 是宿主进程工具:base_dir=cwd 且不识别容器↔宿主路径映射,agent 给的
相对路径拼到 cwd、容器绝对路径 /workspace/... 宿主上瞎解析,relative_to(user_root)
必越界 → 附件永远发不出(probe 直调 send_file 绕过解析,故"测试可发")。

- tools/base.py: 共享 _resolve_user_file(/workspace 前缀翻回 user_root + 相对拼
  base_dir + 越界校验)+ FileOutOfBounds
- agent_builder: 两个 host 工具 base_dir=working_dir_path(宿主 task 目录)而非 cwd
- send_email / wechat_bot: 改用 helper
- tests: 加 3 例回归(翻译+越界、send_email 容器路径、wechat_push 相对路径)
- scripts/diag_wechat_push.py: 诊断脚本

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-06-24 14:02:48 +08:00
parent 193b545b75
commit 6008e1b8a0
8 changed files with 235 additions and 18 deletions

View File

@ -21,6 +21,12 @@
## 已完成关键能力
### 2026-06-24 / 修复 host-side 文件工具发不出附件(docker 容器路径未翻译,bump 0.24.1)
- 根因:生产 docker 模式下,fs 工具在容器里跑(文件落容器卷=宿主 `users/<uid>/<wd>/`),但 `send_email` / `wechat_push` 是**宿主进程**工具;它们 `base_dir=Path.cwd()`(部署根)且不识别容器↔宿主路径映射 → agent 给的相对路径拼到 cwd、容器绝对路径 `/workspace/...` 宿主上瞎解析,`relative_to(user_root)` 必越界 → 附件永远发不出(微信 DB 实锤 `#7` 相对 + `#15` 容器绝对两条都「文件路径越界」)。probe 脚本能发是因直接调 `send_file` 绕过解析。
- 修复:`tools/base.py` 加共享 `_resolve_user_file`(`/workspace` 前缀翻回 `user_root` + 相对拼 `base_dir` + 越界校验,抽 `FileOutOfBounds`);`agent_builder` 给两个 host 工具传 `base_dir=working_dir_path`(宿主 task 目录)而非 cwd;`send_email`/`wechat_bot` 改用 helper。host 模式同样受益(相对路径之前也错)。
- 测试:`tests/test_secret_host_tools.py` 加 3 例(helper 翻译+越界、send_email 容器路径附件、wechat_push 相对路径);诊断脚本 `scripts/diag_wechat_push.py`
### 2026-06-24 / 企业微信渠道 B:纯推送 + OAuth 扫码绑定(bump 0.24.0)
- 决策:**企业微信只做推送、不做对话**(用户拍板"和邮箱似的")——省掉入站回调 + AES + 5s ACK + agent 回推一整套;要对话走 ClawBot。企业微信的**无条件主动推**(不挑活跃度、无 24h 窗口)正补 ClawBot 短板,定时简报必达首选。

View File

@ -1,3 +1,3 @@
# zcbot 版本号单一事实源:web/app.py 的 FastAPI version、/healthz 返回、前端展示都引这里。
# 改版本只动这一行。
__version__ = "0.24.0"
__version__ = "0.24.1"

View File

@ -565,14 +565,19 @@ def build_agent(
# 发邮件(§8.5 投递):仅当 SMTP_* env 齐了才挂(沿用"有 key 才注册",没配的
# 部署里 agent 看不到一个永远报错的工具)。定时与交互 run 都可用。
# base_dir 用 working_dir_path(该 task 的**宿主**工作目录绝对路径),不是 tool_base(cwd)。
# send_email 在宿主进程读附件文件,docker 下 agent 给的相对路径相对容器 workdir=task_dir,
# 翻回宿主即 working_dir_path;tool 内 _resolve_user_file 再处理 /workspace 容器绝对路径。
if smtp_configured():
se = SendEmailTool(base_dir=tool_base, user_root=ur_path)
se = SendEmailTool(base_dir=working_dir_path, user_root=ur_path)
tools[se.name] = se
# 微信主动推送(§8.7 渠道抽象):仅当微信渠道开关在才挂(沿用"有开关才注册")。
# 交互与定时 run 都可用(定时简报可主动推回用户微信,24h 窗口内)。user_id ctor 注入。
# base_dir 同 send_email:用 working_dir_path(宿主 task 目录),wechat_push 在宿主进程
# 读待发文件,需把 agent 给的相对/容器路径翻回宿主(详 _resolve_user_file)。
if wechat_push_available():
wp = WechatPushTool(uid, base_dir=tool_base, user_root=ur_path)
wp = WechatPushTool(uid, base_dir=working_dir_path, user_root=ur_path)
tools[wp.name] = wp
if caps.enable_run_python:

View File

@ -0,0 +1,87 @@
"""诊断微信对话里 wechat_push 发文件失败:dump 绑定状态 + 微信 task 里 wechat_push 工具调用与返回。
ASCII 标签(Windows GBK 安全)用法:.venv/Scripts/python.exe scripts/diag_wechat_push.py [email]
"""
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
env = Path(__file__).resolve().parent.parent / ".env"
for line in env.read_text(encoding="utf-8").splitlines():
if line.strip().startswith("ZCBOT_DB_URL="):
os.environ["ZCBOT_DB_URL"] = line.split("=", 1)[1].strip()
from sqlalchemy import create_engine, text # noqa: E402
import builtins # noqa: E402
_out = open(Path(__file__).resolve().parent / "_wechat_push_dump.txt", "w", encoding="utf-8")
def print(*a, **k): # noqa: A001
builtins.print(*a, **k, file=_out)
engine = create_engine(os.environ["ZCBOT_DB_URL"])
email = sys.argv[1] if len(sys.argv) > 1 else "caoqianming@foxmail.com"
def s(x, n=2000):
t = str(x or "")
return t if len(t) <= n else t[:n] + f"...[+{len(t)-n}]"
with engine.connect() as conn:
row = conn.execute(text("select user_id from users where email=:e"), {"e": email}).fetchone()
if not row:
print("[NO USER]", email); sys.exit(1)
uid = row[0]
print("[USER]", uid)
b = conn.execute(text(
"select user_im_id, base_url, status, context_token_at, "
"(latest_context_token is not null) as has_ctx, chat_task_id "
"from wechat_bot_bindings where user_id=:u"), {"u": uid}).fetchone()
if not b:
print("[NO BINDING]"); sys.exit(1)
print("[BINDING] status=%s user_im_id=%s has_ctx=%s ctx_at=%s base=%s" % (
b.status, b.user_im_id, b.has_ctx, b.context_token_at, b.base_url))
print("[BINDING] chat_task_id=%s" % b.chat_task_id)
if b.context_token_at:
at = b.context_token_at
if at.tzinfo is None:
at = at.replace(tzinfo=timezone.utc)
age = datetime.now(timezone.utc) - at
print("[BINDING] ctx age = %s (fresh if <24h)" % age)
tid = b.chat_task_id
if not tid:
print("[NO CHAT TASK]"); sys.exit(0)
# dump messages, focus on wechat_push tool calls/results
rows = conn.execute(text(
"select idx, payload from messages where task_id=:t order by idx desc limit 60"),
{"t": tid}).fetchall()
print("\n[MESSAGES] last %d (newest first):" % len(rows))
for idx, payload in rows:
if isinstance(payload, str):
try:
payload = json.loads(payload)
except Exception:
pass
if not isinstance(payload, dict):
continue
role = payload.get("role")
# assistant tool_calls
tcs = payload.get("tool_calls") or []
for tc in tcs:
fn = (tc.get("function") or {})
if fn.get("name") == "wechat_push":
print(" #%s [CALL wechat_push] args=%s" % (idx, s(fn.get("arguments"), 800)))
# tool result
if role == "tool":
name = payload.get("name", "")
content = payload.get("content")
if name == "wechat_push" or "微信" in s(content, 200) or "wechat" in s(name):
print(" #%s [TOOL RESULT %s] %s" % (idx, name, s(content, 800)))

View File

@ -165,5 +165,91 @@ class TestMaterialsProjectHostTools(unittest.TestCase):
self.assertEqual(captured["chunk_size"], 2)
class TestHostFileToolPathResolution(unittest.TestCase):
"""send_email / wechat_push 在宿主进程读附件:agent 给的相对/容器 `/workspace` 路径
须翻回宿主 task 目录,越界仍挡回归 docker 模式下附件路径越界发不出文件的 bug"""
def _mk(self):
tmp = tempfile.TemporaryDirectory()
root = Path(tmp.name).resolve()
user_root = root / "users" / "uid"
wd = user_root / "wechat-abc" # 宿主 task 目录(= 容器 /workspace/wechat-abc)
wd.mkdir(parents=True)
(wd / "report.txt").write_text("x", encoding="utf-8")
return tmp, user_root, wd
def test_resolve_user_file_translates_and_bounds(self):
from tools.base import FileOutOfBounds, Tool
class _T(Tool):
def execute(self, **k):
return ""
tmp, user_root, wd = self._mk()
try:
t = _T(base_dir=wd, user_root=user_root)
# 相对路径 → 宿主 task 目录(原 bug:拼到 cwd 判越界)
self.assertEqual(t._resolve_user_file("report.txt"), (wd / "report.txt").resolve())
# 容器绝对路径 /workspace/... → 翻回 user_root 下(原 bug:宿主上解析判越界)
self.assertEqual(
t._resolve_user_file("/workspace/wechat-abc/report.txt"),
(wd / "report.txt").resolve(),
)
# 越界仍挡
for bad in ("../../../etc/passwd", "/workspace/../../etc/passwd"):
with self.assertRaises(FileOutOfBounds):
t._resolve_user_file(bad)
finally:
tmp.cleanup()
def test_send_email_attaches_container_path(self):
from tools.send_email import SendEmailTool
tmp, user_root, wd = self._mk()
try:
tool = SendEmailTool(base_dir=wd, user_root=user_root)
captured = {}
def fake_send(to, subject, body, attachments=None, **k):
captured["attachments"] = list(attachments or [])
with patch("tools.send_email.smtp_configured", return_value=True), patch(
"tools.send_email.send_email_smtp", side_effect=fake_send
):
out = tool.execute(
to=["a@b.com"], subject="s", body="b",
attachments=["/workspace/wechat-abc/report.txt"],
)
self.assertIn("[ok]", out)
self.assertIn("含 1 个附件", out)
self.assertEqual(captured["attachments"], [(wd / "report.txt").resolve()])
finally:
tmp.cleanup()
def test_wechat_push_resolves_relative_file(self):
from tools.wechat_bot import WechatPushTool
from uuid import uuid4
tmp, user_root, wd = self._mk()
try:
tool = WechatPushTool(uuid4(), base_dir=wd, user_root=user_root)
captured = {}
class _Report:
delivered = True
results = []
def fake_send(uid, text, fpath):
captured["fpath"] = fpath
return _Report()
with patch("core.wechat.service.send_to_user", side_effect=fake_send):
out = tool.execute(text="给你文件", file="report.txt")
self.assertIn("[ok]", out)
self.assertEqual(captured["fpath"], str((wd / "report.txt").resolve()))
finally:
tmp.cleanup()
if __name__ == "__main__":
unittest.main()

View File

@ -5,6 +5,16 @@ from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional
# docker 沙箱把 user_root(`<workspace>/users/<uid>`)bind 到容器内 `/workspace`
# (agent_builder _build_system_prompt / executor_docker)。host-side 文件工具
# (send_email / wechat_push)在宿主进程跑,agent 给的可能是容器绝对路径
# `/workspace/<wd>/x`,需翻回宿主 `user_root/<wd>/x`。
_CONTAINER_ROOT = "/workspace"
class FileOutOfBounds(Exception):
"""agent 给的文件路径解析后落在 user_root 之外(防 `../` 读到别人/系统文件)。"""
def compact_tool_output(
text: str,
@ -58,6 +68,32 @@ class Tool(ABC):
p = Path(path)
return p if p.is_absolute() else (self.base_dir / p)
def _resolve_user_file(self, raw: str) -> Path:
"""把 agent 给的路径解析成**宿主**绝对 Path,并强制落在 user_root 内。
host-side 文件工具(send_email / wechat_push)专用处理三种入参:
- 容器绝对路径 `/workspace/<wd>/x` 翻回宿主 `user_root/<wd>/x`(docker bind);
- 相对路径 `x` / `<wd>/x` 拼到 `base_dir`(应为该 task 的宿主工作目录);
- 宿主绝对路径 原样
解析后越界(不在 user_root ) `FileOutOfBounds(raw)`不校验存在性,
调用方按需 `.is_file()`host 模式 agent 不会产出 `/workspace/` 前缀,翻译分支天然不触发
"""
raw = (raw or "").strip()
if self.user_root is not None and (
raw == _CONTAINER_ROOT or raw.startswith(_CONTAINER_ROOT + "/")
):
rest = raw[len(_CONTAINER_ROOT):].lstrip("/")
p = self.user_root / rest
else:
p = self._resolve(raw)
p = p.resolve()
if self.user_root is not None:
try:
p.relative_to(self.user_root.resolve())
except ValueError:
raise FileOutOfBounds(raw)
return p
def _display(self, p: Path) -> str:
"""对外渲染路径:在 user_root 内 → POSIX 相对串;否则原绝对。"""
if self.user_root is not None:

View File

@ -20,7 +20,7 @@ from email.utils import formataddr
from pathlib import Path
from typing import Iterable, Optional
from .base import Tool
from .base import FileOutOfBounds, Tool
_MAX_ATTACH_BYTES = 20 * 1024 * 1024 # 单封附件总上限,防把大产物塞爆 SMTP
_MAX_RECIPIENTS = 10
@ -152,13 +152,11 @@ class SendEmailTool(Tool):
for raw in attachments or []:
if not isinstance(raw, str) or not raw.strip():
continue
p = self._resolve(raw.strip()).resolve()
# 附件强制落 user_root 内,防 ../ 读到别人/系统文件
if self.user_root is not None:
try:
p.relative_to(self.user_root.resolve())
except ValueError:
return f"[Error] 附件路径越界(必须在工作目录内): {raw}"
# host-side 解析:容器 /workspace 路径翻回宿主 + 强制落 user_root 内(防越界)
try:
p = self._resolve_user_file(raw.strip())
except FileOutOfBounds:
return f"[Error] 附件路径越界(必须在工作目录内): {raw}"
if not p.is_file():
return f"[Error] 附件不存在: {raw}"
resolved.append(p)

View File

@ -14,7 +14,7 @@ from typing import Optional
from uuid import UUID
from core.wechat import service
from .base import Tool
from .base import FileOutOfBounds, Tool
def wechat_push_available() -> bool:
@ -61,12 +61,11 @@ class WechatPushTool(Tool):
fpath: Optional[str] = None
if file and file.strip():
p = self._resolve(file.strip()).resolve()
if self.user_root is not None:
try:
p.relative_to(self.user_root.resolve())
except ValueError:
return f"[Error] 文件路径越界(必须在工作目录内): {file}"
# host-side 解析:容器 /workspace 路径翻回宿主 + 强制落 user_root 内(防越界)
try:
p = self._resolve_user_file(file.strip())
except FileOutOfBounds:
return f"[Error] 文件路径越界(必须在工作目录内): {file}"
if not p.is_file():
return f"[Error] 文件不存在: {file}"
fpath = str(p)