perf(cd): out_service/cd.py 改 ThreadingHTTPServer + per-host 锁, drain 改非阻塞
- HTTPServer -> ThreadingHTTPServer, HTTP 层放开并发 - 全局 sc_lock 拆为 per-host 锁, 不同采集器互不阻塞 - drain 改 setblocking(False), 缓冲区为空时立即返回, 省掉 0.5s 阻塞等待 - recv 改读满 4+length 字节, 避免分包时误判"数据不完整" Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
8de4d1197b
commit
86a8110a5e
|
|
@ -1,11 +1,9 @@
|
|||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
import json
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
import socket
|
||||
import traceback
|
||||
import threading # 新增:引入线程锁
|
||||
import threading
|
||||
|
||||
# 全局 Socket 连接字典
|
||||
sc_all = {
|
||||
"192.168.1.220_6000": None,
|
||||
"192.168.1.235_6000": None,
|
||||
|
|
@ -13,7 +11,7 @@ sc_all = {
|
|||
"192.168.1.230_6000": None,
|
||||
}
|
||||
|
||||
sc_lock = threading.Lock() # 全局锁,保护 sc_all
|
||||
sc_locks = {addr: threading.Lock() for addr in sc_all}
|
||||
|
||||
def get_checksum(body_msg):
|
||||
return sum(body_msg) & 0xFF
|
||||
|
|
@ -21,13 +19,13 @@ def get_checksum(body_msg):
|
|||
def handle_bytes(arr):
|
||||
if len(arr) < 8:
|
||||
return f"返回数据长度错误-{arr}"
|
||||
|
||||
|
||||
if arr[0] != 0xEB or arr[1] != 0x90:
|
||||
return "数据头不正确"
|
||||
|
||||
length_arr = arr[2:4][::-1]
|
||||
length = int.from_bytes(length_arr, byteorder='little', signed=True)
|
||||
|
||||
|
||||
if len(arr) < length + 4:
|
||||
return f"数据不完整,期望长度:{length+4},实际长度:{len(arr)}"
|
||||
|
||||
|
|
@ -46,6 +44,45 @@ def handle_bytes(arr):
|
|||
except Exception as e:
|
||||
return f"数据解析错误: {str(e)}"
|
||||
|
||||
|
||||
def drain_socket(sc):
|
||||
sc.setblocking(False)
|
||||
try:
|
||||
while True:
|
||||
data = sc.recv(65536)
|
||||
if not data:
|
||||
break
|
||||
except BlockingIOError:
|
||||
pass
|
||||
finally:
|
||||
sc.settimeout(5)
|
||||
|
||||
|
||||
def recv_full_message(sc):
|
||||
data = bytearray()
|
||||
while len(data) < 4:
|
||||
chunk = sc.recv(1024)
|
||||
if not chunk:
|
||||
raise ConnectionError("连接中断")
|
||||
data.extend(chunk)
|
||||
|
||||
if data[0] != 0xEB or data[1] != 0x90:
|
||||
return bytes(data)
|
||||
|
||||
length = int.from_bytes(bytes(data[2:4])[::-1], byteorder='little', signed=True)
|
||||
if length <= 0 or length > 65536:
|
||||
return bytes(data)
|
||||
|
||||
total_needed = 4 + length
|
||||
while len(data) < total_needed:
|
||||
chunk = sc.recv(1024)
|
||||
if not chunk:
|
||||
raise ConnectionError("连接中断")
|
||||
data.extend(chunk)
|
||||
|
||||
return bytes(data)
|
||||
|
||||
|
||||
class JSONRequestHandler(BaseHTTPRequestHandler):
|
||||
def ok(self, data):
|
||||
self.send_response(200)
|
||||
|
|
@ -60,48 +97,34 @@ class JSONRequestHandler(BaseHTTPRequestHandler):
|
|||
data = {"err_msg": err_msg}
|
||||
self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8'))
|
||||
|
||||
def log_message(self, format, *args):
|
||||
return
|
||||
|
||||
def do_GET(self):
|
||||
parsed_url = urlparse(self.path)
|
||||
query_params = parse_qs(parsed_url.query)
|
||||
host = query_params.get('host', ['127.0.0.1'])[0]
|
||||
port = query_params.get('port', ['6000'])[0]
|
||||
addr = f'{host}_{port}'
|
||||
|
||||
|
||||
if addr not in sc_all:
|
||||
self.error(f'{addr} 未找到')
|
||||
return
|
||||
|
||||
def connect_and_send():
|
||||
with sc_lock: # 加锁,防止竞争
|
||||
def request_once():
|
||||
with sc_locks[addr]:
|
||||
sc = sc_all[addr]
|
||||
|
||||
try:
|
||||
if sc is None:
|
||||
sc = socket.socket()
|
||||
sc.settimeout(5)
|
||||
sc.connect((host, int(port)))
|
||||
sc_all[addr] = sc
|
||||
sc.settimeout(0.5)
|
||||
while True:
|
||||
try:
|
||||
data = sc.recv(65536)
|
||||
if not data:
|
||||
break
|
||||
except (socket.timeout, BlockingIOError):
|
||||
break
|
||||
|
||||
sc.settimeout(5)
|
||||
else:
|
||||
drain_socket(sc)
|
||||
|
||||
sc.sendall(b"R")
|
||||
|
||||
data = bytearray()
|
||||
while len(data) < 8:
|
||||
chunk = sc.recv(1024)
|
||||
if not chunk:
|
||||
raise ConnectionError("连接中断")
|
||||
data.extend(chunk)
|
||||
|
||||
return sc, bytes(data)
|
||||
|
||||
return recv_full_message(sc)
|
||||
except Exception as e:
|
||||
if sc is not None:
|
||||
try:
|
||||
|
|
@ -109,34 +132,34 @@ class JSONRequestHandler(BaseHTTPRequestHandler):
|
|||
except Exception:
|
||||
pass
|
||||
sc_all[addr] = None
|
||||
self.error(f'采集器通信失败: {e}')
|
||||
return None, None
|
||||
raise e
|
||||
|
||||
sc, resp = connect_and_send()
|
||||
if sc is None or resp is None:
|
||||
try:
|
||||
resp = request_once()
|
||||
except Exception as e:
|
||||
self.error(f'采集器通信失败: {e}')
|
||||
return
|
||||
|
||||
|
||||
res = handle_bytes(resp)
|
||||
if isinstance(res, str) and res == "数据头不正确":
|
||||
try:
|
||||
resp = request_once()
|
||||
except Exception as e:
|
||||
self.error(f'采集器通信失败: {e}')
|
||||
return
|
||||
res = handle_bytes(resp)
|
||||
|
||||
if isinstance(res, str):
|
||||
if res == "数据头不正确":
|
||||
sc, resp = connect_and_send()
|
||||
if sc is None or resp is None:
|
||||
return
|
||||
res = handle_bytes(resp)
|
||||
if isinstance(res, str):
|
||||
self.error(res)
|
||||
else:
|
||||
self.ok(res)
|
||||
else:
|
||||
self.error(res)
|
||||
self.error(res)
|
||||
else:
|
||||
self.ok(res)
|
||||
|
||||
def run(server_class=HTTPServer, handler_class=JSONRequestHandler, port=2300):
|
||||
|
||||
def run(server_class=ThreadingHTTPServer, handler_class=JSONRequestHandler, port=2300):
|
||||
server_address = ('', port)
|
||||
httpd = server_class(server_address, handler_class)
|
||||
print(f'Starting httpd server on port {port}...')
|
||||
httpd.serve_forever()
|
||||
|
||||
if __name__ == '__main__':
|
||||
run()
|
||||
run()
|
||||
|
|
|
|||
Loading…
Reference in New Issue