264 lines
9.5 KiB
Python
264 lines
9.5 KiB
Python
"""微信渠道服务层(DESIGN §8.7):绑定 CRUD + 主动推送 + `send_to_user` 渠道抽象。
|
|
|
|
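- A binding row's `bot_token` / `latest_context_token` are encrypted/decrypted via `crypto`; the
  snapshot (BindingSnapshot) is detached from the session and carries plaintext tokens, so it is
  **for use inside the host process only — never leak it or pass it into the sandbox**.
- 24h proactive-push window: `context_token` is usable only within ~24h of the last inbound
  message; once expired, or if the user has never spoken, the push cannot go out and a reason
  is returned so the caller can fall back to email (§8.5).
- `send_to_user` is the channel abstraction: scheduler / WechatPushTool call it without knowing
  about ClawBot / WeCom; it delivers through ClawBot when that channel is enabled and through
  WeCom (channel B) when that channel is configured.

Blocking IO (DB + httpx); callers should run it via to_thread / an executor.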
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import select
|
|
|
|
from core.storage import session_scope
|
|
from core.storage.models import ChannelBinding
|
|
from core.wechat import crypto
|
|
from core.wechat.ilink import DEFAULT_BASE, ILinkClient
|
|
|
|
# Maximum age of a stored context_token for it to still count as fresh
# (compared against `context_token_at` in `_token_fresh`).
CONTEXT_TOKEN_TTL = timedelta(hours=24)

# Values of `ChannelBinding.channel` handled by this module.
_CLAWBOT = "clawbot"
_WECOM = "wecom"
|
|
|
|
|
|
def _get_or_new(s, user_id: UUID, channel: str) -> ChannelBinding:
    """Return the binding row keyed by ``(user_id, channel)``, creating it if absent.

    A newly created row starts with an empty ``config`` and is added to the
    session ``s``; this function does not flush or commit.
    """
    existing = s.get(ChannelBinding, (user_id, channel))
    if existing is not None:
        return existing
    created = ChannelBinding(user_id=user_id, channel=channel, config={})
    s.add(created)
    return created
|
|
|
|
|
|
def clawbot_enabled() -> bool:
    """Master switch for the ClawBot channel (DESIGN §3.4).

    Returns True when the ``ZCBOT_WECHAT_BOT_ENABLED`` environment variable,
    after trimming whitespace and lower-casing, is one of
    ``1`` / ``true`` / ``yes`` / ``on``. Unset or any other value means off.
    """
    raw_flag = os.getenv("ZCBOT_WECHAT_BOT_ENABLED", "")
    normalized = raw_flag.strip().lower()
    return normalized in ("1", "true", "yes", "on")
|
|
|
|
|
|
# ─────────────────────────── 绑定快照 / CRUD ───────────────────────────
|
|
|
|
@dataclass
class BindingSnapshot:
    """Plain-value copy of a ClawBot binding row, with tokens already decrypted.

    Instances carry secrets in plaintext, so they must stay inside the host
    process. To keep those secrets out of logs and tracebacks, ``bot_token``
    and ``context_token`` are excluded from the generated ``repr``; they are
    still ordinary attributes and still take part in equality.
    """

    user_id: UUID
    # Plaintext (already decrypted); hidden from repr() to avoid accidental leaks.
    bot_token: str = field(repr=False)
    base_url: str
    # Peer id passed as the recipient when sending; None until one was recorded.
    user_im_id: Optional[str]
    # Plaintext (already decrypted); hidden from repr() to avoid accidental leaks.
    context_token: Optional[str] = field(repr=False)
    # When `context_token` was last refreshed; None if never.
    context_token_at: Optional[datetime]
    # Task id recorded via `set_chat_task`, if any.
    chat_task_id: Optional[UUID]
    status: str
|
|
|
|
|
|
def _snap(row: ChannelBinding) -> BindingSnapshot:
    """Convert a ``channel='clawbot'`` row into a BindingSnapshot.

    Tokens stored in ``row.config`` are decrypted with ``crypto.dec``; the
    serialized ``context_token_at`` (ISO string) and ``chat_task_id`` (UUID
    string) entries are parsed back into objects. A missing/empty decrypted
    ``bot_token`` becomes ``""`` and a missing ``base_url`` falls back to
    ``DEFAULT_BASE``.
    """
    config = row.config or {}
    raw_stamp = config.get("context_token_at")
    raw_task = config.get("chat_task_id")

    owner = row.user_id
    bot_token = crypto.dec(config.get("bot_token")) or ""
    base_url = config.get("base_url") or DEFAULT_BASE
    peer_id = config.get("user_im_id")
    context_token = crypto.dec(config.get("latest_context_token"))

    stamped_at = None
    if raw_stamp:
        stamped_at = datetime.fromisoformat(raw_stamp)

    task_id = None
    if raw_task:
        task_id = UUID(raw_task)

    return BindingSnapshot(
        user_id=owner,
        bot_token=bot_token,
        base_url=base_url,
        user_im_id=peer_id,
        context_token=context_token,
        context_token_at=stamped_at,
        chat_task_id=task_id,
        status=row.status,
    )
|
|
|
|
|
|
def get_binding(user_id: UUID) -> Optional[BindingSnapshot]:
    """Load the ClawBot binding of ``user_id`` as a snapshot.

    Returns None when no row exists. The row's status is not filtered here;
    callers check ``snapshot.status`` themselves.
    """
    with session_scope() as session:
        binding = session.get(ChannelBinding, (user_id, _CLAWBOT))
        if not binding:
            return None
        # Snapshot while the session is still open.
        return _snap(binding)
|
|
|
|
|
|
def list_active_bindings() -> list[BindingSnapshot]:
    """Return snapshots of every ClawBot binding whose status is ``active``.

    Intended for the inbound long-poll manager; the snapshots include the
    plaintext ``bot_token``.
    """
    with session_scope() as session:
        active_clawbot = select(ChannelBinding).where(
            ChannelBinding.channel == _CLAWBOT,
            ChannelBinding.status == "active",
        )
        bindings = session.execute(active_clawbot).scalars().all()
        # Snapshot while the session is still open.
        return [_snap(binding) for binding in bindings]
|
|
|
|
|
|
def upsert_clawbot_binding(
    user_id: UUID, bot_token: str, base_url: str, *, bot_im_id: Optional[str] = None
) -> None:
    """Create or update the ClawBot binding after a confirmed QR scan.

    ``bot_token`` is stored encrypted inside ``config``; ``base_url`` falls
    back to ``DEFAULT_BASE`` when empty; ``bot_im_id`` is written only when
    given. Other existing ``config`` entries (e.g. ``user_im_id``) are kept,
    and the row is marked ``active``.
    """
    stamp = datetime.now(timezone.utc)
    with session_scope() as session:
        binding = _get_or_new(session, user_id, _CLAWBOT)
        merged = dict(binding.config or {})
        merged.update(
            bot_token=crypto.enc(bot_token),
            base_url=base_url or DEFAULT_BASE,
        )
        if bot_im_id:
            merged["bot_im_id"] = bot_im_id
        # Assign a fresh dict so the ORM tracks the JSONB change.
        binding.config = merged
        binding.status = "active"
        binding.updated_at = stamp
|
|
|
|
|
|
def refresh_context_token(user_id: UUID, user_im_id: str, context_token: str) -> None:
    """Store the newest ``context_token`` (encrypted) and its timestamp for a user.

    Called for each inbound message; the stored timestamp is what keeps the
    proactive-push window open. Does nothing when the user has no ClawBot
    binding row. ``user_im_id`` is only overwritten when non-empty.
    """
    stamp = datetime.now(timezone.utc)
    with session_scope() as session:
        binding = session.get(ChannelBinding, (user_id, _CLAWBOT))
        if binding is not None:
            merged = dict(binding.config or {})
            if user_im_id:
                merged["user_im_id"] = user_im_id
            merged.update(
                latest_context_token=crypto.enc(context_token),
                context_token_at=stamp.isoformat(),
            )
            # Assign a fresh dict so the ORM tracks the change.
            binding.config = merged
            binding.updated_at = stamp
|
|
|
|
|
|
def set_chat_task(user_id: UUID, task_id: UUID) -> None:
    """Record ``task_id`` (as a string) under ``chat_task_id`` in the user's ClawBot binding.

    Does nothing when the user has no ClawBot binding row.
    """
    stamp = datetime.now(timezone.utc)
    with session_scope() as session:
        binding = session.get(ChannelBinding, (user_id, _CLAWBOT))
        if binding is None:
            return
        # Build a new dict rather than mutating in place so the ORM tracks the change.
        binding.config = {**(binding.config or {}), "chat_task_id": str(task_id)}
        binding.updated_at = stamp
|
|
|
|
|
|
def unbind(user_id: UUID) -> bool:
    """Revoke the user's ClawBot binding without deleting the row.

    The row is kept (status set to ``revoked``) so its history is preserved.
    Returns False when no row exists, True otherwise — including when the row
    was already revoked.
    """
    stamp = datetime.now(timezone.utc)
    with session_scope() as session:
        binding = session.get(ChannelBinding, (user_id, _CLAWBOT))
        found = binding is not None
        if found:
            binding.status = "revoked"
            binding.updated_at = stamp
        return found
|
|
|
|
|
|
# ─────────────────────────── 推送 ───────────────────────────
|
|
|
|
@dataclass
class PushResult:
    """Outcome of a single push attempt on a single channel."""

    # True when the push calls completed without raising.
    ok: bool
    # Channel that produced this result; defaults to the ClawBot channel.
    channel: str = "clawbot"
    # One of: sent | no_binding | never_opened | token_stale | error:<...>
    reason: str = ""
|
|
|
|
|
|
def _token_fresh(snap: BindingSnapshot) -> bool:
    """Tell whether the snapshot's context_token is younger than ``CONTEXT_TOKEN_TTL``.

    Returns False when there is no token or no timestamp. A naive timestamp is
    interpreted as UTC before comparing against the current UTC time.
    """
    stamped_at = snap.context_token_at
    if stamped_at is None or not snap.context_token:
        return False
    if stamped_at.tzinfo is None:
        stamped_at = stamped_at.replace(tzinfo=timezone.utc)
    age = datetime.now(timezone.utc) - stamped_at
    return age < CONTEXT_TOKEN_TTL
|
|
|
|
|
|
def push_clawbot(
    user_id: UUID, text: str = "", file_path: Optional[str] = None
) -> PushResult:
    """Proactively push a message to the user's personal WeChat via ClawBot.

    Only possible inside the 24h window after the user's last inbound message;
    otherwise a failure reason is returned so the caller can fall back.

    Args:
        user_id: Owner of the ClawBot binding.
        text: Text to send; skipped when empty.
        file_path: Path of a file to send after the text; skipped when empty.

    Returns:
        A PushResult whose ``reason`` is one of:
        ``sent``; ``no_binding`` (no row, or row not active); ``never_opened``
        (no peer id or no context token yet); ``token_stale`` (token older than
        the TTL); or ``error: ...`` (empty payload, or the exception text from
        the send call, truncated to 200 characters).
    """
    snap = get_binding(user_id)
    if snap is None or snap.status != "active":
        return PushResult(False, reason="no_binding")
    if not snap.user_im_id or not snap.context_token:
        # Cold start: the user has never messaged the bot.
        return PushResult(False, reason="never_opened")
    if not _token_fresh(snap):
        # No interaction within the TTL window.
        return PushResult(False, reason="token_stale")
    if not text and not file_path:
        # Nothing would be sent; reporting "sent" here would make the caller
        # believe the user was reached and skip its fallback.
        return PushResult(False, reason="error: empty payload")
    client = ILinkClient(snap.bot_token, snap.base_url)
    try:
        if text:
            client.send_text(snap.user_im_id, snap.context_token, text)
        if file_path:
            client.send_file(snap.user_im_id, snap.context_token, file_path)
    except Exception as e:  # noqa: BLE001 -- the caller decides on fallback from `reason`
        return PushResult(False, reason=f"error: {str(e)[:200]}")
    return PushResult(True, reason="sent")
|
|
|
|
|
|
# ─────────────── 企业微信(渠道 B,纯推送;无 24h 窗口约束)───────────────
|
|
|
|
def get_wecom_userid(user_id: UUID) -> Optional[str]:
    """Return the ``wecom_userid`` stored in the user's active WeCom binding.

    Returns None when there is no WeCom row, the row is not ``active``, or the
    config has no ``wecom_userid`` entry.
    """
    with session_scope() as session:
        binding = session.get(ChannelBinding, (user_id, _WECOM))
        if binding is not None and binding.status == "active":
            return (binding.config or {}).get("wecom_userid")
        return None
|
|
|
|
|
|
def upsert_wecom_binding(user_id: UUID, wecom_userid: str) -> None:
    """Create or update the WeCom binding once OAuth has yielded the userid.

    The row's ``config`` is replaced wholesale with ``{"wecom_userid": ...}``
    and the row is marked ``active``.
    """
    stamp = datetime.now(timezone.utc)
    with session_scope() as session:
        binding = _get_or_new(session, user_id, _WECOM)
        fresh_config = {"wecom_userid": wecom_userid}
        binding.config = fresh_config
        binding.status = "active"
        binding.updated_at = stamp
|
|
|
|
|
|
def unbind_wecom(user_id: UUID) -> bool:
    """Revoke the user's WeCom binding without deleting the row.

    Returns False when no row exists, True otherwise — including when the row
    was already revoked.
    """
    stamp = datetime.now(timezone.utc)
    with session_scope() as session:
        binding = session.get(ChannelBinding, (user_id, _WECOM))
        found = binding is not None
        if found:
            binding.status = "revoked"
            binding.updated_at = stamp
        return found
|
|
|
|
|
|
def push_wecom(user_id: UUID, text: str = "", file_path: Optional[str] = None) -> PushResult:
    """Proactively push a message through WeCom (no activity-window restriction).

    Args:
        user_id: Owner of the WeCom binding.
        text: Text to send; skipped when empty.
        file_path: Path of a file to send after the text; skipped when empty.

    Returns:
        A PushResult with ``channel="wecom"`` whose ``reason`` is ``sent``,
        ``no_binding`` (no active binding / no stored userid), or
        ``error: ...`` (empty payload, or the exception text from the send
        call, truncated to 200 characters).
    """
    # Imported at call time rather than at module import.
    from core.wechat import wecom

    wuid = get_wecom_userid(user_id)
    if not wuid:
        return PushResult(False, channel="wecom", reason="no_binding")
    if not text and not file_path:
        # Nothing would be sent; reporting "sent" here would make the caller
        # believe the user was reached and skip its fallback.
        return PushResult(False, channel="wecom", reason="error: empty payload")
    try:
        if text:
            wecom.send_text(wuid, text)
        if file_path:
            wecom.send_file(wuid, file_path)
    except Exception as e:  # noqa: BLE001 -- surface errcode/errmsg to ease debugging
        return PushResult(False, channel="wecom", reason=f"error: {str(e)[:200]}")
    return PushResult(True, channel="wecom", reason="sent")
|
|
|
|
|
|
@dataclass
class DeliveryReport:
    """Collected per-channel results of one ``send_to_user`` call."""

    # One entry per channel that was attempted, in attempt order.
    results: list[PushResult] = field(default_factory=list)

    @property
    def delivered(self) -> bool:
        """True when at least one channel reported ``ok``; False for no results."""
        for outcome in self.results:
            if outcome.ok:
                return True
        return False
|
|
|
|
|
|
def send_to_user(
    user_id: UUID, text: str = "", file_path: Optional[str] = None
) -> DeliveryReport:
    """Channel abstraction: deliver a message over every available channel.

    ClawBot is attempted when ``clawbot_enabled()`` is true, and WeCom when
    ``wecom_configured()`` is true; both may be attempted in the same call.
    Each attempt's PushResult is collected, in that order, into the returned
    DeliveryReport.
    """
    outcomes = []
    if clawbot_enabled():
        outcomes.append(push_clawbot(user_id, text, file_path))

    # Imported at call time rather than at module import.
    from core.wechat.wecom import wecom_configured

    if wecom_configured():
        outcomes.append(push_wecom(user_id, text, file_path))
    return DeliveryReport(results=outcomes)
|