From e66fdd0ffc82c063d7aa255660cade11d11c818f Mon Sep 17 00:00:00 2001 From: caoqianming Date: Fri, 26 Jun 2026 10:51:06 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E5=AF=B9=E8=AF=9D=E5=BD=92=E5=B1=9E=20+=20push=20=E7=BB=9F?= =?UTF-8?q?=E4=B8=80=E8=AE=B0=E5=BD=95=E5=88=B0=E6=B8=A0=E9=81=93=E5=AF=B9?= =?UTF-8?q?=E8=AF=9D(bump=200.28.0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题1:定时任务产生的 task(isolated 每次新建)混进普通对话列表。 - tasks 加 scheduled_job_id(nullable FK→scheduled_jobs,migration 0017 + backfill persistent/isolated);列表 WHERE scheduled_job_id IS NULL 排除(+working_dir LIKE 兜底) - ensure_local_task_row 加参数,_execute_scheduled_job 建任务时填 - mode 语义澄清:只管对话是否延续,文件夹两种模式都按 job 复用 问题2:任何 push(定时 deliver_notify / agent wechat_push 工具)推到微信渠道, web 端渠道对话看不到、没法基于推送追问。 - 记录下沉到 send_to_user(两调用方统一入口):投递成功后对每个成功渠道 ensure_channel_chat_task(不存在自动建,与入站对话共用)+ 写 assistant 消息 (摘要+文件下载链接+../rel read 路径) - Unified 进 agent 上下文(基于推送追问);source_task_id 去重(chat task 内调 wechat_push 时不重复插摘要);不塞正文,agent 按需 read 产物文件 - _run_channel_conversation 复用 ensure_channel_chat_task,消除建 task 重复逻辑 messages.kind 列(migration 0018):push 记录标 kind="push"(独立列不进 payload), extract_last_assistant_text 加 WHERE kind IS NULL 跳过,避免 wecom 入站取回复 误取 push 摘要当回复。 Co-Authored-By: Claude Fable 5 --- DESIGN.md | 5 + PROGRESS.md | 8 +- core/__init__.py | 2 +- core/agent_builder.py | 2 +- core/storage/models.py | 12 ++ core/storage/utils.py | 30 ++++- core/wechat/inbound.py | 2 +- core/wechat/service.py | 122 +++++++++++++++++- ...0260626_1000_0017_task_scheduled_job_id.py | 58 +++++++++ .../20260626_1100_0018_message_kind.py | 29 +++++ tests/test_secret_host_tools.py | 2 +- tools/wechat_bot.py | 7 +- web/app.py | 45 ++----- 13 files changed, 277 insertions(+), 47 deletions(-) create mode 100644 db/migrations/versions/20260626_1000_0017_task_scheduled_job_id.py create mode 100644 db/migrations/versions/20260626_1100_0018_message_kind.py diff --git a/DESIGN.md b/DESIGN.md index a81e4f6..8345a06 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -616,6 +616,10 @@ create index on usage_events (model_profile, created_at); **数据模型(新表 `scheduled_jobs`,独立加表不碰现有 schema → 公测兼容)**: `id, user_id, name, prompt, cron, tz(默 Asia/Shanghai), mode(isolated|persistent), bound_task_id(可空), notify(JSONB 可空), enabled, timeout_seconds, next_run_at, last_run_at, last_status, last_error, last_task_id, consecutive_failures, expires_at(可空), created_at, deleted_at`。Alembic 加表 migration;`usage_events` 复用现成记账(可加 `kind="scheduled"` 自由文本区分,无需 migration)。 +**mode 语义(澄清)**:mode 只决定"对话是否延续"——isolated 每次新建 task(隔离对话历史、省 token),persistent 复用 `bound_task_id` 常驻 task(跨天连续性)。**文件夹两种模式都按 job 复用**(`scheduled-`,产物累积 + notify 取最新产物依赖它),不是 mode 的区分维度。 + +**定时执行 task 的归属与可见性(0017)**:定时任务产生的 task 在 `tasks` 上标 `scheduled_job_id`(nullable FK → `scheduled_jobs.job_id`)。普通对话列表 `WHERE scheduled_job_id IS NULL` 排除(不混进"用户项目"列表);crons 页可按 job 反查执行历史。push 投递记录见 §8.7。 + **守护循环(仿 §8.4 `_disk_scanner`,plain-asyncio)**:lifespan 起一个后台 task,每 ~10s(`ZCBOT_SCHEDULER_TICK_SECONDS`,只决定最坏延迟≤1tick、不决定会否漏 —— claim 取 `next_run<=now` 的全部)扫 `enabled AND next_run_at<=now()`;命中即 `asyncio.create_task(asyncio.to_thread(_run_agent_bg, ...))` 复用现成路径,登记到 `app.state.inflight`(随关停 drain 一起收尾)。与**单活 run 锁**(§7.x `run_status` + `SELECT FOR UPDATE`)交互:isolated 每次新 task 天然无冲突;persistent 若绑定 task 正忙 → 跳过本次 + 记 warn,下一个点再来(不排队堆积)。run 完回写 `last_*` + croniter 算 `next_run_at`。 **croniter 选型**:存标准 5 段 cron 串 + 时区,`croniter` 算 `next_run_at`。理由:正确处理 dom/dow 同列的 vixie OR 语义和时区折算(手搓极易踩坑,四源都点名这个坑);纯 Python 小依赖。劣选:只支持"每天/每周 HH:MM"自己用 datetime 算 —— 零依赖但遇复杂周期要返工。 @@ -679,6 +683,7 @@ create index on usage_events (model_profile, created_at); **渠道抽象(两渠道共用,加渠道不改 scheduler / 工具主体)**: - **绑定**:per-user 记"绑了哪些渠道 + 各自凭据/标识"(ClawBot:`bot_token`+`latest_context_token`;企业微信:`wecom_userid`,应用凭据走全局 env)。 - **统一发送**:`send_to_user(user_id, text, file?)` → 解析该用户已绑渠道 → 各渠道实现各自发;`scheduler.deliver_notify`、`WechatPushTool` 都调这层,不感知具体渠道。 +- **推送即对话记录(Unified)**:`send_to_user` 投递成功后,对每个成功渠道把推送(摘要 + 文件下载链接 + agent `read` 路径 `../`)作为一条 assistant 消息写进该渠道 chat task(`ensure_channel_chat_task` 不存在自动建,与入站对话共用)。web 端渠道对话卡片可见 + agent 可基于推送追问(`read` 产物文件)。进 agent 上下文(推送是 bot 发给用户的话,记得自己发过 = 连贯,非污染);`source_task_id` 去重——调用方即目标 chat task 自己(如用户在微信里让 agent 推)时 tool 记录已在,跳过。不塞正文(避免上下文膨胀)。push 记录在 `messages.kind` 标 "push"(独立列,不进 payload),`extract_last_assistant_text`(wecom 入站取回复)加 `WHERE kind IS NULL` 跳过,避免误取 push 摘要当回复。 - **推送择优**:简报这类"必达" → 优先企业微信(无条件);ClawBot 作个人微信触达 + 聊天;两者都绑可多投或按用户偏好。 **第一期两处已定决策(评审通过)**: diff --git a/PROGRESS.md b/PROGRESS.md index 18bc352..4b99d05 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -2,7 +2,7 @@ > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 1-2 句:做了啥 + 关键判断;细节查 `git log` / `git diff` / `DESIGN §7.9`。 -最后更新:2026-06-25(企业微信入站对话支持图片/文件附件:media/get 下载 → 复用渠道无关核心落盘 + bump 0.27.2) +最后更新:2026-06-26(定时任务对话归属 + push 统一记录到渠道对话 + bump 0.28.0) --- @@ -21,6 +21,12 @@ ## 已完成关键能力 +### 2026-06-26 / 定时任务对话归属 + push 统一记录到渠道对话(bump 0.28.0) + +- 问题1:定时任务产生的 task(isolated 每次新建)混进普通对话列表。解:`tasks` 加 `scheduled_job_id`(nullable FK→scheduled_jobs,0017 migration + backfill persistent/isolated);列表 `WHERE scheduled_job_id IS NULL`(+ `working_dir LIKE '%/scheduled-%'` 兜底漏网孤行);`ensure_local_task_row` 加参数,`_execute_scheduled_job` 建任务时填。mode 语义澄清:只管对话是否延续,文件夹两种模式都按 job 复用。 +- 问题2:任何 push(定时 `deliver_notify` / agent `wechat_push` 工具)推到微信渠道,web 端渠道对话看不到、没法基于推送追问。解:**记录下沉到 `send_to_user`**(两调用方统一入口)——投递成功后对每个成功渠道 `ensure_channel_chat_task`(不存在自动建,与入站对话共用)+ 写一条 assistant 消息(摘要 + 文件下载链接 + `../rel` read 路径),Unified 进 agent 上下文;`source_task_id` 去重(chat task 内调 wechat_push 时不重复插摘要)。不塞正文(避免膨胀),agent 按需 `read` 产物文件(fs `_resolve` 无越界拦,`../rel` 相对 cwd 上一级;mount=user_root docker 也可读)。前端零改动(markdown 链接 + 文本 read 路径)。push 记录标 `messages.kind="push"`(0018,独立列不进 payload),`extract_last_assistant_text` 加 `WHERE kind IS NULL` 跳过,避免 wecom 入站取回复误取 push 摘要当回复。 +- 文件:`core/storage/models.py`(Task.`scheduled_job_id`+Message.`kind`)、`db/migrations/versions/20260626_1000_0017_*.py`+`20260626_1100_0018_*.py`、`core/storage/utils.py`(`ensure_local_task_row`+`append_channel_message`)、`core/wechat/service.py`(`send_to_user` 记录+`ensure_channel_chat_task`)、`core/wechat/inbound.py`(`extract_last_assistant_text` 过滤 kind)、`tools/wechat_bot.py`、`core/agent_builder.py`、`web/app.py`(`_run_channel_conversation` 复用)、`DESIGN.md`(§8.5/§8.7)。 + ### 2026-06-25 / 渠道卡片改并排(bump 0.27.4) - 接 0.27.3:两张渠道卡片从竖排改并排(`#channel-cards` flex row,各 `flex:1`),省左栏纵向空间;窄栏内图标左、名称 + 条数·时间堆两行(新增 `.cc-body` 列容器)。 diff --git a/core/__init__.py b/core/__init__.py index 8970a85..85810d0 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -1,3 +1,3 @@ # zcbot 版本号单一事实源:web/app.py 的 FastAPI version、/healthz 返回、前端展示都引这里。 # 改版本只动这一行。 -__version__ = "0.27.4" +__version__ = "0.28.0" diff --git a/core/agent_builder.py b/core/agent_builder.py index 4f6af70..5146f34 100644 --- a/core/agent_builder.py +++ b/core/agent_builder.py @@ -577,7 +577,7 @@ def build_agent( # base_dir 同 send_email:用 working_dir_path(宿主 task 目录),wechat_push 在宿主进程 # 读待发文件,需把 agent 给的相对/容器路径翻回宿主(详 _resolve_user_file)。 if wechat_push_available(): - wp = WechatPushTool(uid, base_dir=working_dir_path, user_root=ur_path) + wp = WechatPushTool(uid, base_dir=working_dir_path, user_root=ur_path, task_id=task_id) tools[wp.name] = wp if caps.enable_run_python: diff --git a/core/storage/models.py b/core/storage/models.py index 8d5a33b..df13dd7 100644 --- a/core/storage/models.py +++ b/core/storage/models.py @@ -96,6 +96,14 @@ class Task(Base): deleted_at: Mapped[Optional[datetime]] = mapped_column( DateTime(timezone=True), nullable=True ) + # 定时任务执行归属(0017):非 NULL = 该 task 是某 scheduled_job 的一次执行(isolated + # 每次新建 / persistent 首次新建都填)。普通对话列表据此排除,不混进"用户项目"列表; + # crons 页可按 job 反查执行历史。job 走软删不硬删 → ondelete SET NULL 安全。 + scheduled_job_id: Mapped[Optional[UUID]] = mapped_column( + PG_UUID(as_uuid=True), + ForeignKey("scheduled_jobs.job_id", ondelete="SET NULL"), + nullable=True, + ) class Message(Base): @@ -115,6 +123,10 @@ class Message(Base): # 0006:产生该 message 的模型(只在 assistant 行有值;user/tool/system 为 NULL)。 # 跟 usage_events.model_profile 写入一致,JOIN-free 时按 message 直查也能拿到。 model_profile: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + # 消息来源(0018):NULL=agent run 产生;"push"=push 记录(_record_push_to_chat 写)。 + # extract_last_assistant_text 据此跳过 push 记录,避免误取当入站回复。独立列不进 payload, + # 不影响 agent 上下文 / LLM API。 + kind: Mapped[Optional[str]] = mapped_column(Text, nullable=True) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), nullable=False ) diff --git a/core/storage/utils.py b/core/storage/utils.py index 217819b..5051559 100644 --- a/core/storage/utils.py +++ b/core/storage/utils.py @@ -6,9 +6,10 @@ from uuid import UUID from sqlalchemy import func, select, update from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.exc import IntegrityError from .engine import session_scope -from .models import Task +from .models import Message, Task class NoSubtaskError(ValueError): @@ -26,6 +27,7 @@ def ensure_local_task_row( model_profile: str = "", reasoning_effort: str = "", channel: str = "web", + scheduled_job_id: Optional[UUID] = None, ) -> None: """占位 INSERT(ON CONFLICT DO NOTHING)—— 不覆盖已有字段。 @@ -47,6 +49,7 @@ def ensure_local_task_row( model_profile=model_profile, reasoning_effort=reasoning_effort, channel=channel, + scheduled_job_id=scheduled_job_id, ) .on_conflict_do_nothing(index_elements=["task_id"]) ) @@ -54,6 +57,31 @@ def ensure_local_task_row( s.execute(stmt) +def append_channel_message( + task_id: UUID, content: str, *, role: str = "assistant", kind: Optional[str] = None +) -> None: + """往 task 追加一条非 agent-run 产生的消息(push 出站记录等)。原子算 idx + (SELECT max(idx)+1)+INSERT;撞 uq_messages_task_idx(与入站 agent run 并发 + append)→ 重试。payload 形态同 Session.append 的 {role, content};不设 + model_profile / tokens_*(非模型产出,usage 不计)。kind 写 messages.kind 列 + (独立列,不进 payload):"push" 标记 push 记录,extract_last_assistant_text 据此跳过。""" + payload = {"role": role, "content": content} + last_err: Optional[Exception] = None + for _ in range(3): + try: + with session_scope() as s: + max_idx = s.execute( + select(func.max(Message.idx)).where(Message.task_id == task_id) + ).scalar() + next_idx = (max_idx if max_idx is not None else -1) + 1 + s.add(Message(task_id=task_id, idx=next_idx, payload=payload, kind=kind)) + return + except IntegrityError as e: + last_err = e + continue + raise RuntimeError(f"append_channel_message: idx 冲突重试耗尽: {last_err}") + + def upsert_task( task_id: UUID, *, diff --git a/core/wechat/inbound.py b/core/wechat/inbound.py index 9578e1b..c7e0fe2 100644 --- a/core/wechat/inbound.py +++ b/core/wechat/inbound.py @@ -47,7 +47,7 @@ def extract_last_assistant_text(task_id: UUID, *, scan: int = 20) -> str: with session_scope() as s: rows = s.execute( select(Message.payload) - .where(Message.task_id == task_id) + .where(Message.task_id == task_id, Message.kind.is_(None)) .order_by(Message.idx.desc()) .limit(scan) ).all() diff --git a/core/wechat/service.py b/core/wechat/service.py index b1e883b..ef66cc1 100644 --- a/core/wechat/service.py +++ b/core/wechat/service.py @@ -304,17 +304,130 @@ def active_channels() -> list[str]: _DISPATCH = {_CLAWBOT: push_clawbot, _WECOM: push_wecom} +def ensure_channel_chat_task(uid: UUID, channel: str) -> Optional[UUID]: + """确保 uid 的 channel 常驻 chat task 存在(未软删),返回 task_id;不存在则新建并回填绑定。 + + channel ∈ {'wechat','wecom'}。wechat 无 binding → 返回 None(没法建/记)。 + 入站对话(`_run_channel_conversation`)与 push 记录(`send_to_user`)共用此入口, + 避免两条"解析/建 chat task"路径逻辑漂移。建 task 逻辑搬自原 _run_channel_conversation。 + """ + from uuid import uuid4 + + from core.agent_builder import ( # 延迟 import:service 被 tools.wechat_bot 引用, + load_config, resolve_workspace, working_dir_from_name, # agent_builder 又 import tools.wechat_bot + ) # → 顶层 import 循环;函数内 import 打破(同 scheduler.py:227 范式) + from core.capabilities import ModelCapabilities + from core.paths import ROOT, to_db_path + from core.storage.models import Task + from core.storage.utils import ensure_local_task_row + + if channel == "wecom": + existing_tid = get_wecom_chat_task(uid) + task_name, slug, desc = "企业微信对话", f"wecom-{str(uid)[:8]}", "(企业微信对话)" + set_task = set_wecom_chat_task + else: # wechat + snap = get_binding(uid) + if snap is None: + return None + existing_tid = snap.chat_task_id + task_name, slug, desc = "微信对话", f"wechat-{str(uid)[:8]}", "(微信 ClawBot 对话)" + set_task = set_chat_task + + tid = existing_tid + need_create = tid is None + if not need_create: + with session_scope() as s: + exists = s.execute( + select(Task.task_id).where(Task.task_id == tid, Task.deleted_at.is_(None)) + ).first() + if exists is None: + need_create = True + if need_create: + cfg = load_config() + profile = cfg["default_model"] + caps = ModelCapabilities.load(profile, ROOT / cfg["models_dir"]) + ws = resolve_workspace(None, cfg) + tid = uuid4() + fs_dir = working_dir_from_name(ws, uid, slug) + fs_dir.mkdir(parents=True, exist_ok=True) + ensure_local_task_row( + task_id=tid, name=task_name, working_dir=to_db_path(fs_dir), + skill="", user_id=uid, model=caps.model_id, model_profile=profile, + description=desc, channel=channel, + ) + set_task(uid, tid) + return tid + + +def _file_rel_to_user_root(user_id: UUID, file_path: str) -> Optional[str]: + """宿主绝对路径 → user_root 相对 POSIX(如 scheduled-/x.md)。 + 文件不在 user_root 内(外部 --working-dir)→ None。""" + from pathlib import Path + + from core.agent_builder import load_config, resolve_workspace, user_root + try: + ws = resolve_workspace(None, load_config()) + root = user_root(ws, user_id) + return Path(file_path).resolve().relative_to(root.resolve()).as_posix() + except Exception: + return None + + +def _build_push_message(text: str, rel: Optional[str]) -> str: + """构造写进 chat task 的 assistant 消息:推送摘要 + 可点文件链接 + agent read 路径。""" + lines: list[str] = [] + if text and text.strip(): + lines.append(text.strip()) + if rel: + fname = rel.rsplit("/", 1)[-1] + lines.append(f"产物文件:[{fname}](/v1/files/download?path={rel})") + lines.append(f"(如需基于此文件提问,可读取 ../{rel})") + return "\n\n".join(lines) + + +def _record_push_to_chat( + report: DeliveryReport, user_id: UUID, text: str, + file_path: Optional[str], source_task_id: Optional[UUID], +) -> None: + """把投递成功的推送记为对应渠道 chat task 的 assistant 消息(web 端可见 + + agent 可基于追问)。Unified 模式:进 agent 上下文(推送是 bot 发给用户的话, + 记得自己发过什么 = 连贯,非污染)。记录失败不影响投递(吞掉打日志)。""" + if not report.delivered: + return + from core.storage.utils import append_channel_message + + rel = _file_rel_to_user_root(user_id, file_path) if file_path else None + for r in report.results: + if not r.ok: + continue + ch = "wechat" if r.channel == _CLAWBOT else r.channel # clawbot→wechat(建 task channel) + try: + tid = ensure_channel_chat_task(user_id, ch) + if tid is None: + continue + if source_task_id is not None and tid == source_task_id: + continue # 调用方即该 chat task 自己的 run,tool 记录已在,不重复插摘要 + append_channel_message(tid, _build_push_message(text, rel), kind="push") + except Exception as e: # noqa: BLE001 —— 记录失败不放大,投递已成功 + print(f"[push] record to {ch} chat task failed: {type(e).__name__}: {e}") + + def send_to_user( user_id: UUID, text: str = "", file_path: Optional[str] = None, channel: Optional[str] = None, + *, + source_task_id: Optional[UUID] = None, ) -> DeliveryReport: - """渠道抽象:按 `active_channels()` 列出的已开渠道投递。 + """渠道抽象:按 `active_channels()` 列出的已开渠道投递 + 把推送记进渠道 chat task。 - `channel=None`(默认):广播到所有已开渠道(定时任务/不点名推送沿用此口径)。 - `channel="wecom"|"clawbot"`:用户点名某个微信时只投这一条;若该渠道未开/无效, 返回单条 `no_binding` 结果(不静默回退到别的渠道,避免又推到没点名的渠道)。 + - 投递成功后,对每个成功渠道把推送(摘要 + 文件链接 + read 路径)作为 assistant + 消息写进该渠道 chat task(不存在自动建)。`source_task_id` = 调用方所在 task: + 若恰为目标 chat task 自己(如用户在微信里让 agent 推),tool 记录已在,跳过去重。 """ report = DeliveryReport() if channel is not None: @@ -322,7 +435,8 @@ def send_to_user( report.results.append(_DISPATCH[channel](user_id, text, file_path)) else: report.results.append(PushResult(False, channel=channel, reason="no_binding")) - return report - for ch in active_channels(): - report.results.append(_DISPATCH[ch](user_id, text, file_path)) + else: + for ch in active_channels(): + report.results.append(_DISPATCH[ch](user_id, text, file_path)) + _record_push_to_chat(report, user_id, text, file_path, source_task_id) return report diff --git a/db/migrations/versions/20260626_1000_0017_task_scheduled_job_id.py b/db/migrations/versions/20260626_1000_0017_task_scheduled_job_id.py new file mode 100644 index 0000000..8635c96 --- /dev/null +++ b/db/migrations/versions/20260626_1000_0017_task_scheduled_job_id.py @@ -0,0 +1,58 @@ +"""tasks.scheduled_job_id 列(定时任务执行归属,DESIGN §8.5). + +Revision ID: 0017 +Revises: 0016 +Create Date: 2026-06-26 + +给 tasks 加 scheduled_job_id(nullable FK → scheduled_jobs.job_id, ondelete SET NULL)。 +非 NULL = 该 task 是某定时任务的一次执行(isolated 每次新建 / persistent 首次新建都填), +普通对话列表据此排除,不混进"用户项目"列表;job 软删不硬删,SET NULL 安全。 + +backfill 存量定时执行 task: +- persistent:bound_task_id 直接指向其常驻 task → 精确回填。 +- isolated:working_dir 末段 'scheduled-' → 按 8 位前缀匹配 job_id。 + 匹配不上的孤行(job 已物理删等)留 NULL,由列表查询的 working_dir LIKE 兜底排除。 +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects.postgresql import UUID as PG_UUID + + +revision: str = "0017" +down_revision: Union[str, None] = "0016" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "tasks", + sa.Column("scheduled_job_id", PG_UUID(as_uuid=True), nullable=True), + ) + op.create_foreign_key( + "fk_tasks_scheduled_job_id", + "tasks", "scheduled_jobs", + ["scheduled_job_id"], ["job_id"], + ondelete="SET NULL", + ) + # persistent:bound_task_id 精确指向其常驻 task + op.execute( + "UPDATE tasks SET scheduled_job_id = j.job_id " + "FROM scheduled_jobs j " + "WHERE j.bound_task_id = tasks.task_id" + ) + # isolated:working_dir 末段 scheduled-<8hex> 按 job_id 前 8 位匹配 + op.execute( + "UPDATE tasks t SET scheduled_job_id = j.job_id " + "FROM scheduled_jobs j " + "WHERE t.scheduled_job_id IS NULL " + "AND t.working_dir ~ 'scheduled-[0-9a-f]{8}' " + "AND left(j.job_id::text, 8) = substring(t.working_dir from 'scheduled-([0-9a-f]{8})')" + ) + + +def downgrade() -> None: + op.drop_constraint("fk_tasks_scheduled_job_id", "tasks", type_="foreignkey") + op.drop_column("tasks", "scheduled_job_id") diff --git a/db/migrations/versions/20260626_1100_0018_message_kind.py b/db/migrations/versions/20260626_1100_0018_message_kind.py new file mode 100644 index 0000000..e93b67c --- /dev/null +++ b/db/migrations/versions/20260626_1100_0018_message_kind.py @@ -0,0 +1,29 @@ +"""messages.kind 列(消息来源标记,避免 push 记录被 extract_last_assistant_text 误取). + +Revision ID: 0018 +Revises: 0017 +Create Date: 2026-06-26 + +给 messages 加 kind 列(nullable Text,默认 NULL)。NULL=agent run 产生的消息; +"push"=push 记录(_record_push_to_chat 写)。extract_last_assistant_text 加 +WHERE kind IS NULL 跳过 push 记录,避免 wecom 入站取回复时误取 push 摘要。 +独立列不进 payload,不影响 agent 上下文 / LLM API。纯加列,不动现有数据。 +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + + +revision: str = "0018" +down_revision: Union[str, None] = "0017" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column("messages", sa.Column("kind", sa.Text(), nullable=True)) + + +def downgrade() -> None: + op.drop_column("messages", "kind") diff --git a/tests/test_secret_host_tools.py b/tests/test_secret_host_tools.py index 0032dd4..c908a22 100644 --- a/tests/test_secret_host_tools.py +++ b/tests/test_secret_host_tools.py @@ -239,7 +239,7 @@ class TestHostFileToolPathResolution(unittest.TestCase): delivered = True results = [] - def fake_send(uid, text, fpath): + def fake_send(uid, text, fpath, **kwargs): captured["fpath"] = fpath return _Report() diff --git a/tools/wechat_bot.py b/tools/wechat_bot.py index 2de8a4f..c613814 100644 --- a/tools/wechat_bot.py +++ b/tools/wechat_bot.py @@ -63,9 +63,12 @@ class WechatPushTool(Tool): "required": ["text"], } - def __init__(self, user_id: UUID, base_dir=None, user_root=None) -> None: + def __init__(self, user_id: UUID, base_dir=None, user_root=None, *, task_id: Optional[UUID] = None) -> None: super().__init__(base_dir=base_dir, user_root=user_root) self.user_id = user_id + # 当前 agent run 所在 task:推送记录(send_to_user)据此去重——若调用方即目标 + # chat task 自己(如用户在微信里让 agent 推),tool 记录已在对话历史,不重复插摘要。 + self.task_id = task_id def execute( self, text: str = "", file: Optional[str] = None, channel: Optional[str] = None @@ -89,7 +92,7 @@ class WechatPushTool(Tool): return f"[Error] 文件不存在: {file}" fpath = str(p) - report = service.send_to_user(self.user_id, text, fpath, channel=channel) + report = service.send_to_user(self.user_id, text, fpath, channel=channel, source_task_id=self.task_id) if report.delivered: n = "(含 1 个文件)" if fpath else "" return f"[ok] 已推送到微信 {n}".strip() diff --git a/web/app.py b/web/app.py index 7e20b44..e13de60 100644 --- a/web/app.py +++ b/web/app.py @@ -440,45 +440,15 @@ async def _run_channel_conversation(app, uid, text, attachments, *, channel): wecom 绑定行取 chat_task_id)。attachments:已下载解密的入站附件(可空,wecom 暂只收文本)。 返回回复文本(供 ClawBot 回流 / wecom 主动推回)。 """ - from core.agent_builder import resolve_workspace, working_dir_from_name, load_config - from core.storage.utils import ensure_local_task_row from core.wechat import service as _wx from core.wechat.ilink import attachment_basename from core.wechat.inbound import extract_last_assistant_text - if channel == "wecom": - existing_tid = await asyncio.to_thread(_wx.get_wecom_chat_task, uid) - task_name, slug, desc = "企业微信对话", f"wecom-{str(uid)[:8]}", "(企业微信对话)" - set_task = _wx.set_wecom_chat_task - else: - snap = await asyncio.to_thread(_wx.get_binding, uid) - if snap is None: - return "" - existing_tid = snap.chat_task_id - task_name, slug, desc = "微信对话", f"wechat-{str(uid)[:8]}", "(微信 ClawBot 对话)" - set_task = _wx.set_chat_task - - profile, model_id = _resolve_model_profile("") - ws = resolve_workspace(None, load_config()) - tid = existing_tid - need_create = tid is None - if not need_create: - with session_scope() as s: - exists = s.execute( - select(Task.task_id).where(Task.task_id == tid, Task.deleted_at.is_(None)) - ).first() - if exists is None: - need_create = True - if need_create: - tid = uuid4() - fs_dir = working_dir_from_name(ws, uid, slug) - fs_dir.mkdir(parents=True, exist_ok=True) - ensure_local_task_row( - task_id=tid, name=task_name, working_dir=to_db_path(fs_dir), - skill="", user_id=uid, model=model_id, model_profile=profile, - description=desc, channel=channel, - ) - await asyncio.to_thread(set_task, uid, tid) + # 解析/建该渠道常驻 chat task(不存在自动建)—— 与 push 记录(send_to_user)共用 + # ensure_channel_chat_task,避免两条建 task 路径漂移。wechat 无 binding → 返回 None。 + tid = await asyncio.to_thread(_wx.ensure_channel_chat_task, uid, channel) + if tid is None: + return "" # 落盘入站附件到 /inbound/,拼 [用户上传的...] 行进 text(复用 web 端粘贴图约定) if attachments: @@ -861,6 +831,7 @@ def create_app() -> FastAPI: skill=snap.get("skill") or "", user_id=uid, model=model_id, model_profile=profile, description="(定时任务自动创建)", + scheduled_job_id=job_id, ) if snap["mode"] == "persistent": with session_scope() as s: @@ -1685,6 +1656,10 @@ def create_app() -> FastAPI: Task.user_id == user_id, Task.deleted_at.is_(None), func.coalesce(Task.channel, "web").notin_(CHANNEL_MIRROR_KINDS), + # 定时任务执行 task(scheduled_job_id 归属)不进普通列表;兜底 working_dir + # LIKE 防 backfill 漏网的孤行(job 已物理删的 isolated task) + Task.scheduled_job_id.is_(None), + ~Task.working_dir.like("%/scheduled-%"), ] if status: conditions.append(Task.status == status)