zcbot/core/sinks.py

102 lines
3.6 KiB
Python

"""EventSink: 把 loop 产生的事件画到目标(本地 console / SSE / 日志)。
Loop 不直接 print,改 emit({type, ...})。Sink 决定怎么呈现。
事件类型(loop 当前会发的):
llm_start {type} —— 一轮 LLM 调用开始
llm_end {type, prompt_tokens, completion_tokens, elapsed}
text {type, content} —— assistant 文字段(整段,非流式)
tool_call {type, name, args, args_preview}
tool_result {type, name, result, preview, truncated}
done {type} —— 一次 run 全部结束
后续接 SSE 时,sink 实现里把 emit 转 yield 即可,loop 一行不用改。
"""
from __future__ import annotations
import threading
import time
from typing import Callable, Optional
from rich.console import Console
from rich.markdown import Markdown
class ConsoleEventSink:
"""把事件画到 rich console。spinner 在 llm_start..llm_end 之间显示,
后台 daemon 线程每 100ms 刷耗时 + 累计 token。"""
def __init__(
self,
console: Console,
token_counter: Optional[Callable[[], int]] = None,
) -> None:
self.console = console
# 把 LLM 累计 token 数取出来(spinner 文案要用),可选;无则不显示 ctx
self._tokens = token_counter or (lambda: 0)
self._status = None
self._stop: Optional[threading.Event] = None
self._thread: Optional[threading.Thread] = None
self._start = 0.0
def emit(self, event: dict) -> None:
t = event.get("type")
if t == "llm_start":
self._spinner_start()
elif t == "llm_end":
self._spinner_stop()
pt = event.get("prompt_tokens", 0)
ct = event.get("completion_tokens", 0)
el = event.get("elapsed", 0.0)
self.console.print(f"[info][in {pt:,} out {ct:,} t {el:.1f}s][/info]")
elif t == "text":
content = event.get("content") or ""
if content:
self.console.print("[assistant]assistant>[/assistant]")
self.console.print(Markdown(content))
elif t == "tool_call":
name = event.get("name", "")
preview = event.get("args_preview", "")
self.console.print(f"[tool]tool>[/tool] {name}({preview})")
elif t == "tool_result":
preview = event.get("preview", "")
self.console.print(f"[muted]{preview}[/muted]")
# done: 无需输出
def _spinner_start(self) -> None:
self._start = time.monotonic()
self._stop = threading.Event()
def fmt() -> str:
elapsed = time.monotonic() - self._start
total = self._tokens()
tail = f" ctx {total:,} tok" if total else ""
return f"[muted]thinking... {elapsed:.1f}s{tail}[/muted]"
self._status = self.console.status(fmt(), spinner="dots")
self._status.__enter__()
def tick() -> None:
while not self._stop.wait(0.1):
try:
self._status.update(fmt())
except Exception:
return
self._thread = threading.Thread(target=tick, daemon=True)
self._thread.start()
def _spinner_stop(self) -> None:
if self._stop is not None:
self._stop.set()
if self._thread is not None:
self._thread.join(timeout=0.5)
if self._status is not None:
try:
self._status.__exit__(None, None, None)
except Exception:
pass
self._status = None
self._stop = None
self._thread = None