diff --git a/DESIGN.md b/DESIGN.md index bf5249d..3398a38 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -65,7 +65,7 @@ ReAct:LLM → 若有 tool_calls 就执行 → 结果塞回消息 → 再调 LLM - 事件通过 `sink.emit` 流式发布(§7 A,SSE 桥);content delta 在 stream chunk 到达即时 emit `text` 事件,前端打字机渲染 - **LLM 调用走 `LLM.chat_stream`(litellm `stream=True`)**:chunks 攒齐后用 `litellm.stream_chunk_builder` 拼回完整 response 给 tool_calls 解析 + usage 记账;`stream_options.include_usage=True` 让最后一个 chunk 带 usage - `cancel_check: Optional[Callable[[], bool]]` 协作式 cancel,每轮 LLM 前 + **stream chunk 之间** + tool_calls 之间 poll;chunk 间 poll 让 cancel 延迟从「整轮 generation 时长」(几十秒)降到「单 chunk 间隔」(~100ms);中途 cancel 时已收 chunk 丢弃,assistant 半截内容不入库(resume 上下文干净);命中给未执行 tool_call 补 `[cancelled by user]` 保 LiteLLM 协议 -- `max_iterations` 从 capabilities 读 +- **停机判据 = 解耦「跑了几步」与「是否在推进」**(2026-06-10):用户感知的"轮"是来回对话次数,一个 run 内模型自主连调 N 次 tool **概念上仍是 1 轮**,该放它跑完;真正要掐的是"空转"。故 `max_iterations`(从 capabilities/yaml 读,flash 120 / pro 150)降级为**纯安全 backstop**,不再当"轮预算"砍正经长任务;主防护是两道**进展信号**:① `_RepeatGuard` 逐指纹"同名同参+无产出(`[Error]`/结果一字不差重复)"累计,SOFT=2 注提示、HARD=4 拦截;② run 级全局 `_stall`——整步所有 tool 都无净产出则 +1、任一净产出清零,连续 `_STALL_LIMIT=8` 步主动停(`[stopped: no progress]`),比撞 backstop 早得多掐死循环。撞 backstop / 空转停都 emit 明确"回复『继续』可续跑"提示,不静默停。**取舍**:step-count 是"不收敛"的粗糙代理,正经任务 80 步和死循环 5 步被一刀切同等对待是错的;进展信号才对症。新增成本=run loop 一个计数器,死循环兜底反而更早(8 步 vs 120 步)。 ### 3.2 Model Profile(`core/capabilities.py` + `config/models/*.yaml`) 每模型一份 yaml,agent 行为按档案动态调整。新模型 5 分钟接入,不改代码。 diff --git a/PROGRESS.md b/PROGRESS.md index 303ce0f..dd9b79f 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -23,6 +23,7 @@ ### 2026-06-10 +- **单轮停机判据从"步数"解耦为"是否在推进":`max_iterations` 升为纯 backstop + 新增全局「无进展」熔断 + 撞顶明确提示**:DB 诊断 task `b27466a0`(智能体介绍 PPT)所谓"中途断了"——查实=该 run 跑满 `max_iterations`(flash 旧值 50)后 `return "[reached max iterations]"` 干净停下、留一条悬空 tool 结果,用户离开 25min 回来打"继续"才续完(`run_status=idle/run_error=None`,非崩溃);"步骤太长"=少数轮 DeepSeek API 延迟 126-185s,工具本身全 <13s;顺带实测该 task DeepSeek 前缀缓存命中 92-94%,**上下文压缩对缓存几乎无害**(压缩函数确定性→旧消息压缩态稳定,只滑动边界这一处断,每轮 miss 几十~几百 token)。**洞察**:`max_iterations` 把"用户感知的轮(来回对话)"和"一轮内自主工作步数"混在一个旋钮上——自主 tool 链概念上是 1 轮,该松;真正要掐的是"空转"。落地:① yaml `max_iterations` flash 50→120 / pro 100→150,dataclass 默认 50→120,定位为安全兜底非"轮"预算;② `_RepeatGuard.record` 多返一个 `productive`(净产出=非 `[Error]` 且非一字不差重复);③ `_execute_tool_call` 三个返回点都带 `productive`(invalid-JSON/被拦=False);④ run loop 累计 `self._stall`——整步所有 tool 都无净产出则 +1、任一净产出清零,连续 `_STALL_LIMIT=8` 步空转主动停(`[stopped: no progress]`),比撞 120 早得多掐死循环,配 `_RepeatGuard` 逐指纹 HARD=4 双保险;⑤ 撞 backstop / 熔断都 emit 明确"回复『继续』可接着跑"提示,不再静默停。`tests/test_loop_repeat_guard.py` 更新 record 解包调用 + 加 `productive` 信号用例(17 例过,全量 103 过)。 - **`systemctl restart` 优雅 drain in-flight run(单实例止血,不再误标 error)**:此前 restart 硬杀 BG run 线程,下次启动 reaper 把所有 `running/cancelling` 标 `error: server restarted before run finished` —— 用户一多就不能随便重启。落地纯进程内、**零 DB 改动**:① lifespan 加 `app.state.draining`(asyncio.Event)+ `app.state.inflight`(`{asyncio.Task: task_id}`,顺手修 `create_task` 不留引用可能被 GC 的旧坑);② POST `/messages` 起 run 时登记+done 回调自摘除,draining 置位时返 503+`Retry-After`;③ lifespan `finally` 先置 draining 拒新 run,`asyncio.wait(inflight, drain_timeout)` 等收尾,超时的 `broker.request_cancel` 转协作式 cancel(下个 chunk 间隙退、标 idle 不报 error),再过 `cancel_grace` 仍没退的留给 SIGKILL(最坏退化=改前)。④ `main.py` uvicorn 加 `timeout_graceful_shutdown=5`(否则长连 SSE 无限挡在 drain 前);⑤ `config/agent.yaml` 加 `shutdown` 段(drain_timeout 30s / cancel_grace 15s,超时转 cancel = 用户按停止可重发,故偏短);⑥ dev SPA `chat.js` 发送包退避重试(503 背压 + 交接拒连 TypeError 都重试 ~26s,显"服务更新中",耗尽贴友好提示)。**部署强耦合**:unit `TimeoutStopSec` 从 10 提到 90(必须 > drain+grace+sandbox 清扫余量,否则 SIGKILL 砍掉 drain),已写进 RUN.md unit + 故障兜底。B 蓝绿(零 503 窗口)留作触发信号后再做,前置是 instance-aware reaper(§7.8)。 ### 2026-06-09 diff --git a/RUN.md b/RUN.md index 9f8dc9c..3cc9768 100644 --- a/RUN.md +++ b/RUN.md @@ -720,6 +720,8 @@ sudo xfs_quota -x -c "limit -p bhard=10g zcbot_" /opt | dev.html 显示 "load failed" 立刻回登录页 | token 过期或 JWT_SECRET 服务端变了。已自动跳登录页,按上次 tab 重登 | | dev.html 顶栏出现"连接断开,重连中…(N/3)" | SSE 流被切(`--reload` 重启 / nginx 切换 / 网络抖)。客户端自动重连,1s/2s/4s 退避;新进程已 reaper 标 error 则立即收 done + 卡片末尾"请重发"提示;若服务端还活着会继续看后续 delta(断开期间的丢失,broker 不持久化) | | 对话里偶发 `[Error] invalid JSON arguments` / `[Error] bad arguments to write: ... missing required` | deepseek-v4-flash **大参数工具调用(大 write/run_python,≈7K+ 字符)偶发把内容碎片错位粘进 arguments 或退化成空 `{}`**(上游流式抖动)。`core/loop.py` 已自动兜底:畸形参数丢弃整轮重 roll(≤3 次)+ 最后一次降级非流式。仍频繁撞 → 引导模型**把大文件拆小 / 用 run_python 分块写**,或换 `deepseek_v4.pro`。前端看到 warn「工具调用参数损坏…重试」即此机制在生效 | +| 长任务跑到一半停下、提示「已达单轮步数上限…回复『继续』可接着跑」 | **预期行为非崩溃**:单个 run 自主步数到 backstop(`config/models/*.yaml` 的 `max_iterations`,flash 120 / pro 150)就主动停,回 `[reached max iterations]`。直接回复「继续」即续跑。经常撞顶=任务确实大,可调高对应 variant 的 `max_iterations` 或换 pro | +| 任务停下提示「连续 N 步无净产出…已自动停止」(回 `[stopped: no progress]`) | **空转熔断**:连续 `_STALL_LIMIT`(loop.py,默认 8)步所有 tool 都只返 `[Error]`/重复结果/被拦 = 没在推进,主动停以免空烧 token。说明模型卡死在某个错上——**换思路 / 补充信息再回复「继续」**,别原样重发(会再次撞同一墙) | --- diff --git a/config/models/deepseek_v4.yaml b/config/models/deepseek_v4.yaml index 9d372f1..df49b3b 100644 --- a/config/models/deepseek_v4.yaml +++ b/config/models/deepseek_v4.yaml @@ -18,7 +18,7 @@ variants: default_reasoning_effort: "" code_quality: good enable_run_python: true - max_iterations: 50 + max_iterations: 120 # backstop 兜底,非"轮"预算;真正的空转防护是 loop 的无进展熔断 + _RepeatGuard optimal_temperature: 0.3 prompt_caching: false extended_thinking: false @@ -41,7 +41,7 @@ variants: default_reasoning_effort: medium code_quality: excellent enable_run_python: true - max_iterations: 100 + max_iterations: 150 # backstop 兜底,非"轮"预算;真正的空转防护是 loop 的无进展熔断 + _RepeatGuard optimal_temperature: 0.2 prompt_caching: false extended_thinking: false diff --git a/core/capabilities.py b/core/capabilities.py index e20adb8..cd578dc 100644 --- a/core/capabilities.py +++ b/core/capabilities.py @@ -34,7 +34,7 @@ class ModelCapabilities: enable_run_python: bool = False # 工程参数 - max_iterations: int = 50 + max_iterations: int = 120 # 单轮自主步数 backstop;空转防护见 loop 无进展熔断,不靠这个砍正经长任务 optimal_temperature: float = 0.3 # provider 特性 diff --git a/core/loop.py b/core/loop.py index 1bc062e..152443a 100644 --- a/core/loop.py +++ b/core/loop.py @@ -82,8 +82,12 @@ class _RepeatGuard: st["blocked"] += 1 return st["n"], st["blocked"] - def record(self, name: str, args: Any, result: str) -> int: - """执行后调用:登记结果,返回该指纹当前的「无产出重复」计数。""" + def record(self, name: str, args: Any, result: str) -> Tuple[int, bool]: + """执行后调用:登记结果。返回 (该指纹「无产出重复」计数, 本次是否有净产出)。 + + 净产出 = 非 `[Error]` 且非与历史一字不差的重复结果 —— 供全局「无进展」熔断判定: + 一步里只要有一次净产出就算在推进。 + """ st = self._state(name, args) h = hashlib.sha1(result.encode("utf-8", "replace")).hexdigest() is_err = result.lstrip().startswith("[Error") @@ -96,7 +100,7 @@ class _RepeatGuard: st["unproductive"] = 0 st["hashes"].add(h) st["n"] += 1 - return st["unproductive"] + return st["unproductive"], not (is_err or dup) def _extract_delta_content(chunk: Any) -> Optional[str]: @@ -218,6 +222,8 @@ class AgentLoop: self.cancel_check = cancel_check # 病理性重复调用守卫(同名同参 + 无产出),活在本次 run 内,不跨 task。 self._repeat_guard = _RepeatGuard() + # 全局「无进展」计数:连续多少步整步无净产出。有净产出清零,见 run loop 熔断。 + self._stall = 0 def _emit(self, event: dict) -> None: if self.sink is not None: @@ -300,12 +306,14 @@ class AgentLoop: self._emit({"type": "done"}) return getattr(msg, "content", None) or "" + step_productive = False for i, tc in enumerate(tool_calls): if self._is_cancelled(): self._fill_cancelled_tool_results(tool_calls[i:]) self._emit({"type": "cancelled"}) return "[cancelled]" - result = self._execute_tool_call(tc) + result, productive = self._execute_tool_call(tc) + step_productive = step_productive or productive self.session.append( { "role": "tool", @@ -315,6 +323,31 @@ class AgentLoop: } ) + # 全局「无进展」熔断:整步所有 tool 都无净产出(全是 [Error]/重复/被拦)→ 累计; + # 连续 _STALL_LIMIT 步空转就主动停,别烧到 max_iterations。一旦某步有净产出立即清零。 + if step_productive: + self._stall = 0 + else: + self._stall += 1 + if self._stall >= self._STALL_LIMIT: + self._emit({ + "type": "warn", + "msg": ( + f"连续 {self._stall} 步无净产出(全是报错/重复/被拦),已自动停止以免空烧。" + "换思路或补充信息后回复「继续」可重试。" + ), + }) + self._emit({"type": "done"}) + return "[stopped: no progress]" + + # 跑满 backstop:不是出错,是单轮自主步数到顶。明确提示可续跑,别静默停。 + self._emit({ + "type": "warn", + "msg": ( + f"已达单轮步数上限({self.max_iterations} 步),任务可能尚未完成。" + "回复「继续」可接着跑。" + ), + }) self._emit({"type": "done"}) return "[reached max iterations]" @@ -323,6 +356,11 @@ class AgentLoop: # 工具调用偶发连续两次畸形,故留够重试余量。 _MAX_MALFORMED_RETRIES = 3 + # 连续多少步「整步无净产出」(全是 [Error]/重复结果/被拦)就判定空转、主动停。 + # 比 max_iterations 早得多掐死死循环(第 8 步 vs 第 120 步),同时放正经长任务自由跑。 + # 保守取 8:几乎不误伤"连踩几个错再纠正"的正常波动,配 _RepeatGuard 逐指纹 HARD=4 双保险。 + _STALL_LIMIT = 8 + def _stream_llm(self) -> Tuple[Optional[Any], bool]: """拉一轮 LLM 并保证返回的 tool_call arguments 可解析。 @@ -412,13 +450,15 @@ class AgentLoop: pass return response - def _execute_tool_call(self, tc: Any) -> str: + def _execute_tool_call(self, tc: Any) -> Tuple[str, bool]: + """执行一次 tool_call,返回 (结果文本, 本次是否有净产出)。 + 净产出供 run loop 的全局「无进展」熔断判定。""" name = tc.function.name raw_args = tc.function.arguments or "{}" try: args = json.loads(raw_args) except json.JSONDecodeError as e: - return f"[Error] invalid JSON arguments for {name}: {e}" + return f"[Error] invalid JSON arguments for {name}: {e}", False args_preview = json.dumps(args, ensure_ascii=False) if len(args_preview) > 200: @@ -446,7 +486,7 @@ class AgentLoop: "preview": result, "truncated": False, }) - return result + return result, False ctx = ExecCtx( user_id=self.user_id, @@ -464,7 +504,7 @@ class AgentLoop: truncated = True # 登记结果做重复检测(用截断后、未加提示的原始结果算指纹,保证同输出哈希一致)。 - unproductive = self._repeat_guard.record(name, args, result) + unproductive, productive = self._repeat_guard.record(name, args, result) if unproductive >= _RepeatGuard.SOFT: if unproductive == _RepeatGuard.SOFT: self._emit({"type": "warn", "msg": f"{name} 同参重复且结果未变({unproductive} 次),已提示模型换路"}) @@ -481,4 +521,4 @@ class AgentLoop: "preview": preview, "truncated": truncated, }) - return result + return result, productive diff --git a/tests/test_loop_repeat_guard.py b/tests/test_loop_repeat_guard.py index 2aaca5c..71c8802 100644 --- a/tests/test_loop_repeat_guard.py +++ b/tests/test_loop_repeat_guard.py @@ -19,7 +19,7 @@ def _simulate(guard: _RepeatGuard, name: str, args, results: list[str]) -> list[ guard.register_block(name, args) out.append("BLOCK") continue - unprod = guard.record(name, args, r) + unprod, _productive = guard.record(name, args, r) out.append(f"exec(unprod={unprod})") return out @@ -69,11 +69,27 @@ class TestRepeatGuard(unittest.TestCase): g = _RepeatGuard() unprods = [] for _ in range(_RepeatGuard.SOFT + 1): - unprods.append(g.record("document_search", {"queries": ["x"]}, "(no documents found)")) + unprods.append(g.record("document_search", {"queries": ["x"]}, "(no documents found)")[0]) # 累计达到 SOFT(此时应注入软提示),但还没到 HARD 拦截 self.assertGreaterEqual(max(unprods), _RepeatGuard.SOFT) self.assertFalse(g.should_block("document_search", {"queries": ["x"]})) + def test_record_returns_productive_signal(self): + """record 第二个返回值喂全局无进展熔断:新非错结果=有产出,[Error]/重复=无产出。""" + g = _RepeatGuard() + # 首个新结果 → 有产出 + _, p1 = g.record("read", {"path": "a"}, "[stdout] hello") + self.assertTrue(p1) + # 一字不差重复同一结果 → 无产出 + _, p2 = g.record("read", {"path": "a"}, "[stdout] hello") + self.assertFalse(p2) + # 换出新结果 → 又有产出 + _, p3 = g.record("read", {"path": "a"}, "[stdout] world") + self.assertTrue(p3) + # [Error] 开头 → 无产出(哪怕是该指纹首次) + _, p4 = g.record("glob", {"path": "/nope"}, "[Error] not found") + self.assertFalse(p4) + def test_distinct_args_tracked_separately(self): g = _RepeatGuard() _simulate(g, "document_search", {"queries": ["a"]}, ["[Error] e"] * 8)