feat(ops): 并发/线程池轻量监控 + 接管默认 executor

已上生产后线程池排队此前无观测手段:
- lifespan 显式建 ThreadPoolExecutor(尺寸复刻 Python 默认 min(32,cpu+4),
  env ZCBOT_RUN_MAX_WORKERS 可调) + set_default_executor 接管 —— 行为不变
  (匿名池换显式同尺寸池),但 max_workers 可读、成调并发的旋钮
- _stats_logger 每 60s 采样:active_runs(含排队)逼近 max_workers 即排队,
  刷新峰值/有负载打 [stats],空闲静默不刷屏
- broker.total_subscribers() 全局 SSE 订阅数;RSS 用 stdlib resource
  (Unix 峰值;Windows dev 降级),零新依赖

不做监控界面:运维健康是少数标量日志够,业务分析走 SQL。DESIGN 8.4 记
取舍 + 界面阶梯;无感换版(gunicorn/Redis/蓝绿)成本不抵当前收益,搁置。

查看: journalctl -u zcbot | grep [stats]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-06-11 11:09:44 +08:00
parent f614046438
commit 15d69b3372
5 changed files with 95 additions and 1 deletions

View File

@ -553,6 +553,23 @@ create index on usage_events (model_profile, created_at);
**安全边界**:对上传任意 pptx 跑 LibreOffice(历史有宏/EPS CVE)→ `--convert-to` 默认不执行宏 + 宏安全 high + 禁网 + 仅处理鉴权用户自己 user_root 内文件。
**保真边界**:deck 用微软雅黑,Linux 上替换成 Noto Sans CJK 度量略差(可接受)。**Stage 2(未做)**:常驻 soffice listener 消冷启、deck 生成后 eager 预转、缩略图导航。
### 8.4 运维监控 / 无感更新(2026-06-11,监控 ✅ 已落地 / 无感换版 status=design)
**背景**:已上生产、真实用户在用。换版可用性从"nice to have"变真账;且当前并发到多少、线程池有没有排队**没有观测手段**。
**心智**
- **优雅 drain(已实现,2026-06-10)** —— SIGTERM 后拒新 run(503)、等在跑的 run 收尾再换版,不再标 `error`。这是**单实例能做到的上限**。剩余代价:几十秒 503 窗(dev SPA 退避重试已吸收)+ 换版时 SSE 重连丢正在吐的 delta。
- **真正先撞的瓶颈是线程池,不是别处**:run 走 `asyncio.to_thread`(`web/app.py:1382`)用默认 `ThreadPoolExecutor`(`min(32, cpu+4)`),每个活跃对话整个 run 期占 1 线程。4 核 ≈ 8 并发活跃对话就排队,第 9 个 SSE 卡着不吐 token。解这个只需调大 executor / 加信号量背压,**不引外部依赖**。
**落地排序(便宜→贵,到触发线才进下一级)**
1. **轻量监控(✅ 已落地 2026-06-11,详 PROGRESS)**:核心数据现成 —— `len(app.state.inflight)`=当前活跃 run 数(含排队)、`broker._subs`=SSE 订阅者、`resource.getrusage`=RSS(Unix,Windows 跳过)。**周期日志优先**(lifespan 起 task 每 60s 打 `[stats] active_runs=N max=M rss=X`),因为要的是历史峰值不是此刻快照;`/v1/stats` 端点(复用 `ZCBOT_ADMIN_TOKEN` 鉴权)为辅。前提:启动时显式建 executor + `set_default_executor` 接管,才能读 `max_workers` 且日后可调大。
2. **按数据决策**:`active_runs` 峰值不逼近线程池 → 并发非瓶颈,扩容彻底搁置;逼近 → 先调大 executor(改个数字),再观察。
3. **503 窗优化(零依赖)**:`--reload`(RUN.md §A)把窗从几十秒缩到 <1s
**不做监控界面(现在)**:运维健康(线程池/内存/SSE/容器)是少数标量,日志 + 偶尔 curl 够诊断,可视化是过度工程;业务分析(token/任务/成本)已落 DB(`usage_events`/`tasks` 三列),SQL 查即可。界面阶梯:日志 → `/v1/stats` JSON → (要趋势图)Prometheus+Grafana(不自写前端)→ (要给非技术人看报表)只读 dashboard。现在停在第一级。
**搁置(成本不抵当前收益)**:gunicorn 无感换版 / broker 外置 Redis / nginx 蓝绿双实例 —— 留到"单机线程池调到头仍不够"或"换版断流成真实投诉"再议(无感换版需先把 broker 外置共享,分析见 RUN.md §B)。
---
## 附录:DeepSeek V4 关键事实(2026-04-24)

View File

@ -23,6 +23,7 @@
### 2026-06-11
- **并发/线程池轻量监控 + 接管默认 executor(§8.4 落地第 1 步)**:已上生产后线程池排队此前无观测手段。lifespan 显式建 `ThreadPoolExecutor`(尺寸复刻 Python 默认 `min(32, cpu+4)`,env `ZCBOT_RUN_MAX_WORKERS` 可调大)+ `set_default_executor` 接管——run 走 `asyncio.to_thread` 即用它,这样既能读 `max_workers` 判断排队、也成了日后调并发的旋钮(**行为不变**,只从匿名默认池换成显式同尺寸池;run 与 disk scan/pptx/reaper 仍共享此池,同原默认)。加 `_stats_logger` 后台 task 每 60s 采样:`active_runs`(=`len(inflight)`,含排队中)逼近 `max_workers` 即排队、新 run 的 SSE 会卡着不吐 token;**刷新峰值**时打 `[stats] new peak active_runs=N max_workers=M`(≥max_workers 带 `[WARN 已在排队]`),**有负载**时打 `[stats] active_runs=.. max_workers=.. sse_subs=.. rss_peak=..MB`,**空闲静默不刷屏**。RSS 用 stdlib `resource`(Unix 峰值/high-water;Windows dev 降级跳过),零新依赖;新 `broker.total_subscribers()` 给全局 SSE 订阅数。查看:`journalctl -u zcbot | grep '\[stats\]'`。**不做监控界面**(运维健康是少数标量、日志够诊断;业务分析数据已落 DB 走 SQL)——界面阶梯见 DESIGN §8.4。
- **dev SPA「技能」查看 modal(左侧 rail 底部入口)**:因 `.skills` 在文件面板隐藏,加左侧 rail 底部「我的资源」分组(`#rail-resources`,留位给后续「记忆」)+「技能」按钮 → 弹 modal 分「平台 skill / 我的 skill」两组列表,点任一项展开**完整 SKILL.md**(`GET /v1/skills/{name}` + 现有 markdown 渲染),「我的」每项带删除(二次确认 → `DELETE /v1/skills/{name}`,只删 user 源 + 防穿越);覆盖标 `已覆盖平台同名`,`load_errors` 提示未加载的。创建/改/fork 仍走对话。新 `web/static/js/skills.js`(零构建 ES module,main.js import + Esc 栈接入);`/v1/skills` 已带 source/overrides/load_errors。**纯查看 + 删除,不在 UI 做创建/编辑**(编辑天然对话式)。
- **用户私有 skill(每用户 `.skills/`,可从零写或 fork 内置再改)**:`SkillRegistry` 从单目录改**多来源**(`SkillSource` 列表:内置 `ROOT/skills` + 用户 `user_root/.skills`),后扫同名覆盖先扫 → **user wins**;覆盖关系记进 `user_overrides`,discovery 显式标 `[你的·已覆盖内置]`(不静默)。`Skill` 加 `source` 字段;`from_dir` 区分"无 SKILL.md(静默跳过)"与"有但格式错(抛 `SkillLoadError`)",`_scan` 捕获用户来源的错收进 `load_errors`、注入 system prompt 提示用户修(一个坏 skill 不再崩整次扫描)。容器路径改写从 LoadSkillTool 下沉到 registry(`container_dir` 按 `source``/sandbox/skills``/workspace/.skills`),LoadSkillTool 去掉 `container_skills_dir` 参数。**关键判断**:写 skill 用 host-side typed tool(`save_skill`/`fork_skill`,`tools/skill_authoring.py`)而非 fs/shell —— 因 fs 的 base_dir 锚 cwd(host)/ 容器 wd(docker),都够不到 `user_root/.skills`,跨 backend 不可靠;host-side 工具知道 user_root 一个落点两模式通吃(与 seedream/DocumentDownload 一致范式)。`save_skill` 写时校验 frontmatter(名合法 / YAML 合法 / 有 description / name 一致),`fork_skill` copytree 整目录(带脚本)+ 自动把 frontmatter name 对齐新名(否则 fork ppt 仍叫 ppt 会反覆盖内置)。`.skills` 是 dotfile(文件面板隐藏,与 `.memory` 一致;`validate_task_name` 已禁 `.` 起头 working_dir,天然不撞)。`/v1/skills` 带上用户 skill + `source`/`overrides_builtin`/`load_errors`。新增 `skill-creator` 引导 skill。+`test_user_skills.py`(20 例)+ 改写 `test_load_skill.py`。性能:多扫一目录,没 `.skills` 的用户一次 `exists()` 跳过;有 skill 仅每 run +1-3ms,不在热路径。

5
RUN.md
View File

@ -40,6 +40,10 @@
# ZCBOT_JWT_TTL_SECONDS=604800
# 可选:设了之后登录页右下角"+ 管理员添加用户"入口才工作(未设 → 接口返 503)
# ZCBOT_ADMIN_TOKEN=<≥32 字符随机串,管理员发用户共享口令>
# 可选:web run 线程池大小(asyncio.to_thread 默认池),默 min(32, cpu+4)。每个活跃
# 对话整个 run 期占 1 线程,active_runs 逼近此值即排队(看 journalctl 的 [stats] 行);
# 并发不够再调大(先确认是真并发高、而非单条 run 慢)。
# ZCBOT_RUN_MAX_WORKERS=16
```
> litellm 在 import 时副作用加载 .env;入口走 `main.py`,`.env` 自动生效。直跑 `python -c "from core.storage import ..."` 不经 litellm 链路时记得自己 `import litellm` 触发,或手动 `export ZCBOT_DB_URL=...`
- **依赖**:`pip install -r requirements.txt`(已在 `.venv` 里;含 `bcrypt`)。
@ -232,6 +236,7 @@ sudo systemctl daemon-reload
sudo systemctl enable --now zcbot
sudo systemctl status zcbot | head
sudo journalctl -u zcbot -f # 实时日志
sudo journalctl -u zcbot | grep '\[stats\]' # 并发/线程池采样:active_runs 逼近 max_workers 即排队 → 调 ZCBOT_RUN_MAX_WORKERS
sudo systemctl restart zcbot # 重启:先 drain 在跑的 run 再换新版,新发消息期间 503(客户端自动重试)
sudo systemctl stop zcbot
```

View File

@ -16,12 +16,18 @@ import json
import mimetypes
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from datetime import datetime as _dt
from pathlib import Path
from typing import Any, Optional
from uuid import UUID, uuid4
try:
import resource # Unix only;Windows dev 无此模块,RSS 监控自动降级跳过
except ImportError: # pragma: no cover - Windows
resource = None
from fastapi import Depends, FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
@ -555,7 +561,27 @@ def create_app() -> FastAPI:
@asynccontextmanager
async def lifespan(app: FastAPI):
broker.bind_loop(asyncio.get_running_loop())
loop = asyncio.get_running_loop()
broker.bind_loop(loop)
# ── 接管默认线程池 executor(§8.4)──────────────────────────────
# run 走 asyncio.to_thread(用 loop 默认 executor);默认是匿名的,读不到大小、
# 不可调。显式建一个同尺寸(复刻 Python 默认 min(32, cpu+4))接管,好处:① 监控
# 能读 max_workers 判断有没有排队 ② 并发不够时改 ZCBOT_RUN_MAX_WORKERS 调大不改码。
# 注:run 与 disk scan / pptx 转换 / reaper 共享此池(同原默认行为);真要隔离
# 长任务再另开 run 专用池,那是后话。
run_max_workers = int(
os.getenv("ZCBOT_RUN_MAX_WORKERS") or min(32, (os.cpu_count() or 1) + 4)
)
run_executor = ThreadPoolExecutor(
max_workers=run_max_workers, thread_name_prefix="run"
)
loop.set_default_executor(run_executor)
app.state.run_executor = run_executor
app.state.run_max_workers = run_max_workers
print(f"[startup] run executor: max_workers={run_max_workers} "
f"(override via ZCBOT_RUN_MAX_WORKERS)")
from core.agent_builder import load_config, resolve_workspace
_cfg = load_config()
@ -615,6 +641,39 @@ def create_app() -> FastAPI:
disk_scanner_task = asyncio.create_task(_disk_scanner(), name="disk-scanner")
# ── 并发/线程池监控(§8.4):周期采样,只在有负载/刷新峰值时打,空闲不刷屏 ──
# active_runs 来自 inflight(已提交未完成的 run,含排队中);逼近 max_workers 即
# 线程池排队,新 run 的 SSE 会卡着不吐 token。查看:journalctl -u zcbot | grep '\[stats\]'
def _rss_peak_mb() -> Optional[float]:
if resource is None:
return None # Windows dev:降级,不打 rss
# Linux ru_maxrss 单位 KB,是峰值/high-water(单调不降 —— 看内存涨势够用)
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024
async def _stats_logger() -> None:
peak = 0
while True:
try:
await asyncio.sleep(60)
active = len(app.state.inflight)
if active > peak:
peak = active
warn = " [WARN >= max_workers,已在排队]" if active >= run_max_workers else ""
print(f"[stats] new peak active_runs={active} "
f"max_workers={run_max_workers}{warn}")
if active > 0:
rss = _rss_peak_mb()
rss_s = f" rss_peak={rss:.0f}MB" if rss is not None else ""
print(f"[stats] active_runs={active} "
f"max_workers={run_max_workers} "
f"sse_subs={broker.total_subscribers()}{rss_s}")
except asyncio.CancelledError:
raise
except Exception as e:
print(f"[stats] error: {type(e).__name__}: {e}")
stats_logger_task = asyncio.create_task(_stats_logger(), name="stats-logger")
# Sandbox pool(§7.5):仅当 ZCBOT_SANDBOX_BACKEND=docker 时启用。
# 启动钩子:① init_pool(创建 docker network + pool 实例)② shutdown_all 清
# 前驱孤儿(上次进程留下的 zcbot-sandbox-* 容器,内存 _last_active 为空,
@ -701,6 +760,11 @@ def create_app() -> FastAPI:
await disk_scanner_task
except (asyncio.CancelledError, Exception):
pass
stats_logger_task.cancel()
try:
await stats_logger_task
except (asyncio.CancelledError, Exception):
pass
if sandbox_reaper_task is not None:
sandbox_reaper_task.cancel()
try:
@ -715,6 +779,9 @@ def create_app() -> FastAPI:
except Exception as e:
print(f"[shutdown] sandbox shutdown_all error: {type(e).__name__}: {e}")
# drain 已 await inflight 收尾、run 线程退完;非阻塞关池(进程在退出,保守清理)
run_executor.shutdown(wait=False)
app = FastAPI(
title="zcbot api",
version="0.8",

View File

@ -97,6 +97,10 @@ class RunBroker:
"""供测试 / 监控用。"""
return len(self._subs.get(task_id, set()))
def total_subscribers(self) -> int:
"""全局 SSE 订阅者总数(所有 task 累加),供 §8.4 并发监控用。"""
return sum(len(s) for s in self._subs.values())
def is_done(self, task_id: UUID) -> bool:
return task_id in self._done