"""用量记账(0006 + 0007):一次产生成本的调用 = 一行 usage_events + 双写 messages 列。 chat 类型的入口由 loop.py 在 assistant message 入库后调用;媒体工具(image/video/audio) 在 tool execute 完后由 tool 直接调用对应入口(record_image_usage 等)。 币种(0007):全表统一 CNY(`cost_cny` 列)。chat 路径走 litellm 的 USD cost_map → 内部 ×USD_TO_CNY 折算落库;媒体路径价格本身就是 CNY,直接落。units jsonb 里 snapshot 当时 的关键价格参数(chat 没有,media 存 price_cny_per_image 等),便于跨调价对账。 """ from __future__ import annotations from decimal import Decimal from typing import Any, Mapping, Optional from uuid import UUID from sqlalchemy import update from .engine import session_scope from .models import Message, UsageEvent # litellm 的 cost map 给的是 USD,落库前折成 CNY。汇率近似(每年看一次,实质偏差不大); # 真要精算的话应该按调用时刻的汇率,但开发期/个人用接受。 USD_TO_CNY = Decimal("7.2") def _safe_chat_cost_usd(response: Any) -> Decimal: """litellm.completion_cost(response) 包一层:任何异常都吞掉返 0。 未知 model / cost map 没收录 / response 结构变都不影响主流程 —— usage_events 仍写入,只是 cost=0,后续人工补算 OK。返 USD,由 caller 折算。 """ try: from litellm import completion_cost # type: ignore[import-not-found] cost = completion_cost(completion_response=response) if cost is None: return Decimal("0") return Decimal(str(cost)) except Exception: return Decimal("0") def record_chat_usage( *, task_id: UUID, user_id: UUID, message_id: Optional[UUID], model_profile: str, prompt_tokens: int, completion_tokens: int, response: Any = None, ) -> Decimal: """记一次 chat 调用:写 usage_events 行 + 回填 messages.model_profile/tokens_in/out。 `message_id` 来自 `Session.append` 的返回值;若为 None(系统消息 / 旧路径未拿到) 则 usage_events 仍写但 message_id=NULL,messages 列不回填。 `model_profile` 形如 `"deepseek_v4.pro"`(family.variant)。 返回算出的 cost_cny(已落库),调用方可用作 SSE 显示。 """ cost_usd = _safe_chat_cost_usd(response) cost_cny = (cost_usd * USD_TO_CNY).quantize(Decimal("0.000001")) units = { "tokens_in": int(prompt_tokens), "tokens_out": int(completion_tokens), # snapshot 折算系数,便于历史对账(汇率/价格涨跌后仍能还原当时折算逻辑) "usd_to_cny": float(USD_TO_CNY), } with session_scope() as s: s.add(UsageEvent( user_id=user_id, task_id=task_id, message_id=message_id, kind="chat", model_profile=model_profile, units=units, cost_cny=cost_cny, )) if message_id is not None: s.execute( update(Message) .where(Message.message_id == message_id) .values( tokens_in=int(prompt_tokens), tokens_out=int(completion_tokens), model_profile=model_profile, ) ) return cost_cny def record_image_usage( *, task_id: UUID, user_id: UUID, model_profile: str, n_images: int, size: str, price_cny_per_image: float, search: bool = False, extra_units: Optional[Mapping[str, Any]] = None, ) -> Decimal: """记一次图像生成:写 usage_events(kind=image)。 单价(CNY/张)由 caller 从配置文件读出后传入,**同步 snapshot 进 units jsonb** —— 将来豆包调价改 YAML 即可,历史记录不动且仍能完整还原当时单价。 `model_profile` 形如 `"doubao.seedream_5"`(family.variant 风格,跟 chat 对齐)。 `extra_units` 给将来扩展(如 quality / style 等额外加价维度)预留。 返回 cost_cny(已落库,可作 SSE / tool 返回串显示用)。 """ price = Decimal(str(price_cny_per_image)) cost_cny = (price * n_images).quantize(Decimal("0.000001")) units: dict[str, Any] = { "n_images": int(n_images), "size": size, "search": bool(search), "price_cny_per_image": float(price_cny_per_image), } if extra_units: units.update(extra_units) with session_scope() as s: s.add(UsageEvent( user_id=user_id, task_id=task_id, message_id=None, # image tool 在 tool execute 时调用,message 还未落库 kind="image", model_profile=model_profile, units=units, cost_cny=cost_cny, )) return cost_cny