From 15d69b33724bb59c6da93f9ac3669ad6643abb78 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 11 Jun 2026 11:09:44 +0800 Subject: [PATCH] =?UTF-8?q?feat(ops):=20=E5=B9=B6=E5=8F=91/=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=B1=A0=E8=BD=BB=E9=87=8F=E7=9B=91=E6=8E=A7=20+=20?= =?UTF-8?q?=E6=8E=A5=E7=AE=A1=E9=BB=98=E8=AE=A4=20executor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 已上生产后线程池排队此前无观测手段: - lifespan 显式建 ThreadPoolExecutor(尺寸复刻 Python 默认 min(32,cpu+4), env ZCBOT_RUN_MAX_WORKERS 可调) + set_default_executor 接管 —— 行为不变 (匿名池换显式同尺寸池),但 max_workers 可读、成调并发的旋钮 - _stats_logger 每 60s 采样:active_runs(含排队)逼近 max_workers 即排队, 刷新峰值/有负载打 [stats],空闲静默不刷屏 - broker.total_subscribers() 全局 SSE 订阅数;RSS 用 stdlib resource (Unix 峰值;Windows dev 降级),零新依赖 不做监控界面:运维健康是少数标量日志够,业务分析走 SQL。DESIGN 8.4 记 取舍 + 界面阶梯;无感换版(gunicorn/Redis/蓝绿)成本不抵当前收益,搁置。 查看: journalctl -u zcbot | grep [stats] Co-Authored-By: Claude Opus 4.8 (1M context) --- DESIGN.md | 17 +++++++++++++ PROGRESS.md | 1 + RUN.md | 5 ++++ web/app.py | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++- web/broker.py | 4 +++ 5 files changed, 95 insertions(+), 1 deletion(-) diff --git a/DESIGN.md b/DESIGN.md index 67a8802..fc3f384 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -553,6 +553,23 @@ create index on usage_events (model_profile, created_at); **安全边界**:对上传任意 pptx 跑 LibreOffice(历史有宏/EPS CVE)→ `--convert-to` 默认不执行宏 + 宏安全 high + 禁网 + 仅处理鉴权用户自己 user_root 内文件。 **保真边界**:deck 用微软雅黑,Linux 上替换成 Noto Sans CJK 度量略差(可接受)。**Stage 2(未做)**:常驻 soffice listener 消冷启、deck 生成后 eager 预转、缩略图导航。 +### 8.4 运维监控 / 无感更新(2026-06-11,监控 ✅ 已落地 / 无感换版 status=design) + +**背景**:已上生产、真实用户在用。换版可用性从"nice to have"变真账;且当前并发到多少、线程池有没有排队**没有观测手段**。 + +**心智** +- **优雅 drain(已实现,2026-06-10)** —— SIGTERM 后拒新 run(503)、等在跑的 run 收尾再换版,不再标 `error`。这是**单实例能做到的上限**。剩余代价:几十秒 503 窗(dev SPA 退避重试已吸收)+ 换版时 SSE 重连丢正在吐的 delta。 +- **真正先撞的瓶颈是线程池,不是别处**:run 走 `asyncio.to_thread`(`web/app.py:1382`)用默认 `ThreadPoolExecutor`(`min(32, cpu+4)`),每个活跃对话整个 run 期占 1 线程。4 核 ≈ 8 并发活跃对话就排队,第 9 个 SSE 卡着不吐 token。解这个只需调大 executor / 加信号量背压,**不引外部依赖**。 + +**落地排序(便宜→贵,到触发线才进下一级)** +1. **轻量监控(✅ 已落地 2026-06-11,详 PROGRESS)**:核心数据现成 —— `len(app.state.inflight)`=当前活跃 run 数(含排队)、`broker._subs`=SSE 订阅者、`resource.getrusage`=RSS(Unix,Windows 跳过)。**周期日志优先**(lifespan 起 task 每 60s 打 `[stats] active_runs=N max=M rss=X`),因为要的是历史峰值不是此刻快照;`/v1/stats` 端点(复用 `ZCBOT_ADMIN_TOKEN` 鉴权)为辅。前提:启动时显式建 executor + `set_default_executor` 接管,才能读 `max_workers` 且日后可调大。 +2. **按数据决策**:`active_runs` 峰值不逼近线程池 → 并发非瓶颈,扩容彻底搁置;逼近 → 先调大 executor(改个数字),再观察。 +3. **503 窗优化(零依赖)**:`--reload`(RUN.md §A)把窗从几十秒缩到 <1s。 + +**不做监控界面(现在)**:运维健康(线程池/内存/SSE/容器)是少数标量,日志 + 偶尔 curl 够诊断,可视化是过度工程;业务分析(token/任务/成本)已落 DB(`usage_events`/`tasks` 三列),SQL 查即可。界面阶梯:日志 → `/v1/stats` JSON → (要趋势图)Prometheus+Grafana(不自写前端)→ (要给非技术人看报表)只读 dashboard。现在停在第一级。 + +**搁置(成本不抵当前收益)**:gunicorn 无感换版 / broker 外置 Redis / nginx 蓝绿双实例 —— 留到"单机线程池调到头仍不够"或"换版断流成真实投诉"再议(无感换版需先把 broker 外置共享,分析见 RUN.md §B)。 + --- ## 附录:DeepSeek V4 关键事实(2026-04-24) diff --git a/PROGRESS.md b/PROGRESS.md index 0490900..cbbdaf2 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -23,6 +23,7 @@ ### 2026-06-11 +- **并发/线程池轻量监控 + 接管默认 executor(§8.4 落地第 1 步)**:已上生产后线程池排队此前无观测手段。lifespan 显式建 `ThreadPoolExecutor`(尺寸复刻 Python 默认 `min(32, cpu+4)`,env `ZCBOT_RUN_MAX_WORKERS` 可调大)+ `set_default_executor` 接管——run 走 `asyncio.to_thread` 即用它,这样既能读 `max_workers` 判断排队、也成了日后调并发的旋钮(**行为不变**,只从匿名默认池换成显式同尺寸池;run 与 disk scan/pptx/reaper 仍共享此池,同原默认)。加 `_stats_logger` 后台 task 每 60s 采样:`active_runs`(=`len(inflight)`,含排队中)逼近 `max_workers` 即排队、新 run 的 SSE 会卡着不吐 token;**刷新峰值**时打 `[stats] new peak active_runs=N max_workers=M`(≥max_workers 带 `[WARN 已在排队]`),**有负载**时打 `[stats] active_runs=.. max_workers=.. sse_subs=.. rss_peak=..MB`,**空闲静默不刷屏**。RSS 用 stdlib `resource`(Unix 峰值/high-water;Windows dev 降级跳过),零新依赖;新 `broker.total_subscribers()` 给全局 SSE 订阅数。查看:`journalctl -u zcbot | grep '\[stats\]'`。**不做监控界面**(运维健康是少数标量、日志够诊断;业务分析数据已落 DB 走 SQL)——界面阶梯见 DESIGN §8.4。 - **dev SPA「技能」查看 modal(左侧 rail 底部入口)**:因 `.skills` 在文件面板隐藏,加左侧 rail 底部「我的资源」分组(`#rail-resources`,留位给后续「记忆」)+「技能」按钮 → 弹 modal 分「平台 skill / 我的 skill」两组列表,点任一项展开**完整 SKILL.md**(`GET /v1/skills/{name}` + 现有 markdown 渲染),「我的」每项带删除(二次确认 → `DELETE /v1/skills/{name}`,只删 user 源 + 防穿越);覆盖标 `已覆盖平台同名`,`load_errors` 提示未加载的。创建/改/fork 仍走对话。新 `web/static/js/skills.js`(零构建 ES module,main.js import + Esc 栈接入);`/v1/skills` 已带 source/overrides/load_errors。**纯查看 + 删除,不在 UI 做创建/编辑**(编辑天然对话式)。 - **用户私有 skill(每用户 `.skills/`,可从零写或 fork 内置再改)**:`SkillRegistry` 从单目录改**多来源**(`SkillSource` 列表:内置 `ROOT/skills` + 用户 `user_root/.skills`),后扫同名覆盖先扫 → **user wins**;覆盖关系记进 `user_overrides`,discovery 显式标 `[你的·已覆盖内置]`(不静默)。`Skill` 加 `source` 字段;`from_dir` 区分"无 SKILL.md(静默跳过)"与"有但格式错(抛 `SkillLoadError`)",`_scan` 捕获用户来源的错收进 `load_errors`、注入 system prompt 提示用户修(一个坏 skill 不再崩整次扫描)。容器路径改写从 LoadSkillTool 下沉到 registry(`container_dir` 按 `source` 给 `/sandbox/skills` 或 `/workspace/.skills`),LoadSkillTool 去掉 `container_skills_dir` 参数。**关键判断**:写 skill 用 host-side typed tool(`save_skill`/`fork_skill`,`tools/skill_authoring.py`)而非 fs/shell —— 因 fs 的 base_dir 锚 cwd(host)/ 容器 wd(docker),都够不到 `user_root/.skills`,跨 backend 不可靠;host-side 工具知道 user_root 一个落点两模式通吃(与 seedream/DocumentDownload 一致范式)。`save_skill` 写时校验 frontmatter(名合法 / YAML 合法 / 有 description / name 一致),`fork_skill` copytree 整目录(带脚本)+ 自动把 frontmatter name 对齐新名(否则 fork ppt 仍叫 ppt 会反覆盖内置)。`.skills` 是 dotfile(文件面板隐藏,与 `.memory` 一致;`validate_task_name` 已禁 `.` 起头 working_dir,天然不撞)。`/v1/skills` 带上用户 skill + `source`/`overrides_builtin`/`load_errors`。新增 `skill-creator` 引导 skill。+`test_user_skills.py`(20 例)+ 改写 `test_load_skill.py`。性能:多扫一目录,没 `.skills` 的用户一次 `exists()` 跳过;有 skill 仅每 run +1-3ms,不在热路径。 diff --git a/RUN.md b/RUN.md index 5fd693e..025f5f5 100644 --- a/RUN.md +++ b/RUN.md @@ -40,6 +40,10 @@ # ZCBOT_JWT_TTL_SECONDS=604800 # 可选:设了之后登录页右下角"+ 管理员添加用户"入口才工作(未设 → 接口返 503) # ZCBOT_ADMIN_TOKEN=<≥32 字符随机串,管理员发用户共享口令> + # 可选:web run 线程池大小(asyncio.to_thread 默认池),默 min(32, cpu+4)。每个活跃 + # 对话整个 run 期占 1 线程,active_runs 逼近此值即排队(看 journalctl 的 [stats] 行); + # 并发不够再调大(先确认是真并发高、而非单条 run 慢)。 + # ZCBOT_RUN_MAX_WORKERS=16 ``` > litellm 在 import 时副作用加载 .env;入口走 `main.py`,`.env` 自动生效。直跑 `python -c "from core.storage import ..."` 不经 litellm 链路时记得自己 `import litellm` 触发,或手动 `export ZCBOT_DB_URL=...`。 - **依赖**:`pip install -r requirements.txt`(已在 `.venv` 里;含 `bcrypt`)。 @@ -232,6 +236,7 @@ sudo systemctl daemon-reload sudo systemctl enable --now zcbot sudo systemctl status zcbot | head sudo journalctl -u zcbot -f # 实时日志 +sudo journalctl -u zcbot | grep '\[stats\]' # 并发/线程池采样:active_runs 逼近 max_workers 即排队 → 调 ZCBOT_RUN_MAX_WORKERS sudo systemctl restart zcbot # 重启:先 drain 在跑的 run 再换新版,新发消息期间 503(客户端自动重试) sudo systemctl stop zcbot ``` diff --git a/web/app.py b/web/app.py index 6b67984..4c5a391 100644 --- a/web/app.py +++ b/web/app.py @@ -16,12 +16,18 @@ import json import mimetypes import os import tempfile +from concurrent.futures import ThreadPoolExecutor from contextlib import asynccontextmanager from datetime import datetime as _dt from pathlib import Path from typing import Any, Optional from uuid import UUID, uuid4 +try: + import resource # Unix only;Windows dev 无此模块,RSS 监控自动降级跳过 +except ImportError: # pragma: no cover - Windows + resource = None + from fastapi import Depends, FastAPI, File, Form, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse @@ -555,7 +561,27 @@ def create_app() -> FastAPI: @asynccontextmanager async def lifespan(app: FastAPI): - broker.bind_loop(asyncio.get_running_loop()) + loop = asyncio.get_running_loop() + broker.bind_loop(loop) + + # ── 接管默认线程池 executor(§8.4)────────────────────────────── + # run 走 asyncio.to_thread(用 loop 默认 executor);默认是匿名的,读不到大小、 + # 不可调。显式建一个同尺寸(复刻 Python 默认 min(32, cpu+4))接管,好处:① 监控 + # 能读 max_workers 判断有没有排队 ② 并发不够时改 ZCBOT_RUN_MAX_WORKERS 调大不改码。 + # 注:run 与 disk scan / pptx 转换 / reaper 共享此池(同原默认行为);真要隔离 + # 长任务再另开 run 专用池,那是后话。 + run_max_workers = int( + os.getenv("ZCBOT_RUN_MAX_WORKERS") or min(32, (os.cpu_count() or 1) + 4) + ) + run_executor = ThreadPoolExecutor( + max_workers=run_max_workers, thread_name_prefix="run" + ) + loop.set_default_executor(run_executor) + app.state.run_executor = run_executor + app.state.run_max_workers = run_max_workers + print(f"[startup] run executor: max_workers={run_max_workers} " + f"(override via ZCBOT_RUN_MAX_WORKERS)") + from core.agent_builder import load_config, resolve_workspace _cfg = load_config() @@ -615,6 +641,39 @@ def create_app() -> FastAPI: disk_scanner_task = asyncio.create_task(_disk_scanner(), name="disk-scanner") + # ── 并发/线程池监控(§8.4):周期采样,只在有负载/刷新峰值时打,空闲不刷屏 ── + # active_runs 来自 inflight(已提交未完成的 run,含排队中);逼近 max_workers 即 + # 线程池排队,新 run 的 SSE 会卡着不吐 token。查看:journalctl -u zcbot | grep '\[stats\]' + def _rss_peak_mb() -> Optional[float]: + if resource is None: + return None # Windows dev:降级,不打 rss + # Linux ru_maxrss 单位 KB,是峰值/high-water(单调不降 —— 看内存涨势够用) + return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 + + async def _stats_logger() -> None: + peak = 0 + while True: + try: + await asyncio.sleep(60) + active = len(app.state.inflight) + if active > peak: + peak = active + warn = " [WARN >= max_workers,已在排队]" if active >= run_max_workers else "" + print(f"[stats] new peak active_runs={active} " + f"max_workers={run_max_workers}{warn}") + if active > 0: + rss = _rss_peak_mb() + rss_s = f" rss_peak={rss:.0f}MB" if rss is not None else "" + print(f"[stats] active_runs={active} " + f"max_workers={run_max_workers} " + f"sse_subs={broker.total_subscribers()}{rss_s}") + except asyncio.CancelledError: + raise + except Exception as e: + print(f"[stats] error: {type(e).__name__}: {e}") + + stats_logger_task = asyncio.create_task(_stats_logger(), name="stats-logger") + # Sandbox pool(§7.5):仅当 ZCBOT_SANDBOX_BACKEND=docker 时启用。 # 启动钩子:① init_pool(创建 docker network + pool 实例)② shutdown_all 清 # 前驱孤儿(上次进程留下的 zcbot-sandbox-* 容器,内存 _last_active 为空, @@ -701,6 +760,11 @@ def create_app() -> FastAPI: await disk_scanner_task except (asyncio.CancelledError, Exception): pass + stats_logger_task.cancel() + try: + await stats_logger_task + except (asyncio.CancelledError, Exception): + pass if sandbox_reaper_task is not None: sandbox_reaper_task.cancel() try: @@ -715,6 +779,9 @@ def create_app() -> FastAPI: except Exception as e: print(f"[shutdown] sandbox shutdown_all error: {type(e).__name__}: {e}") + # drain 已 await inflight 收尾、run 线程退完;非阻塞关池(进程在退出,保守清理) + run_executor.shutdown(wait=False) + app = FastAPI( title="zcbot api", version="0.8", diff --git a/web/broker.py b/web/broker.py index 32fd692..03db605 100644 --- a/web/broker.py +++ b/web/broker.py @@ -97,6 +97,10 @@ class RunBroker: """供测试 / 监控用。""" return len(self._subs.get(task_id, set())) + def total_subscribers(self) -> int: + """全局 SSE 订阅者总数(所有 task 累加),供 §8.4 并发监控用。""" + return sum(len(s) for s in self._subs.values()) + def is_done(self, task_id: UUID) -> bool: return task_id in self._done