from __future__ import annotations import sys import unittest from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from core.loop import _RepeatGuard # noqa: E402 def _simulate(guard: _RepeatGuard, name: str, args, results: list[str]) -> list[str]: """模拟 loop 逐次调用:先 should_block,未拦才 record。返回每次的判定标记。 'BLOCK' = 被拦截未执行;否则返回 'exec(unprod=N)'。 """ out = [] for r in results: if guard.should_block(name, args): guard.register_block(name, args) out.append("BLOCK") continue unprod, _productive = guard.record(name, args, r) out.append(f"exec(unprod={unprod})") return out class TestRepeatGuard(unittest.TestCase): def test_identical_error_repeats_get_blocked(self): g = _RepeatGuard() trace = _simulate(g, "glob", {"path": "/home/ubuntu/zcbot"}, ["[Error] base path not found"] * 8) # 第一次执行无产出计 0,之后每次 +1;累计到 HARD 后拦截 self.assertIn("BLOCK", trace) self.assertTrue(g.should_block("glob", {"path": "/home/ubuntu/zcbot"})) # 拦截前最多放过 HARD 次无产出重复(共 HARD+1 次执行) n_exec = sum(1 for t in trace if t.startswith("exec")) self.assertEqual(n_exec, _RepeatGuard.HARD + 1) def test_empty_args_error_storm_blocked(self): """空 {} 缺参风暴:executor 每次返回同一句错误 → 被同一机制拦下(堵 malformed 洞)。""" g = _RepeatGuard() trace = _simulate(g, "shell", {}, ["[Error] 缺少必填参数 [command]"] * 7) self.assertIn("BLOCK", trace) def test_identical_nonerror_result_blocked(self): """同参且结果一字不差(非错误)也算无产出 → 拦截。""" g = _RepeatGuard() trace = _simulate(g, "read", {"path": "a.txt"}, ["same content"] * 8) self.assertIn("BLOCK", trace) def test_changing_results_never_blocked(self): """同参但每次结果不同(改脚本后重跑)= 有产出 → 永不拦,计数清零。""" g = _RepeatGuard() results = [f"[stdout]\nrun {i} output\n[exit 0]" for i in range(10)] trace = _simulate(g, "run_python", {"script_path": "x.py"}, results) self.assertNotIn("BLOCK", trace) self.assertFalse(g.should_block("run_python", {"script_path": "x.py"})) # 每次都是新结果,无产出计数恒为 0 self.assertTrue(all(t == "exec(unprod=0)" for t in trace)) def test_productive_result_resets_counter(self): """报错几次后拿到新结果(修好了)→ 计数清零,不会被先前的失败拖去拦截。""" g = _RepeatGuard() seq = ["[Error] x", "[Error] x", "[stdout]\nfixed!\n[exit 0]", "[stdout]\nfixed!\n[exit 0]"] _simulate(g, "shell", {"command": "make"}, seq) # 中途修好清零,不该进入 block 态 self.assertFalse(g.should_block("shell", {"command": "make"})) def test_soft_threshold_reached_before_hard(self): g = _RepeatGuard() unprods = [] for _ in range(_RepeatGuard.SOFT + 1): unprods.append(g.record("document_search", {"queries": ["x"]}, "(no documents found)")[0]) # 累计达到 SOFT(此时应注入软提示),但还没到 HARD 拦截 self.assertGreaterEqual(max(unprods), _RepeatGuard.SOFT) self.assertFalse(g.should_block("document_search", {"queries": ["x"]})) def test_record_returns_productive_signal(self): """record 第二个返回值喂全局无进展熔断:新非错结果=有产出,[Error]/重复=无产出。""" g = _RepeatGuard() # 首个新结果 → 有产出 _, p1 = g.record("read", {"path": "a"}, "[stdout] hello") self.assertTrue(p1) # 一字不差重复同一结果 → 无产出 _, p2 = g.record("read", {"path": "a"}, "[stdout] hello") self.assertFalse(p2) # 换出新结果 → 又有产出 _, p3 = g.record("read", {"path": "a"}, "[stdout] world") self.assertTrue(p3) # [Error] 开头 → 无产出(哪怕是该指纹首次) _, p4 = g.record("glob", {"path": "/nope"}, "[Error] not found") self.assertFalse(p4) def test_distinct_args_tracked_separately(self): g = _RepeatGuard() _simulate(g, "document_search", {"queries": ["a"]}, ["[Error] e"] * 8) # 不同参数互不影响 self.assertTrue(g.should_block("document_search", {"queries": ["a"]})) self.assertFalse(g.should_block("document_search", {"queries": ["b"]})) if __name__ == "__main__": unittest.main()