From a3acb97079740f8190efca4af6baa1e1e39b6fa3 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Wed, 20 May 2026 15:46:54 +0800 Subject: [PATCH] =?UTF-8?q?feat(loop+ui):=20LLM=20=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E5=88=87=20streaming=20=E2=80=94=20cancel=20=E7=A7=92=E9=80=80?= =?UTF-8?q?=20+=20=E5=89=8D=E7=AB=AF=E6=89=93=E5=AD=97=E6=9C=BA=20+=20?= =?UTF-8?q?=E5=8F=91=E9=80=81/=E5=81=9C=E6=AD=A2=E5=90=88=E5=B9=B6?= =?UTF-8?q?=E5=8D=95=E6=8C=89=E9=92=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - core/llm.py: 加 chat_stream() generator(stream=True + include_usage), generator finally 关底层 httpx 连接;_build_kwargs 抽出来 chat/chat_stream 共用 - core/loop.py: 主循环 _stream_llm() 流式迭代,chunk 间 poll cancel 命中 break, litellm.stream_chunk_builder 拼回 response 给 tool_calls 解析 + usage 记账; content delta 即时 emit text 事件激活前端打字机渲染 - web/static/dev.html: chat-send + chat-cancel 合并 chat-action 单按钮, setActionMode(idle/streaming/cancelling) 切态;streaming 期间 Enter 不触发停止 - cancel 延迟从「整轮 generation 时长」(几十秒)降到「单 chunk 间隔」(100ms 级) - 文档:DESIGN §3.1 + API 表 + risks 表翻转 tradeoff;RUN 接口 + 故障兜底同步; web/app.py docstring 对齐;PROGRESS 加条目 + 文件清单行数 Co-Authored-By: Claude Opus 4.7 (1M context) --- DESIGN.md | 11 +++--- PROGRESS.md | 13 ++++--- RUN.md | 4 +- core/llm.py | 74 ++++++++++++++++++++++++++++++++---- core/loop.py | 91 ++++++++++++++++++++++++++++++++++++++------- web/app.py | 7 ++-- web/static/dev.html | 59 ++++++++++++++++++++--------- 7 files changed, 206 insertions(+), 53 deletions(-) diff --git a/DESIGN.md b/DESIGN.md index 8ce1b48..9cce447 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -62,8 +62,9 @@ zcbot/ ### 3.1 主循环(`core/loop.py`) ReAct:LLM → 若有 tool_calls 就执行 → 结果塞回消息 → 再调 LLM。无 tool_call 即返回。 - 工具结果对模型截 16K 字符,用户预览 400 字符 -- 事件通过 `sink.emit` 流式发布(§7 A,SSE 桥) -- `cancel_check: Optional[Callable[[], bool]]` 协作式 cancel,每轮 LLM 前 + tool_calls 之间 poll;命中给未执行 tool_call 补 `[cancelled by user]` 保 LiteLLM 协议 +- 事件通过 `sink.emit` 流式发布(§7 A,SSE 桥);content delta 在 stream chunk 到达即时 emit `text` 事件,前端打字机渲染 +- **LLM 调用走 `LLM.chat_stream`(litellm `stream=True`)**:chunks 攒齐后用 `litellm.stream_chunk_builder` 拼回完整 response 给 tool_calls 解析 + usage 记账;`stream_options.include_usage=True` 让最后一个 chunk 带 usage +- `cancel_check: Optional[Callable[[], bool]]` 协作式 cancel,每轮 LLM 前 + **stream chunk 之间** + tool_calls 之间 poll;chunk 间 poll 让 cancel 延迟从「整轮 generation 时长」(几十秒)降到「单 chunk 间隔」(~100ms);中途 cancel 时已收 chunk 丢弃,assistant 半截内容不入库(resume 上下文干净);命中给未执行 tool_call 补 `[cancelled by user]` 保 LiteLLM 协议 - `max_iterations` 从 capabilities 读 ### 3.2 Model Profile(`core/capabilities.py` + `config/models/*.yaml`) @@ -243,8 +244,8 @@ Tasks GET /v1/tasks/{id}/events SSE 流(见下) — 订阅 task 当前活动事件, 单活 run 形态下无歧义,客户端只需 task_id POST /v1/tasks/{id}/cancel 协作式 cancel(202):标 cancelling + 信号 broker; - BG loop 在工具调用之间 poll 看见即退; - run_status != running → 409;LLM 同步 call 本身不可中断 + BG loop 在 stream chunk 间 + 工具调用之间 poll 看见即退; + run_status != running → 409;cancel 延迟 ~ 单 chunk 间隔(100ms 级) Auth POST /v1/auth/login {user_id, platform_key} → JWT(platform 机器对机器) @@ -417,7 +418,7 @@ create index on usage_events (model_profile, created_at); | DB-then-FS 中断留孤儿目录 | rename 顺序 DB UPDATE → FS rename(FS 失败回滚 DB);delete 后台 GC 周期扫"FS 有但 DB 无引用" | | 同 folder 多 task 并发写同名 | 文件级悲观锁,冲突早失败 | | 同 task 并发 POST messages 撞 `messages.idx` | `POST /messages` 单活 run gate:`SELECT … FOR UPDATE` 锁 task + `run_status in ('running','cancelling')` → 409;启动 lifespan reaper 把孤儿 `running`/`cancelling` 全标 error。未来 multi-worker 换 heartbeat / lease | -| Run 跑太久 / 用户想中断 | `POST /v1/tasks/{id}/cancel` 协作式;LLM 同步 call 本身不可中断 — 最坏等当前一轮跑完(几十秒) | +| Run 跑太久 / 用户想中断 | `POST /v1/tasks/{id}/cancel` 协作式;LLM 走 streaming,chunk 间 poll cancel → 延迟 ~ 单 chunk 间隔(100ms 级)| | Sandbox 出站越权 | egress allowlist 起步只放 LLM + PyPI | | 资源滥用 | BYO key 默认;月度配额;cold task LRU 清 | diff --git a/PROGRESS.md b/PROGRESS.md index 0b736a3..ecc0362 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -2,7 +2,7 @@ > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 2-4 句:做了啥 + 关键判断 + 没动什么;细节查 `git log` / `git diff`。 -最后更新:2026-05-20(豆包 Seedream 5.0 图像生成 tool 接入 + cost_usd → cost_cny 全表统一币种) +最后更新:2026-05-20(LLM 走 streaming + 前端打字机 + 发送/停止单按钮 + cancel 秒退) --- @@ -23,6 +23,8 @@ ### 2026-05-20 +- **LLM 调用切 streaming(cancel 秒退 + 前端打字机)+ 发送/停止合并单按钮**:用户反馈"点停止要等很久"+"发送/停止可以合并"。**问题 1 根因**:`litellm.completion(...)` 是同步阻塞,Python 没标准办法外部线程打断同步 IO;`broker.is_cancelled` 只在 `core/loop.py:run()` 每轮 LLM 前 + tool_calls 之间 poll,所以 cancel 必须等当前整轮 generation 跑完才生效(deepseek v4 + thinking + 长输出几十秒)。**修法**:切 `litellm.completion(stream=True)`,`core/llm.py` 加 `chat_stream()` generator(`stream_options={"include_usage": True}` 让最后 chunk 带 usage;`_build_kwargs` 抽出来给 chat/chat_stream 共用,免重复参数装配);`core/loop.py` 主循环改 `_stream_llm()` 流式迭代,chunk 间 poll cancel,命中 `break` + generator finally `stream.close()` 关底层 httpx 连接;chunks 攒齐用 `litellm.stream_chunk_builder(chunks, messages=...)` 拼回完整 response(自动处理 tool_call name/arguments 跨 chunk 拼接)给 tool_calls 解析 + usage 记账。**cancel 语义对齐**:stream 中途 cancel → 已收 chunk 丢弃不入库不记账(下次 resume 上下文干净);stream 完结后 tool_calls 之间 cancel → 沿用原 `_fill_cancelled_tool_results` 补 cancelled tool message。**前端打字机免费 bonus**:`dev.html:1500-1510` 早就备好接 `text` 事件的 `delta` 字段(rAF 节流 + nearBottom 不抢滚动 + 流中不跑 highlight),但后端原来发的是 `{"type":"text","content":"<整段>"}` 字段名对不上 → 前端永远 match 不到。新逻辑在 `_stream_llm` 里 chunk 到达即 `_emit({"type":"text","delta":...})`,前端自然激活打字机。loop.py 主流程末尾不再 emit 整段 text(content 已通过 delta 流过)。**问题 2 UI**:`web/static/dev.html` 把 `#chat-send`(发送)+ `#chat-cancel`(停止)合并为单 `#chat-action`,新 helper `setActionMode(mode)`(idle="发送" primary 红实心 / streaming="停止" danger 红边 / cancelling="停止中…" disabled);form submit + `chatAction()` 根据 `state.streaming` 分派 sendMessage / cancelCurrentTask;streaming 期间 Enter 不触发停止(textarea 编辑下一条草稿,误触发风险高)。**Smoke 验证**:① 18 chunks 流式 + 文本拼回 ✓ ② tool_call 49 chunks 跨片拼回 `{"a":7,"b":5}` 完整 ✓ ③ 提前 break + close 仅 0.7s(模拟"写 500 字散文中途 cancel")✓。**Tradeoffs**:① streaming 重试只在连接建立阶段(没拿到第一个 chunk 前)生效,中途断流不续 — 实务罕见;② timeout 行为从"整段 timeout"变"chunk 间隔 timeout",新模型接入要测 thinking 不吐 reasoning chunk 的极端情况;③ litellm `stream_chunk_builder` + `stream_options.include_usage` 在 deepseek/doubao/glm/openai 标准协议都正常,新接非主流 provider 时验证。**没动**:probe.py(仍用同步 `chat()`,离线探测不需要 cancel)、CLI 路径(probe 走 chat 不受影响)、broker / SSE 帧格式 / `record_chat_usage` 入参 / DB schema / messages 入库时机(拼回 response 跟非流式等价)。**文档**:`DESIGN.md` §3.1 翻转 tradeoff 表「LLM 同步 call 不可中断」→「LLM 调用走 streaming」+ §7 API 表 cancel 描述改 chunk-level 延迟;`RUN.md` cancel 接口 + 故障兜底表对应行同步;`web/app.py` 两条 docstring 同步。 + - **dev SPA seedream tool 透明性 banner(model/size/cost/elapsed)**:用户问"实际生图用哪个模型 / 价格区别 / 前端要不要给用户选";seedream 现仅一个 variant(5.0),无选择空间 — 但用户**能看到**用了什么模型、花了多少是基本透明度。最小路径:SeedreamTool 返回串首行改成 `[seedream] model=... · size=... · cost=¥... · elapsed=...s` 结构化 banner(用 `·` 分隔 + `key=value` 严格格式,正则 parse);dev SPA 新加 `extractMediaBanner(toolName, resultText)` helper,流式 `tool_result` 与历史回放 `role==="tool"` 两路都在 `
` 的 `` 旁挂一行徽章(`.tool-banner .kv`,model 红字 / cost 暗红 / 其他灰色);model 文本去 `doubao-` 前缀与 `-260128` 日期后缀截短显示 `seedream-5-0`;**折叠态可见,无需展开**。LLM 看到的完整文本不丢(banner 同条第一行就是字符串)。**没动**:tool schema(不加 model 参数 — 单 variant 没意义,等 seedance 二期 pro/fast 真有价差时统一加 task 级下拉 + `tasks.image_model_profile` 列设计)、artifact chip 抽取(figures/*.png 现有逻辑无变化)、DB / 后端。**Tradeoff**:走文本 banner 而非从 .meta.json fetch — 简单 + 即时,代价是 tool 返回串格式成"前端约定"(改格式要同步前端 regex)。 - **豆包 Seedream 5.0 图像生成 tool 接入(seedance/视频留 Phase 2)+ 0007 migration `cost_usd` → `cost_cny` 全表统一币种**:用户要接 doubao-seedream-5-0-260128 + doubao-seedance-2-0-260128 + doubao-seedance-2-0-fast-260128,先做 seedream(同步 API 简单,跑通整条管线);seedance 异步 + token 计费复杂,留二期。**架构判断**:seedream/seedance 不是 chat LLM 范式(litellm 不覆盖,异步 task 形态,价格 per-image/per-second),**不进 chat 顶栏 model 下拉,做成 agent 可调 tool**;`config/media/doubao.yaml` 独立命名空间(`ark_api_key_env=ARK_API_KEY` + `ark_base_url=https://ark.cn-beijing.volces.com/api/v3` + image variants);**不复用 `ModelCapabilities`**(chat 长上下文/thinking schema 不适用)。**新文件**:① `core/ark_client.py`(httpx 封装 base URL + bearer auth + 异常翻译 + `download(url, dest)` 流式下载产物 — 复用给后续 seedance);② `tools/seedream.py::SeedreamTool`(prompt 必填 / size / watermark / search 可选 → POST `/images/generations` → 响应解析 `_extract_url`(三种 shape 兜底:OpenAI `data[].url` / 豆包 `data.images[].url` / 递归扫第一个 http url)→ 立刻下载到 `/figures/-.png` + 同名 `.meta.json`(prompt/model_id/size/cost_cny/elapsed/response_id/ts)→ `record_image_usage` 写 `kind="image"` 行)。**计费**:`record_image_usage` 接 CNY 直落,**`price_cny_per_image` snapshot 进 units jsonb**(`{"n_images":1, "size":"2048x2048", "search":false, "price_cny_per_image":0.22}`)—— 这是**调价防漂移**关键:豆包改价改 YAML 重启即可,历史 usage_events 自带快照不受污染,跨调价对账 `SELECT units->>'price_cny_per_image', cost_cny ... GROUP BY` 能拉出不同价位累计。**币种统一(0007 migration)**:`tasks.cost_usd` + `usage_events.cost_usd` 双 rename → `cost_cny`,现有数据 `×7.2` 一次性折算(开发期数据小且 chat 多用国产模型 litellm cost map 不收录原本就是 0),`record_chat_usage` 内部把 litellm USD `×7.2` 落 CNY,全表统一币种免按 user 总账单分类汇总。**注册策略**:`agent_builder.py::build_agent` 调 `ArkConfig.load()`,**仅当 `ARK_API_KEY` env 设了才挂 tool**(无 key 用户感知零变化,不会看到 schema 里多个永远报错的工具);构造时注入 task_id / user_id / working_dir / ark_cfg(沿用 `user_root=` 注入范式)。**system prompt**(`prompts/system/general_v1.md`):加「媒体生成工具」段提示按需调用、不主动装饰生成、流程图优先 mermaid (skill 已有管线) — seedream 适合写实/概念/艺术风格图。**没动**:`ModelCapabilities`(避免 schema 污染)、dev SPA(图预览 modal 已支持 png,artifact chip 已识别 figures/*.png 自动渲染缩略图)、`tasks.cost_cny` 列读写路径(record_chat_usage / record_image_usage 都只写 usage_events,task 级累计列仍由后续 sync 补)。**Tradeoff**:① CNY 折算用固定汇率 7.2,涨跌 ±5% 误差开发期接受,真精算应按调用时刻汇率但太重;② 涨价瞬间到 YAML 改完的窗口期记账偏低(豆包不会无预警调价,且 units snapshot 让历史数据可还原)。**待办**:① smoke 真调豆包接口走通(等用户配 `ARK_API_KEY`);② Phase 2 接 seedance(异步 task + polling + 进度 SSE 事件,复用 ark_client.download)。 @@ -118,7 +120,8 @@ | 兼容层 | 开发期不写 | DB schema / 字段 / API 改动直接切,见 CLAUDE.md | | `/v1/files/*` 与 DB | files API 作目录树唯一 mutation 入口,DB-FS 一致性服务端内化 | rename / delete 顶层目录 DB-aware(SELECT FOR UPDATE + check_no_subtask + 事务回滚) | | 单活 run | task 同时最多 1 个活 run | gate 在 `post_message` 同事务 `SELECT FOR UPDATE`,挡连点 send / 多 tab | -| LLM 同步 call 不可中断 | cancel 协作式 check 在 LLM 之间 + tool_call 之间 | 最坏等当前一轮跑完(几十秒) | +| LLM 调用走 streaming | `LLM.chat_stream` + `litellm.stream_chunk_builder` 拼回 response;cancel poll 在 stream chunk 间 + tool_call 之间 | cancel 延迟 100ms 级;顺带 content delta 即时 emit `text` 事件给前端打字机渲染 | +| 发送/停止单按钮 | UI 根据 `state.streaming` 切态;cancel 时 setActionMode("cancelling") 临时 disable | streaming 期间 Enter 不触发停止(防误触) | --- @@ -126,8 +129,8 @@ ``` core/capabilities.py 71 -core/llm.py 93 ← litellm 离线 cost map env -core/loop.py 182 ← §7 A sink.emit + cancel_check 协作式 cancel +core/llm.py 151 ← litellm 离线 cost map env + chat_stream(stream=True + include_usage) +core/loop.py 268 ← §7 A sink.emit + _stream_llm(chunk 间 poll cancel + emit delta) core/sinks.py 101 ← §7 A core/ui.py 38 core/paths.py 50 ← task_dir db form 归一(to_db_path / from_db_path) @@ -160,7 +163,7 @@ web/app.py ~1320 ← /v1 JSON API + user_id 隔离 + run lock + web/auth.py ~190 ← D' 过渡:邮箱密码 + platform_key → JWT web/broker.py 121 ← in-process pub/sub + cancel signal(全 task_id 索引) web/sinks.py 21 -web/static/dev.html ~2140 ← D' dev SPA(3 栏 + 文件预览弹框 + 两 tab 登录 + 多选 + 目录选择弹框) +web/static/dev.html ~2480 ← D' dev SPA(3 栏 + 文件预览 + 两 tab 登录 + 选入弹框 + 发送/停止单按钮 + 流式打字机渲染激活) web/static/vendor/ ~1 MB ← jszip / docx-preview / xlsx(office 预览) ───────────────────────────────── Python 合计 ~3400 行(+ dev.html 1700 静态 + vendor 1MB) diff --git a/RUN.md b/RUN.md index b5ce9f4..588c4e2 100644 --- a/RUN.md +++ b/RUN.md @@ -127,7 +127,7 @@ curl --noproxy '*' -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8765/v1/ta | `GET /v1/tasks/{id}/messages` | LiteLLM payload 透传 | 必填 | | `POST /v1/tasks/{id}/messages` | `{content}` 发消息;返 `{events_url}`;**`run_status` 是 running/cancelling → 409**(单活 run;error 起新 run 时清);UI 应 disable send 直到 SSE `done` | 必填 | | `GET /v1/tasks/{id}/events` | SSE 流(`event: ` + `data: `);订阅 task 当前活动 | 必填 | -| `POST /v1/tasks/{id}/cancel` | 协作式 cancel;`run_status != running` → 409;LLM 同步 call 不可中断,最坏等当前一轮跑完 | 必填 | +| `POST /v1/tasks/{id}/cancel` | 协作式 cancel;`run_status != running` → 409;LLM 走 streaming,chunk 间 poll cancel — 延迟 100ms 级,基本秒退 | 必填 | | `POST /v1/tasks/{id}/clear` | 清空当前 task 全部 messages + reset `tasks.tokens_prompt/completion/cost_cny` 三列累计 + `run_status='idle'`;`usage_events`(账单记账)**不动**,只 `message_id` 列变 NULL;run 活跃中(running/cancelling)→ 409(先 cancel);FS 文件保留 | 必填 | | `GET /v1/files?path=` | 列 user_root 下条目 + 面包屑;dotfile 隐藏 | 必填 | | `GET /v1/files/download?path=` | 下单文件 | 必填 | @@ -255,7 +255,7 @@ sudo journalctl -u zcbot -n 50 # 看新进程起没起干 | `POST /v1/tasks/{id}/messages` 返 409 `task already has an active run` | 上一条消息的 BG run 还没跑完;等流式 done 或点 stop / `POST .../cancel`;服务异常下 `run_status` 卡 `running`/`cancelling`,启动 reaper 会清 | | `POST /v1/tasks/{id}/cancel` 返 409 `task not running` | `run_status` 不是 `running`(idle / cancelling / error 都不能 cancel);dev SPA 自动忽略不报错 | | `POST /v1/tasks/{id}/clear` 返 409 `task has an active run` | 当前 run 还没跑完;先点停止 / `POST .../cancel` 等流式 done 再清空 | -| 点 stop 后流式没立刻停 | LLM 同步 call 不可中断,最坏等当前一轮跑完(几十秒);loop 进入下个 check 点(每轮 LLM 前 / 每个 tool_call 前)就退,emit `cancelled` → SSE `done` → UI 收回 stop | +| 点 stop 后流式没立刻停 | streaming 改造后正常路径秒退;若仍卡可能是 ① httpx 连接 close 没立刻关(GC 时机)/ ② 模型 thinking 阶段长时间不吐 chunk,等下一个 chunk 到达才能 poll cancel(罕见) | | `[startup] reaped N stale active run(s)` | 上次 web 进程未正常 finish 留下 N 个孤儿 run,启动 lifespan 自动标 error。info 级,无需处理 | | `seedream` tool 没出现在对话里 | `.env` 没设 `ARK_API_KEY`,build_agent 跳过注册。设了重启 web 即可;无需迁移、无需 DB 改动 | | 豆包调价了 | 改 `config/media/doubao.yaml` 的 `price_cny_per_image` 一行 → 重启 web。**历史 usage_events 不受影响**(units jsonb 里有当时单价 snapshot,聚合查仍按旧价);新写入按新价。涨价瞬间到改 YAML 中间这段记账偏低,开发期接受 | diff --git a/core/llm.py b/core/llm.py index 575bc40..3cdfc74 100644 --- a/core/llm.py +++ b/core/llm.py @@ -1,9 +1,15 @@ -"""LiteLLM 封装: capabilities 决定调用参数,自动重试。""" +"""LiteLLM 封装: capabilities 决定调用参数,自动重试。 + +`chat()`:同步阻塞,一次性返回完整 response。给 probe / 离线探测用。 +`chat_stream()`:流式 generator,yield chunk;调用方累积 + 用 litellm.stream_chunk_builder + 拼回完整 response。loop 走这条以便 chunk 之间 poll cancel(同步 LLM call 不可中断; + 流式下 cancel 延迟 ~ chunk 间隔 100ms 级,而非整轮 generation 时长几十秒)。 +""" from __future__ import annotations import os import time -from typing import Any, List, Optional +from typing import Any, Iterator, List, Optional # 跳过启动时从 GitHub 拉 model_prices 的网络请求,直接用 litellm 打包的本地副本。 # 必须在 `import litellm` 之前设置,否则 get_model_cost_map() 已经跑过了。 @@ -54,14 +60,13 @@ class LLM: f"环境变量 {env_name} 未设置,无法调用 {capabilities.model_id}" ) - def chat( + def _build_kwargs( self, messages: List[dict], - tools: Optional[list] = None, - parallel_tool_calls: Optional[bool] = None, - reasoning_effort: Optional[str] = None, - max_retries: int = 3, - ) -> Any: + tools: Optional[list], + parallel_tool_calls: Optional[bool], + reasoning_effort: Optional[str], + ) -> dict: kwargs: dict = { "model": self.caps.model_id, "messages": messages, @@ -78,7 +83,17 @@ class LLM: kwargs["reasoning_effort"] = reasoning_effort if self.caps.prompt_caching: kwargs["extra_headers"] = {"anthropic-beta": "prompt-caching-2024-07-31"} + return kwargs + def chat( + self, + messages: List[dict], + tools: Optional[list] = None, + parallel_tool_calls: Optional[bool] = None, + reasoning_effort: Optional[str] = None, + max_retries: int = 3, + ) -> Any: + kwargs = self._build_kwargs(messages, tools, parallel_tool_calls, reasoning_effort) last_err: Optional[Exception] = None for attempt in range(max_retries): try: @@ -91,3 +106,46 @@ class LLM: break time.sleep(2 ** attempt) raise last_err # type: ignore[misc] + + def chat_stream( + self, + messages: List[dict], + tools: Optional[list] = None, + parallel_tool_calls: Optional[bool] = None, + reasoning_effort: Optional[str] = None, + max_retries: int = 3, + ) -> Iterator[Any]: + """流式 chat:yield 每个 chunk。调用方累积 + 用 litellm.stream_chunk_builder 拼回完整 response。 + + 重试语义:连接建立阶段错误(还没拿到第一个 chunk)按 max_retries 退避重试; + 开始流之后失败直接抛(半截 partial 没法续)。usage 通过 stream_options.include_usage + 让最后一个 chunk 带 usage。 + """ + kwargs = self._build_kwargs(messages, tools, parallel_tool_calls, reasoning_effort) + kwargs["stream"] = True + kwargs["stream_options"] = {"include_usage": True} + + last_err: Optional[Exception] = None + for attempt in range(max_retries): + try: + stream = litellm.completion(**kwargs) + break + except (RateLimitError, APIConnectionError, ServiceUnavailableError, Timeout, APIError) as e: + last_err = e + if attempt == max_retries - 1: + raise + time.sleep(2 ** attempt) + else: + raise last_err # type: ignore[misc] + + try: + for chunk in stream: + yield chunk + finally: + # 调用方提前 break(cancel) → generator close → 这里关掉底层 httpx 连接 + close = getattr(stream, "close", None) + if callable(close): + try: + close() + except Exception: + pass diff --git a/core/loop.py b/core/loop.py index 7ff183c..4cc4876 100644 --- a/core/loop.py +++ b/core/loop.py @@ -2,15 +2,21 @@ loop 不直接 print —— 进度通过 sink.emit(event) 上抛。Sink 决定怎么呈现 (本地 console / SSE / 日志)。事件类型见 core/sinks.py 头部说明。 + +LLM 调用走 `chat_stream`(流式),chunk 之间 poll cancel_check 实现快速中断。 +content delta 即时 emit `text` 事件让前端打字机渲染;chunks 攒齐后用 +`litellm.stream_chunk_builder` 拼回完整 response 给 tool_calls 解析 + usage 记账。 """ from __future__ import annotations import json import time -from typing import Any, Callable, Dict, Optional, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple from uuid import UUID +import litellm + from .capabilities import ModelCapabilities from .llm import LLM from .session import Session @@ -20,6 +26,23 @@ from .storage import record_chat_usage _CANCELLED_TOOL_PLACEHOLDER = "[cancelled by user]" +def _extract_delta_content(chunk: Any) -> Optional[str]: + """从 stream chunk 提 delta.content(文本片段)。chunk 形态 litellm ModelResponseStream: + choices[0].delta.content。usage-only 收尾 chunk(没 choices / delta)返 None。 + """ + try: + choices = getattr(chunk, "choices", None) + if not choices: + return None + delta = getattr(choices[0], "delta", None) + if delta is None: + return None + content = getattr(delta, "content", None) + return content if content else None + except Exception: + return None + + def _extract_usage(usage: Any) -> Tuple[int, int]: """从 litellm response.usage 提 (prompt_tokens, completion_tokens)。""" if not usage: @@ -52,9 +75,10 @@ class AgentLoop: self.user_id = user_id # usage_events 写入时按 user 维度聚合 self.max_iterations = max_iterations or capabilities.max_iterations self.sink = sink - # 协作式 cancel:web 层注入 `lambda: broker.is_cancelled(run_id)`; - # CLI 路径不设(None → 永不 cancel)。LLM 调用本身是 litellm 同步阻塞、不可中断, - # check 点放在每轮 LLM 前、tool_calls 之间;一次 LLM call 最坏卡几十秒。 + # 协作式 cancel:web 层注入 `lambda: broker.is_cancelled(task_id)`; + # CLI 路径不设(None → 永不 cancel)。check 点在 ① 每轮 LLM 前 ② stream chunk 间 + # ③ tool_calls 之间。chunk 间 poll 让 cancel 延迟从「整轮 generation 时长」 + # (几十秒)降到「单 chunk 间隔」(~100ms)。 self.cancel_check = cancel_check def _emit(self, event: dict) -> None: @@ -85,12 +109,15 @@ class AgentLoop: self._emit({"type": "llm_start"}) start = time.monotonic() - response = self.llm.chat( - messages=self.session.messages, - tools=[t.schema for t in self.tools.values()], - reasoning_effort=self.caps.default_reasoning_effort or None, - ) + response, cancelled_mid_stream = self._stream_llm() elapsed = time.monotonic() - start + + if cancelled_mid_stream: + # 流中途收到 cancel:已接收的 chunk 丢弃,不入库不记账(部分 assistant + # 内容也不持久化,下次 resume 上下文干净)。response 可能是 None。 + self._emit({"type": "cancelled"}) + return "[cancelled]" + msg = response.choices[0].message asst_msg_id = self.session.append(msg) @@ -119,13 +146,11 @@ class AgentLoop: }) tool_calls = getattr(msg, "tool_calls", None) or [] - content = getattr(msg, "content", None) - if content: - self._emit({"type": "text", "content": content}) + # content 已通过 stream 流式 emit 过 delta,这里不再 emit 整段 text 事件。 if not tool_calls: self._emit({"type": "done"}) - return content or "" + return getattr(msg, "content", None) or "" for i, tc in enumerate(tool_calls): if self._is_cancelled(): @@ -144,6 +169,46 @@ class AgentLoop: self._emit({"type": "done"}) return "[reached max iterations]" + def _stream_llm(self) -> Tuple[Optional[Any], bool]: + """流式拉一轮 LLM,chunk 间 poll cancel,content delta 即时 emit。 + + 返回 (response, cancelled_mid_stream): + - 正常完结 → (response, False);response 由 litellm.stream_chunk_builder 拼回, + shape 与非流式 completion() 等价(choices[0].message + usage) + - 中途 cancel → (None, True);已收 chunk 丢弃,内层 generator 在 finally 关闭底层连接 + """ + chunks: List[Any] = [] + stream = self.llm.chat_stream( + messages=self.session.messages, + tools=[t.schema for t in self.tools.values()], + reasoning_effort=self.caps.default_reasoning_effort or None, + ) + cancelled = False + try: + for chunk in stream: + if self._is_cancelled(): + cancelled = True + break + chunks.append(chunk) + # delta.content 即时 emit 给前端打字机渲染;tool_call delta 不实时发 + # (拼接散在多 chunk 跨 frame 难看,等拼回后整条 tool_call 事件由 + # _execute_tool_call 时机发更直观)。 + delta_text = _extract_delta_content(chunk) + if delta_text: + self._emit({"type": "text", "delta": delta_text}) + finally: + # generator 提前 break 时 GeneratorExit 触发 chat_stream finally → close 底层连接 + stream.close() + + if cancelled: + return None, True + + # 用 litellm 官方 helper 拼回完整 response(包括 tool_calls 拼接 + usage)。 + # messages 参数仅用于失败时回填 prompt token 估算,正常路径 stream_options.include_usage + # 已让最后一个 chunk 带准确 usage。 + response = litellm.stream_chunk_builder(chunks, messages=self.session.messages) + return response, False + def _execute_tool_call(self, tc: Any) -> str: name = tc.function.name raw_args = tc.function.arguments or "{}" diff --git a/web/app.py b/web/app.py index 488c32f..715ef61 100644 --- a/web/app.py +++ b/web/app.py @@ -251,7 +251,8 @@ def _run_agent_bg(task_id: UUID, user_id: UUID, user_message: str) -> None: sink 通过 broker.emit 桥事件回 asyncio loop;agent.run 是 sync,所以在 to_thread 跑。 user_id 必须从 JWT 那侧透传过来 —— 决定 memory_block 读哪个 per-user 子树。 - cancel_check 桥 broker.is_cancelled,loop 在工具调用之间 poll(LLM 同步调用本身不可中断)。 + cancel_check 桥 broker.is_cancelled,loop 在 stream chunk 间 + 工具调用之间 poll; + cancel 延迟 ~ 单 chunk 间隔(100ms 级)。 `ok / cancelled` 收尾直接回 `idle`(不留持久标记);只有 error 是持久终态。 """ from core.agent_builder import build_agent, sync_task_tokens @@ -895,9 +896,9 @@ def create_app() -> FastAPI: - 单活 run 形态下"取消当前活动"语义无歧义;客户端只需 task_id - 校验 task 归属 user;否则 404 - tasks.run_status 不是 `running` → 409(idle / cancelling / error 都不能 cancel) - - 标 `cancelling`(过渡态),BG 线程 loop 在工具调用之间 poll 看见即退; + - 标 `cancelling`(过渡态),BG 线程 loop 在 stream chunk 间 + 工具调用之间 poll 看见即退; 退出后 finally 写终态(正常→idle,异常→error) - - LLM 同步调用本身不可中断,最坏要等当前 LLM call 跑完(通常几十秒内) + - LLM 走 streaming,cancel 延迟 ~ 单 chunk 间隔(100ms 级) """ try: tid = UUID(task_id) diff --git a/web/static/dev.html b/web/static/dev.html index 5fb233c..e50f756 100644 --- a/web/static/dev.html +++ b/web/static/dev.html @@ -647,9 +647,8 @@
就绪 - - +
@@ -1359,9 +1358,40 @@ function renderMessages(msgs) { } // ───── send + SSE ───── -$("chat-form").addEventListener("submit", (e) => { e.preventDefault(); sendMessage(); }); +// 发送 / 停止 单按钮:idle → 发送(primary 红实心);streaming → 停止(danger 红边); +// cancelling 是过渡态 — 用户点过停止后到 SSE 收到 cancelled/done 之间。 +function setActionMode(mode) { + const btn = $("chat-action"); + btn.classList.remove("primary", "danger"); + if (mode === "idle") { + btn.textContent = "发送"; + btn.classList.add("primary"); + btn.disabled = false; + btn.title = ""; + } else if (mode === "streaming") { + btn.textContent = "停止"; + btn.classList.add("danger"); + btn.disabled = false; + btn.title = "停止当前流式回复"; + } else if (mode === "cancelling") { + btn.textContent = "停止中…"; + btn.classList.add("danger"); + btn.disabled = true; + } +} + +function chatAction() { + if (state.streaming) cancelCurrentTask(); + else sendMessage(); +} + +$("chat-form").addEventListener("submit", (e) => { e.preventDefault(); chatAction(); }); $("chat-input").addEventListener("keydown", (e) => { - if (e.key === "Enter" && !e.shiftKey) { e.preventDefault(); sendMessage(); } + // streaming 期间 Enter 不触发停止 —— 用户可能正在编辑下一条草稿,误触发风险高 + if (e.key === "Enter" && !e.shiftKey) { + e.preventDefault(); + if (!state.streaming) sendMessage(); + } }); // 对话流里 artifact chip 的点击委托 — 复用右栏文件预览 modal(modal 内自带"下载") @@ -1374,9 +1404,10 @@ $("chat-stream").addEventListener("click", (e) => { async function sendMessage() { if (!state.taskId) return; + if (state.streaming) return; const content = $("chat-input").value.trim(); if (!content) return; - $("chat-send").disabled = true; + setActionMode("cancelling"); // 临时锁住,等 events_url 拿到再切 streaming $("chat-hint").textContent = "发送中…"; try { // 立刻渲染 user 消息卡(乐观) @@ -1396,20 +1427,19 @@ async function sendMessage() { const r = await api("POST", `/v1/tasks/${state.taskId}/messages`, { content }); $("chat-input").value = ""; state.streaming = true; - $("chat-cancel").style.display = ""; + setActionMode("streaming"); streamSse(r.events_url, asstCard); } catch (e) { if (e.status === 401) { logout(); return; } appendErrorCard(e.message); - $("chat-send").disabled = false; + setActionMode("idle"); $("chat-hint").textContent = "就绪"; } } async function cancelCurrentTask() { if (!state.taskId || !state.streaming) return; - const btn = $("chat-cancel"); - btn.disabled = true; + setActionMode("cancelling"); $("chat-hint").textContent = "停止中…"; try { await api("POST", `/v1/tasks/${state.taskId}/cancel`); @@ -1418,13 +1448,11 @@ async function cancelCurrentTask() { if (e.status === 401) { logout(); return; } // 409 = 已结束 / 已 cancelling,不算错;其他贴 toast if (e.status !== 409) appendErrorCard("cancel: " + e.message); - btn.disabled = false; - $("chat-hint").textContent = "就绪"; + setActionMode("streaming"); + $("chat-hint").textContent = "接收中…"; } } -$("chat-cancel").addEventListener("click", cancelCurrentTask); - function streamSse(url, asstCard) { // EventSource 不支持自定义 header,token 走 query string(?token=...) // 这里 SSE 走 same-origin,token 经 URL 传给后端 — 但当前后端只读 Authorization 头 @@ -1464,12 +1492,9 @@ async function fetchSse(url, asstCard) { highlightIn(asstCard); } finally { body.classList.remove("streaming"); - $("chat-send").disabled = false; $("chat-hint").textContent = "就绪"; state.streaming = false; - const cb = $("chat-cancel"); - cb.style.display = "none"; - cb.disabled = false; + setActionMode("idle"); } // 刷新 task meta + messages(拿真实持久化的);失败路径已退出,这里不再跑 loadTaskList();