fix(usage): 顶栏 token 累计修 — sync_task_tokens 改走 messages SUM,删 LLM.TokenCounter

5/20 切流式后 LLM.token_counter.add() 只在同步 chat() 里调,流式 chat_stream() 路径从不更新它,
sync_task_tokens 从内存计数器读累计 → tasks.tokens_prompt/completion 一直 0 → 顶栏 0 tok。
DB 验证:5/20 之前 task 数据正常(4568/934),之后 0/0;但 messages.tokens_in/out 一直对
(record_chat_usage 写),source-of-truth 完好,只是冗余汇总列没同步。

改 sync_task_tokens(task_state) 走 SELECT SUM(tokens_in/out) FROM messages WHERE task_id=?,
删 TokenCounter 类 + ConsoleEventSink 的 token_counter 回调 + spinner ctx 尾巴。
一次性 backfill 4 个被影响 task 的累计列。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-05-21 13:39:57 +08:00
parent d2fd89f3a4
commit b480147fb2
5 changed files with 30 additions and 51 deletions

View File

@ -2,7 +2,7 @@
> 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 2-4 句:做了啥 + 关键判断 + 没动什么;细节查 `git log` / `git diff` > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 2-4 句:做了啥 + 关键判断 + 没动什么;细节查 `git log` / `git diff`
最后更新:2026-05-21(同 wd 多 task 并发软警告 banner + task header `📁 wd` 仅在 name≠wdName 时挂 + `/v1/tasks``run_status` 筛选) 最后更新:2026-05-21(顶栏 token 累计修 — 5/20 切流式后 `LLM.token_counter` 不再被更新,task 级 tokens_prompt/completion 一直 0;改 `sync_task_tokens` 走 messages SUM,删 TokenCounter 这个冗余内存计数器)
--- ---
@ -23,9 +23,11 @@
### 2026-05-21 ### 2026-05-21
- **顶栏 token 累计修(`sync_task_tokens` 改走 messages SUM,删 `LLM.TokenCounter`)**:用户报"token 计数一直 0"。复盘:5/20 把 loop 切流式后,`LLM.token_counter.add()` 只在同步 `chat()` 路径里调,新走的 `chat_stream()` 路径从来不更新它;`agent_builder.sync_task_tokens(task_state, llm)` 每轮 run 后从 `llm.token_counter.{prompt,completion}_tokens` 读累计 UPDATE 进 `tasks.tokens_prompt/completion` — 内存计数器永远 0 → tasks 行 0/0 → `_task_dict` 顶栏数字 0。DB 验证:5/20 08:55 之前最后一个 task 4568/934,之后所有 task 0/0;但每条 assistant message 的 `tokens_in/out` 都是对的(3223/1014 这种,`record_chat_usage` 在 loop 里写),所以 source-of-truth 在 messages 表完好,只是 task 级冗余汇总列没同步。**修法**:删 `LLM.TokenCounter` 整个类 + `.token_counter` 属性 + `chat()` 里那行 `.add()` 调用;`sync_task_tokens` 改签名为 `(task_state)`(不再要 llm),内部 `SELECT coalesce(sum(tokens_in),0), coalesce(sum(tokens_out),0) FROM messages WHERE task_id=?` 现算后 UPDATE。`ConsoleEventSink` 同步删 `token_counter` 回调参数 + spinner fmt 的 `ctx N tok` 尾巴(CLI 旁路,改动小)。`web/app.py:273` 调用点改 `sync_task_tokens(task_state)`。**对比方案**:① 一行补丁在 `core/loop.py` 拼回 response 后补 `self.llm.token_counter.add(response.usage)` — 最小,但留着 TokenCounter 这个"内存计数器 vs DB 真相"双写源头不解决根因;② 当前方案改 4 文件去掉冗余,符合"开发期以最优实现为准不留兼容层"。**性能**:`(task_id)` FK + `uq_messages_task_idx` 复合索引,单 task 行数顶天几百,SUM 两 int 亚毫秒,在刚跑完几秒 LLM 的 round-trip 噪声里。**Backfill**:`SELECT task_id, SUM(...) GROUP BY task_id` 一次性把现有 0/0 行修对,4 个 task 累计补正(52943/6593、26191/8687、10138/427、6399/1069)。**没动**:DB schema(`tasks.tokens_prompt/completion` 列保留作汇总展示,只是数据源改 messages 现算)、`record_chat_usage`(per-message 写入逻辑就是真相源)、loop / streaming 流程、DESIGN(§348 描述 `sync_task_tokens 维护` 仍准确,只是实现细节变,不属于架构/schema/API 语义变化)。
- **同 wd 并发软警告 banner + task header `📁 wd` 仅在 name≠wdName 时显示 + `/v1/tasks` 加 `run_status` 筛选**:用户问"task + working_dir 设计如何 / 同 wd 多 task 并发咋处理",评估了 γ(同 wd 单活 gate)/ short_id 全产物隔离 / clone task 三个方案均判定过度工程 —— dogfood 经验同 wd 基本不并发,定了 Claude Code 同款"信任 + 软警告 + 承认 limitation"。**后端**:`GET /v1/tasks` 加 `run_status` query 参数(逗号分隔,allowlist `idle/running/cancelling/error`,非法静默忽略),拼 `Task.run_status.in_(set)` 条件;复用现有 `(user_id, working_dir)` 索引,同 wd 活跃 task 常态 0/1 行近零开销。**前端**:① `refreshConcurrentWarnings()``selectTask` + SSE 收尾两点 fire-and-forget 拉 `working_dir=<末段名>&run_status=running,cancelling&page_size=10`,过滤 task_id != current 后存 `state.concurrentWarnings`;② `renderConcurrentWarning()``#chat-meta` 后 / `#chat-stream` 前插 `#wd-concurrent-warn` 黄底 banner(⚠ + 项目名 + 邻居 task name + run_status + "等 N 个"),非阻塞不挡发送;③ `renderChatMeta``📁 wdName` 改为"仅 wdName !== taskName 时显示"(留空 fallback 多数场景 name == wd,显示是噪音;不同时显示提示项目归属)。**对比方案**:γ 硬挡破坏对话切换流畅性,short_id 全产物破坏 §7.1 扁平共享语义 + SKILL.md 改造成本,clone task 工程量对零频场景过重;软警告 ~80 行(后端 10 + 前端 70)实现意图,真高频再升级。**没动**:不轮询(接受"邻居 task 在我浏览时收尾,banner 短暂 stale 几秒"边界 — 同 wd 并发本就近 0)、警告无"不再提示"开关、不点击跳目标 task、宪法文件 short_id 命名约定保留(§7.9 2026-05-20 不动)、不加 clone / gate / 物理隔离。**文档**:DESIGN §7.8 风险表"文件级悲观锁"(本就未实现)行替换为"软警告 + known limitation";§7.9 新增 2026-05-21 取舍条说明 γ/short_id/clone 三方案为何都不选 + 引 Claude Code 同款设计。 - **同 wd 并发软警告 banner + task header `📁 wd` 仅在 name≠wdName 时显示 + `/v1/tasks` 加 `run_status` 筛选**:用户问"task + working_dir 设计如何 / 同 wd 多 task 并发咋处理",评估了 γ(同 wd 单活 gate)/ short_id 全产物隔离 / clone task 三个方案均判定过度工程 —— dogfood 经验同 wd 基本不并发,定了 Claude Code 同款"信任 + 软警告 + 承认 limitation"。**后端**:`GET /v1/tasks` 加 `run_status` query 参数(逗号分隔,allowlist `idle/running/cancelling/error`,非法静默忽略),拼 `Task.run_status.in_(set)` 条件;复用现有 `(user_id, working_dir)` 索引,同 wd 活跃 task 常态 0/1 行近零开销。**前端**:① `refreshConcurrentWarnings()``selectTask` + SSE 收尾两点 fire-and-forget 拉 `working_dir=<末段名>&run_status=running,cancelling&page_size=10`,过滤 task_id != current 后存 `state.concurrentWarnings`;② `renderConcurrentWarning()``#chat-meta` 后 / `#chat-stream` 前插 `#wd-concurrent-warn` 黄底 banner(⚠ + 项目名 + 邻居 task name + run_status + "等 N 个"),非阻塞不挡发送;③ `renderChatMeta``📁 wdName` 改为"仅 wdName !== taskName 时显示"(留空 fallback 多数场景 name == wd,显示是噪音;不同时显示提示项目归属)。**对比方案**:γ 硬挡破坏对话切换流畅性,short_id 全产物破坏 §7.1 扁平共享语义 + SKILL.md 改造成本,clone task 工程量对零频场景过重;软警告 ~80 行(后端 10 + 前端 70)实现意图,真高频再升级。**没动**:不轮询(接受"邻居 task 在我浏览时收尾,banner 短暂 stale 几秒"边界 — 同 wd 并发本就近 0)、警告无"不再提示"开关、不点击跳目标 task、宪法文件 short_id 命名约定保留(§7.9 2026-05-20 不动)、不加 clone / gate / 物理隔离。**文档**:DESIGN §7.8 风险表"文件级悲观锁"(本就未实现)行替换为"软警告 + known limitation";§7.9 新增 2026-05-21 取舍条说明 γ/short_id/clone 三方案为何都不选 + 引 Claude Code 同款设计。
- **paper_server → zcbot research skill(查文献 / get abstract / 拉 PDF)**:用户要 zcbot 能查内部部署的 paper_server(`http://paper.xxhhcty.xyz:8080/`,OpenAlex 元数据 + Sci-Hub PDF 抓取)。**范式判断**:不做 tool(频次低 + zcbot 没 ToolSearch 基建,3 个函数 schema 永驻 chat context 不划算)、不做 MCP(部署/分发成本)、不裸 `run_python` 调 httpx(每次重复写 base_url / 字段名,且易漂移)、不做 helper-lib(LLM 不知道该 import 啥) → **做成 skill**(同 proposal/ppt 范式,SKILL.md + paper.py helper 同目录,LLM `load_skill("research")` 后用 `run_python` 调 helper)。**新增**:① `skills/research/SKILL.md`:何时用 / 何时不用 / 三函数签名 + 示例 / 工作流(search → 筛选 → get_paper 看 abstract → 必要时 fetch_pdf → read PDF)/ 错误处理 / 反模式。② `skills/research/paper.py`(~110 行):`search(keyword, year, doi, has_pdf, limit)` → paper_server `/api/resm/paper/` list 端点,精简 9 字段返(避 abstract 在 list 时 dump 给 LLM 太大);`get_paper(id_or_doi)` → retrieve 端点,**依赖 paper_server 侧 PaperDetailSerializer 加 abstract 字段**(由用户改 serializer + redeploy);`fetch_pdf(id_or_doi, working_dir)` → `/resm/paper/<id>/pdf/` 流式下载到 `<working_dir>/papers/<safe_doi>.pdf`,已存在跳过,`has_fulltext_pdf=False` 抛 RuntimeError;`_resolve_to_id` DOI → id(`10.` 前缀启发式);base_url 默认 `http://paper.xxhhcty.xyz:8080``PAPER_SERVER_URL` env 覆盖。③ **`tools/run_python.py` 注入 PYTHONPATH=base_dir**(关键 enabler):子进程 cwd 是 zcbot 仓库根,但默认 PYTHONPATH 不含项目根 → 不能 `from skills.research.paper import ...`;`env["PYTHONIOENCODING"]` 那行后加 `env["PYTHONPATH"] = str(self.base_dir) + os.pathsep + env.get("PYTHONPATH", "")`,LLM 能直接 import 不必折腾 sys.path。**没动**:tool 系统 / `agent_builder.py` / config / `ModelCapabilities` / ToolSearch 基建(独立决策,触发条件:tool 数 >20 或 schema 总 token >3k)/ paper_server filterset / search_fields / urls / models / paper_pdf_view / DESIGN(skill 是已有抽象)/ RUN(`PAPER_SERVER_URL` 是可选 env,有默认值)。**Tradeoffs**:① skill 内 helper 范式让 paper_server API 漂移时改一处(`paper.py`)而不是 prompt + tool schema;② DOI 启发式 `_is_doi` 容易误判像 `arxiv/2401.xxxxx` 这种非标准串(prefix 不是 `10.`),paper_server 内部用真 DOI(`10.xxx/...`)所以本库内场景稳;③ `search(limit>50)` 自动夹紧到 50 防 LLM 误用一次性拉全表。**遗留**:paper_server 侧 `PaperDetailSerializer` 加 abstract 由用户负责(handoff §A 描述);redeploy 后跑 `scripts/smoke_paper_skill.py`(三步:search list shape / get_paper abstract / fetch_pdf 落盘 + 复用)。 - **paper_server → zcbot research skill(查文献 / get abstract / 拉 PDF)**:用户要 zcbot 能查内部部署的 paper_server(`http://paper.xxhhcty.xyz:8080/`,OpenAlex 元数据 + Sci-Hub PDF 抓取)。**范式判断**:不做 tool(频次低 + zcbot 没 ToolSearch 基建,3 个函数 schema 永驻 chat context 不划算)、不做 MCP(部署/分发成本)、不裸 `run_python` 调 httpx(每次重复写 base_url / 字段名,且易漂移)、不做 helper-lib(LLM 不知道该 import 啥) → **做成 skill**(同 proposal/ppt 范式,SKILL.md + paper.py helper 同目录,LLM `load_skill("research")` 后用 `run_python` 调 helper)。**zcbot 新增**:① `skills/research/SKILL.md`:何时用 / 何时不用 / 三函数签名 + 示例 / 工作流(search 看 abstract → 必要时 fetch_pdf → read PDF)/ 错误处理 / 反模式 / **"keyword 优先英文"专段**(库主语料英文,用户中文输入要转专业英文术语,带中英对照例子表)。② `skills/research/paper.py`(~140 行):`search(keyword, year, year_gte, year_lte, doi, first_author, publication_name, has_pdf, is_oa, limit)` → paper_server `/api/resm/paper/` list 端点,精简 12 字段返(**含 abstract**);`has_pdf` 走精确的 `has_fulltext_pdf` filter(不是 `has_fulltext` 那个含 xml 的);`get_paper(id_or_doi)` → retrieve 端点(list 已带 abstract,正常工作流不需要,仅用户给单 id/DOI 想拿全字段时用);`fetch_pdf(id_or_doi, working_dir)` → `/resm/paper/<id>/pdf/` 流式下载到 `<working_dir>/papers/<safe_doi>.pdf`,已存在跳过,`has_fulltext_pdf=False` 抛 RuntimeError;`_resolve_to_id` DOI → id(`10.` 前缀启发式);base_url 默认 `http://paper.xxhhcty.xyz:8080``PAPER_SERVER_URL` env 覆盖。③ `tools/run_python.py` 注入 PYTHONPATH=base_dir(关键 enabler):子进程 cwd 是 zcbot 仓库根但默认 PYTHONPATH 不含项目根 → 不能 `from skills.research.paper import ...`;`env["PYTHONIOENCODING"]` 那行后加 `env["PYTHONPATH"] = str(self.base_dir) + os.pathsep + env.get("PYTHONPATH", "")`,LLM 能直接 import 不必折腾 sys.path。**paper_server 改动**:① `apps/resm/serializers.py`:`PaperListSerializer` 加 `abstract = SerializerMethodField()` 从 O2O `paper.abstract.abstract` 取(无行 / has_abstract=False → 空串);**list 和 retrieve 共用一个 serializer**,不分 list/detail(原 handoff §A 想分两个,落地时砍掉降复杂度)。② `apps/resm/views.py`:`PaperViewSet` 加 `CustomRetrieveModelMixin`(原只挂 list,retrieve 端点根本不存在 → `/api/resm/paper/<id>/` 404 是个 bug);queryset 改 `Paper.objects.select_related("abstract").all()` 解决 N+1(list 20 条原本 21 次 query,现 1 次 LEFT JOIN);`filterset_fields` → `filterset_class = PaperFilterSet`。③ 新增 `apps/resm/filters.py`(~20 行):`PaperFilterSet` 声明 `publication_year_gte/lte` NumberFilter(LLM 做"近 5 年文献"用)+ exact 字段 `publication_year/type/fetch_status/has_abstract/has_fulltext/has_fulltext_pdf/is_oa/publication_name/first_author/openalex_id/doi`。④ `search_fields` **不动**(仍是 title/first_author/first_author_institution)— 评估过加 `abstract__abstract` 提升中→英 keyword 召回,被用户判定相关性下降 + 性能担忧 > 召回收益,保持现状,**靠 SKILL.md 引导 LLM 转英文 keyword 而不是扩 search_fields**。**Tradeoffs**:① skill 内 helper 范式让 paper_server API 漂移时改一处(`paper.py`)而不是 prompt + tool schema;② DOI 启发式 `_is_doi` 容易误判像 `arxiv/2401.xxxxx` 这种非标准串(prefix 不是 `10.`),paper_server 内部用真 DOI(`10.xxx/...`)所以本库内场景稳;③ `search(limit>50)` 自动夹紧到 50 防 LLM 误用一次性拉全表;④ list 加 abstract 后 payload 从 ~1KB/条 涨到 ~3KB/条,默认 limit=10 也就 30KB,内网毫无感知 + 省去 LLM 逐条 get_paper 的 roundtrip(主要收益)。**没动**:tool 系统 / `agent_builder.py` / config / `ModelCapabilities` / ToolSearch 基建 / paper_pdf_view / urls / Paper model / DESIGN(skill 是已有抽象)/ RUN(`PAPER_SERVER_URL` 是可选 env,有默认值)。**遗留**:paper_server 三个文件已落地,由用户 redeploy;redeploy 后跑 `scripts/smoke_paper_skill.py`(三步:search list shape + abstract 字段 / get_paper retrieve 端点 / fetch_pdf 落盘 + 复用)。
- **dev SPA chip 维度二次校准:工具 I/O 走产物白名单 + 助手正文无条件挂 chip 绕开 seenRels**:截图反馈"助手回复里 echo 的产物图路径(`rust介绍/figures/...png`)没挂 chip"。复盘上一条改动 + `febe04a`:① 上一条把工具 I/O 的 chip gate 也解了 —— 实际意图是"glob/grep 列出的引用不该挂(否则把命中的老 figures/foo.png 当新产物展示)"故 gate 该留;② `febe04a``seenRels` 全局去重把"防同图被 inline 两次"做过头了,把助手正文 echo 的同路径 chip 也吃掉。**最终模型(三条规则)**:① 工具 I/O(args/result):chip 抽取只对产物工具(seedream/seedance);② 产物工具的产物图/视频:inline 大图;③ 助手正文 echo 的路径:**永远**挂 chip(绕开 seenRels)+ 强制 `allowInlineMedia=false`(只小按钮,绝不重复 inline 大图 —— 因为产物工具上面已经 inline 过了)。**改动**:`renderMessages` 3 处(tool 卡 / assistant 正文 / assistant tool_calls args)+ SSE 2 处(tool_call / tool_result)按上面规则改写;`pickFresh`(seenRels 读写)只在产物工具的两处保留(防同图 inline 二次),assistant 正文改成 `renderArtifactBarHtml(extractArtifactRels(...), false)` —— 不读不写 seenRels,直接 chip。SSE 处 `upgradeMediaArtifacts` 同步 gate 到 `if (isProducer)` 下,非产物工具不发 blob fetch。**为什么 chip 重复出现无害**:chip 是 monospace 小字 + 5px 圆角小按钮,占 1 行;同路径在 tool 结果 + assistant 正文都出现,体感是"工具产出了它 + 助手又提到它",是合理叙事节点,跟"两张同样的大 PNG 占整屏"完全不同视觉量级。**对比方案**:① 助手正文也走 seenRels 但区分 chip/inline 类型(seen=path 同时也存 cat),只去重 inline、放过 chip — 复杂度涨,逻辑分支多;② 后端 tool_result 元信息显式标 `produced_files`(前端不再启发式抽路径)— 干净但 SSE/历史回放/seedream 全要改,成本最大,不上。当前方案 4 行实现意图。**没动**:`extractArtifactRels` regex / `_categorize` / 媒体 blob 缓存 / chip 点击委托 / 后端 / DESIGN(纯前端 UX 反复)/ RUN。**遗留**:用户提"绝对路径有些没挂 chip",等具体例子再排(可能是 wd_name 与历史路径段不齐 / 跨 task 路径)。 - **dev SPA chip 维度二次校准:工具 I/O 走产物白名单 + 助手正文无条件挂 chip 绕开 seenRels**:截图反馈"助手回复里 echo 的产物图路径(`rust介绍/figures/...png`)没挂 chip"。复盘上一条改动 + `febe04a`:① 上一条把工具 I/O 的 chip gate 也解了 —— 实际意图是"glob/grep 列出的引用不该挂(否则把命中的老 figures/foo.png 当新产物展示)"故 gate 该留;② `febe04a``seenRels` 全局去重把"防同图被 inline 两次"做过头了,把助手正文 echo 的同路径 chip 也吃掉。**最终模型(三条规则)**:① 工具 I/O(args/result):chip 抽取只对产物工具(seedream/seedance);② 产物工具的产物图/视频:inline 大图;③ 助手正文 echo 的路径:**永远**挂 chip(绕开 seenRels)+ 强制 `allowInlineMedia=false`(只小按钮,绝不重复 inline 大图 —— 因为产物工具上面已经 inline 过了)。**改动**:`renderMessages` 3 处(tool 卡 / assistant 正文 / assistant tool_calls args)+ SSE 2 处(tool_call / tool_result)按上面规则改写;`pickFresh`(seenRels 读写)只在产物工具的两处保留(防同图 inline 二次),assistant 正文改成 `renderArtifactBarHtml(extractArtifactRels(...), false)` —— 不读不写 seenRels,直接 chip。SSE 处 `upgradeMediaArtifacts` 同步 gate 到 `if (isProducer)` 下,非产物工具不发 blob fetch。**为什么 chip 重复出现无害**:chip 是 monospace 小字 + 5px 圆角小按钮,占 1 行;同路径在 tool 结果 + assistant 正文都出现,体感是"工具产出了它 + 助手又提到它",是合理叙事节点,跟"两张同样的大 PNG 占整屏"完全不同视觉量级。**对比方案**:① 助手正文也走 seenRels 但区分 chip/inline 类型(seen=path 同时也存 cat),只去重 inline、放过 chip — 复杂度涨,逻辑分支多;② 后端 tool_result 元信息显式标 `produced_files`(前端不再启发式抽路径)— 干净但 SSE/历史回放/seedream 全要改,成本最大,不上。当前方案 4 行实现意图。**没动**:`extractArtifactRels` regex / `_categorize` / 媒体 blob 缓存 / chip 点击委托 / 后端 / DESIGN(纯前端 UX 反复)/ RUN。**遗留**:用户提"绝对路径有些没挂 chip",等具体例子再排(可能是 wd_name 与历史路径段不齐 / 跨 task 路径)。

View File

@ -377,24 +377,32 @@ def build_agent(
) )
tools[seedream_tool.name] = seedream_tool tools[seedream_tool.name] = seedream_tool
sink = ConsoleEventSink(console, token_counter=lambda: llm.token_counter.total) if console else None sink = ConsoleEventSink(console) if console else None
agent = AgentLoop(llm, tools, session, caps, user_id=uid, sink=sink) agent = AgentLoop(llm, tools, session, caps, user_id=uid, sink=sink)
return agent, session, sid, task_state, working_dir_path return agent, session, sid, task_state, working_dir_path
def sync_task_tokens(task_state: TaskState, llm: LLM) -> None: def sync_task_tokens(task_state: TaskState) -> None:
"""每轮 agent.run 后调,把 LLM 累计 tokens UPDATE 到 PG tasks 表。 """每轮 agent.run 后调,把累计 tokens UPDATE 到 PG tasks 表。
update_task 而非 task_state.save() 只更 tokens 两列,避免无谓全字段 UPSERT `messages.tokens_in/out` SUM 现算 `record_chat_usage` 写每条 assistant
ORM-level update 自动刷 updated_at message 时已落库,这里聚合写入 tasks 概览列query (task_id) 索引,行数
顶天几百,亚毫秒级,在刚跑完几秒 LLM 后的 round-trip 噪声里
""" """
from uuid import UUID from uuid import UUID
from sqlalchemy import func, select
from core.storage import update_task from core.storage import update_task
tc = llm.token_counter from core.storage.engine import session_scope
task_state.tokens_prompt = tc.prompt_tokens from core.storage.models import Message
task_state.tokens_completion = tc.completion_tokens tid = UUID(task_state.task_id)
update_task( with session_scope() as s:
UUID(task_state.task_id), row = s.execute(
tokens_prompt=tc.prompt_tokens, select(
tokens_completion=tc.completion_tokens, func.coalesce(func.sum(Message.tokens_in), 0),
) func.coalesce(func.sum(Message.tokens_out), 0),
).where(Message.task_id == tid)
).one()
tp, tc = int(row[0]), int(row[1])
task_state.tokens_prompt = tp
task_state.tokens_completion = tc
update_task(tid, tokens_prompt=tp, tokens_completion=tc)

View File

@ -27,34 +27,12 @@ from litellm.exceptions import (
from .capabilities import ModelCapabilities from .capabilities import ModelCapabilities
class TokenCounter:
def __init__(self) -> None:
self.prompt_tokens = 0
self.completion_tokens = 0
def add(self, usage: Any) -> None:
if not usage:
return
if hasattr(usage, "model_dump"):
usage = usage.model_dump()
elif hasattr(usage, "dict"):
usage = usage.dict()
if isinstance(usage, dict):
self.prompt_tokens += int(usage.get("prompt_tokens") or 0)
self.completion_tokens += int(usage.get("completion_tokens") or 0)
@property
def total(self) -> int:
return self.prompt_tokens + self.completion_tokens
class LLM: class LLM:
def __init__(self, capabilities: ModelCapabilities) -> None: def __init__(self, capabilities: ModelCapabilities) -> None:
self.caps = capabilities self.caps = capabilities
env_name = capabilities.api_key_env or "DEEPSEEK_API_KEY" env_name = capabilities.api_key_env or "DEEPSEEK_API_KEY"
self.api_key = os.environ.get(env_name) self.api_key = os.environ.get(env_name)
self.api_base = capabilities.api_base or None self.api_base = capabilities.api_base or None
self.token_counter = TokenCounter()
if not self.api_key: if not self.api_key:
raise RuntimeError( raise RuntimeError(
f"环境变量 {env_name} 未设置,无法调用 {capabilities.model_id}" f"环境变量 {env_name} 未设置,无法调用 {capabilities.model_id}"
@ -98,7 +76,6 @@ class LLM:
for attempt in range(max_retries): for attempt in range(max_retries):
try: try:
response = litellm.completion(**kwargs) response = litellm.completion(**kwargs)
self.token_counter.add(getattr(response, "usage", None))
return response return response
except (RateLimitError, APIConnectionError, ServiceUnavailableError, Timeout, APIError) as e: except (RateLimitError, APIConnectionError, ServiceUnavailableError, Timeout, APIError) as e:
last_err = e last_err = e

View File

@ -16,7 +16,7 @@ from __future__ import annotations
import threading import threading
import time import time
from typing import Callable, Optional from typing import Optional
from rich.console import Console from rich.console import Console
from rich.markdown import Markdown from rich.markdown import Markdown
@ -24,16 +24,10 @@ from rich.markdown import Markdown
class ConsoleEventSink: class ConsoleEventSink:
"""把事件画到 rich console。spinner 在 llm_start..llm_end 之间显示, """把事件画到 rich console。spinner 在 llm_start..llm_end 之间显示,
后台 daemon 线程每 100ms 刷耗时 + 累计 token""" 后台 daemon 线程每 100ms 刷耗时"""
def __init__( def __init__(self, console: Console) -> None:
self,
console: Console,
token_counter: Optional[Callable[[], int]] = None,
) -> None:
self.console = console self.console = console
# 把 LLM 累计 token 数取出来(spinner 文案要用),可选;无则不显示 ctx
self._tokens = token_counter or (lambda: 0)
self._status = None self._status = None
self._stop: Optional[threading.Event] = None self._stop: Optional[threading.Event] = None
self._thread: Optional[threading.Thread] = None self._thread: Optional[threading.Thread] = None
@ -69,9 +63,7 @@ class ConsoleEventSink:
def fmt() -> str: def fmt() -> str:
elapsed = time.monotonic() - self._start elapsed = time.monotonic() - self._start
total = self._tokens() return f"[muted]thinking... {elapsed:.1f}s[/muted]"
tail = f" ctx {total:,} tok" if total else ""
return f"[muted]thinking... {elapsed:.1f}s{tail}[/muted]"
self._status = self.console.status(fmt(), spinner="dots") self._status = self.console.status(fmt(), spinner="dots")
self._status.__enter__() self._status.__enter__()

View File

@ -270,7 +270,7 @@ def _run_agent_bg(
agent.sink = WebEventSink(broker, task_id) agent.sink = WebEventSink(broker, task_id)
agent.cancel_check = lambda tid=task_id: broker.is_cancelled(tid) agent.cancel_check = lambda tid=task_id: broker.is_cancelled(tid)
agent.run(user_message) agent.run(user_message)
sync_task_tokens(task_state, agent.llm) sync_task_tokens(task_state)
# cancel 命中或正常完成 → run_status 回 idle(error 才持久) # cancel 命中或正常完成 → run_status 回 idle(error 才持久)
with session_scope() as s: with session_scope() as s:
s.execute( s.execute(