"""管理后台端点(admin-only):/v1/admin/*。 `register_admin_routes(app, require_admin)` 在 create_app 内调用,把管理路由挂上去, 整组走 `Depends(require_admin)`(JWT 有效 + users.role=='admin',否则 403)。 第一版只有总览(监控指标):单个 `GET /v1/admin/overview` 一次返回全部 section (runtime / tasks / users / usage / storage),前端定时轮询这一个端点即可。runtime 读 app.state 内存(轻);其余走 DB 聚合(GROUP BY,无 N+1)。指标只读、不落库。 后续管理动作(建用户 / 改角色 / 配置磁盘配额等)在此模块续挂 /v1/admin/users、 /v1/admin/config 等,前端 admin.html 加 tab。 """ from __future__ import annotations from datetime import datetime, timedelta, timezone from typing import Any from uuid import UUID from fastapi import Depends, FastAPI from sqlalchemy import BigInteger, and_, cast, func, select from core.storage import session_scope from core.storage.models import Task, UsageEvent, User, UserDiskUsage from .broker import broker try: import resource # Unix only;Windows dev 无此模块,RSS 监控降级跳过 except ImportError: # pragma: no cover - Windows resource = None def _rss_peak_mb(): """进程峰值 RSS(MB)。Linux 走 ru_maxrss(KB);Windows dev 返 None(降级)。""" if resource is None: return None return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 def _range_cutoff(now: datetime, range_key: str): """时间筛选 → cutoff datetime(或 None=全部)。range_key: all / 7d / 30d。""" if range_key == "7d": return now - timedelta(days=7) if range_key == "30d": return now - timedelta(days=30) return None # all / 未知 → 不筛 def _runtime_section(app: FastAPI) -> dict: """实时运行态:从 app.state 读内存,无 DB。 active_runs 逼近 max_workers 即线程池排队(新 run 的 SSE 会卡)—— 前端据此变色。 """ inflight = getattr(app.state, "inflight", None) active = len(inflight) if inflight is not None else 0 max_workers = getattr(app.state, "run_max_workers", None) return { "active_runs": active, "max_workers": max_workers, "sse_subs": broker.total_subscribers(), "rss_peak_mb": _rss_peak_mb(), } def _tasks_section(s: Any) -> dict: """task 计数:总数 + 按 status + 按 run_status 分布。""" total = s.execute(select(func.count()).select_from(Task)).scalar_one() by_status = { st: n for st, n in s.execute( select(Task.status, func.count()).group_by(Task.status) ).all() } by_run_status = { st: n for st, n in s.execute( select(Task.run_status, func.count()).group_by(Task.run_status) ).all() } return {"total": total, "by_status": by_status, "by_run_status": by_run_status} def _users_section(s: Any, cutoff_7d: datetime) -> dict: """用户:总数 + 近 7d 有用量事件的活跃用户数。""" total = s.execute(select(func.count()).select_from(User)).scalar_one() active_7d = s.execute( select(func.count(func.distinct(UsageEvent.user_id))).where( UsageEvent.created_at >= cutoff_7d ) ).scalar_one() return {"total": total, "active_7d": active_7d} def _usage_section(s: Any, cutoff_7d: datetime) -> dict: """token / 成本聚合(放进 overview,固定形态):全局合计(all-time)+ 近 7d 按天趋势。 按模型 / 各用户用量已拆成独立带筛选排序的端点(_models_usage / _user_usage_page), 不在此 bundle。chat token 取自 usage_events.units JSONB;cost_cny 全 kind 合计。 """ chat = UsageEvent.kind == "chat" tin = cast(UsageEvent.units["tokens_in"].astext, BigInteger) tout = cast(UsageEvent.units["tokens_out"].astext, BigInteger) hit = cast(UsageEvent.units["cache_hit_tokens"].astext, BigInteger) # 全局合计(all-time) g = s.execute( select( func.coalesce(func.sum(UsageEvent.cost_cny), 0), func.coalesce(func.sum(tin).filter(chat), 0), func.coalesce(func.sum(tout).filter(chat), 0), func.coalesce(func.sum(hit).filter(chat), 0), func.count(), ) ).one() total = { "cost_cny": float(g[0] or 0), "tokens_in": int(g[1] or 0), "tokens_out": int(g[2] or 0), "tokens_cache_hit": int(g[3] or 0), "n_events": int(g[4] or 0), } # 近 7d 按天(date 截断;前端画成条/数字均可);按日期倒序 —— 最新一天在最上面 day = func.date(UsageEvent.created_at) by_day = [ { "date": str(d), "cost_cny": float(c or 0), "tokens_in": int(ti or 0), "tokens_out": int(to or 0), } for d, c, ti, to in s.execute( select( day, func.coalesce(func.sum(UsageEvent.cost_cny), 0), func.coalesce(func.sum(tin).filter(chat), 0), func.coalesce(func.sum(tout).filter(chat), 0), ) .where(UsageEvent.created_at >= cutoff_7d) .group_by(day) .order_by(day.desc()) ).all() ] return {"total": total, "by_day_7d": by_day} def _models_usage(s: Any, cutoff, sort: str) -> list: """按模型用量(支持时间筛选 + 排序)。sort: cost(按成本)/ tokens(按用量=输入+输出)。 cutoff=None 即全部;cost 全 kind 合计,token 仅 chat。模型集合从 usage_events 现取 (无"全模型"基线),故时间条件直接进 WHERE。 """ chat = UsageEvent.kind == "chat" tin = cast(UsageEvent.units["tokens_in"].astext, BigInteger) tout = cast(UsageEvent.units["tokens_out"].astext, BigInteger) cost_sum = func.coalesce(func.sum(UsageEvent.cost_cny), 0) tin_sum = func.coalesce(func.sum(tin).filter(chat), 0) tout_sum = func.coalesce(func.sum(tout).filter(chat), 0) order = (tin_sum + tout_sum).desc() if sort == "tokens" else cost_sum.desc() q = select( UsageEvent.model_profile, cost_sum, tin_sum, tout_sum, func.count(), ) if cutoff is not None: q = q.where(UsageEvent.created_at >= cutoff) q = q.group_by(UsageEvent.model_profile).order_by(order, UsageEvent.model_profile) return [ { "model_profile": mp, "cost_cny": float(c or 0), "tokens_in": int(ti or 0), "tokens_out": int(to or 0), "n_events": int(n or 0), } for mp, c, ti, to, n in s.execute(q).all() ] def _user_usage_page(s: Any, page: int, page_size: int, cutoff, sort: str) -> dict: """分页的各用户 token 用量(时间筛选 + 排序),含零用量用户(LEFT JOIN users)。 `各用户` 取自 users 全表 LEFT JOIN usage_events,故没产生过用量的用户也出现(0); 时间筛选放 JOIN ON(非 WHERE),否则带 cutoff 时会把零用量用户挤掉。 sort: cost(按成本)/ tokens(按用量=输入+输出);+ user_id 兜底稳定分页。 cost 全 kind 合计;token/cache_hit 仅 chat。返回 {page, page_size, total_users, rows}。 """ chat = UsageEvent.kind == "chat" tin = cast(UsageEvent.units["tokens_in"].astext, BigInteger) tout = cast(UsageEvent.units["tokens_out"].astext, BigInteger) hit = cast(UsageEvent.units["cache_hit_tokens"].astext, BigInteger) cost_sum = func.coalesce(func.sum(UsageEvent.cost_cny), 0) tin_sum = func.coalesce(func.sum(tin).filter(chat), 0) tout_sum = func.coalesce(func.sum(tout).filter(chat), 0) order = (tin_sum + tout_sum).desc() if sort == "tokens" else cost_sum.desc() join_cond = UsageEvent.user_id == User.user_id if cutoff is not None: join_cond = and_(join_cond, UsageEvent.created_at >= cutoff) total_users = s.execute(select(func.count()).select_from(User)).scalar_one() rows = [ { "user_id": str(uid), "email": email or "", "name": name or "", "user_name": uname or "", "role": role or "user", "cost_cny": float(c or 0), "tokens_in": int(ti or 0), "tokens_out": int(to or 0), "tokens_cache_hit": int(h or 0), "n_events": int(n or 0), } for uid, email, name, uname, role, c, ti, to, h, n in s.execute( select( User.user_id, User.email, User.name, User.user_name, User.role, cost_sum, tin_sum, tout_sum, func.coalesce(func.sum(hit).filter(chat), 0), func.count(UsageEvent.event_id), ) .join(UsageEvent, join_cond, isouter=True) .group_by(User.user_id, User.email, User.name, User.user_name, User.role) .order_by(order, User.user_id) .limit(page_size) .offset(page * page_size) ).all() ] return {"page": page, "page_size": page_size, "total_users": total_users, "rows": rows} def _storage_page(s: Any, page: int, page_size: int) -> dict: """分页的各用户磁盘用量(bytes desc + user_id 兜底);附 per-user 配额。 数据源 user_disk_usage(后台扫描快照,只含扫过的用户);total 为该表行数。 """ from core.agent_builder import load_config from core.storage.disk_quota import parse_bytes quota = parse_bytes((load_config().get("quotas") or {}).get("disk_bytes_per_user")) total = s.execute(select(func.count()).select_from(UserDiskUsage)).scalar_one() rows = [ { "user_id": str(uid), "email": email or "", "name": name or "", "user_name": uname or "", "bytes_used": int(b or 0), "file_count": int(fc or 0), "scanned_at": scanned.isoformat() if scanned else None, } for uid, email, name, uname, b, fc, scanned in s.execute( select( UserDiskUsage.user_id, User.email, User.name, User.user_name, UserDiskUsage.bytes_used, UserDiskUsage.file_count, UserDiskUsage.scanned_at, ) .join(User, User.user_id == UserDiskUsage.user_id, isouter=True) .order_by(UserDiskUsage.bytes_used.desc(), UserDiskUsage.user_id) .limit(page_size) .offset(page * page_size) ).all() ] return { "page": page, "page_size": page_size, "total": total, "quota_bytes": quota, "rows": rows, } def register_admin_routes(app: FastAPI, require_admin) -> None: """把 /v1/admin/* 管理路由挂到 app 上,整组走 require_admin gate。""" @app.get("/v1/admin/overview", tags=["admin"]) def admin_overview(user_id: UUID = Depends(require_admin)): """管理总览(固定形态,供轮询):runtime/tasks/users/usage 总用量+近7d趋势。admin-only。 按模型 / 各用户用量 / 存储 是带筛选/分页的独立端点,不在此 bundle。 """ now = datetime.now(timezone.utc) cutoff_7d = now - timedelta(days=7) with session_scope() as s: return { "generated_at": now.isoformat(), "runtime": _runtime_section(app), "tasks": _tasks_section(s), "users": _users_section(s, cutoff_7d), "usage": _usage_section(s, cutoff_7d), } @app.get("/v1/admin/usage/models", tags=["admin"]) def admin_usage_models( range: str = "all", sort: str = "cost", user_id: UUID = Depends(require_admin) ): """按模型用量。range: all/7d/30d;sort: cost/tokens。admin-only。""" now = datetime.now(timezone.utc) with session_scope() as s: return { "range": range, "sort": sort, "rows": _models_usage(s, _range_cutoff(now, range), sort), } @app.get("/v1/admin/usage/users", tags=["admin"]) def admin_usage_users( page: int = 0, page_size: int = 20, range: str = "all", sort: str = "cost", user_id: UUID = Depends(require_admin), ): """各用户 token 用量(分页 + 时间筛选 + 排序)。admin-only。 page 0-based;page_size 夹到 [1,100];range all/7d/30d;sort cost/tokens。 含零用量用户(全表 LEFT JOIN);总用量在 overview.usage.total。 """ page = max(0, page) page_size = min(100, max(1, page_size)) now = datetime.now(timezone.utc) with session_scope() as s: d = _user_usage_page(s, page, page_size, _range_cutoff(now, range), sort) d["range"] = range d["sort"] = sort return d @app.get("/v1/admin/storage/users", tags=["admin"]) def admin_storage_users( page: int = 0, page_size: int = 20, user_id: UUID = Depends(require_admin) ): """各用户磁盘用量(分页,bytes desc)。admin-only。page 0-based;page_size [1,100]。""" page = max(0, page) page_size = min(100, max(1, page_size)) with session_scope() as s: return _storage_page(s, page, page_size)