zcbot/web/admin.py

429 lines
17 KiB
Python

"""管理后台端点(admin-only):/v1/admin/*。
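`register_admin_routes(app, require_admin)` is called inside create_app to mount the
admin routes; the whole group goes through `Depends(require_admin)` (valid JWT +
users.role == 'admin', otherwise 403).
Monitoring overview: a single `GET /v1/admin/overview` returns the fixed-shape sections
(runtime / tasks / users / usage) in one response, so the front end only needs to poll
that one endpoint. runtime reads in-memory app.state (cheap); the rest are DB aggregates
(GROUP BY, no N+1). Metrics are read-only and never persisted.
Per-model usage, per-user usage and per-user storage are separate filterable / paginated
endpoints (/v1/admin/usage/models, /v1/admin/usage/users, /v1/admin/storage/users);
model tiers are served by /v1/admin/tiers and PATCH /v1/admin/users/{uid}/plan.
Further admin actions (creating users / changing roles / configuring disk quotas, ...)
are meant to be added to this module under /v1/admin/users, /v1/admin/config, etc.,
with a matching tab in the front end's admin.html.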
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import UUID
from fastapi import Depends, FastAPI, HTTPException
from pydantic import BaseModel
from sqlalchemy import BigInteger, and_, cast, func, select, update
from core.storage import session_scope
from core.storage.models import Task, UsageEvent, User, UserDiskUsage
from .broker import broker
try:
import resource # Unix only;Windows dev 无此模块,RSS 监控降级跳过
except ImportError: # pragma: no cover - Windows
resource = None
def _rss_peak_mb():
"""进程峰值 RSS(MB)。Linux 走 ru_maxrss(KB);Windows dev 返 None(降级)。"""
if resource is None:
return None
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024
def _range_cutoff(now: datetime, range_key: str):
"""时间筛选 → cutoff datetime(或 None=全部)。range_key: all / 7d / 30d。"""
if range_key == "7d":
return now - timedelta(days=7)
if range_key == "30d":
return now - timedelta(days=30)
return None # all / 未知 → 不筛
def _runtime_section(app: FastAPI) -> dict:
    """Build the live runtime section from in-memory state only (no DB access).

    Reads ``inflight`` and ``run_max_workers`` from ``app.state`` (either may be
    absent), the broker's total subscriber count, and the process's peak RSS.
    NOTE(review): per the original author's note, ``active_runs`` approaching
    ``max_workers`` indicates runs are queueing and the front end colours the
    value accordingly -- not verifiable from this module.
    """
    state = app.state
    inflight = getattr(state, "inflight", None)
    active_runs = 0 if inflight is None else len(inflight)
    worker_cap = getattr(state, "run_max_workers", None)

    section = {
        "active_runs": active_runs,
        "max_workers": worker_cap,
        "sse_subs": broker.total_subscribers(),
        "rss_peak_mb": _rss_peak_mb(),
    }
    return section
def _tasks_section(s: Any) -> dict:
    """Count tasks: overall total plus distributions by status and by run_status.

    Args:
        s: database session used to execute the three aggregate queries.

    Returns:
        ``{"total", "by_status", "by_run_status"}`` where the two distributions
        map each distinct column value to its row count.
    """

    def _count_grouped_by(column) -> dict:
        # One GROUP BY query per column: {value: number of tasks}.
        grouped = s.execute(select(column, func.count()).group_by(column)).all()
        return {value: count for value, count in grouped}

    total = s.execute(select(func.count()).select_from(Task)).scalar_one()
    status_counts = _count_grouped_by(Task.status)
    run_status_counts = _count_grouped_by(Task.run_status)
    return {
        "total": total,
        "by_status": status_counts,
        "by_run_status": run_status_counts,
    }
def _users_section(s: Any, cutoff_7d: datetime) -> dict:
    """Count users: total rows plus distinct users with a usage event since ``cutoff_7d``.

    Args:
        s: database session.
        cutoff_7d: lower bound (inclusive) on ``UsageEvent.created_at`` for a
            user to count as active.

    Returns:
        ``{"total", "active_7d"}``.
    """
    total_stmt = select(func.count()).select_from(User)
    distinct_active = func.count(func.distinct(UsageEvent.user_id))
    active_stmt = select(distinct_active).where(UsageEvent.created_at >= cutoff_7d)

    total = s.execute(total_stmt).scalar_one()
    active = s.execute(active_stmt).scalar_one()
    return {"total": total, "active_7d": active}
def _usage_section(s: Any, cutoff_7d: datetime) -> dict:
    """Aggregate token / cost usage for the overview: all-time totals + per-day trend.

    Cost is summed over every event kind; token counts are read from the
    ``units`` JSON field and summed over ``kind == "chat"`` events only.
    Per-model and per-user breakdowns live in ``_models_usage`` and
    ``_user_usage_page`` and are not part of this bundle.

    Args:
        s: database session.
        cutoff_7d: lower bound (inclusive) on ``created_at`` for the daily trend.

    Returns:
        ``{"total": {...}, "by_day_7d": [...]}``; the daily list is ordered by
        date descending, so the most recent day comes first.
    """
    is_chat = UsageEvent.kind == "chat"

    def _chat_token_sum(field: str):
        # SUM over chat events of units[field] cast to BIGINT, 0 when nothing matches.
        as_int = cast(UsageEvent.units[field].astext, BigInteger)
        return func.coalesce(func.sum(as_int).filter(is_chat), 0)

    def _cost_sum():
        return func.coalesce(func.sum(UsageEvent.cost_cny), 0)

    # All-time totals across the whole table.
    totals_stmt = select(
        _cost_sum(),
        _chat_token_sum("tokens_in"),
        _chat_token_sum("tokens_out"),
        _chat_token_sum("cache_hit_tokens"),
        func.count(),
    )
    cost, tok_in, tok_out, cache_hit, n_events = s.execute(totals_stmt).one()
    total = {
        "cost_cny": float(cost or 0),
        "tokens_in": int(tok_in or 0),
        "tokens_out": int(tok_out or 0),
        "tokens_cache_hit": int(cache_hit or 0),
        "n_events": int(n_events or 0),
    }

    # Daily trend since the cutoff, truncated to the date, newest day first.
    day = func.date(UsageEvent.created_at)
    trend_stmt = (
        select(
            day,
            _cost_sum(),
            _chat_token_sum("tokens_in"),
            _chat_token_sum("tokens_out"),
        )
        .where(UsageEvent.created_at >= cutoff_7d)
        .group_by(day)
        .order_by(day.desc())
    )
    by_day = []
    for bucket, day_cost, day_in, day_out in s.execute(trend_stmt).all():
        by_day.append(
            {
                "date": str(bucket),
                "cost_cny": float(day_cost or 0),
                "tokens_in": int(day_in or 0),
                "tokens_out": int(day_out or 0),
            }
        )

    return {"total": total, "by_day_7d": by_day}
def _models_usage(s: Any, cutoff, sort: str) -> list:
    """Usage grouped by model profile, with optional time filter and sort order.

    Cost is summed over every event kind; token sums cover ``kind == "chat"``
    events only. The set of models comes from the usage events themselves, so
    the time condition goes straight into WHERE.

    Args:
        s: database session.
        cutoff: lower bound (inclusive) on ``created_at``, or None for all time.
        sort: ``"tokens"`` orders by tokens_in + tokens_out descending; any
            other value orders by cost descending. ``model_profile`` breaks ties.

    Returns:
        A list of ``{"model_profile", "cost_cny", "tokens_in", "tokens_out",
        "n_events"}`` dicts.
    """
    is_chat = UsageEvent.kind == "chat"
    tokens_in = cast(UsageEvent.units["tokens_in"].astext, BigInteger)
    tokens_out = cast(UsageEvent.units["tokens_out"].astext, BigInteger)

    cost_total = func.coalesce(func.sum(UsageEvent.cost_cny), 0)
    tokens_in_total = func.coalesce(func.sum(tokens_in).filter(is_chat), 0)
    tokens_out_total = func.coalesce(func.sum(tokens_out).filter(is_chat), 0)

    if sort == "tokens":
        ranking = (tokens_in_total + tokens_out_total).desc()
    else:
        ranking = cost_total.desc()

    stmt = select(
        UsageEvent.model_profile,
        cost_total,
        tokens_in_total,
        tokens_out_total,
        func.count(),
    )
    if cutoff is not None:
        stmt = stmt.where(UsageEvent.created_at >= cutoff)
    stmt = stmt.group_by(UsageEvent.model_profile)
    stmt = stmt.order_by(ranking, UsageEvent.model_profile)

    result = []
    for profile, cost, tok_in, tok_out, n_events in s.execute(stmt).all():
        result.append(
            {
                "model_profile": profile,
                "cost_cny": float(cost or 0),
                "tokens_in": int(tok_in or 0),
                "tokens_out": int(tok_out or 0),
                "n_events": int(n_events or 0),
            }
        )
    return result
def _user_usage_page(s: Any, page: int, page_size: int, cutoff, sort: str) -> dict:
    """One page of per-user usage, including users with no usage at all.

    Rows come from users LEFT JOIN usage events, so users without events still
    appear with zeros. The time filter is part of the JOIN's ON clause rather
    than WHERE; in WHERE it would drop the zero-usage users whenever a cutoff
    is set. Cost covers every event kind; token and cache-hit sums cover
    ``kind == "chat"`` events only.

    Args:
        s: database session.
        page: 0-based page index.
        page_size: rows per page.
        cutoff: lower bound (inclusive) on ``created_at``, or None for all time.
        sort: ``"tokens"`` orders by tokens_in + tokens_out descending; any
            other value orders by cost descending. ``user_id`` is the tie-breaker
            that keeps pagination stable.

    Returns:
        ``{"page", "page_size", "total_users", "rows"}``.
    """
    is_chat = UsageEvent.kind == "chat"

    def _chat_token_sum(field: str):
        # SUM over chat events of units[field] cast to BIGINT, 0 when nothing matches.
        as_int = cast(UsageEvent.units[field].astext, BigInteger)
        return func.coalesce(func.sum(as_int).filter(is_chat), 0)

    cost_total = func.coalesce(func.sum(UsageEvent.cost_cny), 0)
    tokens_in_total = _chat_token_sum("tokens_in")
    tokens_out_total = _chat_token_sum("tokens_out")
    cache_hit_total = _chat_token_sum("cache_hit_tokens")

    if sort == "tokens":
        ranking = (tokens_in_total + tokens_out_total).desc()
    else:
        ranking = cost_total.desc()

    on_clause = UsageEvent.user_id == User.user_id
    if cutoff is not None:
        on_clause = and_(on_clause, UsageEvent.created_at >= cutoff)

    # Last-used timestamp deliberately ignores the range filter; otherwise a
    # 7d/30d window would hide an older but real last-used time.
    last_used_sq = (
        select(func.max(UsageEvent.created_at))
        .where(UsageEvent.user_id == User.user_id)
        .correlate(User)
        .scalar_subquery()
    )

    user_columns = (
        User.user_id, User.email, User.name, User.user_name, User.role, User.plan,
    )

    total_users = s.execute(select(func.count()).select_from(User)).scalar_one()

    stmt = (
        select(
            *user_columns,
            cost_total,
            tokens_in_total,
            tokens_out_total,
            cache_hit_total,
            func.count(UsageEvent.event_id),
            last_used_sq.label("last_used_at"),
        )
        .join(UsageEvent, on_clause, isouter=True)
        .group_by(*user_columns)
        .order_by(ranking, User.user_id)
        .limit(page_size)
        .offset(page * page_size)
    )

    rows = []
    for record in s.execute(stmt).all():
        (
            uid, email, name, uname, role, plan,
            cost, tok_in, tok_out, cache_hit, n_events, last_used,
        ) = record
        rows.append(
            {
                "user_id": str(uid),
                "email": email or "",
                "name": name or "",
                "user_name": uname or "",
                "role": role or "user",
                # Model tier; empty means the default tier (used by the admin UI dropdown).
                "plan": plan or "",
                "cost_cny": float(cost or 0),
                "tokens_in": int(tok_in or 0),
                "tokens_out": int(tok_out or 0),
                "tokens_cache_hit": int(cache_hit or 0),
                "n_events": int(n_events or 0),
                "last_used_at": last_used.isoformat() if last_used else None,
            }
        )

    return {
        "page": page,
        "page_size": page_size,
        "total_users": total_users,
        "rows": rows,
    }
def _storage_page(s: Any, page: int, page_size: int) -> dict:
    """One page of per-user disk usage, largest first, plus the per-user quota.

    Rows come from ``UserDiskUsage`` LEFT JOIN users, ordered by ``bytes_used``
    descending with ``user_id`` as tie-breaker; ``total`` is the row count of
    ``UserDiskUsage``. The quota is read from the ``quotas.disk_bytes_per_user``
    config entry and passed through ``parse_bytes``.
    NOTE(review): the original docstring describes the table as a background
    scan snapshot that only contains scanned users -- not verifiable here.

    Args:
        s: database session.
        page: 0-based page index.
        page_size: rows per page.

    Returns:
        ``{"page", "page_size", "total", "quota_bytes", "rows"}``.
    """
    from core.agent_builder import load_config
    from core.storage.disk_quota import parse_bytes

    quota_cfg = load_config().get("quotas") or {}
    quota = parse_bytes(quota_cfg.get("disk_bytes_per_user"))

    total = s.execute(select(func.count()).select_from(UserDiskUsage)).scalar_one()

    stmt = (
        select(
            UserDiskUsage.user_id,
            User.email,
            User.name,
            User.user_name,
            UserDiskUsage.bytes_used,
            UserDiskUsage.file_count,
            UserDiskUsage.scanned_at,
        )
        .join(User, User.user_id == UserDiskUsage.user_id, isouter=True)
        .order_by(UserDiskUsage.bytes_used.desc(), UserDiskUsage.user_id)
        .limit(page_size)
        .offset(page * page_size)
    )

    rows = []
    for uid, email, name, uname, used, n_files, scanned in s.execute(stmt).all():
        rows.append(
            {
                "user_id": str(uid),
                "email": email or "",
                "name": name or "",
                "user_name": uname or "",
                "bytes_used": int(used or 0),
                "file_count": int(n_files or 0),
                "scanned_at": scanned.isoformat() if scanned else None,
            }
        )

    return {
        "page": page,
        "page_size": page_size,
        "total": total,
        "quota_bytes": quota,
        "rows": rows,
    }
def _model_catalog() -> list[dict]:
    """List every gateable model as ``{"id", "display_name", "kind"}`` dicts.

    Text models come from ``config/models/*.yaml`` (one entry per
    ``<family>.<variant>`` profile that ``ModelCapabilities.load`` accepts);
    image / video models come from ``config/media/doubao.yaml``. The tier
    editing UI uses the result to map model ids to display names.

    The scan is best-effort: a file that cannot be read or parsed, or whose
    content does not have the expected mapping shape, is skipped instead of
    failing the whole request.

    Returns:
        Catalog entries, text models first (files in sorted order), then
        image and video entries.
    """
    from core.capabilities import ModelCapabilities
    from core.paths import ROOT
    import yaml as _yaml

    out: list[dict] = []

    models_dir = ROOT / "config" / "models"
    if models_dir.is_dir():
        for path in sorted(models_dir.glob("*.yaml")):
            try:
                data = _yaml.safe_load(path.read_text(encoding="utf-8")) or {}
            except Exception:
                # Deliberate best-effort: an unreadable / unparsable file is skipped.
                continue
            # safe_load may return a list or scalar for syntactically valid YAML;
            # treat that like any other malformed file rather than crash on .get().
            if not isinstance(data, dict):
                continue
            family = data.get("family") or path.stem
            variants = data.get("variants")
            if not isinstance(variants, dict):
                continue
            for variant in variants:
                profile = f"{family}.{variant}"
                try:
                    caps = ModelCapabilities.load(profile, models_dir)
                except (ValueError, FileNotFoundError):
                    continue
                out.append(
                    {
                        "id": profile,
                        "display_name": caps.display_name or profile,
                        "kind": "text",
                    }
                )

    media = ROOT / "config" / "media" / "doubao.yaml"
    if media.exists():
        try:
            mdata = _yaml.safe_load(media.read_text(encoding="utf-8")) or {}
        except Exception:
            # Deliberate best-effort: fall back to "no media models".
            mdata = {}
        if not isinstance(mdata, dict):
            mdata = {}
        for kind in ("image", "video"):
            section = mdata.get(kind)
            if not isinstance(section, dict):
                continue
            for key, cfg in section.items():
                if isinstance(cfg, dict):
                    out.append(
                        {
                            "id": key,
                            "display_name": cfg.get("display_name") or key,
                            "kind": kind,
                        }
                    )
    return out
class SetPlanRequest(BaseModel):
    # Request body of PATCH /v1/admin/users/{uid}/plan.
    # `plan` is a tier name (a key of model_tiers in config/agent.yaml); an empty
    # string clears the plan, so the user falls back to the default tier.
    plan: str = ""
def register_admin_routes(app: FastAPI, require_admin) -> None:
    """Mount the /v1/admin/* management routes on ``app``.

    Every endpoint declares ``Depends(require_admin)``, so the whole group sits
    behind that gate.

    Args:
        app: application the routes are registered on.
        require_admin: dependency callable acting as the admin gate; each
            endpoint receives its result as ``user_id``.
    """

    def _clamp_paging(page: int, page_size: int) -> tuple[int, int]:
        # page is 0-based and never negative; page_size is clamped into [1, 100].
        return max(0, page), min(100, max(1, page_size))

    @app.get("/v1/admin/overview", tags=["admin"])
    def admin_overview(user_id: UUID = Depends(require_admin)):
        """Admin overview (fixed shape, meant for polling). Admin-only.

        Returns runtime / tasks / users / usage (all-time totals + 7-day trend).
        Per-model usage, per-user usage and storage are separate filterable /
        paginated endpoints and are not part of this bundle.
        """
        generated_at = datetime.now(timezone.utc)
        week_ago = generated_at - timedelta(days=7)
        with session_scope() as s:
            overview = {
                "generated_at": generated_at.isoformat(),
                "runtime": _runtime_section(app),
                "tasks": _tasks_section(s),
                "users": _users_section(s, week_ago),
                "usage": _usage_section(s, week_ago),
            }
            return overview

    @app.get("/v1/admin/usage/models", tags=["admin"])
    def admin_usage_models(
        range: str = "all", sort: str = "cost", user_id: UUID = Depends(require_admin)
    ):
        """Usage per model. range: all/7d/30d; sort: cost/tokens. Admin-only."""
        cutoff = _range_cutoff(datetime.now(timezone.utc), range)
        with session_scope() as s:
            model_rows = _models_usage(s, cutoff, sort)
            return {"range": range, "sort": sort, "rows": model_rows}

    @app.get("/v1/admin/usage/users", tags=["admin"])
    def admin_usage_users(
        page: int = 0, page_size: int = 20, range: str = "all", sort: str = "cost",
        user_id: UUID = Depends(require_admin),
    ):
        """Per-user token usage (paginated, time-filtered, sorted). Admin-only.

        page is 0-based; page_size is clamped to [1, 100]; range all/7d/30d;
        sort cost/tokens. Users with zero usage are included (LEFT JOIN over the
        whole users table); overall totals are in overview.usage.total.
        """
        page, page_size = _clamp_paging(page, page_size)
        cutoff = _range_cutoff(datetime.now(timezone.utc), range)
        with session_scope() as s:
            result = _user_usage_page(s, page, page_size, cutoff, sort)
        result["range"] = range
        result["sort"] = sort
        return result

    @app.get("/v1/admin/storage/users", tags=["admin"])
    def admin_storage_users(
        page: int = 0, page_size: int = 20, user_id: UUID = Depends(require_admin)
    ):
        """Per-user disk usage (paginated, bytes desc). Admin-only. page is 0-based; page_size [1, 100]."""
        page, page_size = _clamp_paging(page, page_size)
        with session_scope() as s:
            return _storage_page(s, page, page_size)

    @app.get("/v1/admin/tiers", tags=["admin"])
    def admin_tiers(user_id: UUID = Depends(require_admin)):
        """Model tier definitions plus the full model catalog. Admin-only.

        UI: the per-user "tier" dropdown uses the tier names; the legend maps each
        tier's member ids to display names. default_tier marks the tier used when
        plan is empty / unknown. role=admin always has everything enabled (not
        reflected in tiers).
        """
        from core.model_access import DEFAULT_TIER
        from core.agent_builder import load_config

        tier_map = load_config().get("model_tiers") or {}
        return {
            "tiers": tier_map,  # {name: [model_id, ...]}
            "default_tier": DEFAULT_TIER,
            "catalog": _model_catalog(),  # [{id, display_name, kind}]
        }

    @app.patch("/v1/admin/users/{uid}/plan", tags=["admin"])
    def admin_set_user_plan(
        uid: str, body: SetPlanRequest, user_id: UUID = Depends(require_admin)
    ):
        """Set a user's model tier (writes users.plan). Admin-only.

        plan must be a tier name present in model_tiers of config/agent.yaml; an
        empty string clears it (the user falls back to the default tier).
        Unknown tier -> 400; user not found -> 404.
        """
        from core.agent_builder import load_config

        try:
            target = UUID(uid)
        except ValueError:
            raise HTTPException(400, f"invalid user id: {uid!r}")

        plan = (body.plan or "").strip()
        tiers = load_config().get("model_tiers") or {}
        if plan and plan not in tiers:
            raise HTTPException(400, f"unknown tier {plan!r}; available: {sorted(tiers)}")

        with session_scope() as s:
            # An empty plan is stored as NULL.
            stmt = update(User).where(User.user_id == target).values(plan=plan or None)
            outcome = s.execute(stmt)
            if outcome.rowcount == 0:
                raise HTTPException(404, f"user not found: {uid}")
        return {"user_id": str(target), "plan": plan}