This commit is contained in:
shijing 2026-05-28 11:20:45 +08:00
commit c7515052cf
1 changed files with 85 additions and 58 deletions

View File

@ -1,19 +1,24 @@
from http.server import BaseHTTPRequestHandler, HTTPServer from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
import json import json
from urllib.parse import urlparse, parse_qs from urllib.parse import urlparse, parse_qs
import socket import socket
import traceback import threading
import threading # 新增:引入线程锁
# 全局 Socket 连接字典 sc_all = {}
sc_all = { sc_locks = {}
"192.168.1.220_6000": None, registry_lock = threading.Lock()
"192.168.1.235_6000": None,
"192.168.1.225_6000": None,
"192.168.1.230_6000": None,
}
sc_lock = threading.Lock() # 全局锁,保护 sc_all
def get_lock(addr):
lock = sc_locks.get(addr)
if lock is None:
with registry_lock:
lock = sc_locks.get(addr)
if lock is None:
lock = threading.Lock()
sc_locks[addr] = lock
sc_all.setdefault(addr, None)
return lock
def get_checksum(body_msg): def get_checksum(body_msg):
return sum(body_msg) & 0xFF return sum(body_msg) & 0xFF
@ -46,6 +51,45 @@ def handle_bytes(arr):
except Exception as e: except Exception as e:
return f"数据解析错误: {str(e)}" return f"数据解析错误: {str(e)}"
def drain_socket(sc):
sc.setblocking(False)
try:
while True:
data = sc.recv(65536)
if not data:
break
except BlockingIOError:
pass
finally:
sc.settimeout(5)
def recv_full_message(sc):
data = bytearray()
while len(data) < 4:
chunk = sc.recv(1024)
if not chunk:
raise ConnectionError("连接中断")
data.extend(chunk)
if data[0] != 0xEB or data[1] != 0x90:
return bytes(data)
length = int.from_bytes(bytes(data[2:4])[::-1], byteorder='little', signed=True)
if length <= 0 or length > 65536:
return bytes(data)
total_needed = 4 + length
while len(data) < total_needed:
chunk = sc.recv(1024)
if not chunk:
raise ConnectionError("连接中断")
data.extend(chunk)
return bytes(data)
class JSONRequestHandler(BaseHTTPRequestHandler): class JSONRequestHandler(BaseHTTPRequestHandler):
def ok(self, data): def ok(self, data):
self.send_response(200) self.send_response(200)
@ -60,48 +104,31 @@ class JSONRequestHandler(BaseHTTPRequestHandler):
data = {"err_msg": err_msg} data = {"err_msg": err_msg}
self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8')) self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8'))
def log_message(self, format, *args):
return
def do_GET(self): def do_GET(self):
parsed_url = urlparse(self.path) parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query) query_params = parse_qs(parsed_url.query)
host = query_params.get('host', ['127.0.0.1'])[0] host = query_params.get('host', ['127.0.0.1'])[0]
port = query_params.get('port', ['6000'])[0] port = query_params.get('port', ['6000'])[0]
addr = f'{host}_{port}' addr = f'{host}_{port}'
lock = get_lock(addr)
if addr not in sc_all: def request_once():
self.error(f'{addr} 未找到') with lock:
return
def connect_and_send():
with sc_lock: # 加锁,防止竞争
sc = sc_all[addr] sc = sc_all[addr]
try: try:
if sc is None: if sc is None:
sc = socket.socket() sc = socket.socket()
sc.settimeout(5) sc.settimeout(5)
sc.connect((host, int(port))) sc.connect((host, int(port)))
sc_all[addr] = sc sc_all[addr] = sc
sc.settimeout(0.5) else:
while True: drain_socket(sc)
try:
data = sc.recv(65536)
if not data:
break
except (socket.timeout, BlockingIOError):
break
sc.settimeout(5)
sc.sendall(b"R") sc.sendall(b"R")
return recv_full_message(sc)
data = bytearray()
while len(data) < 8:
chunk = sc.recv(1024)
if not chunk:
raise ConnectionError("连接中断")
data.extend(chunk)
return sc, bytes(data)
except Exception as e: except Exception as e:
if sc is not None: if sc is not None:
try: try:
@ -109,30 +136,30 @@ class JSONRequestHandler(BaseHTTPRequestHandler):
except Exception: except Exception:
pass pass
sc_all[addr] = None sc_all[addr] = None
self.error(f'采集器通信失败: {e}') raise e
return None, None
sc, resp = connect_and_send() try:
if sc is None or resp is None: resp = request_once()
except Exception as e:
self.error(f'采集器通信失败: {e}')
return return
res = handle_bytes(resp) res = handle_bytes(resp)
if isinstance(res, str) and res == "数据头不正确":
try:
resp = request_once()
except Exception as e:
self.error(f'采集器通信失败: {e}')
return
res = handle_bytes(resp)
if isinstance(res, str): if isinstance(res, str):
if res == "数据头不正确": self.error(res)
sc, resp = connect_and_send()
if sc is None or resp is None:
return
res = handle_bytes(resp)
if isinstance(res, str):
self.error(res)
else:
self.ok(res)
else:
self.error(res)
else: else:
self.ok(res) self.ok(res)
def run(server_class=HTTPServer, handler_class=JSONRequestHandler, port=2300):
def run(server_class=ThreadingHTTPServer, handler_class=JSONRequestHandler, port=2300):
server_address = ('', port) server_address = ('', port)
httpd = server_class(server_address, handler_class) httpd = server_class(server_address, handler_class)
print(f'Starting httpd server on port {port}...') print(f'Starting httpd server on port {port}...')