From e2a8da9eb4ac69467911ba6980a6a9984d4d418e Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 17 Jul 2025 09:58:56 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96cd.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- out_service/cd.py | 77 ++++++++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 31 deletions(-) diff --git a/out_service/cd.py b/out_service/cd.py index 532bfb29..f8aee930 100644 --- a/out_service/cd.py +++ b/out_service/cd.py @@ -3,7 +3,9 @@ import json from urllib.parse import urlparse, parse_qs import socket import traceback +import threading # 新增:引入线程锁 +# 全局 Socket 连接字典 sc_all = { "192.168.1.220_6000": None, "192.168.1.235_6000": None, @@ -11,6 +13,8 @@ sc_all = { "192.168.1.230_6000": None, } +sc_lock = threading.Lock() # 全局锁,保护 sc_all + def get_checksum(body_msg): return sum(body_msg) & 0xFF @@ -21,29 +25,26 @@ def handle_bytes(arr): if arr[0] != 0xEB or arr[1] != 0x90: return "数据头不正确" - - # 读取长度信息 - length_arr = arr[2:4][::-1] # 反转字节 - length = int.from_bytes(length_arr, byteorder='little', signed=True) # 小端格式 + length_arr = arr[2:4][::-1] + length = int.from_bytes(length_arr, byteorder='little', signed=True) - # 提取内容 - body = arr[4:4 + length - 3] + if len(arr) < length + 4: + return f"数据不完整,期望长度:{length+4},实际长度:{len(arr)}" - # 校验和检查 + body = arr[4:4 + length - 3] check_sum = get_checksum(body) if check_sum != arr[length + 1]: return "校验错误" - - # 尾部标识检查 if arr[length + 2] != 0xFF or arr[length + 3] != 0xFE: return "尾错误" - content = body.decode('utf-8') - - res = json.loads(content) - - return res[0] + try: + content = body.decode('utf-8') + res = json.loads(content) + return res[0] + except Exception as e: + return f"数据解析错误: {str(e)}" class JSONRequestHandler(BaseHTTPRequestHandler): def ok(self, data): @@ -56,9 +57,7 @@ class JSONRequestHandler(BaseHTTPRequestHandler): self.send_response(400) self.send_header('Content-Type', 'application/json; charset=utf-8') self.end_headers() - data = { - "err_msg": err_msg - } + data = {"err_msg": err_msg} self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8')) def do_GET(self): @@ -67,50 +66,66 @@ class JSONRequestHandler(BaseHTTPRequestHandler): host = query_params.get('host', ['127.0.0.1'])[0] port = query_params.get('port', ['6000'])[0] addr = f'{host}_{port}' + if addr not in sc_all: self.error(f'{addr} 未找到') return def connect_and_send(): - sc = sc_all[addr] + with sc_lock: # 加锁,防止竞争 + sc = sc_all[addr] + try: if sc is None: sc = socket.socket() - sc.settimeout(5) # 设置超时 + sc.settimeout(5) sc.connect((host, int(port))) - sc_all[addr] = sc - # 清空接收缓冲区 - sc.settimeout(0.1) # 设置短暂超时 - for _ in range(5): + with sc_lock: # 再次加锁,更新 sc_all + sc_all[addr] = sc + + sc.settimeout(0.5) + while True: try: data = sc.recv(65536) if not data: break except (socket.timeout, BlockingIOError): break - sc.settimeout(5) # 恢复原超时设置 + + sc.settimeout(5) sc.sendall(b"R") - return sc + + data = bytearray() + while len(data) < 8: + chunk = sc.recv(1024) + if not chunk: + raise ConnectionError("连接中断") + data.extend(chunk) + + return sc, bytes(data) + except Exception as e: if sc is not None: try: sc.close() except Exception: pass - sc_all[addr] = None - self.error(f'采集器连接失败: {e}') + with sc_lock: # 加锁,清除无效连接 + sc_all[addr] = None + self.error(f'采集器通信失败: {e}') print(traceback.format_exc()) - return None + return None, None - sc = connect_and_send() - if sc is None: + sc, resp = connect_and_send() + if sc is None or resp is None: return - resp = sc.recv(1024) + res = handle_bytes(resp) if isinstance(res, str): self.error(res) else: self.ok(res) + def run(server_class=HTTPServer, handler_class=JSONRequestHandler, port=2300): server_address = ('', port) httpd = server_class(server_address, handler_class)