zcbot/core/storage/usage.py

133 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""用量记账(0006 + 0007):一次产生成本的调用 = 一行 usage_events + 双写 messages 列。
chat 类型的入口由 loop.py 在 assistant message 入库后调用;媒体工具(image/video/audio)
在 tool execute 完后由 tool 直接调用对应入口(record_image_usage 等)。
币种(0007):全表统一 CNY(`cost_cny` 列)。chat 路径走 litellm 的 USD cost_map → 内部
×USD_TO_CNY 折算落库;媒体路径价格本身就是 CNY,直接落。units jsonb 里 snapshot 当时
的关键价格参数(chat 没有,media 存 price_cny_per_image 等),便于跨调价对账。
"""
from __future__ import annotations
from decimal import Decimal
from typing import Any, Mapping, Optional
from uuid import UUID
from sqlalchemy import update
from .engine import session_scope
from .models import Message, UsageEvent
# litellm 的 cost map 给的是 USD,落库前折成 CNY。汇率近似(每年看一次,实质偏差不大);
# 真要精算的话应该按调用时刻的汇率,但开发期/个人用接受。
USD_TO_CNY = Decimal("7.2")
def _safe_chat_cost_usd(response: Any) -> Decimal:
"""litellm.completion_cost(response) 包一层:任何异常都吞掉返 0。
未知 model / cost map 没收录 / response 结构变都不影响主流程 —— usage_events
仍写入,只是 cost=0,后续人工补算 OK。返 USD,由 caller 折算。
"""
try:
from litellm import completion_cost # type: ignore[import-not-found]
cost = completion_cost(completion_response=response)
if cost is None:
return Decimal("0")
return Decimal(str(cost))
except Exception:
return Decimal("0")
def record_chat_usage(
*,
task_id: UUID,
user_id: UUID,
message_id: Optional[UUID],
model_profile: str,
prompt_tokens: int,
completion_tokens: int,
response: Any = None,
) -> Decimal:
"""记一次 chat 调用:写 usage_events 行 + 回填 messages.model_profile/tokens_in/out。
`message_id` 来自 `Session.append` 的返回值;若为 None(系统消息 / 旧路径未拿到)
则 usage_events 仍写但 message_id=NULL,messages 列不回填。
`model_profile` 形如 `"deepseek_v4.pro"`(family.variant)。
返回算出的 cost_cny(已落库),调用方可用作 SSE 显示。
"""
cost_usd = _safe_chat_cost_usd(response)
cost_cny = (cost_usd * USD_TO_CNY).quantize(Decimal("0.000001"))
units = {
"tokens_in": int(prompt_tokens),
"tokens_out": int(completion_tokens),
# snapshot 折算系数,便于历史对账(汇率/价格涨跌后仍能还原当时折算逻辑)
"usd_to_cny": float(USD_TO_CNY),
}
with session_scope() as s:
s.add(UsageEvent(
user_id=user_id,
task_id=task_id,
message_id=message_id,
kind="chat",
model_profile=model_profile,
units=units,
cost_cny=cost_cny,
))
if message_id is not None:
s.execute(
update(Message)
.where(Message.message_id == message_id)
.values(
tokens_in=int(prompt_tokens),
tokens_out=int(completion_tokens),
model_profile=model_profile,
)
)
return cost_cny
def record_image_usage(
*,
task_id: UUID,
user_id: UUID,
model_profile: str,
n_images: int,
size: str,
price_cny_per_image: float,
search: bool = False,
extra_units: Optional[Mapping[str, Any]] = None,
) -> Decimal:
"""记一次图像生成:写 usage_events(kind=image)。
单价(CNY/张)由 caller 从配置文件读出后传入,**同步 snapshot 进 units jsonb** ——
将来豆包调价改 YAML 即可,历史记录不动且仍能完整还原当时单价。
`model_profile` 形如 `"doubao.seedream_5"`(family.variant 风格,跟 chat 对齐)。
`extra_units` 给将来扩展(如 quality / style 等额外加价维度)预留。
返回 cost_cny(已落库,可作 SSE / tool 返回串显示用)。
"""
price = Decimal(str(price_cny_per_image))
cost_cny = (price * n_images).quantize(Decimal("0.000001"))
units: dict[str, Any] = {
"n_images": int(n_images),
"size": size,
"search": bool(search),
"price_cny_per_image": float(price_cny_per_image),
}
if extra_units:
units.update(extra_units)
with session_scope() as s:
s.add(UsageEvent(
user_id=user_id,
task_id=task_id,
message_id=None, # image tool 在 tool execute 时调用,message 还未落库
kind="image",
model_profile=model_profile,
units=units,
cost_cny=cost_cny,
))
return cost_cny