Compare commits

No commits in common. "af2ad3cef1d50c9ca9196bd6d02549d1e887e085" and "1c30a9e54ef244d6ef9f102f1daa154819de267b" have entirely different histories.

12 changed files with 18 additions and 411 deletions

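@@ -587,7 +587,7 @@ zcbot-sandbox image is already ~1.5G (python deps + chromium + nodejs + mermaid-cli); later
**Stage 3: context budget and automatic compaction (medium risk, needs testing)**:
1. Add `core/context.py` to build the LLM messages: the input is `Session.messages` + a budget, the output is the trimmed messages.
2. Step one only does **old tool / tool_call argument compaction**: keep the system message and roughly the 12 most recent messages verbatim; reduce older, overly long `role=tool` contents to a head/tail excerpt; collapse old `load_skill` results into a "loaded skill: name/dir" marker; when an old assistant `tool_calls[].function.arguments` exceeds about 800 chars, replace it with a valid JSON marker (keeping path/script_path/name/original_chars) so the source code passed to `write(content=...)` does not keep re-entering the prompt. `role` / `tool_call_id` / `name` stay unchanged, keeping the OpenAI/LiteLLM tool_call protocol intact. This stage calls no extra LLM and produces no global summary.
2. Step one only does **old tool message compaction**: keep the system message and a number of the most recent messages verbatim; reduce older, overly long `role=tool` contents to a head/tail excerpt; collapse old `load_skill` results into a "loaded skill: name/dir" marker; `role` / `tool_call_id` / `name` stay unchanged, keeping the OpenAI/LiteLLM tool_call protocol intact. This stage calls no extra LLM and produces no global summary.
3. Step two adds a task summary: keep the system message, the last 6-10 turns verbatim, any messages belonging to an unclosed tool_call exchange, and the latest user message; collapse older messages into a single summary.
4. The summary must keep these apart: hard constraints confirmed by the user, the current plan, paths of generated files, key facts, to-dos/risks, and discardable logs. Old tool output is not pasted back verbatim; only paths and summaries are kept.
5. Initially trigger on a rough character estimate (e.g. 200k chars) and move to an exact tokenizer-based budget later; once triggered, compact down to 25%-40% of reliable_context so the context does not fill up again right after compaction.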
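Item 5 can be sketched as two small helpers. This is a minimal illustration only, assuming the size estimate is the total length of each message's `str()` form (the same crude `len(str(msg))` expression visible in `_message_chars`) and that `reliable_context` is measured in the same characters; `estimate_chars`, `should_compact` and `compaction_target` are hypothetical names, not existing project functions.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical constants taken from item 5: trigger on a rough character count,
# then compact down to 25%-40% of reliable_context.
TRIGGER_CHARS = 200_000
TARGET_RATIO = (0.25, 0.40)


def estimate_chars(messages: List[Dict[str, Any]]) -> int:
    """Crude size estimate: total length of each message's string form."""
    return sum(len(str(m)) for m in messages)


def should_compact(messages: List[Dict[str, Any]], trigger: int = TRIGGER_CHARS) -> bool:
    """True once the rough estimate reaches the trigger threshold."""
    return estimate_chars(messages) >= trigger


def compaction_target(reliable_context: int) -> Tuple[int, int]:
    """(low, high) size window to aim for, leaving headroom so it does not refill at once."""
    low, high = TARGET_RATIO
    return int(reliable_context * low), int(reliable_context * high)
```

Replacing `estimate_chars` with a tokenizer count later only swaps the estimator; the trigger and target logic stay the same.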
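@@ -596,7 +596,6 @@ zcbot-sandbox image is already ~1.5G (python deps + chromium + nodejs + mermaid-cli); later
- Per-call `tokens_in` for high-usage tasks drops from the ~500k range to within 50k-100k; ordinary tasks stay below 30k.
- `usage_events.units` distinguishes input / output / cache hit / cache miss, and `cost_cny` is no longer always 0.
- The `llm_start` SSE event exposes `context_original_chars` / `context_sent_chars` / `context_saved_chars` / `context_compacted_tool_messages` / `context_compacted_skill_messages`, and the hint bar at the bottom of the dev SPA shows them as well, so you can tell whether compaction actually took effect.
- The `N 条 / N tok` figures in the task list are the cumulative message count persisted in the DB and the total tokens over all past calls, used for billing / auditing; they do not drop because of pre-send context compaction. For the actual size of the current request, look at `llm_start context_*` and `llm_end prompt_tokens/cache_*`.
- The three long-running task types (literature collection / paper writing / PPT) can still look up the original file paths, and summarization does not lose specs the user has confirmed.
- Add focused tests covering usage-detail extraction, fallback cost calculation, tool-result trimming, and tool_call protocol integrity under context compaction.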

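@@ -2,7 +2,7 @@
> Companion to `DESIGN.md`. This file only records phase status, deviations from decisions, file counts, and next steps. Each entry is 1-2 sentences: what was done + the key judgment; for details see `git log` / `git diff` / `DESIGN §7.9`.
Last updated: 2026-06-05 (billing discounts prefix-cache hits + frontend shows cache hits / real cost)
Last updated: 2026-06-04 (sandbox container env: shell also gets PYTHONPATH + HOME=/tmp injected)
---
@@ -21,10 +21,6 @@
## Key capabilities completed
### 2026-06-05
- **Billing discounts DeepSeek prefix-cache hits (fixes a ~2-3x overstatement) + frontend shows cache hits / real cost**: investigating the "rust advantages → PPT" task (flash, 34 rounds) showed that **88.6%** of its 699k cumulative `tokens_in` were cache hits, yet `usage.py::_fallback_chat_cost_cny` billed the hit portion at the full `input` price (1.0) → it recorded ¥0.84, while the real cost (hits at 0.1x) is only ~¥0.28; **the larger the task, the larger the overstatement** (literature collection, 53% hits: ¥33 → ~¥16). Fix: ① `ModelCapabilities` gains `cache_hit_cny_per_mtoken` (deepseek flash 0.1 / pro 0.2; 0 = no distinction, fall back to the full price, never under-bill); ② the cost formula is split into three parts, "hits × cache price + (input − hits) × input price + output × output price", and `loop.py` passes `cache_hit_tokens` and the cache unit price through to `record_chat_usage`; ③ no new DB column for the frontend: `web/app.py` adds `_usage_aggregates` (a single GROUP BY query over `usage_events`, reusing the same batching pattern as the list endpoint's `msg_counts`, no N+1) that computes each task's real cost + cache-hit tokens on the fly; `_task_dict` includes them, list rows show `¥`, and the top bar's `formatTaskUsage` shows "total tok · cache hit N% · ¥ real cost". **The discount only applies to new chat events**; historical events keep their original billing (no backfill, no data deleted). **Note**: the real lever for cutting token volume is reducing the number of rounds (the expensive tasks are all 100+ round step-by-step write/run_python loops), which is out of scope here.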
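Written out, the three-part formula from ② (as implemented in `_fallback_chat_cost_cny`) is as follows, with P = prompt tokens, h = cache-hit tokens clamped to [0, P], C = completion tokens, and prices p in CNY per million tokens; when p_hit ≤ 0 it falls back to p_in. The second line plugs in the numbers from `test_fallback_chat_cost_discounts_cache_hits`.

$$
\mathrm{cost} = \frac{(P - h)\,p_{\mathrm{in}} + h\,p_{\mathrm{hit}} + C\,p_{\mathrm{out}}}{10^{6}}, \qquad h = \max\bigl(0, \min(h_{\mathrm{raw}}, P)\bigr)
$$

$$
\frac{200{,}000 \times 1.0 + 800{,}000 \times 0.1 + 500{,}000 \times 10}{10^{6}} = 0.2 + 0.08 + 5.0 = 5.28
$$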
### 2026-06-04
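- **ppt skill layout helpers moved into an importable module + fix for the Chinese font not actually taking effect**: slides are generated "one run_python per slide", and previously the ~150 lines of helpers (palette constants / `add_textbox` / `apply_brand`, etc.) had to be rewritten from memory on every slide, which burned tokens and let coordinates drift in long decks (slide 7's `apply_brand` did not match slide 2's). Extracted `skills/ppt/scripts/pptx_helpers.py`; each slide now does `import pptx_helpers as P` and calls into it. Added entry points `new_presentation` / `load` (reads the file's actual dimensions back into the canvas constants, keeping them in sync across per-slide processes) / `add_slide` / `set_palette` (defaults to business red; `spec_path=` automatically takes the first 3 hex colors in the spec as primary / secondary / accent). **Font fix**: python-pptx's `font.name` only writes `<a:latin>`, while CJK glyphs use the `<a:ea>` slot, which was never set; that is the root cause of "Microsoft YaHei specified but not actually applied". `set_text` now writes latin=Arial plus ea/cs=微软雅黑 (Microsoft YaHei), so mixed Chinese/English text gets the right font for each script. Updated `layouts.md` (the helper block is replaced by an import preamble + API cheat sheet; all 9 examples now call `P.`), the A5 example in `icons.md`, and the resources / phase-two sections of `SKILL.md`. Smoke-tested: `ea` is written, `set_palette` overrides take effect, and quality_check parses normally.
@@ -189,7 +185,7 @@
- **05-06 → 05-08**: part of Phase 6 (task + state.json + cumulative tokens); TUI rich Markdown + spinner with live elapsed time; `/resume [last|<id>]` + lazy creation + `_cleanup_if_empty`.
- **05-09 → 05-10**: first draft of DESIGN §7 (rewritten 05-12); `cli.py export` + `core/export_docx.py`.
- **05-11**: `atomic_write_text` + `core/memory.py` (core.md goes into the prompt, extended/* is indexed); the loop emits an event stream via `sink.emit`, paving the way for SSE.
- **06-04 token optimization kicked off**: `DESIGN.md §8.2` documents the context-governance plan; chat usage records cache hit/miss / reasoning tokens, and when LiteLLM reports cost=0 the cost falls back to the model profile's CNY/Mtok prices; `run_python` gains a `script_path` mode (long code is first written to a .py file and then run by path, so less run_python code enters the history); long `run_python` / `shell` output is only trimmed in the context, not written to `.tool_logs`; `document_search` keeps its default recall of 6×1200 chars; `core/context.py` first compacts old tool messages, old `load_skill` results, and old assistant tool_call arguments (the source code passed to `write(content=...)`), without changing the persisted history; the `llm_start` SSE event outputs `context_*` compaction stats, and the dev SPA's bottom hint shows context compaction and cache hit/miss. Measured on the `rust介绍` ("rust intro") task: the task list's `70条 / 711k tok` is the historical total, the latest single round was 22k input with a high cache-hit rate, and with arguments compaction the estimated sent_chars went `49,166 → 34,415`.
- **06-04 token optimization kicked off**: `DESIGN.md §8.2` documents the context-governance plan; chat usage records cache hit/miss / reasoning tokens, and when LiteLLM reports cost=0 the cost falls back to the model profile's CNY/Mtok prices; `run_python` gains a `script_path` mode (long code is first written to a .py file and then run by path, so fewer tool_call arguments enter the history); long `run_python` / `shell` output is only trimmed in the context, not written to `.tool_logs`; `document_search` keeps its default recall of 6×1200 chars; `core/context.py` first compacts old tool messages and old `load_skill` results, without changing the persisted history; the `llm_start` SSE event outputs `context_*` compaction stats, and the dev SPA's bottom hint shows context compaction and cache hit/miss.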
---

@@ -24,7 +24,6 @@ variants:
    extended_thinking: false
    input_cny_per_mtoken: 1.0
    output_cny_per_mtoken: 12.0
    cache_hit_cny_per_mtoken: 0.1  # DeepSeek prefix-cache hit price (~0.1x input)
  pro:
    display_name: DeepSeek V4 Pro
@@ -47,4 +46,3 @@ variants:
    extended_thinking: false
    input_cny_per_mtoken: 2.0
    output_cny_per_mtoken: 20.0
    cache_hit_cny_per_mtoken: 0.2  # DeepSeek prefix-cache hit price (~0.1x input)

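@@ -44,9 +44,6 @@ class ModelCapabilities:
    # Fallback pricing (CNY / million tokens), used when the provider / LiteLLM cost map has no entry.
    input_cny_per_mtoken: float = 0.0
    output_cny_per_mtoken: float = 0.0
    # Prefix-cache hit price (DeepSeek and others cache prompt prefixes automatically and bill
    # the hit portion at this price, usually ~0.1x input).
    # 0 = no distinction: cache hits are billed at the full input price (safe fallback, never under-bills).
    cache_hit_cny_per_mtoken: float = 0.0
    # API access
    api_base: str = ""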

@@ -45,63 +45,11 @@ def _message_chars(msg: dict[str, Any]) -> int:
    return len(str(msg))
def _compact_tool_call_arguments(raw: Any, max_chars: int) -> tuple[Any, bool]:
    if not isinstance(raw, str) or len(raw) <= max_chars:
        return raw, False
    marker: dict[str, Any] = {
        "_compacted": True,
        "original_chars": len(raw),
        "note": "old assistant tool_call arguments omitted from context",
    }
    try:
        parsed = json.loads(raw)
    except Exception:
        parsed = None
    if isinstance(parsed, dict):
        for key in ("path", "script_path", "file_path", "name"):
            value = parsed.get(key)
            if isinstance(value, str) and value:
                marker[key] = value
        content = parsed.get("content")
        if isinstance(content, str):
            marker["content_chars"] = len(content)
    return json.dumps(marker, ensure_ascii=False), True
def _compact_assistant_tool_calls(
    msg: dict[str, Any],
    *,
    max_arg_chars: int,
) -> tuple[int, int]:
    tool_calls = msg.get("tool_calls")
    if not isinstance(tool_calls, list):
        return 0, 0
    compacted = 0
    saved = 0
    for tc in tool_calls:
        if not isinstance(tc, dict):
            continue
        fn = tc.get("function")
        if not isinstance(fn, dict):
            continue
        before = fn.get("arguments")
        after, did_compact = _compact_tool_call_arguments(
            before,
            max_chars=max(0, max_arg_chars),
        )
        if did_compact:
            fn["arguments"] = after
            compacted += 1
            saved += len(before) - len(after)
    return compacted, max(0, saved)
def prepare_messages_for_llm(
    messages: List[dict[str, Any]],
    *,
    keep_recent: int = 12,
    keep_recent: int = 20,
    old_tool_chars: int = 2_000,
    old_tool_arg_chars: int = 800,
) -> List[dict[str, Any]]:
    """Return a copy of the messages to send to the LLM.
@@ -113,7 +61,6 @@ def prepare_messages_for_llm(
        messages,
        keep_recent=keep_recent,
        old_tool_chars=old_tool_chars,
        old_tool_arg_chars=old_tool_arg_chars,
    )
    return prepared
@@ -121,9 +68,8 @@ def prepare_messages_for_llm(
def prepare_messages_with_stats(
    messages: List[dict[str, Any]],
    *,
    keep_recent: int = 12,
    keep_recent: int = 20,
    old_tool_chars: int = 2_000,
    old_tool_arg_chars: int = 800,
) -> tuple[List[dict[str, Any]], dict[str, int]]:
    """Return a copy of the messages to send to the LLM, plus compaction stats."""
    if keep_recent < 0:
@@ -133,16 +79,9 @@ def prepare_messages_with_stats(
    prepared: List[dict[str, Any]] = []
    compacted_tool_messages = 0
    compacted_skill_messages = 0
    compacted_tool_call_arguments = 0
    for idx, msg in enumerate(messages):
        new_msg = deepcopy(msg)
        is_recent = idx >= recent_start
        if not is_recent and new_msg.get("role") == "assistant":
            n_args, _ = _compact_assistant_tool_calls(
                new_msg,
                max_arg_chars=old_tool_arg_chars,
            )
            compacted_tool_call_arguments += n_args
        if (
            not is_recent
            and new_msg.get("role") == "tool"
@@ -166,6 +105,5 @@ def prepare_messages_with_stats(
        "saved_chars": max(0, original_chars - sent_chars),
        "compacted_tool_messages": compacted_tool_messages,
        "compacted_skill_messages": compacted_skill_messages,
        "compacted_tool_call_arguments": compacted_tool_call_arguments,
    }
    return prepared, stats
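The protocol guarantee above (compaction never touches `role`, `tool_call_id`, `name`, or tool_call ids) can be pinned down with an invariant check. A minimal sketch, assuming OpenAI-style message dicts as in the tests; `check_tool_call_protocol` and `check_compaction_preserves_protocol` are hypothetical helpers, not functions in `core/context.py`.

```python
from typing import Any, Dict, List, Optional, Tuple


def _call_keys(msg: Dict[str, Any]) -> List[Tuple[Optional[str], Optional[str]]]:
    """(id, function name) of each tool_call in an assistant message."""
    keys = []
    for tc in msg.get("tool_calls") or []:
        fn = tc.get("function") or {}
        keys.append((tc.get("id"), fn.get("name")))
    return keys


def check_tool_call_protocol(messages: List[Dict[str, Any]]) -> List[str]:
    """Return problems found; an empty list means every tool reply answers an earlier call."""
    problems: List[str] = []
    issued: set = set()
    for idx, msg in enumerate(messages):
        role = msg.get("role")
        if role == "assistant":
            issued.update(call_id for call_id, _ in _call_keys(msg))
        elif role == "tool" and msg.get("tool_call_id") not in issued:
            problems.append(f"message {idx}: tool reply {msg.get('tool_call_id')!r} has no matching tool_call")
    return problems


def check_compaction_preserves_protocol(
    before: List[Dict[str, Any]], after: List[Dict[str, Any]]
) -> bool:
    """Step-one compaction may shrink content/arguments but must keep every protocol field."""
    if len(before) != len(after):  # step one never drops or merges messages
        return False
    for b, a in zip(before, after):
        if (b.get("role"), b.get("tool_call_id"), b.get("name")) != (
            a.get("role"), a.get("tool_call_id"), a.get("name")
        ):
            return False
        if _call_keys(b) != _call_keys(a):
            return False
    return not check_tool_call_protocol(after)
```

Running both checks over the output of `prepare_messages_with_stats` in a test is one way to enforce the invariant; the length check would need relaxing for the step-two summary, which does merge old messages.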

@@ -177,8 +177,6 @@ class AgentLoop:
            completion_tokens=ct,
            input_cny_per_mtoken=self.caps.input_cny_per_mtoken,
            output_cny_per_mtoken=self.caps.output_cny_per_mtoken,
            cache_hit_tokens=usage_details["cache_hit_tokens"],
            cache_hit_cny_per_mtoken=self.caps.cache_hit_cny_per_mtoken,
            extra_units={
                k: v for k, v in usage_details.items()
                if k not in ("tokens_in", "tokens_out") and v

@@ -47,31 +47,15 @@ def _fallback_chat_cost_cny(
    completion_tokens: int,
    input_cny_per_mtoken: float,
    output_cny_per_mtoken: float,
    cache_hit_tokens: int = 0,
    cache_hit_cny_per_mtoken: float = 0.0,
) -> Decimal:
    """Fallback chat cost (CNY) computed from the local model profile's prices.
    `prompt_tokens` = all input (including the cache-hit portion). DeepSeek and others bill
    prefix-cache hits at a lower unit price, so the input is split into hit / miss parts:
    hits are priced at `cache_hit_cny_per_mtoken`, the rest at `input_cny_per_mtoken`.
    When `cache_hit_cny_per_mtoken <= 0` there is no distinction and everything is billed at
    the input price (the old behavior, which never under-bills).
    """
    """Fallback chat cost (CNY) computed from the local model profile's prices."""
    input_price = Decimal(str(input_cny_per_mtoken or 0))
    output_price = Decimal(str(output_cny_per_mtoken or 0))
    hit_price = Decimal(str(cache_hit_cny_per_mtoken or 0))
    if hit_price <= 0:
        hit_price = input_price  # no cache price configured → hits billed at the full input price (safe fallback)
    pt = int(prompt_tokens)
    hit = max(0, min(int(cache_hit_tokens), pt))  # clamp: hits never exceed total input
    miss = pt - hit
    ct = int(completion_tokens)
    pt = Decimal(str(int(prompt_tokens)))
    ct = Decimal(str(int(completion_tokens)))
    cost = (
        Decimal(str(miss)) * input_price / Decimal("1000000")
        + Decimal(str(hit)) * hit_price / Decimal("1000000")
        + Decimal(str(ct)) * output_price / Decimal("1000000")
        pt * input_price / Decimal("1000000")
        + ct * output_price / Decimal("1000000")
    )
    return cost.quantize(Decimal("0.000001"))
@@ -86,8 +70,6 @@ def record_chat_usage(
    completion_tokens: int,
    input_cny_per_mtoken: float = 0.0,
    output_cny_per_mtoken: float = 0.0,
    cache_hit_tokens: int = 0,
    cache_hit_cny_per_mtoken: float = 0.0,
    extra_units: Optional[Mapping[str, Any]] = None,
    response: Any = None,
) -> Decimal:
@@ -106,8 +88,6 @@ def record_chat_usage(
        completion_tokens=completion_tokens,
        input_cny_per_mtoken=input_cny_per_mtoken,
        output_cny_per_mtoken=output_cny_per_mtoken,
        cache_hit_tokens=cache_hit_tokens,
        cache_hit_cny_per_mtoken=cache_hit_cny_per_mtoken,
    )
    units = {
        "tokens_in": int(prompt_tokens),

@@ -1,140 +0,0 @@
"""Backfill cost_cny for historical chat usage_events: apply the prefix-cache hit discount and fix old ¥0 entries.
Background: early on, `usage.py::_fallback_chat_cost_cny` (a) had no fallback when litellm returned 0
for unknown models, so many chat events recorded cost_cny as ¥0; (b) later gained a fallback but also
billed the cache-hit portion at the full input price, overstating tasks with high hit rates by 2-3x.
This script **recomputes cost_cny** from the tokens already stored in each event's units plus the
model profile prices; it only changes the cost column and **does not touch any tokens / units**.
Price source: `ModelCapabilities.load(model_profile)` (current config prices, including the cache_hit discount).
Profiles missing from config or without input/output prices (e.g. glm has none configured) are skipped
and keep their original value (no invented prices). Hits come from units.cache_hit_tokens (0 if absent,
i.e. full price, never under-billed).
Run: .venv/Scripts/python.exe scripts/backfill_chat_cost_cache_discount.py
Dry run by default (prints a summary only); add --apply to actually write.
Idempotent: the recomputation is deterministic, so a second run changes nothing. The frontend task
cost is computed on the fly as SUM(usage_events.cost_cny), so changes show up immediately without
touching tasks.
"""
from __future__ import annotations
import argparse
import os
import sys
from collections import defaultdict
from decimal import Decimal
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
env_file = ROOT / ".env"
if env_file.exists():
    for line in env_file.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        k, _, v = line.partition("=")
        os.environ.setdefault(k.strip(), v.strip())
from sqlalchemy import select
from core.agent_builder import ROOT as AB_ROOT, load_config
from core.capabilities import ModelCapabilities
from core.storage import session_scope
from core.storage.models import UsageEvent
from core.storage.usage import _fallback_chat_cost_cny
def main() -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument("--apply", action="store_true", help="真写;默认 dry-run 只打印")
    args = ap.parse_args()
    cfg = load_config()
    models_dir = AB_ROOT / cfg["models_dir"]
    # Cache ModelCapabilities.load per profile (avoids re-reading yaml for every row); None = cannot be priced
    caps_cache: dict[str, ModelCapabilities | None] = {}
    def get_caps(profile: str) -> ModelCapabilities | None:
        if profile not in caps_cache:
            try:
                caps_cache[profile] = ModelCapabilities.load(profile, models_dir)
            except Exception:
                caps_cache[profile] = None
        return caps_cache[profile]
    # Per-profile stats: events / changed / skipped / old total / new total
    stat_n: dict[str, int] = defaultdict(int)
    stat_changed: dict[str, int] = defaultdict(int)
    stat_skipped: dict[str, int] = defaultdict(int)
    old_sum: dict[str, Decimal] = defaultdict(lambda: Decimal("0"))
    new_sum: dict[str, Decimal] = defaultdict(lambda: Decimal("0"))
    with session_scope() as s:
        rows = s.execute(
            select(UsageEvent).where(UsageEvent.kind == "chat")
        ).scalars().all()
        for e in rows:
            profile = e.model_profile or "?"
            stat_n[profile] += 1
            u = e.units or {}
            caps = get_caps(profile)
            if caps and (caps.input_cny_per_mtoken or caps.output_cny_per_mtoken):
                inp = caps.input_cny_per_mtoken
                outp = caps.output_cny_per_mtoken
                chp = caps.cache_hit_cny_per_mtoken
            else:
                # No price in config → fall back to the price snapshot in units (old events mostly lack it too); still none → skip
                inp = float(u.get("input_cny_per_mtoken") or 0)
                outp = float(u.get("output_cny_per_mtoken") or 0)
                chp = float(u.get("cache_hit_cny_per_mtoken") or 0)
            if not (inp or outp):
                stat_skipped[profile] += 1
                old_sum[profile] += Decimal(str(e.cost_cny))
                new_sum[profile] += Decimal(str(e.cost_cny))  # no price → unchanged
                continue
            new_cost = _fallback_chat_cost_cny(
                prompt_tokens=int(u.get("tokens_in") or 0),
                completion_tokens=int(u.get("tokens_out") or 0),
                input_cny_per_mtoken=inp,
                output_cny_per_mtoken=outp,
                cache_hit_tokens=int(u.get("cache_hit_tokens") or 0),
                cache_hit_cny_per_mtoken=chp,
            )
            old_cost = Decimal(str(e.cost_cny))
            old_sum[profile] += old_cost
            new_sum[profile] += new_cost
            if new_cost != old_cost:
                e.cost_cny = new_cost
                stat_changed[profile] += 1
        if args.apply:
            s.commit()
        else:
            s.rollback()
    print()
    print(f"{'model_profile':<22}{'events':>8}{'changed':>9}{'skipped':>9}"
          f"{'old_¥':>12}{'new_¥':>12}")
    tot_old = Decimal("0")
    tot_new = Decimal("0")
    for profile in sorted(stat_n):
        o, n = old_sum[profile], new_sum[profile]
        tot_old += o
        tot_new += n
        print(f"{profile:<22}{stat_n[profile]:>8}{stat_changed[profile]:>9}"
              f"{stat_skipped[profile]:>9}{float(o):>12.4f}{float(n):>12.4f}")
    print(f"{'TOTAL':<22}{sum(stat_n.values()):>8}"
          f"{sum(stat_changed.values()):>9}{sum(stat_skipped.values()):>9}"
          f"{float(tot_old):>12.4f}{float(tot_new):>12.4f}")
    print()
    print(f"[mode] {'APPLIED (committed)' if args.apply else 'DRY-RUN (no commit, rerun with --apply)'}")
    return 0
if __name__ == "__main__":
    sys.exit(main())
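The deleted backfill script is safe to rerun because each new `cost_cny` is a pure function of the stored units and the profile prices. A stripped-down sketch of that dry-run / apply loop, assuming plain dicts in place of `UsageEvent` rows and a `prices` map of `(input, output, cache_hit)` CNY/Mtok per profile; `chat_cost` and `backfill` are hypothetical local names, and `chat_cost` re-derives the three-part formula instead of importing the project's `_fallback_chat_cost_cny`.

```python
from decimal import Decimal
from typing import Dict, List, Tuple

MTOK = Decimal("1000000")


def chat_cost(pt: int, ct: int, hit: int, p_in: float, p_out: float, p_hit: float) -> Decimal:
    """Three-part fallback cost; p_hit <= 0 means hits are billed at the input price."""
    in_price = Decimal(str(p_in))
    out_price = Decimal(str(p_out))
    hit_price = Decimal(str(p_hit)) if p_hit > 0 else in_price
    hit = max(0, min(hit, pt))  # clamp: hits never exceed total input
    cost = (Decimal(pt - hit) * in_price + Decimal(hit) * hit_price + Decimal(ct) * out_price) / MTOK
    return cost.quantize(Decimal("0.000001"))


def backfill(events: List[dict], prices: Dict[str, Tuple[float, float, float]], apply: bool) -> int:
    """Recompute cost_cny per event; return how many differ (and update them if apply)."""
    changed = 0
    for e in events:
        price = prices.get(e["model_profile"])
        if price is None or not (price[0] or price[1]):
            continue  # no known price: keep the original value, never invent one
        u = e["units"]
        new_cost = chat_cost(
            int(u.get("tokens_in") or 0),
            int(u.get("tokens_out") or 0),
            int(u.get("cache_hit_tokens") or 0),
            *price,
        )
        if new_cost != e["cost_cny"]:
            changed += 1
            if apply:
                e["cost_cny"] = new_cost
    return changed
```

The real script gets the dry run by calling `rollback()` on the session instead of skipping the assignment; the printed counts come out the same either way, and a second `--apply` run reports zero changes.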

@@ -1,5 +1,4 @@
import unittest
import json
from core.context import prepare_messages_for_llm, prepare_messages_with_stats
@@ -96,60 +95,6 @@ class ContextCompactionTests(unittest.TestCase):
        self.assertGreater(stats["saved_chars"], 0)
        self.assertEqual(len(prepared), len(messages))
    def test_defaults_compact_medium_sized_old_write_arguments(self) -> None:
        args = json.dumps({"path": "slides/p01.py", "content": "A" * 1000})
        messages = [
            {"role": "system", "content": "rules"},
            {
                "role": "assistant",
                "tool_calls": [{
                    "id": "tc1",
                    "type": "function",
                    "function": {"name": "write", "arguments": args},
                }],
            },
        ] + [{"role": "user", "content": f"recent {i}"} for i in range(12)]
        prepared, stats = prepare_messages_with_stats(messages)
        compacted_args = json.loads(prepared[1]["tool_calls"][0]["function"]["arguments"])
        self.assertTrue(compacted_args["_compacted"])
        self.assertEqual(compacted_args["path"], "slides/p01.py")
        self.assertEqual(stats["compacted_tool_call_arguments"], 1)
    def test_compacts_old_assistant_tool_call_arguments(self) -> None:
        args = json.dumps({"path": "slides/p01.py", "content": "A" * 5000})
        messages = [
            {"role": "system", "content": "rules"},
            {
                "role": "assistant",
                "content": "writing slide",
                "tool_calls": [{
                    "id": "tc1",
                    "type": "function",
                    "function": {"name": "write", "arguments": args},
                }],
            },
            {"role": "tool", "tool_call_id": "tc1", "name": "write", "content": "[wrote file]"},
            {"role": "user", "content": "next"},
        ]
        prepared, stats = prepare_messages_with_stats(
            messages,
            keep_recent=1,
            old_tool_arg_chars=200,
        )
        tc = prepared[1]["tool_calls"][0]
        compacted_args = json.loads(tc["function"]["arguments"])
        self.assertEqual(tc["id"], "tc1")
        self.assertEqual(tc["type"], "function")
        self.assertEqual(tc["function"]["name"], "write")
        self.assertTrue(compacted_args["_compacted"])
        self.assertEqual(compacted_args["path"], "slides/p01.py")
        self.assertNotIn("A" * 100, tc["function"]["arguments"])
        self.assertEqual(stats["compacted_tool_call_arguments"], 1)
if __name__ == "__main__":
    unittest.main()

@@ -33,31 +33,6 @@ class UsageAccountingTests(unittest.TestCase):
        self.assertEqual(cost, Decimal("6.000000"))
    def test_fallback_chat_cost_discounts_cache_hits(self) -> None:
        # Of 1M input tokens, 800k hit the cache (price 0.1) and 200k miss (price 1.0); 500k output (price 10)
        cost = _fallback_chat_cost_cny(
            prompt_tokens=1_000_000,
            completion_tokens=500_000,
            input_cny_per_mtoken=1.0,
            output_cny_per_mtoken=10.0,
            cache_hit_tokens=800_000,
            cache_hit_cny_per_mtoken=0.1,
        )
        # 0.2(miss) + 0.08(hit) + 5.0(out) = 5.28
        self.assertEqual(cost, Decimal("5.280000"))
    def test_fallback_chat_cost_no_cache_price_charges_full(self) -> None:
        # No cache price configured (0) → hits are not discounted and are billed at the full input price (old behavior, never under-bills)
        cost = _fallback_chat_cost_cny(
            prompt_tokens=1_000_000,
            completion_tokens=0,
            input_cny_per_mtoken=1.0,
            output_cny_per_mtoken=10.0,
            cache_hit_tokens=900_000,
            cache_hit_cny_per_mtoken=0.0,
        )
        self.assertEqual(cost, Decimal("1.000000"))
if __name__ == "__main__":
    unittest.main()

@@ -26,7 +26,7 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from sqlalchemy import BigInteger, cast, func, select, update
from sqlalchemy import func, select, update
from starlette.background import BackgroundTask
from core.paths import to_db_path
@@ -35,7 +35,7 @@ from core.storage import (
    check_no_subtask,
    session_scope,
)
from core.storage.models import Message, Task, UsageEvent
from core.storage.models import Message, Task
from core.storage.utils import ensure_local_task_row
from .auth import (
@@ -93,47 +93,8 @@ def _parse_ordering(s: Optional[str]) -> list:
    return cols
def _usage_aggregates(s: Any, tids: list) -> dict:
    """Batch-aggregate usage_events by task_id: real cost + cache-hit tokens.
    A single GROUP BY query (the same batching pattern as the list endpoint's msg_counts, no N+1),
    computed on the fly and not stored in tasks, so it is immediately accurate for every historical task without a backfill.
    - cost_cny: sum over all kinds (chat + image + video) = the task's real cost
    - cache_hit: chat only, sum of units.cache_hit_tokens (the prefix-cache hit portion on DeepSeek etc.)
    Returns {task_id: {"cost_cny": float, "tokens_cache_hit": int}}
    """
    if not tids:
        return {}
    cache_hit_col = cast(UsageEvent.units["cache_hit_tokens"].astext, BigInteger)
    rows = s.execute(
        select(
            UsageEvent.task_id,
            func.coalesce(func.sum(UsageEvent.cost_cny), 0),
            func.coalesce(
                func.sum(cache_hit_col).filter(UsageEvent.kind == "chat"), 0
            ),
        )
        .where(UsageEvent.task_id.in_(tids))
        .group_by(UsageEvent.task_id)
    ).all()
    return {
        tid: {"cost_cny": float(cost or 0), "tokens_cache_hit": int(hit or 0)}
        for tid, cost, hit in rows
    }
def _task_dict(
    row: Any,
    *,
    n_messages: Optional[int] = None,
    usage: Optional[dict] = None,
) -> dict:
    """Task ORM row → API JSON dict.
    `usage` (optional) = this task's summary from `_usage_aggregates`, carrying the real cost and cache hits;
    if omitted, falls back to tasks.cost_cny (usually 0) and 0 hits. The frontend uses these to show ¥ / the cache-hit rate.
    """
    u = usage or {}
def _task_dict(row: Any, *, n_messages: Optional[int] = None) -> dict:
    """Task ORM row → API JSON dict."""
    d = {
        "task_id": str(row.task_id),
        "name": row.name or "",
@@ -146,10 +107,6 @@ def _task_dict(
        "tokens_prompt": row.tokens_prompt or 0,
        "tokens_completion": row.tokens_completion or 0,
        "tokens": (row.tokens_prompt or 0) + (row.tokens_completion or 0),
        # Cache-hit tokens (chat prefix cache) + real cost (already discounted for cache hits, see usage.py).
        # Aggregated on the fly; without usage, falls back to the column / 0.
        "tokens_cache_hit": int(u.get("tokens_cache_hit", 0)),
        "cost_cny": float(u["cost_cny"]) if "cost_cny" in u else float(row.cost_cny or 0),
        # Current run status (0004 schema simplification: the former runs table was merged into task)
        "run_status": row.run_status or "idle",
        "run_error": row.run_error or None,
@@ -957,18 +914,13 @@ def create_app() -> FastAPI:
                ).all())
                if tids else {}
            )
            usage = _usage_aggregates(s, tids)
            return {
                "page": page,
                "page_size": page_size,
                "count": int(cnt),
                "results": [
                    _task_dict(
                        r,
                        n_messages=msg_counts.get(r.task_id, 0),
                        usage=usage.get(r.task_id),
                    )
                    _task_dict(r, n_messages=msg_counts.get(r.task_id, 0))
                    for r in rows
                ],
            }
@@ -989,8 +941,7 @@ def create_app() -> FastAPI:
            n = s.execute(
                select(func.count()).select_from(Message).where(Message.task_id == tid)
            ).scalar_one()
            usage = _usage_aggregates(s, [tid])
            return _task_dict(row, n_messages=n, usage=usage.get(tid))
            return _task_dict(row, n_messages=n)
    @app.get("/v1/folders", tags=["folders"])
    def list_folders(user_id: UUID = Depends(require_user)):
@@ -1139,8 +1090,7 @@ def create_app() -> FastAPI:
            n = s.execute(
                select(func.count()).select_from(Message).where(Message.task_id == tid)
            ).scalar_one()
            usage = _usage_aggregates(s, [tid])
            return _task_dict(row, n_messages=n, usage=usage.get(tid))
            return _task_dict(row, n_messages=n)
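`_usage_aggregates` avoids N+1 queries by running one `GROUP BY` over all task ids on the page. A self-contained sketch of the same idea with the standard-library `sqlite3` module; the flat `usage_events` table with an integer `cache_hit_tokens` column is an assumption made to avoid JSON functions (the real code casts `units["cache_hit_tokens"]` out of a JSON column via SQLAlchemy and uses an aggregate `FILTER`, which the `CASE` expression stands in for here), and `usage_aggregates` is a hypothetical name.

```python
import sqlite3
from typing import Dict, List


def usage_aggregates(conn: sqlite3.Connection, tids: List[str]) -> Dict[str, dict]:
    """One query for all task ids: total cost over all kinds + chat-only cache hits."""
    if not tids:
        return {}
    placeholders = ",".join("?" for _ in tids)  # only "?" markers are interpolated
    rows = conn.execute(
        f"""
        SELECT task_id,
               COALESCE(SUM(cost_cny), 0),
               COALESCE(SUM(CASE WHEN kind = 'chat' THEN cache_hit_tokens END), 0)
        FROM usage_events
        WHERE task_id IN ({placeholders})
        GROUP BY task_id
        """,
        tids,
    ).fetchall()
    return {
        tid: {"cost_cny": float(cost or 0), "tokens_cache_hit": int(hit or 0)}
        for tid, cost, hit in rows
    }
```

Tasks with no usage rows are simply absent from the result, which is why the callers use `usage.get(task_id)` and `_task_dict` falls back to `tasks.cost_cny`.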
# ───────────── Messages ─────────────

@@ -1218,33 +1218,6 @@ function fmtTokens(n) {
  return (n / 1000000).toFixed(1) + "M";
}
// Compact cost display (¥, real spend already discounted for cache hits): hidden when 0; 3 decimals below 0.01; otherwise 2
function fmtCost(n) {
  n = n || 0;
  if (n <= 0) return "";
  if (n < 0.01) return "¥" + n.toFixed(3);
  return "¥" + n.toFixed(2);
}
// Task-level cumulative usage (top bar): total tokens · cache-hit rate · real cost.
// Cache-hit rate = cache_hit / total input (tokens_prompt); a higher rate means better prefix reuse and lower cost.
function formatTaskUsage(t) {
  const tok = t.tokens || 0;
  if (!tok) return "";
  const hit = t.tokens_cache_hit || 0;
  const pin = t.tokens_prompt || 0;
  const bits = [`${fmtTokens(tok)} tok`];
  if (pin > 0 && hit > 0) {
    bits.push(`缓存命中 ${Math.round(hit / pin * 100)}%`);
  }
  const cost = fmtCost(t.cost_cny);
  if (cost) bits.push(cost);
  const title = `累计:输入 ${pin.toLocaleString()} / 输出 ${(t.tokens_completion || 0).toLocaleString()} tok`
    + (hit > 0 ? `\n前缀缓存命中 ${hit.toLocaleString()} tok命中部分按低价计费` : "")
    + (t.cost_cny > 0 ? `\n真实花费 ¥${(t.cost_cny).toFixed(4)}(已按缓存命中折价)` : "");
  return `<span class="muted" title="${escapeHtml(title)}" style="white-space:nowrap;">${bits.join(" · ")}</span>`;
}
function formatContextStats(d) {
  d = d || {};
  const orig = d.context_original_chars || 0;
@@ -1756,8 +1729,7 @@ function renderTaskList(tasks, append = false) {
        <span class="badge ${t.status}">${statusLabel}</span>
        ${t.skill ? `<span class="muted" title="${escapeHtml(t.skill)}">${escapeHtml(t.skill)}</span>` : ""}
        <span class="num right-group">${t.n_messages || 0} 条</span>
        <span class="num" title="${(t.tokens || 0).toLocaleString()} tokens${t.tokens_cache_hit ? `(其中缓存命中 ${(t.tokens_cache_hit).toLocaleString()}` : ""}">${fmtTokens(t.tokens)} tok</span>
        ${t.cost_cny > 0 ? `<span class="num" title="真实花费(已按缓存命中折价)">${fmtCost(t.cost_cny)}</span>` : ""}
        <span class="num" title="${(t.tokens || 0).toLocaleString()} tokens">${fmtTokens(t.tokens)} tok</span>
        <span class="muted time-ago" title="${escapeHtml(fmtTime(t.updated_at))}">${escapeHtml(fmtTimeAgo(t.updated_at))}</span>
      </div>
    </div>
@@ -1917,7 +1889,6 @@ function renderChatMeta() {
      ${wdBadge}
      ${t.skill ? `<span class="muted">${escapeHtml(t.skill)}</span>` : ""}
      <span class="tid">${t.task_id.slice(0, 8)}</span>
      ${formatTaskUsage(t)}
      ${t.description ? `<span class="muted desc">${escapeHtml(t.description)}</span>` : ""}
      <span class="spacer"></span>
      ${renderModelDropdown(t)}
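`fmtCost` and the hit-rate rule in `formatTaskUsage` are small pure functions. A Python transcription (hypothetical names `fmt_cost` and `cache_hit_percent`, not part of the project) makes their edge cases easy to pin down in a test: a zero or missing cost is hidden, sub-cent costs get three decimals, and the percentage is shown only when both prompt tokens and hits are positive.

```python
import math
from typing import Optional


def fmt_cost(n: Optional[float]) -> str:
    """Port of fmtCost: hide zero, 3 decimals below ¥0.01, otherwise 2."""
    n = n or 0
    if n <= 0:
        return ""
    if n < 0.01:
        return f"¥{n:.3f}"
    return f"¥{n:.2f}"


def cache_hit_percent(tokens_prompt: int, tokens_cache_hit: int) -> Optional[int]:
    """Port of the hit-rate part of formatTaskUsage; None means "do not show"."""
    if tokens_prompt > 0 and tokens_cache_hit > 0:
        # JS Math.round rounds halves up; floor(x + 0.5) mimics that for these
        # non-negative ratios, whereas Python's round() would round halves to even.
        return math.floor(tokens_cache_hit / tokens_prompt * 100 + 0.5)
    return None
```

Python's `.2f` / `.3f` formatting and JS `toFixed` can still disagree on values that are exact binary ties (e.g. 0.125), so treat this as a behavioral sketch rather than a byte-exact port.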