zcbot/core/wechat/wecom.py

210 lines
8.4 KiB
Python

"""企业微信自建应用客户端(DESIGN §8.7 渠道 B,出站推送 + 入站对话)。
本模块只管**出站**(access_token / OAuth 绑定 / 发送);**入站对话**走回调:加解密在
`wecom_crypto.py`(WXBizMsgCrypt 等价),回调端点 + 反查身份在 web/app.py `/v1/wecom/callback`,
对话核心复用 `_run_channel_conversation`(与个人微信同核心,各一张会话 task)。
出站能力:
- `access_token`:`gettoken(corpid,secret)`,进程内缓存 ~2h、线程安全、errcode 失效即重取。
- OAuth 扫码登录:`oauth_authorize_url()` 造扫码授权登录链接(桌面浏览器出二维码);
`get_user_id(code)` 拿成员 userid(绑定用,一次性)。需管理员在应用配「企业微信授权登录」可信域名。
- 发送:`send_text / send_markdown / send_file`(file 先 `media/upload` 换 media_id,≤20MB)。
- `state` HMAC 签名(绑 user_id + 短 TTL,防 CSRF):回调无 JWT,用户身份从 state 来。
凭据(secret)只在 host 进程读,绝不进沙箱 / run_python(同 ClawBot / send_email,§3.4)。
阻塞 IO(httpx 同步),调用方放 to_thread / executor。
"""
from __future__ import annotations
import base64
import hashlib
import hmac
import os
import threading
import time
from pathlib import Path
from typing import Optional
import httpx
QYAPI = "https://qyapi.weixin.qq.com/cgi-bin"
# 扫码授权登录(桌面浏览器渲染二维码,用企业微信 App 扫码)。
# 不能用 open.weixin.qq.com/connect/oauth2/authorize —— 那条是「网页授权」,只能在
# 企业微信客户端内打开,桌面浏览器会报「请在企业微信客户端打开链接」。
WWLOGIN_SSO = "https://login.work.weixin.qq.com/wwlogin/sso/login"
MAX_FILE_BYTES = 20 * 1024 * 1024
# access_token 进程内缓存
_tok_lock = threading.Lock()
_tok_val: Optional[str] = None
_tok_exp: float = 0.0
def wecom_configured() -> bool:
"""三件套齐才算配好(沿用「有 key 才挂」§3.4)。"""
return bool(
os.getenv("WECOM_CORPID", "").strip()
and os.getenv("WECOM_AGENTID", "").strip()
and os.getenv("WECOM_SECRET", "").strip()
)
def _corpid() -> str:
return os.getenv("WECOM_CORPID", "").strip()
def _agentid() -> str:
return os.getenv("WECOM_AGENTID", "").strip()
def _secret() -> str:
return os.getenv("WECOM_SECRET", "").strip()
def _state_secret() -> bytes:
# OAuth state 签名密钥:复用凭据加密 key,退 JWT_SECRET
key = (os.getenv("ZCBOT_WECHAT_SECRET_KEY", "").strip()
or os.getenv("JWT_SECRET", "").strip() or "zcbot-wecom")
return key.encode("utf-8")
# ─────────────────────────── access_token ───────────────────────────
def get_access_token(*, force: bool = False) -> str:
"""缓存的 app access_token;过期/force 时重取。线程安全。"""
global _tok_val, _tok_exp
with _tok_lock:
if not force and _tok_val and time.time() < _tok_exp:
return _tok_val
with httpx.Client(timeout=15) as c:
r = c.get(f"{QYAPI}/gettoken",
params={"corpid": _corpid(), "corpsecret": _secret()})
r.raise_for_status()
d = r.json()
if d.get("errcode", 0) != 0 or not d.get("access_token"):
raise RuntimeError(f"gettoken 失败: {d.get('errcode')} {d.get('errmsg')}")
_tok_val = d["access_token"]
_tok_exp = time.time() + int(d.get("expires_in", 7200)) - 300 # 提前 5min 续
return _tok_val
def _api_get(path: str, params: dict) -> dict:
"""带 access_token 的 GET;40014/42001(token 失效)自动重取一次。"""
for attempt in (1, 2):
tok = get_access_token(force=(attempt == 2))
with httpx.Client(timeout=15) as c:
r = c.get(f"{QYAPI}/{path}", params={"access_token": tok, **params})
r.raise_for_status()
d = r.json()
if d.get("errcode") in (40014, 42001) and attempt == 1:
continue
return d
return d
def _api_post(path: str, json_body: dict) -> dict:
for attempt in (1, 2):
tok = get_access_token(force=(attempt == 2))
with httpx.Client(timeout=20) as c:
r = c.post(f"{QYAPI}/{path}", params={"access_token": tok}, json=json_body)
r.raise_for_status()
d = r.json()
if d.get("errcode") in (40014, 42001) and attempt == 1:
continue
return d
return d
# ─────────────────────────── OAuth 绑定 ───────────────────────────
def sign_state(user_id: str, *, ttl: int = 600) -> str:
"""state = base64(user_id.exp).hmac —— 绑 user_id + 短 TTL,防 CSRF。"""
exp = int(time.time()) + ttl
payload = f"{user_id}.{exp}"
sig = hmac.new(_state_secret(), payload.encode(), hashlib.sha256).hexdigest()[:32]
raw = f"{payload}.{sig}"
return base64.urlsafe_b64encode(raw.encode()).decode().rstrip("=")
def verify_state(state: str) -> Optional[str]:
"""校验 state,返回 user_id;失败/过期返回 None。"""
try:
pad = "=" * (-len(state) % 4)
raw = base64.urlsafe_b64decode(state + pad).decode()
user_id, exp_s, sig = raw.rsplit(".", 2)
payload = f"{user_id}.{exp_s}"
good = hmac.new(_state_secret(), payload.encode(), hashlib.sha256).hexdigest()[:32]
if not hmac.compare_digest(sig, good):
return None
if int(exp_s) < int(time.time()):
return None
return user_id
except Exception:
return None
def oauth_authorize_url(redirect_uri: str, state: str) -> str:
"""造**扫码授权登录**链接:桌面浏览器打开会渲染二维码,用户用企业微信 App 扫码确认后
回跳到 redirect_uri 带 code(后续 auth/getuserinfo 换 userid 不变)。
注意:redirect_uri 域名须在企业微信后台「应用 → 企业微信授权登录 → 可信域名」里登记,
与「网页授权可信域名」是两项不同设置。"""
from urllib.parse import quote
return (
f"{WWLOGIN_SSO}?login_type=CorpApp&appid={_corpid()}"
f"&agentid={_agentid()}"
f"&redirect_uri={quote(redirect_uri, safe='')}"
f"&state={quote(state, safe='')}"
)
def get_user_id(code: str) -> Optional[str]:
"""OAuth 回调用 code 换企业成员 userid(非成员返回 None)。"""
d = _api_get("auth/getuserinfo", {"code": code})
if d.get("errcode", 0) != 0:
raise RuntimeError(f"getuserinfo 失败: {d.get('errcode')} {d.get('errmsg')}")
return d.get("userid") # 外部联系人/非成员只有 openid → None
# ─────────────────────────── 发送 ───────────────────────────
def _send(touser: str, msgtype: str, body_field: dict) -> None:
payload = {"touser": touser, "msgtype": msgtype, "agentid": _agentid(), **body_field}
d = _api_post("message/send", payload)
if d.get("errcode", 0) != 0:
raise RuntimeError(f"message/send 失败: {d.get('errcode')} {d.get('errmsg')}")
def send_text(touser: str, content: str) -> None:
_send(touser, "text", {"text": {"content": content or ""}})
def send_markdown(touser: str, content: str) -> None:
_send(touser, "markdown", {"markdown": {"content": content or ""}})
def upload_media(file_path: str | os.PathLike, *, media_type: str = "file") -> str:
"""上传临时素材(3 天有效)→ media_id。"""
p = Path(file_path)
if p.stat().st_size > MAX_FILE_BYTES:
raise ValueError(f"文件超过 {MAX_FILE_BYTES // (1024*1024)}MB 上限")
for attempt in (1, 2):
tok = get_access_token(force=(attempt == 2))
with httpx.Client(timeout=30) as c, open(p, "rb") as f:
r = c.post(f"{QYAPI}/media/upload",
params={"access_token": tok, "type": media_type},
files={"media": (p.name, f)})
r.raise_for_status()
d = r.json()
if d.get("errcode") in (40014, 42001) and attempt == 1:
continue
break
if d.get("errcode", 0) != 0 or not d.get("media_id"):
raise RuntimeError(f"media/upload 失败: {d.get('errcode')} {d.get('errmsg')}")
return d["media_id"]
def send_file(touser: str, file_path: str | os.PathLike) -> None:
media_id = upload_media(file_path, media_type="file")
_send(touser, "file", {"file": {"media_id": media_id}})