"""RunBroker:in-process pub/sub,把 agent run 产生的 event fan-out 给所有 SSE 订阅者。 设计: - emit() 从工作线程调(agent.run 在 to_thread 跑),用 loop.call_soon_threadsafe 桥到 asyncio queue;SSE generator await queue.get() 拉出来推流。 - 同一 run_id 多个订阅者(刷新页面 / 多 tab / 桌面+移动)— 每个订阅 1 个独立 queue。 - run 结束 → broker.close(run_id) 给所有订阅者派一条 done;新订阅者(在 done 后到的) 立即收到 done 并断流(不漏不挂)。 - 进程内单实例 / 多进程不共享 — 个人 SaaS 单 worker 够用;真要扩多 worker 再上 Redis。 - 不持久化 event — messages 已落 PG,刷新页面走 G3 静态视图能看历史;真要"刷新继续看 实时流"未来加 event log 表 + backfill。 线程模型: - broker.bind_loop(loop) 在 FastAPI startup 调一次,记录 asyncio loop 引用。 - emit() 调用方可能在任意线程;put_nowait 是 thread-unsafe(asyncio.Queue 设计前提 是单 loop),所以走 call_soon_threadsafe 跨回 loop 线程再 put。 - subscribe / unsubscribe / close 也都用 call_soon_threadsafe 包,避免 race (实测 SSE generator 在 finally 里 unsubscribe,这个就在 loop 线程,直接调也行)。 """ from __future__ import annotations import asyncio import threading from collections import defaultdict from typing import Any, Optional from uuid import UUID class RunBroker: def __init__(self) -> None: self._subs: dict[UUID, set[asyncio.Queue]] = defaultdict(set) # 已经发完 done 的 run — 后来订阅者直接收到 done,避免无限等 self._done: set[UUID] = set() # cancel signal per-run。AgentLoop 在 BG 线程里 poll is_cancelled() 决定是否退; # request_cancel 可在 BG 还没 register 时调用(setdefault),BG 启动后第一次 # check 即看到。run 完成在 finally 里 clear_cancel 回收。 self._cancel_flags: dict[UUID, threading.Event] = {} self._loop: Optional[asyncio.AbstractEventLoop] = None def bind_loop(self, loop: asyncio.AbstractEventLoop) -> None: """FastAPI startup 调一次。""" self._loop = loop def subscribe(self, run_id: UUID) -> asyncio.Queue: """订阅 run 的 event 流。已 done 的 run 立刻在 queue 放一条 done。 调用方:SSE handler(在 asyncio loop 线程内)。 """ q: asyncio.Queue = asyncio.Queue() if run_id in self._done: q.put_nowait({"type": "done"}) else: self._subs[run_id].add(q) return q def unsubscribe(self, run_id: UUID, q: asyncio.Queue) -> None: """SSE generator finally 清理。""" self._subs.get(run_id, set()).discard(q) if run_id in self._subs and not self._subs[run_id]: del self._subs[run_id] def emit(self, run_id: UUID, event: dict[str, Any]) -> None: """从工作线程调:把 event 推给所有订阅者。 如果没人订阅(run 在跑但没浏览器连上),event 丢弃 — 这是设计选择 (event 不持久化,messages 走 PG)。 """ loop = self._loop if loop is None: return # 还没 bind,丢弃(测试 / 启动竞态) for q in list(self._subs.get(run_id, [])): loop.call_soon_threadsafe(q.put_nowait, event) def close(self, run_id: UUID) -> None: """run 结束:派 done 给所有订阅者,标记 run_id 为已完成。 从工作线程调(agent.run 完成 / 抛异常 finally 清理)。 """ self.emit(run_id, {"type": "done"}) self._done.add(run_id) # subs 不在这里立即删 — SSE generator 会先收到 done、yield 它、走到 # finally unsubscribe;此处 emit 后立即删会让那次 emit 之后的清理无的放矢。 def n_subscribers(self, run_id: UUID) -> int: """供测试 / 监控用。""" return len(self._subs.get(run_id, set())) def is_done(self, run_id: UUID) -> bool: return run_id in self._done # ─────────────── cancel signaling ─────────────── def request_cancel(self, run_id: UUID) -> None: """主线程(HTTP handler)发的 cancel 信号 — BG 线程 poll is_cancelled() 看见即退。 setdefault:即便 BG 还没注册 flag 也能 set,BG 启动后第一次 check 立刻看见。 """ self._cancel_flags.setdefault(run_id, threading.Event()).set() def is_cancelled(self, run_id: UUID) -> bool: ev = self._cancel_flags.get(run_id) return bool(ev and ev.is_set()) def clear_cancel(self, run_id: UUID) -> None: """run 真正退出(BG finally)清掉 flag,避免 dict 无限增长。""" self._cancel_flags.pop(run_id, None) # 进程内单例 — FastAPI lifespan 里 bind_loop;agent / sink / SSE handler 共享。 broker = RunBroker()