"""channel_bindings 统一表(微信渠道抽象,DESIGN §8.7). Revision ID: 0015 Revises: 0014 Create Date: 2026-06-24 把 0012 wechat_bot_bindings(ClawBot)+ 0014 wecom_bindings(企业微信)合成一张 判别列 + JSONB 表 channel_bindings(user_id, channel, status, config),沿用本库 usage_events(kind+units)的多态范式 —— 加渠道不再各建表。 数据迁移:旧两表的行搬进 config JSONB(敏感 token 列本就是密文串,原样搬、不重新加密), 再 drop 旧表。DDL + DML 同一事务,失败整体回滚不丢数据。详 DESIGN §8.7。 """ import json from typing import Sequence, Union import sqlalchemy as sa from alembic import op from sqlalchemy.dialects.postgresql import JSONB, UUID as PG_UUID revision: str = "0015" down_revision: Union[str, None] = "0014" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: op.create_table( "channel_bindings", sa.Column( "user_id", PG_UUID(as_uuid=True), sa.ForeignKey("users.user_id", ondelete="CASCADE"), primary_key=True, ), sa.Column("channel", sa.Text(), primary_key=True), # clawbot | wecom | ... sa.Column("status", sa.Text(), nullable=False, server_default="active"), sa.Column("config", JSONB(), nullable=False, server_default="{}"), sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), ) # 入站管理器/推送:按 (channel, status) 扫某渠道活跃绑定 op.create_index("ix_channel_bindings_channel", "channel_bindings", ["channel", "status"]) conn = op.get_bind() insert = sa.text( "INSERT INTO channel_bindings (user_id, channel, status, config, created_at, updated_at) " "VALUES (:uid, :ch, :st, CAST(:cfg AS JSONB), :ca, :ua)" ) # 0012 wechat_bot_bindings → channel='clawbot'(token 列已是密文串,原样搬) insp = sa.inspect(conn) if insp.has_table("wechat_bot_bindings"): rows = conn.execute(sa.text( "SELECT user_id, bot_token, bot_im_id, user_im_id, base_url, " "latest_context_token, context_token_at, chat_task_id, status, created_at, updated_at " "FROM wechat_bot_bindings" )).mappings().all() for r in rows: cfg = { "bot_token": r["bot_token"], "bot_im_id": r["bot_im_id"], "user_im_id": r["user_im_id"], "base_url": r["base_url"], "latest_context_token": r["latest_context_token"], "context_token_at": r["context_token_at"].isoformat() if r["context_token_at"] else None, "chat_task_id": str(r["chat_task_id"]) if r["chat_task_id"] else None, } conn.execute(insert, { "uid": r["user_id"], "ch": "clawbot", "st": r["status"], "cfg": json.dumps(cfg), "ca": r["created_at"], "ua": r["updated_at"], }) op.drop_table("wechat_bot_bindings") # 0014 wecom_bindings → channel='wecom' if insp.has_table("wecom_bindings"): rows = conn.execute(sa.text( "SELECT user_id, wecom_userid, status, created_at, updated_at FROM wecom_bindings" )).mappings().all() for r in rows: cfg = {"wecom_userid": r["wecom_userid"]} conn.execute(insert, { "uid": r["user_id"], "ch": "wecom", "st": r["status"], "cfg": json.dumps(cfg), "ca": r["created_at"], "ua": r["updated_at"], }) op.drop_table("wecom_bindings") def downgrade() -> None: # 回滚:重建旧两表 + 把 config 拆回列,再 drop channel_bindings。 op.create_table( "wechat_bot_bindings", sa.Column("user_id", PG_UUID(as_uuid=True), sa.ForeignKey("users.user_id", ondelete="CASCADE"), primary_key=True), sa.Column("bot_token", sa.Text(), nullable=False), sa.Column("bot_im_id", sa.Text(), nullable=True), sa.Column("user_im_id", sa.Text(), nullable=True), sa.Column("base_url", sa.Text(), nullable=False, server_default="https://ilinkai.weixin.qq.com"), sa.Column("latest_context_token", sa.Text(), nullable=True), sa.Column("context_token_at", sa.DateTime(timezone=True), nullable=True), sa.Column("chat_task_id", PG_UUID(as_uuid=True), sa.ForeignKey("tasks.task_id", ondelete="SET NULL"), nullable=True), sa.Column("status", sa.Text(), nullable=False, server_default="active"), sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), ) op.create_table( "wecom_bindings", sa.Column("user_id", PG_UUID(as_uuid=True), sa.ForeignKey("users.user_id", ondelete="CASCADE"), primary_key=True), sa.Column("wecom_userid", sa.Text(), nullable=False), sa.Column("status", sa.Text(), nullable=False, server_default="active"), sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), ) conn = op.get_bind() rows = conn.execute(sa.text( "SELECT user_id, channel, status, config, created_at, updated_at FROM channel_bindings" )).mappings().all() for r in rows: cfg = r["config"] or {} if r["channel"] == "clawbot": conn.execute(sa.text( "INSERT INTO wechat_bot_bindings (user_id, bot_token, bot_im_id, user_im_id, base_url, " "latest_context_token, context_token_at, chat_task_id, status, created_at, updated_at) " "VALUES (:uid, :bt, :bim, :uim, :bu, :lct, CAST(:cta AS timestamptz), " "CAST(:cti AS uuid), :st, :ca, :ua)" ), { "uid": r["user_id"], "bt": cfg.get("bot_token") or "", "bim": cfg.get("bot_im_id"), "uim": cfg.get("user_im_id"), "bu": cfg.get("base_url") or "https://ilinkai.weixin.qq.com", "lct": cfg.get("latest_context_token"), "cta": cfg.get("context_token_at"), "cti": cfg.get("chat_task_id"), "st": r["status"], "ca": r["created_at"], "ua": r["updated_at"], }) elif r["channel"] == "wecom": conn.execute(sa.text( "INSERT INTO wecom_bindings (user_id, wecom_userid, status, created_at, updated_at) " "VALUES (:uid, :wu, :st, :ca, :ua)" ), { "uid": r["user_id"], "wu": cfg.get("wecom_userid") or "", "st": r["status"], "ca": r["created_at"], "ua": r["updated_at"], }) op.drop_index("ix_channel_bindings_channel", table_name="channel_bindings") op.drop_table("channel_bindings")