core(§7 Phase G G4): chat 发送 + SSE 流式回复

- web/broker.py RunBroker:in-process pub/sub,subscribe/emit/close/
  unsubscribe;同 run_id 多订阅者 fan-out(刷新 / 多 tab / 桌面+移动
  都能同时看流);_done 集合让晚到订阅者立刻收 done(不挂)。
- web/sinks.py WebEventSink:实现 §7 A sink 协议,把 AgentLoop._emit
  桥到 broker.emit(run_id, ev),AgentLoop 完全不知 web 存在。
- 异步策略 = asyncio.to_thread(不改 core):POST /tasks/{tid}/messages
  async handler → INSERT runs 行 + asyncio.create_task(to_thread(
  _run_agent_bg)),_run_agent_bg 工作线程跑 build_agent + agent.run,
  sink 通过 loop.call_soon_threadsafe 跨线程把 event 桥回 asyncio queue。
- GET /tasks/{tid}/runs/{rid}/events:StreamingResponse async gen,
  响应头 text/event-stream + Cache-Control: no-cache + X-Accel-
  Buffering: no(nginx 反代友好);第一帧 retry/connected 让 ES 立
  即建立,30s 无 event 发 : ping 心跳。SSE multi-line data 每行加
  data: 前缀(SSE spec),客户端 ES 自动还原 \n 拼接的 HTML。
- _render_event_fragment 渲染 text/tool_call/tool_result/error
  HTML 片段;run_start/llm_start/llm_end/done 发空 data(只让客户端
  识别 event type)。
- 新模板:_frag_text/_frag_tool_call/_frag_tool_result/_frag_error +
  _send_response(POST 响应:user msg 卡 + msg-assistant streaming
  容器带 sse-connect/sse-swap/sse-close)。
- chat.html 加 send 表单(Enter 发,Shift+Enter 换行,HTMX hx-post /
  hx-target=#chat-stream / hx-swap=beforeend / 提交后 reset);chat
  section 改 id=chat-stream;非 active task 隐藏表单。
- CSS:.streaming .run-indicator 红点脉冲 / .send-form 输入框 /
  .tool-result-inline 追加式样式 / .msg-error 错误卡。
- runs 表写状态:POST 时 status=running,正常完结 ok + tokens_p/c,
  异常 error + error 文本(DB 写失败不放大噪声,已 emit error 给前端)。
- lifespan bind_loop(asyncio.get_running_loop()) 让 broker 拿到
  loop 引用,emit 跨线程才能 call_soon_threadsafe。
- RUN 故障兜底加 3 条:SSE 经 nginx 卡住、浏览器 send 无反应、并发
  POST messages idx 冲突(已知 TODO)。

Smoke 双层全绿:
- broker 单元 8 case (subscribe/emit/get/fan-out/跨 run 隔离/close/
  late subscribe instant done/unsubscribe/未 bind silent drop)
- 端到端 24 case (POST 200 + sse-connect/run_id 抽取 + content-type/
  x-accel-buffering/cache-control 头对 + event 序列 run_start→done
  + text 片段 <strong> + tool_call <details> + tool_result preview
  + empty body 400 + 各种 404 + late done + runs 行 INSERT)

版本 0.3 → 0.4。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-05-15 09:19:25 +08:00
parent 514d36c481
commit 7356d25652
12 changed files with 437 additions and 25 deletions

View File

@ -2,7 +2,7 @@
> 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。
最后更新:2026-05-15(Phase G G3)
最后更新:2026-05-15(Phase G G4)
---
@ -15,7 +15,7 @@
| 5 | Eval Suite | ⏸ 不做 | dogfooding 替代,probe 覆盖健康检查 |
| 6 | 长任务工程化 | 🟡 | task + 恢复 ✅;双层记忆 ✅;context 压缩未做 |
| 7 | 打磨 | ❌ | Docker 沙盒 / 更多 skill |
| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 完工;**Phase G Web UI 进行中(G1 脚手架 ✅;G2 task list ✅;G3 chat 只读 ✅;G4 SSE 流式 待;G5 文件浏览;G6 打磨)**。下一阶 C(Executor) / D(HTTP /v1) 待。 |
| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 完工;**Phase G Web UI 进行中(G1 脚手架 ✅;G2 task list ✅;G3 chat 只读 ✅;G4 SSE 流式 ✅;G5 文件浏览 待;G6 打磨)**。下一阶 C(Executor) / D(HTTP /v1) 待。 |
---
@ -37,6 +37,7 @@
- **05-14 / §7 Phase G G1 Web UI 脚手架**:新增 `web/` 包(`app.py` FastAPI 工厂 + `templates/{base,home}.html` + `static/style.css`),`cli.py web --host --port --reload` 子命令(默认 127.0.0.1:8765,本地形态 sentinel user 无 auth,Phase D 才上 OIDC)。模板用 Jinja2 + HTMX/HTMX-SSE 走 CDN(无 node 链路),`base.html` 留 `{% block nav %}` 让 G2+ 扩。**Starlette 新版 `TemplateResponse` 签名**:`(request, name, context)`,旧式塞 context 里会让 jinja 用 dict 当 cache key 报 `unhashable type`,踩过修了。requirements 加 `fastapi>=0.111 uvicorn[standard] jinja2>=3.1 python-multipart`(后者为 G5 文件上传留)。Smoke 四路径全绿(in-process via Starlette `TestClient`):`/healthz` → "ok" / `/` → 1063B(title + static link + version) / `/static/style.css` → 1624B / `/nonexistent` → 404。**Linux portability 顺手**:模板里 path 显示约定用 `Path.as_posix()`(G3+ 模板落地);SSE 响应头 G4 上时带 `X-Accel-Buffering: no`(nginx 反代友好)。
- **05-14 / §7 Phase G G2 task list 页**:`web/app.py::list_tasks(limit, status)` 读 PG `tasks` + `messages` count(updated_at 降序),返回模板友好的 dict 列表;**不复用 `cli.py::_list_task_rows`** —— CLI 拿 tuple, Web 拿 dict,数据形状有别,等真有 schema 变更同步成本时再抽(避免预付抽象)。`/` 路由换成 task 表渲染,filter via `?status=active|completed|abandoned`(无效值静默降级为 all);`/tasks/{task_id}` 占位路由 UUID 校验 + DB 存在性校验,缺一则 404,有效则渲染 `task_placeholder.html`(G3 来填消息流)。**Linux portability 落地**:`_norm_path()` 把存的 backslash 在显示时全替成 forward slash(`Path.as_posix()` 在 Linux 读 Win backslash 串时不归一,所以直接 `replace('\\','/')`);Win Path.resolve() 存 `D:\projects\...`、Linux 存 `/home/user/...`,都能正确显示。template:`home.html` 表格(id/updated/status/mode/model/msgs/tokens/desc-dir),status 用 badge(`status-active/completed/abandoned` 配色),hover 高亮;空态文案。CSS:table 紧凑(.9rem)+ `tabular-nums` 对齐 + accent-soft placeholder note。Smoke 18 路径全绿(in-process):3 task seed(active/completed/abandoned)+ Win\Linux 双路径形态 → / 渲染对、status filter 正/反向、garbage status 静默 all、UUID 占位、notauuid 404、ghost UUID 404、limit 生效、/healthz 不退化。版本 0.1 → 0.2。
- **05-15 / §7 Phase G G3 chat 只读页**:`web/app.py` 加 `_get_md()` 单例 MarkdownIt(`gfm-like` 预设 + linkify + breaks,`html=False` 禁内联 HTML 防 XSS),fenced code 走 pygments `_pygments_highlight()` 回调(`codehilite` cssclass)。`load_chat_messages(tid)` 读 PG idx asc;`build_chat_blocks(messages)` 聚合显示块 —— system / tool 不入 block(tool 内嵌进 assistant 的 tool_call.result),user / assistant text 走 markdown 渲染,assistant.tool_calls 配对 tool result(orphan tool_call → `[no result]`)。`_args_preview` 60 字符截断,`_pretty_json` 解析失败 fallback 原串。`/tasks/{id}` 替换占位为 `chat.html` 渲染,删 `task_placeholder.html`。template:`.msg` 卡片(user 浅蓝 / assistant 白底),`.body` markdown 区(`<pre>` / `<code>` / `<table>` / `<blockquote>` / `<s>` 全 GFM 样式),tool_call 用 `<details>` 默认折叠(无 JS,浏览器原生开闭;`summary` 显示 tool 名 + args 前 60 字预览,展开看 args_pretty + result)。CSS 加 `.codehilite` 浅色 token 配色(keyword / string / comment / function / number / operator 6 类,余下黑色)。Smoke 28 路径全绿:4 display blocks(user/assistant×3,system/tool 跳过)+ markdown 特性(table / fence / autolink / strikethrough / bold)+ tool 配对(call_1 命中、orphan 走 `[no result]`)+ HTML 含 `<details>`/`tool-badge`/`codehilite`/`<s>` + 空 task 文案 + invalid UUID 404 + util 单测(args_preview / pretty_json / render_md 边界)。版本 0.2 → 0.3。requirements 加 `markdown-it-py[linkify]` / `mdit-py-plugins` / `pygments`
- **05-15 / §7 Phase G G4 chat 发送 + SSE 流式**:新增 `web/broker.py::RunBroker`(in-process pub/sub,`subscribe/emit/close/unsubscribe`)+ `web/sinks.py::WebEventSink` 实现 §7 A 的 sink 协议,把 `AgentLoop._emit` 桥到 broker。**异步策略 = `asyncio.to_thread`**(不改 core):POST `/tasks/{tid}/messages` async handler → 校验 task + INSERT `runs` 行 + `asyncio.create_task(asyncio.to_thread(_run_agent_bg, ...))`,`_run_agent_bg` 在工作线程跑 `build_agent(resume=True) + agent.run`,sink 通过 `loop.call_soon_threadsafe(q.put_nowait, ev)` 跨线程桥事件回 asyncio queue。**多访问策略 = fan-out**:每订阅一个独立 `asyncio.Queue`,同 run 多 tab / 刷新 / 桌面+移动都看得到流;`_done` 集合让晚到订阅者立即收 `done`(不挂)。GET `/tasks/{tid}/runs/{rid}/events``StreamingResponse` async gen,响应头带 `text/event-stream / Cache-Control: no-cache / X-Accel-Buffering: no`(nginx 反代友好);第一帧发 `: connected\nretry: 3000\n\n` 让 EventSource 立即建立,30s 无 event 发 `: ping` 注释心跳。**SSE multi-line data**:HTML 片段含换行,每行加 `data: ` 前缀(SSE spec),EventSource API 还原成 `\n` 拼接的 HTML 字符串。**Event → HTML 片段**:`_render_event_fragment` 渲染 `text`/`tool_call`/`tool_result`/`error` 四种,`run_start/llm_start/llm_end/done` 发空 data(只让客户端识别 event type)。新 fragment 模板 `_frag_text.html` / `_frag_tool_call.html` / `_frag_tool_result.html` / `_frag_error.html` + `_send_response.html`(POST 响应:user msg 卡 + `msg-assistant streaming` 容器带 `sse-connect/sse-swap/sse-close`)。`chat.html` 加 send 表单(Enter 发送、Shift+Enter 换行,HTMX `hx-post / hx-target=#chat-stream / hx-swap=beforeend / hx-on::after-request reset`);`chat` section 改 `id="chat-stream"` 让 SSE 追加进同一容器;非 active task 隐藏表单。CSS 加 `.streaming .run-indicator` 红点脉冲 / `.send-form` 表单样式 / `.tool-result-inline` 追加式样式 / `.msg-error` 错误卡。**Run 状态写 PG `runs` 表**:POST 时 status=running,正常完结 status=ok + tokens_p/c,异常 status=error + error 文本;DB 写失败不放大噪声(已 emit error 给前端)。**lifespan** `bind_loop(asyncio.get_running_loop())` 让 broker 拿到 asyncio loop 引用。Smoke 双层全绿:broker 单元 8 case(subscribe/emit/get、fan-out 双订阅、跨 run_id 隔离、close 派 done、late subscribe 立刻收 done、unsubscribe 后失联、WebEventSink 桥、unbinded loop silent drop);端到端 24 case(POST 200 + HTML 含 sse-connect + run_id 抽出 + SSE stream content-type/x-accel-buffering/cache-control 头对、event types 序列 `run_start/llm_start/text/tool_call/tool_result/llm_end/done`、text fragment 含 `<strong>` markdown、tool_call 含 `<details>`、tool_result 含 preview、empty body 400、invalid/ghost UUID 404、late subscribe 立刻 done、PG runs 行 INSERT)。版本 0.3 → 0.4。**TODO**:并发同 task 多 run 互锁(messages idx UniqueConstraint 在并发 POST 下会冲突 — 用户连续点 send 暂时不会触发,但需要在 G6 或 D 阶段加 lock_for_update);event log 持久化(刷新继续看流式)留到未来。
---
@ -81,10 +82,12 @@ db/migrations/env.py 61 ← §7 B Step 1
db/migrations/versions/
0001_initial_schema.py 125 ← §7 B Step 1
web/__init__.py 5 ← Phase G G1
web/app.py 264 ← Phase G G1-G3: 工厂 + list_tasks + chat 渲染 + md/pygments
web/app.py 468 ← Phase G G1-G4: 工厂 + list_tasks + chat 渲染 + POST/SSE
web/broker.py 88 ← Phase G G4: in-process pub/sub
web/sinks.py 20 ← Phase G G4: WebEventSink (§7 A sink 协议)
─────────────────────────────────
Python 合计 ~3370 行
+ web/templates/{base,home,chat}.html ~141 行 + web/static/style.css 131 行(不计 Python 主仓库)
Python 合计 ~3662
+ web/templates/* ~196 行(base/home/chat + 4 个 _frag/_send_response)+ web/static/style.css 169 行(不计 Python 主仓库)
```
加 skills/ppt 脚本 ~600 行 + SKILL.md / references / config / prompts + alembic.ini,总仓库约 3500 行。
@ -93,10 +96,11 @@ Python 合计 ~3370 行
## 下一步候选(性价比排序)
1. **§7 Phase G G4 chat 发送 + SSE**(~1 天)—— `WebEventSink` 把 §7 A 的 `sink.emit` 推 text/event-stream(响应头带 `X-Accel-Buffering: no`),HTMX `sse-swap` 追加 DOM。**核心一步**,需异步跑 `AgentLoop` —— 走 `asyncio.to_thread` 或在 BG task 启 sync runner + 队列。
2. **§7 Phase G G5 文件浏览 + G6 打磨**(~半天 + 半天)—— task_dir 树 / upload / download / 错误 toast / `/new` `/done /abandon` 按钮 / `/export` 链接。
3. **§7 C Executor + sandbox**(~2-3 天)—— Phase G 完后再做,或穿插。
4. **Phase 6 context 三层压缩**(~1 天)—— 兜底,V4 长上下文一般用不到。
5. **Proposal mermaid 预渲染**(~半天)—— ASCII 透传不够用时再上 `mmdc`
1. **§7 Phase G G5 文件浏览 + 上传下载**(~半天)—— `/tasks/{id}/files` 渲染 task_dir 树,upload (multipart)/ download / 删。
2. **§7 Phase G G6 打磨**(~半天)—— `/new` 入口、`/done /abandon` 按钮、`/export` docx 下载、错误 toast、并发 run 互锁。
3. **真实 LLM 跑通 G4**(~10 分钟)—— smoke 走的是 mock,需在浏览器开一个真 task 验证端到端体验(`cli.py web` → 打开 `/tasks/<id>` → send → 看流式)。
4. **§7 C Executor + sandbox**(~2-3 天)—— Phase G 完后再做,或穿插。
5. **Phase 6 context 三层压缩**(~1 天)—— 兜底,V4 长上下文一般用不到。
6. **Proposal mermaid 预渲染**(~半天)—— ASCII 透传不够用时再上 `mmdc`
> §7 B 已完工。Phase G 进行中(G1 ✅ G2 ✅ G3 ✅)。剩余路线:G4-G6 → C(Executor)→ D(HTTP /v1 + OIDC)→ E(CLI 双模式)→ F(deploy / billing)。
> §7 B 已完工。Phase G 进行中(G1 ✅ G2 ✅ G3 ✅ G4 ✅)。剩余路线:G5-G6 → C(Executor)→ D(HTTP /v1 + OIDC)→ E(CLI 双模式)→ F(deploy / billing)。

7
RUN.md
View File

@ -2,7 +2,7 @@
> 怎么把 zcbot 跑起来。env / 常用命令 / 故障兜底。设计看 `DESIGN.md`,进度看 `PROGRESS.md`
最后更新:2026-05-15(Phase G G3)
最后更新:2026-05-15(Phase G G4)
---
@ -103,7 +103,7 @@ REPL 内命令:`/exit /reset /new /resume [last|<id>] /id /status /done /abandon
.venv/Scripts/python.exe cli.py web --reload
```
> G1 ✅ 脚手架 + /healthz;G2 ✅ `/` task 列表 + `?status=` filter;G3 ✅ `/tasks/{uuid}` 消息流渲染(markdown-it-py + pygments syntax,tool_call 走 `<details>` 默认折叠);G4-G6 待。task_dir 显示统一 forward-slash(Win 存 `\` 也归一)。Linux:`.venv/bin/python cli.py web` 一致。
> G1 ✅ 脚手架 + /healthz;G2 ✅ `/` task 列表 + `?status=` filter;G3 ✅ `/tasks/{uuid}` 消息流渲染(markdown-it-py + pygments syntax,tool_call 走 `<details>` 默认折叠);G4 ✅ chat 发送 + SSE 流式回复(POST `/tasks/{tid}/messages` 启 run、GET `/tasks/{tid}/runs/{rid}/events` SSE 流;HTMX `sse-swap` 追加 DOM,无 JS);G5-G6 待。task_dir 显示统一 forward-slash(Win 存 `\` 也归一)。Linux:`.venv/bin/python cli.py web` 一致。SSE 经 nginx 反代记得关 buffering(响应头已带 `X-Accel-Buffering: no` 默认起效)。
---
@ -121,6 +121,9 @@ REPL 内命令:`/exit /reset /new /resume [last|<id>] /id /status /done /abandon
| `NoSubtaskError: task_dir ... 与已有 task ... 前缀嵌套` | §7.4 no-subtask:同 user 不允许 task_dir 嵌套(child 或 parent)。**同项目多对话**请传**完全相同**的 `--task-dir`;否则改路径成 sibling(平级) |
| `cli.py web` 启动后浏览器开不了 | 检查 proxy(`HTTP_PROXY` / `HTTPS_PROXY`):本地形态服务在 127.0.0.1,系统 proxy 拦截会 502。临时 `unset HTTP_PROXY HTTPS_PROXY` 或浏览器配 bypass。`curl` 验通走 `curl --noproxy '*' http://127.0.0.1:8765/healthz`(应返 `ok`) |
| `TypeError: unhashable type: 'dict'` from Jinja templating | Starlette 新版签名:`TemplateResponse(request, name, context)`,旧式 `(name, {"request":..., "...":...})` 在 newer Starlette 会把 context dict 当 cache key 炸 |
| SSE 卡住不流(经 nginx) | 反代要关 buffering — 后端响应头已带 `X-Accel-Buffering: no`,nginx ≥ 1.5.6 默认认。仍卡看 nginx 配 `proxy_buffering off; proxy_read_timeout 3600s;` |
| 浏览器 send 后没反应 | 看 console:HTMX 报 connect failed → 看 `/tasks/{tid}/messages` 响应;200 但流不到 → 看 EventSource 状态(devtools Network → EventStream tab) |
| `UniqueViolation idx already exists` from messages | 同 task 连续两次快速 POST,messages idx 冲突。**已知 TODO**:G6/D 阶段加 task 级 lock_for_update 或 advisory lock |
---

View File

@ -10,19 +10,24 @@
"""
from __future__ import annotations
import asyncio
import json
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, Optional
from uuid import UUID
from uuid import UUID, uuid4
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy import func, select
from sqlalchemy import func, select, update
from core.storage import session_scope
from core.storage.models import Message, Task
from core.storage.models import Message, Run, Task
from .broker import broker
from .sinks import WebEventSink
WEB_ROOT = Path(__file__).resolve().parent
TEMPLATES_DIR = WEB_ROOT / "templates"
@ -195,11 +200,120 @@ def list_tasks(limit: int = 50, status: Optional[str] = None) -> list[dict[str,
return result
# --------------------------- Run 启动 / SSE event 渲染 ---------------------------
def _run_agent_bg(task_id: UUID, run_id: UUID, user_message: str) -> None:
"""工作线程入口。这里**不能** await asyncio —— 在 to_thread 跑。
流程:build_agent(resume=True) WebEventSink agent.run runs 状态
"""
from main import build_agent, sync_task_tokens
# build_agent 会调 ensure_local_sentinel / LLM init / Session.load 等。
# 单次 POST 每次都走一遍 — 不便宜但简单;未来按需缓存 agent。
try:
broker.emit(run_id, {"type": "run_start"})
agent, session, sid, task_state, task_dir = build_agent(
session_id=str(task_id), resume=True,
)
agent.sink = WebEventSink(broker, run_id)
agent.run(user_message)
sync_task_tokens(task_state, agent.llm)
with session_scope() as s:
s.execute(
update(Run)
.where(Run.run_id == run_id)
.values(
status="ok",
finished_at=func.now(),
tokens_p=agent.llm.token_counter.prompt_tokens,
tokens_c=agent.llm.token_counter.completion_tokens,
)
)
except Exception as e:
err = f"{type(e).__name__}: {e}"
broker.emit(run_id, {"type": "error", "msg": err})
try:
with session_scope() as s:
s.execute(
update(Run)
.where(Run.run_id == run_id)
.values(status="error", error=err, finished_at=func.now())
)
except Exception:
pass # 已 emit 给前端,DB 写失败不再放大噪声
finally:
broker.close(run_id)
def _render_event_fragment(templates: Jinja2Templates, ev: dict, request: Request) -> str:
"""把一条 event 渲染成 HTML 片段(供 SSE data 推送)。
片段类型与 chat.html 静态 block 视觉一致,append 模式追加到 #chat-stream 容器尾。
text / tool_call / tool_result / error 各有专用块;run_start / llm_start / llm_end /
done 不出 HTML(用空串当 keep-alive,客户端依然能识别 event type 控制状态)
"""
t = ev.get("type")
if t == "text":
content = ev.get("content") or ""
if not content:
return ""
# assistant text 片段:跟 chat.html 静态 assistant body 同形态
return templates.get_template("_frag_text.html").render(
request=request, html=_render_md(content)
)
if t == "tool_call":
return templates.get_template("_frag_tool_call.html").render(
request=request,
name=ev.get("name", "?"),
args_preview=_args_preview(ev.get("args_preview", "")),
args_pretty=_pretty_json(json.dumps(ev.get("args", {}), ensure_ascii=False))
if ev.get("args") is not None else _pretty_json(ev.get("args_preview", "")),
)
if t == "tool_result":
return templates.get_template("_frag_tool_result.html").render(
request=request,
name=ev.get("name", "?"),
preview=ev.get("preview", ""),
truncated=ev.get("truncated", False),
)
if t == "error":
return templates.get_template("_frag_error.html").render(
request=request, msg=ev.get("msg", "")
)
# llm_start / llm_end / run_start / done发空 data,htmx-ext-sse 也会触发 event,
# 客户端只读 type 控制状态(spinner / close);data 内容不需要 swap。
return ""
def _sse_format(event_type: str, payload: str) -> bytes:
"""格式化一帧 SSE。data 多行要每行 `data: ` 前缀(SSE spec)。
EventSource API 会自动把 multi-line data \n 拼接还原 htmx-ext-sse 直接拿来当 HTML swap
"""
parts = [f"event: {event_type}"]
if payload:
for line in payload.splitlines() or [""]:
parts.append(f"data: {line}")
else:
parts.append("data: ") # 空 data 也要有,EventSource 才认这帧
parts.append("") # 终结空行
parts.append("")
return ("\n".join(parts)).encode("utf-8")
# --------------------------- App 工厂 ---------------------------
def create_app() -> FastAPI:
"""FastAPI 工厂。uvicorn --reload 模式需要工厂签名(factory=True)。"""
app = FastAPI(title="zcbot web", version="0.3")
@asynccontextmanager
async def lifespan(app: FastAPI):
# 把当前 asyncio loop 绑给 broker — emit() 从工作线程会 call_soon_threadsafe 桥回
broker.bind_loop(asyncio.get_running_loop())
yield
app = FastAPI(title="zcbot web", version="0.4", lifespan=lifespan)
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
@ -257,6 +371,96 @@ def create_app() -> FastAPI:
},
)
@app.post("/tasks/{task_id}/messages", response_class=HTMLResponse)
async def post_message(request: Request, task_id: str, content: str = Form(...)):
"""G4:用户提交消息 → 启 BG run → 返回 user msg 卡 + assistant 占位 + SSE 容器。
客户端 HTMX hx-post 这条,响应 swap #chat-stream beforeend;响应 HTML 内含
sse-connect=/tasks/{id}/runs/{rid}/events,htmx-ext-sse 自动开 EventSource
"""
try:
tid = UUID(task_id)
except ValueError:
raise HTTPException(404, f"invalid task id: {task_id!r}")
content = (content or "").strip()
if not content:
raise HTTPException(400, "empty message")
# 校验 task 存在
with session_scope() as s:
row = s.execute(
select(Task.task_id).where(Task.task_id == tid)
).first()
if row is None:
raise HTTPException(404, f"task not found: {tid}")
run_id = uuid4()
with session_scope() as s:
s.add(Run(run_id=run_id, task_id=tid, status="running", started_at=func.now()))
# 启 BG agent — to_thread 跑 sync agent.run,sink 通过 broker 把 event 桥回 asyncio
asyncio.create_task(asyncio.to_thread(_run_agent_bg, tid, run_id, content))
return templates.TemplateResponse(
request, "_send_response.html",
{
"task_id": str(tid),
"run_id": str(run_id),
"user_html": _render_md(content),
},
)
@app.get("/tasks/{task_id}/runs/{run_id}/events")
async def stream_events(request: Request, task_id: str, run_id: str):
"""G4:SSE 流。订阅 broker[run_id] → 渲染 HTML 片段 → 推。
客户端断开(close tab / navigate) asyncio 在下次 yield CancelledError
finally 清理 run 多订阅者(刷新页面 / tab)各自独立 queue
"""
try:
tid = UUID(task_id)
rid = UUID(run_id)
except ValueError:
raise HTTPException(404, "invalid id")
# task 存在性校验(防探测 / 错链)
with session_scope() as s:
ok = s.execute(
select(Task.task_id).where(Task.task_id == tid)
).first()
if ok is None:
raise HTTPException(404, f"task not found: {tid}")
async def gen():
q = broker.subscribe(rid)
try:
# 第一帧 retry 注释 + 心跳:让 EventSource 立即建立(不被 buffer 卡住)
yield b": connected\nretry: 3000\n\n"
while True:
try:
ev = await asyncio.wait_for(q.get(), timeout=30.0)
except asyncio.TimeoutError:
yield b": ping\n\n"
continue
ev_type = ev.get("type", "msg")
frag = _render_event_fragment(templates, ev, request)
yield _sse_format(ev_type, frag)
if ev_type in ("done", "error"):
break
except asyncio.CancelledError:
# 客户端断开 — 静默退,不向上抛
pass
finally:
broker.unsubscribe(rid, q)
return StreamingResponse(
gen(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # nginx 反代:别 buffer 这条流
},
)
@app.get("/healthz", response_class=HTMLResponse)
def healthz():
return HTMLResponse("ok")

88
web/broker.py Normal file
View File

@ -0,0 +1,88 @@
"""RunBroker:in-process pub/sub,把 agent run 产生的 event fan-out 给所有 SSE 订阅者。
设计:
- emit() 从工作线程调(agent.run to_thread ), loop.call_soon_threadsafe
桥到 asyncio queue;SSE generator await queue.get() 拉出来推流
- 同一 run_id 多个订阅者(刷新页面 / tab / 桌面+移动) 每个订阅 1 个独立 queue
- run 结束 broker.close(run_id) 给所有订阅者派一条 done;新订阅者( done 后到的)
立即收到 done 并断流(不漏不挂)
- 进程内单实例 / 多进程不共享 个人 SaaS worker 够用;真要扩多 worker 再上 Redis
- 不持久化 event messages 已落 PG,刷新页面走 G3 静态视图能看历史;真要"刷新继续看
实时流"未来加 event log 表 + backfill。
线程模型:
- broker.bind_loop(loop) FastAPI startup 调一次,记录 asyncio loop 引用
- emit() 调用方可能在任意线程;put_nowait thread-unsafe(asyncio.Queue 设计前提
是单 loop),所以走 call_soon_threadsafe 跨回 loop 线程再 put
- subscribe / unsubscribe / close 也都用 call_soon_threadsafe ,避免 race
(实测 SSE generator finally unsubscribe,这个就在 loop 线程,直接调也行)
"""
from __future__ import annotations
import asyncio
from collections import defaultdict
from typing import Any, Optional
from uuid import UUID
class RunBroker:
def __init__(self) -> None:
self._subs: dict[UUID, set[asyncio.Queue]] = defaultdict(set)
# 已经发完 done 的 run — 后来订阅者直接收到 done,避免无限等
self._done: set[UUID] = set()
self._loop: Optional[asyncio.AbstractEventLoop] = None
def bind_loop(self, loop: asyncio.AbstractEventLoop) -> None:
"""FastAPI startup 调一次。"""
self._loop = loop
def subscribe(self, run_id: UUID) -> asyncio.Queue:
"""订阅 run 的 event 流。已 done 的 run 立刻在 queue 放一条 done。
调用方:SSE handler( asyncio loop 线程内)
"""
q: asyncio.Queue = asyncio.Queue()
if run_id in self._done:
q.put_nowait({"type": "done"})
else:
self._subs[run_id].add(q)
return q
def unsubscribe(self, run_id: UUID, q: asyncio.Queue) -> None:
"""SSE generator finally 清理。"""
self._subs.get(run_id, set()).discard(q)
if run_id in self._subs and not self._subs[run_id]:
del self._subs[run_id]
def emit(self, run_id: UUID, event: dict[str, Any]) -> None:
"""从工作线程调:把 event 推给所有订阅者。
如果没人订阅(run 在跑但没浏览器连上),event 丢弃 这是设计选择
(event 不持久化,messages PG)
"""
loop = self._loop
if loop is None:
return # 还没 bind,丢弃(测试 / 启动竞态)
for q in list(self._subs.get(run_id, [])):
loop.call_soon_threadsafe(q.put_nowait, event)
def close(self, run_id: UUID) -> None:
"""run 结束:派 done 给所有订阅者,标记 run_id 为已完成。
从工作线程调(agent.run 完成 / 抛异常 finally 清理)
"""
self.emit(run_id, {"type": "done"})
self._done.add(run_id)
# subs 不在这里立即删 — SSE generator 会先收到 done、yield 它、走到
# finally unsubscribe;此处 emit 后立即删会让那次 emit 之后的清理无的放矢。
def n_subscribers(self, run_id: UUID) -> int:
"""供测试 / 监控用。"""
return len(self._subs.get(run_id, set()))
def is_done(self, run_id: UUID) -> bool:
return run_id in self._done
# 进程内单例 — FastAPI lifespan 里 bind_loop;agent / sink / SSE handler 共享。
broker = RunBroker()

20
web/sinks.py Normal file
View File

@ -0,0 +1,20 @@
"""WebEventSink:实现 §7 A 的 sink 协议,把 AgentLoop.emit 桥到 RunBroker。
每次 run 一个 sink 实例,绑死 run_id`emit({type, ...})` 直接转 broker.emit(run_id, event)
sink 实例由 web 层在启 run 时创建,传进 AgentLoop;loop 完全不知 web 存在(§5 Less Scaffolding)
"""
from __future__ import annotations
from typing import Any
from uuid import UUID
from .broker import RunBroker
class WebEventSink:
def __init__(self, broker: RunBroker, run_id: UUID) -> None:
self._broker = broker
self._run_id = run_id
def emit(self, event: dict[str, Any]) -> None:
self._broker.emit(self._run_id, event)

View File

@ -121,6 +121,44 @@ table.task-list tr:hover { background: #fdf6f6; }
.tool-label { font-size: .7rem; color: var(--muted); text-transform: uppercase; letter-spacing: .05em; font-weight: 600; margin-bottom: .15rem; }
.tool-pre { background: #fafafa; padding: .5rem .65rem; border: 1px solid var(--border); border-radius: 3px; max-height: 400px; overflow-y: auto; white-space: pre-wrap; word-break: break-word; font-family: var(--mono); font-size: .8rem; line-height: 1.4; margin: 0; color: #333; }
/* 流式状态指示 + send form (G4) */
.streaming .run-indicator {
display: inline-block; width: 6px; height: 6px; border-radius: 50%;
background: var(--accent); margin-left: .35rem; vertical-align: middle;
animation: pulse 1.2s ease-in-out infinite;
}
@keyframes pulse {
0%, 100% { opacity: .35; transform: scale(.9); }
50% { opacity: 1; transform: scale(1.15); }
}
.send-form { display: flex; gap: .5rem; margin-top: 1rem; align-items: flex-end; }
.send-form textarea {
flex: 1; font: inherit; padding: .5rem .65rem; border: 1px solid var(--border);
border-radius: 4px; resize: vertical; min-height: 2.4rem; max-height: 14rem;
background: var(--surface); color: var(--fg); font-size: .95rem; line-height: 1.4;
}
.send-form textarea:focus { outline: 2px solid var(--accent-soft); outline-offset: 1px; border-color: var(--accent); }
.send-form button {
padding: .55rem 1.1rem; border: 1px solid var(--accent); border-radius: 4px;
background: var(--accent); color: white; cursor: pointer; font: inherit; font-weight: 600;
}
.send-form button:hover { filter: brightness(1.05); }
.send-form button:disabled { background: var(--muted); border-color: var(--muted); cursor: wait; }
/* tool_result append-only 片段(G4 流式来:跟在上一个 tool_call 后) */
.tool-result-inline { margin: .5rem 0 .25rem 1rem; padding-left: .65rem; border-left: 2px solid var(--border); }
.tool-result-tag { font-family: var(--mono); font-size: .75rem; color: var(--muted); font-weight: 600; }
.tool-pending { color: var(--muted); font-style: italic; }
/* error 片段 */
.msg-error {
display: flex; gap: .5rem; align-items: baseline;
margin-top: .5rem; padding: .5rem .75rem;
background: #fceaea; border-left: 3px solid var(--accent); border-radius: 3px;
font-size: .85rem; color: #5c0a0a;
}
.err-tag { font-weight: 600; text-transform: uppercase; font-size: .7rem; letter-spacing: .05em; }
/* pygments codehilite (轻量配色,选少数高频 token,余下走默认黑色) */
.codehilite .k, .codehilite .kn, .codehilite .kr { color: #c00; } /* keyword */
.codehilite .s, .codehilite .s1, .codehilite .s2, .codehilite .sb, .codehilite .sd { color: #1a3d6b; } /* string */

View File

@ -0,0 +1,4 @@
<div class="msg-error">
<span class="err-tag">error</span>
<span>{{ msg }}</span>
</div>

View File

@ -0,0 +1 @@
<div class="body">{{ html | safe }}</div>

View File

@ -0,0 +1,13 @@
<details class="tool">
<summary>
<span class="tool-badge">tool</span>
<span class="tool-name">{{ name }}</span>
<span class="tool-args-preview">{{ args_preview }}</span>
</summary>
<div class="tool-body">
<div class="tool-section">
<div class="tool-label">args</div>
<pre class="tool-pre">{{ args_pretty }}</pre>
</div>
</div>
</details>

View File

@ -0,0 +1,4 @@
<div class="tool-result-inline">
<span class="tool-result-tag">↳ {{ name }}</span>
<pre class="tool-pre">{{ preview }}{% if truncated %}<span class="muted"> (truncated)</span>{% endif %}</pre>
</div>

View File

@ -0,0 +1,22 @@
{# POST /tasks/{id}/messages 响应 — append 进 #chat-stream beforeend。
含 user msg 卡 + assistant 容器(SSE 监听器在它身上)。
htmx-ext-sse:sse-connect 开 EventSource;sse-swap 列的 event 把 data
作为 HTML swap 到自己(hx-swap=beforeend 决定追加而非替换)。
#}
<article class="msg msg-user">
<div class="role">user</div>
<div class="body">{{ user_html | safe }}</div>
</article>
<article class="msg msg-assistant streaming"
hx-ext="sse"
sse-connect="/tasks/{{ task_id }}/runs/{{ run_id }}/events"
sse-swap="text,tool_call,tool_result,error"
sse-close="done,error"
hx-swap="beforeend">
<div class="role">
assistant
<span class="run-indicator" title="run {{ run_id[:8] }}"></span>
</div>
{# SSE event=text/tool_call/tool_result/error 的 data → swap 到这个 article 内尾部 #}
</article>

View File

@ -15,7 +15,7 @@
{% if description %}<p class="lead">{{ description }}</p>{% endif %}
{% if task_dir %}<p class="muted mono small">{{ task_dir }}</p>{% endif %}
<section class="chat">
<section class="chat" id="chat-stream">
{% for b in blocks %}
{% if b.type == "user" %}
<article class="msg msg-user">
@ -47,12 +47,23 @@
{% endfor %}
</article>
{% endif %}
{% else %}
<p class="empty muted">
该 task 还没消息(只读视图)。G4 上线后从浏览器发送 / 流式回复。
</p>
{% endfor %}
</section>
<p class="muted small mt-1">G3 只读 · 发送 + SSE = G4 · 文件浏览 = G5</p>
{% if status == "active" %}
<form class="send-form"
hx-post="/tasks/{{ task_id }}/messages"
hx-target="#chat-stream"
hx-swap="beforeend"
hx-on::after-request="if(event.detail.successful) this.reset()">
<textarea name="content" placeholder="发条消息… (Enter 发送,Shift+Enter 换行)"
required rows="2"
onkeydown="if(event.key==='Enter'&&!event.shiftKey){event.preventDefault();this.form.requestSubmit();}"></textarea>
<button type="submit">send</button>
</form>
{% else %}
<p class="muted small mt-1">task 已 {{ status }},不接收新消息。CLI <code>/done</code> 改 status 来恢复。</p>
{% endif %}
<p class="muted small mt-1">G4 流式 ✓ · 文件浏览 = G5 · 打磨 = G6</p>
{% endblock %}