From b27cc9cd5bf815cfc0a4eb65ea9ff7fef68fefcf Mon Sep 17 00:00:00 2001 From: caoqianming Date: Mon, 29 Jun 2026 10:05:07 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20channel=20=E9=95=BF=E4=BC=9A=E8=AF=9D?= =?UTF-8?q?=E4=B8=8A=E4=B8=8B=E6=96=87=E8=BD=AF=E9=87=8D=E7=BD=AE(gap=20?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E5=88=86=E6=AE=B5=20+=20=E6=96=B0=E8=AF=9D?= =?UTF-8?q?=E9=A2=98=E5=91=BD=E4=BB=A4)(bump=200.32.0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 微信/企业微信常驻会话不再无限膨胀。tasks 加 context_base_idx, Session.load 只把 idx>=base 的消息喂模型,base 之前历史全留 DB (网页端照旧翻完整记录,一条不删)。 - 自动 gap 分段:入站距上次消息超 channel.session_gap_hours(默 6h) → 软重置,base=最后一条 user 消息 idx(保留上一轮做续聊锚点) - 手动新话题:发「新话题/新会话//new/清空上下文」→ 硬重置 base=总数 - clear_messages 全删后归零 base;_db_idx 取真实总数避免 append 撞 idx Co-Authored-By: Claude Opus 4.8 (1M context) --- PROGRESS.md | 9 ++- RUN.md | 1 + core/__init__.py | 2 +- core/session.py | 19 +++++- core/storage/models.py | 6 ++ core/wechat/service.py | 60 ++++++++++++++++++- ...0260629_1000_0019_task_context_base_idx.py | 40 +++++++++++++ web/app.py | 18 ++++++ 8 files changed, 148 insertions(+), 7 deletions(-) create mode 100644 db/migrations/versions/20260629_1000_0019_task_context_base_idx.py diff --git a/PROGRESS.md b/PROGRESS.md index b88800c..b7b0b42 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -2,7 +2,7 @@ > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 1-2 句:做了啥 + 关键判断;细节查 `git log` / `git diff` / `DESIGN §7.9`。 -最后更新:2026-06-26(admin 近7天用量表加合计行 + bump 0.31.1) +最后更新:2026-06-29(channel 长会话上下文软重置 + bump 0.32.0) --- @@ -21,6 +21,13 @@ ## 已完成关键能力 +### 2026-06-29 / channel 长会话上下文软重置(Phase 1,bump 0.32.0) + +- 问题:微信/企业微信复用同一常驻 chat_task,`Session.load` 全量喂模型 → 越用越贵/慢,终撞 context window。业界(OpenClaw/Hermes)做法:阈值摘要 + 会话分段 + 持久记忆;IM 场景独有的「会话分段」最高杠杆且零信息损失。 +- 方案(对外契约友好,无删用户数据):`tasks` 加 `context_base_idx`(0019,additive),`Session.load` 只把 `idx >= base` 的消息装进 LLM 上下文,base 之前的历史仍全量留 messages 表(web `/messages` 不 gate,照旧翻完整历史)。**关键雷点**:`_db_idx` 取 DB 真实总数而非 `len(rows)`,否则 append 续号撞 `uq_messages_task_idx`。 +- 两个触发口(`core/wechat/service.py`):① 自动 gap——入站时距上次消息超 `channel.session_gap_hours`(默 6h)→ 软重置,base=最后一条 user 消息 idx(保留上一轮原文做续聊锚点,不是失忆墙);② 手动「新话题/新会话/`/new`/清空上下文」→ 硬重置 base=总数,彻底从零。`_run_channel_conversation`(`web/app.py`)接入两口;`clear_messages` 全删后顺手 base 归 0。 +- Phase 2(阈值结构化摘要,对齐 Hermes 四阶段③)、Phase 3(sqlite-vec/FTS5 持久检索,解「问很久前的精确内容」)延后,待观察 token 曲线再定。 + ### 2026-06-26 / 消息框支持拖拽文件 + 修多次粘贴互相顶掉(bump 0.31.3) - 现象:① 消息框只能粘贴文件不能拖拽;② 连粘多个文件,后一个把前一个的 chip 顶掉,只剩一个。 diff --git a/RUN.md b/RUN.md index 1c4307a..e94783e 100644 --- a/RUN.md +++ b/RUN.md @@ -81,6 +81,7 @@ - **扫码授权登录(要 HTTPS 域名)**:管理员在应用→**「企业微信授权登录」**里把 zcbot 域名配进可信域名(注意不是「网页授权可信域名」,是另一项)+ 设 `ZCBOT_PUBLIC_BASE_URL`;用户点「扫码绑定」→ 桌面浏览器出二维码 → 企业微信 App 扫码确认。回调 `/v1/wecom/oauth/callback` 公开(身份从 HMAC state 验)。链接走 `login.work.weixin.qq.com/wwlogin/sso/login`(不是网页授权 `oauth2/authorize`,后者只能在企微客户端内打开 → 桌面浏览器会报「请在企业微信客户端打开链接」)。 - 绑定后简报/结果**无条件主动推**(不挑活跃度、无 24h 窗口),适合必达。 - **入站对话(可选,要公网 HTTPS)**:企微后台「应用 → 接收消息 → 设置 API 接收」填回调 URL `<公网 base>/v1/wecom/callback` + 自动生成的 Token / EncodingAESKey → 写进 env `WECOM_CALLBACK_TOKEN` / `WECOM_CALLBACK_AESKEY` → 保存时企微 GET 验 URL(`/v1/wecom/callback` GET 自动回 echostr)。配好后用户在企业微信里直接给应用发消息即走 zcbot 对话(与个人微信各一张会话上下文)。agent 跑完走 message/send 主动推回(非被动同步,故无 5s 限制)。**支持文本 + 图片 + 文件**(图片/文件走 media/get 下载,落盘进会话目录 inbound/);语音/视频/位置等暂不处理;未绑定/空消息静默。 +- **channel 长会话上下文(微信/企业微信通用,0019)**:常驻会话不再无限膨胀。① **自动分段**——入站时距上次消息超过 `config.json` 的 `channel.session_gap_hours`(默 **6** 小时,设 `<=0` 关闭)→ 软重置:只把「最后一条 user 消息起」喂模型(保留上一轮做续聊锚点),之前的历史仍全留 DB,网页端照旧翻完整记录;② **手动新话题**——用户在微信/企业微信里直接发「新话题 / 新会话 / `/new` / 清空上下文」→ 硬重置,彻底从零(回执提示已归档)。两者都**不删任何消息**,只移动「喂给模型的窗口起点」`tasks.context_base_idx`。网页端「清空对话」(`POST /v1/tasks/{id}/clear`)仍整清并把 base 归 0。需 `main.py db upgrade head` 带上 `0019`。 - **PG**:`ZCBOT_DB_URL` 必填。本地 docker compose / 远端 dev / 生产任选;未设置时启动清晰报错,不引导 docker(§7.4)。 - **Auth env**:`PLATFORM_KEY` + `JWT_SECRET` 任一缺失 web 启动 fail-fast。生成随机串:`python -c "import secrets; print(secrets.token_urlsafe(48))"`。 - **用户管理**(`users.email/password_hash/role`,0005 UNIQUE(email)、0009 role):dev SPA 登录后端。发用户两条路径任选:CLI `main.py user add`(下方),或在登录页右下角"+ 管理员添加用户"链接(需先设 `ZCBOT_ADMIN_TOKEN` env,弹窗输入 email/密码/管理员口令/角色)。撤用户 `DELETE FROM users WHERE email=...`(先 DELETE 该 user 的 tasks)。**用户自助改密**:登录后顶栏「改密码」按钮(走 `POST /v1/auth/change_password`,需知道旧密码);改邮箱 / 用户忘了旧密码无法自助 → 手动 SQL(见故障兜底)。 diff --git a/core/__init__.py b/core/__init__.py index 7916856..df6dc7c 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -1,3 +1,3 @@ # zcbot 版本号单一事实源:web/app.py 的 FastAPI version、/healthz 返回、前端展示都引这里。 # 改版本只动这一行。 -__version__ = "0.31.3" +__version__ = "0.32.0" diff --git a/core/session.py b/core/session.py index b48de3c..4e6d458 100644 --- a/core/session.py +++ b/core/session.py @@ -15,7 +15,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional from uuid import UUID -from sqlalchemy import delete, select +from sqlalchemy import delete, func, select from .storage import session_scope from .storage.models import Message, Task @@ -116,17 +116,30 @@ class Session: 若 task_id 在 DB 不存在,返回空 Session(messages 只含 system,_db_idx=0); 调用方判断该不该报错。 + + 只把 idx >= tasks.context_base_idx 的消息装进 LLM 上下文(channel 长会话软重置, + 0019)。base 之前的历史仍全量留 messages 表(web `/messages` 不 gate,照旧翻得到)。 + **关键**:`_db_idx` 必须取 DB 真实总条数(下一条 append 的 idx),不能用 len(rows) + —— 否则下次 append 会复用已存在的 idx,撞 uq_messages_task_idx / 覆盖历史。 """ sess = cls(task_id=task_id, system_prompt=system_prompt, meta=meta) with session_scope() as s: + base = s.execute( + select(Task.context_base_idx).where(Task.task_id == task_id) + ).scalar_one_or_none() or 0 rows = s.execute( select(Message) - .where(Message.task_id == task_id) + .where(Message.task_id == task_id, Message.idx >= base) .order_by(Message.idx) ).scalars().all() for row in rows: sess.messages.append(dict(row.payload)) - sess._db_idx = len(rows) + # 真实总条数(含 base 之前的归档历史),保证 append 续号不撞 idx。 + sess._db_idx = s.execute( + select(func.count()) + .select_from(Message) + .where(Message.task_id == task_id) + ).scalar_one() return sess @classmethod diff --git a/core/storage/models.py b/core/storage/models.py index df13dd7..f793392 100644 --- a/core/storage/models.py +++ b/core/storage/models.py @@ -85,6 +85,12 @@ class Task(Base): # 只有 error 是持久终态(下次起新 run 时由 post_message 清掉) run_status: Mapped[str] = mapped_column(Text, nullable=False, default="idle") run_error: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + # 喂给模型的上下文窗口起点(0019,channel 长会话软重置)。Session.load 只把 idx >= + # context_base_idx 的消息装进 LLM 上下文;之前的历史仍全量留 messages 表(web 翻得到)。 + # web 普通任务恒 0 = 喂全量;channel 入站按 gap / 「新话题」推进。详 DESIGN §8.7。 + context_base_idx: Mapped[int] = mapped_column( + Integer, nullable=False, default=0, server_default="0" + ) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), nullable=False ) diff --git a/core/wechat/service.py b/core/wechat/service.py index ef66cc1..79746f4 100644 --- a/core/wechat/service.py +++ b/core/wechat/service.py @@ -17,10 +17,10 @@ from datetime import datetime, timedelta, timezone from typing import Optional from uuid import UUID -from sqlalchemy import select +from sqlalchemy import func, select, update from core.storage import session_scope -from core.storage.models import ChannelBinding +from core.storage.models import ChannelBinding, Message, Task from core.wechat import crypto from core.wechat.ilink import DEFAULT_BASE, ILinkClient @@ -359,6 +359,62 @@ def ensure_channel_chat_task(uid: UUID, channel: str) -> Optional[UUID]: return tid +# ─────────────────────── channel 长会话上下文软重置(0019) ─────────────────────── + +# gap 默认值:超过它未说话 → 入站时软重置(保留上一轮原文做续聊锚点)。可被 +# config.json 的 channel.session_gap_hours 覆盖(见 reload 入口)。 +SESSION_GAP_HOURS_DEFAULT = 6.0 + +# 用户在 channel 里发这些词 → 手动「新话题」硬重置(base 推到总数,彻底从零)。 +NEW_TOPIC_COMMANDS = frozenset({"新话题", "新会话", "/new", "清空上下文"}) + + +def reset_channel_context(task_id: UUID, *, hard: bool) -> int: + """推进 task 的 context_base_idx(软重置),返回新 base。不删任何消息。 + + hard=True(手动「新话题」):base = 总消息数 → 下一条入站起彻底新会话。 + hard=False(自动 gap):base = 最后一条 user 消息 idx → 新窗口仍带上「上一轮」原文, + 续聊接得上;无 user 消息(理论上不会)退化为总数。 + """ + with session_scope() as s: + total = s.execute( + select(func.count()).select_from(Message).where(Message.task_id == task_id) + ).scalar_one() + if hard: + new_base = int(total) + else: + last_user_idx = s.execute( + select(func.max(Message.idx)).where( + Message.task_id == task_id, + Message.payload["role"].astext == "user", + ) + ).scalar_one_or_none() + new_base = int(last_user_idx) if last_user_idx is not None else int(total) + s.execute( + update(Task).where(Task.task_id == task_id).values(context_base_idx=new_base) + ) + return new_base + + +def maybe_gap_reset(task_id: UUID, gap_hours: float = SESSION_GAP_HOURS_DEFAULT) -> bool: + """入站时检测:距上次消息超过 gap_hours → 软重置(保留上一轮)。返回是否重置。 + + 仅入站对话调用(push 记录不触发)。gap_hours <= 0 视为关闭自动分段。 + """ + if gap_hours <= 0: + return False + with session_scope() as s: + last_at = s.execute( + select(func.max(Message.created_at)).where(Message.task_id == task_id) + ).scalar_one_or_none() + if last_at is None: + return False # 空 task,首条入站,无需重置 + if (datetime.now(timezone.utc) - last_at) <= timedelta(hours=gap_hours): + return False + reset_channel_context(task_id, hard=False) + return True + + def _file_rel_to_user_root(user_id: UUID, file_path: str) -> Optional[str]: """宿主绝对路径 → user_root 相对 POSIX(如 scheduled-/x.md)。 文件不在 user_root 内(外部 --working-dir)→ None。""" diff --git a/db/migrations/versions/20260629_1000_0019_task_context_base_idx.py b/db/migrations/versions/20260629_1000_0019_task_context_base_idx.py new file mode 100644 index 0000000..8d84d91 --- /dev/null +++ b/db/migrations/versions/20260629_1000_0019_task_context_base_idx.py @@ -0,0 +1,40 @@ +"""tasks.context_base_idx 列(channel 长会话软重置,DESIGN §8.7). + +Revision ID: 0019 +Revises: 0018 +Create Date: 2026-06-29 + +给 tasks 加 context_base_idx(NOT NULL DEFAULT 0):喂给模型的上下文窗口起点。 +Session.load 只把 idx >= context_base_idx 的消息装进 LLM 上下文;idx < base 的历史 +仍全量留在 messages 表(web `/messages` 直查不受影响,用户照旧翻完整历史)。 + +channel 入站对话据此做「软重置」:超过 gap 阈值未说话 → base 推到「最后一条 user 消息 +idx」(保留上一轮原文做续聊锚点);手动「新话题」→ base 推到总消息数(彻底从零)。 +存量行 / web 普通任务 base 恒 0 = 喂全量,行为不变。additive,无数据迁移。 +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + + +revision: str = "0019" +down_revision: Union[str, None] = "0018" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "tasks", + sa.Column( + "context_base_idx", + sa.Integer(), + nullable=False, + server_default="0", + ), + ) + + +def downgrade() -> None: + op.drop_column("tasks", "context_base_idx") diff --git a/web/app.py b/web/app.py index 5bef6de..a43ef9c 100644 --- a/web/app.py +++ b/web/app.py @@ -489,6 +489,22 @@ async def _run_channel_conversation(app, uid, text, attachments, *, channel): if tid is None: return "" + # 手动「新话题」命令:硬重置上下文窗口(base=总数),不跑 agent,直接回执。之前的 + # 对话全留 DB(网页端可翻),只是不再喂模型。纯文本命令,有附件则不当命令处理。 + if not attachments and text.strip() in _wx.NEW_TOPIC_COMMANDS: + await asyncio.to_thread(_wx.reset_channel_context, tid, hard=True) + return "已开启新话题,之前的对话已归档(网页端仍可查看完整历史)。" + + # 自动分段:距上次消息超过 gap 阈值 → 软重置(base=最后一条 user 消息 idx,保留上一轮 + # 原文做续聊锚点)。在入站消息落库前判断,故 last_at 取的是上一轮的时间。push 不走这。 + from core.agent_builder import load_config as _load_config + gap_hours = float( + (_load_config().get("channel") or {}).get( + "session_gap_hours", _wx.SESSION_GAP_HOURS_DEFAULT + ) + ) + await asyncio.to_thread(_wx.maybe_gap_reset, tid, gap_hours) + # 落盘入站附件到 /inbound/,拼 [用户上传的...] 行进 text(复用 web 端粘贴图约定) if attachments: from datetime import datetime @@ -2449,6 +2465,8 @@ def create_app() -> FastAPI: cost_cny=0, run_status="idle", run_error=None, + # 全删后 idx 从 0 重起,base 必须归零否则 load 窗口起点悬空(0019) + context_base_idx=0, ) ) task_row = s.execute(select(Task).where(Task.task_id == tid)).scalar_one()