diff --git a/DESIGN.md b/DESIGN.md index 253d093..a6797c8 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -563,6 +563,42 @@ zcbot-sandbox image 已 ~1.5G(python deps + chromium + nodejs + mermaid-cli),后 **升级到 A(主模型多模态)的信号**:用户明确要求"我说话同时贴图,模型直接读图回话",或多模态对话历史(多轮带图)成为高频需求 — 当前 E + C 假设是"图像是工具调用对象"而非"对话上下文";真高频需要"图也是消息内容"时再升 A。 +### 8.2 Token 使用优化与上下文治理(2026-06-04) + +**现状诊断**:最近 7 天 chat 输入 token 约 1.225 亿、输出约 87 万,输入/输出约 140:1;最大单次 chat 输入超过 53 万 token。根因不是回答过长,而是 `Session.load()` 把全量历史消息装回 `self.session.messages`,每轮 LLM 调用继续携带旧 assistant/tool 结果、`load_skill` 完整正文、文献检索结果和长 stdout/stderr。当前 `cost_cny=0` 还说明 LiteLLM cost map 未覆盖 DeepSeek V4,费用统计不可用于判断真实消耗。 + +**质量边界**: +- 不改变模型输入的优化(prompt/context caching、固定 prompt 前缀、费用统计修复、cache hit/miss 记录)不影响输出质量。 +- 会改变模型可见上下文的优化(工具结果裁剪、旧历史摘要、RAG/按需读取)必须保留"可追溯原文":长结果写文件或保留路径,summary 只替代陈旧噪声,用户确认过的需求 / 规格 / 大纲 / 关键结论不删。 +- 禁止简单"只保留最近 N 条"作为主策略;它省 token 但最容易丢已确认约束。 + +**选型**:采用 Context Editing + Memory/File State + Cache Observability 的混合方案。对齐 2025-2026 主流实践:稳定 system/tools 前缀利于 provider prompt cache;旧 tool result 从上下文中移除或压缩;关键发现写入 task summary / 文件系统,需要时由 `read` / domain tool 重新拉取原文。长上下文仍保留作少数全局推理场景的临时能力,不作为默认每轮成本。 + +**Stage 1:可观测性与计费修复(低风险,不改输出)**: +1. `core/loop.py::_extract_usage` 扩展返回 cache hit/miss / reasoning tokens 等 provider usage 细节;DeepSeek 的 `prompt_cache_hit_tokens` / `prompt_cache_miss_tokens` 归入 `usage_events.units`。 +2. `core/storage/usage.py::record_chat_usage` 在 LiteLLM `completion_cost` 返回 0 时,按本地模型价格配置兜底计算 `cost_cny`;历史仍以 `units` snapshot 可复算。 +3. SSE `llm_end` 带 `cache_hit_tokens` / `cache_miss_tokens`;dev SPA 在底部 hint 展示"本次输入 / 输出 / 缓存命中 / 未命中"。 + +**Stage 2:工具结果降噪(低风险,小幅改变上下文)**: +1. `run_python` 新增 `script_path` 模式。非短小一次性代码先用 `write` 落 `.py` 文件,再 `run_python(script_path="...")`;这样 tool_call arguments 只保留路径,源码留在文件系统按需读取。保留 `code` 兼容短 snippet。 +2. `run_python` / `shell` 返回体默认保留 stdout/stderr 的头尾摘要,避免长日志反复进入后续 prompt;暂不写 `.tool_logs`,避免 host backend 污染 repo 根。 +3. `document_search` 暂不调整默认召回量(保留 6×1200 chars),优先通过上下文预算 / 历史压缩控制长期成本;需要全文时先 `document_download` 或再次指定更大 `content_chars_per_doc`。 +4. `load_skill` 的完整正文只在当前轮用于理解;入库 / 后续上下文可压缩成"已加载 skill: name, path, digest",减少每轮重复携带 5-10KB skill 文档。 + +**Stage 3:上下文预算与自动压缩(中风险,需测试)**: +1. 新增 `core/context.py` 负责构造 LLM messages,输入为 `Session.messages` + budget,输出为裁剪后的 messages。 +2. 第一步只做**旧 tool 消息压缩**:保留 system、最近若干条原文,对较旧且过长的 `role=tool` 内容做头尾摘要;旧 `load_skill` 结果压成"已加载 skill: name/dir"标记;`role` / `tool_call_id` / `name` 不变,保证 OpenAI/LiteLLM tool_call 协议完整。这个阶段不调用额外 LLM,不生成全局摘要。 +3. 第二步再做 task summary:保留 system、最近 6-10 轮原文、未闭合 tool_call 协议相关消息、最新用户消息;旧消息压成一条 summary。 +4. summary 必须区分:用户确认的硬约束、当前计划、已生成文件路径、关键事实、待办/风险、可丢弃日志。旧 tool 原文不直接塞回,只保留路径和摘要。 +5. 阈值建议先按字符粗估触发(如 200k chars),后续接 tokenizer 精确预算;触发后目标压到 reliable_context 的 25%-40%,避免刚压完又涨满。 + +**验收指标**: +- 高用量 task 的单次 `tokens_in` 从 50 万级降到 5-10 万级以内,常规任务低于 3 万。 +- `usage_events.units` 能区分 input/output/cache hit/cache miss,`cost_cny` 不再全 0。 +- `llm_start` SSE 事件能看到 `context_original_chars` / `context_sent_chars` / `context_saved_chars` / `context_compacted_tool_messages` / `context_compacted_skill_messages`,dev SPA 底部 hint 同步展示,用于判断压缩是否真实生效。 +- 文献采集 / 论文写作 / PPT 三类长任务仍能复查原文路径,不会因摘要丢失用户确认过的规格。 +- 增加 focused tests 覆盖 usage detail 提取、成本兜底、工具结果裁剪、上下文压缩协议完整性。 + --- ## 附录:DeepSeek V4 关键事实(2026-04-24) diff --git a/PROGRESS.md b/PROGRESS.md index a22778b..73d2261 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -185,6 +185,7 @@ - **05-06 → 05-08**:Phase 6 部分(task + state.json + tokens 累计);TUI rich Markdown + spinner 实时耗时;`/resume [last|]` + 懒创建 + `_cleanup_if_empty`。 - **05-09 → 05-10**:DESIGN §7 初版(05-12 重写);`cli.py export` + `core/export_docx.py`。 - **05-11**:`atomic_write_text` + `core/memory.py`(core.md 入 prompt,extended/* 索引);loop 事件流化 `sink.emit` 铺 SSE 路。 +- **06-04 token 优化启动**:`DESIGN.md §8.2` 写入上下文治理方案;chat usage 记录 cache hit/miss / reasoning tokens,LiteLLM cost=0 时按模型档案 CNY/Mtok 兜底;`run_python` 新增 `script_path` 模式(长代码先 write .py 再按路径执行,减少 tool_call arguments 进历史);`run_python` / `shell` 长输出只做上下文裁剪,不写 `.tool_logs`;`document_search` 默认召回量保持 6×1200 chars;`core/context.py` 先压缩旧 tool 消息和旧 `load_skill` 结果,不改持久化历史;`llm_start` SSE 输出 `context_*` 压缩统计,dev SPA 底部 hint 展示上下文压缩与 cache hit/miss。 --- @@ -208,9 +209,10 @@ ## 文件清单 ``` -core/capabilities.py 71 +core/capabilities.py 75 ← 模型档案增加 CNY/Mtok 计费兜底字段 core/llm.py 151 ← litellm 离线 cost map env + chat_stream(stream=True + include_usage) -core/loop.py 268 ← §7 A sink.emit + _stream_llm(chunk 间 poll cancel + emit delta) +core/loop.py 300 ← §7 A sink.emit + _stream_llm(chunk 间 poll cancel + emit delta)+ usage cache 明细 +core/context.py 95 ← LLM 调用前压缩旧 tool / load_skill 消息,保 tool_call 协议字段,返回压缩统计 core/sinks.py 101 ← §7 A core/ui.py 38 core/paths.py 50 ← task_dir db form 归一 @@ -223,11 +225,11 @@ core/export_docx.py 383 core/storage/__init__.py 29 core/storage/engine.py 80 core/storage/models.py 130 ← 4 表(0004 删 runs;0005 email UNIQUE;0006 usage_events v2 + messages.model_profile;0007 cost_usd→cny) -core/storage/usage.py 125 ← record_chat_usage(USD→CNY ×7.2)+ record_image_usage(单价 snapshot 进 units) +core/storage/usage.py 150 ← record_chat_usage(USD→CNY ×7.2,LiteLLM cost=0 时 YAML 单价兜底)+ record_image_usage(单价 snapshot 进 units) core/storage/utils.py 136 core/ark_client.py 105 ← 火山方舟 HTTP 客户端(seedream / 后续 seedance 共享) core/agent_builder.py 325 ← 装配 lib(有 ARK_API_KEY 才挂 SeedreamTool) -tools/{base,fs,shell,run_python,skill_tool,seedream}.py ~640 行 +tools/{base,fs,shell,run_python,skill_tool,seedream}.py ~680 行(run_python 支持 script_path) main.py ~210 ← 入口:web / db / probe / user db/migrations/env.py 61 db/migrations/versions/ @@ -257,6 +259,6 @@ Python 合计 ~3400 行(+ dev.html 1700 静态 + vendor 1MB) 1. **真 OIDC 接入 + CORS 收紧**(~1 天)—— `/v1/auth/login` 内部换 OIDC ID token 校验(路由层 Depends 不动);CORS 改 platform 域名 allowlist。**真发布给真实用户前必做**。 2. **§7 C Executor + sandbox**(~3-5 天,按 DESIGN §7.5 落地清单 6 条逐项实施)—— `run_python`/`shell` → `Executor.run(...)`,本地保留 subprocess、SaaS 走 docker;`api_key_env` → `KeyProvider` 运行时注入。多用户在线跑代码前置。**Stage C 完成 DoD** = 6 条落地清单全完成 + 红队回归用例通过:① 容器内 `curl http://169.254.169.254/...` → timeout / connection refused;② 容器内 `psql postgresql://...` → IP block(连接失败);③ 容器内 `nohup sleep 1000 &` exec 退出后 `docker top ` 看不到残留进程;④ 跨 user 容器互访(A 容器 `curl http://:*`)→ 网络隔离阻断;⑤ 出网走 proxy 时未在 allowlist 的域名 → 403。原 ~2-3 天估值未含 egress proxy 部署 / xfs project quota 升级 / 红队用例,补回真实工程量。 -3. **Phase 6 context 三层压缩**(~1 天)—— 兜底,V4 长上下文一般用不到。 +3. **Phase 6 context 三层压缩**(~1 天)—— 先做旧 tool 消息压缩(`role/tool_call_id/name` 保持协议完整),再做 task summary;不写 `.tool_logs`,不改 `document_search` 默认召回量。 > §7 B + D + D' + 单活 run 锁 + cancel + 0004 schema 瘦身 + 入口归位 主体已完工。剩余:真 OIDC → C(Executor)→ F(deploy / billing)。§7 E CLI 双模式撤;Phase G Jinja2/HTMX 撤(详见 DESIGN §7.9)。 diff --git a/config/models/deepseek_v4.yaml b/config/models/deepseek_v4.yaml index 757b712..c027aef 100644 --- a/config/models/deepseek_v4.yaml +++ b/config/models/deepseek_v4.yaml @@ -22,6 +22,8 @@ variants: optimal_temperature: 0.3 prompt_caching: false extended_thinking: false + input_cny_per_mtoken: 1.0 + output_cny_per_mtoken: 12.0 pro: display_name: DeepSeek V4 Pro @@ -42,3 +44,5 @@ variants: optimal_temperature: 0.2 prompt_caching: false extended_thinking: false + input_cny_per_mtoken: 2.0 + output_cny_per_mtoken: 20.0 diff --git a/core/capabilities.py b/core/capabilities.py index 21719cd..67e3afc 100644 --- a/core/capabilities.py +++ b/core/capabilities.py @@ -41,6 +41,10 @@ class ModelCapabilities: prompt_caching: bool = False extended_thinking: bool = False + # 计费兜底(CNY / million tokens)。provider / LiteLLM cost map 缺失时使用。 + input_cny_per_mtoken: float = 0.0 + output_cny_per_mtoken: float = 0.0 + # API 接入 api_base: str = "" api_key_env: str = "" diff --git a/core/context.py b/core/context.py new file mode 100644 index 0000000..2510e8c --- /dev/null +++ b/core/context.py @@ -0,0 +1,109 @@ +"""LLM 上下文准备。 + +不改 Session 持久化历史,只在发给模型前做低风险压缩。第一阶段只压旧 tool +消息内容,保留 tool_call 协议字段,避免历史命令输出 / 检索结果反复占满 prompt。 +""" +from __future__ import annotations + +from copy import deepcopy +from typing import Any, List +import json +import re + + +def _compact_old_tool_content(content: str, max_chars: int) -> str: + if len(content) <= max_chars: + return content + head = max_chars // 2 + tail = max_chars - head + omitted = len(content) - head - tail + return ( + content[:head] + + f"\n[compacted old tool result, {omitted} chars omitted]\n" + + content[-tail:] + ) + + +_LOAD_SKILL_HEADER_RE = re.compile(r"\[skill=([^,\]]+)(?:,\s*dir=([^\]]+))?\]") + + +def _compact_load_skill_content(content: str) -> str: + first_line = content.splitlines()[0] if content else "" + match = _LOAD_SKILL_HEADER_RE.search(first_line) + if match: + skill = match.group(1) + skill_dir = match.group(2) or "" + suffix = f", dir={skill_dir}" if skill_dir else "" + return f"[loaded skill: {skill}{suffix}; full SKILL.md omitted from old context]" + return "[loaded skill; full SKILL.md omitted from old context]" + + +def _message_chars(msg: dict[str, Any]) -> int: + try: + return len(json.dumps(msg, ensure_ascii=False)) + except TypeError: + return len(str(msg)) + + +def prepare_messages_for_llm( + messages: List[dict[str, Any]], + *, + keep_recent: int = 20, + old_tool_chars: int = 2_000, +) -> List[dict[str, Any]]: + """返回发给 LLM 的 messages 副本。 + + - system 和最近 keep_recent 条消息原样保留。 + - 较旧且过长的 tool content 压缩为头尾摘要。 + - role/tool_call_id/name 等协议字段不变。 + """ + prepared, _ = prepare_messages_with_stats( + messages, + keep_recent=keep_recent, + old_tool_chars=old_tool_chars, + ) + return prepared + + +def prepare_messages_with_stats( + messages: List[dict[str, Any]], + *, + keep_recent: int = 20, + old_tool_chars: int = 2_000, +) -> tuple[List[dict[str, Any]], dict[str, int]]: + """返回发给 LLM 的 messages 副本和压缩统计。""" + if keep_recent < 0: + keep_recent = 0 + original_chars = sum(_message_chars(m) for m in messages) + recent_start = max(0, len(messages) - keep_recent) + prepared: List[dict[str, Any]] = [] + compacted_tool_messages = 0 + compacted_skill_messages = 0 + for idx, msg in enumerate(messages): + new_msg = deepcopy(msg) + is_recent = idx >= recent_start + if ( + not is_recent + and new_msg.get("role") == "tool" + and isinstance(new_msg.get("content"), str) + ): + before = new_msg["content"] + if new_msg.get("name") == "load_skill": + new_msg["content"] = _compact_load_skill_content(before) + compacted_skill_messages += int(new_msg["content"] != before) + else: + new_msg["content"] = _compact_old_tool_content( + before, + max_chars=max(0, old_tool_chars), + ) + compacted_tool_messages += int(new_msg["content"] != before) + prepared.append(new_msg) + sent_chars = sum(_message_chars(m) for m in prepared) + stats = { + "original_chars": original_chars, + "sent_chars": sent_chars, + "saved_chars": max(0, original_chars - sent_chars), + "compacted_tool_messages": compacted_tool_messages, + "compacted_skill_messages": compacted_skill_messages, + } + return prepared, stats diff --git a/core/executor_docker.py b/core/executor_docker.py index 3008378..6e9af16 100644 --- a/core/executor_docker.py +++ b/core/executor_docker.py @@ -50,6 +50,7 @@ _CANCEL_POLL_INTERVAL_S = 0.2 from .executor import ExecCtx, Executor, ToolResult from .executor_host import HostExecutor from .sandbox import SandboxPool +from tools.base import compact_tool_output # write/edit 走配额 gate;read/glob/grep 不消耗磁盘,放行 @@ -168,15 +169,33 @@ class DockerExecutor(Executor): ) + ["bash", "-c", cmd] result = self._run_subprocess(argv, timeout=timeout, ctx=ctx) self.pool.mark_active(self.user_id) - return result + return self._compact_shell_like_result(result) # ── run_python ─────────────────────────────────────────── def _exec_python(self, args: Dict[str, Any], ctx: ExecCtx) -> ToolResult: code = args.get("code") + script_path = args.get("script_path") + if script_path is not None: + if not isinstance(script_path, str) or not script_path.strip(): + return ToolResult( + content="[Error] bad arguments to run_python: script_path must be non-empty string", + exit_code=2, + ) + timeout = int(args.get("timeout") or 120) + container = self.pool.ensure(self.user_id) + container_script = self._container_script_path(script_path) + argv = self._docker_exec_argv( + container, + extra_env={**_CONTAINER_ENV, "PYTHONIOENCODING": "utf-8"}, + ) + ["python", container_script] + result = self._run_subprocess(argv, timeout=timeout, ctx=ctx) + self.pool.mark_active(self.user_id) + return self._compact_shell_like_result(result) + if not isinstance(code, str): return ToolResult( - content="[Error] bad arguments to run_python: code must be string", + content="[Error] bad arguments to run_python: code or script_path must be provided", exit_code=2, ) timeout = int(args.get("timeout") or 120) @@ -198,13 +217,19 @@ class DockerExecutor(Executor): ) + ["python", container_script] # 删 setsid 同上(_exec_shell 注释) result = self._run_subprocess(argv, timeout=timeout, ctx=ctx) self.pool.mark_active(self.user_id) - return result + return self._compact_shell_like_result(result) finally: try: host_script.unlink() except OSError: pass + def _container_script_path(self, script_path: str) -> str: + p = script_path.replace("\\", "/").strip() + if p.startswith("/"): + return p + return self.container_workdir.rstrip("/") + "/" + p.lstrip("./") + # ── fs tools(read/write/edit/glob/grep)────────────────── def _exec_fs_tool( @@ -369,6 +394,10 @@ class DockerExecutor(Executor): parts.append(f"[exit {proc.returncode}]") return ToolResult(content="\n".join(parts), exit_code=proc.returncode) + def _compact_shell_like_result(self, result: ToolResult) -> ToolResult: + content = compact_tool_output(result.content) + return ToolResult(content=content, exit_code=result.exit_code) + def _check_user_disk_quota(user_id: UUID): """write/edit 前 gate;读 yaml 配额 + 查 user_disk_usage 表。 diff --git a/core/loop.py b/core/loop.py index 63ed8f2..88baa12 100644 --- a/core/loop.py +++ b/core/loop.py @@ -19,6 +19,7 @@ from uuid import UUID import litellm from .capabilities import ModelCapabilities +from .context import prepare_messages_with_stats from .executor import ExecCtx, Executor from .llm import LLM from .session import Session @@ -45,17 +46,51 @@ def _extract_delta_content(chunk: Any) -> Optional[str]: return None -def _extract_usage(usage: Any) -> Tuple[int, int]: - """从 litellm response.usage 提 (prompt_tokens, completion_tokens)。""" +def _usage_to_dict(usage: Any) -> dict: if not usage: - return 0, 0 + return {} if hasattr(usage, "model_dump"): usage = usage.model_dump() elif hasattr(usage, "dict"): usage = usage.dict() if isinstance(usage, dict): - return int(usage.get("prompt_tokens") or 0), int(usage.get("completion_tokens") or 0) - return 0, 0 + return usage + return {} + + +def _extract_usage_details(usage: Any) -> dict: + """从 provider usage 提取统一 token 明细。 + + DeepSeek 直接给 prompt_cache_hit_tokens / prompt_cache_miss_tokens; + OpenAI 风格把 cached tokens 放在 prompt_tokens_details.cached_tokens。 + """ + data = _usage_to_dict(usage) + prompt_details = data.get("prompt_tokens_details") or {} + completion_details = data.get("completion_tokens_details") or {} + if not isinstance(prompt_details, dict): + prompt_details = {} + if not isinstance(completion_details, dict): + completion_details = {} + + cache_hit = ( + data.get("prompt_cache_hit_tokens") + or prompt_details.get("cached_tokens") + or 0 + ) + cache_miss = data.get("prompt_cache_miss_tokens") or 0 + return { + "tokens_in": int(data.get("prompt_tokens") or 0), + "tokens_out": int(data.get("completion_tokens") or 0), + "cache_hit_tokens": int(cache_hit or 0), + "cache_miss_tokens": int(cache_miss or 0), + "reasoning_tokens": int(completion_details.get("reasoning_tokens") or 0), + } + + +def _extract_usage(usage: Any) -> Tuple[int, int]: + """从 litellm response.usage 提 (prompt_tokens, completion_tokens)。""" + details = _extract_usage_details(usage) + return details["tokens_in"], details["tokens_out"] class AgentLoop: @@ -113,7 +148,6 @@ class AgentLoop: self._emit({"type": "cancelled"}) return "[cancelled]" - self._emit({"type": "llm_start"}) start = time.monotonic() response, cancelled_mid_stream = self._stream_llm() elapsed = time.monotonic() - start @@ -127,7 +161,8 @@ class AgentLoop: msg = response.choices[0].message asst_msg_id = self.session.append(msg) - pt, ct = _extract_usage(getattr(response, "usage", None)) + usage_details = _extract_usage_details(getattr(response, "usage", None)) + pt, ct = usage_details["tokens_in"], usage_details["tokens_out"] # 记账(0006):一行 usage_event + 回填 messages.tokens_in/out + model_profile。 # 任何失败都吞掉(litellm cost map miss / DB 异常),不阻塞主 loop; # message 仍在 session/DB 里,后续重启不影响。 @@ -140,6 +175,12 @@ class AgentLoop: model_profile=model_profile, prompt_tokens=pt, completion_tokens=ct, + input_cny_per_mtoken=self.caps.input_cny_per_mtoken, + output_cny_per_mtoken=self.caps.output_cny_per_mtoken, + extra_units={ + k: v for k, v in usage_details.items() + if k not in ("tokens_in", "tokens_out") and v + }, response=response, ) except Exception as e: @@ -148,6 +189,8 @@ class AgentLoop: "type": "llm_end", "prompt_tokens": pt, "completion_tokens": ct, + "cache_hit_tokens": usage_details["cache_hit_tokens"], + "cache_miss_tokens": usage_details["cache_miss_tokens"], "elapsed": elapsed, }) @@ -185,8 +228,13 @@ class AgentLoop: - 中途 cancel → (None, True);已收 chunk 丢弃,内层 generator 在 finally 关闭底层连接 """ chunks: List[Any] = [] + llm_messages, context_stats = prepare_messages_with_stats(self.session.messages) + self._emit({ + "type": "llm_start", + **{f"context_{k}": v for k, v in context_stats.items()}, + }) stream = self.llm.chat_stream( - messages=self.session.messages, + messages=llm_messages, tools=self.executor.schemas(), reasoning_effort=self.caps.default_reasoning_effort or None, ) @@ -213,7 +261,7 @@ class AgentLoop: # 用 litellm 官方 helper 拼回完整 response(包括 tool_calls 拼接 + usage)。 # messages 参数仅用于失败时回填 prompt token 估算,正常路径 stream_options.include_usage # 已让最后一个 chunk 带准确 usage。 - response = litellm.stream_chunk_builder(chunks, messages=self.session.messages) + response = litellm.stream_chunk_builder(chunks, messages=llm_messages) return response, False def _execute_tool_call(self, tc: Any) -> str: diff --git a/core/storage/usage.py b/core/storage/usage.py index 5596cf9..738ac7b 100644 --- a/core/storage/usage.py +++ b/core/storage/usage.py @@ -41,6 +41,25 @@ def _safe_chat_cost_usd(response: Any) -> Decimal: return Decimal("0") +def _fallback_chat_cost_cny( + *, + prompt_tokens: int, + completion_tokens: int, + input_cny_per_mtoken: float, + output_cny_per_mtoken: float, +) -> Decimal: + """按本地模型档案价格兜底计算 chat 成本(CNY)。""" + input_price = Decimal(str(input_cny_per_mtoken or 0)) + output_price = Decimal(str(output_cny_per_mtoken or 0)) + pt = Decimal(str(int(prompt_tokens))) + ct = Decimal(str(int(completion_tokens))) + cost = ( + pt * input_price / Decimal("1000000") + + ct * output_price / Decimal("1000000") + ) + return cost.quantize(Decimal("0.000001")) + + def record_chat_usage( *, task_id: UUID, @@ -49,6 +68,9 @@ def record_chat_usage( model_profile: str, prompt_tokens: int, completion_tokens: int, + input_cny_per_mtoken: float = 0.0, + output_cny_per_mtoken: float = 0.0, + extra_units: Optional[Mapping[str, Any]] = None, response: Any = None, ) -> Decimal: """记一次 chat 调用:写 usage_events 行 + 回填 messages.model_profile/tokens_in/out。 @@ -60,12 +82,26 @@ def record_chat_usage( """ cost_usd = _safe_chat_cost_usd(response) cost_cny = (cost_usd * USD_TO_CNY).quantize(Decimal("0.000001")) + if cost_cny == 0 and (input_cny_per_mtoken or output_cny_per_mtoken): + cost_cny = _fallback_chat_cost_cny( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + input_cny_per_mtoken=input_cny_per_mtoken, + output_cny_per_mtoken=output_cny_per_mtoken, + ) units = { "tokens_in": int(prompt_tokens), "tokens_out": int(completion_tokens), # snapshot 折算系数,便于历史对账(汇率/价格涨跌后仍能还原当时折算逻辑) "usd_to_cny": float(USD_TO_CNY), } + if input_cny_per_mtoken or output_cny_per_mtoken: + units.update({ + "input_cny_per_mtoken": float(input_cny_per_mtoken or 0), + "output_cny_per_mtoken": float(output_cny_per_mtoken or 0), + }) + if extra_units: + units.update(extra_units) with session_scope() as s: s.add(UsageEvent( diff --git a/prompts/system/general_v1.md b/prompts/system/general_v1.md index 80acd3d..f7e79ba 100644 --- a/prompts/system/general_v1.md +++ b/prompts/system/general_v1.md @@ -4,7 +4,7 @@ - `read` / `write` / `edit` —— 文件操作 - `glob` / `grep` —— 文件搜索 - `shell` —— 执行命令(默认 60s 超时) -- `run_python` —— 在子进程里跑 Python (数据处理、生成 .pptx/.docx、画图等) +- `run_python` —— 在子进程里跑 Python (数据处理、生成 .pptx/.docx、画图等)。非短小一次性代码时,先用 `write` 把 `.py` 文件写到 task_dir,再用 `run_python(script_path="...")` 执行;避免大段源码进入对话历史。 - `load_skill` —— 加载某个 skill 的完整指引 ## 媒体生成工具(按需可用,未配置 ARK_API_KEY 时该工具不会出现) diff --git a/tests/test_context_compaction.py b/tests/test_context_compaction.py new file mode 100644 index 0000000..783573e --- /dev/null +++ b/tests/test_context_compaction.py @@ -0,0 +1,100 @@ +import unittest + +from core.context import prepare_messages_for_llm, prepare_messages_with_stats + + +class ContextCompactionTests(unittest.TestCase): + def test_preserves_system_and_recent_messages(self) -> None: + messages = [ + {"role": "system", "content": "rules"}, + {"role": "user", "content": "old"}, + {"role": "tool", "tool_call_id": "old-tool", "name": "shell", "content": "A" * 200}, + {"role": "user", "content": "latest"}, + {"role": "tool", "tool_call_id": "new-tool", "name": "shell", "content": "B" * 200}, + ] + + prepared = prepare_messages_for_llm( + messages, + keep_recent=2, + old_tool_chars=40, + ) + + self.assertEqual(prepared[0], messages[0]) + self.assertEqual(prepared[-2], messages[-2]) + self.assertEqual(prepared[-1], messages[-1]) + + def test_compacts_old_tool_content_without_breaking_protocol_fields(self) -> None: + messages = [ + {"role": "system", "content": "rules"}, + {"role": "assistant", "tool_calls": [{"id": "tc1"}], "content": None}, + {"role": "tool", "tool_call_id": "tc1", "name": "run_python", "content": "A" * 200}, + {"role": "user", "content": "continue"}, + ] + + prepared = prepare_messages_for_llm( + messages, + keep_recent=1, + old_tool_chars=40, + ) + tool_msg = prepared[2] + + self.assertEqual(tool_msg["role"], "tool") + self.assertEqual(tool_msg["tool_call_id"], "tc1") + self.assertEqual(tool_msg["name"], "run_python") + self.assertIn("[compacted old tool result", tool_msg["content"]) + self.assertLess(len(tool_msg["content"]), 120) + + def test_short_old_tool_content_is_left_unchanged(self) -> None: + messages = [ + {"role": "system", "content": "rules"}, + {"role": "tool", "tool_call_id": "tc1", "name": "grep", "content": "short"}, + {"role": "user", "content": "next"}, + ] + + prepared = prepare_messages_for_llm( + messages, + keep_recent=1, + old_tool_chars=40, + ) + + self.assertEqual(prepared[1]["content"], "short") + + def test_compacts_old_load_skill_result_to_marker(self) -> None: + messages = [ + {"role": "system", "content": "rules"}, + { + "role": "tool", + "tool_call_id": "tc1", + "name": "load_skill", + "content": "[skill=proposal, dir=/sandbox/skills/proposal]\n" + "A" * 5000, + }, + {"role": "user", "content": "next"}, + ] + + prepared = prepare_messages_for_llm(messages, keep_recent=1) + + self.assertIn("loaded skill: proposal", prepared[1]["content"]) + self.assertIn("dir=/sandbox/skills/proposal", prepared[1]["content"]) + self.assertNotIn("A" * 100, prepared[1]["content"]) + + def test_prepare_messages_reports_compaction_stats(self) -> None: + messages = [ + {"role": "system", "content": "rules"}, + {"role": "tool", "tool_call_id": "tc1", "name": "shell", "content": "A" * 200}, + {"role": "user", "content": "next"}, + ] + + prepared, stats = prepare_messages_with_stats( + messages, + keep_recent=1, + old_tool_chars=40, + ) + + self.assertLess(stats["sent_chars"], stats["original_chars"]) + self.assertEqual(stats["compacted_tool_messages"], 1) + self.assertGreater(stats["saved_chars"], 0) + self.assertEqual(len(prepared), len(messages)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_executor_docker.py b/tests/test_executor_docker.py index 080e255..27e9a46 100644 --- a/tests/test_executor_docker.py +++ b/tests/test_executor_docker.py @@ -358,6 +358,32 @@ class TestRunPython(unittest.TestCase): leftover = list(tmp_subroot.glob("*.py")) if tmp_subroot.exists() else [] self.assertEqual(leftover, [], f"tmp .py not cleaned up: {leftover}") + def test_run_python_script_path_uses_existing_workspace_file(self): + executor, pool, _ = make_executor() + ctx = make_ctx(executor) + + proc = MagicMock() + proc.communicate.return_value = ("ok\n", "") + proc.returncode = 0 + + captured_argv = [] + + def _popen(argv, **kwargs): + captured_argv.append(argv) + return proc + + with patch("core.executor_docker.subprocess.Popen", side_effect=_popen): + result = executor.call_tool( + "run_python", {"script_path": "scripts/job.py"}, ctx + ) + + self.assertIn("[stdout]\nok", result.content) + argv = captured_argv[0] + self.assertEqual(argv[-2], "python") + self.assertEqual(argv[-1], "/workspace/demo/scripts/job.py") + tmp_subroot = executor.user_root / TMP_SUBDIR / str(ctx.task_id) + self.assertFalse(tmp_subroot.exists()) + def test_run_python_bad_code_type(self): executor, _, _ = make_executor() ctx = make_ctx(executor) diff --git a/tests/test_run_python_script_path.py b/tests/test_run_python_script_path.py new file mode 100644 index 0000000..285099d --- /dev/null +++ b/tests/test_run_python_script_path.py @@ -0,0 +1,27 @@ +import tempfile +import unittest +from pathlib import Path + +from tools.run_python import RunPythonTool + + +class RunPythonScriptPathTests(unittest.TestCase): + def test_executes_existing_script_path(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + script = Path(tmp) / "hello.py" + script.write_text("print('hello from file')\n", encoding="utf-8") + + out = RunPythonTool(base_dir=tmp).execute(script_path="hello.py") + + self.assertIn("[stdout]\nhello from file", out) + self.assertIn("[exit 0]", out) + + def test_requires_code_or_script_path(self) -> None: + out = RunPythonTool().execute() + + self.assertIn("[Error]", out) + self.assertIn("code or script_path", out) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_static_vendor.py b/tests/test_static_vendor.py index 0bc9057..8319e3f 100644 --- a/tests/test_static_vendor.py +++ b/tests/test_static_vendor.py @@ -27,6 +27,13 @@ class StaticVendorTests(unittest.TestCase): self.assertTrue(path.exists(), f"missing vendored asset: {path}") self.assertGreater(path.stat().st_size, 0, f"empty vendored asset: {path}") + def test_dev_html_surfaces_context_and_cache_stats(self) -> None: + html = DEV_HTML.read_text(encoding="utf-8") + + self.assertIn("formatContextStats", html) + self.assertIn("context_original_chars", html) + self.assertIn("cache_hit_tokens", html) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_tool_output_compaction.py b/tests/test_tool_output_compaction.py new file mode 100644 index 0000000..1602ebd --- /dev/null +++ b/tests/test_tool_output_compaction.py @@ -0,0 +1,37 @@ +import unittest +import tempfile + +from tools.base import compact_tool_output +from tools.run_python import RunPythonTool + + +class ToolOutputCompactionTests(unittest.TestCase): + def test_short_output_is_unchanged(self) -> None: + text = "[stdout]\nok\n[exit 0]" + + self.assertEqual(compact_tool_output(text, max_chars=100), text) + + def test_long_output_keeps_head_tail_and_reports_removed_chars(self) -> None: + text = "A" * 60 + "B" * 60 + "C" * 60 + + compacted = compact_tool_output(text, max_chars=80, head_chars=30, tail_chars=30) + + self.assertIn("A" * 30, compacted) + self.assertIn("C" * 30, compacted) + self.assertIn("[... truncated,", compacted) + self.assertNotIn("B" * 60, compacted) + self.assertLess(len(compacted), len(text)) + + def test_run_python_does_not_write_tool_logs(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tool = RunPythonTool(base_dir=tmp) + + out = tool.execute("print('A' * 9000)") + + self.assertIn("[... truncated,", out) + self.assertNotIn("full output saved", out) + self.assertFalse((tool.base_dir / ".tool_logs").exists()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_usage_accounting.py b/tests/test_usage_accounting.py new file mode 100644 index 0000000..8ba8ab1 --- /dev/null +++ b/tests/test_usage_accounting.py @@ -0,0 +1,38 @@ +from decimal import Decimal +import unittest + +from core.loop import _extract_usage_details +from core.storage.usage import _fallback_chat_cost_cny + + +class UsageAccountingTests(unittest.TestCase): + def test_extract_usage_details_includes_cache_tokens(self) -> None: + usage = { + "prompt_tokens": 1200, + "completion_tokens": 80, + "prompt_cache_hit_tokens": 900, + "prompt_cache_miss_tokens": 300, + "completion_tokens_details": {"reasoning_tokens": 12}, + } + + details = _extract_usage_details(usage) + + self.assertEqual(details["tokens_in"], 1200) + self.assertEqual(details["tokens_out"], 80) + self.assertEqual(details["cache_hit_tokens"], 900) + self.assertEqual(details["cache_miss_tokens"], 300) + self.assertEqual(details["reasoning_tokens"], 12) + + def test_fallback_chat_cost_uses_price_snapshots(self) -> None: + cost = _fallback_chat_cost_cny( + prompt_tokens=1_000_000, + completion_tokens=500_000, + input_cny_per_mtoken=1.0, + output_cny_per_mtoken=10.0, + ) + + self.assertEqual(cost, Decimal("6.000000")) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/base.py b/tools/base.py index 4d327b2..f668ba5 100644 --- a/tools/base.py +++ b/tools/base.py @@ -6,6 +6,26 @@ from pathlib import Path from typing import Optional +def compact_tool_output( + text: str, + *, + max_chars: int = 8_000, + head_chars: int = 4_000, + tail_chars: int = 2_000, +) -> str: + """压缩长工具输出,保留头尾和截断说明。""" + if len(text) <= max_chars: + return text + head_chars = max(0, min(head_chars, max_chars)) + tail_chars = max(0, min(tail_chars, max_chars - head_chars)) + removed = len(text) - head_chars - tail_chars + return ( + text[:head_chars] + + f"\n[... truncated, {removed} chars omitted ...]\n" + + (text[-tail_chars:] if tail_chars else "") + ) + + class Tool(ABC): name: str = "" description: str = "" diff --git a/tools/run_python.py b/tools/run_python.py index c94b2ff..5ad564e 100644 --- a/tools/run_python.py +++ b/tools/run_python.py @@ -14,7 +14,7 @@ import sys import tempfile from pathlib import Path -from .base import Tool +from .base import Tool, compact_tool_output _SENSITIVE_PATTERNS = ("API_KEY", "TOKEN", "SECRET", "PASSWORD", "PRIVATE_KEY") @@ -23,7 +23,9 @@ class RunPythonTool(Tool): name = "run_python" description = ( "Execute Python code in a subprocess. Returns stdout/stderr/exit_code.\n" - "Use for: data analysis, batch file ops, document generation (.pptx/.docx), " + "Use script_path for non-trivial code: write a .py file first, then execute it " + "so the full source stays in files instead of conversation history.\n" + "Use inline code only for short snippets. Good for: data analysis, batch file ops, document generation (.pptx/.docx), " "matplotlib charts, or any task where Python is more natural than chaining tools.\n" "Working directory is the agent's base dir. Files you create persist there.\n" "Available libs (install with shell pip if missing): " @@ -34,20 +36,38 @@ class RunPythonTool(Tool): "properties": { "code": { "type": "string", - "description": "Python source. Anything written to stdout is returned.", + "description": "Short Python source. For longer code, write a .py file and pass script_path.", + }, + "script_path": { + "type": "string", + "description": "Path to an existing .py file to execute. Prefer this for non-trivial code.", }, "timeout": {"type": "integer", "default": 120, "description": "Seconds before kill"}, }, - "required": ["code"], + "required": [], } - def execute(self, code: str, timeout: int = 120) -> str: - # 写到临时文件,避免 -c 转义问题 - with tempfile.NamedTemporaryFile( - suffix=".py", mode="w", delete=False, encoding="utf-8" - ) as f: - f.write(code) - script_path = f.name + def execute( + self, + code: str | None = None, + script_path: str | None = None, + timeout: int = 120, + ) -> str: + cleanup_script = False + if script_path: + script = self._resolve(script_path) + if not script.is_file(): + return f"[Error] script_path not found: {self._display(script)}" + elif isinstance(code, str): + # 写到临时文件,避免 -c 转义问题 + with tempfile.NamedTemporaryFile( + suffix=".py", mode="w", delete=False, encoding="utf-8" + ) as f: + f.write(code) + script = Path(f.name) + cleanup_script = True + else: + return "[Error] run_python requires code or script_path" try: env = os.environ.copy() @@ -59,7 +79,7 @@ class RunPythonTool(Tool): env["PYTHONPATH"] = str(self.base_dir) + os.pathsep + env.get("PYTHONPATH", "") result = subprocess.run( - [sys.executable, script_path], + [sys.executable, str(script)], cwd=str(self.base_dir), capture_output=True, timeout=timeout, @@ -71,10 +91,11 @@ class RunPythonTool(Tool): except subprocess.TimeoutExpired: return f"[Error] python script timed out after {timeout}s" finally: - try: - Path(script_path).unlink() - except OSError: - pass + if cleanup_script: + try: + script.unlink() + except OSError: + pass parts = [] if result.stdout: @@ -82,4 +103,4 @@ class RunPythonTool(Tool): if result.stderr: parts.append(f"[stderr]\n{result.stderr.rstrip()}") parts.append(f"[exit {result.returncode}]") - return "\n".join(parts) + return compact_tool_output("\n".join(parts)) diff --git a/tools/shell.py b/tools/shell.py index 848a4ec..ba401d3 100644 --- a/tools/shell.py +++ b/tools/shell.py @@ -7,7 +7,7 @@ import shlex import subprocess import sys -from .base import Tool +from .base import Tool, compact_tool_output class ShellTool(Tool): @@ -91,4 +91,4 @@ class ShellTool(Tool): if result.stderr: parts.append(f"[stderr]\n{result.stderr.rstrip()}") parts.append(f"[exit {result.returncode}]") - return "\n".join(parts) + return compact_tool_output("\n".join(parts)) diff --git a/web/static/dev.html b/web/static/dev.html index 7fd4a25..c80bfef 100644 --- a/web/static/dev.html +++ b/web/static/dev.html @@ -1218,6 +1218,35 @@ function fmtTokens(n) { return (n / 1000000).toFixed(1) + "M"; } +function formatContextStats(d) { + d = d || {}; + const orig = d.context_original_chars || 0; + const sent = d.context_sent_chars || 0; + const saved = d.context_saved_chars || 0; + const tools = d.context_compacted_tool_messages || 0; + const skills = d.context_compacted_skill_messages || 0; + if (!orig) return "准备中…"; + const bits = [`ctx ${fmtTokens(orig)}→${fmtTokens(sent)} chars`]; + if (saved > 0) bits.push(`省 ${fmtTokens(saved)}`); + if (tools > 0) bits.push(`压缩工具 ${tools}`); + if (skills > 0) bits.push(`skill ${skills}`); + return bits.join(" · "); +} + +function formatUsageStats(d, contextStats) { + d = d || {}; + const pt = d.prompt_tokens || 0; + const ct = d.completion_tokens || 0; + const hit = d.cache_hit_tokens || 0; + const miss = d.cache_miss_tokens || 0; + const bits = [`${fmtTokens(pt)}+${fmtTokens(ct)} tok`]; + if (hit || miss) bits.push(`cache ${fmtTokens(hit)}/${fmtTokens(miss)}`); + if (contextStats && contextStats.context_saved_chars) { + bits.push(`ctx省 ${fmtTokens(contextStats.context_saved_chars)}`); + } + return bits.join(" · "); +} + // 相对时间(任务列表用):刚刚 / N 分钟前 / N 小时前 / 昨天 HH:MM / MM-DD / YYYY-MM-DD function fmtTimeAgo(iso) { if (!iso) return ""; @@ -2422,7 +2451,7 @@ async function fetchSse(url, run) { state.liveRuns.delete(ctx.taskId); state.streaming = state.liveRuns.size > 0; if (state.taskId === ctx.taskId) { - hint.textContent = "就绪"; + hint.textContent = ctx.lastUsageHint || "就绪"; setActionMode("idle"); } } @@ -2488,7 +2517,13 @@ function handleSseEvent(ev, asstCard, ctx) { const visible = state.taskId === ctx.taskId; // 用户拖到上面看历史时不抢滚动,只在贴底时跟流 const nearBottom = visible && (stream.scrollHeight - stream.scrollTop - stream.clientHeight < 120); - if (t === "text" && ev.data && ev.data.delta) { + if (t === "llm_start") { + ctx.contextStats = ev.data || {}; + setRunHint(ctx, formatContextStats(ctx.contextStats)); + } else if (t === "llm_end") { + ctx.lastUsageHint = formatUsageStats(ev.data || {}, ctx.contextStats); + setRunHint(ctx, ctx.lastUsageHint); + } else if (t === "text" && ev.data && ev.data.delta) { ctx.acc += ev.data.delta; // rAF 节流:每帧最多 1 次重渲染,流式 token 高频时不抖 if (!ctx.pending) {