"""企业微信自建应用客户端(DESIGN §8.7 渠道 B,纯推送)。 只做**出站推送**(不做入站对话): - `access_token`:`gettoken(corpid,secret)`,进程内缓存 ~2h、线程安全、errcode 失效即重取。 - OAuth 网页授权:`oauth_authorize_url()` 造扫码链接;`get_user_id(code)` 拿成员 userid (绑定用,一次性)。需管理员在应用配「网页授权可信域名」。 - 发送:`send_text / send_markdown / send_file`(file 先 `media/upload` 换 media_id,≤20MB)。 - `state` HMAC 签名(绑 user_id + 短 TTL,防 CSRF):回调无 JWT,用户身份从 state 来。 凭据(secret)只在 host 进程读,绝不进沙箱 / run_python(同 ClawBot / send_email,§3.4)。 阻塞 IO(httpx 同步),调用方放 to_thread / executor。 """ from __future__ import annotations import base64 import hashlib import hmac import os import threading import time from pathlib import Path from typing import Optional import httpx QYAPI = "https://qyapi.weixin.qq.com/cgi-bin" OAUTH_AUTHORIZE = "https://open.weixin.qq.com/connect/oauth2/authorize" MAX_FILE_BYTES = 20 * 1024 * 1024 # access_token 进程内缓存 _tok_lock = threading.Lock() _tok_val: Optional[str] = None _tok_exp: float = 0.0 def wecom_configured() -> bool: """三件套齐才算配好(沿用「有 key 才挂」§3.4)。""" return bool( os.getenv("WECOM_CORPID", "").strip() and os.getenv("WECOM_AGENTID", "").strip() and os.getenv("WECOM_SECRET", "").strip() ) def _corpid() -> str: return os.getenv("WECOM_CORPID", "").strip() def _agentid() -> str: return os.getenv("WECOM_AGENTID", "").strip() def _secret() -> str: return os.getenv("WECOM_SECRET", "").strip() def _state_secret() -> bytes: # OAuth state 签名密钥:复用凭据加密 key,退 JWT_SECRET key = (os.getenv("ZCBOT_WECHAT_SECRET_KEY", "").strip() or os.getenv("JWT_SECRET", "").strip() or "zcbot-wecom") return key.encode("utf-8") # ─────────────────────────── access_token ─────────────────────────── def get_access_token(*, force: bool = False) -> str: """缓存的 app access_token;过期/force 时重取。线程安全。""" global _tok_val, _tok_exp with _tok_lock: if not force and _tok_val and time.time() < _tok_exp: return _tok_val with httpx.Client(timeout=15) as c: r = c.get(f"{QYAPI}/gettoken", params={"corpid": _corpid(), "corpsecret": _secret()}) r.raise_for_status() d = r.json() if d.get("errcode", 0) != 0 or not d.get("access_token"): raise RuntimeError(f"gettoken 失败: {d.get('errcode')} {d.get('errmsg')}") _tok_val = d["access_token"] _tok_exp = time.time() + int(d.get("expires_in", 7200)) - 300 # 提前 5min 续 return _tok_val def _api_get(path: str, params: dict) -> dict: """带 access_token 的 GET;40014/42001(token 失效)自动重取一次。""" for attempt in (1, 2): tok = get_access_token(force=(attempt == 2)) with httpx.Client(timeout=15) as c: r = c.get(f"{QYAPI}/{path}", params={"access_token": tok, **params}) r.raise_for_status() d = r.json() if d.get("errcode") in (40014, 42001) and attempt == 1: continue return d return d def _api_post(path: str, json_body: dict) -> dict: for attempt in (1, 2): tok = get_access_token(force=(attempt == 2)) with httpx.Client(timeout=20) as c: r = c.post(f"{QYAPI}/{path}", params={"access_token": tok}, json=json_body) r.raise_for_status() d = r.json() if d.get("errcode") in (40014, 42001) and attempt == 1: continue return d return d # ─────────────────────────── OAuth 绑定 ─────────────────────────── def sign_state(user_id: str, *, ttl: int = 600) -> str: """state = base64(user_id.exp).hmac —— 绑 user_id + 短 TTL,防 CSRF。""" exp = int(time.time()) + ttl payload = f"{user_id}.{exp}" sig = hmac.new(_state_secret(), payload.encode(), hashlib.sha256).hexdigest()[:32] raw = f"{payload}.{sig}" return base64.urlsafe_b64encode(raw.encode()).decode().rstrip("=") def verify_state(state: str) -> Optional[str]: """校验 state,返回 user_id;失败/过期返回 None。""" try: pad = "=" * (-len(state) % 4) raw = base64.urlsafe_b64decode(state + pad).decode() user_id, exp_s, sig = raw.rsplit(".", 2) payload = f"{user_id}.{exp_s}" good = hmac.new(_state_secret(), payload.encode(), hashlib.sha256).hexdigest()[:32] if not hmac.compare_digest(sig, good): return None if int(exp_s) < int(time.time()): return None return user_id except Exception: return None def oauth_authorize_url(redirect_uri: str, state: str) -> str: """造网页授权链接。桌面浏览器打开 = 出二维码扫;企业微信内 = 静默授权。""" from urllib.parse import quote return ( f"{OAUTH_AUTHORIZE}?appid={_corpid()}" f"&redirect_uri={quote(redirect_uri, safe='')}" f"&response_type=code&scope=snsapi_base&agentid={_agentid()}" f"&state={quote(state, safe='')}#wechat_redirect" ) def get_user_id(code: str) -> Optional[str]: """OAuth 回调用 code 换企业成员 userid(非成员返回 None)。""" d = _api_get("auth/getuserinfo", {"code": code}) if d.get("errcode", 0) != 0: raise RuntimeError(f"getuserinfo 失败: {d.get('errcode')} {d.get('errmsg')}") return d.get("userid") # 外部联系人/非成员只有 openid → None # ─────────────────────────── 发送 ─────────────────────────── def _send(touser: str, msgtype: str, body_field: dict) -> None: payload = {"touser": touser, "msgtype": msgtype, "agentid": _agentid(), **body_field} d = _api_post("message/send", payload) if d.get("errcode", 0) != 0: raise RuntimeError(f"message/send 失败: {d.get('errcode')} {d.get('errmsg')}") def send_text(touser: str, content: str) -> None: _send(touser, "text", {"text": {"content": content or ""}}) def send_markdown(touser: str, content: str) -> None: _send(touser, "markdown", {"markdown": {"content": content or ""}}) def upload_media(file_path: str | os.PathLike, *, media_type: str = "file") -> str: """上传临时素材(3 天有效)→ media_id。""" p = Path(file_path) if p.stat().st_size > MAX_FILE_BYTES: raise ValueError(f"文件超过 {MAX_FILE_BYTES // (1024*1024)}MB 上限") for attempt in (1, 2): tok = get_access_token(force=(attempt == 2)) with httpx.Client(timeout=30) as c, open(p, "rb") as f: r = c.post(f"{QYAPI}/media/upload", params={"access_token": tok, "type": media_type}, files={"media": (p.name, f)}) r.raise_for_status() d = r.json() if d.get("errcode") in (40014, 42001) and attempt == 1: continue break if d.get("errcode", 0) != 0 or not d.get("media_id"): raise RuntimeError(f"media/upload 失败: {d.get('errcode')} {d.get('errmsg')}") return d["media_id"] def send_file(touser: str, file_path: str | os.PathLike) -> None: media_id = upload_media(file_path, media_type="file") _send(touser, "file", {"file": {"media_id": media_id}})