zcbot/core/agent_builder.py

722 lines
36 KiB
Python

"""装配入口: 读 config → 加载 capabilities/skills → 构造 LLM/tools/session/loop。
存储布局(§7.0 / §7.4):本地 + SaaS 共用 `workspace/` 根,只差 user_id:
PG tasks / messages ← 元数据 + 消息
workspace/users/<user_id>/<working_dir>/ ← 工作目录(用户起名,可多 task 共享)
workspace/users/<user_id>/.memory/{core.md, extended/} ← per-user 记忆(dotfile 隔离)
所有入口都走 web `/v1` + JWT(user_id = sub);dev SPA 走邮箱密码登录
(`users.email/password_hash`,bcrypt)、platform 服务端走 platform_key 登录。task_id / user_id 全 UUID;
state.json 已删除(元数据全在 PG)。
**新建 task 必须给 `name`**(任务显示名,DB 列 NOT NULL);**`working_dir` 可选**
(留空 → 用 name 作目录名;同 working_dir 多 task 自动共享 §7.1)。name 和 working_dir
都过同一份 `validate_task_name` 校验(简单名,不含 `/\\..`、不以 `.` 起头)。
`_cleanup_if_empty` 不 rmtree FS —— 同 working_dir 跨 task 复用,空 task 只删 DB 行。
"""
from __future__ import annotations
import os
from datetime import datetime
from pathlib import Path
from typing import Callable, Optional, Tuple
from uuid import UUID, uuid4
import yaml
from rich.console import Console
from core.capabilities import ModelCapabilities
from core.executor_docker import DockerExecutor
from core.executor_host import HostExecutor
from core.llm import LLM
from core.loop import AgentLoop
from core.memory import memory_block
from core.paths import ROOT, from_db_path, to_db_path
from core.session import Session
from core.sinks import ConsoleEventSink
from core.skills import SkillRegistry
from core.storage import check_no_subtask
from core.task import TaskState
from tools.fs import EditTool, GlobTool, GrepTool, ReadTool, WriteTool
from tools.documents import DocumentDownloadTool, DocumentListKbTool, DocumentSearchTool
from tools.materials_project import (
MaterialsProjectGetEntriesTool,
MaterialsProjectGetStructureTool,
MaterialsProjectSearchSummaryTool,
)
from tools.look_at_image import LookAtImageTool
from tools.run_python import RunPythonTool
from tools.seedance import SeedanceTool
from tools.seedream import SeedreamTool
from tools.shell import ShellTool
from tools.skill_authoring import ForkSkillTool, SaveSkillTool
from tools.skill_tool import LoadSkillTool
from tools.task_progress import TaskProgressTool
from tools.ask_user import AskUserTool
from tools.web_fetch import WebFetchTool
from tools.web_search import WebSearchTool
from tools.schedule import (
ScheduleCancelTool, ScheduleCreateTool, ScheduleListTool, ScheduleUpdateTool,
)
from tools.send_email import SendEmailTool, smtp_configured
from tools.wechat_bot import WechatPushTool, wechat_push_available
from core.ark_client import ArkConfig
from core.bocha_client import BochaConfig
# 媒体工具(seedream / seedance)指引:仅当本 run 真的挂了媒体工具(ARK_API_KEY 存在,
# ArkConfig.load() 非 None)才追加进 system prompt —— 没 key 的用户不会看到永远报错的工具,
# 也不该背这段红线。文案与 base 模板里其余工具表平级,放在 _build_system_prompt 里按需拼。
_MEDIA_TOOLS_BLOCK = """\
## 媒体工具(seedream 图 / seedance 视频 / look_at_image 看图)
- `look_at_image` —— 看图 / 读图(豆包 Seed 2.0 Lite 视觉)。**你(主模型)是纯文本看不见图,要""图就调它**:OCR 文字、描述画面、读图表/表格/示意图、识别物体。每次很便宜(按 token,通常 < ¥0.01)。
- **何时调**:用户消息里出现 `[用户上传的参考图] <路径>` 且需要据图内容回答(问"这图里写了啥 / 是什么 / 表格数据多少");或要基于 task 内某张图(`figures/xxx.png`)的**实际内容**做事(不是改图,改图走 seedream)。传 `image=<路径>` + 可选 `question`。
- **何时不调**:用户只是要改图(走 seedream i2i)/ 只要文件名不关心内容 / 图是你自己刚生成的且 prompt 已知(无需再读)。别对同一张图无意义反复看(每次都烧 token)。
- `seedream` —— 豆包图像生成 / 改图。产物自动落 `<task_dir>/figures/`。每次 **¥0.22**(联网 `search=true` 加 ¥0.05)。
- **文生图**(不传 `reference_images`):从零按 prompt 画。**改图 i2i**(传 `reference_images=["figures/xxx.png"]`):在已有图上做像素级修改。**用户对刚生成 / 上传的图说"改成 X / 换个颜色 / 去掉某处" → 必须走改图(reference_images 指那张图),绝不重新文生图**(重画 = 完全不同的图,丢原构图)。v1 改图仅支持单张参考。
- **调用前必须先 `load_skill('imagegen')`** —— skill 里有「何时该用 / 该不该用 mermaid 替代 / 用户描述模糊度诊断 / 一次性追问范式 / prompt 装配 / 改图(i2i)范式 / 失败解药」全套引导。**不要拿用户原话直接当 prompt 调 tool** —— 容易烧 ¥0.22 在错的方向上。
- 兜底硬约束(即使没 load skill 也守):用户没主动要图就别装饰性生成;同一目的不满意**不要连发**,先口头校准 prompt 再调。用户消息里出现 `[用户上传的参考图] <路径>` = 用户贴了图,要看图 / 改图时用那个路径。
- `seedance` —— 豆包视频生成(Seedance 2.0 Fast)。异步任务,**等 30-90s 出片**;产物自动落 `<task_dir>/videos/`。每次 **¥1.86 起**(480p 4s)~ **¥12+**(720p 15s),比图贵 10 倍以上。触发词:视频 / 动画 / 动起来 / 做个 video / 镜头 / 短片 / 演示视频 / 动效。
- **调用前必须先 `load_skill('videogen')`** —— skill 里有「6 维诊断(含运动维必填)/ seedream/mermaid 反向选型 / prompt 装配 / 参数取舍(时长/分辨率/比例直接决定钱)/ 失败解药」全套引导。视频比图贵 10 倍且 90s 等待,绝对不要拿用户原话当 prompt 直接调。
- 兜底硬约束:用户没主动要视频就别装饰性生成(比生图更严重的红线);同一目的不满意**绝不连发**(1 次错 = ¥4+60s,连发 2 次 = ¥8+2min);phase 1 仅文生视频,**不支持** image-to-video / video-to-video。"""
# 运行环境段(按 backend 注入,general_v1.md 的「平台」段指向这里)。环境事实(在哪 /
# 能否联网 / 装了啥)是全局不变量,放 system 比塞进某个 skill 高杠杆 —— 一句话省掉一整类
# 试错(外网试错 / 平台命令试错)。docker = 线上真实形态(Ubuntu 容器,无外网);host =
# 本地 dogfood(Windows),给一行最小提示免 general_v1 里那句指向落空。
_CONTAINER_ENV_BLOCK = """\
## 运行环境(容器)
你的 `shell` / `run_python` / 文件工具都在一个 **Linux(Ubuntu)容器**里执行 —— 是 **bash 不是 cmd**:unix 命令 / 管道 / 重定向正常用,`mkdir -p`、`&&`、`2>&1` 都行。
- **渲 mermaid 图一律走本地 `mmdc`**:`mmdc -i 图.md -o 图.png`(要矢量就 `-o 图.svg`;chromium 已配好,**不用加 `--no-sandbox`、不用自己写 puppeteer 配置**)。⛔ **别去调 `mermaid.ink` 等在线渲图服务** —— 境外、易被墙 / 不稳,实测有对话在上面反复试编码、改压缩,白烧上百 k token;本地 mmdc 一条命令就出图。
- 中文字体已装(matplotlib / mermaid 出图不乱码);常用 Python 库已预装;`/tmp` 可写、其余 rootfs 只读。"""
_HOST_ENV_BLOCK = """\
## 运行环境(本地 host)
`shell` 走的是 **Windows cmd.exe**(非 bash):避免 unix-only flag,`mkdir -p` 不识别 → 用 `run_python` 的 `os.makedirs(..., exist_ok=True)` 建目录;复杂管道/重定向用 run_python 更稳。"""
def load_config() -> dict:
return yaml.safe_load((ROOT / "config" / "agent.yaml").read_text(encoding="utf-8")) or {}
def _resolve_executor(
host: HostExecutor,
user_id: UUID,
user_root_path: Path,
working_dir_path: Path,
):
"""选 Executor backend(§7.5 #5)。
env `ZCBOT_SANDBOX_BACKEND=docker` 时构造 DockerExecutor;其他值 / 缺失 → host。
docker 路径要 lifespan 已 `core.sandbox.init_pool` 过(否则 pool 为 None → 退 host
+ 启动日志由 web 入口在 init 时打印,这里不重复 warn)。
"""
import os
if os.getenv("ZCBOT_SANDBOX_BACKEND", "host").lower() != "docker":
return host
from core.sandbox import get_pool
pool = get_pool()
if pool is None:
# lifespan 没 init 成功 —— 让上层早死比静默退化更安全(避免外部用户开放时
# 误以为在沙盒里跑实则 host)。Web 入口启动会 fail-fast,这里再补一条提醒。
raise RuntimeError(
"ZCBOT_SANDBOX_BACKEND=docker but sandbox pool not initialized; "
"check web lifespan init_pool() / docker daemon availability"
)
return DockerExecutor(
host=host,
pool=pool,
user_id=user_id,
user_root=user_root_path,
working_dir=working_dir_path,
)
def resolve_workspace(workspace: Optional[str], cfg: Optional[dict] = None) -> Path:
"""workspace 根解析,优先级:显式 arg > cfg `workspace_dir` > 默 "workspace"(均 `ROOT/<值>`)。
workspace 必须落在 ROOT 子树内 —— DB 的 `working_dir` 经 `core/paths.py` 锚定 ROOT 存
相对串(`to_db_path` 对 ROOT 外路径直接 raise)。要把重写入区落独立数据盘,用 **bind mount**
把 `/data/...` 接到 `ROOT/workspace`(逻辑路径不变,`.resolve()` 不展开 bind),
不要指向 ROOT 外的绝对路径(详 RUN.md「workspace 落独立数据盘」段)。
"""
cfg = cfg or load_config()
p = Path(workspace) if workspace else ROOT / cfg.get("workspace_dir", "workspace")
p.mkdir(parents=True, exist_ok=True)
return p
def user_root(workspace_dir: Path, user_id: UUID) -> Path:
"""per-user 子树根:`<workspace>/users/<user_id>/`。working_dir / `.memory/` / `.skills/` 都在下面。"""
d = workspace_dir / "users" / str(user_id)
d.mkdir(parents=True, exist_ok=True)
return d
def build_skill_registry(
cfg: dict, workspace_dir: Path, user_id: UUID, *, docker: bool
) -> "SkillRegistry":
"""装两来源 registry:内置 skill(`ROOT/skills`,只读)+ 用户 skill(`user_root/.skills`)。
用户来源排在内置之后 → 同名时 user wins(详 core/skills.py)。container_root 仅 docker
用:内置 bind 到 `/sandbox/skills`,用户 `.skills` 在 user_root 内、随 user_root bind 到
`/workspace`,故为 `/workspace/.skills`。host backend 传 None。
"""
from core.skills import SkillSource
builtin = SkillSource(
ROOT / cfg.get("skills_dir", "skills"),
"builtin",
"/sandbox/skills" if docker else None,
)
user = SkillSource(
user_root(workspace_dir, user_id) / ".skills",
"user",
"/workspace/.skills" if docker else None,
)
return SkillRegistry([builtin, user])
class InvalidTaskName(ValueError):
"""task name / working_dir 不合法(空 / 含分隔符 / dotfile 起头 / 超长)。"""
def validate_task_name(name: str) -> str:
"""返回 stripped name;非法抛 InvalidTaskName。
name 和 working_dir 共用一份规则:非空 / 不含 `/\\` 和 NUL / 不以 `.` 起头
(挡 `.memory` 等系统区)/ ≤ 255 字符。允许 CJK 与其他 Unicode 字符。
"""
n = (name or "").strip()
if not n:
raise InvalidTaskName("name 不能为空")
if len(n) > 255:
raise InvalidTaskName(f"name 超长(>255 字符): {n[:40]!r}...")
if any(c in n for c in ("/", "\\", "\x00")):
raise InvalidTaskName(f"name 不能含 `/` `\\` 或 NUL: {n!r}")
if n.startswith("."):
raise InvalidTaskName(
f"name 不能以 `.` 起头(保留给 .memory 等系统区): {n!r}"
)
return n
def working_dir_from_name(workspace_dir: Path, user_id: UUID, dir_name: str) -> Path:
"""`<workspace>/users/<user_id>/<dir_name>` 绝对路径。
入参 dir_name 由 `validate_task_name` 在入口校验过;本函数只拼路径,不 mkdir
(目录创建放在 task 创建入口 build_agent / web `/v1/tasks`,函数保持纯)。
"""
return user_root(workspace_dir, user_id) / dir_name
def resolve_task_id(
workspace_dir: Path,
task_id_arg: Optional[str],
resume: bool,
user_id: UUID,
working_dir_name: Optional[str] = None,
) -> Tuple[UUID, Path]:
"""返回 (task_id, working_dir 绝对路径)。
新建:`working_dir_name` 必填(调用方应已 fallback 到 name + 校验过),
工作目录 = `<workspace>/users/<uid>/<working_dir_name>/`。
Resume:`task_id_arg` 是完整 UUID 字符串(web 路由进来的总是 UUID),
working_dir 从 PG `tasks.working_dir` 读还原;`working_dir_name` 在 resume 时被忽略。
"""
if resume:
from sqlalchemy import select
from core.storage import session_scope
from core.storage.models import Task
tid = UUID(task_id_arg) if task_id_arg else None
if tid is None:
raise ValueError("resume 必须指定 task_id")
with session_scope() as s:
db_dir = s.execute(
select(Task.working_dir).where(Task.task_id == tid)
).scalar_one_or_none() or ""
if not db_dir:
raise ValueError(
f"task {tid} has empty working_dir in DB — should not happen "
"(new tasks require name + working_dir; legacy empty data was wiped)"
)
# DB 存的是 db 形态(相对 ROOT 或绝对),走 from_db_path 还原绝对
return tid, from_db_path(db_dir)
if not working_dir_name:
raise InvalidTaskName("new task 必须指定 working_dir(或留空 fallback 用 name)")
safe = validate_task_name(working_dir_name)
return uuid4(), working_dir_from_name(workspace_dir, user_id, safe)
def _build_system_prompt(
cfg: dict,
skills: SkillRegistry,
workspace_dir: Path,
tool_base: Path,
working_dir: Path,
user_id: UUID,
task_id: UUID,
task_name: str,
task_skill: str = "",
media_enabled: bool = False,
) -> str:
"""拼 system prompt: 模板 + skill 列表 + memory + 工作目录段 + task 上下文 + 命名约定。
new task 和 resume task 都走这里,memory 演化即时生效。memory 按 user_id 隔离。
task_short_id (task_id.hex 前 8 位) 作「宪法」文件主锚 —— task.name 可改,
task_id 永不变,glob 按 short_id 找文件,免 cascade rename。
task_name 仍写进文件名作"建时元数据 / 人类可读说明",改名后文件名里的旧 name
不强求同步(由 short_id 兜底定位)。
today 当场算,落 prompt 给 LLM 直接拼路径(避免 LLM 不知道当前日期)。
"""
prompt = (ROOT / cfg["system_prompt"]).read_text(encoding="utf-8")
# docker backend 下 shell/run_python/fs 工具全在容器里跑,容器把
# `<workspace>/users/<uid>` bind 到 `/workspace`、`--workdir /workspace/<wd>`
# (executor_docker.py:99-100)。此时 prompt 给 agent 的所有可写/可读绝对路径
# (含 .memory/ 写入锚点)都必须是**容器路径**,否则 LLM 拿着宿主绝对路径在沙盒里
# find 不到任何东西。host backend 不变,直接用宿主绝对路径。
is_docker = os.getenv("ZCBOT_SANDBOX_BACKEND", "host").lower() == "docker"
# 运行环境段紧跟模板(平台/网络是基础事实,放前面);general_v1 的「平台」段指向这里。
prompt += _CONTAINER_ENV_BLOCK if is_docker else _HOST_ENV_BLOCK
if skills.skills:
prompt += f"\n\n## 可用 skill (用 load_skill 加载完整指引)\n{skills.discovery_block()}"
# .memory/ 在 agent 视角下的可写路径:docker 给容器路径,host 给宿主绝对路径。
mem_dir_display = "/workspace/.memory" if is_docker else str(
user_root(workspace_dir, user_id) / ".memory"
)
prompt += memory_block(workspace_dir, user_id, mem_dir_display)
if media_enabled:
prompt += "\n\n" + _MEDIA_TOOLS_BLOCK
wd_abs = working_dir.resolve()
if is_docker:
try:
wd_rel = wd_abs.relative_to(user_root(workspace_dir, user_id))
wd_path = "/workspace/" + wd_rel.as_posix()
except ValueError:
# working_dir 不在 user_root 下 —— 防御性兜底,正常路径不会到这里
wd_path = "/workspace"
else:
wd_path = str(wd_abs)
today = datetime.now().strftime("%Y-%m-%d")
tname = task_name or "<未指定>"
short_id = task_id.hex[:8]
skill_line = (
f"- **task 预选 skill**: `{task_skill}` — 用户创建时声明的主 skill\n"
if task_skill else ""
)
# docker 下容器 cwd 恒等于 task_dir(fs 工具 base_dir = --workdir = task_dir),
# 不存在"另一个只读宿主 cwd",留 cwd 行只会误导 → 去掉,改提一句工具起步点。
if is_docker:
loc_lines = (
f"- **task_dir(工作目录,所有产物写这里;shell / run_python / 读写工具"
f"均从此目录起步,相对路径都相对它)**: `{wd_path}`\n"
)
else:
loc_lines = (
f"- cwd(用户启动时所在目录,只读用): `{tool_base}`\n"
f"- **task_dir(所有产物写到这里)**: `{wd_path}`\n"
)
prompt += (
f"\n\n## 工作目录与 task 上下文\n"
f"{loc_lines}"
f"- **task_short_id**(永不变,「宪法」文件主锚): `{short_id}`\n"
f"- **task_name**(可变,写进文件名作人类可读说明): `{tname}`\n"
f"- **today**(当前日期,用于「宪法」文件命名): `{today}`\n"
f"{skill_line}"
f"\n"
f"SKILL 文档里出现的 `<task_dir>` 占位符,一律指上面这个绝对路径。"
f"普通产物(sections / slides / 终稿 .docx/.pptx)按 SKILL 文档落路径;"
f"「宪法」性文件(spec 等)按下面《task 级「宪法」文件命名约定》拼路径。\n"
f"⛔ 不要把产物写到 cwd / `skills/` / repo 根 —— 只写到 task_dir。\n"
f"\n## task 级「宪法」文件命名约定(跨 skill 通用)\n"
f"跟 task 1:1 绑定、后续步骤会**反复 read** 的「宪法」性文件(如 proposal/ppt 的 "
f"spec、outline),统一落 task_dir 根、按此格式命名:\n\n"
f" <YYYY-MM-DD>-<task_short_id>-<task_name>.<base>.md\n\n"
f"用上面注入的值:`<YYYY-MM-DD>`=today=`{today}`、`<task_short_id>`=`{short_id}`"
f"(永不变主锚)、`<task_name>`=`{tname}`(原样用 含 CJK/空格);`<base>` 由 skill "
f"定义(如 `spec`)。取 current:按 short_id glob `{wd_path}/*-{short_id}-*.<base>.md`"
f" → 文件名字典序取最大者(= 最新日期,改过 task_name 旧文件仍能定位);重定调时以 "
f"today 为前缀写新版、**旧版留作历史快照不要覆盖**(同日多版加 `-v2`/`-v3`)。"
f"取用 / 重定调的具体时机见对应 skill。"
)
return prompt
def build_agent(
*,
user_id: UUID,
model_name: Optional[str] = None,
workspace: Optional[str] = None,
console: Optional[Console] = None,
session_id: Optional[str] = None,
resume: bool = False,
tool_base: Optional[Path] = None,
skill: str = "",
description: str = "",
name: Optional[str] = None,
working_dir: Optional[str] = None,
image_variant: str = "",
video_variant: str = "",
cancel_check: Optional[Callable[[], bool]] = None,
scheduled_run: bool = False,
) -> Tuple[AgentLoop, Session, str, TaskState, Path]:
"""返回 (agent, session, task_id_str, task_state, working_dir_path)。
新建 task:
- `name` 必填(任务显示名,DB 列 NOT NULL,走 validate_task_name)
- `working_dir` 可选(留空 → fallback 用 name 作目录名;非空也走 validate_task_name)
Resume:name / working_dir 都忽略(从 DB 读)。
`user_id` 必填,决定 working_dir 根、memory 子树、no-subtask 校验作用域。
web 入口从 JWT 拿到后透传;不允许无 user 的调用路径。
"""
cfg = load_config()
uid = user_id
# model 选择优先级:caller 传参 > resume 时 task.model_profile > cfg["default_model"]。
# caller 传参为新建 task 时 web POST /v1/tasks 接收的 model_profile 字段;resume
# 不传时读 tasks 表(由顶栏下拉切换 PATCH 维护)。整体满足 grill A 粒度:下条 send 生效。
model = model_name
if model is None and resume and session_id:
from sqlalchemy import select as _select
from core.storage import session_scope as _scope
from core.storage.models import Task as _Task
with _scope() as _s:
model = _s.execute(
_select(_Task.model_profile).where(_Task.task_id == UUID(session_id))
).scalar_one_or_none() or None
if not model:
model = cfg["default_model"]
caps = ModelCapabilities.load(model, ROOT / cfg["models_dir"])
llm = LLM(caps)
workspace_dir = resolve_workspace(workspace, cfg)
# 新建时校验 name + 解析 working_dir(留空 fallback 用 name);resume 跳过
task_name_safe = ""
wd_name_for_resolve: Optional[str] = None
if not resume:
if not name:
raise InvalidTaskName("new task 必须指定 name(任务显示名)")
task_name_safe = validate_task_name(name)
wd_raw = (working_dir or "").strip()
wd_name = wd_raw if wd_raw else task_name_safe
wd_name_for_resolve = validate_task_name(wd_name)
task_id, working_dir_path = resolve_task_id(
workspace_dir, session_id, resume, uid, wd_name_for_resolve
)
sid = str(task_id)
# §7.4 no-subtask:新建 task 时校验 working_dir 不与同 user 已有 task 形成前缀嵌套
# (resume 跳过 —— 该 task 已落库,改名走 Folder API 的 cascade)
if not resume:
check_no_subtask(str(working_dir_path), user_id=uid)
# working_dir 立刻建出 —— DB 是 source of truth,FS 目录视为可重生的视图。
# resume 时也兜底 mkdir(用户可能经 /v1/files/delete 删过空目录),
# 同 working_dir 多 task 共享,exist_ok=True 不冲突。
working_dir_path.mkdir(parents=True, exist_ok=True)
tool_base = Path(tool_base) if tool_base else Path.cwd()
is_docker = os.getenv("ZCBOT_SANDBOX_BACKEND", "host").lower() == "docker"
skills = build_skill_registry(cfg, workspace_dir, uid, docker=is_docker)
# 媒体配置提前 load 一次:既决定 system prompt 要不要追加媒体段(media_enabled),
# 也复用给下方 seedream/seedance 注册(避免重复读 doubao.yaml)。无 ARK_API_KEY → None。
ark_cfg = ArkConfig.load()
now_iso = datetime.now().isoformat(timespec="seconds")
# meta["working_dir"] 是 db 形态(相对 ROOT 或绝对);Session.append → ensure_local_task_row
# 把它直接落 PG tasks.working_dir,所以这里就转好。文件系统操作仍用 working_dir_path(absolute)。
wd_db = to_db_path(working_dir_path)
# task_state 先就位:resume 从 DB 拿真 name,new 直接用 task_name_safe。
# system_prompt 拼接需要 task.name 注入(「宪法」文件命名约定),所以拼 prompt
# 必须在 task_state 之后。
if resume:
task_state = TaskState.load(task_id)
if task_state is None:
# tasks 行不存在 —— 理论上 resolve_task_id 已经定位到 DB 行了,走到这里
# 说明被并发删了,兜底构造空 state(不主动 save,等下条 append / 命令)
task_state = TaskState(
task_id=sid, user_id=uid, name="", working_dir=wd_db,
skill=skill, description=description, status="active",
model=caps.model_id, model_profile=model,
)
else:
# 懒创建:TaskState 仅内存。tasks 行在首条 user 消息 append 时由
# ensure_local_task_row 占位 INSERT(name 已就位);首次 sync_task_tokens
# 或 /done /desc 走 upsert 覆盖完整字段。
task_state = TaskState(
task_id=sid, user_id=uid, name=task_name_safe, working_dir=wd_db,
skill=skill, description=description, status="active",
model=caps.model_id, model_profile=model,
reasoning_effort=caps.default_reasoning_effort or "",
)
system_prompt = _build_system_prompt(
cfg, skills, workspace_dir, tool_base, working_dir_path, uid,
task_id, task_state.name, task_state.skill,
media_enabled=ark_cfg is not None,
)
meta = {
"id": sid,
"created_at": now_iso,
"cwd": str(tool_base),
"name": task_state.name, # resume / new 都拿到真 name(空字符串只在并发删兜底分支)
"working_dir": wd_db,
"model": caps.model_id,
"model_profile": model,
"skill": skill,
"description": description,
"reasoning_effort": caps.default_reasoning_effort or "",
}
if resume:
session = Session.load(task_id, system_prompt=system_prompt, meta=meta)
else:
session = Session(task_id=task_id, system_prompt=system_prompt, meta=meta)
# user_root 传给 tool 让 fs 输出渲染成相对路径(不泄漏 user_id / 部署根,
# 同时让 web SPA artifact chip 抽取稳定锚定 <wd>/ 前缀)
ur_path = user_root(workspace_dir, uid)
tools = {}
tp = TaskProgressTool(base_dir=tool_base, user_root=ur_path)
tools[tp.name] = tp
au = AskUserTool(base_dir=tool_base, user_root=ur_path)
tools[au.name] = au
for cls in (ReadTool, WriteTool, EditTool, GlobTool, GrepTool, ShellTool):
t = cls(base_dir=tool_base, user_root=ur_path)
tools[t.name] = t
# web_fetch 无需 API key,始终可用
wf = WebFetchTool(base_dir=tool_base, user_root=ur_path)
tools[wf.name] = wf
# Secret-bearing domain tools stay host-side. Never expose DOCUMENT_SEARCH_API_KEY
# / MP_API_KEY to run_python or the sandbox; only register typed tools when the
# corresponding host env exists.
if os.getenv("DOCUMENT_SEARCH_API_KEY", "").strip():
for t in (
DocumentListKbTool(base_dir=tool_base, user_root=ur_path),
DocumentSearchTool(base_dir=tool_base, user_root=ur_path),
DocumentDownloadTool(
working_dir=working_dir_path,
base_dir=tool_base,
user_root=ur_path,
),
):
tools[t.name] = t
if os.getenv("MP_API_KEY", "").strip():
for t in (
MaterialsProjectSearchSummaryTool(base_dir=tool_base, user_root=ur_path),
MaterialsProjectGetStructureTool(
working_dir=working_dir_path,
base_dir=tool_base,
user_root=ur_path,
),
MaterialsProjectGetEntriesTool(
working_dir=working_dir_path,
base_dir=tool_base,
user_root=ur_path,
),
):
tools[t.name] = t
if skills.skills:
# LoadSkillTool 返回头里的 dir 由 registry 按 skill.source 给容器内路径
# (内置 → /sandbox/skills,用户 → /workspace/.skills);host backend → host 绝对路径。
ls = LoadSkillTool(registry=skills, base_dir=tool_base, user_root=ur_path)
tools[ls.name] = ls
# 用户 skill 创作工具:恒挂(每个用户都能造自己的 skill)。host-side 直接写
# user_root/.skills —— 不走沙箱 fs(其 base_dir 锚 cwd / 容器 wd,够不到 .skills)。
user_skills_dir = ur_path / ".skills"
for t in (
SaveSkillTool(user_skills_dir, skills, base_dir=tool_base, user_root=ur_path),
ForkSkillTool(user_skills_dir, skills, base_dir=tool_base, user_root=ur_path),
):
tools[t.name] = t
# 定时任务管理(DESIGN §8.5):增删查三件套。**定时 run 内不挂**(防任务造任务,
# 自我繁殖);仅交互对话里能建/管 job。user_id 由 ctor 注入,不信模型传的 id。
if not scheduled_run:
for t in (
ScheduleCreateTool(uid, base_dir=tool_base, user_root=ur_path),
ScheduleListTool(uid, base_dir=tool_base, user_root=ur_path),
ScheduleUpdateTool(uid, base_dir=tool_base, user_root=ur_path),
ScheduleCancelTool(uid, base_dir=tool_base, user_root=ur_path),
):
tools[t.name] = t
# 发邮件(§8.5 投递):仅当 SMTP_* env 齐了才挂(沿用"有 key 才注册",没配的
# 部署里 agent 看不到一个永远报错的工具)。定时与交互 run 都可用。
# base_dir 用 working_dir_path(该 task 的**宿主**工作目录绝对路径),不是 tool_base(cwd)。
# send_email 在宿主进程读附件文件,docker 下 agent 给的相对路径相对容器 workdir=task_dir,
# 翻回宿主即 working_dir_path;tool 内 _resolve_user_file 再处理 /workspace 容器绝对路径。
if smtp_configured():
se = SendEmailTool(base_dir=working_dir_path, user_root=ur_path)
tools[se.name] = se
# 微信主动推送(§8.7 渠道抽象):仅当微信渠道开关在才挂(沿用"有开关才注册")。
# 交互与定时 run 都可用(定时简报可主动推回用户微信,24h 窗口内)。user_id ctor 注入。
# base_dir 同 send_email:用 working_dir_path(宿主 task 目录),wechat_push 在宿主进程
# 读待发文件,需把 agent 给的相对/容器路径翻回宿主(详 _resolve_user_file)。
if wechat_push_available():
wp = WechatPushTool(uid, base_dir=working_dir_path, user_root=ur_path, task_id=task_id)
tools[wp.name] = wp
if caps.enable_run_python:
rp = RunPythonTool(base_dir=tool_base, user_root=ur_path)
tools[rp.name] = rp
# 每账号每日配额(yaml `quotas` 段,跨 task 跨 variant 全口径合计;
# 0 / 缺失 = 不限)。tool 起手 check_daily_quota,超额返 [Error] 不调远端。
quotas = cfg.get("quotas") or {}
images_per_day = int(quotas.get("images_per_day", 0))
videos_per_day = int(quotas.get("videos_per_day", 0))
# 媒体生成 tool(豆包 seedream / 后续 seedance):仅当 ARK_API_KEY 设了才挂 ——
# 没 key 的用户无感知,不至于看到 schema 里突然多个永远报错的工具。
# image_variant 由 caller 传(web 入口随消息 POST 带);空 → 取 yaml 第一个 variant
# (fallback,沿用原行为)。本次 run 装的 SeedreamTool 锁定该 variant,本 run 内的
# 多次 tool call 全用同一个;下一条消息可以重选。
# ark_cfg 已在函数上半部 load 过(复用,顺带决定 system prompt 的 media 段)。
if ark_cfg is not None:
image_cfg = (ark_cfg.raw.get("image") or {})
chosen_key, chosen_cfg = "", None
if image_variant:
v = image_cfg.get(image_variant)
if isinstance(v, dict):
chosen_key, chosen_cfg = image_variant, v
# 不认的 variant 静默退到 fallback —— web 入口已校验过;留兜底防 yaml 改动
if chosen_cfg is None:
for variant_key, variant_cfg in image_cfg.items():
if isinstance(variant_cfg, dict):
chosen_key, chosen_cfg = variant_key, variant_cfg
break
if chosen_cfg is not None:
seedream_tool = SeedreamTool(
ark_cfg=ark_cfg,
image_variant_cfg=chosen_cfg,
variant_key=chosen_key,
working_dir=working_dir_path,
task_id=task_id,
user_id=uid,
base_dir=tool_base,
user_root=ur_path,
daily_limit=images_per_day,
)
tools[seedream_tool.name] = seedream_tool
# 视频 variant 选择(同上 image_variant 范式):video_variant 由 caller 传,
# 空 → 取 yaml 第一个 video variant。本 run 的 SeedanceTool 锁定该 variant。
# cancel_check 是 web 入口构造的 `lambda: broker.is_cancelled(task_id)` —— 轮询
# 期间(典型 30-90s)拿来响应用户停止按钮;远端 cgt 任务无 cancel API,best-effort 不动远端
video_cfg = (ark_cfg.raw.get("video") or {})
v_chosen_key, v_chosen_cfg = "", None
if video_variant:
v = video_cfg.get(video_variant)
if isinstance(v, dict):
v_chosen_key, v_chosen_cfg = video_variant, v
if v_chosen_cfg is None:
for variant_key, variant_cfg in video_cfg.items():
if isinstance(variant_cfg, dict):
v_chosen_key, v_chosen_cfg = variant_key, variant_cfg
break
if v_chosen_cfg is not None:
seedance_tool = SeedanceTool(
ark_cfg=ark_cfg,
video_variant_cfg=v_chosen_cfg,
variant_key=v_chosen_key,
working_dir=working_dir_path,
task_id=task_id,
user_id=uid,
base_dir=tool_base,
user_root=ur_path,
cancel_check=cancel_check,
daily_limit=videos_per_day,
)
tools[seedance_tool.name] = seedance_tool
# 图像理解 tool(look_at_image / 豆包 Seed 2.0 Lite vision):仅当 yaml 有 vision 段才挂。
# 无 variant 选择维度(读图不分档,固定第一个 variant),与 image/video 的"用户可切档"不同。
vision_cfg = (ark_cfg.raw.get("vision") or {})
vis_key, vis_variant = "", None
for variant_key, variant_cfg in vision_cfg.items():
if isinstance(variant_cfg, dict):
vis_key, vis_variant = variant_key, variant_cfg
break
if vis_variant is not None:
look_tool = LookAtImageTool(
ark_cfg=ark_cfg,
vision_variant_cfg=vis_variant,
variant_key=vis_key,
working_dir=working_dir_path,
task_id=task_id,
user_id=uid,
base_dir=tool_base,
user_root=ur_path,
)
tools[look_tool.name] = look_tool
# 博查联网搜索:仅当 BOCHA_API_KEY 设了才挂
bocha_cfg = BochaConfig.load()
if bocha_cfg is not None:
ws = WebSearchTool(cfg=bocha_cfg)
tools[ws.name] = ws
sink = ConsoleEventSink(console) if console else None
# §7.5 #5/#6 Executor 抽象:env `ZCBOT_SANDBOX_BACKEND=host|docker` 切 backend。
# host(默)= 全 in-process,本地 dogfood / Windows 走这条;docker = shell/run_python
# dispatch 到 per-user 容器(其他工具仍 host)。docker 路径要求 lifespan 已 `init_pool`。
host_executor = HostExecutor(tools)
executor = _resolve_executor(host_executor, uid, ur_path, working_dir_path)
agent = AgentLoop(
llm, executor, session, caps,
user_id=uid, working_dir=working_dir_path, sink=sink,
)
if cancel_check is not None:
agent.cancel_check = cancel_check
return agent, session, sid, task_state, working_dir_path
def sync_task_tokens(task_state: TaskState) -> None:
"""每轮 agent.run 后调,把累计 tokens UPDATE 到 PG tasks 表。
从 `messages.tokens_in/out` SUM 现算 —— `record_chat_usage` 写每条 assistant
message 时已落库,这里聚合写入 tasks 概览列。query 走 (task_id) 索引,行数
顶天几百,亚毫秒级,在刚跑完几秒 LLM 后的 round-trip 噪声里。
"""
from uuid import UUID
from sqlalchemy import func, select
from core.storage import update_task
from core.storage.engine import session_scope
from core.storage.models import Message
tid = UUID(task_state.task_id)
with session_scope() as s:
row = s.execute(
select(
func.coalesce(func.sum(Message.tokens_in), 0),
func.coalesce(func.sum(Message.tokens_out), 0),
).where(Message.task_id == tid)
).one()
tp, tc = int(row[0]), int(row[1])
task_state.tokens_prompt = tp
task_state.tokens_completion = tc
update_task(tid, tokens_prompt=tp, tokens_completion=tc)