94 lines
3.3 KiB
Python
94 lines
3.3 KiB
Python
"""EventSink: render the events produced by the loop onto a target
(local console / SSE / log).

The loop never prints directly; it calls emit({type, ...}) instead, and the
sink decides how each event is presented.

Event types the loop currently emits:
    llm_start    {type} -- one round of LLM invocation begins
    llm_end      {type, prompt_tokens, completion_tokens, elapsed}
    text         {type, content} -- assistant text (a whole segment, not streamed)
    tool_call    {type, name, args, args_preview}
    tool_result  {type, name, result, preview, truncated}
    done         {type} -- an entire run has finished

When SSE support is added later, a sink implementation only has to turn
emit into yield; the loop itself does not need to change.
"""
|
|
from __future__ import annotations
|
|
|
|
import threading
|
|
import time
|
|
from typing import Optional
|
|
|
|
from rich.console import Console
|
|
from rich.markdown import Markdown
|
|
|
|
|
|
class ConsoleEventSink:
    """Render loop events onto a rich console.

    A "thinking..." spinner is shown between ``llm_start`` and ``llm_end``;
    a background daemon thread refreshes its elapsed-time label every 100 ms.
    """

    def __init__(self, console: "Console") -> None:
        """Remember the target console; no spinner is active initially."""
        self.console = console
        self._status = None  # object returned by console.status(), while spinning
        self._stop: Optional[threading.Event] = None  # signals the ticker to exit
        self._thread: Optional[threading.Thread] = None  # ticker thread
        self._start = 0.0  # time.monotonic() captured when the spinner started

    @staticmethod
    def _escape(value: object) -> str:
        """Return ``value`` as text that is safe to embed in console markup.

        Event payloads are arbitrary text; anything in them that looks like a
        ``[tag]`` would otherwise be interpreted (or rejected) as markup.
        """
        # Local import: rich is already a dependency of this module, and this
        # keeps the file's top-level import block untouched.
        from rich.markup import escape

        return escape(str(value))

    def emit(self, event: dict) -> None:
        """Render one event dict according to its ``"type"`` key.

        ``llm_start`` / ``llm_end`` start and stop the spinner (``llm_end``
        also prints token counts and elapsed time), ``text`` prints the
        content as Markdown, ``tool_call`` / ``tool_result`` print their
        previews. Any other type, including ``done``, prints nothing.
        """
        kind = event.get("type")
        if kind == "llm_start":
            self._spinner_start()
        elif kind == "llm_end":
            self._spinner_stop()
            # "or" fallbacks: a key that is present but None must not crash
            # the numeric format specs below.
            prompt_tokens = event.get("prompt_tokens") or 0
            completion_tokens = event.get("completion_tokens") or 0
            elapsed = event.get("elapsed") or 0.0
            # The backslash escapes the literal "[in ...]" bracket; without it
            # the console markup parser takes it for a tag and drops the text.
            self.console.print(
                f"[info]\\[in {prompt_tokens:,} out {completion_tokens:,} "
                f"t {elapsed:.1f}s][/info]"
            )
        elif kind == "text":
            content = event.get("content") or ""
            if content:
                self.console.print("[assistant]assistant>[/assistant]")
                self.console.print(Markdown(content))
        elif kind == "tool_call":
            name = self._escape(event.get("name", ""))
            preview = self._escape(event.get("args_preview", ""))
            self.console.print(f"[tool]tool>[/tool] {name}({preview})")
        elif kind == "tool_result":
            preview = self._escape(event.get("preview", ""))
            self.console.print(f"[muted]{preview}[/muted]")
        # done (and any unknown type): nothing to render

    def _spinner_start(self) -> None:
        """Show the spinner and start the thread that refreshes its label."""
        # A repeated llm_start must not leak the previous status/thread.
        self._spinner_stop()

        started = time.monotonic()
        stop = threading.Event()

        def fmt() -> str:
            return f"[muted]thinking... {time.monotonic() - started:.1f}s[/muted]"

        status = self.console.status(fmt(), spinner="dots")
        status.__enter__()

        def tick() -> None:
            # Uses the closure-local ``stop`` / ``status`` rather than
            # ``self._stop`` / ``self._status``: _spinner_stop() resets those
            # attributes to None, possibly while this thread is still alive.
            while not stop.wait(0.1):
                try:
                    status.update(fmt())
                except Exception:
                    # Best effort: a failed refresh just ends the ticker.
                    return

        thread = threading.Thread(target=tick, daemon=True)
        self._start = started
        self._stop = stop
        self._status = status
        self._thread = thread
        thread.start()

    def _spinner_stop(self) -> None:
        """Stop the ticker thread and close the spinner; no-op when idle."""
        stop, thread, status = self._stop, self._thread, self._status
        self._stop = None
        self._thread = None
        self._status = None

        if stop is not None:
            stop.set()
        if thread is not None:
            # Bounded wait so a stuck refresh cannot block the caller.
            thread.join(timeout=0.5)
        if status is not None:
            try:
                status.__exit__(None, None, None)
            except Exception:
                # Best effort: closing the spinner must never break the loop.
                pass
|