import logging import json import time from threading import Thread import traceback import uuid import requests from requests.exceptions import RequestException from django.conf import settings from rest_framework.exceptions import APIException, ParseError from apps.third.errors import DH_REQUEST_ERROR from apps.third.models import Tlog from apps.utils.my_rsa import encrypt_data from apps.utils.tools import convert_to_base64, print_roundtrip from django.utils.timezone import now from apps.third.tapis import dhapis from apps.utils.tools import singleton from django.core.cache import cache requests.packages.urllib3.disable_warnings() myLogger = logging.getLogger('log') @singleton class DhClient: """ 大华 """ def __init__(self): self.dahua_enabled = getattr(settings, 'DAHUA_ENABLED', False) if self.dahua_enabled: self.dahua_enabled = True self.client_id = settings.DAHUA_CLIENTID self.client_secret = settings.DAHUA_SECRET self.headers = {"Connection": "close"} self.isGetingToken = False self.log = {} # self._get_token() def _get_token(self): self.isGetingToken = True params = { 'grant_type': 'client_credentials', 'client_id': self.client_id, 'client_secret': self.client_secret } is_ok, res = self.request( **dhapis['token_login'], params=params, raise_exception=False, timeout=10) if is_ok == 'success': cache.set('dh_token', res['access_token'], timeout=None) self.isGetingToken = False def request(self, url: str, method: str, params=dict(), json=dict(), timeout=120, file_path_rela=None, raise_exception=True): if not self.dahua_enabled: raise ParseError('大华对接未启用') self.headers['Authorization'] = 'bearer ' + cache.get('dh_token', '') self.headers['User-Id'] = '1' self.log = {"requested_at": now(), "id": uuid.uuid4(), "path": url, "method": method, "params": params, "body": json, "target": "dahua", "result": 10, "headers": self.headers} files = None if file_path_rela: # 相对路径 files = {'file': open(settings.BASE_DIR + file_path_rela, 'rb')} if params: url = url.format(**params) self.log.update({"path": url}) try: r = getattr(requests, method)('{}{}'.format(settings.DAHUA_BASE_URL, url), headers=self.headers, params=params, json=json, timeout=timeout, files=files, verify=False) # if settings.DEBUG: # print_roundtrip(r) ret = r.text if 300 > r.status_code >= 200: ret = r.json() if ret['code'] not in ['0', '100', '00000', '1000', 0, 100, 1000]: detail = '大华错误:' + \ '{}|{}{}{}'.format(str(ret['code']), ret.get('errMsg', ''), ret.get('desc', ''), str(ret.get('data', ''))) err_detail = dict( detail=detail, code='dh_'+str(ret['code'])) self.handle_log(result='fail', response=ret) if raise_exception: raise ParseError(**err_detail) return 'fail', err_detail # self.handle_log(result='success', response=ret) # 成功的日志就不记录了 return 'success', ret['data'] if 'data' in ret else None else: self.handle_log(result='error', response=ret) except RequestException: self.handle_log(result='error', errors=traceback.format_exc()) if raise_exception: raise APIException(**DH_REQUEST_ERROR) return 'error', DH_REQUEST_ERROR def _get_response_ms(self): """ Get the duration of the request response cycle is milliseconds. In case of negative duration 0 is returned. """ response_timedelta = now() - self.log["requested_at"] response_ms = int(response_timedelta.total_seconds() * 1000) return max(response_ms, 0) def handle_log(self, result, response=None, errors=None): self.log.update({ "result": result, "response": response, "response_ms": self._get_response_ms(), "errors": errors }) Tlog(**self.log).save() def get_full_pic(self, path: str): """返回完整访问地址 """ return '{}/evo-apigw/evo-oss/{}?token={}'.format(settings.DAHUA_BASE_URL, path, cache.get('dh_token', '')) def snap(self, code: str, raise_exception:bool): """摄像头实时截图 Args: code (str): 视频通道编号 返回完整访问地址 """ json_data = { "deviceCode": "1000093", "operation": "generalJsonTransport" } if '$' in code: d_code_list = code.split('$') d_code = d_code_list[0] num = d_code_list[-1] json_data['deviceCode'] = d_code json_data['params'] = '{\"method\":\"dev.snap\",\"id\":123,\"params\":{\"DevID\":\"' + \ str(d_code) + '\",\"DevChannel\":' + str(num) + \ ',\"PicNum\":1,\"SnapType\":1,\"CmdSrc\":0}}' is_ok, res = self.request(**dhapis['dev_snap'], json=json_data, raise_exception=raise_exception) if is_ok == 'success': res = json.loads(res) return self.get_full_pic(res['params']['PicInfo']) return None def get_password_token(self): _, res = self.request(**dhapis['oauth_key']) e_pwd = encrypt_data(settings.DAHUA_PASSWORD, res['publicKey']) res['public_key'] = res['publicKey'] res['password'] = e_pwd res.update({ "grant_type": "password", "username": settings.DAHUA_USERNAME, "client_id": settings.DAHUA_CLIENTID, "client_secret": settings.DAHUA_SECRET, }) _, res2 = self.request(**dhapis['oauth_token'], json=res) return res2 def face_search(self, path: str): """人像库检索 Args: path (str): 图片地址 返回识别结果 """ base64img = convert_to_base64(path) json_data = { "base64Img": base64img, "devType": 1, "deviceCodes": ["1000038"], "groupIds": ["100001"], "searchGroupType": 2, "threshold": "80" } _, res = self.request(**dhapis['face_search'], json=json_data) return res['pageData'] # 返回的识别结果是从右往左 """ "pageData": [ { "id": null, "devId": "1000038", "channelId": null, "channelSeq": null, "channelName": null, "faceLibId": "1", "libImgUrl": "6ad010cf-ce45-11ec-9715-e4246c7d1635/20220805/63/dsf_8c2487c1-14a1-11ed-880a-e4246c7d1635_2551349_2624751.jpg", "faceImgUrl": null, "hitImgUrl": null, "identity": "342422199004040175", "name": "曹前明", "sex": "男", "nation": null, "bornYear": null, "age": null, "glasses": null, "fringe": null, "similarity": 99, "recTime": null, "mask": null, "beard": null, "glass": null, "maskStr": null, "beardStr": null, "glassStr": null, "featureStr": "其他 ", "groupName": "内部库1", "groupType": 3, "groupTypeStr": "内部库", "identityType": 1, "identityTypeStr": "身份证", "birthday": null, "searchGroupType": 2, "birthdayStr": "2022-07-14", "deviceName": "ivs服务器", "personCode": "123" }, { "id": null, "devId": "1000038", "channelId": null, "channelSeq": null, "channelName": null, "faceLibId": "1", "libImgUrl": "6ad010cf-ce45-11ec-9715-e4246c7d1635/20220805/63/dsf_8c2487c1-14a1-11ed-880a-e4246c7d1635_2624751_2701750.jpg", "faceImgUrl": null, "hitImgUrl": null, "identity": "371324199502156548", "name": "石静", "sex": "女", "nation": null, "bornYear": null, "age": null, "glasses": null, "fringe": null, "similarity": 96, "recTime": null, "mask": null, "beard": null, "glass": null, "maskStr": null, "beardStr": null, "glassStr": null, "featureStr": "其他 ", "groupName": "内部库1", "groupType": 3, "groupTypeStr": "内部库", "identityType": 1, "identityTypeStr": "身份证", "birthday": null, "searchGroupType": 2, "birthdayStr": "2022-06-24", "deviceName": "ivs服务器", "personCode": "shijing" } ] """ def face_bind(self): # 人像绑定到主库 json_data = { "deptId": 1, "groupIdList": [settings.DAHUA_FACEGROUPID_1], "cascade": True } self.request(**dhapis['face_bind'], json=json_data) def face_deploy(self, minSim: int = 90): params = { "id": "001001001", "checkStat": "1", "type": "001;00_1;1", "capability": "1_00000000000000000000000100000000", "curCount": "0", "isDomain": 0 } _, res = self.request(**dhapis['dev_tree'], params=params) for i in json.loads(res): json_data = { "minSimilarity": str(minSim), "dpMinSimilarity": minSim, "surveyType": ["1"], "groups": str(settings.DAHUA_FACEGROUPID_1), "chnId": i['id'] } self.request(**dhapis['face_deploy'], json=json_data) def get_rtsp(self, code: str): """获取rtsp流 Args: code (str): 通道编号 """ is_ok, res = self.request(**dhapis['video_rtsp'], json={"data": {"channelId": code,"dataType": "1","streamType": "1"}}) if is_ok == 'success': if '|' in res['url']: urls = res['url'].split('|') for i in urls: if settings.DAHUA_HOST in i: return f"{i}?token={res['token']}" else: return f"{res['url']}?token={res['token']}" return None dhClient = DhClient()