examtest/test_server/crm/WXBizDataCrypt.py

33 lines
1.0 KiB
Python

import base64
import json
from Crypto.Cipher import AES
from rest_framework.exceptions import APIException
import logging
# Module-level logger, looked up under the logger name 'log'.
logger = logging.getLogger('log')
class WXBizDataCrypt:
    """Decrypt base64-encoded AES-CBC payloads and verify their appid watermark.

    The instance stores the application id used for the watermark check and
    the base64-encoded session key used as the AES key.
    """

    def __init__(self, appId, sessionKey):
        """Store the app id and the base64-encoded session key.

        Args:
            appId: Application id the decrypted payload's watermark must match.
            sessionKey: Base64-encoded AES key.
        """
        self.appId = appId
        self.sessionKey = sessionKey

    def decrypt(self, encryptedData, iv):
        """Decrypt ``encryptedData`` and return the decoded JSON payload.

        Args:
            encryptedData: Base64-encoded ciphertext.
            iv: Base64-encoded AES-CBC initialisation vector.

        Returns:
            The object parsed from the decrypted JSON text.

        Raises:
            APIException: If decoding, decryption, unpadding, JSON parsing or
                the watermark check fails for any reason.
        """
        try:
            # Decode into separate locals so the caller-supplied values remain
            # available, unmodified, for the failure log below.
            key = base64.b64decode(self.sessionKey)
            ciphertext = base64.b64decode(encryptedData)
            init_vector = base64.b64decode(iv)

            cipher = AES.new(key, AES.MODE_CBC, init_vector)
            decrypted = json.loads(self._unpad(cipher.decrypt(ciphertext)))

            # The payload must carry the app id this instance was created for.
            if decrypted['watermark']['appid'] != self.appId:
                raise ValueError('Invalid Buffer')
            return decrypted
        except Exception:
            # ``except Exception`` (not a bare ``except``) lets SystemExit and
            # KeyboardInterrupt propagate. ``logger.exception`` records the
            # traceback of the real cause alongside the message.
            # NOTE(review): this writes the session key to the log; consider
            # redacting it.
            logger.exception(
                '腾讯sb-%s-%s-%s', self.sessionKey, encryptedData, iv
            )
            raise APIException('获取手机号失败,请稍后重试')

    def _unpad(self, s):
        """Strip trailing padding whose length is given by the last byte of ``s``.

        Args:
            s: Decrypted byte string whose final byte holds the pad length.

        Returns:
            ``s`` without its trailing padding bytes.

        Raises:
            ValueError: If ``s`` is empty, or the pad length is zero or larger
                than ``s`` itself.
        """
        if not s:
            raise ValueError('cannot unpad empty data')
        # Slice (rather than index) so ``ord`` receives a length-1 sequence.
        pad_len = ord(s[-1:])
        if pad_len < 1 or pad_len > len(s):
            raise ValueError('invalid padding length')
        return s[:-pad_len]