import logging
import time
from threading import Thread
import traceback
import uuid

import requests
from requests.exceptions import RequestException
from django.conf import settings
from django.core.cache import cache
from django.utils.timezone import now
from rest_framework.exceptions import APIException, ParseError

from apps.third.errors import XX_REQUEST_ERROR
from apps.third.models import Tlog
from apps.third.tapis import xxapis
from apps.utils.tools import print_roundtrip
from apps.utils.tools import singleton

# Requests below are sent with verify=False, so silence urllib3's warnings.
requests.packages.urllib3.disable_warnings()

myLogger = logging.getLogger('log')


@singleton
class XxClient:
    """
    Client for the Xunxi (寻息) third-party HTTP API.

    The integration is active only when ``settings.XX_ENABLED`` is truthy.
    Failed calls are persisted as ``Tlog`` rows via :meth:`handle_log`.
    """

    def __init__(self) -> None:
        self.xx_enabled = getattr(settings, 'XX_ENABLED', False)
        if self.xx_enabled:
            # Credentials are only read when the integration is switched on,
            # so a disabled deployment does not need these settings.
            self.licence = settings.XX_LICENCE
            self.username = settings.XX_USERNAME
        # Settings-independent state is always initialised so attribute
        # access never fails, whether or not the integration is enabled.
        self.isGetingToken = False
        self.headers = {"Connection": "close"}
        self.log = {}
        # self._get_token()

    def _get_token(self):
        """
        Log in with the licence and store the returned token in the cache
        under the key ``xx_token`` (no expiry).

        ``isGetingToken`` is True for the duration of the call and is always
        reset afterwards, even when the login request raises.
        """
        self.isGetingToken = True
        try:
            json = {
                'licence': self.licence
            }
            is_ok, res = self.request(**xxapis['token_login'], json=json,
                                      raise_exception=False, timeout=10)
            if is_ok == 'success':
                cache.set('xx_token', res['token'], timeout=None)
        finally:
            self.isGetingToken = False

    def request(self, url: str, method: str = 'post', params=None, json=None,
                timeout=20, raise_exception=True):
        """
        Send one request to the Xunxi service.

        Args:
            url: Path appended to ``settings.XX_BASE_URL``.
            method: Name of the ``requests`` function to call (e.g. 'post').
            params: Optional query parameters; ``accessToken`` is added from
                the cached ``xx_token``.
            json: Optional JSON body; ``username``, ``buildId`` and
                ``licence`` are added/overwritten.
            timeout: Timeout in seconds passed to ``requests``.
            raise_exception: When True, failures raise instead of being
                returned as a status tuple.

        Returns:
            ``('success', data)`` when the service answers with
            ``errorCode == 0``; ``('fail', err_detail)`` for a service-level
            error; ``('error', XX_REQUEST_ERROR)`` for a non-2xx status or a
            ``RequestException`` (the latter two only when
            ``raise_exception`` is False).

        Raises:
            ParseError: integration disabled, or service-level error with
                ``raise_exception`` set.
            APIException: transport/HTTP error with ``raise_exception`` set.
        """
        if not self.xx_enabled:
            raise ParseError('寻息对接未启用')
        # Work on per-call copies: the arguments are extended below, and
        # mutating a shared default or the caller's own dict would leak
        # state between calls.
        params = dict(params) if params else {}
        json = dict(json) if json else {}
        params['accessToken'] = cache.get('xx_token', '')
        json['username'] = self.username
        json['buildId'] = settings.XX_BUILDID
        json['licence'] = settings.XX_LICENCE
        self.log = {
            "requested_at": now(),
            "id": uuid.uuid4(),
            "path": url,
            "method": method,
            "params": params,
            "body": json,
            "target": "xunxi",
            "result": 10,
            "headers": self.headers,
        }
        try:
            # NOTE(review): verify=False disables TLS certificate
            # verification; confirm this is intended for this endpoint.
            r = getattr(requests, method)(
                '{}{}'.format(settings.XX_BASE_URL, url),
                headers=self.headers, params=params, json=json,
                timeout=timeout, verify=False)
            # if settings.DEBUG:
            #     print_roundtrip(r)
            ret = r.text
            if 300 > r.status_code >= 200:
                ret = r.json()
                if ret['errorCode'] != 0:
                    # Disabled token-refresh retry, kept for reference:
                    # if ret.get('errorCode') in ['1060000', 1060000]:
                    #     self._get_token()  # fetch a new token
                    #     self.request(url, method, params, json, timeout, raise_exception)  # retry the request
                    #     return
                    err_detail = dict(
                        detail='寻息错误:' + '|'.join(ret['errorMsg']),
                        code='xx_' + str(ret['errorCode']))
                    self.handle_log(result='fail', response=ret)
                    if raise_exception:
                        raise ParseError(**err_detail)
                    return 'fail', err_detail
                return 'success', ret['data']
            else:
                # Non-2xx answer: log the raw response text.
                self.handle_log(result='error', response=ret)
        except RequestException:
            self.handle_log(result='error', errors=traceback.format_exc())
        # Reached for a non-2xx status or a RequestException.
        if raise_exception:
            raise APIException(**XX_REQUEST_ERROR)
        return 'error', XX_REQUEST_ERROR

    def _get_response_ms(self):
        """
        Get the duration of the request response cycle in milliseconds.
        In case of negative duration 0 is returned.
        """
        response_timedelta = now() - self.log["requested_at"]
        response_ms = int(response_timedelta.total_seconds() * 1000)
        return max(response_ms, 0)

    def handle_log(self, result, response=None, errors=None):
        """
        Complete the current log entry with the outcome and save it as a
        ``Tlog`` row.

        Args:
            result: Outcome label stored in the entry (this class uses
                'fail' and 'error').
            response: Response payload to store, if any.
            errors: Error text (e.g. a formatted traceback), if any.
        """
        self.log.update({
            "result": result,
            "response": response,
            "response_ms": self._get_response_ms(),
            "errors": errors
        })
        Tlog(**self.log).save()

    def subscribe(self, type: str, rurl: str):
        """
        Subscribe to a push type for the configured building.

        Args:
            type: Subscription type sent to the service (the original note
                gives ``location`` as an example).
            rurl: Callback path appended to ``settings.BASE_URL_IN`` (the
                original note gives ``/third/xunxi/c_location``).
        """
        json_data = {
            "type": type,
            "data": {
                "buildIds": [settings.XX_BUILDID],
                "serverUrl": settings.BASE_URL_IN + rurl
            },
            "licence": settings.XX_LICENCE
        }
        self.request(**xxapis['subscribe'], json=json_data)


# Module-level client instance shared by importers.
xxClient = XxClient()