diff --git a/PROGRESS.md b/PROGRESS.md index 0c1dfdb..be704f3 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -2,7 +2,7 @@ > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。 -最后更新:2026-05-15(Phase G G3) +最后更新:2026-05-15(Phase G G4) --- @@ -15,7 +15,7 @@ | 5 | Eval Suite | ⏸ 不做 | dogfooding 替代,probe 覆盖健康检查 | | 6 | 长任务工程化 | 🟡 | task + 恢复 ✅;双层记忆 ✅;context 压缩未做 | | 7 | 打磨 | ❌ | Docker 沙盒 / 更多 skill | -| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 完工;**Phase G Web UI 进行中(G1 脚手架 ✅;G2 task list ✅;G3 chat 只读 ✅;G4 SSE 流式 待;G5 文件浏览;G6 打磨)**。下一阶 C(Executor) / D(HTTP /v1) 待。 | +| §7 SaaS | DESIGN §7 路线 | 🟡 | A 事件流化 ✅;B 完工;**Phase G Web UI 进行中(G1 脚手架 ✅;G2 task list ✅;G3 chat 只读 ✅;G4 SSE 流式 ✅;G5 文件浏览 待;G6 打磨)**。下一阶 C(Executor) / D(HTTP /v1) 待。 | --- @@ -37,6 +37,7 @@ - **05-14 / §7 Phase G G1 Web UI 脚手架**:新增 `web/` 包(`app.py` FastAPI 工厂 + `templates/{base,home}.html` + `static/style.css`),`cli.py web --host --port --reload` 子命令(默认 127.0.0.1:8765,本地形态 sentinel user 无 auth,Phase D 才上 OIDC)。模板用 Jinja2 + HTMX/HTMX-SSE 走 CDN(无 node 链路),`base.html` 留 `{% block nav %}` 让 G2+ 扩。**Starlette 新版 `TemplateResponse` 签名**:`(request, name, context)`,旧式塞 context 里会让 jinja 用 dict 当 cache key 报 `unhashable type`,踩过修了。requirements 加 `fastapi>=0.111 uvicorn[standard] jinja2>=3.1 python-multipart`(后者为 G5 文件上传留)。Smoke 四路径全绿(in-process via Starlette `TestClient`):`/healthz` → "ok" / `/` → 1063B(title + static link + version) / `/static/style.css` → 1624B / `/nonexistent` → 404。**Linux portability 顺手**:模板里 path 显示约定用 `Path.as_posix()`(G3+ 模板落地);SSE 响应头 G4 上时带 `X-Accel-Buffering: no`(nginx 反代友好)。 - **05-14 / §7 Phase G G2 task list 页**:`web/app.py::list_tasks(limit, status)` 读 PG `tasks` + `messages` count(updated_at 降序),返回模板友好的 dict 列表;**不复用 `cli.py::_list_task_rows`** —— CLI 拿 tuple, Web 拿 dict,数据形状有别,等真有 schema 变更同步成本时再抽(避免预付抽象)。`/` 路由换成 task 表渲染,filter via `?status=active|completed|abandoned`(无效值静默降级为 all);`/tasks/{task_id}` 占位路由 UUID 校验 + DB 存在性校验,缺一则 404,有效则渲染 `task_placeholder.html`(G3 来填消息流)。**Linux portability 落地**:`_norm_path()` 把存的 backslash 在显示时全替成 forward slash(`Path.as_posix()` 在 Linux 读 Win backslash 串时不归一,所以直接 `replace('\\','/')`);Win Path.resolve() 存 `D:\projects\...`、Linux 存 `/home/user/...`,都能正确显示。template:`home.html` 表格(id/updated/status/mode/model/msgs/tokens/desc-dir),status 用 badge(`status-active/completed/abandoned` 配色),hover 高亮;空态文案。CSS:table 紧凑(.9rem)+ `tabular-nums` 对齐 + accent-soft placeholder note。Smoke 18 路径全绿(in-process):3 task seed(active/completed/abandoned)+ Win\Linux 双路径形态 → / 渲染对、status filter 正/反向、garbage status 静默 all、UUID 占位、notauuid 404、ghost UUID 404、limit 生效、/healthz 不退化。版本 0.1 → 0.2。 - **05-15 / §7 Phase G G3 chat 只读页**:`web/app.py` 加 `_get_md()` 单例 MarkdownIt(`gfm-like` 预设 + linkify + breaks,`html=False` 禁内联 HTML 防 XSS),fenced code 走 pygments `_pygments_highlight()` 回调(`codehilite` cssclass)。`load_chat_messages(tid)` 读 PG idx asc;`build_chat_blocks(messages)` 聚合显示块 —— system / tool 不入 block(tool 内嵌进 assistant 的 tool_call.result),user / assistant text 走 markdown 渲染,assistant.tool_calls 配对 tool result(orphan tool_call → `[no result]`)。`_args_preview` 60 字符截断,`_pretty_json` 解析失败 fallback 原串。`/tasks/{id}` 替换占位为 `chat.html` 渲染,删 `task_placeholder.html`。template:`.msg` 卡片(user 浅蓝 / assistant 白底),`.body` markdown 区(`
` / `` / `` / `` / `` 全 GFM 样式),tool_call 用 `` 默认折叠(无 JS,浏览器原生开闭;`summary` 显示 tool 名 + args 前 60 字预览,展开看 args_pretty + result)。CSS 加 `.codehilite` 浅色 token 配色(keyword / string / comment / function / number / operator 6 类,余下黑色)。Smoke 28 路径全绿:4 display blocks(user/assistant×3,system/tool 跳过)+ markdown 特性(table / fence / autolink / strikethrough / bold)+ tool 配对(call_1 命中、orphan 走 `[no result]`)+ HTML 含 ``/`tool-badge`/`codehilite`/`` + 空 task 文案 + invalid UUID 404 + util 单测(args_preview / pretty_json / render_md 边界)。版本 0.2 → 0.3。requirements 加 `markdown-it-py[linkify]` / `mdit-py-plugins` / `pygments`。
+- **05-15 / §7 Phase G G4 chat 发送 + SSE 流式**:新增 `web/broker.py::RunBroker`(in-process pub/sub,`subscribe/emit/close/unsubscribe`)+ `web/sinks.py::WebEventSink` 实现 §7 A 的 sink 协议,把 `AgentLoop._emit` 桥到 broker。**异步策略 = `asyncio.to_thread`**(不改 core):POST `/tasks/{tid}/messages` async handler → 校验 task + INSERT `runs` 行 + `asyncio.create_task(asyncio.to_thread(_run_agent_bg, ...))`,`_run_agent_bg` 在工作线程跑 `build_agent(resume=True) + agent.run`,sink 通过 `loop.call_soon_threadsafe(q.put_nowait, ev)` 跨线程桥事件回 asyncio queue。**多访问策略 = fan-out**:每订阅一个独立 `asyncio.Queue`,同 run 多 tab / 刷新 / 桌面+移动都看得到流;`_done` 集合让晚到订阅者立即收 `done`(不挂)。GET `/tasks/{tid}/runs/{rid}/events` 返 `StreamingResponse` async gen,响应头带 `text/event-stream / Cache-Control: no-cache / X-Accel-Buffering: no`(nginx 反代友好);第一帧发 `: connected\nretry: 3000\n\n` 让 EventSource 立即建立,30s 无 event 发 `: ping` 注释心跳。**SSE multi-line data**:HTML 片段含换行,每行加 `data: ` 前缀(SSE spec),EventSource API 还原成 `\n` 拼接的 HTML 字符串。**Event → HTML 片段**:`_render_event_fragment` 渲染 `text`/`tool_call`/`tool_result`/`error` 四种,`run_start/llm_start/llm_end/done` 发空 data(只让客户端识别 event type)。新 fragment 模板 `_frag_text.html` / `_frag_tool_call.html` / `_frag_tool_result.html` / `_frag_error.html` + `_send_response.html`(POST 响应:user msg 卡 + `msg-assistant streaming` 容器带 `sse-connect/sse-swap/sse-close`)。`chat.html` 加 send 表单(Enter 发送、Shift+Enter 换行,HTMX `hx-post / hx-target=#chat-stream / hx-swap=beforeend / hx-on::after-request reset`);`chat` section 改 `id="chat-stream"` 让 SSE 追加进同一容器;非 active task 隐藏表单。CSS 加 `.streaming .run-indicator` 红点脉冲 / `.send-form` 表单样式 / `.tool-result-inline` 追加式样式 / `.msg-error` 错误卡。**Run 状态写 PG `runs` 表**:POST 时 status=running,正常完结 status=ok + tokens_p/c,异常 status=error + error 文本;DB 写失败不放大噪声(已 emit error 给前端)。**lifespan** `bind_loop(asyncio.get_running_loop())` 让 broker 拿到 asyncio loop 引用。Smoke 双层全绿:broker 单元 8 case(subscribe/emit/get、fan-out 双订阅、跨 run_id 隔离、close 派 done、late subscribe 立刻收 done、unsubscribe 后失联、WebEventSink 桥、unbinded loop silent drop);端到端 24 case(POST 200 + HTML 含 sse-connect + run_id 抽出 + SSE stream content-type/x-accel-buffering/cache-control 头对、event types 序列 `run_start/llm_start/text/tool_call/tool_result/llm_end/done`、text fragment 含 `` markdown、tool_call 含 ``、tool_result 含 preview、empty body 400、invalid/ghost UUID 404、late subscribe 立刻 done、PG runs 行 INSERT)。版本 0.3 → 0.4。**TODO**:并发同 task 多 run 互锁(messages idx UniqueConstraint 在并发 POST 下会冲突 — 用户连续点 send 暂时不会触发,但需要在 G6 或 D 阶段加 lock_for_update);event log 持久化(刷新继续看流式)留到未来。
---
@@ -81,10 +82,12 @@ db/migrations/env.py 61 ← §7 B Step 1
db/migrations/versions/
0001_initial_schema.py 125 ← §7 B Step 1
web/__init__.py 5 ← Phase G G1
-web/app.py 264 ← Phase G G1-G3: 工厂 + list_tasks + chat 渲染 + md/pygments
+web/app.py 468 ← Phase G G1-G4: 工厂 + list_tasks + chat 渲染 + POST/SSE
+web/broker.py 88 ← Phase G G4: in-process pub/sub
+web/sinks.py 20 ← Phase G G4: WebEventSink (§7 A sink 协议)
─────────────────────────────────
-Python 合计 ~3370 行
-+ web/templates/{base,home,chat}.html ~141 行 + web/static/style.css 131 行(不计 Python 主仓库)
+Python 合计 ~3662 行
++ web/templates/* ~196 行(base/home/chat + 4 个 _frag/_send_response)+ web/static/style.css 169 行(不计 Python 主仓库)
```
加 skills/ppt 脚本 ~600 行 + SKILL.md / references / config / prompts + alembic.ini,总仓库约 3500 行。
@@ -93,10 +96,11 @@ Python 合计 ~3370 行
## 下一步候选(性价比排序)
-1. **§7 Phase G G4 chat 发送 + SSE**(~1 天)—— `WebEventSink` 把 §7 A 的 `sink.emit` 推 text/event-stream(响应头带 `X-Accel-Buffering: no`),HTMX `sse-swap` 追加 DOM。**核心一步**,需异步跑 `AgentLoop` —— 走 `asyncio.to_thread` 或在 BG task 启 sync runner + 队列。
-2. **§7 Phase G G5 文件浏览 + G6 打磨**(~半天 + 半天)—— task_dir 树 / upload / download / 错误 toast / `/new` `/done /abandon` 按钮 / `/export` 链接。
-3. **§7 C Executor + sandbox**(~2-3 天)—— Phase G 完后再做,或穿插。
-4. **Phase 6 context 三层压缩**(~1 天)—— 兜底,V4 长上下文一般用不到。
-5. **Proposal mermaid 预渲染**(~半天)—— ASCII 透传不够用时再上 `mmdc`。
+1. **§7 Phase G G5 文件浏览 + 上传下载**(~半天)—— `/tasks/{id}/files` 渲染 task_dir 树,upload (multipart)/ download / 删。
+2. **§7 Phase G G6 打磨**(~半天)—— `/new` 入口、`/done /abandon` 按钮、`/export` docx 下载、错误 toast、并发 run 互锁。
+3. **真实 LLM 跑通 G4**(~10 分钟)—— smoke 走的是 mock,需在浏览器开一个真 task 验证端到端体验(`cli.py web` → 打开 `/tasks/` → send → 看流式)。
+4. **§7 C Executor + sandbox**(~2-3 天)—— Phase G 完后再做,或穿插。
+5. **Phase 6 context 三层压缩**(~1 天)—— 兜底,V4 长上下文一般用不到。
+6. **Proposal mermaid 预渲染**(~半天)—— ASCII 透传不够用时再上 `mmdc`。
-> §7 B 已完工。Phase G 进行中(G1 ✅ G2 ✅ G3 ✅)。剩余路线:G4-G6 → C(Executor)→ D(HTTP /v1 + OIDC)→ E(CLI 双模式)→ F(deploy / billing)。
+> §7 B 已完工。Phase G 进行中(G1 ✅ G2 ✅ G3 ✅ G4 ✅)。剩余路线:G5-G6 → C(Executor)→ D(HTTP /v1 + OIDC)→ E(CLI 双模式)→ F(deploy / billing)。
diff --git a/RUN.md b/RUN.md
index 74bbf0f..866de27 100644
--- a/RUN.md
+++ b/RUN.md
@@ -2,7 +2,7 @@
> 怎么把 zcbot 跑起来。env / 常用命令 / 故障兜底。设计看 `DESIGN.md`,进度看 `PROGRESS.md`。
-最后更新:2026-05-15(Phase G G3)
+最后更新:2026-05-15(Phase G G4)
---
@@ -103,7 +103,7 @@ REPL 内命令:`/exit /reset /new /resume [last|] /id /status /done /abandon
.venv/Scripts/python.exe cli.py web --reload
```
-> G1 ✅ 脚手架 + /healthz;G2 ✅ `/` task 列表 + `?status=` filter;G3 ✅ `/tasks/{uuid}` 消息流渲染(markdown-it-py + pygments syntax,tool_call 走 `` 默认折叠);G4-G6 待。task_dir 显示统一 forward-slash(Win 存 `\` 也归一)。Linux:`.venv/bin/python cli.py web` 一致。
+> G1 ✅ 脚手架 + /healthz;G2 ✅ `/` task 列表 + `?status=` filter;G3 ✅ `/tasks/{uuid}` 消息流渲染(markdown-it-py + pygments syntax,tool_call 走 `` 默认折叠);G4 ✅ chat 发送 + SSE 流式回复(POST `/tasks/{tid}/messages` 启 run、GET `/tasks/{tid}/runs/{rid}/events` SSE 流;HTMX `sse-swap` 追加 DOM,无 JS);G5-G6 待。task_dir 显示统一 forward-slash(Win 存 `\` 也归一)。Linux:`.venv/bin/python cli.py web` 一致。SSE 经 nginx 反代记得关 buffering(响应头已带 `X-Accel-Buffering: no` 默认起效)。
---
@@ -121,6 +121,9 @@ REPL 内命令:`/exit /reset /new /resume [last|] /id /status /done /abandon
| `NoSubtaskError: task_dir ... 与已有 task ... 前缀嵌套` | §7.4 no-subtask:同 user 不允许 task_dir 嵌套(child 或 parent)。**同项目多对话**请传**完全相同**的 `--task-dir`;否则改路径成 sibling(平级) |
| `cli.py web` 启动后浏览器开不了 | 检查 proxy(`HTTP_PROXY` / `HTTPS_PROXY`):本地形态服务在 127.0.0.1,系统 proxy 拦截会 502。临时 `unset HTTP_PROXY HTTPS_PROXY` 或浏览器配 bypass。`curl` 验通走 `curl --noproxy '*' http://127.0.0.1:8765/healthz`(应返 `ok`) |
| `TypeError: unhashable type: 'dict'` from Jinja templating | Starlette 新版签名:`TemplateResponse(request, name, context)`,旧式 `(name, {"request":..., "...":...})` 在 newer Starlette 会把 context dict 当 cache key 炸 |
+| SSE 卡住不流(经 nginx) | 反代要关 buffering — 后端响应头已带 `X-Accel-Buffering: no`,nginx ≥ 1.5.6 默认认。仍卡看 nginx 配 `proxy_buffering off; proxy_read_timeout 3600s;` |
+| 浏览器 send 后没反应 | 看 console:HTMX 报 connect failed → 看 `/tasks/{tid}/messages` 响应;200 但流不到 → 看 EventSource 状态(devtools Network → EventStream tab) |
+| `UniqueViolation idx already exists` from messages | 同 task 连续两次快速 POST,messages idx 冲突。**已知 TODO**:G6/D 阶段加 task 级 lock_for_update 或 advisory lock |
---
diff --git a/web/app.py b/web/app.py
index a5a9669..e37b5d1 100644
--- a/web/app.py
+++ b/web/app.py
@@ -10,19 +10,24 @@
"""
from __future__ import annotations
+import asyncio
import json
+from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, Optional
-from uuid import UUID
+from uuid import UUID, uuid4
-from fastapi import FastAPI, Request
-from fastapi.responses import HTMLResponse
+from fastapi import FastAPI, Form, HTTPException, Request
+from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
-from sqlalchemy import func, select
+from sqlalchemy import func, select, update
from core.storage import session_scope
-from core.storage.models import Message, Task
+from core.storage.models import Message, Run, Task
+
+from .broker import broker
+from .sinks import WebEventSink
WEB_ROOT = Path(__file__).resolve().parent
TEMPLATES_DIR = WEB_ROOT / "templates"
@@ -195,11 +200,120 @@ def list_tasks(limit: int = 50, status: Optional[str] = None) -> list[dict[str,
return result
+# --------------------------- Run 启动 / SSE event 渲染 ---------------------------
+
+def _run_agent_bg(task_id: UUID, run_id: UUID, user_message: str) -> None:
+ """工作线程入口。这里**不能** await asyncio —— 在 to_thread 跑。
+
+ 流程:build_agent(resume=True) → 装 WebEventSink → agent.run → 写 runs 状态。
+ """
+ from main import build_agent, sync_task_tokens
+
+ # build_agent 会调 ensure_local_sentinel / LLM init / Session.load 等。
+ # 单次 POST 每次都走一遍 — 不便宜但简单;未来按需缓存 agent。
+ try:
+ broker.emit(run_id, {"type": "run_start"})
+ agent, session, sid, task_state, task_dir = build_agent(
+ session_id=str(task_id), resume=True,
+ )
+ agent.sink = WebEventSink(broker, run_id)
+ agent.run(user_message)
+ sync_task_tokens(task_state, agent.llm)
+ with session_scope() as s:
+ s.execute(
+ update(Run)
+ .where(Run.run_id == run_id)
+ .values(
+ status="ok",
+ finished_at=func.now(),
+ tokens_p=agent.llm.token_counter.prompt_tokens,
+ tokens_c=agent.llm.token_counter.completion_tokens,
+ )
+ )
+ except Exception as e:
+ err = f"{type(e).__name__}: {e}"
+ broker.emit(run_id, {"type": "error", "msg": err})
+ try:
+ with session_scope() as s:
+ s.execute(
+ update(Run)
+ .where(Run.run_id == run_id)
+ .values(status="error", error=err, finished_at=func.now())
+ )
+ except Exception:
+ pass # 已 emit 给前端,DB 写失败不再放大噪声
+ finally:
+ broker.close(run_id)
+
+
+def _render_event_fragment(templates: Jinja2Templates, ev: dict, request: Request) -> str:
+ """把一条 event 渲染成 HTML 片段(供 SSE data 推送)。
+
+ 片段类型与 chat.html 静态 block 视觉一致,append 模式追加到 #chat-stream 容器尾。
+ text / tool_call / tool_result / error 各有专用块;run_start / llm_start / llm_end /
+ done 不出 HTML(用空串当 keep-alive,客户端依然能识别 event type 控制状态)。
+ """
+ t = ev.get("type")
+ if t == "text":
+ content = ev.get("content") or ""
+ if not content:
+ return ""
+ # assistant text 片段:跟 chat.html 静态 assistant body 同形态
+ return templates.get_template("_frag_text.html").render(
+ request=request, html=_render_md(content)
+ )
+ if t == "tool_call":
+ return templates.get_template("_frag_tool_call.html").render(
+ request=request,
+ name=ev.get("name", "?"),
+ args_preview=_args_preview(ev.get("args_preview", "")),
+ args_pretty=_pretty_json(json.dumps(ev.get("args", {}), ensure_ascii=False))
+ if ev.get("args") is not None else _pretty_json(ev.get("args_preview", "")),
+ )
+ if t == "tool_result":
+ return templates.get_template("_frag_tool_result.html").render(
+ request=request,
+ name=ev.get("name", "?"),
+ preview=ev.get("preview", ""),
+ truncated=ev.get("truncated", False),
+ )
+ if t == "error":
+ return templates.get_template("_frag_error.html").render(
+ request=request, msg=ev.get("msg", "")
+ )
+ # llm_start / llm_end / run_start / done:发空 data,htmx-ext-sse 也会触发 event,
+ # 客户端只读 type 控制状态(spinner / close);data 内容不需要 swap。
+ return ""
+
+
+def _sse_format(event_type: str, payload: str) -> bytes:
+ """格式化一帧 SSE。data 多行要每行 `data: ` 前缀(SSE spec)。
+
+ EventSource API 会自动把 multi-line data 用 \n 拼接还原 — htmx-ext-sse 直接拿来当 HTML swap。
+ """
+ parts = [f"event: {event_type}"]
+ if payload:
+ for line in payload.splitlines() or [""]:
+ parts.append(f"data: {line}")
+ else:
+ parts.append("data: ") # 空 data 也要有,EventSource 才认这帧
+ parts.append("") # 终结空行
+ parts.append("")
+ return ("\n".join(parts)).encode("utf-8")
+
+
# --------------------------- App 工厂 ---------------------------
def create_app() -> FastAPI:
"""FastAPI 工厂。uvicorn --reload 模式需要工厂签名(factory=True)。"""
- app = FastAPI(title="zcbot web", version="0.3")
+
+ @asynccontextmanager
+ async def lifespan(app: FastAPI):
+ # 把当前 asyncio loop 绑给 broker — emit() 从工作线程会 call_soon_threadsafe 桥回
+ broker.bind_loop(asyncio.get_running_loop())
+ yield
+
+ app = FastAPI(title="zcbot web", version="0.4", lifespan=lifespan)
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
@@ -257,6 +371,96 @@ def create_app() -> FastAPI:
},
)
+ @app.post("/tasks/{task_id}/messages", response_class=HTMLResponse)
+ async def post_message(request: Request, task_id: str, content: str = Form(...)):
+ """G4:用户提交消息 → 启 BG run → 返回 user msg 卡 + assistant 占位 + SSE 容器。
+
+ 客户端 HTMX hx-post 这条,响应 swap 到 #chat-stream beforeend;响应 HTML 内含
+ sse-connect=/tasks/{id}/runs/{rid}/events,htmx-ext-sse 自动开 EventSource。
+ """
+ try:
+ tid = UUID(task_id)
+ except ValueError:
+ raise HTTPException(404, f"invalid task id: {task_id!r}")
+ content = (content or "").strip()
+ if not content:
+ raise HTTPException(400, "empty message")
+ # 校验 task 存在
+ with session_scope() as s:
+ row = s.execute(
+ select(Task.task_id).where(Task.task_id == tid)
+ ).first()
+ if row is None:
+ raise HTTPException(404, f"task not found: {tid}")
+
+ run_id = uuid4()
+ with session_scope() as s:
+ s.add(Run(run_id=run_id, task_id=tid, status="running", started_at=func.now()))
+
+ # 启 BG agent — to_thread 跑 sync agent.run,sink 通过 broker 把 event 桥回 asyncio
+ asyncio.create_task(asyncio.to_thread(_run_agent_bg, tid, run_id, content))
+
+ return templates.TemplateResponse(
+ request, "_send_response.html",
+ {
+ "task_id": str(tid),
+ "run_id": str(run_id),
+ "user_html": _render_md(content),
+ },
+ )
+
+ @app.get("/tasks/{task_id}/runs/{run_id}/events")
+ async def stream_events(request: Request, task_id: str, run_id: str):
+ """G4:SSE 流。订阅 broker[run_id] → 渲染 HTML 片段 → 推。
+
+ 客户端断开(close tab / navigate)→ asyncio 在下次 yield 抛 CancelledError →
+ finally 清理。同 run 多订阅者(刷新页面 / 多 tab)各自独立 queue。
+ """
+ try:
+ tid = UUID(task_id)
+ rid = UUID(run_id)
+ except ValueError:
+ raise HTTPException(404, "invalid id")
+ # task 存在性校验(防探测 / 错链)
+ with session_scope() as s:
+ ok = s.execute(
+ select(Task.task_id).where(Task.task_id == tid)
+ ).first()
+ if ok is None:
+ raise HTTPException(404, f"task not found: {tid}")
+
+ async def gen():
+ q = broker.subscribe(rid)
+ try:
+ # 第一帧 retry 注释 + 心跳:让 EventSource 立即建立(不被 buffer 卡住)
+ yield b": connected\nretry: 3000\n\n"
+ while True:
+ try:
+ ev = await asyncio.wait_for(q.get(), timeout=30.0)
+ except asyncio.TimeoutError:
+ yield b": ping\n\n"
+ continue
+ ev_type = ev.get("type", "msg")
+ frag = _render_event_fragment(templates, ev, request)
+ yield _sse_format(ev_type, frag)
+ if ev_type in ("done", "error"):
+ break
+ except asyncio.CancelledError:
+ # 客户端断开 — 静默退,不向上抛
+ pass
+ finally:
+ broker.unsubscribe(rid, q)
+
+ return StreamingResponse(
+ gen(),
+ media_type="text/event-stream",
+ headers={
+ "Cache-Control": "no-cache",
+ "Connection": "keep-alive",
+ "X-Accel-Buffering": "no", # nginx 反代:别 buffer 这条流
+ },
+ )
+
@app.get("/healthz", response_class=HTMLResponse)
def healthz():
return HTMLResponse("ok")
diff --git a/web/broker.py b/web/broker.py
new file mode 100644
index 0000000..4015e52
--- /dev/null
+++ b/web/broker.py
@@ -0,0 +1,88 @@
+"""RunBroker:in-process pub/sub,把 agent run 产生的 event fan-out 给所有 SSE 订阅者。
+
+设计:
+- emit() 从工作线程调(agent.run 在 to_thread 跑),用 loop.call_soon_threadsafe
+ 桥到 asyncio queue;SSE generator await queue.get() 拉出来推流。
+- 同一 run_id 多个订阅者(刷新页面 / 多 tab / 桌面+移动)— 每个订阅 1 个独立 queue。
+- run 结束 → broker.close(run_id) 给所有订阅者派一条 done;新订阅者(在 done 后到的)
+ 立即收到 done 并断流(不漏不挂)。
+- 进程内单实例 / 多进程不共享 — 个人 SaaS 单 worker 够用;真要扩多 worker 再上 Redis。
+- 不持久化 event — messages 已落 PG,刷新页面走 G3 静态视图能看历史;真要"刷新继续看
+ 实时流"未来加 event log 表 + backfill。
+
+线程模型:
+- broker.bind_loop(loop) 在 FastAPI startup 调一次,记录 asyncio loop 引用。
+- emit() 调用方可能在任意线程;put_nowait 是 thread-unsafe(asyncio.Queue 设计前提
+ 是单 loop),所以走 call_soon_threadsafe 跨回 loop 线程再 put。
+- subscribe / unsubscribe / close 也都用 call_soon_threadsafe 包,避免 race
+ (实测 SSE generator 在 finally 里 unsubscribe,这个就在 loop 线程,直接调也行)。
+"""
+from __future__ import annotations
+
+import asyncio
+from collections import defaultdict
+from typing import Any, Optional
+from uuid import UUID
+
+
+class RunBroker:
+ def __init__(self) -> None:
+ self._subs: dict[UUID, set[asyncio.Queue]] = defaultdict(set)
+ # 已经发完 done 的 run — 后来订阅者直接收到 done,避免无限等
+ self._done: set[UUID] = set()
+ self._loop: Optional[asyncio.AbstractEventLoop] = None
+
+ def bind_loop(self, loop: asyncio.AbstractEventLoop) -> None:
+ """FastAPI startup 调一次。"""
+ self._loop = loop
+
+ def subscribe(self, run_id: UUID) -> asyncio.Queue:
+ """订阅 run 的 event 流。已 done 的 run 立刻在 queue 放一条 done。
+
+ 调用方:SSE handler(在 asyncio loop 线程内)。
+ """
+ q: asyncio.Queue = asyncio.Queue()
+ if run_id in self._done:
+ q.put_nowait({"type": "done"})
+ else:
+ self._subs[run_id].add(q)
+ return q
+
+ def unsubscribe(self, run_id: UUID, q: asyncio.Queue) -> None:
+ """SSE generator finally 清理。"""
+ self._subs.get(run_id, set()).discard(q)
+ if run_id in self._subs and not self._subs[run_id]:
+ del self._subs[run_id]
+
+ def emit(self, run_id: UUID, event: dict[str, Any]) -> None:
+ """从工作线程调:把 event 推给所有订阅者。
+
+ 如果没人订阅(run 在跑但没浏览器连上),event 丢弃 — 这是设计选择
+ (event 不持久化,messages 走 PG)。
+ """
+ loop = self._loop
+ if loop is None:
+ return # 还没 bind,丢弃(测试 / 启动竞态)
+ for q in list(self._subs.get(run_id, [])):
+ loop.call_soon_threadsafe(q.put_nowait, event)
+
+ def close(self, run_id: UUID) -> None:
+ """run 结束:派 done 给所有订阅者,标记 run_id 为已完成。
+
+ 从工作线程调(agent.run 完成 / 抛异常 finally 清理)。
+ """
+ self.emit(run_id, {"type": "done"})
+ self._done.add(run_id)
+ # subs 不在这里立即删 — SSE generator 会先收到 done、yield 它、走到
+ # finally unsubscribe;此处 emit 后立即删会让那次 emit 之后的清理无的放矢。
+
+ def n_subscribers(self, run_id: UUID) -> int:
+ """供测试 / 监控用。"""
+ return len(self._subs.get(run_id, set()))
+
+ def is_done(self, run_id: UUID) -> bool:
+ return run_id in self._done
+
+
+# 进程内单例 — FastAPI lifespan 里 bind_loop;agent / sink / SSE handler 共享。
+broker = RunBroker()
diff --git a/web/sinks.py b/web/sinks.py
new file mode 100644
index 0000000..970c1e9
--- /dev/null
+++ b/web/sinks.py
@@ -0,0 +1,20 @@
+"""WebEventSink:实现 §7 A 的 sink 协议,把 AgentLoop.emit 桥到 RunBroker。
+
+每次 run 一个 sink 实例,绑死 run_id。`emit({type, ...})` 直接转 broker.emit(run_id, event)。
+sink 实例由 web 层在启 run 时创建,传进 AgentLoop;loop 完全不知 web 存在(§5 Less Scaffolding)。
+"""
+from __future__ import annotations
+
+from typing import Any
+from uuid import UUID
+
+from .broker import RunBroker
+
+
+class WebEventSink:
+ def __init__(self, broker: RunBroker, run_id: UUID) -> None:
+ self._broker = broker
+ self._run_id = run_id
+
+ def emit(self, event: dict[str, Any]) -> None:
+ self._broker.emit(self._run_id, event)
diff --git a/web/static/style.css b/web/static/style.css
index ec4c2f8..ea3f040 100644
--- a/web/static/style.css
+++ b/web/static/style.css
@@ -121,6 +121,44 @@ table.task-list tr:hover { background: #fdf6f6; }
.tool-label { font-size: .7rem; color: var(--muted); text-transform: uppercase; letter-spacing: .05em; font-weight: 600; margin-bottom: .15rem; }
.tool-pre { background: #fafafa; padding: .5rem .65rem; border: 1px solid var(--border); border-radius: 3px; max-height: 400px; overflow-y: auto; white-space: pre-wrap; word-break: break-word; font-family: var(--mono); font-size: .8rem; line-height: 1.4; margin: 0; color: #333; }
+/* 流式状态指示 + send form (G4) */
+.streaming .run-indicator {
+ display: inline-block; width: 6px; height: 6px; border-radius: 50%;
+ background: var(--accent); margin-left: .35rem; vertical-align: middle;
+ animation: pulse 1.2s ease-in-out infinite;
+}
+@keyframes pulse {
+ 0%, 100% { opacity: .35; transform: scale(.9); }
+ 50% { opacity: 1; transform: scale(1.15); }
+}
+.send-form { display: flex; gap: .5rem; margin-top: 1rem; align-items: flex-end; }
+.send-form textarea {
+ flex: 1; font: inherit; padding: .5rem .65rem; border: 1px solid var(--border);
+ border-radius: 4px; resize: vertical; min-height: 2.4rem; max-height: 14rem;
+ background: var(--surface); color: var(--fg); font-size: .95rem; line-height: 1.4;
+}
+.send-form textarea:focus { outline: 2px solid var(--accent-soft); outline-offset: 1px; border-color: var(--accent); }
+.send-form button {
+ padding: .55rem 1.1rem; border: 1px solid var(--accent); border-radius: 4px;
+ background: var(--accent); color: white; cursor: pointer; font: inherit; font-weight: 600;
+}
+.send-form button:hover { filter: brightness(1.05); }
+.send-form button:disabled { background: var(--muted); border-color: var(--muted); cursor: wait; }
+
+/* tool_result append-only 片段(G4 流式来:跟在上一个 tool_call 后) */
+.tool-result-inline { margin: .5rem 0 .25rem 1rem; padding-left: .65rem; border-left: 2px solid var(--border); }
+.tool-result-tag { font-family: var(--mono); font-size: .75rem; color: var(--muted); font-weight: 600; }
+.tool-pending { color: var(--muted); font-style: italic; }
+
+/* error 片段 */
+.msg-error {
+ display: flex; gap: .5rem; align-items: baseline;
+ margin-top: .5rem; padding: .5rem .75rem;
+ background: #fceaea; border-left: 3px solid var(--accent); border-radius: 3px;
+ font-size: .85rem; color: #5c0a0a;
+}
+.err-tag { font-weight: 600; text-transform: uppercase; font-size: .7rem; letter-spacing: .05em; }
+
/* pygments codehilite (轻量配色,选少数高频 token,余下走默认黑色) */
.codehilite .k, .codehilite .kn, .codehilite .kr { color: #c00; } /* keyword */
.codehilite .s, .codehilite .s1, .codehilite .s2, .codehilite .sb, .codehilite .sd { color: #1a3d6b; } /* string */
diff --git a/web/templates/_frag_error.html b/web/templates/_frag_error.html
new file mode 100644
index 0000000..4f34e30
--- /dev/null
+++ b/web/templates/_frag_error.html
@@ -0,0 +1,4 @@
+
+ error
+ {{ msg }}
+
diff --git a/web/templates/_frag_text.html b/web/templates/_frag_text.html
new file mode 100644
index 0000000..e8aa55e
--- /dev/null
+++ b/web/templates/_frag_text.html
@@ -0,0 +1 @@
+{{ html | safe }}
diff --git a/web/templates/_frag_tool_call.html b/web/templates/_frag_tool_call.html
new file mode 100644
index 0000000..224d7b5
--- /dev/null
+++ b/web/templates/_frag_tool_call.html
@@ -0,0 +1,13 @@
+
+
+ tool
+ {{ name }}
+ {{ args_preview }}
+
+
+
+ args
+ {{ args_pretty }}
+
+
+
diff --git a/web/templates/_frag_tool_result.html b/web/templates/_frag_tool_result.html
new file mode 100644
index 0000000..9a71ab1
--- /dev/null
+++ b/web/templates/_frag_tool_result.html
@@ -0,0 +1,4 @@
+
+ ↳ {{ name }}
+ {{ preview }}{% if truncated %} (truncated){% endif %}
+
diff --git a/web/templates/_send_response.html b/web/templates/_send_response.html
new file mode 100644
index 0000000..f7de5f2
--- /dev/null
+++ b/web/templates/_send_response.html
@@ -0,0 +1,22 @@
+{# POST /tasks/{id}/messages 响应 — append 进 #chat-stream beforeend。
+ 含 user msg 卡 + assistant 容器(SSE 监听器在它身上)。
+ htmx-ext-sse:sse-connect 开 EventSource;sse-swap 列的 event 把 data
+ 作为 HTML swap 到自己(hx-swap=beforeend 决定追加而非替换)。
+#}
+
+ user
+ {{ user_html | safe }}
+
+
+
+
+ assistant
+
+
+ {# SSE event=text/tool_call/tool_result/error 的 data → swap 到这个 article 内尾部 #}
+
diff --git a/web/templates/chat.html b/web/templates/chat.html
index ea50319..ea50b48 100644
--- a/web/templates/chat.html
+++ b/web/templates/chat.html
@@ -15,7 +15,7 @@
{% if description %}{{ description }}
{% endif %}
{% if task_dir %}{{ task_dir }}
{% endif %}
-
+
{% for b in blocks %}
{% if b.type == "user" %}
@@ -47,12 +47,23 @@
{% endfor %}
{% endif %}
- {% else %}
-
- 该 task 还没消息(只读视图)。G4 上线后从浏览器发送 / 流式回复。
-
{% endfor %}
-G3 只读 · 发送 + SSE = G4 · 文件浏览 = G5
+{% if status == "active" %}
+
+{% else %}
+task 已 {{ status }},不接收新消息。CLI /done 改 status 来恢复。
+{% endif %}
+
+G4 流式 ✓ · 文件浏览 = G5 · 打磨 = G6
{% endblock %}