zcbot/core/sandbox/pool.py

212 lines
8.5 KiB
Python

"""Per-user sandbox 容器池(DESIGN §7.5)。
命名:`zcbot-sandbox-<user_id>`(user_id = UUID 标准串带 dash,与 bind mount
源路径 `<workspace>/users/<user_id>/` 对齐 ── `docker ps` 看到容器名能直接 grep
出 workspace 目录)。
生命周期:
- `ensure(user_id)`:per-user `asyncio.Lock` 串行化 → `docker inspect` 探测 → 已 running
直接返;exists-but-stopped 先 `rm -f` 重起(保证 iptables 重新 apply);不存在 `docker run`
- `mark_active(user_id)`:exec 完更新 in-memory `_last_active[uid]=now`(docker labels
不可运行时修改 ── Docker 23+ 移除 `docker update --label-add` 支持)
- `reap_idle()`:周期任务,扫 `_last_active` dict,>`idle_ttl` 的 `docker rm -f`
- `shutdown_all()`:app 启动时清前驱孤儿(`docker ps --filter label=zcbot.product=sandbox`)
幂等性:
- ensure 在重复调用时跨 daemon round-trip < 100ms(纯 `docker inspect`);per-user lock
防同 user 两并发 `docker run --name` 撞 "Conflict"(虽然 docker 本身会 reject,提前
锁更干净)
- reaper 只杀 dict 里有记录的容器 ── 重启后 dict 空 → 不杀历史孤儿(这条由 startup
`shutdown_all` 兜底)
Step 2 范围:仅 pool / lifecycle。Tools(shell / run_python)在 Step 3 接入。
"""
from __future__ import annotations
import asyncio
import os
import subprocess
import time
from pathlib import Path
from typing import Dict, List, Optional
from uuid import UUID
from .network import NETWORK_NAME, ensure_network
# Every sandbox container is named CONTAINER_NAME_PREFIX + <user_id>.
CONTAINER_NAME_PREFIX = "zcbot-sandbox-"
# Label key/value attached to each container at `docker run` time;
# `SandboxPool.shutdown_all` filters `docker ps` on this pair.
LABEL_PRODUCT_KEY = "zcbot.product"
LABEL_PRODUCT_VALUE = "sandbox"
# Label key whose value is the owning user's id.
LABEL_USER_ID_KEY = "zcbot.user_id"
# Image used when neither the `image` argument nor env ZCBOT_SANDBOX_IMAGE is given.
DEFAULT_IMAGE = "zcbot-sandbox:latest"
# Idle TTL in seconds used when neither `idle_ttl` nor env ZCBOT_SANDBOX_IDLE_TTL is given.
DEFAULT_IDLE_TTL_SECONDS = 300
def container_name(user_id: UUID) -> str:
    """Return the Docker container name for *user_id*.

    The name is the shared sandbox prefix followed by the user id's default
    string form.
    """
    return CONTAINER_NAME_PREFIX + format(user_id)
def _now() -> int:
    """Return the current wall-clock time as whole seconds since the epoch."""
    epoch_seconds = time.time()
    # int() truncates the fractional part.
    return int(epoch_seconds)
def _container_exists(name: str) -> bool:
    """Report whether a container called *name* exists.

    Any state (running / exited / created) counts as existing. The check is a
    blocking `docker inspect` call; exit status 0 is taken to mean "exists".
    """
    inspect_cmd = ["docker", "inspect", "--type=container", name]
    outcome = subprocess.run(inspect_cmd, capture_output=True, text=True)
    return outcome.returncode == 0
def _container_running(name: str) -> bool:
    """Report whether the container called *name* is currently running.

    Runs a blocking `docker inspect` that prints `.State.Running`; the answer
    is True only when the command succeeds and prints exactly "true".
    """
    inspect_cmd = [
        "docker", "inspect", "--type=container",
        "--format={{.State.Running}}", name,
    ]
    outcome = subprocess.run(inspect_cmd, capture_output=True, text=True)
    if outcome.returncode != 0:
        # Inspect failed, so the container cannot be reported as running.
        return False
    return outcome.stdout.strip() == "true"
class SandboxPool:
    """Pool of per-user sandbox containers, driven through the `docker` CLI.

    State kept in memory:
      - `_locks`: one `asyncio.Lock` per user id, serializing `ensure` calls
        for the same user.
      - `_last_active`: user id -> epoch seconds of the last recorded
        activity; this is what `reap_idle` scans.

    `ensure` is a coroutine and offloads its docker calls to worker threads.
    `reap_idle` and `shutdown_all` are synchronous and block on
    `subprocess.run` while the docker CLI executes.
    """

    def __init__(
        self,
        user_root_base: Path,
        image: Optional[str] = None,
        runtime: Optional[str] = None,
        idle_ttl: Optional[int] = None,
        pg_ips: Optional[str] = None,
    ) -> None:
        """Configure the pool; no docker command is run here.

        user_root_base: parent directory of the per-user subtrees, typically
            `<workspace>/users`. The bind-mount source is
            `user_root_base / <user_id>`, the target is `/workspace`.
        image: sandbox image tag (defaults to env `ZCBOT_SANDBOX_IMAGE`,
            then `DEFAULT_IMAGE`).
        runtime: value for `docker run --runtime` (runc / runsc / kata ...);
            empty means the docker default (env `ZCBOT_SANDBOX_RUNTIME`).
            Per §7.5 #5 / the §7.9 upgrade table, switching to gVisor /
            Firecracker only requires changing this one setting.
        idle_ttl: seconds; containers whose `mark_active` timestamp is older
            than `now - idle_ttl` are removed by `reap_idle`
            (env `ZCBOT_SANDBOX_IDLE_TTL`, default 300).
        pg_ips: comma-separated PG IPs, passed into the container as the
            `ZCBOT_PG_IPS` env var so init.sh can add DROP rules
            (env `ZCBOT_PG_IPS`). Defense in depth, even when those IPs
            already fall inside the three internal-network ranges.

        Raises:
            ValueError: if `idle_ttl` is None and `ZCBOT_SANDBOX_IDLE_TTL`
                is set to something `int()` cannot parse.
        """
        self.user_root_base = user_root_base
        self.image = image or os.getenv("ZCBOT_SANDBOX_IMAGE", DEFAULT_IMAGE)
        self.runtime = runtime or os.getenv("ZCBOT_SANDBOX_RUNTIME") or ""
        self.idle_ttl = idle_ttl if idle_ttl is not None else int(
            os.getenv("ZCBOT_SANDBOX_IDLE_TTL", str(DEFAULT_IDLE_TTL_SECONDS))
        )
        self.pg_ips = pg_ips if pg_ips is not None else os.getenv("ZCBOT_PG_IPS", "")
        self._locks: Dict[UUID, asyncio.Lock] = {}
        self._last_active: Dict[UUID, int] = {}

    def _lock_for(self, user_id: UUID) -> asyncio.Lock:
        """Return the lock dedicated to *user_id*, creating it on first use."""
        if user_id not in self._locks:
            self._locks[user_id] = asyncio.Lock()
        return self._locks[user_id]

    async def ensure(self, user_id: UUID) -> str:
        """Return the container name for *user_id*, creating it if needed.

        Create-or-reuse is serialized per user by the user's lock. Every
        docker CLI call runs in a worker thread via `asyncio.to_thread`, so
        the event loop is not blocked while docker responds.

        Raises:
            RuntimeError: if `docker run` fails (see `_docker_run`).
        """
        async with self._lock_for(user_id):
            name = container_name(user_id)
            if await asyncio.to_thread(_container_running, name):
                self._last_active[user_id] = _now()
                return name
            if await asyncio.to_thread(_container_exists, name):
                # Stopped / crashed: remove and start afresh, so the iptables
                # rules are re-applied along with the container lifecycle.
                # Best-effort: a failed rm surfaces as a `docker run` conflict.
                await asyncio.to_thread(
                    subprocess.run,
                    ["docker", "rm", "-f", name],
                    capture_output=True, check=False,
                )
            await asyncio.to_thread(self._docker_run, user_id, name)
            self._last_active[user_id] = _now()
            return name

    def _docker_run(self, user_id: UUID, name: str) -> None:
        """Start the sandbox container for *user_id*; synchronous and blocking.

        Called by `ensure` inside `asyncio.to_thread`. Creates the user's
        workspace directory if it is missing.

        Raises:
            RuntimeError: if `docker run` exits non-zero; the message carries
                the return code and stderr.
        """
        # `docker run -v` only bind-mounts an absolute host path; anything
        # else is interpreted as a named volume (or rejected), so make the
        # source absolute before handing it to docker.
        user_root = (self.user_root_base / str(user_id)).absolute()
        user_root.mkdir(parents=True, exist_ok=True)
        cmd: List[str] = [
            "docker", "run", "-d",
            "--name", name,
            "--label", f"{LABEL_PRODUCT_KEY}={LABEL_PRODUCT_VALUE}",
            "--label", f"{LABEL_USER_ID_KEY}={user_id}",
            "--network", NETWORK_NAME,
            # §7.5 hard limits (hardening counts as incomplete if any is missing)
            "--read-only",  # read-only rootfs
            "--tmpfs", "/tmp:exec,size=512m,mode=1777",  # writable scratch; exec allowed (run_python writes scripts)
            "--cap-drop=ALL",  # drop everything by default
            "--cap-add=NET_ADMIN",  # init.sh needs it for iptables; uid 1000 exec'd in later does not get it
            "--security-opt=no-new-privileges",
            "--pids-limit=256",
            "--memory=2g",
            "--cpus=1.0",
            "-v", f"{user_root}:/workspace",
            "-e", f"ZCBOT_PG_IPS={self.pg_ips}",
            "--restart=no",
        ]
        if self.runtime:
            cmd += ["--runtime", self.runtime]
        # The image must come after all options.
        cmd.append(self.image)
        r = subprocess.run(cmd, capture_output=True, text=True)
        if r.returncode != 0:
            raise RuntimeError(
                f"docker run {name} failed (rc={r.returncode}): {r.stderr.strip()}"
            )

    def mark_active(self, user_id: UUID) -> None:
        """Refresh the idle timer; call once after each `docker exec`."""
        self._last_active[user_id] = _now()

    def reap_idle(self) -> List[str]:
        """Remove containers idle for longer than `idle_ttl`.

        Only users recorded in `_last_active` are considered. Returns the
        names of the containers actually removed (for logging / auditing).
        """
        removed: List[str] = []
        cutoff = _now() - self.idle_ttl
        # Iterate over a snapshot because entries are removed during the sweep.
        for uid, ts in list(self._last_active.items()):
            if ts >= cutoff:
                continue
            name = container_name(uid)
            r = subprocess.run(
                ["docker", "rm", "-f", name],
                capture_output=True, text=True,
            )
            if r.returncode == 0:
                removed.append(name)
            # Forget the entry whether or not rm succeeded; a failed removal
            # is left for `shutdown_all` at the next startup. `pop` tolerates
            # the entry having been cleared in the meantime, so one missing
            # key cannot abort the rest of the sweep.
            self._last_active.pop(uid, None)
        return removed

    def shutdown_all(self) -> List[str]:
        """Remove every container labelled `zcbot.product=sandbox`.

        Typical uses: (1) at app startup, to clear orphans left by a previous
        process; (2) manual calls from tests / maintenance.

        Returns the container ids reported by `docker ps`, or an empty list
        when the listing fails or finds nothing. The result of the `rm`
        itself is not checked.
        """
        list_r = subprocess.run(
            ["docker", "ps", "-aq", "--filter",
             f"label={LABEL_PRODUCT_KEY}={LABEL_PRODUCT_VALUE}"],
            capture_output=True, text=True,
        )
        if list_r.returncode != 0 or not list_r.stdout.strip():
            return []
        ids = list_r.stdout.strip().splitlines()
        subprocess.run(
            ["docker", "rm", "-f", *ids],
            capture_output=True, text=True,
        )
        # Reporting container names would need an inspect before the rm;
        # simplified here to return ids only.
        self._last_active.clear()
        return ids
def setup_pool(user_root_base: Path) -> SandboxPool:
    """Convenience entry point for app startup.

    Makes sure the sandbox network exists, then builds and returns a pool
    rooted at *user_root_base*.

    Typical usage (lifespan startup hook):
        pool = setup_pool(workspace / "users")
        pool.shutdown_all()  # clear orphans left by a previous process
        # a background reaper task then calls pool.reap_idle() periodically
    """
    # The network is ensured before the pool is constructed.
    ensure_network()
    pool = SandboxPool(user_root_base=user_root_base)
    return pool