zcbot/web/broker.py

89 lines
3.8 KiB
Python

"""RunBroker:in-process pub/sub,把 agent run 产生的 event fan-out 给所有 SSE 订阅者。
设计:
- emit() 从工作线程调(agent.run 在 to_thread 跑),用 loop.call_soon_threadsafe
桥到 asyncio queue;SSE generator await queue.get() 拉出来推流。
- 同一 run_id 多个订阅者(刷新页面 / 多 tab / 桌面+移动)— 每个订阅 1 个独立 queue。
- run 结束 → broker.close(run_id) 给所有订阅者派一条 done;新订阅者(在 done 后到的)
立即收到 done 并断流(不漏不挂)。
- 进程内单实例 / 多进程不共享 — 个人 SaaS 单 worker 够用;真要扩多 worker 再上 Redis。
- 不持久化 event — messages 已落 PG,刷新页面走 G3 静态视图能看历史;真要"刷新继续看
实时流"未来加 event log 表 + backfill。
线程模型:
- broker.bind_loop(loop) 在 FastAPI startup 调一次,记录 asyncio loop 引用。
- emit() 调用方可能在任意线程;put_nowait 是 thread-unsafe(asyncio.Queue 设计前提
是单 loop),所以走 call_soon_threadsafe 跨回 loop 线程再 put。
- subscribe / unsubscribe / close 也都用 call_soon_threadsafe 包,避免 race
(实测 SSE generator 在 finally 里 unsubscribe,这个就在 loop 线程,直接调也行)。
"""
from __future__ import annotations
import asyncio
from collections import defaultdict
from typing import Any, Optional
from uuid import UUID
class RunBroker:
def __init__(self) -> None:
self._subs: dict[UUID, set[asyncio.Queue]] = defaultdict(set)
# 已经发完 done 的 run — 后来订阅者直接收到 done,避免无限等
self._done: set[UUID] = set()
self._loop: Optional[asyncio.AbstractEventLoop] = None
def bind_loop(self, loop: asyncio.AbstractEventLoop) -> None:
"""FastAPI startup 调一次。"""
self._loop = loop
def subscribe(self, run_id: UUID) -> asyncio.Queue:
"""订阅 run 的 event 流。已 done 的 run 立刻在 queue 放一条 done。
调用方:SSE handler(在 asyncio loop 线程内)。
"""
q: asyncio.Queue = asyncio.Queue()
if run_id in self._done:
q.put_nowait({"type": "done"})
else:
self._subs[run_id].add(q)
return q
def unsubscribe(self, run_id: UUID, q: asyncio.Queue) -> None:
"""SSE generator finally 清理。"""
self._subs.get(run_id, set()).discard(q)
if run_id in self._subs and not self._subs[run_id]:
del self._subs[run_id]
def emit(self, run_id: UUID, event: dict[str, Any]) -> None:
"""从工作线程调:把 event 推给所有订阅者。
如果没人订阅(run 在跑但没浏览器连上),event 丢弃 — 这是设计选择
(event 不持久化,messages 走 PG)。
"""
loop = self._loop
if loop is None:
return # 还没 bind,丢弃(测试 / 启动竞态)
for q in list(self._subs.get(run_id, [])):
loop.call_soon_threadsafe(q.put_nowait, event)
def close(self, run_id: UUID) -> None:
"""run 结束:派 done 给所有订阅者,标记 run_id 为已完成。
从工作线程调(agent.run 完成 / 抛异常 finally 清理)。
"""
self.emit(run_id, {"type": "done"})
self._done.add(run_id)
# subs 不在这里立即删 — SSE generator 会先收到 done、yield 它、走到
# finally unsubscribe;此处 emit 后立即删会让那次 emit 之后的清理无的放矢。
def n_subscribers(self, run_id: UUID) -> int:
"""供测试 / 监控用。"""
return len(self._subs.get(run_id, set()))
def is_done(self, run_id: UUID) -> bool:
return run_id in self._done
# 进程内单例 — FastAPI lifespan 里 bind_loop;agent / sink / SSE handler 共享。
broker = RunBroker()