zcbot/web/admin.py

272 lines
10 KiB
Python

"""管理后台端点(admin-only):/v1/admin/*。
`register_admin_routes(app, require_admin)` 在 create_app 内调用,把管理路由挂上去,
整组走 `Depends(require_admin)`(JWT 有效 + users.role=='admin',否则 403)。
第一版只有总览(监控指标):单个 `GET /v1/admin/overview` 一次返回全部 section
(runtime / tasks / users / usage / storage),前端定时轮询这一个端点即可。runtime 读
app.state 内存(轻);其余走 DB 聚合(GROUP BY,无 N+1)。指标只读、不落库。
后续管理动作(建用户 / 改角色 / 配置磁盘配额等)在此模块续挂 /v1/admin/users、
/v1/admin/config 等,前端 admin.html 加 tab。
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import UUID
from fastapi import Depends, FastAPI
from sqlalchemy import BigInteger, Numeric, cast, func, select
from core.storage import session_scope
from core.storage.models import Task, UsageEvent, User, UserDiskUsage
from .broker import broker
try:
import resource # Unix only;Windows dev 无此模块,RSS 监控降级跳过
except ImportError: # pragma: no cover - Windows
resource = None
def _rss_peak_mb():
"""进程峰值 RSS(MB)。Linux 走 ru_maxrss(KB);Windows dev 返 None(降级)。"""
if resource is None:
return None
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024
def _runtime_section(app: FastAPI) -> dict:
    """Build the live runtime snapshot from in-memory `app.state`; no DB access.

    Returns a dict with:
        active_runs: number of entries in `app.state.inflight` (0 when the
            attribute is missing or None).
        max_workers: `app.state.run_max_workers`, or None when unset.
        sse_subs: value of `broker.total_subscribers()`.
        rss_peak_mb: value of `_rss_peak_mb()`.

    Per the original author's note: when active_runs approaches max_workers
    the thread pool starts queueing (SSE for new runs stalls), and the
    frontend colours the figure accordingly.
    """
    state = app.state
    running = getattr(state, "inflight", None)
    if running is None:
        active_count = 0
    else:
        active_count = len(running)

    snapshot = {"active_runs": active_count}
    snapshot["max_workers"] = getattr(state, "run_max_workers", None)
    snapshot["sse_subs"] = broker.total_subscribers()
    snapshot["rss_peak_mb"] = _rss_peak_mb()
    return snapshot
def _tasks_section(s: Any) -> dict:
    """Count tasks: overall total plus breakdowns by `status` and `run_status`.

    Args:
        s: session-like object; only `s.execute(...)` is used.

    Returns:
        {"total": int, "by_status": {value: count}, "by_run_status": {value: count}}.
    """

    def grouped_counts(column) -> dict:
        # One GROUP BY query mapping each distinct column value to its row count.
        stmt = select(column, func.count()).group_by(column)
        counts = {}
        for value, count in s.execute(stmt).all():
            counts[value] = count
        return counts

    task_total = s.execute(select(func.count()).select_from(Task)).scalar_one()
    status_counts = grouped_counts(Task.status)
    run_status_counts = grouped_counts(Task.run_status)
    return {
        "total": task_total,
        "by_status": status_counts,
        "by_run_status": run_status_counts,
    }
def _users_section(s: Any, cutoff_7d: datetime) -> dict:
    """Count users: total rows, plus distinct users with a usage event since the cutoff.

    Args:
        s: session-like object; only `s.execute(...)` is used.
        cutoff_7d: inclusive lower bound on `UsageEvent.created_at` for the
            "active" count.

    Returns:
        {"total": int, "active_7d": int}.
    """
    count_all_users = select(func.count()).select_from(User)
    distinct_event_users = func.count(func.distinct(UsageEvent.user_id))
    count_recent_users = select(distinct_event_users).where(
        UsageEvent.created_at >= cutoff_7d
    )

    user_total = s.execute(count_all_users).scalar_one()
    recently_active = s.execute(count_recent_users).scalar_one()
    return {"total": user_total, "active_7d": recently_active}
def _usage_section(s: Any, cutoff_7d: datetime) -> dict:
    """Aggregate token counts and cost: all-time totals, per day since the cutoff, per model.

    Token figures are read from the `units` JSON of usage events (`tokens_in`,
    `tokens_out`, `cache_hit_tokens`) and summed over `kind == "chat"` events
    only. `cost_cny` and the event counts cover every kind.

    Args:
        s: session-like object; only `s.execute(...)` is used.
        cutoff_7d: inclusive lower bound on `created_at` for the per-day series.

    Returns:
        {"total": {...}, "by_day_7d": [...], "by_model": [...]}.
        `by_day_7d` is ordered by date ascending; `by_model` covers all time
        and is ordered by cost descending.

    NOTE(review): the original docstring states this shares its source with
    `_usage_aggregates` (not visible here) so cache-hit-rate denominators
    agree -- confirm against that function.
    """
    is_chat = UsageEvent.kind == "chat"

    def chat_token_sum(field: str):
        # SUM of one integer field inside `units`, chat events only; 0 when no rows match.
        as_int = cast(UsageEvent.units[field].astext, BigInteger)
        return func.coalesce(func.sum(as_int).filter(is_chat), 0)

    def cost_total():
        # SUM(cost_cny) over every event kind; 0 when no rows match.
        return func.coalesce(func.sum(UsageEvent.cost_cny), 0)

    # --- all-time totals -------------------------------------------------
    totals_stmt = select(
        cost_total(),
        chat_token_sum("tokens_in"),
        chat_token_sum("tokens_out"),
        chat_token_sum("cache_hit_tokens"),
        func.count(),
    )
    cost_all, in_all, out_all, hit_all, events_all = s.execute(totals_stmt).one()
    overall = {
        "cost_cny": float(cost_all or 0),
        "tokens_in": int(in_all or 0),
        "tokens_out": int(out_all or 0),
        "tokens_cache_hit": int(hit_all or 0),
        "n_events": int(events_all or 0),
    }

    # --- per day since the cutoff (bucketed by the database date() function) ---
    bucket = func.date(UsageEvent.created_at)
    daily_stmt = (
        select(
            bucket,
            cost_total(),
            chat_token_sum("tokens_in"),
            chat_token_sum("tokens_out"),
        )
        .where(UsageEvent.created_at >= cutoff_7d)
        .group_by(bucket)
        .order_by(bucket)
    )
    daily = []
    for bucket_date, cost, tokens_in, tokens_out in s.execute(daily_stmt).all():
        daily.append(
            {
                "date": str(bucket_date),
                "cost_cny": float(cost or 0),
                "tokens_in": int(tokens_in or 0),
                "tokens_out": int(tokens_out or 0),
            }
        )

    # --- per model profile, all time, most expensive first -----------------
    model_stmt = (
        select(
            UsageEvent.model_profile,
            cost_total(),
            chat_token_sum("tokens_in"),
            chat_token_sum("tokens_out"),
            func.count(),
        )
        .group_by(UsageEvent.model_profile)
        .order_by(cost_total().desc())
    )
    per_model = []
    for profile, cost, tokens_in, tokens_out, n_events in s.execute(model_stmt).all():
        per_model.append(
            {
                "model_profile": profile,
                "cost_cny": float(cost or 0),
                "tokens_in": int(tokens_in or 0),
                "tokens_out": int(tokens_out or 0),
                "n_events": int(n_events or 0),
            }
        )

    return {"total": overall, "by_day_7d": daily, "by_model": per_model}
def _user_usage_page(s: Any, page: int, page_size: int) -> dict:
    """Return one page of per-user usage, most expensive users first.

    Rows come from `User` LEFT OUTER JOINed to `UsageEvent`, so users who never
    produced an event still appear, with zeros. Cost is summed over every event
    kind; token and cache-hit sums cover `kind == "chat"` events only.

    Ordering is cost descending with `user_id` as tie-breaker, which keeps
    paging stable when many users share the same cost (e.g. many zeros).

    Args:
        s: session-like object; only `s.execute(...)` is used.
        page: 0-based page index; the offset is `page * page_size`. This
            function does not validate it (the visible caller clamps first).
        page_size: maximum number of rows to return.

    Returns:
        {"page", "page_size", "total_users", "rows": [...]}.
    """
    is_chat = UsageEvent.kind == "chat"

    def chat_token_sum(field: str):
        # SUM of one integer field inside `units`, chat events only; 0 when no rows match.
        as_int = cast(UsageEvent.units[field].astext, BigInteger)
        return func.coalesce(func.sum(as_int).filter(is_chat), 0)

    total_cost = func.coalesce(func.sum(UsageEvent.cost_cny), 0)

    user_count = s.execute(select(func.count()).select_from(User)).scalar_one()

    page_stmt = (
        select(
            User.user_id,
            User.email,
            User.role,
            total_cost,
            chat_token_sum("tokens_in"),
            chat_token_sum("tokens_out"),
            chat_token_sum("cache_hit_tokens"),
            # COUNT(event_id) rather than COUNT(*): a user with no events yields 0, not 1.
            func.count(UsageEvent.event_id),
        )
        .outerjoin(UsageEvent, UsageEvent.user_id == User.user_id)
        .group_by(User.user_id, User.email, User.role)
        .order_by(total_cost.desc(), User.user_id)
        .limit(page_size)
        .offset(page * page_size)
    )

    entries = []
    for record in s.execute(page_stmt).all():
        uid, email, role, cost, tokens_in, tokens_out, cache_hit, n_events = record
        entries.append(
            {
                "user_id": str(uid),
                "email": email or "",
                "role": role or "user",
                "cost_cny": float(cost or 0),
                "tokens_in": int(tokens_in or 0),
                "tokens_out": int(tokens_out or 0),
                "tokens_cache_hit": int(cache_hit or 0),
                "n_events": int(n_events or 0),
            }
        )

    return {
        "page": page,
        "page_size": page_size,
        "total_users": user_count,
        "rows": entries,
    }
def _storage_section(s: Any) -> dict:
    """Return per-user disk usage, largest first, plus the configured per-user quota.

    The quota is `parse_bytes(...)` applied to `quotas.disk_bytes_per_user` from
    `load_config()`; a missing or empty `quotas` section passes None to
    `parse_bytes`. Usage rows come from `UserDiskUsage` LEFT OUTER JOINed to
    `User`, so a usage row without a matching user is still listed, with an
    empty email.

    Args:
        s: session-like object; only `s.execute(...)` is used.

    Returns:
        {"quota_bytes": <result of parse_bytes>, "users": [...]}.
    """
    # Imported inside the function, as in the original (presumably to avoid an
    # import cycle or import-time cost -- confirm).
    from core.agent_builder import load_config
    from core.storage.disk_quota import parse_bytes

    quota_cfg = load_config().get("quotas") or {}
    quota_bytes = parse_bytes(quota_cfg.get("disk_bytes_per_user"))

    usage_stmt = (
        select(
            UserDiskUsage.user_id,
            User.email,
            UserDiskUsage.bytes_used,
            UserDiskUsage.file_count,
            UserDiskUsage.scanned_at,
        )
        .outerjoin(User, User.user_id == UserDiskUsage.user_id)
        .order_by(UserDiskUsage.bytes_used.desc())
    )

    per_user = []
    for uid, email, used, n_files, scanned_at in s.execute(usage_stmt).all():
        if scanned_at:
            scanned_text = scanned_at.isoformat()
        else:
            scanned_text = None
        per_user.append(
            {
                "user_id": str(uid),
                "email": email or "",
                "bytes_used": int(used or 0),
                "file_count": int(n_files or 0),
                "scanned_at": scanned_text,
            }
        )

    return {"quota_bytes": quota_bytes, "users": per_user}
def register_admin_routes(app: FastAPI, require_admin) -> None:
    """Mount the /v1/admin/* routes on `app`; every route depends on `require_admin`.

    Args:
        app: FastAPI application the routes are attached to. It is also passed
            to `_runtime_section` to read `app.state`.
        require_admin: dependency callable wrapped in `Depends(...)`. Each
            handler receives its result as `user_id` and uses it only as an
            access gate.
    """

    # NOTE: FastAPI publishes each handler's docstring as the endpoint's
    # OpenAPI description, so those docstrings are user-visible.

    @app.get("/v1/admin/overview", tags=["admin"])
    def admin_overview(user_id: UUID = Depends(require_admin)):
        """Admin overview: returns every section in one response, for /static/admin.html to poll. Admin-only."""
        generated = datetime.now(timezone.utc)
        week_ago = generated - timedelta(days=7)
        with session_scope() as s:
            payload = {"generated_at": generated.isoformat()}
            payload["runtime"] = _runtime_section(app)
            payload["tasks"] = _tasks_section(s)
            payload["users"] = _users_section(s, week_ago)
            payload["usage"] = _usage_section(s, week_ago)
            payload["storage"] = _storage_section(s)
            return payload

    @app.get("/v1/admin/usage/users", tags=["admin"])
    def admin_usage_users(
        page: int = 0, page_size: int = 20, user_id: UUID = Depends(require_admin)
    ):
        """Per-user token usage (paginated, cost descending). Admin-only.

        `page` is 0-based; `page_size` is clamped to [1, 100]. Users with no
        usage are included (LEFT JOIN from the users table). The frontend pages
        this table independently of the overview poll; overall totals are in
        `overview.usage.total`.
        """
        # Clamp rather than reject: out-of-range values are silently corrected.
        page_index = page if page > 0 else 0
        if page_size < 1:
            rows_per_page = 1
        elif page_size > 100:
            rows_per_page = 100
        else:
            rows_per_page = page_size

        with session_scope() as s:
            return _user_usage_page(s, page_index, rows_per_page)