zcbot/core/wechat/service.py

195 lines
6.8 KiB
Python

"""微信渠道服务层(DESIGN §8.7):绑定 CRUD + 主动推送 + `send_to_user` 渠道抽象。
- 绑定行的 `bot_token` / `latest_context_token` 经 `crypto` 加解密;快照(BindingSnapshot)
脱离 session、含明文 token,**仅 host 进程内用,绝不外泄/进沙箱**。
- 主动推送 24h 窗口:`context_token` 仅在末次入站 ~24h 内可用;超期/未开口 → 推不出,
返回 reason 给调用方退邮件兜底(§8.5)。
- `send_to_user` 是渠道抽象:scheduler / WechatPushTool 调它,不感知 ClawBot/企业微信;
企业微信(渠道 B)后续在此追加一路。
阻塞 IO(DB + httpx),调用方放 to_thread / executor。
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Optional
from uuid import UUID
from sqlalchemy import select
from core.storage import session_scope
from core.storage.models import WeChatBotBinding
from core.wechat import crypto
from core.wechat.ilink import DEFAULT_BASE, ILinkClient
CONTEXT_TOKEN_TTL = timedelta(hours=24)
def clawbot_enabled() -> bool:
"""ClawBot 渠道总开关(沿用「有开关才挂」范式,§3.4)。"""
return os.getenv("ZCBOT_WECHAT_BOT_ENABLED", "").strip().lower() in (
"1", "true", "yes", "on",
)
# ─────────────────────────── 绑定快照 / CRUD ───────────────────────────
@dataclass
class BindingSnapshot:
user_id: UUID
bot_token: str # 明文(已解密)
base_url: str
user_im_id: Optional[str]
context_token: Optional[str] # 明文(已解密)
context_token_at: Optional[datetime]
chat_task_id: Optional[UUID]
status: str
def _snap(row: WeChatBotBinding) -> BindingSnapshot:
return BindingSnapshot(
user_id=row.user_id,
bot_token=crypto.dec(row.bot_token) or "",
base_url=row.base_url or DEFAULT_BASE,
user_im_id=row.user_im_id,
context_token=crypto.dec(row.latest_context_token),
context_token_at=row.context_token_at,
chat_task_id=row.chat_task_id,
status=row.status,
)
def get_binding(user_id: UUID) -> Optional[BindingSnapshot]:
with session_scope() as s:
row = s.get(WeChatBotBinding, user_id)
return _snap(row) if row else None
def list_active_bindings() -> list[BindingSnapshot]:
"""入站长轮询管理器用:所有 active 绑定(含明文 bot_token)。"""
with session_scope() as s:
rows = (
s.execute(
select(WeChatBotBinding).where(WeChatBotBinding.status == "active")
)
.scalars()
.all()
)
return [_snap(r) for r in rows]
def upsert_clawbot_binding(
user_id: UUID, bot_token: str, base_url: str, *, bot_im_id: Optional[str] = None
) -> None:
"""扫码 confirmed 后写/更新绑定。bot_token 加密入库。"""
now = datetime.now(timezone.utc)
with session_scope() as s:
row = s.get(WeChatBotBinding, user_id)
if row is None:
row = WeChatBotBinding(user_id=user_id)
s.add(row)
row.bot_token = crypto.enc(bot_token)
row.base_url = base_url or DEFAULT_BASE
if bot_im_id:
row.bot_im_id = bot_im_id
row.status = "active"
row.updated_at = now
def refresh_context_token(user_id: UUID, user_im_id: str, context_token: str) -> None:
"""每条入站消息刷新该用户的 context_token(+时间戳)——主动推送窗口靠它续命。"""
now = datetime.now(timezone.utc)
with session_scope() as s:
row = s.get(WeChatBotBinding, user_id)
if row is None:
return
if user_im_id:
row.user_im_id = user_im_id
row.latest_context_token = crypto.enc(context_token)
row.context_token_at = now
row.updated_at = now
def set_chat_task(user_id: UUID, task_id: UUID) -> None:
now = datetime.now(timezone.utc)
with session_scope() as s:
row = s.get(WeChatBotBinding, user_id)
if row is not None:
row.chat_task_id = task_id
row.updated_at = now
def unbind(user_id: UUID) -> bool:
"""解绑(标 revoked,不物理删 → 保留轨迹)。返回是否有绑定被改。"""
now = datetime.now(timezone.utc)
with session_scope() as s:
row = s.get(WeChatBotBinding, user_id)
if row is None:
return False
row.status = "revoked"
row.updated_at = now
return True
# ─────────────────────────── 推送 ───────────────────────────
@dataclass
class PushResult:
ok: bool
channel: str = "clawbot"
# sent | no_binding | never_opened | token_stale | error:<...>
reason: str = ""
def _token_fresh(snap: BindingSnapshot) -> bool:
if not snap.context_token or snap.context_token_at is None:
return False
at = snap.context_token_at
if at.tzinfo is None:
at = at.replace(tzinfo=timezone.utc)
return (datetime.now(timezone.utc) - at) < CONTEXT_TOKEN_TTL
def push_clawbot(
user_id: UUID, text: str = "", file_path: Optional[str] = None
) -> PushResult:
"""主动推一条到用户个人微信。仅在 24h 窗口内可用,否则返回 reason 供兜底。"""
snap = get_binding(user_id)
if snap is None or snap.status != "active":
return PushResult(False, reason="no_binding")
if not snap.user_im_id or not snap.context_token:
return PushResult(False, reason="never_opened") # 冷启动:用户从未开口
if not _token_fresh(snap):
return PushResult(False, reason="token_stale") # 超 24h 未互动
client = ILinkClient(snap.bot_token, snap.base_url)
try:
if text:
client.send_text(snap.user_im_id, snap.context_token, text)
if file_path:
client.send_file(snap.user_im_id, snap.context_token, file_path)
except Exception as e: # noqa: BLE001 —— 调用方据 reason 决定兜底
return PushResult(False, reason=f"error:{type(e).__name__}")
return PushResult(True, reason="sent")
@dataclass
class DeliveryReport:
results: list[PushResult] = field(default_factory=list)
@property
def delivered(self) -> bool:
return any(r.ok for r in self.results)
def send_to_user(
user_id: UUID, text: str = "", file_path: Optional[str] = None
) -> DeliveryReport:
"""渠道抽象:按用户已绑渠道投递。当前仅 ClawBot;企业微信(渠道 B)后续追加。"""
report = DeliveryReport()
if clawbot_enabled():
report.results.append(push_clawbot(user_id, text, file_path))
# TODO 渠道 B:if wecom_configured(): report.results.append(push_wecom(user_id, text, file_path))
return report