zcbot/core/context.py

207 lines
8.3 KiB
Python

"""LLM 上下文准备。
不改 Session 持久化历史,只在发给模型前做低风险压缩。只压旧 tool 消息**内容**,
绝不动 assistant 的 `tool_call.arguments` —— arguments 是模型"该怎么调工具"的范本,
把它改写成 `{"_compacted":...}` 这种"看着像合法调用"的标记会毒化模型:它在长任务里
看到几十次"过去的 run_python/write 长这样",就照葫芦画瓢把 marker 当参数原样吐出来,
executor 拿不到 code/path → 报错空转(2026-06-12 DB 实测 60 个 task 命中 83 次,
其中 61 次是模型仿写 marker;详 PROGRESS)。故 arguments 一律原样保留。
"""
from __future__ import annotations
from copy import deepcopy
from typing import Any, List
import json
import re
def _compact_old_tool_content(content: str, max_chars: int) -> str:
    """Shorten an old tool result to a head/tail excerpt around a marker.

    Content that already fits within ``max_chars`` is returned unchanged.
    Otherwise the first ``max_chars // 2`` characters and the remaining
    budget taken from the end of the text are kept, joined by a marker line
    that reports how many characters were dropped.

    Args:
        content: The tool result text.
        max_chars: How many characters of the original text to keep.
            Negative values are treated as 0 (keep only the marker).

    Returns:
        ``content`` itself when it fits, otherwise the excerpt. The marker
        is not counted against ``max_chars``.
    """
    budget = max(0, max_chars)
    if len(content) <= budget:
        return content
    head = budget // 2
    tail = budget - head
    omitted = len(content) - budget
    # Slice the tail from an explicit start index: ``content[-tail:]`` with
    # tail == 0 is ``content[-0:]``, i.e. the entire string, which would
    # re-append everything that was just "omitted".
    tail_start = len(content) - tail
    return (
        content[:head]
        + f"\n[compacted old tool result, {omitted} chars omitted]\n"
        + content[tail_start:]
    )
# Matches the "[skill=<name>]" / "[skill=<name>, dir=<path>]" header tag.
_LOAD_SKILL_HEADER_RE = re.compile(r"\[skill=([^,\]]+)(?:,\s*dir=([^\]]+))?\]")


def _compact_load_skill_content(content: str) -> str:
    """Replace an old ``load_skill`` result with a one-line stub.

    Only the first line of ``content`` is searched for the skill header tag.
    When the tag is found, the stub names the skill (and its directory, if
    the tag carries one); otherwise a generic stub is returned.

    Args:
        content: The text of the ``load_skill`` tool result.

    Returns:
        A short bracketed placeholder standing in for the full SKILL.md.
    """
    lines = content.splitlines() if content else []
    header = lines[0] if lines else ""
    found = _LOAD_SKILL_HEADER_RE.search(header)
    if found is None:
        return "[loaded skill; full SKILL.md omitted from old context]"
    skill_name, skill_dir = found.groups()
    # The dir group is optional; it is None when the tag has no ", dir=...".
    location = f", dir={skill_dir}" if skill_dir else ""
    return f"[loaded skill: {skill_name}{location}; full SKILL.md omitted from old context]"
def _message_chars(msg: dict[str, Any]) -> int:
    """Estimate the size of one message in characters.

    The size is the length of the message's JSON serialization with
    non-ASCII characters left unescaped. If the message cannot be
    serialized, the length of ``str(msg)`` is used instead so that this
    best-effort measurement never raises for an odd message.

    Args:
        msg: A single chat message.

    Returns:
        The character count of the serialized (or stringified) message.
    """
    try:
        return len(json.dumps(msg, ensure_ascii=False))
    except (TypeError, ValueError):
        # TypeError: a value json cannot encode (bytes, set, custom object).
        # ValueError: json.dumps detected a circular reference.
        return len(str(msg))
# Content of the placeholder tool message inserted for an unanswered tool call.
_INTERRUPTED_TOOL_RESULT = (
    "[interrupted: tool result missing — run was cut off "
    "(disconnect/cancel) before this tool finished]"
)


def _repair_dangling_tool_calls(
    messages: List[dict[str, Any]],
) -> tuple[List[dict[str, Any]], int]:
    """Fill in tool results that an interrupted run never wrote.

    If a run is cut off after ``assistant.tool_calls`` was recorded but
    before the tool results were (upstream disconnect, user cancel, crash),
    the history holds an assistant message whose tool calls have no matching
    tool messages. Sending such a history to OpenAI/DeepSeek is rejected
    with "An assistant message with 'tool_calls' must be followed by tool
    messages responding to each 'tool_call_id'".

    For every assistant message with tool calls, the run of tool messages
    directly after it is scanned, and a placeholder tool message is appended
    to that run for each call id that has no answer. This happens only on
    the outgoing copy: the input list is not modified and the original
    message objects are reused, not copied.

    Args:
        messages: Conversation history in chat-completions message format.

    Returns:
        A ``(messages, added)`` tuple: the repaired message list and the
        number of placeholder tool messages that were inserted.
    """
    out: List[dict[str, Any]] = []
    added = 0
    total = len(messages)
    pos = 0
    while pos < total:
        current = messages[pos]
        out.append(current)
        pos += 1

        if not isinstance(current, dict):
            continue
        calls = current.get("tool_calls")
        if current.get("role") != "assistant" or not calls:
            continue

        # Call id -> tool name, in the order the assistant issued the calls.
        expected: dict[Any, Any] = {}
        for call in calls:
            if isinstance(call, dict) and call.get("id"):
                expected[call.get("id")] = (call.get("function") or {}).get("name")

        # Pass through the contiguous tool messages that follow, noting which
        # call ids they answer (tool results must directly follow the
        # assistant message).
        seen: set[Any] = set()
        while pos < total:
            follower = messages[pos]
            if not (isinstance(follower, dict) and follower.get("role") == "tool"):
                break
            call_id = follower.get("tool_call_id")
            if call_id:
                seen.add(call_id)
            out.append(follower)
            pos += 1

        # Append a placeholder for each unanswered id so it stays inside this
        # assistant message's block of tool results.
        for call_id, tool_name in expected.items():
            if call_id in seen:
                continue
            placeholder: dict[str, Any] = {
                "role": "tool",
                "tool_call_id": call_id,
                "content": _INTERRUPTED_TOOL_RESULT,
            }
            if tool_name:
                placeholder["name"] = tool_name
            out.append(placeholder)
            added += 1
    return out, added
def prepare_messages_for_llm(
    messages: List[dict[str, Any]],
    *,
    keep_recent: int = 12,
    old_tool_chars: int = 2_000,
    compact_threshold_chars: int = 0,
) -> List[dict[str, Any]]:
    """Return the copy of ``messages`` that should be sent to the LLM.

    This is ``prepare_messages_with_stats`` with the statistics dropped.

    - The most recent ``keep_recent`` messages are left as they are; only
      ``tool`` messages are ever rewritten, so system messages are untouched.
    - Older tool messages have their content compacted.
    - ``tool_call.arguments`` on assistant messages are never rewritten.
    - Protocol fields such as ``role``, ``tool_call_id`` and ``name`` are
      left unchanged.

    Args:
        messages: Conversation history in chat-completions message format.
        keep_recent: Number of trailing messages exempt from compaction.
        old_tool_chars: Character budget for an old tool result.
        compact_threshold_chars: Total size below which nothing is compacted.

    Returns:
        The prepared list of messages.
    """
    result = prepare_messages_with_stats(
        messages,
        keep_recent=keep_recent,
        old_tool_chars=old_tool_chars,
        compact_threshold_chars=compact_threshold_chars,
    )
    return result[0]
def prepare_messages_with_stats(
    messages: List[dict[str, Any]],
    *,
    keep_recent: int = 12,
    old_tool_chars: int = 2_000,
    compact_threshold_chars: int = 0,
) -> tuple[List[dict[str, Any]], dict[str, int]]:
    """Return the messages to send to the LLM together with compaction stats.

    Dangling tool calls are repaired first. Then, unless the total size is
    strictly below ``compact_threshold_chars``, the content of every ``tool``
    message older than the last ``keep_recent`` messages is rewritten:
    ``load_skill`` results become a one-line stub, ``task_progress`` results
    become a fixed note, and any other string content is cut to a head/tail
    excerpt of ``old_tool_chars`` characters. Every returned message is a
    deep copy.

    Args:
        messages: Conversation history in chat-completions message format.
        keep_recent: Number of trailing messages exempt from compaction;
            negative values are treated as 0.
        old_tool_chars: Character budget for an old tool result; negative
            values are treated as 0.
        compact_threshold_chars: Context-pressure threshold. While the total
            size (measured after repair) is strictly below it, nothing is
            compacted, so short tasks keep all tool detail and the prompt
            prefix stays identical between turns. The default 0 means
            compaction always runs.

    Returns:
        A ``(prepared, stats)`` tuple. ``stats`` holds ``original_chars``,
        ``sent_chars``, ``saved_chars``, ``compacted_tool_messages``,
        ``compacted_skill_messages``, ``compaction_skipped`` (1 when the
        threshold short-circuit was taken) and ``repaired_tool_calls``.
        ``task_progress`` replacements are not counted in either
        ``compacted_*`` figure.
    """
    window = 0 if keep_recent < 0 else keep_recent

    # Repair dangling tool_calls before anything else: a history containing
    # them would be rejected by the model API if sent as-is.
    history, placeholders_added = _repair_dangling_tool_calls(messages)
    raw_size = sum(map(_message_chars, history))

    # Below the pressure threshold: send everything untouched. Compaction is
    # only for when the context would otherwise not fit.
    if raw_size < compact_threshold_chars:
        untouched = list(map(deepcopy, history))
        return untouched, {
            "original_chars": raw_size,
            "sent_chars": raw_size,
            "saved_chars": 0,
            "compacted_tool_messages": 0,
            "compacted_skill_messages": 0,
            "compaction_skipped": 1,
            "repaired_tool_calls": placeholders_added,
        }

    cutoff = max(0, len(history) - window)
    outgoing: List[dict[str, Any]] = []
    tool_hits = 0
    skill_hits = 0
    for position, source_msg in enumerate(history):
        clone = deepcopy(source_msg)
        outgoing.append(clone)

        # Messages inside the recent window are never touched.
        if position >= cutoff:
            continue
        # Only tool results are compacted; assistant tool_call.arguments are
        # deliberately left verbatim.
        if clone.get("role") != "tool":
            continue
        body = clone.get("content")
        if not isinstance(body, str):
            continue

        tool_name = clone.get("name")
        if tool_name == "load_skill":
            stub = _compact_load_skill_content(body)
            clone["content"] = stub
            if stub != body:
                skill_hits += 1
        elif tool_name == "task_progress":
            clone["content"] = "[task_progress updated; UI-only details omitted from context]"
        else:
            excerpt = _compact_old_tool_content(
                body,
                max_chars=max(0, old_tool_chars),
            )
            clone["content"] = excerpt
            if excerpt != body:
                tool_hits += 1

    sent_size = sum(map(_message_chars, outgoing))
    return outgoing, {
        "original_chars": raw_size,
        "sent_chars": sent_size,
        "saved_chars": max(0, raw_size - sent_size),
        "compacted_tool_messages": tool_hits,
        "compacted_skill_messages": skill_hits,
        "compaction_skipped": 0,
        "repaired_tool_calls": placeholders_added,
    }