from threading import Event, Thread, current_thread
import traceback
import requests
from apps.utils.tools import print_roundtrip
from server import settings
import json
import time

requests.packages.urllib3.disable_warnings()


class DhClient:
    """Client for the Dahua HTTP API.

    On construction a daemon thread is started that periodically fetches an
    OAuth access token (client-credentials grant) and stores it in
    ``self.headers``; ``request`` sends API calls with those headers and
    normalises the response into a ``dict(success, code, msg, data)``.
    """

    # Path (relative to settings.DAHUA_BASE_URL) of the token endpoint.
    TOKEN_PATH = '/evo-apigw/evo-oauth/oauth/token'
    # Response code on which the token is fetched again and the call retried.
    # NOTE(review): presumably "token invalid/expired" -- confirm with the API docs.
    TOKEN_REFETCH_CODE = '27001007'
    # Response codes that are reported as success.
    SUCCESS_CODES = ('0', '100', '00000')
    # Seconds between two background token refreshes.
    TOKEN_REFRESH_INTERVAL = 3600
    # Timeout (seconds) for the token HTTP call, so a dead server cannot hang it.
    TOKEN_TIMEOUT = 20
    # While another thread refreshes the token, poll this many times ...
    TOKEN_WAIT_POLLS = 5
    # ... sleeping this many seconds before each poll.
    TOKEN_WAIT_STEP = 0.5
    # Upper bound (seconds) for waiting on the worker thread in close().
    JOIN_TIMEOUT = 1

    def __init__(self, client_id= settings.DAHUA_CLIENTID , client_secret = settings.DAHUA_SECRET) -> None:
        """Store the credentials and start the background token refresh.

        Args:
            client_id: OAuth client id sent to the token endpoint.
            client_secret: OAuth client secret sent to the token endpoint.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.headers = {}
        self.isGetingToken = False
        self.isRuning = True
        self.t = None  # background refresh thread, set by setup()
        # Lets close() wake the worker immediately instead of waiting for the
        # refresh interval to elapse.
        self._stop_event = Event()
        self.setup()

    def _fetch_token(self):
        """Request a token and, if the reply reports success, update the headers.

        Raises whatever ``requests.post`` / ``Response.json`` raise, and
        ``KeyError`` if the reply lacks the expected keys.
        """
        params = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
        }
        # NOTE(review): verify=False disables TLS certificate verification;
        # kept as in the original code -- confirm this is intended.
        r = requests.post(
            url=settings.DAHUA_BASE_URL + self.TOKEN_PATH,
            params=params,
            verify=False,
            timeout=self.TOKEN_TIMEOUT,
        )
        ret = r.json()
        if ret['success']:
            self.headers['Authorization'] = 'bearer ' + ret['data']['access_token']
            self.headers['User-Id'] = '1'

    def _get_token_loop(self):
        """Worker loop: refresh the token, then wait for the next interval."""
        while self.isRuning:
            try:
                self._fetch_token()
            except Exception:
                # Thread boundary: one failed refresh must not kill the
                # worker, otherwise the token would never be refreshed again.
                traceback.print_exc()
            # Returns True as soon as close() sets the event.
            if self._stop_event.wait(self.TOKEN_REFRESH_INTERVAL):
                break

    def get_token(self):
        """Fetch a token synchronously, flagging the refresh as in progress.

        ``isGetingToken`` is reset even when the fetch raises, so concurrent
        ``request`` calls are not blocked forever; the exception propagates.
        """
        self.isGetingToken = True
        try:
            self._fetch_token()
        finally:
            self.isGetingToken = False

    def setup(self):
        """Start the daemon thread that keeps the token fresh."""
        # Keep the handle on the instance so close() can join the thread.
        self.t = Thread(target=self._get_token_loop, args=(), daemon=True)
        self.t.start()

    def close(self):
        """Stop the background refresh thread (safe to call more than once)."""
        self.isRuning = False
        # getattr: __del__ may run on an instance whose __init__ did not finish.
        stop_event = getattr(self, '_stop_event', None)
        if stop_event is not None:
            stop_event.set()
        t = getattr(self, 't', None)
        # A thread cannot join itself; bound the wait so finalisation is not
        # held up by an in-flight token request.
        if t is not None and t is not current_thread() and t.is_alive():
            t.join(timeout=self.JOIN_TIMEOUT)

    def __del__(self):
        """Best-effort cleanup: stop the worker thread.

        The worker's target is a bound method of this instance, so the
        instance stays referenced while the worker runs; call ``close``
        explicitly for deterministic shutdown.
        """
        self.close()

    def _wait_for_token(self):
        """Return True once no token fetch is in progress, False on give-up."""
        if not self.isGetingToken:
            return True
        for _ in range(self.TOKEN_WAIT_POLLS):
            time.sleep(self.TOKEN_WAIT_STEP)
            if not self.isGetingToken:
                return True
        return False

    def _send(self, url, method, params, body, timeout):
        """Issue one HTTP call; return the parsed JSON on HTTP 200, else None."""
        r = getattr(requests, method)(
            '{}{}'.format(settings.DAHUA_BASE_URL, url),
            headers=self.headers,
            params=params,
            json=body,
            verify=False,
            timeout=timeout,
        )
        if settings.DEBUG:
            print_roundtrip(r)
        if r.status_code == 200:
            return r.json()
        return None

    def _build_result(self, ret):
        """Convert a parsed API reply into the normalised result dict."""
        code = ret.get('code')
        msg = '{}|{}{}'.format(str(code), ret.get('errMsg', ''), ret.get('desc', ''))
        res = dict(success=True, code=200000, msg=msg, data=ret.get('data', None))
        if code not in self.SUCCESS_CODES:
            res['success'] = False
            res['code'] = 400000
        return res

    def request(self, url:str, method:str, params=None, json=None, timeout=20):
        """Call a Dahua API endpoint and return a normalised result.

        Args:
            url: Path appended to ``settings.DAHUA_BASE_URL``.
            method: Name of the ``requests`` function to use ('get', 'post', ...).
            params: Query parameters; defaults to an empty dict.
            json: JSON body; defaults to an empty dict (sent as ``{}``).
            timeout: Timeout in seconds passed to ``requests``.

        Returns:
            ``dict(success, code, msg, data)``. ``code`` is 200000 when the
            API reply code is one of ``SUCCESS_CODES``, 400000 for any other
            API reply code, and 400901 when the HTTP status is not 200 or a
            token fetch by another caller did not finish in time.

        Raises:
            Exceptions from ``requests`` (including timeouts) and from JSON
            decoding are not caught here.
        """
        # Fresh dicts per call instead of shared mutable defaults.
        params = {} if params is None else params
        json = {} if json is None else json

        if not self._wait_for_token():
            return dict(success=False, code=400901, msg='大华接口访问异常', data=None)

        ret = self._send(url, method, params, json, timeout)
        if ret is not None and ret.get('code') == self.TOKEN_REFETCH_CODE:
            # Fetch a new token and retry exactly once; a second identical
            # reply is reported as a normal failure instead of recursing.
            self.get_token()
            ret = self._send(url, method, params, json, timeout)

        if ret is None:
            return dict(success=False, code=400901, msg='大华接口访问异常', data=None)
        return self._build_result(ret)


if settings.DAHUA_ENABLED:
    dhClient = DhClient()