From 375bb2999ca2ab5e1feceb7be97e672aaab2df15 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Mon, 11 May 2026 14:59:32 +0800 Subject: [PATCH] =?UTF-8?q?core:=20loop=20=E4=BA=8B=E4=BB=B6=E6=B5=81?= =?UTF-8?q?=E5=8C=96=20(sink=20=E6=8E=A5=E5=8F=A3,=20=C2=A77=20A=20?= =?UTF-8?q?=E9=98=B6=E6=AE=B5)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit loop 不直接 console.print —— 改成 sink.emit({type, ...}),sink 决定怎么呈现。 新增 ConsoleEventSink 接管 spinner / [in N out N] / assistant 文本 / tool>(args) / 结果预览, CLI 行为零回归。后续接 SSE 时只换 sink 实现,loop 不动。 Co-Authored-By: Claude Opus 4.7 (1M context) --- core/loop.py | 120 ++++++++++++++++++++++++-------------------------- core/sinks.py | 101 ++++++++++++++++++++++++++++++++++++++++++ main.py | 4 +- 3 files changed, 161 insertions(+), 64 deletions(-) create mode 100644 core/sinks.py diff --git a/core/loop.py b/core/loop.py index 392c33a..9c7502a 100644 --- a/core/loop.py +++ b/core/loop.py @@ -1,19 +1,17 @@ -"""主 agent loop: ReAct 风格,LLM ↔ Tool 反复直到无 tool_call。""" +"""主 agent loop: ReAct 风格,LLM ↔ Tool 反复直到无 tool_call。 + +loop 不直接 print —— 进度通过 sink.emit(event) 上抛。Sink 决定怎么呈现 +(本地 console / SSE / 日志)。事件类型见 core/sinks.py 头部说明。 +""" from __future__ import annotations import json -import threading import time -from contextlib import contextmanager from typing import Any, Dict, Optional, Tuple -from rich.console import Console -from rich.markdown import Markdown - from .capabilities import ModelCapabilities from .llm import LLM from .session import Session -from .ui import make_console def _extract_usage(usage: Any) -> Tuple[int, int]: @@ -36,7 +34,7 @@ class AgentLoop: tools: Dict[str, Any], session: Session, capabilities: ModelCapabilities, - console: Optional[Console] = None, + sink: Optional[Any] = None, max_iterations: Optional[int] = None, ) -> None: self.llm = llm @@ -44,68 +42,42 @@ class AgentLoop: self.session = session self.caps = capabilities self.max_iterations = max_iterations or capabilities.max_iterations - self.console = console or make_console() + self.sink = sink - @contextmanager - def _thinking(self): - """spinner 实时刷耗时 + 上下文 token 数。yield 出的 ctx 退出后填 elapsed。""" - start = time.monotonic() - stop = threading.Event() - - def fmt() -> str: - elapsed = time.monotonic() - start - total = self.llm.token_counter.total - tail = f" ctx {total:,} tok" if total else "" - return f"[muted]thinking... {elapsed:.1f}s{tail}[/muted]" - - class Ctx: - elapsed: float = 0.0 - - ctx = Ctx() - status = self.console.status(fmt(), spinner="dots") - - def tick() -> None: - while not stop.wait(0.1): - try: - status.update(fmt()) - except Exception: - return - - with status: - th = threading.Thread(target=tick, daemon=True) - th.start() - try: - yield ctx - finally: - stop.set() - th.join(timeout=0.5) - ctx.elapsed = time.monotonic() - start + def _emit(self, event: dict) -> None: + if self.sink is not None: + self.sink.emit(event) def run(self, user_message: str) -> str: self.session.append({"role": "user", "content": user_message}) for _ in range(self.max_iterations): - with self._thinking() as t: - response = self.llm.chat( - messages=self.session.messages, - tools=[t.schema for t in self.tools.values()], - reasoning_effort=self.caps.default_reasoning_effort or None, - ) + self._emit({"type": "llm_start"}) + start = time.monotonic() + response = self.llm.chat( + messages=self.session.messages, + tools=[t.schema for t in self.tools.values()], + reasoning_effort=self.caps.default_reasoning_effort or None, + ) + elapsed = time.monotonic() - start msg = response.choices[0].message self.session.append(msg) pt, ct = _extract_usage(getattr(response, "usage", None)) - self.console.print( - f"[info][in {pt:,} out {ct:,} t {t.elapsed:.1f}s][/info]" - ) + self._emit({ + "type": "llm_end", + "prompt_tokens": pt, + "completion_tokens": ct, + "elapsed": elapsed, + }) tool_calls = getattr(msg, "tool_calls", None) or [] content = getattr(msg, "content", None) if content: - self.console.print("[assistant]assistant>[/assistant]") - self.console.print(Markdown(content)) + self._emit({"type": "text", "content": content}) if not tool_calls: + self._emit({"type": "done"}) return content or "" for tc in tool_calls: @@ -118,6 +90,7 @@ class AgentLoop: } ) + self._emit({"type": "done"}) return "[reached max iterations]" def _execute_tool_call(self, tc: Any) -> str: @@ -128,31 +101,52 @@ class AgentLoop: except json.JSONDecodeError as e: return f"[Error] invalid JSON arguments for {name}: {e}" - preview = json.dumps(args, ensure_ascii=False) - if len(preview) > 200: - preview = preview[:200] + "..." - self.console.print(f"[tool]tool>[/tool] {name}({preview})") + args_preview = json.dumps(args, ensure_ascii=False) + if len(args_preview) > 200: + args_preview = args_preview[:200] + "..." + self._emit({ + "type": "tool_call", + "name": name, + "args": args, + "args_preview": args_preview, + }) tool = self.tools.get(name) if tool is None: - return f"[Error] unknown tool: {name}" + err = f"[Error] unknown tool: {name}" + self._emit({"type": "tool_result", "name": name, "result": err, + "preview": err, "truncated": False}) + return err try: result = tool.execute(**args) except TypeError as e: - return f"[Error] bad arguments to {name}: {e}" + err = f"[Error] bad arguments to {name}: {e}" + self._emit({"type": "tool_result", "name": name, "result": err, + "preview": err, "truncated": False}) + return err except Exception as e: - return f"[Error executing {name}] {type(e).__name__}: {e}" + err = f"[Error executing {name}] {type(e).__name__}: {e}" + self._emit({"type": "tool_result", "name": name, "result": err, + "preview": err, "truncated": False}) + return err if not isinstance(result, str): result = str(result) # 控制返回给模型的 tool 结果体量,避免炸 context MAX_LEN = 16_000 + truncated = False if len(result) > MAX_LEN: result = result[:MAX_LEN] + f"\n[... truncated, {len(result) - MAX_LEN} chars ...]" + truncated = True - # 给用户预览(截短) preview = result if len(result) < 400 else result[:400] + "..." - self.console.print(f"[muted]{preview}[/muted]") + self._emit({ + "type": "tool_result", + "name": name, + "result": result, + "preview": preview, + "truncated": truncated, + }) return result diff --git a/core/sinks.py b/core/sinks.py new file mode 100644 index 0000000..f708d94 --- /dev/null +++ b/core/sinks.py @@ -0,0 +1,101 @@ +"""EventSink: 把 loop 产生的事件画到目标(本地 console / SSE / 日志)。 + +Loop 不直接 print,改 emit({type, ...})。Sink 决定怎么呈现。 + +事件类型(loop 当前会发的): + llm_start {type} —— 一轮 LLM 调用开始 + llm_end {type, prompt_tokens, completion_tokens, elapsed} + text {type, content} —— assistant 文字段(整段,非流式) + tool_call {type, name, args, args_preview} + tool_result {type, name, result, preview, truncated} + done {type} —— 一次 run 全部结束 + +后续接 SSE 时,sink 实现里把 emit 转 yield 即可,loop 一行不用改。 +""" +from __future__ import annotations + +import threading +import time +from typing import Callable, Optional + +from rich.console import Console +from rich.markdown import Markdown + + +class ConsoleEventSink: + """把事件画到 rich console。spinner 在 llm_start..llm_end 之间显示, + 后台 daemon 线程每 100ms 刷耗时 + 累计 token。""" + + def __init__( + self, + console: Console, + token_counter: Optional[Callable[[], int]] = None, + ) -> None: + self.console = console + # 把 LLM 累计 token 数取出来(spinner 文案要用),可选;无则不显示 ctx + self._tokens = token_counter or (lambda: 0) + self._status = None + self._stop: Optional[threading.Event] = None + self._thread: Optional[threading.Thread] = None + self._start = 0.0 + + def emit(self, event: dict) -> None: + t = event.get("type") + if t == "llm_start": + self._spinner_start() + elif t == "llm_end": + self._spinner_stop() + pt = event.get("prompt_tokens", 0) + ct = event.get("completion_tokens", 0) + el = event.get("elapsed", 0.0) + self.console.print(f"[info][in {pt:,} out {ct:,} t {el:.1f}s][/info]") + elif t == "text": + content = event.get("content") or "" + if content: + self.console.print("[assistant]assistant>[/assistant]") + self.console.print(Markdown(content)) + elif t == "tool_call": + name = event.get("name", "") + preview = event.get("args_preview", "") + self.console.print(f"[tool]tool>[/tool] {name}({preview})") + elif t == "tool_result": + preview = event.get("preview", "") + self.console.print(f"[muted]{preview}[/muted]") + # done: 无需输出 + + def _spinner_start(self) -> None: + self._start = time.monotonic() + self._stop = threading.Event() + + def fmt() -> str: + elapsed = time.monotonic() - self._start + total = self._tokens() + tail = f" ctx {total:,} tok" if total else "" + return f"[muted]thinking... {elapsed:.1f}s{tail}[/muted]" + + self._status = self.console.status(fmt(), spinner="dots") + self._status.__enter__() + + def tick() -> None: + while not self._stop.wait(0.1): + try: + self._status.update(fmt()) + except Exception: + return + + self._thread = threading.Thread(target=tick, daemon=True) + self._thread.start() + + def _spinner_stop(self) -> None: + if self._stop is not None: + self._stop.set() + if self._thread is not None: + self._thread.join(timeout=0.5) + if self._status is not None: + try: + self._status.__exit__(None, None, None) + except Exception: + pass + self._status = None + self._stop = None + self._thread = None diff --git a/main.py b/main.py index 7baca74..d8a0556 100644 --- a/main.py +++ b/main.py @@ -17,6 +17,7 @@ from core.capabilities import ModelCapabilities from core.llm import LLM from core.loop import AgentLoop from core.session import Session +from core.sinks import ConsoleEventSink from core.skills import SkillRegistry from core.task import TaskState from tools.fs import EditTool, GlobTool, GrepTool, ReadTool, WriteTool @@ -173,7 +174,8 @@ def build_agent( rp = RunPythonTool(base_dir=tool_base) tools[rp.name] = rp - agent = AgentLoop(llm, tools, session, caps, console=console) + sink = ConsoleEventSink(console, token_counter=lambda: llm.token_counter.total) if console else None + agent = AgentLoop(llm, tools, session, caps, sink=sink) return agent, session, sid, task_state, task_dir