zcbot/tests/test_context_compaction.py

305 lines
12 KiB
Python

import unittest
import json
from core.context import prepare_messages_for_llm, prepare_messages_with_stats
class ContextCompactionTests(unittest.TestCase):
    """Tests for ``prepare_messages_for_llm`` / ``prepare_messages_with_stats``.

    The cases below exercise three behaviours through the public helpers:
    compaction of old tool results, verbatim preservation of assistant
    tool-call arguments, and insertion of placeholder tool results for
    assistant tool calls that have no matching tool message.
    """

    # ------------------------------------------------------------------
    # Fixture builders. Each call returns a brand-new dict so that no two
    # tests (or two messages) ever share a mutable object.
    # ------------------------------------------------------------------

    @staticmethod
    def _system() -> dict:
        """Build the system message used at the head of every fixture."""
        return {"role": "system", "content": "rules"}

    @staticmethod
    def _user(text: str) -> dict:
        """Build a user message carrying *text*."""
        return {"role": "user", "content": text}

    @staticmethod
    def _tool(call_id: str, name: str, content: str) -> dict:
        """Build a tool-result message answering tool call *call_id*."""
        return {"role": "tool", "tool_call_id": call_id, "name": name, "content": content}

    @staticmethod
    def _call(call_id: str, name: str, arguments: str) -> dict:
        """Build one function-style entry for an assistant ``tool_calls`` list."""
        return {
            "id": call_id,
            "type": "function",
            "function": {"name": name, "arguments": arguments},
        }

    @staticmethod
    def _assistant(content, calls: list) -> dict:
        """Build an assistant message; *content* may be a string or ``None``."""
        return {"role": "assistant", "content": content, "tool_calls": calls}

    def _large_old_tool_history(self) -> list:
        """Build system + one 2000-char shell tool result + twelve user messages."""
        head = [self._system(), self._tool("tc1", "shell", "A" * 2000)]
        tail = [self._user(f"recent {i}") for i in range(12)]
        return head + tail

    # ------------------------------------------------------------------
    # Compaction of tool results
    # ------------------------------------------------------------------

    def test_preserves_system_and_recent_messages(self) -> None:
        """System message and the last two messages compare equal to the inputs."""
        history = [
            self._system(),
            self._user("old"),
            self._tool("old-tool", "shell", "A" * 200),
            self._user("latest"),
            self._tool("new-tool", "shell", "B" * 200),
        ]

        result = prepare_messages_for_llm(history, keep_recent=2, old_tool_chars=40)

        for position in (0, -2, -1):
            self.assertEqual(result[position], history[position])

    def test_compacts_old_tool_content_without_breaking_protocol_fields(self) -> None:
        """An old, long tool result is shortened but keeps role/id/name."""
        history = [
            self._system(),
            {"role": "assistant", "tool_calls": [{"id": "tc1"}], "content": None},
            self._tool("tc1", "run_python", "A" * 200),
            self._user("continue"),
        ]

        result = prepare_messages_for_llm(history, keep_recent=1, old_tool_chars=40)

        compacted = result[2]
        expected_fields = (
            ("role", "tool"),
            ("tool_call_id", "tc1"),
            ("name", "run_python"),
        )
        for field, expected in expected_fields:
            self.assertEqual(compacted[field], expected)
        body = compacted["content"]
        self.assertIn("[compacted old tool result", body)
        self.assertLess(len(body), 120)

    def test_short_old_tool_content_is_left_unchanged(self) -> None:
        """An old tool result shorter than ``old_tool_chars`` keeps its content."""
        history = [
            self._system(),
            self._tool("tc1", "grep", "short"),
            self._user("next"),
        ]

        result = prepare_messages_for_llm(history, keep_recent=1, old_tool_chars=40)

        self.assertEqual(result[1]["content"], "short")

    def test_compacts_old_load_skill_result_to_marker(self) -> None:
        """An old ``load_skill`` result keeps the skill name and dir, not the payload."""
        skill_payload = "[skill=proposal, dir=/sandbox/skills/proposal]\n" + "A" * 5000
        history = [
            self._system(),
            self._tool("tc1", "load_skill", skill_payload),
            self._user("next"),
        ]

        result = prepare_messages_for_llm(history, keep_recent=1)

        marker = result[1]["content"]
        self.assertIn("loaded skill: proposal", marker)
        self.assertIn("dir=/sandbox/skills/proposal", marker)
        self.assertNotIn("A" * 100, marker)

    def test_prepare_messages_reports_compaction_stats(self) -> None:
        """The stats dict reports fewer sent chars, one compacted message, savings."""
        history = [
            self._system(),
            self._tool("tc1", "shell", "A" * 200),
            self._user("next"),
        ]

        result, report = prepare_messages_with_stats(
            history, keep_recent=1, old_tool_chars=40
        )

        self.assertLess(report["sent_chars"], report["original_chars"])
        self.assertEqual(report["compacted_tool_messages"], 1)
        self.assertGreater(report["saved_chars"], 0)
        self.assertEqual(len(result), len(history))

    # ------------------------------------------------------------------
    # Assistant tool-call arguments are never rewritten
    # ------------------------------------------------------------------

    def test_keeps_old_large_tool_call_arguments_verbatim(self) -> None:
        """Large arguments on an old assistant tool call come back unchanged."""
        # Old assistant tool_call.arguments are always kept verbatim, however
        # large -- rewriting them into a `{"_compacted":...}` marker gets
        # imitated by the model as real arguments (observed in the DB on
        # 2026-06-12: run_python spun on errors 60+ times because of it).
        raw_args = json.dumps({"path": "slides/p01.py", "content": "A" * 5000})
        history = [
            self._system(),
            self._assistant("writing slide", [self._call("tc1", "write", raw_args)]),
            self._tool("tc1", "write", "[wrote file]"),
            self._user("next"),
        ]

        result, report = prepare_messages_with_stats(history, keep_recent=1)

        # Protocol fields and the full arguments are kept verbatim; the
        # marker never appears.
        sent_args = result[1]["tool_calls"][0]["function"]["arguments"]
        self.assertEqual(sent_args, raw_args)
        self.assertNotIn("_compacted", sent_args)
        self.assertNotIn("compacted_tool_call_arguments", report)

    def test_keeps_old_task_progress_arguments_intact(self) -> None:
        """``task_progress`` arguments on an old assistant call still parse intact."""
        # task_progress arguments are small to begin with, and squashing them
        # into a marker would also break progress restoration in the frontend.
        # Keep them verbatim like every other tool.
        plan = {
            "action": "set_plan",
            "steps": [
                {"id": "s1", "title": "理解需求", "status": "completed"},
                {"id": "s2", "title": "实现功能", "status": "in_progress"},
            ],
        }
        raw_args = json.dumps(plan, ensure_ascii=False)
        history = [
            self._system(),
            self._assistant(None, [self._call("tc1", "task_progress", raw_args)]),
        ] + [self._user(f"recent {i}") for i in range(12)]

        result, report = prepare_messages_with_stats(history)

        decoded = json.loads(result[1]["tool_calls"][0]["function"]["arguments"])
        self.assertNotIn("_compacted", decoded)
        self.assertEqual(decoded["action"], "set_plan")
        self.assertEqual(decoded["steps"][0]["title"], "理解需求")
        self.assertNotIn("compacted_tool_call_arguments", report)

    def test_old_task_progress_tool_result_uses_tiny_marker(self) -> None:
        """An old ``task_progress`` tool result is replaced by a fixed short marker."""
        bulky_result = json.dumps({"ok": True, "steps": [{"title": "A" * 2000}]})
        history = [
            self._system(),
            self._tool("tc1", "task_progress", bulky_result),
            self._user("next"),
        ]

        result, report = prepare_messages_with_stats(history, keep_recent=1)

        self.assertEqual(
            result[1]["content"],
            "[task_progress updated; UI-only details omitted from context]",
        )
        self.assertEqual(report["compacted_tool_messages"], 1)

    # ------------------------------------------------------------------
    # Size threshold gating
    # ------------------------------------------------------------------

    def test_below_threshold_skips_compaction_entirely(self) -> None:
        """Total size below compact_threshold_chars -> sent as-is.

        Old tool results are not compacted, so the cache can stay fully warm.
        """
        history = self._large_old_tool_history()

        result, report = prepare_messages_with_stats(
            history,
            keep_recent=1,
            old_tool_chars=40,
            compact_threshold_chars=10_000_000,  # far larger than this fixture -> skip
        )

        # The old tool result is kept verbatim.
        self.assertEqual(result[1]["content"], "A" * 2000)
        self.assertEqual(report["compaction_skipped"], 1)
        self.assertEqual(report["saved_chars"], 0)
        self.assertEqual(report["sent_chars"], report["original_chars"])

    def test_above_threshold_still_compacts(self) -> None:
        """Size above the threshold -> compaction as usual, compaction_skipped=0."""
        history = self._large_old_tool_history()

        result, report = prepare_messages_with_stats(
            history,
            keep_recent=1,
            old_tool_chars=40,
            compact_threshold_chars=100,  # far smaller than this fixture -> triggers
        )

        self.assertIn("compacted old tool result", result[1]["content"])
        self.assertEqual(report["compaction_skipped"], 0)
        self.assertGreater(report["saved_chars"], 0)

    # ------------------------------------------------------------------
    # Repair of assistant tool calls that never received a tool result
    # ------------------------------------------------------------------

    def test_repairs_dangling_tool_calls_followed_by_user(self) -> None:
        """A tool call with no result gets a placeholder before the user turn."""
        # The run was interrupted after assistant.tool_calls (disconnect or
        # cancel), so the tool result was never persisted; the user then kept
        # talking. Sent as-is, DeepSeek/OpenAI would reject it. A placeholder
        # tool result must be inserted before sending.
        # (Observed in task 5c5d6d25.)
        history = [
            self._system(),
            self._assistant(None, [self._call("call_x", "run_python", "{}")]),
            self._user("怎么不回应了"),
            self._user("在干什么"),
        ]

        result, report = prepare_messages_with_stats(history, keep_recent=12)

        # The synthesized tool result comes right after assistant.tool_calls,
        # and only then the user message.
        self.assertEqual(result[1]["role"], "assistant")
        placeholder = result[2]
        self.assertEqual(placeholder["role"], "tool")
        self.assertEqual(placeholder["tool_call_id"], "call_x")
        self.assertEqual(placeholder["name"], "run_python")
        self.assertIn("interrupted", placeholder["content"])
        self.assertEqual(result[3]["role"], "user")
        self.assertEqual(report["repaired_tool_calls"], 1)

    def test_does_not_touch_well_paired_tool_calls(self) -> None:
        """A tool call that already has its result triggers no repair."""
        history = [
            self._system(),
            self._assistant(None, [self._call("call_x", "shell", "{}")]),
            self._tool("call_x", "shell", "ok"),
            self._user("next"),
        ]

        result, report = prepare_messages_with_stats(history, keep_recent=12)

        self.assertEqual(report["repaired_tool_calls"], 0)
        self.assertEqual(len(result), len(history))

    def test_repairs_partial_multi_tool_call_block(self) -> None:
        """With two calls and one result, exactly one placeholder is reported."""
        # One assistant message issued two tool_calls but only one was
        # answered -> only the missing one is filled in.
        history = [
            self._system(),
            self._assistant(
                None,
                [
                    self._call("a", "shell", "{}"),
                    self._call("b", "run_python", "{}"),
                ],
            ),
            self._tool("a", "shell", "ok"),
            self._user("next"),
        ]

        result, report = prepare_messages_with_stats(history, keep_recent=12)

        self.assertEqual(report["repaired_tool_calls"], 1)
        answered_ids = {
            msg["tool_call_id"] for msg in result if msg.get("role") == "tool"
        }
        self.assertEqual(answered_ids, {"a", "b"})

    def test_repair_runs_even_when_compaction_skipped(self) -> None:
        """Repair is reported even when the stats say compaction was skipped."""
        # Repair must run even below the compaction threshold (repair happens
        # before the early-return branch).
        history = [
            self._system(),
            self._assistant(None, [self._call("call_x", "run_python", "{}")]),
            self._user("hello"),
        ]

        result, report = prepare_messages_with_stats(
            history, keep_recent=12, compact_threshold_chars=10_000_000
        )

        self.assertEqual(report["compaction_skipped"], 1)
        self.assertEqual(report["repaired_tool_calls"], 1)
        placeholder = result[2]
        self.assertEqual(placeholder["role"], "tool")
        self.assertEqual(placeholder["tool_call_id"], "call_x")
if __name__ == "__main__":
    # Executed as a script (not imported): let unittest discover and run
    # the test cases defined in this module.
    unittest.main()