"""FastAPI app: 纯 /v1 JSON API(2026-05-15 切换 — 详见 DESIGN §7.9)。 设计要点: - 所有路由 `/v1/*` 前缀,响应 JSON;模板 / HTMX / 服务端 markdown 渲染全删 - SSE 事件 payload 是 JSON dict 而非 HTML 片段(`event: ` + `data: `) - Auth: PLATFORM_KEY → JWT 兑换(§7 D' 过渡形态,见 web/auth.py);OIDC 替换时只动 /v1/auth/login 内部 - 所有 /v1/tasks* 路由 Depends(require_user),按 user_id 隔离数据 - 豁免:/healthz、/docs、/openapi.json、/、/v1/auth/login、/static/* - CORS allow_origins=["*"] 本地宽松;真发布按 platform 域名收紧 - `GET /` 302 → /static/dev.html(本地 dev SPA) """ from __future__ import annotations import asyncio import json import mimetypes import os import tempfile from concurrent.futures import ThreadPoolExecutor from contextlib import asynccontextmanager from datetime import datetime as _dt from pathlib import Path from typing import Any, Optional from uuid import UUID, uuid4 try: import resource # Unix only;Windows dev 无此模块,RSS 监控自动降级跳过 except ImportError: # pragma: no cover - Windows resource = None from fastapi import Depends, FastAPI, File, Form, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse from pydantic import BaseModel from sqlalchemy import BigInteger, cast, func, select, update from starlette.background import BackgroundTask from core import __version__ from core.paths import to_db_path from core.storage import ( NoSubtaskError, check_no_subtask, session_scope, ) from core.storage.models import Message, Task, UsageEvent from core.storage.utils import ensure_local_task_row from .auth import ( AuthConfig, UserCreateError, change_password, create_user, ensure_user_row, make_require_user, mint_token, resolve_user_by_email, ) from .broker import broker from .sinks import WebEventSink from .static_files import NoCacheStaticFiles STATUS_FILTERS = ("active", "completed", "abandoned") STATUS_WRITABLE = ("completed", "abandoned") # web 不让从 web 端切回 active(走 CLI) ORDER_FIELDS = ("created_at", "updated_at", "name", "status") 
ORDER_DEFAULT = "-created_at"

# pptx -> PDF preview: one lock per resolved absolute pptx path, so the same file
# is not converted twice concurrently (DESIGN §8.3).
_pptx_preview_locks: dict[str, asyncio.Lock] = {}


def _pptx_lock_for(abs_path: str) -> asyncio.Lock:
    """Return the lock registered for ``abs_path``, creating it on first use."""
    existing = _pptx_preview_locks.get(abs_path)
    if existing is not None:
        return existing
    created = asyncio.Lock()
    _pptx_preview_locks[abs_path] = created
    return created


# ─────────────────────────── helpers ───────────────────────────


def _norm_path(p: str) -> str:
    """Normalise a path for display: every backslash becomes a forward slash.

    Falsy input (``None`` / empty string) yields ``""``.
    """
    if not p:
        return ""
    return "/".join(p.split("\\"))


def _iso(dt: Optional[Any]) -> Optional[str]:
    """Return ``dt.isoformat()``, or ``None`` when ``dt`` is falsy."""
    if not dt:
        return None
    return dt.isoformat()


def _parse_ordering(s: Optional[str]) -> list:
    """Parse a DRF-style ``ordering`` query value into ``order_by`` clauses.

    The value is a comma-separated list of field names; a leading ``-`` means
    descending. Names outside ``ORDER_FIELDS`` are dropped silently. A blank
    value falls back to ``ORDER_DEFAULT``; if nothing valid remains, the result
    is ``[Task.created_at.desc()]``.

    Returns:
        A list of column ordering expressions built from ``Task`` attributes.
    """
    requested = (s or "").strip() or ORDER_DEFAULT
    clauses = []
    for token in (raw.strip() for raw in requested.split(",")):
        if not token:
            continue
        descending = token.startswith("-")
        field = token[1:] if descending else token
        if field not in ORDER_FIELDS:
            continue
        column = getattr(Task, field)
        clauses.append(column.desc() if descending else column.asc())
    # Every requested field was invalid -> fall back to newest-first.
    return clauses or [Task.created_at.desc()]


def _usage_aggregates(s: Any, tids: list) -> dict:
    """Aggregate ``usage_events`` per task in one GROUP BY query.

    Computed on the fly (nothing is written back to the tasks table):

    - ``cost_cny`` sums every event kind for the task.
    - ``tokens_in`` / ``tokens_out`` / ``tokens_cache_hit`` sum only events
      whose kind is ``"chat"``. All three come from the same table, which is
      why the original author uses ``tokens_in`` (not ``tasks.tokens_prompt``)
      as the denominator of the cache-hit rate.

    Args:
        s: object exposing ``execute(stmt)`` (presumably a SQLAlchemy session).
        tids: task ids to aggregate; an empty list short-circuits to ``{}``.

    Returns:
        ``{task_id: {"cost_cny": float, "tokens_in": int, "tokens_out": int,
        "tokens_cache_hit": int}}`` for every task that has usage rows.
    """
    if not tids:
        return {}

    is_chat = UsageEvent.kind == "chat"

    def _chat_sum(key: str) -> Any:
        # units[key] is read as text and cast to BIGINT before summing; the
        # FILTER clause restricts the sum to chat events.
        as_int = cast(UsageEvent.units[key].astext, BigInteger)
        return func.coalesce(func.sum(as_int).filter(is_chat), 0)

    stmt = (
        select(
            UsageEvent.task_id,
            func.coalesce(func.sum(UsageEvent.cost_cny), 0),
            _chat_sum("tokens_in"),
            _chat_sum("tokens_out"),
            _chat_sum("cache_hit_tokens"),
        )
        .where(UsageEvent.task_id.in_(tids))
        .group_by(UsageEvent.task_id)
    )

    summary: dict = {}
    for tid, cost, tin, tout, hit in s.execute(stmt).all():
        summary[tid] = {
            "cost_cny": float(cost or 0),
            "tokens_in": int(tin or 0),
            "tokens_out": int(tout or 0),
            "tokens_cache_hit": int(hit or 0),
        }
    return summary


def _task_dict(
    row: Any,
    *,
    n_messages: Optional[int] = None,
    usage: Optional[dict] = None,
) -> dict:
    """Convert a task row into the JSON-ready dict returned by the API.

    Args:
        row: object carrying the task attributes read below.
        n_messages: when not ``None``, added to the result as ``n_messages``.
        usage: optional per-task summary in the shape produced by
            ``_usage_aggregates``. Keys present in it take precedence over the
            matching row columns; missing keys fall back to the row (or to 0
            for the cache-hit count).

    Returns:
        A dict of plain JSON-serialisable values.
    """
    agg = usage or {}

    # Prefer the usage_events totals so token counts share a source with the
    # cache-hit count; fall back to the task's own columns otherwise.
    if "tokens_in" in agg:
        prompt_tokens = int(agg["tokens_in"])
    else:
        prompt_tokens = row.tokens_prompt or 0
    if "tokens_out" in agg:
        completion_tokens = int(agg["tokens_out"])
    else:
        completion_tokens = row.tokens_completion or 0

    payload = {
        "task_id": str(row.task_id),
        "name": row.name or "",
        "description": row.description or "",
        "working_dir": _norm_path(row.working_dir or ""),
        "status": row.status,
        "skill": row.skill or "",
        "model": row.model or "",
        "model_profile": row.model_profile or "",
        "tokens_prompt": prompt_tokens,
        "tokens_completion": completion_tokens,
        "tokens": prompt_tokens + completion_tokens,
        # Cache-hit tokens and cost come from the aggregate when supplied.
        "tokens_cache_hit": int(agg.get("tokens_cache_hit", 0)),
        "cost_cny": float(agg["cost_cny"]) if "cost_cny" in agg else float(row.cost_cny or 0),
        # Current run state lives directly on the task row.
        "run_status": row.run_status or "idle",
        "run_error": row.run_error or None,
        "created_at": _iso(getattr(row, "created_at", None)),
        "updated_at": _iso(getattr(row, "updated_at", None)),
    }
    if n_messages is not None:
        payload["n_messages"] = n_messages
    return payload


#
# ─────────────────────── files helpers ───────────────────────


def _load_user_root(user_id: UUID) -> Path:
    """Return the per-user root directory that bounds every files API call.

    The path is whatever ``core.agent_builder.user_root`` returns for the
    default workspace (presumably ``<workspace>/users/<user_id>/``). According
    to the original note the directory is created on demand; that happens
    inside the helper and is not visible here.
    """
    from core.agent_builder import resolve_workspace, user_root

    ws = resolve_workspace(None)
    return user_root(ws, user_id)


def _safe_join(root: Path, rel: str) -> Path:
    """Resolve a user-supplied relative path and ensure it stays inside ``root``.

    Guards against ``../`` traversal, absolute paths and symlinks that point
    outside ``root`` (both sides are resolved before comparison).

    Args:
        root: the user's root directory.
        rel: path relative to ``root``; blank / ``None`` means ``root`` itself.

    Returns:
        The resolved absolute path.

    Raises:
        HTTPException: 400 when ``rel`` is absolute or escapes ``root``.
    """
    rel = (rel or "").strip()
    if not rel:
        return root.resolve()
    # Reject a leading separator explicitly: on Windows "/x" is not
    # "absolute" for pathlib, yet joining it would discard ``root``.
    if rel[0] in ("/", "\\"):
        raise HTTPException(400, f"absolute-style path not allowed: {rel!r}")
    if Path(rel).is_absolute():
        raise HTTPException(400, f"absolute path not allowed: {rel!r}")
    target = (root / rel).resolve()
    try:
        target.relative_to(root.resolve())
    except ValueError:
        # The ValueError is an implementation detail; do not chain it.
        raise HTTPException(400, f"path escapes user_root: {rel!r}") from None
    return target


def _rel_to(root: Path, target: Path) -> str:
    """Return ``target`` relative to ``root`` as a POSIX string.

    Yields ``""`` when ``target`` is ``root`` itself or lies outside ``root``.
    """
    try:
        rel = target.resolve().relative_to(root.resolve()).as_posix()
    except ValueError:
        return ""
    return "" if rel == "." else rel


def _enumerate_files(root: Path, current: Path) -> tuple[list[dict], list[dict], bool]:
    """List the entries under ``current`` and build breadcrumbs up to ``root``.

    Entries are sorted directories-first, then case-insensitively by name.
    Dotfiles are always hidden. ``size`` is raw bytes (``None`` for anything
    that is not a regular file); ``mtime`` is a local-time ISO string with
    second precision. Entries that cannot be ``stat``-ed are skipped.

    Returns:
        ``(entries, crumbs, exists)`` where ``exists`` tells whether
        ``current`` exists on disk.
    """
    entries: list[dict] = []
    exists = current.exists()
    if exists and current.is_dir():
        try:
            raw = sorted(current.iterdir(), key=lambda p: (p.is_file(), p.name.lower()))
        except OSError:
            raw = []
        for p in raw:
            if p.name.startswith("."):
                continue
            try:
                st = p.stat()
            except OSError:
                continue
            entries.append({
                "name": p.name,
                "is_dir": p.is_dir(),
                "size": st.st_size if p.is_file() else None,
                "mtime": _dt.fromtimestamp(st.st_mtime).isoformat(timespec="seconds"),
                "rel": _rel_to(root, p),
            })

    cur_rel = _rel_to(root, current)
    crumbs = [{"label": "/", "rel": ""}]
    # _rel_to already maps "current is root" to "", so a truthiness test is
    # enough to avoid appending a meaningless crumb for the root itself.
    if cur_rel:
        acc = ""
        for part in cur_rel.split("/"):
            acc = f"{acc}/{part}" if acc else part
            crumbs.append({"label": part, "rel": acc})
    return entries, crumbs, exists


def _validate_transfer(
    root: Path, paths: list[str], dest_dir: str,
) -> tuple[list[Path], Path]:
    """Pre-check a batch copy/move; any invalid item aborts the whole batch.

    No filesystem changes are made here. Checks performed:

    - ``paths`` is non-empty; every source lies inside ``root`` and exists,
      and is not ``root`` itself.
    - ``dest_dir`` exists and is a directory (it may be ``root``).
    - A source is not the destination itself.
    - The destination is not inside a source's subtree.
    - A source is not already a direct child of the destination (no-op).
    - No two sources in the batch share the same leaf name.
    - ``dest_dir/<leaf>`` does not already exist (409, never overwritten).

    Returns:
        ``(sources, dest_dir_path)``.

    Raises:
        HTTPException: 400 / 404 / 409 as described above.
    """
    if not paths:
        raise HTTPException(400, "paths is empty")
    dest = _safe_join(root, dest_dir)
    if not dest.exists():
        raise HTTPException(404, f"dest_dir not found: {dest_dir!r}")
    if not dest.is_dir():
        raise HTTPException(400, f"dest_dir is not a directory: {dest_dir!r}")
    dest_r = dest.resolve()
    root_r = root.resolve()  # loop-invariant, resolved once

    sources: list[Path] = []
    seen_names: set[str] = set()
    for p in paths:
        src = _safe_join(root, p)
        if not src.exists():
            raise HTTPException(404, f"source not found: {p!r}")
        src_r = src.resolve()
        if src_r == root_r:
            raise HTTPException(400, "cannot transfer user_root")
        if src_r == dest_r:
            raise HTTPException(400, f"source equals dest_dir: {p!r}")
        # dest inside the source's subtree -> the source would nest into itself.
        try:
            dest_r.relative_to(src_r)
        except ValueError:
            pass  # dest is outside the source subtree: fine
        else:
            raise HTTPException(
                400, f"cannot transfer {p!r} into its own subtree"
            )
        # Already a direct child of dest -> nothing to do.
        if src.parent.resolve() == dest_r:
            raise HTTPException(
                400, f"{p!r} already directly under dest_dir"
            )
        name = src.name
        if name in seen_names:
            raise HTTPException(400, f"duplicate source leaf name in batch: {name!r}")
        seen_names.add(name)
        target = dest / name
        if target.exists():
            raise HTTPException(
                409, f"target already exists: {_rel_to(root, target)!r}"
            )
        sources.append(src)
    return sources, dest


# ─────────────────── background run + SSE frame format ───────────────────


def _run_agent_bg(
    task_id: UUID,
    user_id: UUID,
    user_message: str,
    image_variant: str = "",
    video_variant: str = "",
) -> None:
    """Worker-thread entry point: build the agent, run it, record the outcome.

    Steps: emit ``run_start`` -> ``build_agent(resume=True)`` -> attach a
    ``WebEventSink`` -> ``agent.run`` -> ``sync_task_tokens`` -> set
    ``tasks.run_status``. On success (per the original note this also covers
    a cancelled run) the status goes back to ``idle`` with no persistent
    marker; only an exception leaves the persistent ``error`` state. The
    broker's cancel flag is cleared and the channel closed in every case.

    Args:
        task_id: task to run; also the broker channel key.
        user_id: forwarded to ``build_agent`` (per the original note it selects
            the per-user memory subtree).
        user_message: the message handed to ``agent.run``.
        image_variant / video_variant: media variant keys for this run only;
            passed straight to ``build_agent`` and not written to the DB here.

    ``cancel_check`` is handed to ``build_agent`` rather than assigned later
    because, per the original note, a tool constructor keeps a reference to it.
    """
    from core.agent_builder import build_agent, sync_task_tokens

    def cancel_check(tid: UUID = task_id):
        # Same call shape as the former lambda: zero or one argument.
        return broker.is_cancelled(tid)

    try:
        broker.emit(task_id, {"type": "run_start"})
        agent, _session, _sid, task_state, _task_dir = build_agent(
            session_id=str(task_id),
            resume=True,
            user_id=user_id,
            image_variant=image_variant,
            video_variant=video_variant,
            cancel_check=cancel_check,
        )
        agent.sink = WebEventSink(broker, task_id)
        agent.run(user_message)
        sync_task_tokens(task_state)
        # Normal completion -> run_status returns to idle.
        with session_scope() as s:
            s.execute(
                update(Task).where(Task.task_id == task_id).values(
                    run_status="idle", run_error=None,
                )
            )
    except Exception as e:
        err = f"{type(e).__name__}: {e}"
        broker.emit(task_id, {"type": "error", "msg": err})
        try:
            with session_scope() as s:
                s.execute(
                    update(Task).where(Task.task_id == task_id).values(
                        run_status="error", run_error=err,
                    )
                )
        except Exception:
            # Deliberate best-effort: the error was already emitted to the
            # client, so a failing DB write must not add more noise.
            pass
    finally:
        broker.clear_cancel(task_id)
        broker.close(task_id)


def _sse_event(event_type: str, payload: dict) -> bytes:
    """Format one SSE frame: an ``event:`` line plus a compact-JSON ``data:`` line."""
    body = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
    return f"event: {event_type}\ndata: {body}\n\n".encode("utf-8")


def _resolve_model_profile(profile: str) -> tuple[str, str]:
    """Validate ``profile`` and return ``(profile_name, model_id)``.

    A blank profile falls back to ``cfg["default_model"]``. The name is loaded
    through ``ModelCapabilities.load``.

    Raises:
        HTTPException: 400 when loading raises ``FileNotFoundError`` or
            ``ValueError``.
    """
    from core.agent_builder import load_config
    from core.capabilities import ModelCapabilities
    from core.paths import ROOT

    cfg = load_config()
    name = (profile or "").strip() or cfg["default_model"]
    try:
        caps = ModelCapabilities.load(name, ROOT / cfg["models_dir"])
    except (FileNotFoundError, ValueError) as e:
        raise HTTPException(400, f"invalid model_profile {name!r}: {e}") from e
    return name, caps.model_id


def _list_media_variants(section: str) -> list[tuple[str, dict]]:
    """Read one section of ``config/media/doubao.yaml`` as ``[(key, cfg), ...]``.

    Pure metadata listing. Returns ``[]`` when the file is missing, cannot be
    read or parsed, or when the document root or the requested section is not
    a mapping. Entries whose value is not a mapping are skipped.
    """
    from core.paths import ROOT
    import yaml as _yaml

    cfg_path = ROOT / "config" / "media" / "doubao.yaml"
    if not cfg_path.exists():
        return []
    try:
        data = _yaml.safe_load(cfg_path.read_text(encoding="utf-8")) or {}
    except Exception:
        # Best-effort listing: an unreadable / malformed file means "no variants".
        return []
    # safe_load may return a list or scalar; only a mapping has sections.
    if not isinstance(data, dict):
        return []
    section_cfg = data.get(section) or {}
    if not isinstance(section_cfg, dict):
        return []
    return [(k, v) for k, v in section_cfg.items() if isinstance(v, dict)]


def _list_image_variants() -> list[tuple[str, dict]]:
    """List the ``image`` variants of ``config/media/doubao.yaml``.

    Returns ``[]`` when the file or the section is missing or empty; no API
    key is consulted here.
    """
    return _list_media_variants("image")


def _resolve_image_model(variant: str) -> str:
    """Validate an image variant key.

    Blank input returns ``""`` (the caller then uses its own default). A
    non-blank key must exist in the ``image`` section.

    Raises:
        HTTPException: 400 for an unknown key.
    """
    name = (variant or "").strip()
    if not name:
        return ""
    variants = {k for k, _ in _list_image_variants()}
    if name not in variants:
        raise HTTPException(400, f"invalid image_model {name!r}; available: {sorted(variants)}")
    return name


def _list_video_variants() -> list[tuple[str, dict]]:
    """List the ``video`` variants of ``config/media/doubao.yaml``.

    Same contract as ``_list_image_variants``; a missing or empty section
    yields ``[]``.
    """
    return _list_media_variants("video")


def _resolve_video_model(variant: str) -> str:
    """Validate a video variant key (same contract as ``_resolve_image_model``)."""
    name = (variant or "").strip()
    if not name:
        return ""
    variants = {k for k, _ in _list_video_variants()}
    if name not in variants:
        raise HTTPException(400, f"invalid video_model {name!r}; available: {sorted(variants)}")
    return name


# ────────────────────── Pydantic request bodies ──────────────────────


class TaskCreateRequest(BaseModel):
    name: str  # display name of the task (required)
    working_dir: str = ""  # working directory name (optional)
    description: str = ""
    skill: str = ""
    model_profile: str = ""  # `family.variant`; blank -> cfg["default_model"]


class TaskPatchRequest(BaseModel):
    status: Optional[str] = None
    description: Optional[str] = None
    name: Optional[str] = None
    skill: Optional[str] = None
    model_profile: Optional[str] = None  # switch the task's model profile


class MessageRequest(BaseModel):
    content: str
    # Image / video variant keys (config/media/doubao.yaml `image` / `video`
    # sections) to use for the run triggered by this message. Blank means the
    # default variant is used.
    image_model: str = ""
    video_model: str = ""


class OptimizePromptRequest(BaseModel):
    text: str
    # Optional variant keys currently selected in the UI; per the original
    # note they let the prompt polisher tailor its output to that model.
    image_model: str = ""
    video_model: str = ""


class FileDeleteRequest(BaseModel):
    path: str
    recursive: bool = False


class FileRenameRequest(BaseModel):
    path: str  # file or directory to rename, relative to user_root
    new_name: str  # new leaf name (not a path); must not contain / \ ..
class FileTransferRequest(BaseModel): paths: list[str] # 多源,均相对 user_root dest_dir: str = "" # 目标目录,相对 user_root,空 → user_root class LoginRequest(BaseModel): user_id: str platform_key: str class PasswordLoginRequest(BaseModel): email: str password: str class AdminCreateUserRequest(BaseModel): email: str password: str admin_token: str class ChangePasswordRequest(BaseModel): old_password: str new_password: str # ────────────────────── App 工厂 ────────────────────── # web/static 目录路径 — /static 静态挂载用,dev.html 也放这 _STATIC_DIR = Path(__file__).parent / "static" def create_app() -> FastAPI: # fail-fast:env 缺失直接抛,不裸跑无密 auth_cfg = AuthConfig.from_env() require_user = make_require_user(auth_cfg) @asynccontextmanager async def lifespan(app: FastAPI): loop = asyncio.get_running_loop() broker.bind_loop(loop) # ── 接管默认线程池 executor(§8.4)────────────────────────────── # run 走 asyncio.to_thread(用 loop 默认 executor);默认是匿名的,读不到大小、 # 不可调。显式建一个同尺寸(复刻 Python 默认 min(32, cpu+4))接管,好处:① 监控 # 能读 max_workers 判断有没有排队 ② 并发不够时改 ZCBOT_RUN_MAX_WORKERS 调大不改码。 # 注:run 与 disk scan / pptx 转换 / reaper 共享此池(同原默认行为);真要隔离 # 长任务再另开 run 专用池,那是后话。 run_max_workers = int( os.getenv("ZCBOT_RUN_MAX_WORKERS") or min(32, (os.cpu_count() or 1) + 4) ) run_executor = ThreadPoolExecutor( max_workers=run_max_workers, thread_name_prefix="run" ) loop.set_default_executor(run_executor) app.state.run_executor = run_executor app.state.run_max_workers = run_max_workers print(f"[startup] run executor: max_workers={run_max_workers} " f"(override via ZCBOT_RUN_MAX_WORKERS)") from core.agent_builder import load_config, resolve_workspace _cfg = load_config() # 优雅 drain 状态(SIGTERM / systemctl restart 兜底,见下方 finally): # draining 置位后 POST /messages 返 503;inflight 登记在跑的 BG run task, # 关停时 await 它们收尾。inflight 同时给 create_task 持强引用,防被 GC 中途回收。 app.state.draining = asyncio.Event() app.state.inflight = {} # dict[asyncio.Task, UUID(task_id)] _shutdown_cfg = _cfg.get("shutdown") or {} drain_timeout = 
int(_shutdown_cfg.get("drain_timeout_seconds") or 90) cancel_grace = int(_shutdown_cfg.get("cancel_grace_seconds") or 15) # Stale-run reaper:上次进程 crash 留下的 "running" / "cancelling" 已无 BG 线程 # 继续,启动时标 error,让对应 task 重新可发消息(否则 gate 永挂)。 # TODO 真生产 multi-worker:换 heartbeat / lease,只 reap 自家 worker 的孤儿。 with session_scope() as s: result = s.execute( update(Task) .where(Task.run_status.in_(("running", "cancelling"))) .values( run_status="error", run_error="server restarted before run finished", ) ) if result.rowcount: print(f"[startup] reaped {result.rowcount} stale active run(s)") # 磁盘配额后台扫描(§7.5 #4 应用层 gate)── 不依赖 docker backend,host # backend 也跑(/v1/files/upload 也走配额 gate)。yaml `quotas.disk_scan_interval_seconds` # 默 900s = 15min;limit_bytes ≤ 0 视为不限,scan 仍跑(用量统计有用),check 短路放行。 from core.agent_builder import resolve_workspace from core.storage.disk_quota import parse_bytes, scan_all_users workspace = resolve_workspace(None, _cfg) disk_user_root = workspace / "users" quotas_cfg = _cfg.get("quotas") or {} disk_scan_interval = int(quotas_cfg.get("disk_scan_interval_seconds") or 900) async def _disk_scanner() -> None: loop = asyncio.get_running_loop() # 启动时跑一次,后续按 interval。首次扫完 check 才能命中。 try: n = await loop.run_in_executor(None, scan_all_users, disk_user_root) if n: print(f"[disk_scanner] initial scan: {n} user(s)") except Exception as e: print(f"[disk_scanner] initial scan error: {type(e).__name__}: {e}") while True: try: await asyncio.sleep(disk_scan_interval) n = await loop.run_in_executor(None, scan_all_users, disk_user_root) if n: print(f"[disk_scanner] scanned {n} user(s)") except asyncio.CancelledError: raise except Exception as e: print(f"[disk_scanner] error: {type(e).__name__}: {e}") disk_scanner_task = asyncio.create_task(_disk_scanner(), name="disk-scanner") # ── 并发/线程池监控(§8.4):周期采样,只在有负载/刷新峰值时打,空闲不刷屏 ── # active_runs 来自 inflight(已提交未完成的 run,含排队中);逼近 max_workers 即 # 线程池排队,新 run 的 SSE 会卡着不吐 token。查看:journalctl -u zcbot | grep '\[stats\]' def _rss_peak_mb() 
-> Optional[float]: if resource is None: return None # Windows dev:降级,不打 rss # Linux ru_maxrss 单位 KB,是峰值/high-water(单调不降 —— 看内存涨势够用) return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 async def _stats_logger() -> None: peak = 0 while True: try: await asyncio.sleep(60) active = len(app.state.inflight) if active > peak: peak = active warn = " [WARN >= max_workers,已在排队]" if active >= run_max_workers else "" print(f"[stats] new peak active_runs={active} " f"max_workers={run_max_workers}{warn}") if active > 0: rss = _rss_peak_mb() rss_s = f" rss_peak={rss:.0f}MB" if rss is not None else "" print(f"[stats] active_runs={active} " f"max_workers={run_max_workers} " f"sse_subs={broker.total_subscribers()}{rss_s}") except asyncio.CancelledError: raise except Exception as e: print(f"[stats] error: {type(e).__name__}: {e}") stats_logger_task = asyncio.create_task(_stats_logger(), name="stats-logger") # Sandbox pool(§7.5):仅当 ZCBOT_SANDBOX_BACKEND=docker 时启用。 # 启动钩子:① init_pool(创建 docker network + pool 实例)② shutdown_all 清 # 前驱孤儿(上次进程留下的 zcbot-sandbox-* 容器,内存 _last_active 为空, # 全清重启)③ 后台 reaper task,每 60s 跑 reap_idle。 sandbox_backend = os.getenv("ZCBOT_SANDBOX_BACKEND", "host").lower() sandbox_reaper_task = None if sandbox_backend == "docker": from core.paths import ROOT from core.sandbox import init_pool from core.sandbox.check import detect_fs_quota workspace = resolve_workspace(None, _cfg) user_root_base = workspace / "users" # §7.5 #4 fs quota 探测:不阻塞启动(应用层周期扫描已有),仅打 WARN # 提醒外部用户开放前必须升级到 xfs prjquota / ext4 project / zfs。 try: level, msg = detect_fs_quota(user_root_base.resolve()) print(f"[startup] {'[ok]' if level == 'ok' else '[warn]'} {msg}") except Exception as e: print(f"[startup] [warn] fs quota detect failed: {type(e).__name__}: {e}") try: # repo_root=ROOT 让 SandboxPool 把 /skills 只读 mount 进容器 # (fs 工具进容器后 read SKILL references 需要) # sandbox_cfg=yaml `sandbox` 段(memory/cpus/pids_limit 可调) pool = init_pool( user_root_base, repo_root=ROOT, 
sandbox_cfg=_cfg.get("sandbox") or {}, ) removed = pool.shutdown_all() if removed: print(f"[startup] swept {len(removed)} stale sandbox container(s)") async def _reaper() -> None: loop = asyncio.get_running_loop() while True: try: await asyncio.sleep(60) removed = await loop.run_in_executor(None, pool.reap_idle) if removed: print(f"[reaper] reaped {len(removed)} idle sandbox container(s)") except asyncio.CancelledError: raise except Exception as e: print(f"[reaper] error: {type(e).__name__}: {e}") sandbox_reaper_task = asyncio.create_task(_reaper(), name="sandbox-reaper") app.state.sandbox_pool = pool except Exception as e: # ensure_network / docker CLI 不可用 → fail-fast。Stage C 协议:任一 # hardening 缺失视为部署未完成,不退化到 host(否则误以为有沙盒实则在裸跑)。 raise RuntimeError( f"ZCBOT_SANDBOX_BACKEND=docker but sandbox init failed: {e}" ) try: yield finally: # ── 优雅 drain:先拒新 run,等在跑的 run 收尾,超时转协作式 cancel ── # 单实例形态下消除"restart 误杀 in-flight run 标 error"。新 POST /messages # 期间返 503(客户端退避重试覆盖)。drain_timeout 内自然跑完 → idle 零 error; # 超时的 broker.request_cancel → 下个 chunk 间隙退(标 idle);cancel_grace 后仍 # 没退的留给 systemd SIGKILL,下次启动 reaper 标 error(最坏退化 = 改前行为)。 # ★ systemd TimeoutStopSec 必须 > drain_timeout + cancel_grace + 余量(见 RUN.md)。 app.state.draining.set() inflight = app.state.inflight if inflight: print(f"[shutdown] draining {len(inflight)} in-flight run(s), " f"timeout={drain_timeout}s") _, pending = await asyncio.wait( list(inflight.keys()), timeout=drain_timeout ) if pending: print(f"[shutdown] {len(pending)} run(s) over drain timeout; " f"signalling cooperative cancel") for t in pending: cid = inflight.get(t) if cid is not None: broker.request_cancel(cid) _, still = await asyncio.wait(pending, timeout=cancel_grace) if still: print(f"[shutdown] {len(still)} run(s) still active after " f"cancel grace; SIGKILL takes over, next start reaps them") disk_scanner_task.cancel() try: await disk_scanner_task except (asyncio.CancelledError, Exception): pass stats_logger_task.cancel() try: await 
stats_logger_task except (asyncio.CancelledError, Exception): pass if sandbox_reaper_task is not None: sandbox_reaper_task.cancel() try: await sandbox_reaper_task except (asyncio.CancelledError, Exception): pass if sandbox_backend == "docker": pool = getattr(app.state, "sandbox_pool", None) if pool is not None: try: pool.shutdown_all() except Exception as e: print(f"[shutdown] sandbox shutdown_all error: {type(e).__name__}: {e}") # drain 已 await inflight 收尾、run 线程退完;非阻塞关池(进程在退出,保守清理) run_executor.shutdown(wait=False) app = FastAPI( title="zcbot api", version=__version__, description=( "zcbot 后端 — /v1 JSON API + SSE。Auth: PLATFORM_KEY → JWT(§7 D' 过渡)。" "本地 dev SPA: /static/dev.html。" ), lifespan=lifespan, ) app.add_middleware( CORSMiddleware, allow_origins=["*"], # 本地宽松,部署 platform 时按域名收紧 allow_credentials=False, allow_methods=["*"], allow_headers=["*"], ) if _STATIC_DIR.is_dir(): # Windows 上 mimetypes 偶尔把 .js 判成 text/plain,会令