From 86a8110a5e399cefa861e41e709355659ee9bd83 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Wed, 27 May 2026 13:47:44 +0800 Subject: [PATCH 1/2] =?UTF-8?q?perf(cd):=20out=5Fservice/cd.py=20=E6=94=B9?= =?UTF-8?q?=20ThreadingHTTPServer=20+=20per-host=20=E9=94=81,=20drain=20?= =?UTF-8?q?=E6=94=B9=E9=9D=9E=E9=98=BB=E5=A1=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - HTTPServer -> ThreadingHTTPServer, HTTP 层放开并发 - 全局 sc_lock 拆为 per-host 锁, 不同采集器互不阻塞 - drain 改 setblocking(False), 缓冲区为空时立即返回, 省掉 0.5s 阻塞等待 - recv 改读满 4+length 字节, 避免分包时误判"数据不完整" Co-Authored-By: Claude Opus 4.7 (1M context) --- out_service/cd.py | 121 +++++++++++++++++++++++++++------------------- 1 file changed, 72 insertions(+), 49 deletions(-) diff --git a/out_service/cd.py b/out_service/cd.py index 61e1b299..567eaab4 100644 --- a/out_service/cd.py +++ b/out_service/cd.py @@ -1,11 +1,9 @@ -from http.server import BaseHTTPRequestHandler, HTTPServer +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer import json from urllib.parse import urlparse, parse_qs import socket -import traceback -import threading # 新增:引入线程锁 +import threading -# 全局 Socket 连接字典 sc_all = { "192.168.1.220_6000": None, "192.168.1.235_6000": None, @@ -13,7 +11,7 @@ sc_all = { "192.168.1.230_6000": None, } -sc_lock = threading.Lock() # 全局锁,保护 sc_all +sc_locks = {addr: threading.Lock() for addr in sc_all} def get_checksum(body_msg): return sum(body_msg) & 0xFF @@ -21,13 +19,13 @@ def get_checksum(body_msg): def handle_bytes(arr): if len(arr) < 8: return f"返回数据长度错误-{arr}" - + if arr[0] != 0xEB or arr[1] != 0x90: return "数据头不正确" length_arr = arr[2:4][::-1] length = int.from_bytes(length_arr, byteorder='little', signed=True) - + if len(arr) < length + 4: return f"数据不完整,期望长度:{length+4},实际长度:{len(arr)}" @@ -46,6 +44,45 @@ def handle_bytes(arr): except Exception as e: return f"数据解析错误: {str(e)}" + +def drain_socket(sc): + sc.setblocking(False) + try: + while True: + data = sc.recv(65536) + if not data: + break + except BlockingIOError: + pass + finally: + sc.settimeout(5) + + +def recv_full_message(sc): + data = bytearray() + while len(data) < 4: + chunk = sc.recv(1024) + if not chunk: + raise ConnectionError("连接中断") + data.extend(chunk) + + if data[0] != 0xEB or data[1] != 0x90: + return bytes(data) + + length = int.from_bytes(bytes(data[2:4])[::-1], byteorder='little', signed=True) + if length <= 0 or length > 65536: + return bytes(data) + + total_needed = 4 + length + while len(data) < total_needed: + chunk = sc.recv(1024) + if not chunk: + raise ConnectionError("连接中断") + data.extend(chunk) + + return bytes(data) + + class JSONRequestHandler(BaseHTTPRequestHandler): def ok(self, data): self.send_response(200) @@ -60,48 +97,34 @@ class JSONRequestHandler(BaseHTTPRequestHandler): data = {"err_msg": err_msg} self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8')) + def log_message(self, format, *args): + return + def do_GET(self): parsed_url = urlparse(self.path) query_params = parse_qs(parsed_url.query) host = query_params.get('host', ['127.0.0.1'])[0] port = query_params.get('port', ['6000'])[0] addr = f'{host}_{port}' - + if addr not in sc_all: self.error(f'{addr} 未找到') return - def connect_and_send(): - with sc_lock: # 加锁,防止竞争 + def request_once(): + with sc_locks[addr]: sc = sc_all[addr] - try: if sc is None: sc = socket.socket() sc.settimeout(5) sc.connect((host, int(port))) sc_all[addr] = sc - sc.settimeout(0.5) - while True: - try: - data = sc.recv(65536) - if not data: - break - except (socket.timeout, BlockingIOError): - break - - sc.settimeout(5) + else: + drain_socket(sc) + sc.sendall(b"R") - - data = bytearray() - while len(data) < 8: - chunk = sc.recv(1024) - if not chunk: - raise ConnectionError("连接中断") - data.extend(chunk) - - return sc, bytes(data) - + return recv_full_message(sc) except Exception as e: if sc is not None: try: @@ -109,34 +132,34 @@ class JSONRequestHandler(BaseHTTPRequestHandler): except Exception: pass sc_all[addr] = None - self.error(f'采集器通信失败: {e}') - return None, None + raise e - sc, resp = connect_and_send() - if sc is None or resp is None: + try: + resp = request_once() + except Exception as e: + self.error(f'采集器通信失败: {e}') return - + res = handle_bytes(resp) + if isinstance(res, str) and res == "数据头不正确": + try: + resp = request_once() + except Exception as e: + self.error(f'采集器通信失败: {e}') + return + res = handle_bytes(resp) + if isinstance(res, str): - if res == "数据头不正确": - sc, resp = connect_and_send() - if sc is None or resp is None: - return - res = handle_bytes(resp) - if isinstance(res, str): - self.error(res) - else: - self.ok(res) - else: - self.error(res) + self.error(res) else: self.ok(res) -def run(server_class=HTTPServer, handler_class=JSONRequestHandler, port=2300): + +def run(server_class=ThreadingHTTPServer, handler_class=JSONRequestHandler, port=2300): server_address = ('', port) httpd = server_class(server_address, handler_class) print(f'Starting httpd server on port {port}...') httpd.serve_forever() if __name__ == '__main__': - run() \ No newline at end of file + run() From 7ef9763a50a7fe0031cdf3822c5558fc8ea234f4 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Wed, 27 May 2026 14:14:12 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat(cd):=20out=5Fservice/cd.py=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E4=BB=BB=E6=84=8F=20host:port,=20=E5=8E=BB=E6=8E=89?= =?UTF-8?q?=20IP=20=E7=99=BD=E5=90=8D=E5=8D=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit socket 槽位和 per-host 锁改为首次访问时懒注册, 用 double-checked locking 保证热路径无竞争。不再硬编码 4 个采集器地址。 Co-Authored-By: Claude Opus 4.7 (1M context) --- out_service/cd.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/out_service/cd.py b/out_service/cd.py index 567eaab4..19d18d41 100644 --- a/out_service/cd.py +++ b/out_service/cd.py @@ -4,14 +4,21 @@ from urllib.parse import urlparse, parse_qs import socket import threading -sc_all = { - "192.168.1.220_6000": None, - "192.168.1.235_6000": None, - "192.168.1.225_6000": None, - "192.168.1.230_6000": None, -} +sc_all = {} +sc_locks = {} +registry_lock = threading.Lock() -sc_locks = {addr: threading.Lock() for addr in sc_all} + +def get_lock(addr): + lock = sc_locks.get(addr) + if lock is None: + with registry_lock: + lock = sc_locks.get(addr) + if lock is None: + lock = threading.Lock() + sc_locks[addr] = lock + sc_all.setdefault(addr, None) + return lock def get_checksum(body_msg): return sum(body_msg) & 0xFF @@ -106,13 +113,10 @@ class JSONRequestHandler(BaseHTTPRequestHandler): host = query_params.get('host', ['127.0.0.1'])[0] port = query_params.get('port', ['6000'])[0] addr = f'{host}_{port}' - - if addr not in sc_all: - self.error(f'{addr} 未找到') - return + lock = get_lock(addr) def request_once(): - with sc_locks[addr]: + with lock: sc = sc_all[addr] try: if sc is None: