feat: 定时任务对话归属 + push 统一记录到渠道对话(bump 0.28.0)

问题1:定时任务产生的 task(isolated 每次新建)混进普通对话列表。
- tasks 加 scheduled_job_id(nullable FK→scheduled_jobs,migration 0017 + backfill
  persistent/isolated);列表 WHERE scheduled_job_id IS NULL 排除(+working_dir LIKE 兜底)
- ensure_local_task_row 加参数,_execute_scheduled_job 建任务时填
- mode 语义澄清:只管对话是否延续,文件夹两种模式都按 job 复用

问题2:任何 push(定时 deliver_notify / agent wechat_push 工具)推到微信渠道,
web 端渠道对话看不到、没法基于推送追问。
- 记录下沉到 send_to_user(两调用方统一入口):投递成功后对每个成功渠道
  ensure_channel_chat_task(不存在自动建,与入站对话共用)+ 写 assistant 消息
  (摘要+文件下载链接+../rel read 路径)
- Unified 进 agent 上下文(基于推送追问);source_task_id 去重(chat task 内调
  wechat_push 时不重复插摘要);不塞正文,agent 按需 read 产物文件
- _run_channel_conversation 复用 ensure_channel_chat_task,消除建 task 重复逻辑

messages.kind 列(migration 0018):push 记录标 kind="push"(独立列不进 payload),
extract_last_assistant_text 加 WHERE kind IS NULL 跳过,避免 wecom 入站取回复
误取 push 摘要当回复。

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-06-26 10:51:06 +08:00
parent 133e350428
commit e66fdd0ffc
13 changed files with 277 additions and 47 deletions

View File

@ -616,6 +616,10 @@ create index on usage_events (model_profile, created_at);
**数据模型(新表 `scheduled_jobs`,独立加表不碰现有 schema → 公测兼容)**: **数据模型(新表 `scheduled_jobs`,独立加表不碰现有 schema → 公测兼容)**:
`id, user_id, name, prompt, cron, tz(默 Asia/Shanghai), mode(isolated|persistent), bound_task_id(可空), notify(JSONB 可空), enabled, timeout_seconds, next_run_at, last_run_at, last_status, last_error, last_task_id, consecutive_failures, expires_at(可空), created_at, deleted_at`。Alembic 加表 migration;`usage_events` 复用现成记账(可加 `kind="scheduled"` 自由文本区分,无需 migration)。 `id, user_id, name, prompt, cron, tz(默 Asia/Shanghai), mode(isolated|persistent), bound_task_id(可空), notify(JSONB 可空), enabled, timeout_seconds, next_run_at, last_run_at, last_status, last_error, last_task_id, consecutive_failures, expires_at(可空), created_at, deleted_at`。Alembic 加表 migration;`usage_events` 复用现成记账(可加 `kind="scheduled"` 自由文本区分,无需 migration)。
**mode 语义(澄清)**:mode 只决定"对话是否延续"——isolated 每次新建 task(隔离对话历史、省 token),persistent 复用 `bound_task_id` 常驻 task(跨天连续性)。**文件夹两种模式都按 job 复用**(`scheduled-<jobid>`,产物累积 + notify 取最新产物依赖它),不是 mode 的区分维度。
**定时执行 task 的归属与可见性(0017)**:定时任务产生的 task 在 `tasks` 上标 `scheduled_job_id`(nullable FK → `scheduled_jobs.job_id`)。普通对话列表 `WHERE scheduled_job_id IS NULL` 排除(不混进"用户项目"列表);crons 页可按 job 反查执行历史。push 投递记录见 §8.7。
**守护循环(仿 §8.4 `_disk_scanner`,plain-asyncio)**:lifespan 起一个后台 task,每 ~10s(`ZCBOT_SCHEDULER_TICK_SECONDS`,只决定最坏延迟≤1tick、不决定会否漏 —— claim 取 `next_run<=now` 的全部)扫 `enabled AND next_run_at<=now()`;命中即 `asyncio.create_task(asyncio.to_thread(_run_agent_bg, ...))` 复用现成路径,登记到 `app.state.inflight`(随关停 drain 一起收尾)。与**单活 run 锁**(§7.x `run_status` + `SELECT FOR UPDATE`)交互:isolated 每次新 task 天然无冲突;persistent 若绑定 task 正忙 → 跳过本次 + 记 warn,下一个点再来(不排队堆积)。run 完回写 `last_*` + croniter 算 `next_run_at` **守护循环(仿 §8.4 `_disk_scanner`,plain-asyncio)**:lifespan 起一个后台 task,每 ~10s(`ZCBOT_SCHEDULER_TICK_SECONDS`,只决定最坏延迟≤1tick、不决定会否漏 —— claim 取 `next_run<=now` 的全部)扫 `enabled AND next_run_at<=now()`;命中即 `asyncio.create_task(asyncio.to_thread(_run_agent_bg, ...))` 复用现成路径,登记到 `app.state.inflight`(随关停 drain 一起收尾)。与**单活 run 锁**(§7.x `run_status` + `SELECT FOR UPDATE`)交互:isolated 每次新 task 天然无冲突;persistent 若绑定 task 正忙 → 跳过本次 + 记 warn,下一个点再来(不排队堆积)。run 完回写 `last_*` + croniter 算 `next_run_at`
**croniter 选型**:存标准 5 段 cron 串 + 时区,`croniter` 算 `next_run_at`。理由:正确处理 dom/dow 同列的 vixie OR 语义和时区折算(手搓极易踩坑,四源都点名这个坑);纯 Python 小依赖。劣选:只支持"每天/每周 HH:MM"自己用 datetime 算 —— 零依赖但遇复杂周期要返工。 **croniter 选型**:存标准 5 段 cron 串 + 时区,`croniter` 算 `next_run_at`。理由:正确处理 dom/dow 同列的 vixie OR 语义和时区折算(手搓极易踩坑,四源都点名这个坑);纯 Python 小依赖。劣选:只支持"每天/每周 HH:MM"自己用 datetime 算 —— 零依赖但遇复杂周期要返工。
@ -679,6 +683,7 @@ create index on usage_events (model_profile, created_at);
**渠道抽象(两渠道共用,加渠道不改 scheduler / 工具主体)**: **渠道抽象(两渠道共用,加渠道不改 scheduler / 工具主体)**:
- **绑定**:per-user 记"绑了哪些渠道 + 各自凭据/标识"(ClawBot:`bot_token`+`latest_context_token`;企业微信:`wecom_userid`,应用凭据走全局 env)。 - **绑定**:per-user 记"绑了哪些渠道 + 各自凭据/标识"(ClawBot:`bot_token`+`latest_context_token`;企业微信:`wecom_userid`,应用凭据走全局 env)。
- **统一发送**:`send_to_user(user_id, text, file?)` → 解析该用户已绑渠道 → 各渠道实现各自发;`scheduler.deliver_notify`、`WechatPushTool` 都调这层,不感知具体渠道。 - **统一发送**:`send_to_user(user_id, text, file?)` → 解析该用户已绑渠道 → 各渠道实现各自发;`scheduler.deliver_notify`、`WechatPushTool` 都调这层,不感知具体渠道。
- **推送即对话记录(Unified)**:`send_to_user` 投递成功后,对每个成功渠道把推送(摘要 + 文件下载链接 + agent `read` 路径 `../<rel>`)作为一条 assistant 消息写进该渠道 chat task(`ensure_channel_chat_task` 不存在自动建,与入站对话共用)。web 端渠道对话卡片可见 + agent 可基于推送追问(`read` 产物文件)。进 agent 上下文(推送是 bot 发给用户的话,记得自己发过 = 连贯,非污染);`source_task_id` 去重——调用方即目标 chat task 自己(如用户在微信里让 agent 推)时 tool 记录已在,跳过。不塞正文(避免上下文膨胀)。push 记录在 `messages.kind` 标 "push"(独立列,不进 payload),`extract_last_assistant_text`(wecom 入站取回复)加 `WHERE kind IS NULL` 跳过,避免误取 push 摘要当回复。
- **推送择优**:简报这类"必达" → 优先企业微信(无条件);ClawBot 作个人微信触达 + 聊天;两者都绑可多投或按用户偏好。 - **推送择优**:简报这类"必达" → 优先企业微信(无条件);ClawBot 作个人微信触达 + 聊天;两者都绑可多投或按用户偏好。
**第一期两处已定决策(评审通过)**: **第一期两处已定决策(评审通过)**:

View File

@ -2,7 +2,7 @@
> 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 1-2 句:做了啥 + 关键判断;细节查 `git log` / `git diff` / `DESIGN §7.9` > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 1-2 句:做了啥 + 关键判断;细节查 `git log` / `git diff` / `DESIGN §7.9`
最后更新:2026-06-25(企业微信入站对话支持图片/文件附件:media/get 下载 → 复用渠道无关核心落盘 + bump 0.27.2) 最后更新:2026-06-26(定时任务对话归属 + push 统一记录到渠道对话 + bump 0.28.0)
--- ---
@ -21,6 +21,12 @@
## 已完成关键能力 ## 已完成关键能力
### 2026-06-26 / 定时任务对话归属 + push 统一记录到渠道对话(bump 0.28.0)
- 问题1:定时任务产生的 task(isolated 每次新建)混进普通对话列表。解:`tasks` 加 `scheduled_job_id`(nullable FK→scheduled_jobs,0017 migration + backfill persistent/isolated);列表 `WHERE scheduled_job_id IS NULL`(+ `working_dir LIKE '%/scheduled-%'` 兜底漏网孤行);`ensure_local_task_row` 加参数,`_execute_scheduled_job` 建任务时填。mode 语义澄清:只管对话是否延续,文件夹两种模式都按 job 复用。
- 问题2:任何 push(定时 `deliver_notify` / agent `wechat_push` 工具)推到微信渠道,web 端渠道对话看不到、没法基于推送追问。解:**记录下沉到 `send_to_user`**(两调用方统一入口)——投递成功后对每个成功渠道 `ensure_channel_chat_task`(不存在自动建,与入站对话共用)+ 写一条 assistant 消息(摘要 + 文件下载链接 + `../rel` read 路径),Unified 进 agent 上下文;`source_task_id` 去重(chat task 内调 wechat_push 时不重复插摘要)。不塞正文(避免膨胀),agent 按需 `read` 产物文件(fs `_resolve` 无越界拦,`../rel` 相对 cwd 上一级;mount=user_root docker 也可读)。前端零改动(markdown 链接 + 文本 read 路径)。push 记录标 `messages.kind="push"`(0018,独立列不进 payload),`extract_last_assistant_text` 加 `WHERE kind IS NULL` 跳过,避免 wecom 入站取回复误取 push 摘要当回复。
- 文件:`core/storage/models.py`(Task.`scheduled_job_id`+Message.`kind`)、`db/migrations/versions/20260626_1000_0017_*.py`+`20260626_1100_0018_*.py`、`core/storage/utils.py`(`ensure_local_task_row`+`append_channel_message`)、`core/wechat/service.py`(`send_to_user` 记录+`ensure_channel_chat_task`)、`core/wechat/inbound.py`(`extract_last_assistant_text` 过滤 kind)、`tools/wechat_bot.py`、`core/agent_builder.py`、`web/app.py`(`_run_channel_conversation` 复用)、`DESIGN.md`(§8.5/§8.7)。
### 2026-06-25 / 渠道卡片改并排(bump 0.27.4) ### 2026-06-25 / 渠道卡片改并排(bump 0.27.4)
- 接 0.27.3:两张渠道卡片从竖排改并排(`#channel-cards` flex row,各 `flex:1`),省左栏纵向空间;窄栏内图标左、名称 + 条数·时间堆两行(新增 `.cc-body` 列容器)。 - 接 0.27.3:两张渠道卡片从竖排改并排(`#channel-cards` flex row,各 `flex:1`),省左栏纵向空间;窄栏内图标左、名称 + 条数·时间堆两行(新增 `.cc-body` 列容器)。

View File

@ -1,3 +1,3 @@
# zcbot 版本号单一事实源:web/app.py 的 FastAPI version、/healthz 返回、前端展示都引这里。 # zcbot 版本号单一事实源:web/app.py 的 FastAPI version、/healthz 返回、前端展示都引这里。
# 改版本只动这一行。 # 改版本只动这一行。
__version__ = "0.27.4" __version__ = "0.28.0"

View File

@ -577,7 +577,7 @@ def build_agent(
# base_dir 同 send_email:用 working_dir_path(宿主 task 目录),wechat_push 在宿主进程 # base_dir 同 send_email:用 working_dir_path(宿主 task 目录),wechat_push 在宿主进程
# 读待发文件,需把 agent 给的相对/容器路径翻回宿主(详 _resolve_user_file)。 # 读待发文件,需把 agent 给的相对/容器路径翻回宿主(详 _resolve_user_file)。
if wechat_push_available(): if wechat_push_available():
wp = WechatPushTool(uid, base_dir=working_dir_path, user_root=ur_path) wp = WechatPushTool(uid, base_dir=working_dir_path, user_root=ur_path, task_id=task_id)
tools[wp.name] = wp tools[wp.name] = wp
if caps.enable_run_python: if caps.enable_run_python:

View File

@ -96,6 +96,14 @@ class Task(Base):
deleted_at: Mapped[Optional[datetime]] = mapped_column( deleted_at: Mapped[Optional[datetime]] = mapped_column(
DateTime(timezone=True), nullable=True DateTime(timezone=True), nullable=True
) )
# 定时任务执行归属(0017):非 NULL = 该 task 是某 scheduled_job 的一次执行(isolated
# 每次新建 / persistent 首次新建都填)。普通对话列表据此排除,不混进"用户项目"列表;
# crons 页可按 job 反查执行历史。job 走软删不硬删 → ondelete SET NULL 安全。
scheduled_job_id: Mapped[Optional[UUID]] = mapped_column(
PG_UUID(as_uuid=True),
ForeignKey("scheduled_jobs.job_id", ondelete="SET NULL"),
nullable=True,
)
class Message(Base): class Message(Base):
@ -115,6 +123,10 @@ class Message(Base):
# 0006:产生该 message 的模型(只在 assistant 行有值;user/tool/system 为 NULL)。 # 0006:产生该 message 的模型(只在 assistant 行有值;user/tool/system 为 NULL)。
# 跟 usage_events.model_profile 写入一致,JOIN-free 时按 message 直查也能拿到。 # 跟 usage_events.model_profile 写入一致,JOIN-free 时按 message 直查也能拿到。
model_profile: Mapped[Optional[str]] = mapped_column(Text, nullable=True) model_profile: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
# 消息来源(0018):NULL=agent run 产生;"push"=push 记录(_record_push_to_chat 写)。
# extract_last_assistant_text 据此跳过 push 记录,避免误取当入站回复。独立列不进 payload,
# 不影响 agent 上下文 / LLM API。
kind: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False DateTime(timezone=True), server_default=func.now(), nullable=False
) )

View File

@ -6,9 +6,10 @@ from uuid import UUID
from sqlalchemy import func, select, update from sqlalchemy import func, select, update
from sqlalchemy.dialects.postgresql import insert from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.exc import IntegrityError
from .engine import session_scope from .engine import session_scope
from .models import Task from .models import Message, Task
class NoSubtaskError(ValueError): class NoSubtaskError(ValueError):
@ -26,6 +27,7 @@ def ensure_local_task_row(
model_profile: str = "", model_profile: str = "",
reasoning_effort: str = "", reasoning_effort: str = "",
channel: str = "web", channel: str = "web",
scheduled_job_id: Optional[UUID] = None,
) -> None: ) -> None:
"""占位 INSERT(ON CONFLICT DO NOTHING)—— 不覆盖已有字段。 """占位 INSERT(ON CONFLICT DO NOTHING)—— 不覆盖已有字段。
@ -47,6 +49,7 @@ def ensure_local_task_row(
model_profile=model_profile, model_profile=model_profile,
reasoning_effort=reasoning_effort, reasoning_effort=reasoning_effort,
channel=channel, channel=channel,
scheduled_job_id=scheduled_job_id,
) )
.on_conflict_do_nothing(index_elements=["task_id"]) .on_conflict_do_nothing(index_elements=["task_id"])
) )
@ -54,6 +57,31 @@ def ensure_local_task_row(
s.execute(stmt) s.execute(stmt)
def append_channel_message(
task_id: UUID, content: str, *, role: str = "assistant", kind: Optional[str] = None
) -> None:
"""往 task 追加一条非 agent-run 产生的消息(push 出站记录等)。原子算 idx
(SELECT max(idx)+1)+INSERT; uq_messages_task_idx(与入站 agent run 并发
append) 重试payload 形态同 Session.append {role, content};不设
model_profile / tokens_*(非模型产出,usage 不计)kind messages.kind
(独立列,不进 payload):"push" 标记 push 记录,extract_last_assistant_text 据此跳过"""
payload = {"role": role, "content": content}
last_err: Optional[Exception] = None
for _ in range(3):
try:
with session_scope() as s:
max_idx = s.execute(
select(func.max(Message.idx)).where(Message.task_id == task_id)
).scalar()
next_idx = (max_idx if max_idx is not None else -1) + 1
s.add(Message(task_id=task_id, idx=next_idx, payload=payload, kind=kind))
return
except IntegrityError as e:
last_err = e
continue
raise RuntimeError(f"append_channel_message: idx 冲突重试耗尽: {last_err}")
def upsert_task( def upsert_task(
task_id: UUID, task_id: UUID,
*, *,

View File

@ -47,7 +47,7 @@ def extract_last_assistant_text(task_id: UUID, *, scan: int = 20) -> str:
with session_scope() as s: with session_scope() as s:
rows = s.execute( rows = s.execute(
select(Message.payload) select(Message.payload)
.where(Message.task_id == task_id) .where(Message.task_id == task_id, Message.kind.is_(None))
.order_by(Message.idx.desc()) .order_by(Message.idx.desc())
.limit(scan) .limit(scan)
).all() ).all()

View File

@ -304,17 +304,130 @@ def active_channels() -> list[str]:
_DISPATCH = {_CLAWBOT: push_clawbot, _WECOM: push_wecom} _DISPATCH = {_CLAWBOT: push_clawbot, _WECOM: push_wecom}
def ensure_channel_chat_task(uid: UUID, channel: str) -> Optional[UUID]:
"""确保 uid 的 channel 常驻 chat task 存在(未软删),返回 task_id;不存在则新建并回填绑定。
channel {'wechat','wecom'}wechat binding 返回 None(没法建/)
入站对话(`_run_channel_conversation`) push 记录(`send_to_user`)共用此入口,
避免两条"解析/建 chat task"路径逻辑漂移 task 逻辑搬自原 _run_channel_conversation
"""
from uuid import uuid4
from core.agent_builder import ( # 延迟 import:service 被 tools.wechat_bot 引用,
load_config, resolve_workspace, working_dir_from_name, # agent_builder 又 import tools.wechat_bot
) # → 顶层 import 循环;函数内 import 打破(同 scheduler.py:227 范式)
from core.capabilities import ModelCapabilities
from core.paths import ROOT, to_db_path
from core.storage.models import Task
from core.storage.utils import ensure_local_task_row
if channel == "wecom":
existing_tid = get_wecom_chat_task(uid)
task_name, slug, desc = "企业微信对话", f"wecom-{str(uid)[:8]}", "(企业微信对话)"
set_task = set_wecom_chat_task
else: # wechat
snap = get_binding(uid)
if snap is None:
return None
existing_tid = snap.chat_task_id
task_name, slug, desc = "微信对话", f"wechat-{str(uid)[:8]}", "(微信 ClawBot 对话)"
set_task = set_chat_task
tid = existing_tid
need_create = tid is None
if not need_create:
with session_scope() as s:
exists = s.execute(
select(Task.task_id).where(Task.task_id == tid, Task.deleted_at.is_(None))
).first()
if exists is None:
need_create = True
if need_create:
cfg = load_config()
profile = cfg["default_model"]
caps = ModelCapabilities.load(profile, ROOT / cfg["models_dir"])
ws = resolve_workspace(None, cfg)
tid = uuid4()
fs_dir = working_dir_from_name(ws, uid, slug)
fs_dir.mkdir(parents=True, exist_ok=True)
ensure_local_task_row(
task_id=tid, name=task_name, working_dir=to_db_path(fs_dir),
skill="", user_id=uid, model=caps.model_id, model_profile=profile,
description=desc, channel=channel,
)
set_task(uid, tid)
return tid
def _file_rel_to_user_root(user_id: UUID, file_path: str) -> Optional[str]:
"""宿主绝对路径 → user_root 相对 POSIX(如 scheduled-<jobid>/x.md)。
文件不在 user_root (外部 --working-dir) None"""
from pathlib import Path
from core.agent_builder import load_config, resolve_workspace, user_root
try:
ws = resolve_workspace(None, load_config())
root = user_root(ws, user_id)
return Path(file_path).resolve().relative_to(root.resolve()).as_posix()
except Exception:
return None
def _build_push_message(text: str, rel: Optional[str]) -> str:
"""构造写进 chat task 的 assistant 消息:推送摘要 + 可点文件链接 + agent read 路径。"""
lines: list[str] = []
if text and text.strip():
lines.append(text.strip())
if rel:
fname = rel.rsplit("/", 1)[-1]
lines.append(f"产物文件:[{fname}](/v1/files/download?path={rel})")
lines.append(f"(如需基于此文件提问,可读取 ../{rel})")
return "\n\n".join(lines)
def _record_push_to_chat(
report: DeliveryReport, user_id: UUID, text: str,
file_path: Optional[str], source_task_id: Optional[UUID],
) -> None:
"""把投递成功的推送记为对应渠道 chat task 的 assistant 消息(web 端可见 +
agent 可基于追问)Unified 模式: agent 上下文(推送是 bot 发给用户的话,
记得自己发过什么 = 连贯,非污染)记录失败不影响投递(吞掉打日志)"""
if not report.delivered:
return
from core.storage.utils import append_channel_message
rel = _file_rel_to_user_root(user_id, file_path) if file_path else None
for r in report.results:
if not r.ok:
continue
ch = "wechat" if r.channel == _CLAWBOT else r.channel # clawbot→wechat(建 task channel)
try:
tid = ensure_channel_chat_task(user_id, ch)
if tid is None:
continue
if source_task_id is not None and tid == source_task_id:
continue # 调用方即该 chat task 自己的 run,tool 记录已在,不重复插摘要
append_channel_message(tid, _build_push_message(text, rel), kind="push")
except Exception as e: # noqa: BLE001 —— 记录失败不放大,投递已成功
print(f"[push] record to {ch} chat task failed: {type(e).__name__}: {e}")
def send_to_user( def send_to_user(
user_id: UUID, user_id: UUID,
text: str = "", text: str = "",
file_path: Optional[str] = None, file_path: Optional[str] = None,
channel: Optional[str] = None, channel: Optional[str] = None,
*,
source_task_id: Optional[UUID] = None,
) -> DeliveryReport: ) -> DeliveryReport:
"""渠道抽象:按 `active_channels()` 列出的已开渠道投递。 """渠道抽象:按 `active_channels()` 列出的已开渠道投递 + 把推送记进渠道 chat task
- `channel=None`(默认):广播到所有已开渠道(定时任务/不点名推送沿用此口径) - `channel=None`(默认):广播到所有已开渠道(定时任务/不点名推送沿用此口径)
- `channel="wecom"|"clawbot"`:用户点名某个微信时只投这一条;若该渠道未开/无效, - `channel="wecom"|"clawbot"`:用户点名某个微信时只投这一条;若该渠道未开/无效,
返回单条 `no_binding` 结果(不静默回退到别的渠道,避免又推到没点名的渠道) 返回单条 `no_binding` 结果(不静默回退到别的渠道,避免又推到没点名的渠道)
- 投递成功后,对每个成功渠道把推送(摘要 + 文件链接 + read 路径)作为 assistant
消息写进该渠道 chat task(不存在自动建)`source_task_id` = 调用方所在 task:
若恰为目标 chat task 自己(如用户在微信里让 agent ),tool 记录已在,跳过去重
""" """
report = DeliveryReport() report = DeliveryReport()
if channel is not None: if channel is not None:
@ -322,7 +435,8 @@ def send_to_user(
report.results.append(_DISPATCH[channel](user_id, text, file_path)) report.results.append(_DISPATCH[channel](user_id, text, file_path))
else: else:
report.results.append(PushResult(False, channel=channel, reason="no_binding")) report.results.append(PushResult(False, channel=channel, reason="no_binding"))
return report else:
for ch in active_channels(): for ch in active_channels():
report.results.append(_DISPATCH[ch](user_id, text, file_path)) report.results.append(_DISPATCH[ch](user_id, text, file_path))
_record_push_to_chat(report, user_id, text, file_path, source_task_id)
return report return report

View File

@ -0,0 +1,58 @@
"""tasks.scheduled_job_id 列(定时任务执行归属,DESIGN §8.5).
Revision ID: 0017
Revises: 0016
Create Date: 2026-06-26
tasks scheduled_job_id(nullable FK scheduled_jobs.job_id, ondelete SET NULL)
NULL = task 是某定时任务的一次执行(isolated 每次新建 / persistent 首次新建都填),
普通对话列表据此排除,不混进"用户项目"列表;job 软删不硬删,SET NULL 安全
backfill 存量定时执行 task:
- persistent:bound_task_id 直接指向其常驻 task 精确回填
- isolated:working_dir 末段 'scheduled-<job_id 前 8 位>' 8 位前缀匹配 job_id
匹配不上的孤行(job 已物理删等) NULL,由列表查询的 working_dir LIKE 兜底排除
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
revision: str = "0017"
down_revision: Union[str, None] = "0016"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column(
"tasks",
sa.Column("scheduled_job_id", PG_UUID(as_uuid=True), nullable=True),
)
op.create_foreign_key(
"fk_tasks_scheduled_job_id",
"tasks", "scheduled_jobs",
["scheduled_job_id"], ["job_id"],
ondelete="SET NULL",
)
# persistent:bound_task_id 精确指向其常驻 task
op.execute(
"UPDATE tasks SET scheduled_job_id = j.job_id "
"FROM scheduled_jobs j "
"WHERE j.bound_task_id = tasks.task_id"
)
# isolated:working_dir 末段 scheduled-<8hex> 按 job_id 前 8 位匹配
op.execute(
"UPDATE tasks t SET scheduled_job_id = j.job_id "
"FROM scheduled_jobs j "
"WHERE t.scheduled_job_id IS NULL "
"AND t.working_dir ~ 'scheduled-[0-9a-f]{8}' "
"AND left(j.job_id::text, 8) = substring(t.working_dir from 'scheduled-([0-9a-f]{8})')"
)
def downgrade() -> None:
op.drop_constraint("fk_tasks_scheduled_job_id", "tasks", type_="foreignkey")
op.drop_column("tasks", "scheduled_job_id")

View File

@ -0,0 +1,29 @@
"""messages.kind 列(消息来源标记,避免 push 记录被 extract_last_assistant_text 误取).
Revision ID: 0018
Revises: 0017
Create Date: 2026-06-26
messages kind (nullable Text,默认 NULL)NULL=agent run 产生的消息;
"push"=push 记录(_record_push_to_chat )extract_last_assistant_text
WHERE kind IS NULL 跳过 push 记录,避免 wecom 入站取回复时误取 push 摘要
独立列不进 payload,不影响 agent 上下文 / LLM API纯加列,不动现有数据
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0018"
down_revision: Union[str, None] = "0017"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column("messages", sa.Column("kind", sa.Text(), nullable=True))
def downgrade() -> None:
op.drop_column("messages", "kind")

View File

@ -239,7 +239,7 @@ class TestHostFileToolPathResolution(unittest.TestCase):
delivered = True delivered = True
results = [] results = []
def fake_send(uid, text, fpath): def fake_send(uid, text, fpath, **kwargs):
captured["fpath"] = fpath captured["fpath"] = fpath
return _Report() return _Report()

View File

@ -63,9 +63,12 @@ class WechatPushTool(Tool):
"required": ["text"], "required": ["text"],
} }
def __init__(self, user_id: UUID, base_dir=None, user_root=None) -> None: def __init__(self, user_id: UUID, base_dir=None, user_root=None, *, task_id: Optional[UUID] = None) -> None:
super().__init__(base_dir=base_dir, user_root=user_root) super().__init__(base_dir=base_dir, user_root=user_root)
self.user_id = user_id self.user_id = user_id
# 当前 agent run 所在 task:推送记录(send_to_user)据此去重——若调用方即目标
# chat task 自己(如用户在微信里让 agent 推),tool 记录已在对话历史,不重复插摘要。
self.task_id = task_id
def execute( def execute(
self, text: str = "", file: Optional[str] = None, channel: Optional[str] = None self, text: str = "", file: Optional[str] = None, channel: Optional[str] = None
@ -89,7 +92,7 @@ class WechatPushTool(Tool):
return f"[Error] 文件不存在: {file}" return f"[Error] 文件不存在: {file}"
fpath = str(p) fpath = str(p)
report = service.send_to_user(self.user_id, text, fpath, channel=channel) report = service.send_to_user(self.user_id, text, fpath, channel=channel, source_task_id=self.task_id)
if report.delivered: if report.delivered:
n = "(含 1 个文件)" if fpath else "" n = "(含 1 个文件)" if fpath else ""
return f"[ok] 已推送到微信 {n}".strip() return f"[ok] 已推送到微信 {n}".strip()

View File

@ -440,45 +440,15 @@ async def _run_channel_conversation(app, uid, text, attachments, *, channel):
wecom 绑定行取 chat_task_id)attachments:已下载解密的入站附件(可空,wecom 暂只收文本) wecom 绑定行取 chat_task_id)attachments:已下载解密的入站附件(可空,wecom 暂只收文本)
返回回复文本( ClawBot 回流 / wecom 主动推回) 返回回复文本( ClawBot 回流 / wecom 主动推回)
""" """
from core.agent_builder import resolve_workspace, working_dir_from_name, load_config
from core.storage.utils import ensure_local_task_row
from core.wechat import service as _wx from core.wechat import service as _wx
from core.wechat.ilink import attachment_basename from core.wechat.ilink import attachment_basename
from core.wechat.inbound import extract_last_assistant_text from core.wechat.inbound import extract_last_assistant_text
if channel == "wecom": # 解析/建该渠道常驻 chat task(不存在自动建)—— 与 push 记录(send_to_user)共用
existing_tid = await asyncio.to_thread(_wx.get_wecom_chat_task, uid) # ensure_channel_chat_task,避免两条建 task 路径漂移。wechat 无 binding → 返回 None。
task_name, slug, desc = "企业微信对话", f"wecom-{str(uid)[:8]}", "(企业微信对话)" tid = await asyncio.to_thread(_wx.ensure_channel_chat_task, uid, channel)
set_task = _wx.set_wecom_chat_task if tid is None:
else: return ""
snap = await asyncio.to_thread(_wx.get_binding, uid)
if snap is None:
return ""
existing_tid = snap.chat_task_id
task_name, slug, desc = "微信对话", f"wechat-{str(uid)[:8]}", "(微信 ClawBot 对话)"
set_task = _wx.set_chat_task
profile, model_id = _resolve_model_profile("")
ws = resolve_workspace(None, load_config())
tid = existing_tid
need_create = tid is None
if not need_create:
with session_scope() as s:
exists = s.execute(
select(Task.task_id).where(Task.task_id == tid, Task.deleted_at.is_(None))
).first()
if exists is None:
need_create = True
if need_create:
tid = uuid4()
fs_dir = working_dir_from_name(ws, uid, slug)
fs_dir.mkdir(parents=True, exist_ok=True)
ensure_local_task_row(
task_id=tid, name=task_name, working_dir=to_db_path(fs_dir),
skill="", user_id=uid, model=model_id, model_profile=profile,
description=desc, channel=channel,
)
await asyncio.to_thread(set_task, uid, tid)
# 落盘入站附件到 <wd>/inbound/,拼 [用户上传的...] 行进 text(复用 web 端粘贴图约定) # 落盘入站附件到 <wd>/inbound/,拼 [用户上传的...] 行进 text(复用 web 端粘贴图约定)
if attachments: if attachments:
@ -861,6 +831,7 @@ def create_app() -> FastAPI:
skill=snap.get("skill") or "", user_id=uid, skill=snap.get("skill") or "", user_id=uid,
model=model_id, model_profile=profile, model=model_id, model_profile=profile,
description="(定时任务自动创建)", description="(定时任务自动创建)",
scheduled_job_id=job_id,
) )
if snap["mode"] == "persistent": if snap["mode"] == "persistent":
with session_scope() as s: with session_scope() as s:
@ -1685,6 +1656,10 @@ def create_app() -> FastAPI:
Task.user_id == user_id, Task.user_id == user_id,
Task.deleted_at.is_(None), Task.deleted_at.is_(None),
func.coalesce(Task.channel, "web").notin_(CHANNEL_MIRROR_KINDS), func.coalesce(Task.channel, "web").notin_(CHANNEL_MIRROR_KINDS),
# 定时任务执行 task(scheduled_job_id 归属)不进普通列表;兜底 working_dir
# LIKE 防 backfill 漏网的孤行(job 已物理删的 isolated task)
Task.scheduled_job_id.is_(None),
~Task.working_dir.like("%/scheduled-%"),
] ]
if status: if status:
conditions.append(Task.status == status) conditions.append(Task.status == status)