"""Helpers for reading data from a TCP collector device.

Every reader sends the single byte ``b"R"`` to the device and parses the
framed JSON reply with :func:`handle_bytes`.  Several transport strategies
are provided (cache-coordinated worker thread, shared pooled socket,
connection per request, local HTTP proxy); failures are reported to callers
as ``rest_framework.exceptions.ParseError``.
"""
import socket
from rest_framework.exceptions import ParseError
import json
import time
from django.core.cache import cache
from apps.utils.thread import MyThread
import uuid
import logging
import threading
import requests

myLogger = logging.getLogger('log')

# Timeout (seconds) applied to raw device sockets.
_SOCKET_TIMEOUT = 5
# Timeout (seconds) for the HTTP call made by get_tyy_data.
_HTTP_TIMEOUT = 30


def _close_quietly(sc):
    """Close socket *sc* (if any), ignoring errors raised by ``close()``."""
    if sc is None:
        return
    try:
        sc.close()
    except OSError:
        pass


def get_checksum(body_msg):
    """Return the low 8 bits of the sum of all byte values in *body_msg*."""
    return sum(body_msg) & 0xFF


def handle_bytes(arr):
    """Validate one response frame and return its decoded payload.

    Frame layout as checked by this function::

        0xEB 0x90 | length (2 bytes, big-endian) | body | checksum | 0xFF 0xFE

    ``length`` counts the body plus the three trailing bytes, so the body is
    ``length - 3`` bytes long and a complete frame is ``length + 4`` bytes.

    Args:
        arr: Raw bytes received from the device.

    Returns:
        The first element of the JSON document carried in the body, or a
        ``str`` describing the problem when the frame is invalid.  Callers
        distinguish the two cases with ``isinstance(result, str)``.
    """
    if len(arr) < 8:
        return f"返回数据长度错误-{arr}"

    if arr[0] != 0xEB or arr[1] != 0x90:
        return "数据头不正确"

    # Same value the original computed by reversing the two bytes and
    # reading them little-endian.
    length = int.from_bytes(arr[2:4], byteorder='big', signed=True)

    # Reject frames whose declared length is impossible or that were only
    # partially received; otherwise the index accesses below would raise.
    if length < 3 or len(arr) < length + 4:
        return f"返回数据长度错误-{arr}"

    body = arr[4:length + 1]

    if get_checksum(body) != arr[length + 1]:
        return "校验错误"

    if arr[length + 2] != 0xFF or arr[length + 3] != 0xFE:
        return "尾错误"

    try:
        return json.loads(body.decode('utf-8'))[0]
    except (ValueError, LookupError, TypeError) as e:
        # Bad UTF-8 / bad JSON (ValueError), empty or non-indexable
        # document (LookupError / TypeError).
        return f"数据解析错误-{e}"


def get_tyy_data_t(host, port, tid):
    """Worker loop that serves read requests posted through the cache.

    The loop runs while the cache entry ``cd_thread_{host}_{port}_id`` still
    equals *tid*.  Whenever ``cd_thread_{host}_{port}_val`` holds ``"get"``
    it is switched to ``"working"``, the device is queried, and the entry is
    overwritten with either the parsed data or ``{"err_msg": ...}``.

    Args:
        host: Device host name or address.
        port: Device TCP port (anything ``int()`` accepts).
        tid: Token identifying this worker; the loop exits when the cached
            id no longer matches it.
    """
    cd_thread_key_id = f"cd_thread_{host}_{port}_id"
    cd_thread_key_val = f"cd_thread_{host}_{port}_val"
    sc = None

    def reset(err_msg=None):
        """Drop the current socket and optionally publish an error."""
        nonlocal sc
        _close_quietly(sc)
        sc = None
        if err_msg is not None:
            cache.set(cd_thread_key_val, {"err_msg": err_msg})

    def connect_and_send(retry=1):
        """Ensure a connection exists and send the read command."""
        nonlocal sc
        try:
            if sc is None:
                sc = socket.socket()
                # Without a timeout a silent device would block this
                # worker forever in connect()/recv().
                sc.settimeout(_SOCKET_TIMEOUT)
                sc.connect((host, int(port)))
            sc.sendall(b"R")
        except BrokenPipeError:
            # The old socket is dead; discard it so the retry reconnects
            # instead of resending on the same broken pipe.
            reset()
            if retry > 0:
                connect_and_send(retry - 1)
            else:
                reset("采集器连接失败-管道重置")
        # Specific OSError subclasses must precede the generic handler,
        # otherwise they are never reached.
        except ConnectionResetError:
            reset("采集器重置了连接")
        except socket.timeout:
            reset("采集器连接超时")
        except Exception as e:
            reset(f"采集器连接失败-{str(e)}")

    while cache.get(cd_thread_key_id) == tid:
        if cache.get(cd_thread_key_val) == "get":
            cache.set(cd_thread_key_val, "working")
            connect_and_send()
            if sc is None:
                continue
            try:
                resp = sc.recv(1024)
            except OSError as e:
                # Keep the worker alive and report the failure instead of
                # letting the exception terminate the thread.
                reset(f"采集器连接失败-{str(e)}")
                continue
            if not resp:
                # Peer closed the connection; reconnect on the next request.
                reset()
            res = handle_bytes(resp)
            if isinstance(res, str):
                cache.set(cd_thread_key_val, {"err_msg": f'采集器返回数据错误-{res}'})
            elif not res:
                cache.set(cd_thread_key_val, {"err_msg": f"采集器返回数据为空-{str(res)}"})
            else:
                myLogger.info("采集器返回数据-%s", res)
                cache.set(cd_thread_key_val, res)
        time.sleep(0.3)

    _close_quietly(sc)


def get_tyy_data_2(*args, retry=1):
    """Request one reading through the cache-coordinated worker thread.

    Starts a :func:`get_tyy_data_t` worker when no worker id is cached for
    the device, posts a ``"get"`` request and polls the cache for the
    answer (8 polls, 0.3 s apart).

    Args:
        *args: ``args[0]`` is the host, ``args[1]`` the port.
        retry: How many times to restart the worker and try again when no
            answer arrives in time.

    Returns:
        The dict published by the worker, or ``None`` when no answer
        arrived and all retries are used up.

    Raises:
        ParseError: The worker published an ``err_msg``.
    """
    host, port = args[0], int(args[1])
    cd_thread_key_id = f"cd_thread_{host}_{port}_id"
    cd_thread_key_val = f"cd_thread_{host}_{port}_val"

    if cache.get(cd_thread_key_id, default=None) is None:
        tid = uuid.uuid4()
        cache.set(cd_thread_key_id, tid, timeout=10800)
        cd_thread = MyThread(target=get_tyy_data_t, args=(host, port, tid), daemon=True)
        cd_thread.start()

    cache.set(cd_thread_key_val, "get")

    for _ in range(8):
        val = cache.get(cd_thread_key_val)
        if isinstance(val, dict):
            if "err_msg" in val:
                raise ParseError(val["err_msg"])
            return val
        time.sleep(0.3)

    if retry > 0:
        # Clearing the id makes the current worker exit its loop and lets
        # the recursive call start a fresh one.
        cache.set(cd_thread_key_id, None)
        # The result must be returned; previously it was discarded and a
        # successful retry still yielded None.
        return get_tyy_data_2(*args, retry=retry - 1)
    return None


# Sockets shared between calls of get_tyy_data_1, keyed by "{host}_{port}".
sc_all = {}
# Serializes use of sc_all and of the sockets stored in it.
sc_lock = threading.Lock()


def get_tyy_data_1(*args, retry=1):
    """Read one frame from the device over a pooled, reused socket.

    Args:
        *args: ``args[0]`` is the host, ``args[1]`` the port.
        retry: Accepted for interface compatibility; not used by this
            function (the inner send step retries once on a broken pipe).

    Returns:
        The payload returned by :func:`handle_bytes`.

    Raises:
        ParseError: Connecting, sending or receiving failed, or the
            response frame was invalid.
    """
    host, port = args[0], int(args[1])
    pool_key = f"{host}_{port}"
    sc = None

    def drop():
        """Close the current socket and remove it from the pool."""
        nonlocal sc
        _close_quietly(sc)
        sc_all.pop(pool_key, None)
        sc = None

    def connect_and_send(retry=1):
        """Get (or create) the pooled socket and send the read command."""
        nonlocal sc
        sc = sc_all.get(pool_key, None)
        try:
            if sc is None:
                sc = socket.socket()
                sc.settimeout(_SOCKET_TIMEOUT)
                sc.connect((host, port))
                sc_all[pool_key] = sc
            else:
                # Drain stale bytes left in the receive buffer, using a
                # short timeout so an empty buffer does not stall us.
                sc.settimeout(0.1)
                for _ in range(5):
                    try:
                        if not sc.recv(65536):
                            break
                    except (socket.timeout, BlockingIOError):
                        break
                sc.settimeout(_SOCKET_TIMEOUT)  # restore the normal timeout
            sc.sendall(b"R")
        except BrokenPipeError:
            drop()
            if retry > 0:
                return connect_and_send(retry - 1)
            raise ParseError("采集器连接失败-管道重置")
        # Timeouts are OSError subclasses, so they must be handled before
        # the generic OSError branch to be reachable.
        except (socket.timeout, TimeoutError) as e:
            drop()
            raise ParseError(f"采集器连接超时-{str(e)}") from e
        except OSError as e:
            drop()
            raise ParseError(f"采集器连接失败-{str(e)}") from e

    with sc_lock:
        connect_and_send()
        try:
            resp = sc.recv(1024)
        except (socket.timeout, TimeoutError) as e:
            drop()
            raise ParseError(f"采集器连接超时-{str(e)}") from e
        except OSError as e:
            # Do not leave a broken socket in the pool for the next call.
            drop()
            raise ParseError(f"采集器连接失败-{str(e)}") from e
        if not resp:
            # Peer closed the connection; force a reconnect next time.
            drop()

    res = handle_bytes(resp)
    if isinstance(res, str):
        raise ParseError(f'采集器返回数据错误-{res}')
    return res


def get_tyy_data_3(*args, retry=2):
    """Read one frame from the device using a fresh connection per attempt.

    Args:
        *args: ``args[0]`` is the host, ``args[1]`` the port.
        retry: Total number of attempts.  Only timeouts and
            ``ConnectionError`` are retried.

    Returns:
        The payload returned by :func:`handle_bytes`; ``None`` if *retry*
        is less than 1 (no attempt is made).

    Raises:
        ParseError: The device sent nothing, the frame was invalid, the
            last attempt failed, or an unexpected error occurred.
    """
    host, port = args[0], int(args[1])

    for attempt in range(retry):
        try:
            # A new connection per request guarantees no shared state.
            with socket.create_connection((host, port), timeout=10) as sc:
                sc.sendall(b"R")
                # NOTE: a single recv() is assumed to deliver the whole
                # frame; handle_bytes reports truncated frames as errors.
                resp = sc.recv(4096)
                if not resp:
                    raise ParseError("设备未启动")

                res = handle_bytes(resp)
                if isinstance(res, str):
                    raise ParseError(f"采集器返回数据错误: {res}")
                return res
        except (socket.timeout, ConnectionError) as e:
            # Only the final failed attempt is reported to the caller.
            if attempt == retry - 1:
                raise ParseError(f"采集器连接失败: {str(e)}") from e
            time.sleep(0.5)  # wait 0.5 s before the next attempt
        except ParseError:
            raise
        except Exception as e:
            raise ParseError(f"未知错误: {str(e)}") from e


def get_tyy_data(*args):
    """Fetch one reading through the local HTTP service on port 2300.

    Args:
        *args: ``args[0]`` is the host, ``args[1]`` the port of the device;
            both are forwarded as query parameters.

    Returns:
        The decoded JSON body of the HTTP response.

    Raises:
        ParseError: The HTTP request failed or timed out, the body was not
            valid JSON, or the body contains an ``err_msg`` entry.
    """
    host, port = args[0], int(args[1])
    try:
        # params= URL-encodes the values; timeout= prevents an unbounded
        # hang when the local service does not answer.
        r = requests.get(
            "http://127.0.0.1:2300",
            params={"host": host, "port": port},
            timeout=_HTTP_TIMEOUT,
        )
    except requests.RequestException as e:
        raise ParseError(f"采集器连接失败-{str(e)}") from e

    try:
        res = r.json()
    except ValueError as e:
        raise ParseError(f"采集器返回数据错误-{str(e)}") from e

    if "err_msg" in res:
        raise ParseError(res["err_msg"])
    return res