"""EventSink: 把 loop 产生的事件画到目标(本地 console / SSE / 日志)。 Loop 不直接 print,改 emit({type, ...})。Sink 决定怎么呈现。 事件类型(loop 当前会发的): llm_start {type} —— 一轮 LLM 调用开始 llm_end {type, prompt_tokens, completion_tokens, elapsed} text {type, content} —— assistant 文字段(整段,非流式) tool_call {type, name, args, args_preview} tool_result {type, name, result, preview, truncated} done {type} —— 一次 run 全部结束 后续接 SSE 时,sink 实现里把 emit 转 yield 即可,loop 一行不用改。 """ from __future__ import annotations import threading import time from typing import Callable, Optional from rich.console import Console from rich.markdown import Markdown class ConsoleEventSink: """把事件画到 rich console。spinner 在 llm_start..llm_end 之间显示, 后台 daemon 线程每 100ms 刷耗时 + 累计 token。""" def __init__( self, console: Console, token_counter: Optional[Callable[[], int]] = None, ) -> None: self.console = console # 把 LLM 累计 token 数取出来(spinner 文案要用),可选;无则不显示 ctx self._tokens = token_counter or (lambda: 0) self._status = None self._stop: Optional[threading.Event] = None self._thread: Optional[threading.Thread] = None self._start = 0.0 def emit(self, event: dict) -> None: t = event.get("type") if t == "llm_start": self._spinner_start() elif t == "llm_end": self._spinner_stop() pt = event.get("prompt_tokens", 0) ct = event.get("completion_tokens", 0) el = event.get("elapsed", 0.0) self.console.print(f"[info][in {pt:,} out {ct:,} t {el:.1f}s][/info]") elif t == "text": content = event.get("content") or "" if content: self.console.print("[assistant]assistant>[/assistant]") self.console.print(Markdown(content)) elif t == "tool_call": name = event.get("name", "") preview = event.get("args_preview", "") self.console.print(f"[tool]tool>[/tool] {name}({preview})") elif t == "tool_result": preview = event.get("preview", "") self.console.print(f"[muted]{preview}[/muted]") # done: 无需输出 def _spinner_start(self) -> None: self._start = time.monotonic() self._stop = threading.Event() def fmt() -> str: elapsed = time.monotonic() - self._start total = self._tokens() tail = f" ctx {total:,} tok" if total else "" return f"[muted]thinking... {elapsed:.1f}s{tail}[/muted]" self._status = self.console.status(fmt(), spinner="dots") self._status.__enter__() def tick() -> None: while not self._stop.wait(0.1): try: self._status.update(fmt()) except Exception: return self._thread = threading.Thread(target=tick, daemon=True) self._thread.start() def _spinner_stop(self) -> None: if self._stop is not None: self._stop.set() if self._thread is not None: self._thread.join(timeout=0.5) if self._status is not None: try: self._status.__exit__(None, None, None) except Exception: pass self._status = None self._stop = None self._thread = None