"""主 agent loop: ReAct 风格,LLM ↔ Tool 反复直到无 tool_call。 loop 不直接 print —— 进度通过 sink.emit(event) 上抛。Sink 决定怎么呈现 (本地 console / SSE / 日志)。事件类型见 core/sinks.py 头部说明。 LLM 调用走 `chat_stream`(流式),chunk 之间 poll cancel_check 实现快速中断。 content delta 即时 emit `text` 事件让前端打字机渲染;chunks 攒齐后用 `litellm.stream_chunk_builder` 拼回完整 response 给 tool_calls 解析 + usage 记账。 """ from __future__ import annotations import hashlib import json import time from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple from uuid import UUID import litellm from .capabilities import ModelCapabilities from .context import prepare_messages_with_stats from .executor import ExecCtx, Executor from .llm import LLM from .session import Session from .storage import record_chat_usage _CANCELLED_TOOL_PLACEHOLDER = "[cancelled by user]" class _RepeatGuard: """检测「同名同参 + 无产出」的病理性重复调用,断掉死循环。 背景(2026-06-08 DB 实测):高轮数烧 token 的 task 里,单个工具被用**完全相同的 参数**重复调用几十上百次(`document_search` 122 次、空参数 `shell{}` 51 次、反复 `glob` 同一个不存在的路径)。loop 原本对此零防护,照单全收直到撞 max_iterations。 命门是只惩罚「无产出」重复,绝不误伤正常迭代: - 同参但**每次结果不同**(改了脚本后重跑 run_python、修 bug 后重跑构建)→ 有产出, 计数清零,永不拦。 - 同参且**结果是 `[Error]` 或与之前某次一字不差**(空 `{}` 缺参、反复撞同一个错) → 无产出,累计。 累计 >= SOFT 注入软提示(模型当轮就看到);>= HARD 直接拦截不执行,逼它换路。 顺带堵掉 `_malformed_tool_calls` 的洞:大参数畸形退化成合法空 `{}` 时,executor 每次 返回同一句「缺少必填参数」→ 走 dup 分支被这同一机制拦下,无需单独特判空 `{}`。 状态活在单次 task run 内(AgentLoop 实例持有),不跨 task。 """ SOFT = 2 # 无产出重复累计 >= SOFT:在结果尾部注入软提示 HARD = 4 # 无产出重复累计 >= HARD:下一次同参调用直接拦截不执行 def __init__(self) -> None: # key -> {"hashes": set[str], "unproductive": int, "n": int, "blocked": int} self._h: Dict[str, dict] = {} @staticmethod def _key(name: str, args: Any) -> str: try: canon = json.dumps(args, sort_keys=True, ensure_ascii=False) except (TypeError, ValueError): canon = repr(args) return name + "\x00" + canon def _state(self, name: str, args: Any) -> dict: return self._h.setdefault( self._key(name, args), {"hashes": set(), "unproductive": 0, "n": 0, "blocked": 0}, ) def should_block(self, name: str, args: Any) -> bool: """执行前调用:该指纹已累计 >= HARD 次无产出重复 → 拦截(不执行)。""" st = self._h.get(self._key(name, args)) return bool(st and st["unproductive"] >= self.HARD) def register_block(self, name: str, args: Any) -> Tuple[int, int]: """记一次拦截,返回 (已执行次数 n, 累计拦截次数 blocked)。""" st = self._state(name, args) st["blocked"] += 1 return st["n"], st["blocked"] def record(self, name: str, args: Any, result: str) -> Tuple[int, bool]: """执行后调用:登记结果。返回 (该指纹「无产出重复」计数, 本次是否有净产出)。 净产出 = 非 `[Error]` 且非与历史一字不差的重复结果 —— 供全局「无进展」熔断判定: 一步里只要有一次净产出就算在推进。 """ st = self._state(name, args) h = hashlib.sha1(result.encode("utf-8", "replace")).hexdigest() is_err = result.lstrip().startswith("[Error") dup = h in st["hashes"] if st["n"] >= 1: if is_err or dup: st["unproductive"] += 1 else: # 新的非错误结果 = 有产出 → 清零,正常迭代不会被累积成拦截 st["unproductive"] = 0 st["hashes"].add(h) st["n"] += 1 return st["unproductive"], not (is_err or dup) def _extract_delta_content(chunk: Any) -> Optional[str]: """从 stream chunk 提 delta.content(文本片段)。chunk 形态 litellm ModelResponseStream: choices[0].delta.content。usage-only 收尾 chunk(没 choices / delta)返 None。 """ try: choices = getattr(chunk, "choices", None) if not choices: return None delta = getattr(choices[0], "delta", None) if delta is None: return None content = getattr(delta, "content", None) return content if content else None except Exception: return None def _malformed_tool_calls(response: Any) -> List[str]: """检出 arguments 损坏(JSON 解析不了)的 tool_call,返回 [name(len=N), ...]。 背景:deepseek-v4-flash 大参数工具调用偶发畸形 —— 流式 delta 错位把别处的内容 碎片粘到 arguments 开头(如 `].cells[1].merge(...{"path":...}`),拼回来后 JSON 解析直接失败。这种是上游瞬时抖动,不该入库污染上下文,调用方据此丢弃整轮重 roll。 只看「解析失败」;空字符串 / 合法空对象不算畸形(交给 executor 按缺参数处理)。 """ try: msg = response.choices[0].message except Exception: return [] bad: List[str] = [] for tc in (getattr(msg, "tool_calls", None) or []): raw = (getattr(tc.function, "arguments", None) or "").strip() if not raw: continue try: json.loads(raw) except (json.JSONDecodeError, ValueError): bad.append(f"{tc.function.name}(len={len(raw)})") return bad def _usage_to_dict(usage: Any) -> dict: if not usage: return {} if hasattr(usage, "model_dump"): usage = usage.model_dump() elif hasattr(usage, "dict"): usage = usage.dict() if isinstance(usage, dict): return usage return {} def _extract_usage_details(usage: Any) -> dict: """从 provider usage 提取统一 token 明细。 DeepSeek 直接给 prompt_cache_hit_tokens / prompt_cache_miss_tokens; OpenAI 风格把 cached tokens 放在 prompt_tokens_details.cached_tokens。 """ data = _usage_to_dict(usage) prompt_details = data.get("prompt_tokens_details") or {} completion_details = data.get("completion_tokens_details") or {} if not isinstance(prompt_details, dict): prompt_details = {} if not isinstance(completion_details, dict): completion_details = {} cache_hit = ( data.get("prompt_cache_hit_tokens") or prompt_details.get("cached_tokens") or 0 ) cache_miss = data.get("prompt_cache_miss_tokens") or 0 return { "tokens_in": int(data.get("prompt_tokens") or 0), "tokens_out": int(data.get("completion_tokens") or 0), "cache_hit_tokens": int(cache_hit or 0), "cache_miss_tokens": int(cache_miss or 0), "reasoning_tokens": int(completion_details.get("reasoning_tokens") or 0), } def _extract_usage(usage: Any) -> Tuple[int, int]: """从 litellm response.usage 提 (prompt_tokens, completion_tokens)。""" details = _extract_usage_details(usage) return details["tokens_in"], details["tokens_out"] class AgentLoop: def __init__( self, llm: LLM, executor: Executor, session: Session, capabilities: ModelCapabilities, user_id: UUID, working_dir: Path, sink: Optional[Any] = None, max_iterations: Optional[int] = None, cancel_check: Optional[Callable[[], bool]] = None, ) -> None: self.llm = llm self.executor = executor self.session = session self.caps = capabilities self.user_id = user_id # usage_events 写入时按 user 维度聚合 # ExecCtx 字段:user_id / task_id 已在,working_dir 单独传 —— 供 docker backend # (Step 3)拼 `--workdir /workspace/` 与临时文件命名空间使用。 self.working_dir = working_dir self.max_iterations = max_iterations or capabilities.max_iterations self.sink = sink # 协作式 cancel:web 层注入 `lambda: broker.is_cancelled(task_id)`; # CLI 路径不设(None → 永不 cancel)。check 点在 ① 每轮 LLM 前 ② stream chunk 间 # ③ tool_calls 之间。chunk 间 poll 让 cancel 延迟从「整轮 generation 时长」 # (几十秒)降到「单 chunk 间隔」(~100ms)。 self.cancel_check = cancel_check # 病理性重复调用守卫(同名同参 + 无产出),活在本次 run 内,不跨 task。 self._repeat_guard = _RepeatGuard() # 全局「无进展」计数:连续多少步整步无净产出。有净产出清零,见 run loop 熔断。 self._stall = 0 def _emit(self, event: dict) -> None: if self.sink is not None: self.sink.emit(event) def _is_cancelled(self) -> bool: return bool(self.cancel_check and self.cancel_check()) def _fill_cancelled_tool_results(self, remaining: list) -> None: """给未执行的 tool_call 补 cancelled tool result,保 LiteLLM 协议完整。 每个 assistant tool_call 必须有对应的 tool message,否则 resume 时 LLM 报错。 """ for tc in remaining: self.session.append({ "role": "tool", "tool_call_id": tc.id, "content": _CANCELLED_TOOL_PLACEHOLDER, }) def run(self, user_message: str) -> str: self.session.append({"role": "user", "content": user_message}) for _ in range(self.max_iterations): if self._is_cancelled(): self._emit({"type": "cancelled"}) return "[cancelled]" start = time.monotonic() response, cancelled_mid_stream = self._stream_llm() elapsed = time.monotonic() - start if cancelled_mid_stream: # 流中途收到 cancel:已接收的 chunk 丢弃,不入库不记账(部分 assistant # 内容也不持久化,下次 resume 上下文干净)。response 可能是 None。 self._emit({"type": "cancelled"}) return "[cancelled]" msg = response.choices[0].message asst_msg_id = self.session.append(msg) usage_details = _extract_usage_details(getattr(response, "usage", None)) pt, ct = usage_details["tokens_in"], usage_details["tokens_out"] # 记账(0006):一行 usage_event + 回填 messages.tokens_in/out + model_profile。 # 任何失败都吞掉(litellm cost map miss / DB 异常),不阻塞主 loop; # message 仍在 session/DB 里,后续重启不影响。 model_profile = f"{self.caps.family}.{self.caps.variant}" try: record_chat_usage( task_id=self.session.task_id, user_id=self.user_id, message_id=asst_msg_id, model_profile=model_profile, prompt_tokens=pt, completion_tokens=ct, input_cny_per_mtoken=self.caps.input_cny_per_mtoken, output_cny_per_mtoken=self.caps.output_cny_per_mtoken, cache_hit_tokens=usage_details["cache_hit_tokens"], cache_hit_cny_per_mtoken=self.caps.cache_hit_cny_per_mtoken, extra_units={ k: v for k, v in usage_details.items() if k not in ("tokens_in", "tokens_out") and v }, response=response, ) except Exception as e: self._emit({"type": "warn", "msg": f"record_usage failed: {type(e).__name__}: {e}"}) self._emit({ "type": "llm_end", "prompt_tokens": pt, "completion_tokens": ct, "cache_hit_tokens": usage_details["cache_hit_tokens"], "cache_miss_tokens": usage_details["cache_miss_tokens"], "elapsed": elapsed, }) tool_calls = getattr(msg, "tool_calls", None) or [] # content 已通过 stream 流式 emit 过 delta,这里不再 emit 整段 text 事件。 if not tool_calls: self._emit({"type": "done"}) return getattr(msg, "content", None) or "" step_productive = False for i, tc in enumerate(tool_calls): if self._is_cancelled(): self._fill_cancelled_tool_results(tool_calls[i:]) self._emit({"type": "cancelled"}) return "[cancelled]" result, productive = self._execute_tool_call(tc) step_productive = step_productive or productive self.session.append( { "role": "tool", "tool_call_id": tc.id, "name": tc.function.name, "content": result, } ) # 全局「无进展」熔断:整步所有 tool 都无净产出(全是 [Error]/重复/被拦)→ 累计; # 连续 _STALL_LIMIT 步空转就主动停,别烧到 max_iterations。一旦某步有净产出立即清零。 if step_productive: self._stall = 0 else: self._stall += 1 if self._stall >= self._STALL_LIMIT: self._emit({ "type": "warn", "msg": ( f"连续 {self._stall} 步无净产出(全是报错/重复/被拦),已自动停止以免空烧。" "换思路或补充信息后回复「继续」可重试。" ), }) self._emit({"type": "done"}) return "[stopped: no progress]" # 跑满 backstop:不是出错,是单轮自主步数到顶。明确提示可续跑,别静默停。 self._emit({ "type": "warn", "msg": ( f"已达单轮步数上限({self.max_iterations} 步),任务可能尚未完成。" "回复「继续」可接着跑。" ), }) self._emit({"type": "done"}) return "[reached max iterations]" # 工具参数畸形时,丢弃整轮重 roll 的最大次数;第 _MAX_MALFORMED_RETRIES 次(即最后 # 一次)降级走非流式(provider 服务端拼 tool_calls,绕开流式 delta 错位)。实测大参数 # 工具调用偶发连续两次畸形,故留够重试余量。 _MAX_MALFORMED_RETRIES = 3 # 连续多少步「整步无净产出」(全是 [Error]/重复结果/被拦)就判定空转、主动停。 # 比 max_iterations 早得多掐死死循环(第 8 步 vs 第 120 步),同时放正经长任务自由跑。 # 保守取 8:几乎不误伤"连踩几个错再纠正"的正常波动,配 _RepeatGuard 逐指纹 HARD=4 双保险。 _STALL_LIMIT = 8 # 上下文压缩门槛:历史体量未到 reliable_context 的此比例前不压缩 —— 短任务不丢旧工具细节, # 且 prompt 前缀逐轮字节一致、DeepSeek 前缀缓存全程命中。50% 留足上下文安全垫。 _COMPACT_CONTEXT_RATIO = 0.5 # chars↔tokens 粗折算(CJK+代码+json 混合保守按 ~2.5 char/token);压缩是成本/安全优化、 # 非正确性关键,估算粗糙无妨。reliable_context(tokens) × ratio × 此值 = 触发的 char 阈值。 _CHARS_PER_TOKEN = 2.5 def _stream_llm(self) -> Tuple[Optional[Any], bool]: """拉一轮 LLM 并保证返回的 tool_call arguments 可解析。 返回 (response, cancelled_mid_stream): - 正常完结 → (response, False);response shape 与非流式 completion() 等价 (choices[0].message + usage) - 中途 cancel → (None, True);已收 chunk 丢弃,内层 generator 在 finally 关闭底层连接 畸形重试:deepseek-v4-flash 大参数工具调用偶发把内容碎片错位粘进 arguments,拼回 后 JSON 解析失败。这种损坏一旦入库会被每轮重发、诱导模型继续学坏(投毒级联)。 故拼回后先校验 tool_call arguments 能否解析:不能 → 丢弃整轮(不 append/不记账)重 roll;连续失败到最后一次降级非流式兜底。重试消耗的 token 不单独记账(罕见路径)。 """ # 上下文压力门槛按当前模型 reliable_context 折算:体量未到阈值前不压缩(缓存全暖 + 不丢信息)。 compact_threshold = int( self.caps.reliable_context * self._COMPACT_CONTEXT_RATIO * self._CHARS_PER_TOKEN ) llm_messages, context_stats = prepare_messages_with_stats( self.session.messages, compact_threshold_chars=compact_threshold, ) self._emit({ "type": "llm_start", **{f"context_{k}": v for k, v in context_stats.items()}, }) for attempt in range(self._MAX_MALFORMED_RETRIES + 1): use_nonstream = attempt == self._MAX_MALFORMED_RETRIES if use_nonstream: response = self._nonstream_once(llm_messages) else: response, cancelled = self._collect_stream_once(llm_messages) if cancelled: return None, True bad = _malformed_tool_calls(response) if not bad: return response, False self._emit({ "type": "warn", "msg": f"工具调用参数损坏 {bad},丢弃本轮重试 ({attempt + 1}/{self._MAX_MALFORMED_RETRIES})", }) # 非流式兜底仍畸形(理论极罕见):交还给 _execute_tool_call 的 invalid-JSON 分支 # 优雅返错给模型,而非在此死循环。 return response, False def _collect_stream_once(self, llm_messages: List[dict]) -> Tuple[Optional[Any], bool]: """跑一次流式:攒 chunk + content delta 即时 emit,拼回完整 response。 返回 (response, cancelled_mid_stream)。""" chunks: List[Any] = [] stream = self.llm.chat_stream( messages=llm_messages, tools=self.executor.schemas(), reasoning_effort=self.caps.default_reasoning_effort or None, ) cancelled = False try: for chunk in stream: if self._is_cancelled(): cancelled = True break chunks.append(chunk) # delta.content 即时 emit 给前端打字机渲染;tool_call delta 不实时发 # (拼接散在多 chunk 跨 frame 难看,等拼回后整条 tool_call 事件由 # _execute_tool_call 时机发更直观)。 delta_text = _extract_delta_content(chunk) if delta_text: self._emit({"type": "text", "delta": delta_text}) finally: # generator 提前 break 时 GeneratorExit 触发 chat_stream finally → close 底层连接 stream.close() if cancelled: return None, True # 用 litellm 官方 helper 拼回完整 response(包括 tool_calls 拼接 + usage)。 # messages 参数仅用于失败时回填 prompt token 估算,正常路径 stream_options.include_usage # 已让最后一个 chunk 带准确 usage。 response = litellm.stream_chunk_builder(chunks, messages=llm_messages) return response, False def _nonstream_once(self, llm_messages: List[dict]) -> Any: """非流式兜底:provider 服务端一次性拼好 tool_calls,绕开流式 delta 错位。 没有 chunk 级 cancel,content 也拿不到 delta —— 整段 text 一次性补 emit。""" response = self.llm.chat( messages=llm_messages, tools=self.executor.schemas(), reasoning_effort=self.caps.default_reasoning_effort or None, ) try: text = getattr(response.choices[0].message, "content", None) if text: self._emit({"type": "text", "delta": text}) except Exception: pass return response def _execute_tool_call(self, tc: Any) -> Tuple[str, bool]: """执行一次 tool_call,返回 (结果文本, 本次是否有净产出)。 净产出供 run loop 的全局「无进展」熔断判定。""" name = tc.function.name raw_args = tc.function.arguments or "{}" try: args = json.loads(raw_args) except json.JSONDecodeError as e: return f"[Error] invalid JSON arguments for {name}: {e}", False args_preview = json.dumps(args, ensure_ascii=False) if len(args_preview) > 200: args_preview = args_preview[:200] + "..." self._emit({ "type": "tool_call", "name": name, "args": args, "args_preview": args_preview, }) # 病理性重复拦截:同参已累计 HARD 次无产出重复 → 不执行,回硬停消息逼模型换路。 if self._repeat_guard.should_block(name, args): n, blocked = self._repeat_guard.register_block(name, args) result = ( f"[已拦截重复调用] {name} 用完全相同的参数已调用 {n} 次且结果始终未变,本次未执行。" "这通常意味着思路卡死:① 换不同的参数或方法;② 读一下相关文件/报错重新定位;" "③ 若确实推进不了,停下来如实告诉用户卡在哪、缺什么。不要再用相同参数重试。" ) self._emit({"type": "warn", "msg": f"拦截重复调用 {name}(同参第 {n} 次、结果未变)"}) self._emit({ "type": "tool_result", "name": name, "result": result, "preview": result, "truncated": False, }) return result, False ctx = ExecCtx( user_id=self.user_id, task_id=self.session.task_id, working_dir=self.working_dir, cancel_check=self.cancel_check, ) result = self.executor.call_tool(name, args, ctx).content # 控制返回给模型的 tool 结果体量,避免炸 context MAX_LEN = 16_000 truncated = False if len(result) > MAX_LEN: result = result[:MAX_LEN] + f"\n[... truncated, {len(result) - MAX_LEN} chars ...]" truncated = True # 登记结果做重复检测(用截断后、未加提示的原始结果算指纹,保证同输出哈希一致)。 unproductive, productive = self._repeat_guard.record(name, args, result) if unproductive >= _RepeatGuard.SOFT: if unproductive == _RepeatGuard.SOFT: self._emit({"type": "warn", "msg": f"{name} 同参重复且结果未变({unproductive} 次),已提示模型换路"}) result += ( f"\n\n[重复调用警告] 你已用完全相同的参数调用 {name} {unproductive + 1} 次、结果没有变化。" "再原样重调不会有新结果——换参数/换工具/换思路,或停下来向用户说明卡在哪。" ) preview = result if len(result) < 400 else result[:400] + "..." self._emit({ "type": "tool_result", "name": name, "result": result, "preview": preview, "truncated": truncated, }) return result, productive