206 lines
8.1 KiB
Python
206 lines
8.1 KiB
Python
"""企业微信自建应用客户端(DESIGN §8.7 渠道 B,纯推送)。
|
|
|
|
只做**出站推送**(不做入站对话):
|
|
- `access_token`:`gettoken(corpid,secret)`,进程内缓存 ~2h、线程安全、errcode 失效即重取。
|
|
- OAuth 扫码登录:`oauth_authorize_url()` 造扫码授权登录链接(桌面浏览器出二维码);
|
|
`get_user_id(code)` 拿成员 userid(绑定用,一次性)。需管理员在应用配「企业微信授权登录」可信域名。
|
|
- 发送:`send_text / send_markdown / send_file`(file 先 `media/upload` 换 media_id,≤20MB)。
|
|
- `state` HMAC 签名(绑 user_id + 短 TTL,防 CSRF):回调无 JWT,用户身份从 state 来。
|
|
|
|
凭据(secret)只在 host 进程读,绝不进沙箱 / run_python(同 ClawBot / send_email,§3.4)。
|
|
阻塞 IO(httpx 同步),调用方放 to_thread / executor。
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import os
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
|
|
QYAPI = "https://qyapi.weixin.qq.com/cgi-bin"
|
|
# 扫码授权登录(桌面浏览器渲染二维码,用企业微信 App 扫码)。
|
|
# 不能用 open.weixin.qq.com/connect/oauth2/authorize —— 那条是「网页授权」,只能在
|
|
# 企业微信客户端内打开,桌面浏览器会报「请在企业微信客户端打开链接」。
|
|
WWLOGIN_SSO = "https://login.work.weixin.qq.com/wwlogin/sso/login"
|
|
MAX_FILE_BYTES = 20 * 1024 * 1024
|
|
|
|
# access_token 进程内缓存
|
|
_tok_lock = threading.Lock()
|
|
_tok_val: Optional[str] = None
|
|
_tok_exp: float = 0.0
|
|
|
|
|
|
def wecom_configured() -> bool:
|
|
"""三件套齐才算配好(沿用「有 key 才挂」§3.4)。"""
|
|
return bool(
|
|
os.getenv("WECOM_CORPID", "").strip()
|
|
and os.getenv("WECOM_AGENTID", "").strip()
|
|
and os.getenv("WECOM_SECRET", "").strip()
|
|
)
|
|
|
|
|
|
def _corpid() -> str:
|
|
return os.getenv("WECOM_CORPID", "").strip()
|
|
|
|
|
|
def _agentid() -> str:
|
|
return os.getenv("WECOM_AGENTID", "").strip()
|
|
|
|
|
|
def _secret() -> str:
|
|
return os.getenv("WECOM_SECRET", "").strip()
|
|
|
|
|
|
def _state_secret() -> bytes:
|
|
# OAuth state 签名密钥:复用凭据加密 key,退 JWT_SECRET
|
|
key = (os.getenv("ZCBOT_WECHAT_SECRET_KEY", "").strip()
|
|
or os.getenv("JWT_SECRET", "").strip() or "zcbot-wecom")
|
|
return key.encode("utf-8")
|
|
|
|
|
|
# ─────────────────────────── access_token ───────────────────────────
|
|
|
|
def get_access_token(*, force: bool = False) -> str:
|
|
"""缓存的 app access_token;过期/force 时重取。线程安全。"""
|
|
global _tok_val, _tok_exp
|
|
with _tok_lock:
|
|
if not force and _tok_val and time.time() < _tok_exp:
|
|
return _tok_val
|
|
with httpx.Client(timeout=15) as c:
|
|
r = c.get(f"{QYAPI}/gettoken",
|
|
params={"corpid": _corpid(), "corpsecret": _secret()})
|
|
r.raise_for_status()
|
|
d = r.json()
|
|
if d.get("errcode", 0) != 0 or not d.get("access_token"):
|
|
raise RuntimeError(f"gettoken 失败: {d.get('errcode')} {d.get('errmsg')}")
|
|
_tok_val = d["access_token"]
|
|
_tok_exp = time.time() + int(d.get("expires_in", 7200)) - 300 # 提前 5min 续
|
|
return _tok_val
|
|
|
|
|
|
def _api_get(path: str, params: dict) -> dict:
|
|
"""带 access_token 的 GET;40014/42001(token 失效)自动重取一次。"""
|
|
for attempt in (1, 2):
|
|
tok = get_access_token(force=(attempt == 2))
|
|
with httpx.Client(timeout=15) as c:
|
|
r = c.get(f"{QYAPI}/{path}", params={"access_token": tok, **params})
|
|
r.raise_for_status()
|
|
d = r.json()
|
|
if d.get("errcode") in (40014, 42001) and attempt == 1:
|
|
continue
|
|
return d
|
|
return d
|
|
|
|
|
|
def _api_post(path: str, json_body: dict) -> dict:
|
|
for attempt in (1, 2):
|
|
tok = get_access_token(force=(attempt == 2))
|
|
with httpx.Client(timeout=20) as c:
|
|
r = c.post(f"{QYAPI}/{path}", params={"access_token": tok}, json=json_body)
|
|
r.raise_for_status()
|
|
d = r.json()
|
|
if d.get("errcode") in (40014, 42001) and attempt == 1:
|
|
continue
|
|
return d
|
|
return d
|
|
|
|
|
|
# ─────────────────────────── OAuth 绑定 ───────────────────────────
|
|
|
|
def sign_state(user_id: str, *, ttl: int = 600) -> str:
|
|
"""state = base64(user_id.exp).hmac —— 绑 user_id + 短 TTL,防 CSRF。"""
|
|
exp = int(time.time()) + ttl
|
|
payload = f"{user_id}.{exp}"
|
|
sig = hmac.new(_state_secret(), payload.encode(), hashlib.sha256).hexdigest()[:32]
|
|
raw = f"{payload}.{sig}"
|
|
return base64.urlsafe_b64encode(raw.encode()).decode().rstrip("=")
|
|
|
|
|
|
def verify_state(state: str) -> Optional[str]:
|
|
"""校验 state,返回 user_id;失败/过期返回 None。"""
|
|
try:
|
|
pad = "=" * (-len(state) % 4)
|
|
raw = base64.urlsafe_b64decode(state + pad).decode()
|
|
user_id, exp_s, sig = raw.rsplit(".", 2)
|
|
payload = f"{user_id}.{exp_s}"
|
|
good = hmac.new(_state_secret(), payload.encode(), hashlib.sha256).hexdigest()[:32]
|
|
if not hmac.compare_digest(sig, good):
|
|
return None
|
|
if int(exp_s) < int(time.time()):
|
|
return None
|
|
return user_id
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def oauth_authorize_url(redirect_uri: str, state: str) -> str:
|
|
"""造**扫码授权登录**链接:桌面浏览器打开会渲染二维码,用户用企业微信 App 扫码确认后
|
|
回跳到 redirect_uri 带 code(后续 auth/getuserinfo 换 userid 不变)。
|
|
|
|
注意:redirect_uri 域名须在企业微信后台「应用 → 企业微信授权登录 → 可信域名」里登记,
|
|
与「网页授权可信域名」是两项不同设置。"""
|
|
from urllib.parse import quote
|
|
return (
|
|
f"{WWLOGIN_SSO}?login_type=CorpApp&appid={_corpid()}"
|
|
f"&agentid={_agentid()}"
|
|
f"&redirect_uri={quote(redirect_uri, safe='')}"
|
|
f"&state={quote(state, safe='')}"
|
|
)
|
|
|
|
|
|
def get_user_id(code: str) -> Optional[str]:
|
|
"""OAuth 回调用 code 换企业成员 userid(非成员返回 None)。"""
|
|
d = _api_get("auth/getuserinfo", {"code": code})
|
|
if d.get("errcode", 0) != 0:
|
|
raise RuntimeError(f"getuserinfo 失败: {d.get('errcode')} {d.get('errmsg')}")
|
|
return d.get("userid") # 外部联系人/非成员只有 openid → None
|
|
|
|
|
|
# ─────────────────────────── 发送 ───────────────────────────
|
|
|
|
def _send(touser: str, msgtype: str, body_field: dict) -> None:
|
|
payload = {"touser": touser, "msgtype": msgtype, "agentid": _agentid(), **body_field}
|
|
d = _api_post("message/send", payload)
|
|
if d.get("errcode", 0) != 0:
|
|
raise RuntimeError(f"message/send 失败: {d.get('errcode')} {d.get('errmsg')}")
|
|
|
|
|
|
def send_text(touser: str, content: str) -> None:
|
|
_send(touser, "text", {"text": {"content": content or ""}})
|
|
|
|
|
|
def send_markdown(touser: str, content: str) -> None:
|
|
_send(touser, "markdown", {"markdown": {"content": content or ""}})
|
|
|
|
|
|
def upload_media(file_path: str | os.PathLike, *, media_type: str = "file") -> str:
|
|
"""上传临时素材(3 天有效)→ media_id。"""
|
|
p = Path(file_path)
|
|
if p.stat().st_size > MAX_FILE_BYTES:
|
|
raise ValueError(f"文件超过 {MAX_FILE_BYTES // (1024*1024)}MB 上限")
|
|
for attempt in (1, 2):
|
|
tok = get_access_token(force=(attempt == 2))
|
|
with httpx.Client(timeout=30) as c, open(p, "rb") as f:
|
|
r = c.post(f"{QYAPI}/media/upload",
|
|
params={"access_token": tok, "type": media_type},
|
|
files={"media": (p.name, f)})
|
|
r.raise_for_status()
|
|
d = r.json()
|
|
if d.get("errcode") in (40014, 42001) and attempt == 1:
|
|
continue
|
|
break
|
|
if d.get("errcode", 0) != 0 or not d.get("media_id"):
|
|
raise RuntimeError(f"media/upload 失败: {d.get('errcode')} {d.get('errmsg')}")
|
|
return d["media_id"]
|
|
|
|
|
|
def send_file(touser: str, file_path: str | os.PathLike) -> None:
|
|
media_id = upload_media(file_path, media_type="file")
|
|
_send(touser, "file", {"file": {"media_id": media_id}})
|