diff --git a/DESIGN.md b/DESIGN.md index c3cc615..a81e4f6 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -726,8 +726,9 @@ create index on usage_events (model_profile, created_at); 2. `core/scheduler.py` `deliver_notify` 加 `channel=="wechat"` 分支,与 email 并列 → 定时简报**把最新产物文件直推**本人微信(取 `_newest_artifact`,≤上限 `sendmessage` 文件、超限退"点此下载"链接;**不改 job schema**——通道是 notify 字段的值)。 3. `web/app.py`:`POST /v1/wechat/bind/qrcode`(起二维码)、`GET /v1/wechat/bind/status`(轮询绑定结果)、`DELETE /v1/wechat/bind`(解绑)、`POST /v1/wechat/test`(自检发一条);**lifespan 起入站长轮询管理器**(见上"架构");前端设置加"绑定微信"扫码 UI。 -**渠道 B:企业微信自建应用(✅ 2026-06-24 实现,纯推送 / 不做对话,共用渠道抽象)** -- **决策:只做推送、不做入站对话**(刻意简化,省回调 + AES `WXBizMsgCrypt` + 5s ACK + agent 回推那一整套;要对话用 ClawBot)。企业微信因此≈"和邮件一个量级"的纯出站通道,其**无条件主动推**正补 ClawBot 的 24h 窗口短板,定时简报必达首选。 +**渠道 B:企业微信自建应用(✅ 2026-06-24 推送;✅ 2026-06-25 入站对话,共用渠道抽象)** +- **决策演进:出站推送先行,入站对话后补(2026-06-25)**。最初(2026-06-24)刻意只做推送以简化("和邮件一个量级"),其无条件主动推正补 ClawBot 24h 窗口短板;公测中需求明确企业微信也要能直接对话 → 补入站。**入站方式与 ClawBot 本质不同**:ClawBot 走长轮询(`getupdates` + 常驻 `run_inbound_manager`),企业微信走**回调 webhook**(企微服务器主动 POST 加密 XML)→ **无需后台轮询 task**,只加 HTTP 端点。agent 跑 >5s 超被动同步(5s 返回密文 XML)窗口 → 回复走 `message/send` 主动推回(复用 `push_wecom`),被动回复回 `success` 防重试。**对话核心与个人微信共用** `_run_channel_conversation(channel)`(建/复用会话 task → run 锁 → `_run_agent_bg` → 取回复),两渠道**各一张会话 task**(企微 binding 也存 `chat_task_id`)。 + - 入站组件:`core/wechat/wecom_crypto.py`(WXBizMsgCrypt 等价:SHA1 验签 + AES-256-CBC 解密 + receiveid/corpid 校验;与 `crypto.py` Fernet 列加密、`wecom.py` 出站 API 全无关);`service.get_user_by_wecom_userid`(回调反查身份)+ `get/set_wecom_chat_task`;`GET/POST /v1/wecom/callback`(无 JWT,身份从加密 XML `FromUserName` 反查)。env:`WECOM_CALLBACK_TOKEN` / `WECOM_CALLBACK_AESKEY`。**暂只收文本**(图片/语音/文件回 success,后续走 `media/get` 补);未绑定/空消息静默。 - **应用凭据(全局 env,需管理员建应用)**:`WECOM_CORPID / WECOM_AGENTID / WECOM_SECRET`;secret 仅 host 进程读、不进沙箱(同 ClawBot / `send_email`)。host 直连 `qyapi.weixin.qq.com`(`core/wechat/wecom.py`)。 - **绑定两路(touser=wecom_userid)**: - **手填 userid(无 HTTPS 域名时,默认)**:`PUT /v1/wecom/bind/userid` 直接写绑定;userid 见管理后台→通讯录→成员→「账号」。**推送是出站调用、不需域名**,故没域名也能用企业微信推送 —— 仅 OAuth 那路要域名。 diff --git a/PROGRESS.md b/PROGRESS.md index becbf75..b2cd3fb 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -2,7 +2,7 @@ > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 1-2 句:做了啥 + 关键判断;细节查 `git log` / `git diff` / `DESIGN §7.9`。 -最后更新:2026-06-25(修复企业微信扫码绑定报「请在企业微信客户端打开链接」:换扫码授权登录端点 + bump 0.26.10) +最后更新:2026-06-25(企业微信支持入站对话:回调 webhook + AES 解密 + 复用渠道无关对话核心 + bump 0.27.0) --- @@ -21,6 +21,14 @@ ## 已完成关键能力 +### 2026-06-25 / 企业微信支持入站对话(回调 webhook,bump 0.27.0) + +- 需求:企业微信此前只做出站推送(渠道 B 定位"和邮箱似的");现补**入站对话**,企微也能像个人微信那样直接聊。 +- 关键认知 —— 入站方式与 ClawBot 不同:ClawBot 走**长轮询**(`getupdates` + `run_inbound_manager` 常驻),企业微信走**回调 webhook**(企微服务器主动 POST 加密 XML),故**不需要后台轮询 task**,只加一个 HTTP 端点。回复因 agent 跑 >5s 超被动同步窗口 → 走 `message/send` 主动推回(复用 `push_wecom`),被动回复直接回 `success` 防重试。 +- 抽象:把 `_run_wechat_message` 的"建/复用会话 task → 落盘附件 → 抢 run 锁 → `_run_agent_bg` → 取回复"抽成**模块级 `_run_channel_conversation(app, uid, text, atts, channel)`**,个人微信(`channel='wechat'`)与企业微信(`channel='wecom'`)同核心、**各一张会话 task**(企微 binding 也存 `chat_task_id`),互不串扰。run 锁挡企微回调的并发/重复投递。 +- 新增:`core/wechat/wecom_crypto.py`(WXBizMsgCrypt 等价:SHA1 验签 + AES-256-CBC 解密 + receiveid/corpid 校验;**注意**与 `crypto.py` 的 Fernet 列加密、`wecom.py` 的出站 API 全无关);`service.get_user_by_wecom_userid` 回调反查身份 + `get/set_wecom_chat_task`;`upsert_wecom_binding` 改成合并 config(不再覆盖 chat_task_id);`web/app.py` `GET/POST /v1/wecom/callback`(无 JWT,身份从加密 XML `FromUserName` 反查)。 +- env:`WECOM_CALLBACK_TOKEN` / `WECOM_CALLBACK_AESKEY`(企微后台「接收消息」页生成);回调 URL = `<公网 base>/v1/wecom/callback`。**暂只收文本**(图片/语音/文件回 success,后续走 `media/get` 补);未绑定/空消息静默。crypto round-trip 自测过(verify_url / decrypt_message / 坏签名 / 坏 corpid 均符合预期)。 + ### 2026-06-25 / 修复企业微信扫码绑定报「请在企业微信客户端打开链接」(bump 0.26.10) - bug:`oauth_authorize_url()` 用的是 `open.weixin.qq.com/connect/oauth2/authorize`(网页授权),这条只能在企业微信客户端内置浏览器里打开;前端 `wecomBind()` 用 `window.open` 在**桌面浏览器**新标签打开它 → 企业微信返回「请在企业微信客户端打开链接」,扫不了码。注释里「桌面浏览器=出二维码扫」是误解(那是公众号行为,企微 oauth2/authorize 不出扫码页)。 diff --git a/RUN.md b/RUN.md index d9e72b4..ae92ad6 100644 --- a/RUN.md +++ b/RUN.md @@ -60,11 +60,15 @@ # ZCBOT_WECHAT_BOT_ENABLED=1 # 渠道总开关;开启后 lifespan 起入站管理器,用户可扫码绑定 # ZCBOT_WECHAT_SECRET_KEY=<随机串> # 凭据(bot_token/context_token)列加密密钥;缺则退明文标记(公测兜底) # ZCBOT_WECHAT_BASE_URL=... # 可选,覆盖 iLink base(默 https://ilinkai.weixin.qq.com) - # 企业微信(渠道 B,纯推送,§8.7):三件套齐才挂。无条件主动推,补 ClawBot 24h 窗口短板。 + # 企业微信(渠道 B,出站推送 + 入站对话,§8.7):三件套齐才挂推送。无条件主动推,补 ClawBot 24h 窗口短板。 # WECOM_CORPID=ww... # 企业 ID(管理员:我的企业→企业信息) # WECOM_AGENTID=1000002 # 自建应用 AgentId # WECOM_SECRET=... # 自建应用 Secret # ZCBOT_PUBLIC_BASE_URL=https://zcbot.example.com # 可选,OAuth 回调主机(须在应用「企业微信授权登录」可信域名内;缺则取请求 base) + # 入站对话(可选,要公网 HTTPS):企微后台「应用→接收消息→设置 API 接收」填回调 URL + 下面两项, + # 用户即可在企业微信里直接和 zcbot 对话(回调 URL = <公网 base>/v1/wecom/callback)。 + # WECOM_CALLBACK_TOKEN=... # 接收消息 Token(企微后台生成) + # WECOM_CALLBACK_AESKEY=... # EncodingAESKey(43 字符,企微后台生成) ``` > litellm 在 import 时副作用加载 .env;入口走 `main.py`,`.env` 自动生效。直跑 `python -c "from core.storage import ..."` 不经 litellm 链路时记得自己 `import litellm` 触发,或手动 `export ZCBOT_DB_URL=...`。 - **依赖**:`pip install -r requirements.txt`(已在 `.venv` 里;含 `bcrypt`、`segno`、`cryptography`)。 @@ -72,7 +76,8 @@ - **企业微信(渠道 B,纯推送,§8.7)**:① 管理员建自建应用 → 填 `WECOM_CORPID/AGENTID/SECRET`(+ 可见范围含目标用户);② `main.py db upgrade head`。**绑定两条路,任选**: - **手填 userid(无域名时,最省)**:rail「微信」modal 企业微信段填成员 userid(管理后台→通讯录→点成员→「账号」)→ 保存。**推送是出站调用,不需要域名/HTTPS**,这条最省事。 - **扫码授权登录(要 HTTPS 域名)**:管理员在应用→**「企业微信授权登录」**里把 zcbot 域名配进可信域名(注意不是「网页授权可信域名」,是另一项)+ 设 `ZCBOT_PUBLIC_BASE_URL`;用户点「扫码绑定」→ 桌面浏览器出二维码 → 企业微信 App 扫码确认。回调 `/v1/wecom/oauth/callback` 公开(身份从 HMAC state 验)。链接走 `login.work.weixin.qq.com/wwlogin/sso/login`(不是网页授权 `oauth2/authorize`,后者只能在企微客户端内打开 → 桌面浏览器会报「请在企业微信客户端打开链接」)。 - - 绑定后简报/结果**无条件主动推**(不挑活跃度、无 24h 窗口),适合必达;不做对话(要对话用 ClawBot)。 + - 绑定后简报/结果**无条件主动推**(不挑活跃度、无 24h 窗口),适合必达。 + - **入站对话(可选,要公网 HTTPS)**:企微后台「应用 → 接收消息 → 设置 API 接收」填回调 URL `<公网 base>/v1/wecom/callback` + 自动生成的 Token / EncodingAESKey → 写进 env `WECOM_CALLBACK_TOKEN` / `WECOM_CALLBACK_AESKEY` → 保存时企微 GET 验 URL(`/v1/wecom/callback` GET 自动回 echostr)。配好后用户在企业微信里直接给应用发消息即走 zcbot 对话(与个人微信各一张会话上下文)。agent 跑完走 message/send 主动推回(非被动同步,故无 5s 限制)。**暂只收文本**;未绑定/空消息静默。 - **PG**:`ZCBOT_DB_URL` 必填。本地 docker compose / 远端 dev / 生产任选;未设置时启动清晰报错,不引导 docker(§7.4)。 - **Auth env**:`PLATFORM_KEY` + `JWT_SECRET` 任一缺失 web 启动 fail-fast。生成随机串:`python -c "import secrets; print(secrets.token_urlsafe(48))"`。 - **用户管理**(`users.email/password_hash/role`,0005 UNIQUE(email)、0009 role):dev SPA 登录后端。发用户两条路径任选:CLI `main.py user add`(下方),或在登录页右下角"+ 管理员添加用户"链接(需先设 `ZCBOT_ADMIN_TOKEN` env,弹窗输入 email/密码/管理员口令/角色)。撤用户 `DELETE FROM users WHERE email=...`(先 DELETE 该 user 的 tasks)。**用户自助改密**:登录后顶栏「改密码」按钮(走 `POST /v1/auth/change_password`,需知道旧密码);改邮箱 / 用户忘了旧密码无法自助 → 手动 SQL(见故障兜底)。 diff --git a/core/__init__.py b/core/__init__.py index 31edd69..ba42921 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -1,3 +1,3 @@ # zcbot 版本号单一事实源:web/app.py 的 FastAPI version、/healthz 返回、前端展示都引这里。 # 改版本只动这一行。 -__version__ = "0.26.10" +__version__ = "0.27.0" diff --git a/core/storage/models.py b/core/storage/models.py index 0b42095..8d5a33b 100644 --- a/core/storage/models.py +++ b/core/storage/models.py @@ -243,7 +243,8 @@ class ChannelBinding(Base): config 形态(敏感字段经 core/wechat/crypto.py 加密入 JSONB,绝不进沙箱/日志/API): - channel='clawbot':{bot_token*, bot_im_id, user_im_id, base_url, latest_context_token*, context_token_at(iso), chat_task_id(str)} —— *=密文;context_token 24h 窗口主动推靠它。 - - channel='wecom':{wecom_userid} —— 企业成员 id,非密钥、明文;无条件推。 + - channel='wecom':{wecom_userid, chat_task_id(str)} —— wecom_userid 企业成员 id, + 非密钥、明文,无条件推 + 回调反查身份;chat_task_id 企业微信入站对话常驻 task。 (chat_task_id/FK、per-字段 NOT NULL 退到应用层校验,与 usage_events JSONB 同向取舍。) """ diff --git a/core/wechat/service.py b/core/wechat/service.py index 13fe5bc..d75f4da 100644 --- a/core/wechat/service.py +++ b/core/wechat/service.py @@ -204,16 +204,54 @@ def get_wecom_userid(user_id: UUID) -> Optional[str]: return (row.config or {}).get("wecom_userid") +def get_user_by_wecom_userid(wecom_userid: str) -> Optional[UUID]: + """企业微信回调只带 wecom_userid → 反查内部 user_id(仅 active 绑定)。入站对话用。""" + if not wecom_userid: + return None + with session_scope() as s: + row = s.execute( + select(ChannelBinding.user_id).where( + ChannelBinding.channel == _WECOM, + ChannelBinding.status == "active", + ChannelBinding.config["wecom_userid"].astext == wecom_userid, + ) + ).first() + return row[0] if row else None + + def upsert_wecom_binding(user_id: UUID, wecom_userid: str) -> None: - """OAuth 拿到 userid 后写/更新绑定。""" + """OAuth 拿到 userid 后写/更新绑定。合并进 config(保留 chat_task_id 等已有字段)。""" now = datetime.now(timezone.utc) with session_scope() as s: row = _get_or_new(s, user_id, _WECOM) - row.config = {"wecom_userid": wecom_userid} + cfg = dict(row.config or {}) + cfg["wecom_userid"] = wecom_userid + row.config = cfg row.status = "active" row.updated_at = now +def get_wecom_chat_task(user_id: UUID) -> Optional[UUID]: + """企业微信入站对话常驻 task id(无 → None)。""" + with session_scope() as s: + row = s.get(ChannelBinding, (user_id, _WECOM)) + if row is None: + return None + cti = (row.config or {}).get("chat_task_id") + return UUID(cti) if cti else None + + +def set_wecom_chat_task(user_id: UUID, task_id: UUID) -> None: + now = datetime.now(timezone.utc) + with session_scope() as s: + row = s.get(ChannelBinding, (user_id, _WECOM)) + if row is not None: + cfg = dict(row.config or {}) + cfg["chat_task_id"] = str(task_id) + row.config = cfg + row.updated_at = now + + def unbind_wecom(user_id: UUID) -> bool: now = datetime.now(timezone.utc) with session_scope() as s: diff --git a/core/wechat/wecom.py b/core/wechat/wecom.py index b666f36..fee1bf4 100644 --- a/core/wechat/wecom.py +++ b/core/wechat/wecom.py @@ -1,6 +1,10 @@ -"""企业微信自建应用客户端(DESIGN §8.7 渠道 B,纯推送)。 +"""企业微信自建应用客户端(DESIGN §8.7 渠道 B,出站推送 + 入站对话)。 -只做**出站推送**(不做入站对话): +本模块只管**出站**(access_token / OAuth 绑定 / 发送);**入站对话**走回调:加解密在 +`wecom_crypto.py`(WXBizMsgCrypt 等价),回调端点 + 反查身份在 web/app.py `/v1/wecom/callback`, +对话核心复用 `_run_channel_conversation`(与个人微信同核心,各一张会话 task)。 + +出站能力: - `access_token`:`gettoken(corpid,secret)`,进程内缓存 ~2h、线程安全、errcode 失效即重取。 - OAuth 扫码登录:`oauth_authorize_url()` 造扫码授权登录链接(桌面浏览器出二维码); `get_user_id(code)` 拿成员 userid(绑定用,一次性)。需管理员在应用配「企业微信授权登录」可信域名。 diff --git a/core/wechat/wecom_crypto.py b/core/wechat/wecom_crypto.py new file mode 100644 index 0000000..7ccb457 --- /dev/null +++ b/core/wechat/wecom_crypto.py @@ -0,0 +1,93 @@ +"""企业微信「接收消息」回调加解密(WXBizMsgCrypt 等价实现,DESIGN §8.7 渠道 B 入站)。 + +企业微信自建应用配「接收消息」回调后,服务器**主动 POST 加密 XML** 到回调 URL, +配 URL 时还会先 GET 一次 echostr 验有效性。这套加密**与 wecom.py 的 access_token / +出站 API 无关,也与 crypto.py 的 Fernet 列加密无关** —— 是企业微信专用方案: + +- key = base64decode(EncodingAESKey + "="),32B;IV = key[:16](AES-256-CBC) +- 明文密文体 = random(16) || msg_len(4B 大端) || msg || receiveid(自建应用为 corpid) +- 签名 = sha1(sorted([Token, timestamp, nonce, encrypt]) 拼接) 的 hexdigest + +只做**解密 + 验签**(入站);回复走 wecom.send_text 主动推(agent 跑 >5s 无法被动同步回), +故不实现加密。凭据 Token / EncodingAESKey 同 secret —— 只在 host 进程读,绝不进沙箱。 +""" +from __future__ import annotations + +import base64 +import hashlib +import os +import struct +import xml.etree.ElementTree as ET + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + + +def callback_token() -> str: + return os.getenv("WECOM_CALLBACK_TOKEN", "").strip() + + +def callback_aeskey() -> str: + return os.getenv("WECOM_CALLBACK_AESKEY", "").strip() + + +def callback_configured() -> bool: + """Token + EncodingAESKey 都在才算配好回调(沿用「有 key 才挂」§3.4)。""" + return bool(callback_token() and callback_aeskey()) + + +def _aes_key() -> bytes: + """EncodingAESKey(43 字符)→ +'=' → base64 解码 → 32B AES 密钥。""" + return base64.b64decode(callback_aeskey() + "=") + + +def _signature(timestamp: str, nonce: str, encrypt: str) -> str: + arr = sorted([callback_token(), timestamp, nonce, encrypt]) + return hashlib.sha1("".join(arr).encode("utf-8")).hexdigest() + + +def _aes_decrypt(encrypt_b64: str) -> bytes: + key = _aes_key() + cipher = Cipher(algorithms.AES(key), modes.CBC(key[:16])) + dec = cipher.decryptor() + raw = dec.update(base64.b64decode(encrypt_b64)) + dec.finalize() + pad = raw[-1] # PKCS7(企业微信 block=32,按末字节剥即可) + if not 1 <= pad <= 32: + raise ValueError("PKCS7 padding 非法") + return raw[:-pad] + + +def _extract_plain(encrypt_b64: str, *, expect_receiveid: str = "") -> str: + """解密 → 剥 16B 随机前缀 + 4B 长度,取 msg;尾部 receiveid 校验 corpid。""" + raw = _aes_decrypt(encrypt_b64) + body = raw[16:] + msg_len = struct.unpack(">I", body[:4])[0] + msg = body[4:4 + msg_len] + receiveid = body[4 + msg_len:].decode("utf-8", "ignore") + if expect_receiveid and receiveid != expect_receiveid: + raise ValueError("receiveid 不匹配(corpid 校验失败)") + return msg.decode("utf-8") + + +def verify_url( + msg_signature: str, timestamp: str, nonce: str, echostr: str, *, corpid: str = "" +) -> str: + """配回调 URL 时企业微信 GET 验有效性:验签 + 解密 echostr,原样回明文。""" + if _signature(timestamp, nonce, echostr) != msg_signature: + raise ValueError("签名校验失败") + return _extract_plain(echostr, expect_receiveid=corpid) + + +def parse_message(plain_xml: str) -> dict: + """解密后的明文 XML → dict(FromUserName / MsgType / Content / MsgId / ...)。""" + root = ET.fromstring(plain_xml) + return {child.tag: (child.text or "") for child in root} + + +def decrypt_message( + msg_signature: str, timestamp: str, nonce: str, body: str, *, corpid: str = "" +) -> dict: + """收消息 POST:从信封 XML 取 Encrypt → 验签 → 解密 → parse_message。""" + encrypt = ET.fromstring(body).findtext("Encrypt") or "" + if _signature(timestamp, nonce, encrypt) != msg_signature: + raise ValueError("签名校验失败") + return parse_message(_extract_plain(encrypt, expect_receiveid=corpid)) diff --git a/web/app.py b/web/app.py index 4b5a8df..1f80d5d 100644 --- a/web/app.py +++ b/web/app.py @@ -429,6 +429,110 @@ def _resolve_model_profile(profile: str) -> tuple[str, str]: return name, caps.model_id +async def _run_channel_conversation(app, uid, text, attachments, *, channel): + """渠道无关的入站对话核心(§8.7):解析/建该用户该渠道常驻 task → 落盘附件 → 抢 run 锁 + → _run_agent_bg → 取 assistant 回复文本。两渠道各一张会话 task,互不串扰。 + + channel:'wechat'(个人微信 ClawBot,绑定快照取 chat_task_id)| 'wecom'(企业微信, + wecom 绑定行取 chat_task_id)。attachments:已下载解密的入站附件(可空,wecom 暂只收文本)。 + 返回回复文本(供 ClawBot 回流 / wecom 主动推回)。 + """ + from core.agent_builder import resolve_workspace, working_dir_from_name, load_config + from core.storage.utils import ensure_local_task_row + from core.wechat import service as _wx + from core.wechat.ilink import attachment_basename + from core.wechat.inbound import extract_last_assistant_text + + if channel == "wecom": + existing_tid = await asyncio.to_thread(_wx.get_wecom_chat_task, uid) + task_name, slug, desc = "企业微信对话", f"wecom-{str(uid)[:8]}", "(企业微信对话)" + set_task = _wx.set_wecom_chat_task + else: + snap = await asyncio.to_thread(_wx.get_binding, uid) + if snap is None: + return "" + existing_tid = snap.chat_task_id + task_name, slug, desc = "微信对话", f"wechat-{str(uid)[:8]}", "(微信 ClawBot 对话)" + set_task = _wx.set_chat_task + + profile, model_id = _resolve_model_profile("") + ws = resolve_workspace(None, load_config()) + tid = existing_tid + need_create = tid is None + if not need_create: + with session_scope() as s: + exists = s.execute( + select(Task.task_id).where(Task.task_id == tid, Task.deleted_at.is_(None)) + ).first() + if exists is None: + need_create = True + if need_create: + tid = uuid4() + fs_dir = working_dir_from_name(ws, uid, slug) + fs_dir.mkdir(parents=True, exist_ok=True) + ensure_local_task_row( + task_id=tid, name=task_name, working_dir=to_db_path(fs_dir), + skill="", user_id=uid, model=model_id, model_profile=profile, + description=desc, channel=channel, + ) + await asyncio.to_thread(set_task, uid, tid) + + # 落盘入站附件到 /inbound/,拼 [用户上传的...] 行进 text(复用 web 端粘贴图约定) + if attachments: + from datetime import datetime + from pathlib import Path + + with session_scope() as s: + wd_db = s.execute( + select(Task.working_dir).where(Task.task_id == tid) + ).scalar_one() + inbound_dir = from_db_path(wd_db) / "inbound" + inbound_dir.mkdir(parents=True, exist_ok=True) + ts = datetime.now().strftime("%Y%m%d-%H%M%S") + lines: list[str] = [] + for i, att in enumerate(attachments): + if not att.data: + continue + base = attachment_basename(att) + name = f"{ts}-{i}-{base}" + (inbound_dir / name).write_bytes(att.data) + rel = f"inbound/{name}" + tag = "[用户上传的参考图]" if att.kind == "image" else "[用户上传的文件]" + lines.append(f"{tag} {rel}") + if lines: + extra = "\n".join(lines) + text = f"{text}\n\n{extra}" if text.strip() else extra + + # 抢 run 锁:正忙 → 提示稍候(同用户串行;ClawBot loop 本就串行,wecom 回调靠此挡并发) + with session_scope() as s: + row = s.execute( + select(Task.run_status).where(Task.task_id == tid).with_for_update() + ).first() + if row is None: + return "[出错] 对话 task 不存在" + if row.run_status in ("running", "cancelling"): + return "上一条还在处理中,请稍候再发。" + s.execute(update(Task).where(Task.task_id == tid).values( + run_status="running", run_error=None)) + + broker.start(tid) + runner = asyncio.create_task(asyncio.to_thread( + _run_agent_bg, tid, uid, text, "", "", False, + )) + app.state.inflight[runner] = tid + runner.add_done_callback(lambda t: app.state.inflight.pop(t, None)) + await runner + + with session_scope() as s: + st = s.execute( + select(Task.run_status, Task.run_error).where(Task.task_id == tid) + ).first() + if st is not None and st.run_status == "error": + return f"[出错] {st.run_error}" + reply = await asyncio.to_thread(extract_last_assistant_text, tid) + return reply or "(本轮无文本回复)" + + def _list_image_variants() -> list[tuple[str, dict]]: """扫 config/media/doubao.yaml image 段 → [(variant_key, variant_cfg), ...]。 @@ -860,97 +964,13 @@ def create_app() -> FastAPI: wechat_task = None async def _run_wechat_message(uid: UUID, text: str, attachments=None) -> str: - """微信入站一条消息:解析/建用户常驻「微信」task → 落盘附件 → 抢 run 锁 → _run_agent_bg → 取回复。 + """微信(ClawBot)入站一条消息 → 跑用户常驻「微信」task → 取回复。 attachments:已下载解密的入站附件(core.wechat.ilink.InboundAttachment,att.data 已回填)。 - 图片落盘后拼 `[用户上传的参考图] ` 行(agent 见之自调 look_at_image),文件拼 - `[用户上传的文件] ` 行(agent 用 Read/Shell 处理)。 + 建/复用 task、落盘附件、抢 run 锁、跑 agent 全在渠道无关核心 + `_run_channel_conversation` 里(企业微信回调走同一核心,channel='wecom')。 """ - from core.agent_builder import resolve_workspace, working_dir_from_name - from core.storage.utils import ensure_local_task_row - from core.wechat import service as _wx - from core.wechat.ilink import attachment_basename - from core.wechat.inbound import extract_last_assistant_text - - snap = await asyncio.to_thread(_wx.get_binding, uid) - if snap is None: - return "" - profile, model_id = _resolve_model_profile("") - ws = resolve_workspace(None, _cfg) - tid = snap.chat_task_id - need_create = tid is None - if not need_create: - with session_scope() as s: - exists = s.execute( - select(Task.task_id).where(Task.task_id == tid, Task.deleted_at.is_(None)) - ).first() - if exists is None: - need_create = True - if need_create: - tid = uuid4() - fs_dir = working_dir_from_name(ws, uid, f"wechat-{str(uid)[:8]}") - fs_dir.mkdir(parents=True, exist_ok=True) - ensure_local_task_row( - task_id=tid, name="微信对话", working_dir=to_db_path(fs_dir), - skill="", user_id=uid, model=model_id, model_profile=profile, - description="(微信 ClawBot 对话)", channel="wechat", - ) - await asyncio.to_thread(_wx.set_chat_task, uid, tid) - - # 落盘入站附件到 /inbound/,拼 [用户上传的...] 行进 text(复用 web 端粘贴图约定) - if attachments: - from datetime import datetime - from pathlib import Path - - with session_scope() as s: - wd_db = s.execute( - select(Task.working_dir).where(Task.task_id == tid) - ).scalar_one() - inbound_dir = from_db_path(wd_db) / "inbound" - inbound_dir.mkdir(parents=True, exist_ok=True) - ts = datetime.now().strftime("%Y%m%d-%H%M%S") - lines: list[str] = [] - for i, att in enumerate(attachments): - if not att.data: - continue - base = attachment_basename(att) - name = f"{ts}-{i}-{base}" - (inbound_dir / name).write_bytes(att.data) - rel = f"inbound/{name}" - tag = "[用户上传的参考图]" if att.kind == "image" else "[用户上传的文件]" - lines.append(f"{tag} {rel}") - if lines: - extra = "\n".join(lines) - text = f"{text}\n\n{extra}" if text.strip() else extra - - # 抢 run 锁:正忙 → 提示稍候(同用户串行,inbound loop 本就串行) - with session_scope() as s: - row = s.execute( - select(Task.run_status).where(Task.task_id == tid).with_for_update() - ).first() - if row is None: - return "[出错] 对话 task 不存在" - if row.run_status in ("running", "cancelling"): - return "上一条还在处理中,请稍候再发。" - s.execute(update(Task).where(Task.task_id == tid).values( - run_status="running", run_error=None)) - - broker.start(tid) - runner = asyncio.create_task(asyncio.to_thread( - _run_agent_bg, tid, uid, text, "", "", False, - )) - app.state.inflight[runner] = tid - runner.add_done_callback(lambda t: app.state.inflight.pop(t, None)) - await runner - - with session_scope() as s: - st = s.execute( - select(Task.run_status, Task.run_error).where(Task.task_id == tid) - ).first() - if st is not None and st.run_status == "error": - return f"[出错] {st.run_error}" - reply = await asyncio.to_thread(extract_last_assistant_text, tid) - return reply or "(本轮无文本回复)" + return await _run_channel_conversation(app, uid, text, attachments, channel="wechat") if clawbot_enabled(): from core.wechat.inbound import run_inbound_manager @@ -1289,6 +1309,72 @@ def create_app() -> FastAPI: ) return {"ok": res.ok, "reason": res.reason} + # ── 企业微信「接收消息」回调(入站对话,§8.7)── 无 JWT;身份从加密 XML 的 FromUserName 反查。 + # 配置:企业微信后台「应用 → 接收消息 → 设置 API 接收」填本 URL + Token + EncodingAESKey, + # 对应 env WECOM_CALLBACK_TOKEN / WECOM_CALLBACK_AESKEY。回调 URL = <公网 base>/v1/wecom/callback。 + @app.get("/v1/wecom/callback", include_in_schema=False) + def wecom_callback_verify( + msg_signature: str = "", timestamp: str = "", nonce: str = "", echostr: str = "" + ): + """企业微信保存回调配置时 GET 验有效性:验签 + 解密 echostr,原样回明文。""" + from fastapi.responses import PlainTextResponse + from core.wechat import wecom, wecom_crypto + if not wecom_crypto.callback_configured(): + raise HTTPException(404, "wecom callback 未配置(需 WECOM_CALLBACK_TOKEN/AESKEY)") + try: + plain = wecom_crypto.verify_url( + msg_signature, timestamp, nonce, echostr, corpid=wecom._corpid() + ) + except Exception as e: # noqa: BLE001 + raise HTTPException(400, f"verify failed: {type(e).__name__}: {e}") + return PlainTextResponse(plain) + + @app.post("/v1/wecom/callback", include_in_schema=False) + async def wecom_callback( + request: Request, msg_signature: str = "", timestamp: str = "", nonce: str = "" + ): + """企业微信推入站消息(加密 XML POST)。解密 → 反查身份 → 后台跑 agent → 主动推回。 + + agent 跑 >5s,远超被动回复(同步返回密文 XML)5s 窗口 → 异步:立刻回 'success' 防重试, + agent 结果走 wecom.send_text 主动推回(message/send,无 24h 窗口约束)。同一用户的并发/ + 重复投递由对话 task 的 run 锁挡(第二条会收到「上一条还在处理中」)。 + """ + from fastapi.responses import PlainTextResponse + from core.wechat import service as _wx + from core.wechat import wecom, wecom_crypto + if not wecom_crypto.callback_configured(): + raise HTTPException(404, "wecom callback 未配置(需 WECOM_CALLBACK_TOKEN/AESKEY)") + body = (await request.body()).decode("utf-8") + try: + msg = wecom_crypto.decrypt_message( + msg_signature, timestamp, nonce, body, corpid=wecom._corpid() + ) + except Exception as e: # noqa: BLE001 + raise HTTPException(400, f"decrypt failed: {type(e).__name__}: {e}") + # 暂只处理文本消息;图片/语音/文件等回 success 防重试(后续可走 media/get 下载补齐) + if (msg.get("MsgType") or "") != "text": + return PlainTextResponse("success") + content = (msg.get("Content") or "").strip() + wuid = msg.get("FromUserName") or "" + uid = await asyncio.to_thread(_wx.get_user_by_wecom_userid, wuid) + if uid is None or not content: + return PlainTextResponse("success") # 未绑定 / 空消息 → 静默 + + async def _bg(uid=uid, content=content): + try: + reply = await _run_channel_conversation(app, uid, content, None, channel="wecom") + except Exception as e: # noqa: BLE001 + reply = f"[出错] {type(e).__name__}: {e}" + if reply and reply.strip(): + await asyncio.to_thread(_wx.push_wecom, uid, reply) + + # 登记到 inflight:持强引用防 task 被 GC 中途回收 + 关停时 drain(value=None → 不参与 + # broker cancel;内层 _run_agent_bg runner 另有自己的 inflight 项负责取消)。 + bg = asyncio.create_task(_bg(), name=f"wecom-msg-{str(uid)[:8]}") + app.state.inflight[bg] = None + bg.add_done_callback(lambda t: app.state.inflight.pop(t, None)) + return PlainTextResponse("success") + @app.get("/v1/models", tags=["misc"]) def list_models(user_id: UUID = Depends(require_user)): """列出所有可用 LLM 模型(扫 config/models/*.yaml)。