zcbot/core/storage/models.py

174 lines
8.0 KiB
Python

"""SQLAlchemy 2.x ORM models,对应 DESIGN.md §7.4 schema。
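Five tables: users / tasks / messages / usage_events / user_disk_usage.
- users rows are INSERTed on demand at the web entry points (`/v1/auth/login_password`
  creates the row itself; the `/v1/auth/login` platform_key entry point goes through
  ensure_user_row). email is UNIQUE (0005) for the login lookup; password_hash is
  bcrypt (`bcrypt.hashpw`) and is only set for email/password logins.
- messages.payload is jsonb; its GIN index is created in a migration.
  messages.model_profile (0006) is only set on assistant rows and records the model
  that produced the message.
- Run state lives in the two columns tasks.run_status / run_error (0004 merged the
  runs table into tasks); the original runs table and the old usage_events table
  were dropped in 0004 — see DESIGN §7.4 trade-offs / PROGRESS 05-18.
- usage_events (0006 v2 shape): polymorphic per-event usage rows,
  kind=chat/image/video/...; the units column is JSONB (chat stores tokens_in/out,
  image stores count/size, etc.). Per-user aggregation uses the
  (user_id, created_at) composite index, JOIN-free. See DESIGN §7.4 / PROGRESS 05-19.
- user_disk_usage (0008): one row per user holding a snapshot of the bytes used by
  that user's working directory (soft-quota table, DESIGN §7.5 #4).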
"""
from __future__ import annotations
from datetime import datetime
from decimal import Decimal
from typing import Any, Optional
from uuid import UUID, uuid4
from sqlalchemy import (
BigInteger,
DateTime,
ForeignKey,
Integer,
Numeric,
Text,
UniqueConstraint,
func,
)
from sqlalchemy.dialects.postgresql import JSONB, UUID as PG_UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
    """Declarative base class that every ORM model in this module inherits from."""
class User(Base):
    """ORM mapping for the ``users`` table (one row per account)."""

    __tablename__ = "users"

    user_id: Mapped[UUID] = mapped_column(
        PG_UUID(as_uuid=True),
        default=uuid4,
        primary_key=True,
    )
    # Nullable but UNIQUE: used as the lookup key for login.
    email: Mapped[Optional[str]] = mapped_column(
        Text,
        unique=True,
        nullable=True,
    )
    oidc_subject: Mapped[Optional[str]] = mapped_column(
        Text,
        nullable=True,
    )
    # bcrypt hash; only populated for email/password logins.
    password_hash: Mapped[Optional[str]] = mapped_column(
        Text,
        nullable=True,
    )
    plan: Mapped[Optional[str]] = mapped_column(
        Text,
        nullable=True,
    )
    # 0009: access role. 'user' (default) / 'admin'; only admin may reach the
    # /v1/admin/* management endpoints.
    # To promote an admin: main.py user role --email X --role admin.
    role: Mapped[str] = mapped_column(
        Text,
        server_default="user",
        nullable=False,
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
class Task(Base):
    """ORM mapping for the ``tasks`` table.

    Besides the task's own metadata, a row carries the running token / cost
    totals and the state of the current in-flight run (``run_status`` /
    ``run_error``).
    """

    __tablename__ = "tasks"

    task_id: Mapped[UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True, default=uuid4)
    user_id: Mapped[UUID] = mapped_column(
        PG_UUID(as_uuid=True), ForeignKey("users.user_id"), nullable=False
    )
    name: Mapped[str] = mapped_column(Text, nullable=False)
    working_dir: Mapped[str] = mapped_column(Text, nullable=False)
    skill: Mapped[str] = mapped_column(Text, nullable=False, default="")
    description: Mapped[str] = mapped_column(Text, nullable=False, default="")
    status: Mapped[str] = mapped_column(Text, nullable=False, default="active")
    model: Mapped[str] = mapped_column(Text, nullable=False, default="")
    model_profile: Mapped[str] = mapped_column(Text, nullable=False, default="")
    reasoning_effort: Mapped[str] = mapped_column(Text, nullable=False, default="")
    tokens_prompt: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    tokens_completion: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    # The default is a Decimal (not the int 0) so that a freshly flushed instance
    # already holds the type promised by Mapped[Decimal]; the stored value is the same.
    cost_cny: Mapped[Decimal] = mapped_column(
        Numeric(12, 6), nullable=False, default=Decimal("0")
    )

    # Current in-flight state (merged in from the former runs table, DESIGN §7.4
    # simplification / 0004 migration): idle / running / cancelling / error.
    # A run that ends ok or cancelled goes straight back to idle, so error is the
    # only persistent terminal state (cleared by post_message when the next run starts).
    run_status: Mapped[str] = mapped_column(Text, nullable=False, default="idle")
    run_error: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), nullable=False
    )
    # onupdate: SQLAlchemy sets this column to now() in the UPDATE statements it emits.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
    )
    # Soft-delete marker (0010): setting a timestamp means "logically deleted" — the
    # task is hidden from listings while the DB row, messages, usage_events and the
    # working-directory files are all kept (as corpus, and so it can be restored).
    # NULL = not deleted. Physical deletion only happens during admin cleanup.
    deleted_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True), nullable=True
    )
class Message(Base):
    """ORM mapping for the ``messages`` table; ``(task_id, idx)`` is unique."""

    __tablename__ = "messages"
    __table_args__ = (
        UniqueConstraint(
            "task_id",
            "idx",
            name="uq_messages_task_idx",
        ),
    )

    message_id: Mapped[UUID] = mapped_column(
        PG_UUID(as_uuid=True),
        default=uuid4,
        primary_key=True,
    )
    task_id: Mapped[UUID] = mapped_column(
        PG_UUID(as_uuid=True),
        ForeignKey("tasks.task_id", ondelete="CASCADE"),
        nullable=False,
    )
    idx: Mapped[int] = mapped_column(
        Integer,
        nullable=False,
    )
    payload: Mapped[dict[str, Any]] = mapped_column(
        JSONB,
        nullable=False,
    )
    tokens_in: Mapped[Optional[int]] = mapped_column(
        Integer,
        nullable=True,
    )
    tokens_out: Mapped[Optional[int]] = mapped_column(
        Integer,
        nullable=True,
    )
    # 0006: the model that produced this message (only set on assistant rows;
    # NULL for user/tool/system). Written consistently with
    # usage_events.model_profile, so a JOIN-free lookup by message still finds it.
    model_profile: Mapped[Optional[str]] = mapped_column(
        Text,
        nullable=True,
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
class UsageEvent(Base):
    """Per-event usage accounting (0006 v2 shape).

    One row = one call that incurred cost. Rows of kind chat are written by the
    loop after the assistant message has been stored; future media tools
    (image/video/audio) are to be accounted for by the loop once the tool
    execute finishes.

    The units column is polymorphic JSONB — chat: {"tokens_in": N, "tokens_out": M};
    image: {"count": K, "size": "1024x1024"}; the shape is a per-kind convention.

    Per-user aggregate queries use the (user_id, created_at) index and do not
    JOIN the tasks table.
    """

    __tablename__ = "usage_events"

    event_id: Mapped[UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True, default=uuid4)
    user_id: Mapped[UUID] = mapped_column(
        PG_UUID(as_uuid=True), ForeignKey("users.user_id"), nullable=False
    )
    task_id: Mapped[UUID] = mapped_column(
        PG_UUID(as_uuid=True),
        ForeignKey("tasks.task_id", ondelete="CASCADE"),
        nullable=False,
    )
    # Nullable with ON DELETE SET NULL: the event row remains when its message is deleted.
    message_id: Mapped[Optional[UUID]] = mapped_column(
        PG_UUID(as_uuid=True),
        ForeignKey("messages.message_id", ondelete="SET NULL"),
        nullable=True,
    )
    kind: Mapped[str] = mapped_column(Text, nullable=False)  # chat / image / video / audio / ...
    model_profile: Mapped[str] = mapped_column(Text, nullable=False)  # deepseek_v4.pro / dall-e-3 / ...
    units: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
    # The default is a Decimal (not the int 0) so that a freshly flushed instance
    # already holds the type promised by Mapped[Decimal]; the stored value is the same.
    cost_cny: Mapped[Decimal] = mapped_column(
        Numeric(12, 6), nullable=False, default=Decimal("0")
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), nullable=False
    )
class UserDiskUsage(Base):
    """Per-user snapshot of working-directory bytes used (0008, §7.5 #4 soft-quota table).

    One upserted row per user_id; a lifespan background task periodically
    (default 15 min) scans user_root and stores the result here. The pre-write
    gate (DockerExecutor / /v1/files/upload) reads this table and compares it
    with the yaml setting `quotas.disk_bytes_per_user`; when over quota it
    returns [Error] and blocks the write.

    Writes that land between scans can overshoot the limit slightly
    (race-tolerant, accepted in the same way as the image/video quotas); before
    opening up to external users, OS-level xfs prjquota is the backstop for the
    real hard limit. See DESIGN §7.5 #4 / PROGRESS.
    """

    __tablename__ = "user_disk_usage"

    # Primary key and foreign key at once: at most one row per user, removed
    # together with the user (ON DELETE CASCADE).
    user_id: Mapped[UUID] = mapped_column(
        PG_UUID(as_uuid=True),
        ForeignKey("users.user_id", ondelete="CASCADE"),
        primary_key=True,
    )
    bytes_used: Mapped[int] = mapped_column(
        BigInteger,
        default=0,
        nullable=False,
    )
    file_count: Mapped[int] = mapped_column(
        Integer,
        default=0,
        nullable=False,
    )
    scanned_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )