import json
import logging
import time
from threading import Thread
import uuid

import requests
from django.conf import settings
from django.utils.timezone import now
from rest_framework.exceptions import APIException, ParseError

from apps.third.errors import DH_REQUEST_ERROR
from apps.third.models import Tlog
from apps.third.tapis import dhapis
from apps.utils.my_rsa import encrypt_data
from apps.utils.tools import print_roundtrip

requests.packages.urllib3.disable_warnings()

logger = logging.getLogger(__name__)


class DhClient:
    """Client for the Dahua platform HTTP API.

    When ``settings.DAHUA_ENABLED`` is true, constructing the client starts a
    daemon thread that periodically fetches a client-credentials token and
    stores it in ``self.token`` / ``self.headers``.
    """

    # Path (relative to DAHUA_BASE_URL) of the client-credentials token endpoint.
    TOKEN_PATH = '/evo-apigw/evo-oauth/oauth/token'
    # Seconds to wait before refreshing a token that was fetched successfully.
    TOKEN_REFRESH_SECONDS = 3600
    # Seconds to wait before retrying after a failed token fetch.
    TOKEN_RETRY_SECONDS = 60
    # Timeout (seconds) for the token request so the loop can never hang.
    TOKEN_REQUEST_TIMEOUT = 10
    # Response ``code`` values treated as success (string and int forms).
    SUCCESS_CODES = ('0', '100', '00000', '1000', 0, 100, 1000)

    def __init__(self, client_id=settings.DAHUA_CLIENTID,
                 client_secret=settings.DAHUA_SECRET) -> None:
        """Store credentials and, if the integration is enabled, start the token thread.

        Attributes are always initialised so that methods such as
        ``get_full_pic`` do not fail with ``AttributeError`` on a disabled client.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.headers = {}
        self.isGetingToken = False
        self.isRuning = True
        self.token = None
        self.t = None  # token refresh thread
        self.log = {}
        if settings.DAHUA_ENABLED:
            self.setup()

    def _get_token_loop(self):
        """Fetch the access token repeatedly while ``self.isRuning`` is true.

        A failed attempt (network error, non-JSON body, missing keys or a
        response whose ``success`` is falsy) is logged and retried after
        ``TOKEN_RETRY_SECONDS`` instead of terminating the thread.
        """
        token_url = settings.DAHUA_BASE_URL + self.TOKEN_PATH
        while self.isRuning:
            params = {
                'grant_type': 'client_credentials',
                'client_id': self.client_id,
                'client_secret': self.client_secret
            }
            delay = self.TOKEN_RETRY_SECONDS
            try:
                r = requests.post(url=token_url, params=params, verify=False,
                                  timeout=self.TOKEN_REQUEST_TIMEOUT)
                ret = r.json()
                if ret['success']:
                    access_token = ret['data']['access_token']
                    self.token = access_token
                    self.headers['Authorization'] = 'bearer ' + access_token
                    self.headers['User-Id'] = '1'
                    delay = self.TOKEN_REFRESH_SECONDS
                else:
                    logger.warning('Dahua token request was rejected: %s', ret)
            except (requests.RequestException, ValueError, KeyError, TypeError):
                # Keep the refresh thread alive; otherwise the token silently
                # goes stale after the first transient failure.
                logger.exception('Dahua token request failed')
            time.sleep(delay)

    def setup(self):
        """Start the daemon thread that keeps the access token fresh."""
        self.t = Thread(target=self._get_token_loop, args=(), daemon=True)
        self.t.start()

    def __del__(self):
        """Signal the token loop to stop after its current sleep."""
        self.isRuning = False
        # self.t.join()

    def request(self, url: str, method: str, params=None, json=None, timeout=10,
                file_path_rela=None, raise_exception=True):
        """Send a request to the Dahua API and return ``(status, payload)``.

        Args:
            url: Path relative to ``settings.DAHUA_BASE_URL``; it is formatted
                with ``params`` when ``params`` is non-empty.
            method: Name of the ``requests`` function to call (e.g. ``'get'``).
            params: Query parameters, also used to format ``url``. Defaults to
                an empty dict.
            json: JSON body. Defaults to an empty dict.
            timeout: Request timeout in seconds.
            file_path_rela: Optional path appended to ``settings.BASE_DIR`` of
                a file to upload under the ``file`` field.
            raise_exception: When true, failures raise instead of being returned.

        Returns:
            ``('success', data)`` where ``data`` is the response's ``data``
            entry or ``None``; ``('fail', err_detail)`` for a non-success
            ``code``; ``('error', DH_REQUEST_ERROR)`` for transport errors or a
            non-200 status. The last two only when ``raise_exception`` is false.

        Raises:
            ParseError: If the integration is disabled, or (with
                ``raise_exception``) the API reports a non-success code.
            APIException: With ``raise_exception``, on transport errors or a
                non-200 status.
        """
        if not settings.DAHUA_ENABLED:
            raise ParseError('大华对接未启用')
        # Fresh dicts per call instead of shared mutable defaults.
        params = {} if params is None else params
        json = {} if json is None else json
        self.log = {"requested_at": now(), "id": uuid.uuid4(), "path": url,
                    "method": method, "params": params, "body": json,
                    "target": "dahua", "result": 10}
        upload = None
        files = None
        if file_path_rela:  # path relative to BASE_DIR
            upload = open(settings.BASE_DIR + file_path_rela, 'rb')
            files = {'file': upload}
        try:
            if params:
                url = url.format(**params)
                self.log.update({"path": url})
            r = getattr(requests, method)(
                '{}{}'.format(settings.DAHUA_BASE_URL, url),
                headers=self.headers, params=params, json=json,
                timeout=timeout, files=files, verify=False)
        except Exception as err:
            self.handle_log(result='error')
            if raise_exception:
                raise APIException(**DH_REQUEST_ERROR) from err
            return 'error', DH_REQUEST_ERROR
        finally:
            # The upload handle is only needed while the request is sent.
            if upload is not None:
                upload.close()
        # if settings.DEBUG:
        #     print_roundtrip(r)
        if r.status_code == 200:
            ret = r.json()
            if ret['code'] not in self.SUCCESS_CODES:
                detail = '大华错误:' + \
                    '{}|{}{}{}'.format(str(ret['code']), ret.get('errMsg', ''),
                                       ret.get('desc', ''), str(ret.get('data', '')))
                err_detail = dict(detail=detail, code='dh_' + str(ret['code']))
                self.handle_log(result='fail', response=ret)
                if raise_exception:
                    raise ParseError(**err_detail)
                return 'fail', err_detail
            # Successful calls are intentionally not logged.
            return 'success', ret['data'] if 'data' in ret else None
        self.handle_log(result='error', response=None)
        if raise_exception:
            raise APIException(**DH_REQUEST_ERROR)
        return 'error', DH_REQUEST_ERROR

    def _get_response_ms(self):
        """Return the milliseconds elapsed since ``self.log["requested_at"]``.

        A negative duration is reported as 0.
        """
        response_timedelta = now() - self.log["requested_at"]
        response_ms = int(response_timedelta.total_seconds() * 1000)
        return max(response_ms, 0)

    def handle_log(self, result, response=None):
        """Complete ``self.log`` with the outcome and save it as a ``Tlog`` row."""
        self.log.update({
            "result": result,
            "response": response,
            "response_ms": self._get_response_ms()
        })
        Tlog(**self.log).save()

    def get_full_pic(self, path: str):
        """Return the full access URL for ``path``, carrying the current token."""
        return '{}/evo-apigw/evo-oss/{}?token={}'.format(
            settings.DAHUA_BASE_URL, path, self.token)

    def snap(self, code: str):
        """Build the payload for a real-time camera snapshot.

        Args:
            code (str): Video channel code; the part before ``$`` is used as
                the device code when ``$`` is present.

        NOTE(review): the original docstring says this returns the full access
        URL, but the payload is never sent and nothing is returned. The
        endpoint to call is not visible here, so behaviour is left unchanged.
        """
        json_data = {
            "deviceCode": "1000038",
            "operation": "generalJsonTransport",
            "params": '{"method":"dev.snap","id":123,"params":{"DevID":"1000038","DevChannel":0,"PicNum":1,"SnapType":1,"CmdSrc":0}}',
        }
        if '$' in code:
            d_code = code.split('$')[0]
            json_data['deviceCode'] = d_code

    def get_password_token(self):
        """Obtain a password-grant token and return the token response data.

        Fetches the public key via ``dhapis['oauth_key']``, encrypts
        ``settings.DAHUA_PASSWORD`` with it, then posts the credentials via
        ``dhapis['oauth_token']``.
        """
        _, res = self.request(**dhapis['oauth_key'])
        e_pwd = encrypt_data(settings.DAHUA_PASSWORD, res['publicKey'])
        res['public_key'] = res['publicKey']
        res['password'] = e_pwd
        res.update({
            "grant_type": "password",
            "username": settings.DAHUA_USERNAME,
            "client_id": settings.DAHUA_CLIENTID,
            "client_secret": settings.DAHUA_SECRET,
        })
        _, res2 = self.request(**dhapis['oauth_token'], json=res)
        return res2