zcbot/core/wechat/service.py

277 lines
9.9 KiB
Python

"""微信渠道服务层(DESIGN §8.7):绑定 CRUD + 主动推送 + `send_to_user` 渠道抽象。
- 绑定行的 `bot_token` / `latest_context_token` 经 `crypto` 加解密;快照(BindingSnapshot)
脱离 session、含明文 token,**仅 host 进程内用,绝不外泄/进沙箱**。
- 主动推送 24h 窗口:`context_token` 仅在末次入站 ~24h 内可用;超期/未开口 → 推不出,
返回 reason 给调用方退邮件兜底(§8.5)。
- `send_to_user` 是渠道抽象:scheduler / WechatPushTool 调它,不感知 ClawBot/企业微信;
企业微信(渠道 B)后续在此追加一路。
阻塞 IO(DB + httpx),调用方放 to_thread / executor。
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Optional
from uuid import UUID
from sqlalchemy import select
from core.storage import session_scope
from core.storage.models import ChannelBinding
from core.wechat import crypto
from core.wechat.ilink import DEFAULT_BASE, ILinkClient
CONTEXT_TOKEN_TTL = timedelta(hours=24)
_CLAWBOT = "clawbot"
_WECOM = "wecom"
def _get_or_new(s, user_id: UUID, channel: str) -> ChannelBinding:
row = s.get(ChannelBinding, (user_id, channel))
if row is None:
row = ChannelBinding(user_id=user_id, channel=channel, config={})
s.add(row)
return row
def clawbot_enabled() -> bool:
"""ClawBot 渠道总开关(沿用「有开关才挂」范式,§3.4)。"""
return os.getenv("ZCBOT_WECHAT_BOT_ENABLED", "").strip().lower() in (
"1", "true", "yes", "on",
)
# ─────────────────────────── 绑定快照 / CRUD ───────────────────────────
@dataclass
class BindingSnapshot:
user_id: UUID
bot_token: str # 明文(已解密)
base_url: str
user_im_id: Optional[str]
context_token: Optional[str] # 明文(已解密)
context_token_at: Optional[datetime]
chat_task_id: Optional[UUID]
status: str
def _snap(row: ChannelBinding) -> BindingSnapshot:
"""channel='clawbot' 行 → 快照(解密 token,反序列化 config)。"""
cfg = row.config or {}
cta = cfg.get("context_token_at")
cti = cfg.get("chat_task_id")
return BindingSnapshot(
user_id=row.user_id,
bot_token=crypto.dec(cfg.get("bot_token")) or "",
base_url=cfg.get("base_url") or DEFAULT_BASE,
user_im_id=cfg.get("user_im_id"),
context_token=crypto.dec(cfg.get("latest_context_token")),
context_token_at=datetime.fromisoformat(cta) if cta else None,
chat_task_id=UUID(cti) if cti else None,
status=row.status,
)
def get_binding(user_id: UUID) -> Optional[BindingSnapshot]:
with session_scope() as s:
row = s.get(ChannelBinding, (user_id, _CLAWBOT))
return _snap(row) if row else None
def list_active_bindings() -> list[BindingSnapshot]:
"""入站长轮询管理器用:所有 active 的 ClawBot 绑定(含明文 bot_token)。"""
with session_scope() as s:
rows = (
s.execute(
select(ChannelBinding).where(
ChannelBinding.channel == _CLAWBOT,
ChannelBinding.status == "active",
)
)
.scalars()
.all()
)
return [_snap(r) for r in rows]
def upsert_clawbot_binding(
user_id: UUID, bot_token: str, base_url: str, *, bot_im_id: Optional[str] = None
) -> None:
"""扫码 confirmed 后写/更新绑定。bot_token 加密存进 config(保留已有 user_im_id 等)。"""
now = datetime.now(timezone.utc)
with session_scope() as s:
row = _get_or_new(s, user_id, _CLAWBOT)
cfg = dict(row.config or {})
cfg["bot_token"] = crypto.enc(bot_token)
cfg["base_url"] = base_url or DEFAULT_BASE
if bot_im_id:
cfg["bot_im_id"] = bot_im_id
row.config = cfg # 重新赋值 → ORM 追踪 JSONB 变更
row.status = "active"
row.updated_at = now
def refresh_context_token(user_id: UUID, user_im_id: str, context_token: str) -> None:
"""每条入站消息刷新该用户的 context_token(+时间戳)——主动推送窗口靠它续命。"""
now = datetime.now(timezone.utc)
with session_scope() as s:
row = s.get(ChannelBinding, (user_id, _CLAWBOT))
if row is None:
return
cfg = dict(row.config or {})
if user_im_id:
cfg["user_im_id"] = user_im_id
cfg["latest_context_token"] = crypto.enc(context_token)
cfg["context_token_at"] = now.isoformat()
row.config = cfg
row.updated_at = now
def set_chat_task(user_id: UUID, task_id: UUID) -> None:
now = datetime.now(timezone.utc)
with session_scope() as s:
row = s.get(ChannelBinding, (user_id, _CLAWBOT))
if row is not None:
cfg = dict(row.config or {})
cfg["chat_task_id"] = str(task_id)
row.config = cfg
row.updated_at = now
def unbind(user_id: UUID) -> bool:
"""解绑 ClawBot(标 revoked,不物理删 → 保留轨迹)。返回是否有绑定被改。"""
now = datetime.now(timezone.utc)
with session_scope() as s:
row = s.get(ChannelBinding, (user_id, _CLAWBOT))
if row is None:
return False
row.status = "revoked"
row.updated_at = now
return True
# ─────────────────────────── 推送 ───────────────────────────
@dataclass
class PushResult:
ok: bool
channel: str = "clawbot"
# sent | no_binding | never_opened | token_stale | error:<...>
reason: str = ""
def _token_fresh(snap: BindingSnapshot) -> bool:
if not snap.context_token or snap.context_token_at is None:
return False
at = snap.context_token_at
if at.tzinfo is None:
at = at.replace(tzinfo=timezone.utc)
return (datetime.now(timezone.utc) - at) < CONTEXT_TOKEN_TTL
def push_clawbot(
user_id: UUID, text: str = "", file_path: Optional[str] = None
) -> PushResult:
"""主动推一条到用户个人微信。仅在 24h 窗口内可用,否则返回 reason 供兜底。"""
snap = get_binding(user_id)
if snap is None or snap.status != "active":
return PushResult(False, reason="no_binding")
if not snap.user_im_id or not snap.context_token:
return PushResult(False, reason="never_opened") # 冷启动:用户从未开口
if not _token_fresh(snap):
return PushResult(False, reason="token_stale") # 超 24h 未互动
client = ILinkClient(snap.bot_token, snap.base_url)
try:
if text:
client.send_text(snap.user_im_id, snap.context_token, text)
if file_path:
client.send_file(snap.user_im_id, snap.context_token, file_path)
except Exception as e: # noqa: BLE001 —— 调用方据 reason 决定兜底
return PushResult(False, reason=f"error: {str(e)[:200]}")
return PushResult(True, reason="sent")
# ─────────────── 企业微信(渠道 B,纯推送;无 24h 窗口约束)───────────────
def get_wecom_userid(user_id: UUID) -> Optional[str]:
with session_scope() as s:
row = s.get(ChannelBinding, (user_id, _WECOM))
if row is None or row.status != "active":
return None
return (row.config or {}).get("wecom_userid")
def upsert_wecom_binding(user_id: UUID, wecom_userid: str) -> None:
"""OAuth 拿到 userid 后写/更新绑定。"""
now = datetime.now(timezone.utc)
with session_scope() as s:
row = _get_or_new(s, user_id, _WECOM)
row.config = {"wecom_userid": wecom_userid}
row.status = "active"
row.updated_at = now
def unbind_wecom(user_id: UUID) -> bool:
now = datetime.now(timezone.utc)
with session_scope() as s:
row = s.get(ChannelBinding, (user_id, _WECOM))
if row is None:
return False
row.status = "revoked"
row.updated_at = now
return True
def push_wecom(user_id: UUID, text: str = "", file_path: Optional[str] = None) -> PushResult:
"""企业微信主动推一条(无条件,不挑活跃度)。"""
from core.wechat import wecom
wuid = get_wecom_userid(user_id)
if not wuid:
return PushResult(False, channel="wecom", reason="no_binding")
try:
if text:
wecom.send_text(wuid, text)
if file_path:
wecom.send_file(wuid, file_path)
except Exception as e: # noqa: BLE001 —— 透出 errcode/errmsg 便于排错
return PushResult(False, channel="wecom", reason=f"error: {str(e)[:200]}")
return PushResult(True, channel="wecom", reason="sent")
@dataclass
class DeliveryReport:
results: list[PushResult] = field(default_factory=list)
@property
def delivered(self) -> bool:
return any(r.ok for r in self.results)
def active_channels() -> list[str]:
"""部署级「哪些渠道开了」的**唯一真相源**:门槛判断(`wechat_push_available`)
与投递(`send_to_user`)都引它,避免两处各列各的(曾漏判企业微信致工具不挂)。
加渠道只改这一处,门槛与投递自动一致。顺序即投递优先序。"""
from core.wechat.wecom import wecom_configured
chans: list[str] = []
if clawbot_enabled():
chans.append(_CLAWBOT)
if wecom_configured():
chans.append(_WECOM)
return chans
_DISPATCH = {_CLAWBOT: push_clawbot, _WECOM: push_wecom}
def send_to_user(
user_id: UUID, text: str = "", file_path: Optional[str] = None
) -> DeliveryReport:
"""渠道抽象:按 `active_channels()` 列出的已开渠道依次投递。"""
report = DeliveryReport()
for ch in active_channels():
report.results.append(_DISPATCH[ch](user_id, text, file_path))
return report