import socket from rest_framework.exceptions import ParseError import json import time from django.core.cache import cache from apps.utils.thread import MyThread import struct import uuid import logging import threading myLogger = logging.getLogger('log') def get_checksum(body_msg): return sum(body_msg) & 0xFF def handle_bytes(arr): if len(arr) < 8: return "返回数据长度错误" if arr[0] != 0xEB or arr[1] != 0x90: return "数据头不正确" # 读取长度信息 length_arr = arr[2:4][::-1] # 反转字节 length = int.from_bytes(length_arr, byteorder='little', signed=True) # 小端格式 # 提取内容 body = arr[4:4 + length - 3] # 校验和检查 check_sum = get_checksum(body) if check_sum != arr[length + 1]: return "校验错误" # 尾部标识检查 if arr[length + 2] != 0xFF or arr[length + 3] != 0xFE: return "尾错误" content = body.decode('utf-8') res = json.loads(content) return res[0] def get_tyy_data_t(host, port, tid): cd_thread_key_id = f"cd_thread_{host}_{port}_id" cd_thread_key_val = f"cd_thread_{host}_{port}_val" sc = None def connect_and_send(retry=1): nonlocal sc try: if sc is None: sc = socket.socket() sc.connect((host, int(port))) sc.sendall(b"R") except BrokenPipeError: if retry > 0: connect_and_send(retry-1) else: if sc: try: sc.close() except Exception: pass sc = None except OSError as e: sc = None cache.set(cd_thread_key_val, {"err_msg": f"采集器连接失败-{str(e)}"}) except ConnectionResetError: sc = None cache.set(cd_thread_key_val, {"err_msg": "采集器重置了连接"}) except socket.timeout: sc = None cache.set(cd_thread_key_val, {"err_msg": "采集器连接超时"}) except Exception as e: sc = None cache.set(cd_thread_key_val, {"err_msg": f"采集器连接失败-{str(e)}"}) while cache.get(cd_thread_key_id) == tid: if cache.get(cd_thread_key_val) == "get": cache.set(cd_thread_key_val, "working") connect_and_send() if sc is None: continue resp = sc.recv(1024) res = handle_bytes(resp) if isinstance(res, str): cache.set(cd_thread_key_val, {"err_msg": f'采集器返回数据错误-{res}'}) elif not res: cache.set(cd_thread_key_val, {"err_msg": f"采集器返回数据为空-{str(res)}"}) else: myLogger.info(f"采集器返回数据-{res}") cache.set(cd_thread_key_val, res) time.sleep(0.3) if sc: try: sc.close() except Exception: pass def get_tyy_data_2(*args, retry=1): host, port = args[0], int(args[1]) cd_thread_key_id = f"cd_thread_{host}_{port}_id" cd_thread_key_val = f"cd_thread_{host}_{port}_val" cd_thread_val_id = cache.get(cd_thread_key_id, default=None) if cd_thread_val_id is None: tid = uuid.uuid4() cache.set(cd_thread_key_id, tid, timeout=10800) cd_thread = MyThread(target=get_tyy_data_t, args=(host, port, tid), daemon=True) cd_thread.start() cache.set(cd_thread_key_val, "get") num = 0 get_val = False while True: num += 1 if num > 8: break val = cache.get(cd_thread_key_val) if isinstance(val, dict): get_val = True if "err_msg" in val: raise ParseError(val["err_msg"]) return val time.sleep(0.3) if not get_val and retry > 0: cache.set(cd_thread_key_id, None) get_tyy_data_2(*args, retry=retry-1) sc_all = {} sc_lock = threading.Lock() def get_tyy_data(*args, retry=1): host, port = args[0], int(args[1]) global sc_all sc = None def connect_and_send(retry=1): nonlocal sc sc = sc_all.get(f"{host}_{port}", None) try: if sc is None: sc = socket.socket() sc.settimeout(5) # 设置超时 sc.connect((host, port)) sc_all[f"{host}_{port}"] = sc else: # 清空接收缓冲区 sc.settimeout(0.1) # 设置短暂超时 for _ in range(5): try: data = sc.recv(65536) if not data: break except (socket.timeout, BlockingIOError): break sc.settimeout(5) # 恢复原超时设置 sc.sendall(b"R") except BrokenPipeError: if retry > 0: if sc: try: sc.close() except Exception: pass sc_all.pop(f"{host}_{port}", None) return connect_and_send(retry-1) else: if sc: try: sc.close() except Exception: pass sc_all.pop(f"{host}_{port}", None) sc = None raise ParseError("采集器连接失败-管道重置") except OSError as e: if sc: try: sc.close() except Exception: pass sc_all.pop(f"{host}_{port}", None) sc = None raise ParseError(f"采集器连接失败-{str(e)}") except TimeoutError as e: if sc: try: sc.close() except Exception: pass sc_all.pop(f"{host}_{port}", None) sc = None raise ParseError(f"采集器连接超时-{str(e)}") with sc_lock: connect_and_send() resp = sc.recv(1024) res = handle_bytes(resp) # myLogger.error(res) if isinstance(res, str): raise ParseError(f'采集器返回数据错误-{res}') else: return res