feat(loop): 停机判据从"步数"解耦为"是否在推进"

max_iterations 降级为纯安全 backstop(flash 50→120 / pro 100→150),
不再当"轮预算"砍正经长任务;真正的空转防护改用进展信号:

- _RepeatGuard.record 多返 productive(净产出=非[Error]且非一字不差重复)
- _execute_tool_call 三个返回点都带 productive
- run loop 全局 _stall:整步全无净产出+1、任一净产出清零,
  连续 _STALL_LIMIT=8 步主动停([stopped: no progress]),
  比撞 backstop 早得多掐死循环,配逐指纹 HARD=4 双保险
- 撞 backstop / 空转停都 emit"回复继续可续跑"提示,不再静默停

诊断依据:task b27466a0"中途断了"实为撞 max_iterations=50 后干净停、
用户离开 25min 回来打"继续"续完(非崩溃);"步骤太长"=DeepSeek API
延迟 126-185s 而非工具(全<13s)。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-06-10 13:22:02 +08:00
parent b3aea7880f
commit 0df9e5fe3f
7 changed files with 74 additions and 15 deletions

View File

@ -65,7 +65,7 @@ ReAct:LLM → 若有 tool_calls 就执行 → 结果塞回消息 → 再调 LLM
- 事件通过 `sink.emit` 流式发布(§7 A,SSE 桥);content delta 在 stream chunk 到达即时 emit `text` 事件,前端打字机渲染 - 事件通过 `sink.emit` 流式发布(§7 A,SSE 桥);content delta 在 stream chunk 到达即时 emit `text` 事件,前端打字机渲染
- **LLM 调用走 `LLM.chat_stream`(litellm `stream=True`)**:chunks 攒齐后用 `litellm.stream_chunk_builder` 拼回完整 response 给 tool_calls 解析 + usage 记账;`stream_options.include_usage=True` 让最后一个 chunk 带 usage - **LLM 调用走 `LLM.chat_stream`(litellm `stream=True`)**:chunks 攒齐后用 `litellm.stream_chunk_builder` 拼回完整 response 给 tool_calls 解析 + usage 记账;`stream_options.include_usage=True` 让最后一个 chunk 带 usage
- `cancel_check: Optional[Callable[[], bool]]` 协作式 cancel,每轮 LLM 前 + **stream chunk 之间** + tool_calls 之间 poll;chunk 间 poll 让 cancel 延迟从「整轮 generation 时长」(几十秒)降到「单 chunk 间隔」(~100ms);中途 cancel 时已收 chunk 丢弃,assistant 半截内容不入库(resume 上下文干净);命中给未执行 tool_call 补 `[cancelled by user]` 保 LiteLLM 协议 - `cancel_check: Optional[Callable[[], bool]]` 协作式 cancel,每轮 LLM 前 + **stream chunk 之间** + tool_calls 之间 poll;chunk 间 poll 让 cancel 延迟从「整轮 generation 时长」(几十秒)降到「单 chunk 间隔」(~100ms);中途 cancel 时已收 chunk 丢弃,assistant 半截内容不入库(resume 上下文干净);命中给未执行 tool_call 补 `[cancelled by user]` 保 LiteLLM 协议
- `max_iterations` 从 capabilities 读 - **停机判据 = 解耦「跑了几步」与「是否在推进」**(2026-06-10):用户感知的"轮"是来回对话次数,一个 run 内模型自主连调 N 次 tool **概念上仍是 1 轮**,该放它跑完;真正要掐的是"空转"。故 `max_iterations`(从 capabilities/yaml 读,flash 120 / pro 150)降级为**纯安全 backstop**,不再当"轮预算"砍正经长任务;主防护是两道**进展信号**:① `_RepeatGuard` 逐指纹"同名同参+无产出(`[Error]`/结果一字不差重复)"累计,SOFT=2 注提示、HARD=4 拦截;② run 级全局 `_stall`——整步所有 tool 都无净产出则 +1、任一净产出清零,连续 `_STALL_LIMIT=8` 步主动停(`[stopped: no progress]`),比撞 backstop 早得多掐死循环。撞 backstop / 空转停都 emit 明确"回复『继续』可续跑"提示,不静默停。**取舍**:step-count 是"不收敛"的粗糙代理,正经任务 80 步和死循环 5 步被一刀切同等对待是错的;进展信号才对症。新增成本=run loop 一个计数器,死循环兜底反而更早(8 步 vs 120 步)。
### 3.2 Model Profile(`core/capabilities.py` + `config/models/*.yaml`) ### 3.2 Model Profile(`core/capabilities.py` + `config/models/*.yaml`)
每模型一份 yaml,agent 行为按档案动态调整。新模型 5 分钟接入,不改代码。 每模型一份 yaml,agent 行为按档案动态调整。新模型 5 分钟接入,不改代码。

View File

@ -23,6 +23,7 @@
### 2026-06-10 ### 2026-06-10
- **单轮停机判据从"步数"解耦为"是否在推进":`max_iterations` 升为纯 backstop + 新增全局「无进展」熔断 + 撞顶明确提示**:DB 诊断 task `b27466a0`(智能体介绍 PPT)所谓"中途断了"——查实=该 run 跑满 `max_iterations`(flash 旧值 50)后 `return "[reached max iterations]"` 干净停下、留一条悬空 tool 结果,用户离开 25min 回来打"继续"才续完(`run_status=idle/run_error=None`,非崩溃);"步骤太长"=少数轮 DeepSeek API 延迟 126-185s,工具本身全 <13s;顺带实测该 task DeepSeek 前缀缓存命中 92-94%,**上下文压缩对缓存几乎无害**(压缩函数确定性旧消息压缩态稳定,只滑动边界这一处断,每轮 miss 几十~几百 token)。**洞察**:`max_iterations` "用户感知的轮(来回对话)""一轮内自主工作步数"混在一个旋钮上——自主 tool 链概念上是 1 ,该松;真正要掐的是"空转"。落地: yaml `max_iterations` flash 50120 / pro 100150,dataclass 默认 50120,定位为安全兜底非""预算;② `_RepeatGuard.record` 多返一个 `productive`(净产出= `[Error]` 且非一字不差重复);③ `_execute_tool_call` 三个返回点都带 `productive`(invalid-JSON/被拦=False);④ run loop 累计 `self._stall`——整步所有 tool 都无净产出则 +1任一净产出清零,连续 `_STALL_LIMIT=8` 步空转主动停(`[stopped: no progress]`),比撞 120 早得多掐死循环, `_RepeatGuard` 逐指纹 HARD=4 双保险;⑤ backstop / 熔断都 emit 明确"回复继续可接着跑"提示,不再静默停。`tests/test_loop_repeat_guard.py` 更新 record 解包调用 + `productive` 信号用例(17 例过,全量 103 )。
- **`systemctl restart` 优雅 drain in-flight run(单实例止血,不再误标 error)**:此前 restart 硬杀 BG run 线程,下次启动 reaper 把所有 `running/cancelling``error: server restarted before run finished` —— 用户一多就不能随便重启。落地纯进程内、**零 DB 改动**:① lifespan 加 `app.state.draining`(asyncio.Event)+ `app.state.inflight`(`{asyncio.Task: task_id}`,顺手修 `create_task` 不留引用可能被 GC 的旧坑);② POST `/messages` 起 run 时登记+done 回调自摘除,draining 置位时返 503+`Retry-After`;③ lifespan `finally` 先置 draining 拒新 run,`asyncio.wait(inflight, drain_timeout)` 等收尾,超时的 `broker.request_cancel` 转协作式 cancel(下个 chunk 间隙退、标 idle 不报 error),再过 `cancel_grace` 仍没退的留给 SIGKILL(最坏退化=改前)。④ `main.py` uvicorn 加 `timeout_graceful_shutdown=5`(否则长连 SSE 无限挡在 drain 前);⑤ `config/agent.yaml``shutdown` 段(drain_timeout 30s / cancel_grace 15s,超时转 cancel = 用户按停止可重发,故偏短);⑥ dev SPA `chat.js` 发送包退避重试(503 背压 + 交接拒连 TypeError 都重试 ~26s,显"服务更新中",耗尽贴友好提示)。**部署强耦合**:unit `TimeoutStopSec` 从 10 提到 90(必须 > drain+grace+sandbox 清扫余量,否则 SIGKILL 砍掉 drain),已写进 RUN.md unit + 故障兜底。B 蓝绿(零 503 窗口)留作触发信号后再做,前置是 instance-aware reaper(§7.8)。 - **`systemctl restart` 优雅 drain in-flight run(单实例止血,不再误标 error)**:此前 restart 硬杀 BG run 线程,下次启动 reaper 把所有 `running/cancelling``error: server restarted before run finished` —— 用户一多就不能随便重启。落地纯进程内、**零 DB 改动**:① lifespan 加 `app.state.draining`(asyncio.Event)+ `app.state.inflight`(`{asyncio.Task: task_id}`,顺手修 `create_task` 不留引用可能被 GC 的旧坑);② POST `/messages` 起 run 时登记+done 回调自摘除,draining 置位时返 503+`Retry-After`;③ lifespan `finally` 先置 draining 拒新 run,`asyncio.wait(inflight, drain_timeout)` 等收尾,超时的 `broker.request_cancel` 转协作式 cancel(下个 chunk 间隙退、标 idle 不报 error),再过 `cancel_grace` 仍没退的留给 SIGKILL(最坏退化=改前)。④ `main.py` uvicorn 加 `timeout_graceful_shutdown=5`(否则长连 SSE 无限挡在 drain 前);⑤ `config/agent.yaml``shutdown` 段(drain_timeout 30s / cancel_grace 15s,超时转 cancel = 用户按停止可重发,故偏短);⑥ dev SPA `chat.js` 发送包退避重试(503 背压 + 交接拒连 TypeError 都重试 ~26s,显"服务更新中",耗尽贴友好提示)。**部署强耦合**:unit `TimeoutStopSec` 从 10 提到 90(必须 > drain+grace+sandbox 清扫余量,否则 SIGKILL 砍掉 drain),已写进 RUN.md unit + 故障兜底。B 蓝绿(零 503 窗口)留作触发信号后再做,前置是 instance-aware reaper(§7.8)。
### 2026-06-09 ### 2026-06-09

2
RUN.md
View File

@ -720,6 +720,8 @@ sudo xfs_quota -x -c "limit -p bhard=10g zcbot_<user_uuid>" /opt
| dev.html 显示 "load failed" 立刻回登录页 | token 过期或 JWT_SECRET 服务端变了。已自动跳登录页,按上次 tab 重登 | | dev.html 显示 "load failed" 立刻回登录页 | token 过期或 JWT_SECRET 服务端变了。已自动跳登录页,按上次 tab 重登 |
| dev.html 顶栏出现"连接断开,重连中…(N/3)" | SSE 流被切(`--reload` 重启 / nginx 切换 / 网络抖)。客户端自动重连,1s/2s/4s 退避;新进程已 reaper 标 error 则立即收 done + 卡片末尾"请重发"提示;若服务端还活着会继续看后续 delta(断开期间的丢失,broker 不持久化) | | dev.html 顶栏出现"连接断开,重连中…(N/3)" | SSE 流被切(`--reload` 重启 / nginx 切换 / 网络抖)。客户端自动重连,1s/2s/4s 退避;新进程已 reaper 标 error 则立即收 done + 卡片末尾"请重发"提示;若服务端还活着会继续看后续 delta(断开期间的丢失,broker 不持久化) |
| 对话里偶发 `[Error] invalid JSON arguments` / `[Error] bad arguments to write: ... missing required` | deepseek-v4-flash **大参数工具调用(大 write/run_python,≈7K+ 字符)偶发把内容碎片错位粘进 arguments 或退化成空 `{}`**(上游流式抖动)。`core/loop.py` 已自动兜底:畸形参数丢弃整轮重 roll(≤3 次)+ 最后一次降级非流式。仍频繁撞 → 引导模型**把大文件拆小 / 用 run_python 分块写**,或换 `deepseek_v4.pro`。前端看到 warn「工具调用参数损坏…重试」即此机制在生效 | | 对话里偶发 `[Error] invalid JSON arguments` / `[Error] bad arguments to write: ... missing required` | deepseek-v4-flash **大参数工具调用(大 write/run_python,≈7K+ 字符)偶发把内容碎片错位粘进 arguments 或退化成空 `{}`**(上游流式抖动)。`core/loop.py` 已自动兜底:畸形参数丢弃整轮重 roll(≤3 次)+ 最后一次降级非流式。仍频繁撞 → 引导模型**把大文件拆小 / 用 run_python 分块写**,或换 `deepseek_v4.pro`。前端看到 warn「工具调用参数损坏…重试」即此机制在生效 |
| 长任务跑到一半停下、提示「已达单轮步数上限…回复『继续』可接着跑」 | **预期行为非崩溃**:单个 run 自主步数到 backstop(`config/models/*.yaml` 的 `max_iterations`,flash 120 / pro 150)就主动停,回 `[reached max iterations]`。直接回复「继续」即续跑。经常撞顶=任务确实大,可调高对应 variant 的 `max_iterations` 或换 pro |
| 任务停下提示「连续 N 步无净产出…已自动停止」(回 `[stopped: no progress]`) | **空转熔断**:连续 `_STALL_LIMIT`(loop.py,默认 8)步所有 tool 都只返 `[Error]`/重复结果/被拦 = 没在推进,主动停以免空烧 token。说明模型卡死在某个错上——**换思路 / 补充信息再回复「继续」**,别原样重发(会再次撞同一墙) |
--- ---

View File

@ -18,7 +18,7 @@ variants:
default_reasoning_effort: "" default_reasoning_effort: ""
code_quality: good code_quality: good
enable_run_python: true enable_run_python: true
max_iterations: 50 max_iterations: 120 # backstop 兜底,非"轮"预算;真正的空转防护是 loop 的无进展熔断 + _RepeatGuard
optimal_temperature: 0.3 optimal_temperature: 0.3
prompt_caching: false prompt_caching: false
extended_thinking: false extended_thinking: false
@ -41,7 +41,7 @@ variants:
default_reasoning_effort: medium default_reasoning_effort: medium
code_quality: excellent code_quality: excellent
enable_run_python: true enable_run_python: true
max_iterations: 100 max_iterations: 150 # backstop 兜底,非"轮"预算;真正的空转防护是 loop 的无进展熔断 + _RepeatGuard
optimal_temperature: 0.2 optimal_temperature: 0.2
prompt_caching: false prompt_caching: false
extended_thinking: false extended_thinking: false

View File

@ -34,7 +34,7 @@ class ModelCapabilities:
enable_run_python: bool = False enable_run_python: bool = False
# 工程参数 # 工程参数
max_iterations: int = 50 max_iterations: int = 120 # 单轮自主步数 backstop;空转防护见 loop 无进展熔断,不靠这个砍正经长任务
optimal_temperature: float = 0.3 optimal_temperature: float = 0.3
# provider 特性 # provider 特性

View File

@ -82,8 +82,12 @@ class _RepeatGuard:
st["blocked"] += 1 st["blocked"] += 1
return st["n"], st["blocked"] return st["n"], st["blocked"]
def record(self, name: str, args: Any, result: str) -> int: def record(self, name: str, args: Any, result: str) -> Tuple[int, bool]:
"""执行后调用:登记结果,返回该指纹当前的「无产出重复」计数。""" """执行后调用:登记结果。返回 (该指纹「无产出重复」计数, 本次是否有净产出)。
净产出 = `[Error]` 且非与历史一字不差的重复结果 供全局无进展熔断判定:
一步里只要有一次净产出就算在推进
"""
st = self._state(name, args) st = self._state(name, args)
h = hashlib.sha1(result.encode("utf-8", "replace")).hexdigest() h = hashlib.sha1(result.encode("utf-8", "replace")).hexdigest()
is_err = result.lstrip().startswith("[Error") is_err = result.lstrip().startswith("[Error")
@ -96,7 +100,7 @@ class _RepeatGuard:
st["unproductive"] = 0 st["unproductive"] = 0
st["hashes"].add(h) st["hashes"].add(h)
st["n"] += 1 st["n"] += 1
return st["unproductive"] return st["unproductive"], not (is_err or dup)
def _extract_delta_content(chunk: Any) -> Optional[str]: def _extract_delta_content(chunk: Any) -> Optional[str]:
@ -218,6 +222,8 @@ class AgentLoop:
self.cancel_check = cancel_check self.cancel_check = cancel_check
# 病理性重复调用守卫(同名同参 + 无产出),活在本次 run 内,不跨 task。 # 病理性重复调用守卫(同名同参 + 无产出),活在本次 run 内,不跨 task。
self._repeat_guard = _RepeatGuard() self._repeat_guard = _RepeatGuard()
# 全局「无进展」计数:连续多少步整步无净产出。有净产出清零,见 run loop 熔断。
self._stall = 0
def _emit(self, event: dict) -> None: def _emit(self, event: dict) -> None:
if self.sink is not None: if self.sink is not None:
@ -300,12 +306,14 @@ class AgentLoop:
self._emit({"type": "done"}) self._emit({"type": "done"})
return getattr(msg, "content", None) or "" return getattr(msg, "content", None) or ""
step_productive = False
for i, tc in enumerate(tool_calls): for i, tc in enumerate(tool_calls):
if self._is_cancelled(): if self._is_cancelled():
self._fill_cancelled_tool_results(tool_calls[i:]) self._fill_cancelled_tool_results(tool_calls[i:])
self._emit({"type": "cancelled"}) self._emit({"type": "cancelled"})
return "[cancelled]" return "[cancelled]"
result = self._execute_tool_call(tc) result, productive = self._execute_tool_call(tc)
step_productive = step_productive or productive
self.session.append( self.session.append(
{ {
"role": "tool", "role": "tool",
@ -315,6 +323,31 @@ class AgentLoop:
} }
) )
# 全局「无进展」熔断:整步所有 tool 都无净产出(全是 [Error]/重复/被拦)→ 累计;
# 连续 _STALL_LIMIT 步空转就主动停,别烧到 max_iterations。一旦某步有净产出立即清零。
if step_productive:
self._stall = 0
else:
self._stall += 1
if self._stall >= self._STALL_LIMIT:
self._emit({
"type": "warn",
"msg": (
f"连续 {self._stall} 步无净产出(全是报错/重复/被拦),已自动停止以免空烧。"
"换思路或补充信息后回复「继续」可重试。"
),
})
self._emit({"type": "done"})
return "[stopped: no progress]"
# 跑满 backstop:不是出错,是单轮自主步数到顶。明确提示可续跑,别静默停。
self._emit({
"type": "warn",
"msg": (
f"已达单轮步数上限({self.max_iterations} 步),任务可能尚未完成。"
"回复「继续」可接着跑。"
),
})
self._emit({"type": "done"}) self._emit({"type": "done"})
return "[reached max iterations]" return "[reached max iterations]"
@ -323,6 +356,11 @@ class AgentLoop:
# 工具调用偶发连续两次畸形,故留够重试余量。 # 工具调用偶发连续两次畸形,故留够重试余量。
_MAX_MALFORMED_RETRIES = 3 _MAX_MALFORMED_RETRIES = 3
# 连续多少步「整步无净产出」(全是 [Error]/重复结果/被拦)就判定空转、主动停。
# 比 max_iterations 早得多掐死死循环(第 8 步 vs 第 120 步),同时放正经长任务自由跑。
# 保守取 8:几乎不误伤"连踩几个错再纠正"的正常波动,配 _RepeatGuard 逐指纹 HARD=4 双保险。
_STALL_LIMIT = 8
def _stream_llm(self) -> Tuple[Optional[Any], bool]: def _stream_llm(self) -> Tuple[Optional[Any], bool]:
"""拉一轮 LLM 并保证返回的 tool_call arguments 可解析。 """拉一轮 LLM 并保证返回的 tool_call arguments 可解析。
@ -412,13 +450,15 @@ class AgentLoop:
pass pass
return response return response
def _execute_tool_call(self, tc: Any) -> str: def _execute_tool_call(self, tc: Any) -> Tuple[str, bool]:
"""执行一次 tool_call,返回 (结果文本, 本次是否有净产出)。
净产出供 run loop 的全局无进展熔断判定"""
name = tc.function.name name = tc.function.name
raw_args = tc.function.arguments or "{}" raw_args = tc.function.arguments or "{}"
try: try:
args = json.loads(raw_args) args = json.loads(raw_args)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
return f"[Error] invalid JSON arguments for {name}: {e}" return f"[Error] invalid JSON arguments for {name}: {e}", False
args_preview = json.dumps(args, ensure_ascii=False) args_preview = json.dumps(args, ensure_ascii=False)
if len(args_preview) > 200: if len(args_preview) > 200:
@ -446,7 +486,7 @@ class AgentLoop:
"preview": result, "preview": result,
"truncated": False, "truncated": False,
}) })
return result return result, False
ctx = ExecCtx( ctx = ExecCtx(
user_id=self.user_id, user_id=self.user_id,
@ -464,7 +504,7 @@ class AgentLoop:
truncated = True truncated = True
# 登记结果做重复检测(用截断后、未加提示的原始结果算指纹,保证同输出哈希一致)。 # 登记结果做重复检测(用截断后、未加提示的原始结果算指纹,保证同输出哈希一致)。
unproductive = self._repeat_guard.record(name, args, result) unproductive, productive = self._repeat_guard.record(name, args, result)
if unproductive >= _RepeatGuard.SOFT: if unproductive >= _RepeatGuard.SOFT:
if unproductive == _RepeatGuard.SOFT: if unproductive == _RepeatGuard.SOFT:
self._emit({"type": "warn", "msg": f"{name} 同参重复且结果未变({unproductive} 次),已提示模型换路"}) self._emit({"type": "warn", "msg": f"{name} 同参重复且结果未变({unproductive} 次),已提示模型换路"})
@ -481,4 +521,4 @@ class AgentLoop:
"preview": preview, "preview": preview,
"truncated": truncated, "truncated": truncated,
}) })
return result return result, productive

View File

@ -19,7 +19,7 @@ def _simulate(guard: _RepeatGuard, name: str, args, results: list[str]) -> list[
guard.register_block(name, args) guard.register_block(name, args)
out.append("BLOCK") out.append("BLOCK")
continue continue
unprod = guard.record(name, args, r) unprod, _productive = guard.record(name, args, r)
out.append(f"exec(unprod={unprod})") out.append(f"exec(unprod={unprod})")
return out return out
@ -69,11 +69,27 @@ class TestRepeatGuard(unittest.TestCase):
g = _RepeatGuard() g = _RepeatGuard()
unprods = [] unprods = []
for _ in range(_RepeatGuard.SOFT + 1): for _ in range(_RepeatGuard.SOFT + 1):
unprods.append(g.record("document_search", {"queries": ["x"]}, "(no documents found)")) unprods.append(g.record("document_search", {"queries": ["x"]}, "(no documents found)")[0])
# 累计达到 SOFT(此时应注入软提示),但还没到 HARD 拦截 # 累计达到 SOFT(此时应注入软提示),但还没到 HARD 拦截
self.assertGreaterEqual(max(unprods), _RepeatGuard.SOFT) self.assertGreaterEqual(max(unprods), _RepeatGuard.SOFT)
self.assertFalse(g.should_block("document_search", {"queries": ["x"]})) self.assertFalse(g.should_block("document_search", {"queries": ["x"]}))
def test_record_returns_productive_signal(self):
"""record 第二个返回值喂全局无进展熔断:新非错结果=有产出,[Error]/重复=无产出。"""
g = _RepeatGuard()
# 首个新结果 → 有产出
_, p1 = g.record("read", {"path": "a"}, "[stdout] hello")
self.assertTrue(p1)
# 一字不差重复同一结果 → 无产出
_, p2 = g.record("read", {"path": "a"}, "[stdout] hello")
self.assertFalse(p2)
# 换出新结果 → 又有产出
_, p3 = g.record("read", {"path": "a"}, "[stdout] world")
self.assertTrue(p3)
# [Error] 开头 → 无产出(哪怕是该指纹首次)
_, p4 = g.record("glob", {"path": "/nope"}, "[Error] not found")
self.assertFalse(p4)
def test_distinct_args_tracked_separately(self): def test_distinct_args_tracked_separately(self):
g = _RepeatGuard() g = _RepeatGuard()
_simulate(g, "document_search", {"queries": ["a"]}, ["[Error] e"] * 8) _simulate(g, "document_search", {"queries": ["a"]}, ["[Error] e"] * 8)