313 lines
14 KiB
Python
313 lines
14 KiB
Python
"""Per-user sandbox 容器池(DESIGN §7.5)。
|
|
|
|
命名:`zcbot-sandbox-<user_id>`(user_id = UUID 标准串带 dash,与 bind mount
|
|
源路径 `<workspace>/users/<user_id>/` 对齐 ── `docker ps` 看到容器名能直接 grep
|
|
出 workspace 目录)。
|
|
|
|
生命周期:
|
|
- `ensure(user_id)`:per-user `threading.Lock` 串行化 → `docker inspect` 探测 →
|
|
已 running 直接返;exists-but-stopped 先 `rm -f` 重起(保证 iptables 重新 apply);
|
|
不存在 `docker run`
|
|
- `mark_active(user_id)`:exec 完更新 in-memory `_last_active[uid]=now`(docker labels
|
|
不可运行时修改 ── Docker 23+ 移除 `docker update --label-add` 支持)
|
|
- `reap_idle()`:周期任务,扫 `_last_active` dict,>`idle_ttl` 的 `docker rm -f`
|
|
- `shutdown_all()`:app 启动时清前驱孤儿(`docker ps --filter label=zcbot.product=sandbox`)
|
|
|
|
API 全同步 —— ensure 主要使用方是 AgentLoop / DockerExecutor,跑在 web BG 线程内
|
|
天然同步;reaper 跑在 uvicorn 主 loop 里,通过 `run_in_executor` 包一层调本类 sync 方法。
|
|
threading.Lock 跨线程有效,asyncio.Lock 会被 ephemeral loop 创建 / 销毁绕过保护。
|
|
|
|
幂等性:
|
|
- ensure 在重复调用时跨 daemon round-trip < 100ms(纯 `docker inspect`);per-user lock
|
|
防同 user 两并发 `docker run --name` 撞 "Conflict"(虽然 docker 本身会 reject,提前
|
|
锁更干净)
|
|
- reaper 只杀 dict 里有记录的容器 ── 重启后 dict 空 → 不杀历史孤儿(这条由 startup
|
|
`shutdown_all` 兜底)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
from uuid import UUID
|
|
|
|
from .network import NETWORK_NAME, ensure_network
|
|
|
|
|
|
# Container naming and labelling. The product label is attached by `docker run`
# and used as the `docker ps --filter label=...` selector when cleaning up.
CONTAINER_NAME_PREFIX = "zcbot-sandbox-"
LABEL_PRODUCT_KEY = "zcbot.product"
LABEL_PRODUCT_VALUE = "sandbox"
LABEL_USER_ID_KEY = "zcbot.user_id"

# Image tag and idle time-to-live (seconds) used when nothing else is configured.
DEFAULT_IMAGE = "zcbot-sandbox:latest"
DEFAULT_IDLE_TTL_SECONDS = 300

# Default container resource limits (can be overridden by the yaml `sandbox.*`
# section or by environment variables; see the SandboxPool constructor).
DEFAULT_MEMORY = "2g"
DEFAULT_CPUS = "1.0"
DEFAULT_PIDS_LIMIT = 256
|
|
|
|
|
|
def container_name(user_id: UUID) -> str:
    """Return the container name for *user_id*: the fixed prefix plus the UUID string."""
    return CONTAINER_NAME_PREFIX + str(user_id)
|
|
|
|
|
|
def _now() -> int:
|
|
return int(time.time())
|
|
|
|
|
|
def _container_exists(name: str) -> bool:
    """Report whether a container called *name* exists, in any state.

    Running, exited and created containers all count: the check is only
    whether `docker inspect --type=container` exits with status 0.
    """
    inspect_cmd = ["docker", "inspect", "--type=container", name]
    proc = subprocess.run(inspect_cmd, capture_output=True, text=True)
    return proc.returncode == 0
|
|
|
|
|
|
def _container_running(name: str) -> bool:
    """Report whether container *name* exists and its `.State.Running` is true."""
    inspect_cmd = [
        "docker", "inspect", "--type=container",
        "--format={{.State.Running}}", name,
    ]
    proc = subprocess.run(inspect_cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        # inspect failed (e.g. no such container) -> not running.
        return False
    return proc.stdout.strip() == "true"
|
|
|
|
|
|
class SandboxPool:
    """Pool of per-user sandbox containers driven through the `docker` CLI.

    All methods are synchronous and block on `docker` subprocess calls.
    `ensure()` and `reap_idle()` serialise per user through a
    `threading.Lock`, so they can be called from different threads.
    """

    def __init__(
        self,
        user_root_base: Path,
        repo_root: Optional[Path] = None,
        image: Optional[str] = None,
        runtime: Optional[str] = None,
        idle_ttl: Optional[int] = None,
        pg_ips: Optional[str] = None,
        memory: Optional[str] = None,
        cpus: Optional[str] = None,
        pids_limit: Optional[int] = None,
        dns: Optional[List[str]] = None,
    ) -> None:
        """Store the pool configuration; no container is started here.

        Args:
            user_root_base: Parent directory of the per-user trees, typically
                `<workspace>/users`. The bind-mount source is
                `user_root_base / <user_id>`, mounted at `/workspace`.
            repo_root: Root of the zcbot repo. When given and
                `<repo_root>/skills` is a directory, it is mounted read-only
                at `/sandbox/skills` so file tools running inside the
                container can resolve references made by skills. None means
                no skills mount.
            image: Sandbox image tag. Falls back to env `ZCBOT_SANDBOX_IMAGE`,
                then to `DEFAULT_IMAGE`.
            runtime: Value for `docker run --runtime` (runc / runsc / kata
                ...). Falls back to env `ZCBOT_SANDBOX_RUNTIME`; empty means
                the flag is not passed.
            idle_ttl: Seconds. Containers whose last-active timestamp is older
                than `now - idle_ttl` are removed by `reap_idle`. Falls back
                to env `ZCBOT_SANDBOX_IDLE_TTL`, then
                `DEFAULT_IDLE_TTL_SECONDS`.
            pg_ips: Comma-separated PG IP string passed to the container as
                the `ZCBOT_PG_IPS` env var (the original note says init.sh
                turns it into DROP rules). Falls back to env `ZCBOT_PG_IPS`.
            memory: `--memory` value. Precedence: env `ZCBOT_SANDBOX_MEMORY`
                > this argument > `DEFAULT_MEMORY`.
            cpus: `--cpus` value. Precedence: env `ZCBOT_SANDBOX_CPUS` > this
                argument > `DEFAULT_CPUS`.
            pids_limit: `--pids-limit` value. Precedence: env
                `ZCBOT_SANDBOX_PIDS_LIMIT` > this argument >
                `DEFAULT_PIDS_LIMIT`.
            dns: Nameserver IPs. Precedence: env `ZCBOT_SANDBOX_DNS`
                (comma-separated) > this argument > empty (docker default).

        Raises:
            ValueError: if an integer setting taken from the environment is
                not a valid integer.
        """
        self.user_root_base = user_root_base
        self.repo_root = repo_root
        # An empty env value counts as "unset", like the resource limits below;
        # otherwise an empty ZCBOT_SANDBOX_IMAGE would produce an empty image name.
        self.image = image or os.getenv("ZCBOT_SANDBOX_IMAGE") or DEFAULT_IMAGE
        self.runtime = runtime or os.getenv("ZCBOT_SANDBOX_RUNTIME") or ""
        if idle_ttl is not None:
            self.idle_ttl = idle_ttl
        else:
            self.idle_ttl = int(
                os.getenv("ZCBOT_SANDBOX_IDLE_TTL") or DEFAULT_IDLE_TTL_SECONDS
            )
        self.pg_ips = pg_ips if pg_ips is not None else os.getenv("ZCBOT_PG_IPS", "")
        # Resource limits: env > caller > default.
        self.memory = os.getenv("ZCBOT_SANDBOX_MEMORY") or memory or DEFAULT_MEMORY
        self.cpus = os.getenv("ZCBOT_SANDBOX_CPUS") or cpus or DEFAULT_CPUS
        self.pids_limit = int(
            os.getenv("ZCBOT_SANDBOX_PIDS_LIMIT")
            or (pids_limit if pids_limit is not None else DEFAULT_PIDS_LIMIT)
        )
        # DNS: env (comma-separated) > caller > empty (let docker decide).
        env_dns = os.getenv("ZCBOT_SANDBOX_DNS", "").strip()
        if env_dns:
            self.dns: List[str] = [x.strip() for x in env_dns.split(",") if x.strip()]
        else:
            self.dns = list(dns) if dns else []
        # Guards creation of entries in `_locks`. `_last_active` is only touched
        # through single dict operations and is not taken under this lock.
        self._dict_lock = threading.Lock()
        self._locks: Dict[UUID, threading.Lock] = {}
        self._last_active: Dict[UUID, int] = {}

    def _lock_for(self, user_id: UUID) -> threading.Lock:
        """Return the lock dedicated to *user_id*, creating it on first use."""
        with self._dict_lock:
            lock = self._locks.get(user_id)
            if lock is None:
                lock = self._locks[user_id] = threading.Lock()
            return lock

    def ensure(self, user_id: UUID) -> str:
        """Make sure the user's container is running and return its name.

        Runs under the per-user lock. A running container is reused; a
        container that exists but is not running is removed and recreated;
        a missing one is created. The last-active timestamp is refreshed in
        every successful case.

        Raises:
            RuntimeError: if `docker run` fails.
        """
        with self._lock_for(user_id):
            name = container_name(user_id)
            if _container_running(name):
                self._last_active[user_id] = _now()
                return name
            if _container_exists(name):
                # Stopped / crashed: remove and start fresh (the original note
                # says this re-applies the iptables rules). The result is
                # ignored on purpose; a leftover makes `docker run` fail below.
                subprocess.run(
                    ["docker", "rm", "-f", name],
                    capture_output=True, check=False,
                )
            self._docker_run(user_id, name)
            self._last_active[user_id] = _now()
            return name

    def _ensure_resolv_conf_file(self) -> Optional[Path]:
        """Write a host-side resolv.conf to bind-mount over the container's.

        The file lives at `<user_root_base parent>/.sandbox/resolv.conf` and
        holds one `nameserver` line per configured DNS entry.

        Returns:
            The absolute path of the written file, or None when no DNS is
            configured or the file could not be prepared (the caller then
            skips the mount).
        """
        if not self.dns:
            return None
        sandbox_dir = self.user_root_base.parent / ".sandbox"
        resolv = sandbox_dir / "resolv.conf"
        content = "".join(f"nameserver {ip}\n" for ip in self.dns if ip)
        try:
            sandbox_dir.mkdir(parents=True, exist_ok=True)
            resolv.write_text(content, encoding="utf-8")
            # docker `-v` only treats an absolute source as a bind mount; a
            # relative one is read as a volume name.
            return resolv.resolve()
        except OSError:
            return None

    def _docker_run(self, user_id: UUID, name: str) -> None:
        """Start the user's container with `docker run -d`; blocks until done.

        Called synchronously by `ensure` while it holds the per-user lock.

        Raises:
            RuntimeError: if `docker run` exits non-zero (stderr is included).
        """
        # Absolute path for the same reason as the skills mount below: a
        # relative `-v` source is not a bind mount.
        user_root = (self.user_root_base / str(user_id)).resolve()
        user_root.mkdir(parents=True, exist_ok=True)
        resolv_file = self._ensure_resolv_conf_file()

        cmd: List[str] = [
            "docker", "run", "-d",
            "--name", name,
            "--label", f"{LABEL_PRODUCT_KEY}={LABEL_PRODUCT_VALUE}",
            "--label", f"{LABEL_USER_ID_KEY}={user_id}",
            "--network", NETWORK_NAME,
            # Hardening flags (DESIGN section 7.5): all of them are required.
            "--read-only",  # read-only rootfs
            "--tmpfs", "/tmp:exec,size=512m,mode=1777",  # writable scratch, exec allowed
            "--cap-drop=ALL",  # drop every capability by default
            "--cap-add=NET_ADMIN",  # original note: needed by init.sh for iptables
            "--security-opt=no-new-privileges",
            f"--pids-limit={self.pids_limit}",
            f"--memory={self.memory}",
            f"--cpus={self.cpus}",
            "-v", f"{user_root}:/workspace",
            "-e", f"ZCBOT_PG_IPS={self.pg_ips}",
            "--restart=no",
        ]
        # Explicit DNS is applied in two layers:
        #   1. main path: bind-mount the host-side resolv.conf over the
        #      container's /etc/resolv.conf;
        #   2. fallback: pass the servers as the ZCBOT_DNS env var (the
        #      original note says init.sh tries to write them itself), kept in
        #      case the file mount could not be prepared.
        # The original note explains that docker's `--dns` flag is not used
        # because on a user-defined bridge network it only changes the
        # embedded DNS upstream, not the container's resolv.conf.
        if resolv_file is not None:
            cmd += ["-v", f"{resolv_file}:/etc/resolv.conf:ro"]
        if self.dns:
            cmd += ["-e", f"ZCBOT_DNS={','.join(self.dns)}"]
        # Read-only mount of the repo's skills directory so in-container file
        # tools can read files referenced by skills without modifying them.
        if self.repo_root is not None:
            skills_path = (self.repo_root / "skills").resolve()
            if skills_path.is_dir():
                cmd += ["-v", f"{skills_path}:/sandbox/skills:ro"]
        if self.runtime:
            cmd += ["--runtime", self.runtime]
        cmd.append(self.image)

        r = subprocess.run(cmd, capture_output=True, text=True)
        if r.returncode != 0:
            raise RuntimeError(
                f"docker run {name} failed (rc={r.returncode}): {r.stderr.strip()}"
            )

    def mark_active(self, user_id: UUID) -> None:
        """Refresh the user's idle timer; meant to be called after each `docker exec`."""
        self._last_active[user_id] = _now()

    def reap_idle(self) -> List[str]:
        """Remove containers idle for longer than `idle_ttl`.

        Only users recorded in `_last_active` are considered. Each candidate
        is re-checked under its per-user lock, so a container that `ensure()`
        just handed out (and whose timestamp it refreshed) is not removed
        based on a stale reading.

        Returns:
            Names of the containers whose `docker rm -f` succeeded.
        """
        removed: List[str] = []
        cutoff = _now() - self.idle_ttl
        # Iterate over a snapshot: other threads may insert entries meanwhile.
        for uid, ts in list(self._last_active.items()):
            if ts >= cutoff:
                continue
            with self._lock_for(uid):
                # Re-read under the lock. The entry may have been refreshed
                # by ensure()/mark_active() or dropped by shutdown_all().
                current = self._last_active.get(uid)
                if current is None or current >= cutoff:
                    continue
                name = container_name(uid)
                r = subprocess.run(
                    ["docker", "rm", "-f", name],
                    capture_output=True, text=True,
                )
                if r.returncode == 0:
                    removed.append(name)
                # Forget the entry whether or not rm succeeded; a leftover
                # container is handled by shutdown_all at the next startup.
                # pop() tolerates a concurrent shutdown_all() clearing the dict.
                self._last_active.pop(uid, None)
        return removed

    def shutdown_all(self) -> List[str]:
        """Force-remove every container labelled `zcbot.product=sandbox`.

        Typical uses: clearing orphans left by a previous process at app
        startup, and manual cleanup in tests / maintenance.

        Returns:
            The container ids passed to `docker rm -f` (ids, not names; the
            result of the rm call itself is not checked). Empty when listing
            fails or nothing matches.
        """
        list_r = subprocess.run(
            ["docker", "ps", "-aq", "--filter",
             f"label={LABEL_PRODUCT_KEY}={LABEL_PRODUCT_VALUE}"],
            capture_output=True, text=True,
        )
        if list_r.returncode != 0 or not list_r.stdout.strip():
            return []
        ids = list_r.stdout.strip().splitlines()
        subprocess.run(
            ["docker", "rm", "-f", *ids],
            capture_output=True, text=True,
        )
        # Simplification: callers get ids; names would need an inspect before rm.
        self._last_active.clear()
        return ids
|
|
|
|
|
|
def setup_pool(
    user_root_base: Path,
    repo_root: Optional[Path] = None,
    sandbox_cfg: Optional[Dict[str, object]] = None,
) -> SandboxPool:
    """Convenience entry point for app startup: ensure the network, build the pool.

    `sandbox_cfg` is the `sandbox` section of agent.yaml (a dict) and may carry
    `memory`, `cpus`, `pids_limit` and `dns`. Missing keys fall back to the
    environment / defaults; environment variables can still override them
    (precedence is handled in the SandboxPool constructor).

    Typical use in a lifespan startup hook:
        from core.paths import ROOT
        cfg = load_config()
        pool = setup_pool(workspace / "users", repo_root=ROOT,
                          sandbox_cfg=cfg.get("sandbox") or {})
        pool.shutdown_all()   # clear orphans from a previous process
    """
    ensure_network()
    settings = sandbox_cfg or {}

    # Anything that is not a list is treated as "no DNS configured".
    nameservers = settings.get("dns") or []
    if not isinstance(nameservers, list):
        nameservers = []

    raw_memory = settings.get("memory")
    memory_arg = raw_memory if isinstance(raw_memory, str) else None

    raw_cpus = settings.get("cpus")
    cpus_arg = None if raw_cpus is None else str(raw_cpus)

    raw_pids = settings.get("pids_limit")
    pids_arg = None if raw_pids is None else int(raw_pids)

    dns_arg = [str(entry) for entry in nameservers]

    return SandboxPool(
        user_root_base=user_root_base,
        repo_root=repo_root,
        memory=memory_arg,
        cpus=cpus_arg,
        pids_limit=pids_arg,
        dns=dns_arg,
    )
|