"""Per-user sandbox 容器池(DESIGN §7.5)。 命名:`zcbot-sandbox-`(user_id = UUID 标准串带 dash,与 bind mount 源路径 `/users//` 对齐 ── `docker ps` 看到容器名能直接 grep 出 workspace 目录)。 生命周期: - `ensure(user_id)`:per-user `threading.Lock` 串行化 → `docker inspect` 探测 → 已 running 直接返;exists-but-stopped 先 `rm -f` 重起(保证 iptables 重新 apply); 不存在 `docker run` - `mark_active(user_id)`:exec 完更新 in-memory `_last_active[uid]=now`(docker labels 不可运行时修改 ── Docker 23+ 移除 `docker update --label-add` 支持) - `reap_idle()`:周期任务,扫 `_last_active` dict,>`idle_ttl` 的 `docker rm -f` - `shutdown_all()`:app 启动时清前驱孤儿(`docker ps --filter label=zcbot.product=sandbox`) API 全同步 —— ensure 主要使用方是 AgentLoop / DockerExecutor,跑在 web BG 线程内 天然同步;reaper 跑在 uvicorn 主 loop 里,通过 `run_in_executor` 包一层调本类 sync 方法。 threading.Lock 跨线程有效,asyncio.Lock 会被 ephemeral loop 创建 / 销毁绕过保护。 幂等性: - ensure 在重复调用时跨 daemon round-trip < 100ms(纯 `docker inspect`);per-user lock 防同 user 两并发 `docker run --name` 撞 "Conflict"(虽然 docker 本身会 reject,提前 锁更干净) - reaper 只杀 dict 里有记录的容器 ── 重启后 dict 空 → 不杀历史孤儿(这条由 startup `shutdown_all` 兜底) """ from __future__ import annotations import os import subprocess import threading import time from pathlib import Path from typing import Any, Dict, List, Optional from uuid import UUID from .network import NETWORK_NAME, ensure_network CONTAINER_NAME_PREFIX = "zcbot-sandbox-" LABEL_PRODUCT_KEY = "zcbot.product" LABEL_PRODUCT_VALUE = "sandbox" LABEL_USER_ID_KEY = "zcbot.user_id" DEFAULT_IMAGE = "zcbot-sandbox:latest" DEFAULT_IDLE_TTL_SECONDS = 300 # 容器资源限制默认值(可被 yaml `sandbox.*` / env override,详 SandboxPool ctor) DEFAULT_MEMORY = "2g" DEFAULT_CPUS = "1.0" DEFAULT_PIDS_LIMIT = 256 def container_name(user_id: UUID) -> str: return f"{CONTAINER_NAME_PREFIX}{user_id}" def _now() -> int: return int(time.time()) def _container_exists(name: str) -> bool: """任何 state(running / exited / created)都算存在。""" r = subprocess.run( ["docker", "inspect", "--type=container", name], capture_output=True, text=True, ) return r.returncode == 0 def _container_running(name: str) -> bool: r = subprocess.run( ["docker", "inspect", "--type=container", "--format={{.State.Running}}", name], capture_output=True, text=True, ) return r.returncode == 0 and r.stdout.strip() == "true" class SandboxPool: def __init__( self, user_root_base: Path, repo_root: Optional[Path] = None, image: Optional[str] = None, runtime: Optional[str] = None, idle_ttl: Optional[int] = None, pg_ips: Optional[str] = None, memory: Optional[str] = None, cpus: Optional[str] = None, pids_limit: Optional[int] = None, dns: Optional[List[str]] = None, ) -> None: """ user_root_base: per-user 子树父目录,典型 `/users`。bind mount 源 = `user_root_base / `,目标 `/workspace`。 repo_root: zcbot repo 根(`core/paths.py::ROOT`)。**fs 工具进容器后** (read/write/edit/glob/grep)`/sandbox/skills:ro` mount 让 容器内 read SKILL 内部 references 的 path 能解析(skill 在 host 上是 repo 内代码,容器 user_root 是用户文件,两者 正交)。None → 不挂 skills,只走 user_root 边界。 image: sandbox 镜像 tag(默 env `ZCBOT_SANDBOX_IMAGE`) runtime: `docker run --runtime` 值(runc / runsc / kata 等);空 = 默认 (env `ZCBOT_SANDBOX_RUNTIME`)。§7.5 #5 / §7.9 升级表 ── 切 gVisor / Firecracker 时改这一项即可,应用层零改动。 idle_ttl: 秒;`mark_active` 时间戳 < now - ttl 的容器被 reap_idle 杀 (env `ZCBOT_SANDBOX_IDLE_TTL`,默 300) pg_ips: 逗号分隔的 PG IP 串,塞容器 `ZCBOT_PG_IPS` env,init.sh 加 DROP 规则 (env `ZCBOT_PG_IPS`)。defense-in-depth ── 即便落内网三段。 memory/cpus/pids_limit: 容器资源限制,默 2g/1.0/256;env(`ZCBOT_SANDBOX_MEMORY` 等) override caller 参数 override 默认。改后重启 web 生效,新起的 容器用新值;已 running 不变(idle 5min 回收后下次起按新值)。 """ self.user_root_base = user_root_base self.repo_root = repo_root self.image = image or os.getenv("ZCBOT_SANDBOX_IMAGE", DEFAULT_IMAGE) self.runtime = runtime or os.getenv("ZCBOT_SANDBOX_RUNTIME") or "" self.idle_ttl = idle_ttl if idle_ttl is not None else int( os.getenv("ZCBOT_SANDBOX_IDLE_TTL", str(DEFAULT_IDLE_TTL_SECONDS)) ) self.pg_ips = pg_ips if pg_ips is not None else os.getenv("ZCBOT_PG_IPS", "") # 资源限制:env > caller > 默 self.memory = os.getenv("ZCBOT_SANDBOX_MEMORY") or memory or DEFAULT_MEMORY self.cpus = os.getenv("ZCBOT_SANDBOX_CPUS") or cpus or DEFAULT_CPUS self.pids_limit = int( os.getenv("ZCBOT_SANDBOX_PIDS_LIMIT") or (pids_limit if pids_limit is not None else DEFAULT_PIDS_LIMIT) ) # DNS:env(逗号分隔)> caller > 默空(让 docker 自己探测) env_dns = os.getenv("ZCBOT_SANDBOX_DNS", "").strip() if env_dns: self.dns: List[str] = [x.strip() for x in env_dns.split(",") if x.strip()] else: self.dns = list(dns) if dns else [] self._dict_lock = threading.Lock() # 保护 _locks / _last_active 的字典级 race self._locks: Dict[UUID, threading.Lock] = {} self._last_active: Dict[UUID, int] = {} def _lock_for(self, user_id: UUID) -> threading.Lock: with self._dict_lock: if user_id not in self._locks: self._locks[user_id] = threading.Lock() return self._locks[user_id] def ensure(self, user_id: UUID) -> str: """返回容器名;create-or-reuse 原子。同步阻塞,主调方 AgentLoop 已在 BG 线程。""" with self._lock_for(user_id): name = container_name(user_id) if _container_running(name): self._last_active[user_id] = _now() return name if _container_exists(name): # stopped / crashed ── rm 重起。iptables 规则随容器生命周期重新 apply。 subprocess.run( ["docker", "rm", "-f", name], capture_output=True, check=False, ) self._docker_run(user_id, name) self._last_active[user_id] = _now() return name def _docker_run(self, user_id: UUID, name: str) -> None: """同步阻塞;由 ensure 在 to_thread 里调。""" user_root = self.user_root_base / str(user_id) user_root.mkdir(parents=True, exist_ok=True) cmd: List[str] = [ "docker", "run", "-d", "--name", name, "--label", f"{LABEL_PRODUCT_KEY}={LABEL_PRODUCT_VALUE}", "--label", f"{LABEL_USER_ID_KEY}={user_id}", "--network", NETWORK_NAME, # §7.5 硬限制(任一缺失视为 hardening 未完成) "--read-only", # rootfs read-only "--tmpfs", "/tmp:exec,size=512m,mode=1777", # 可写临时区,exec 允许 (run_python 写脚本) "--cap-drop=ALL", # 默全丢 "--cap-add=NET_ADMIN", # init.sh 配 iptables 需要;exec 进来的 uid 1000 拿不到 "--security-opt=no-new-privileges", f"--pids-limit={self.pids_limit}", f"--memory={self.memory}", f"--cpus={self.cpus}", "-v", f"{user_root}:/workspace", "-e", f"ZCBOT_PG_IPS={self.pg_ips}", "--restart=no", ] # 显式 DNS(绕过 docker daemon 上游探测,腾讯云轻量等场景下 daemon 探测 # host systemd-resolved 上游不稳) for dns_ip in self.dns: cmd += ["--dns", dns_ip] # repo skills 只读 mount ── fs 工具进容器后(read/glob/grep)能 access # SKILL.md 内引用的 references/*.md。host 上 zcbot/skills/ 是项目代码, # 跟用户 working_dir 正交,只读防容器内进程改 skill 实现。 if self.repo_root is not None: skills_path = (self.repo_root / "skills").resolve() if skills_path.is_dir(): cmd += ["-v", f"{skills_path}:/sandbox/skills:ro"] if self.runtime: cmd += ["--runtime", self.runtime] cmd.append(self.image) r = subprocess.run(cmd, capture_output=True, text=True) if r.returncode != 0: raise RuntimeError( f"docker run {name} failed (rc={r.returncode}): {r.stderr.strip()}" ) def mark_active(self, user_id: UUID) -> None: """每次 `docker exec` 完调一次,刷新 idle 计时。""" self._last_active[user_id] = _now() def reap_idle(self) -> List[str]: """杀超过 idle_ttl 没活跃的容器。返回已杀容器名列表(供日志 / 审计)。""" removed: List[str] = [] cutoff = _now() - self.idle_ttl for uid, ts in list(self._last_active.items()): if ts < cutoff: name = container_name(uid) r = subprocess.run( ["docker", "rm", "-f", name], capture_output=True, text=True, ) if r.returncode == 0: removed.append(name) # 无论 rm 成功与否,从 dict 移除 ── 失败则下次启动靠 shutdown_all 兜底 del self._last_active[uid] return removed def shutdown_all(self) -> List[str]: """杀所有 label=zcbot.product=sandbox 的容器。 典型用途:① app 启动时清前驱进程留下的孤儿 ② 测试 / 维护手动调。 """ list_r = subprocess.run( ["docker", "ps", "-aq", "--filter", f"label={LABEL_PRODUCT_KEY}={LABEL_PRODUCT_VALUE}"], capture_output=True, text=True, ) if list_r.returncode != 0 or not list_r.stdout.strip(): return [] ids = list_r.stdout.strip().splitlines() subprocess.run( ["docker", "rm", "-f", *ids], capture_output=True, text=True, ) # 反查容器名给调用方记日志(rm 前先 inspect)── 这里简化只返 id self._last_active.clear() return ids def setup_pool( user_root_base: Path, repo_root: Optional[Path] = None, sandbox_cfg: Optional[Dict[str, object]] = None, ) -> SandboxPool: """app 启动便捷入口:ensure 网络存在 + 返回 pool 实例。 `sandbox_cfg` 是 agent.yaml 的 `sandbox` 段(dict),含 memory/cpus/pids_limit; 没传走 env / 默认值。env 仍可独立 override(SandboxPool ctor 里处理优先级)。 典型用法(lifespan 启动钩子): from core.paths import ROOT cfg = load_config() pool = setup_pool(workspace / "users", repo_root=ROOT, sandbox_cfg=cfg.get("sandbox") or {}) pool.shutdown_all() # 清前驱孤儿 """ ensure_network() cfg = sandbox_cfg or {} dns_cfg = cfg.get("dns") or [] if not isinstance(dns_cfg, list): dns_cfg = [] return SandboxPool( user_root_base=user_root_base, repo_root=repo_root, memory=cfg.get("memory") if isinstance(cfg.get("memory"), str) else None, cpus=str(cfg["cpus"]) if cfg.get("cpus") is not None else None, pids_limit=int(cfg["pids_limit"]) if cfg.get("pids_limit") is not None else None, dns=[str(x) for x in dns_cfg], )