zcbot/web/broker.py

122 lines
5.5 KiB
Python

"""RunBroker:in-process pub/sub,把 agent run 产生的 event fan-out 给所有 SSE 订阅者。
DESIGN §7 简化(0004 一并)—— 单活 run 形态下 run_id 是冗余的(同 task 同时
最多 1 个活 run);broker 内部全用 task_id 索引,客户端只需要 task_id 即可
订阅 / cancel,不再需要先拿 run_id。
设计:
- emit() 从工作线程调(agent.run 在 to_thread 跑),用 loop.call_soon_threadsafe
桥到 asyncio queue;SSE generator await queue.get() 拉出来推流。
- 同一 task 多个订阅者(刷新页面 / 多 tab / 桌面+移动)— 每个订阅 1 个独立 queue。
- run 结束 → broker.close(task_id) 给所有订阅者派一条 done;新订阅者(在 done
后到的)立即收到 done 并断流(不漏不挂)。
- 同 task 起新 run → broker.start(task_id) 清掉 _done 标记;否则上一轮 done
会让新订阅者立刻断流看不到流式。
- 进程内单实例 / 多进程不共享 — 个人 SaaS 单 worker 够用;真要扩多 worker 再上 Redis。
- 不持久化 event — messages 已落 PG,刷新页面走 GET /v1/tasks/{id}/messages 看历史;
真要"刷新继续看实时流"未来加 event log 表 + backfill。
线程模型:
- broker.bind_loop(loop) 在 FastAPI startup 调一次,记录 asyncio loop 引用。
- emit() 调用方可能在任意线程;put_nowait 是 thread-unsafe(asyncio.Queue 设计前提
是单 loop),所以走 call_soon_threadsafe 跨回 loop 线程再 put。
- subscribe / unsubscribe / close / start 也都用 call_soon_threadsafe 包,避免 race
(实测 SSE generator 在 finally 里 unsubscribe,这个就在 loop 线程,直接调也行)。
"""
from __future__ import annotations
import asyncio
import threading
from collections import defaultdict
from typing import Any, Optional
from uuid import UUID
class RunBroker:
def __init__(self) -> None:
self._subs: dict[UUID, set[asyncio.Queue]] = defaultdict(set)
# 已经发完 done 的 task — 后来订阅者直接收到 done,避免无限等
self._done: set[UUID] = set()
# cancel signal per-task。AgentLoop 在 BG 线程里 poll is_cancelled() 决定是否退;
# request_cancel 可在 BG 还没 register 时调用(setdefault),BG 启动后第一次
# check 即看到。run 完成在 finally 里 clear_cancel 回收。
self._cancel_flags: dict[UUID, threading.Event] = {}
self._loop: Optional[asyncio.AbstractEventLoop] = None
def bind_loop(self, loop: asyncio.AbstractEventLoop) -> None:
"""FastAPI startup 调一次。"""
self._loop = loop
def start(self, task_id: UUID) -> None:
"""同 task 起新 run 时调:清 _done 标记,让新订阅者能看到流式。
cancel flag 在 finally 里 clear_cancel 清,这里不动(避免擦掉刚刚 request_cancel 的请求)。
"""
self._done.discard(task_id)
def subscribe(self, task_id: UUID) -> asyncio.Queue:
"""订阅 task 当前 run 的 event 流。已 done 的 task 立刻在 queue 放一条 done。
调用方:SSE handler(在 asyncio loop 线程内)。
"""
q: asyncio.Queue = asyncio.Queue()
if task_id in self._done:
q.put_nowait({"type": "done"})
else:
self._subs[task_id].add(q)
return q
def unsubscribe(self, task_id: UUID, q: asyncio.Queue) -> None:
"""SSE generator finally 清理。"""
self._subs.get(task_id, set()).discard(q)
if task_id in self._subs and not self._subs[task_id]:
del self._subs[task_id]
def emit(self, task_id: UUID, event: dict[str, Any]) -> None:
"""从工作线程调:把 event 推给所有订阅者。
如果没人订阅(run 在跑但没浏览器连上),event 丢弃 — 这是设计选择
(event 不持久化,messages 走 PG)。
"""
loop = self._loop
if loop is None:
return # 还没 bind,丢弃(测试 / 启动竞态)
for q in list(self._subs.get(task_id, [])):
loop.call_soon_threadsafe(q.put_nowait, event)
def close(self, task_id: UUID) -> None:
"""run 结束:派 done 给所有订阅者,标记 task 为已完成。
从工作线程调(agent.run 完成 / 抛异常 finally 清理)。
"""
self.emit(task_id, {"type": "done"})
self._done.add(task_id)
# subs 不在这里立即删 — SSE generator 会先收到 done、yield 它、走到
# finally unsubscribe;此处 emit 后立即删会让那次 emit 之后的清理无的放矢。
def n_subscribers(self, task_id: UUID) -> int:
"""供测试 / 监控用。"""
return len(self._subs.get(task_id, set()))
def is_done(self, task_id: UUID) -> bool:
return task_id in self._done
# ─────────────── cancel signaling ───────────────
def request_cancel(self, task_id: UUID) -> None:
"""主线程(HTTP handler)发的 cancel 信号 — BG 线程 poll is_cancelled() 看见即退。
setdefault:即便 BG 还没注册 flag 也能 set,BG 启动后第一次 check 立刻看见。
"""
self._cancel_flags.setdefault(task_id, threading.Event()).set()
def is_cancelled(self, task_id: UUID) -> bool:
ev = self._cancel_flags.get(task_id)
return bool(ev and ev.is_set())
def clear_cancel(self, task_id: UUID) -> None:
"""run 真正退出(BG finally)清掉 flag,避免 dict 无限增长。"""
self._cancel_flags.pop(task_id, None)
# 进程内单例 — FastAPI lifespan 里 bind_loop;agent / sink / SSE handler 共享。
broker = RunBroker()