feat: 优化cd.py

This commit is contained in:
caoqianming 2025-07-17 09:58:56 +08:00
parent 47cfa7ce17
commit e2a8da9eb4
1 changed files with 46 additions and 31 deletions

View File

@ -3,7 +3,9 @@ import json
from urllib.parse import urlparse, parse_qs from urllib.parse import urlparse, parse_qs
import socket import socket
import traceback import traceback
import threading # 新增:引入线程锁
# 全局 Socket 连接字典
sc_all = { sc_all = {
"192.168.1.220_6000": None, "192.168.1.220_6000": None,
"192.168.1.235_6000": None, "192.168.1.235_6000": None,
@ -11,6 +13,8 @@ sc_all = {
"192.168.1.230_6000": None, "192.168.1.230_6000": None,
} }
sc_lock = threading.Lock() # 全局锁,保护 sc_all
def get_checksum(body_msg): def get_checksum(body_msg):
return sum(body_msg) & 0xFF return sum(body_msg) & 0xFF
@ -21,29 +25,26 @@ def handle_bytes(arr):
if arr[0] != 0xEB or arr[1] != 0x90: if arr[0] != 0xEB or arr[1] != 0x90:
return "数据头不正确" return "数据头不正确"
length_arr = arr[2:4][::-1]
# 读取长度信息 length = int.from_bytes(length_arr, byteorder='little', signed=True)
length_arr = arr[2:4][::-1] # 反转字节
length = int.from_bytes(length_arr, byteorder='little', signed=True) # 小端格式
# 提取内容 if len(arr) < length + 4:
body = arr[4:4 + length - 3] return f"数据不完整,期望长度:{length+4},实际长度:{len(arr)}"
# 校验和检查 body = arr[4:4 + length - 3]
check_sum = get_checksum(body) check_sum = get_checksum(body)
if check_sum != arr[length + 1]: if check_sum != arr[length + 1]:
return "校验错误" return "校验错误"
# 尾部标识检查
if arr[length + 2] != 0xFF or arr[length + 3] != 0xFE: if arr[length + 2] != 0xFF or arr[length + 3] != 0xFE:
return "尾错误" return "尾错误"
content = body.decode('utf-8') try:
content = body.decode('utf-8')
res = json.loads(content) res = json.loads(content)
return res[0]
return res[0] except Exception as e:
return f"数据解析错误: {str(e)}"
class JSONRequestHandler(BaseHTTPRequestHandler): class JSONRequestHandler(BaseHTTPRequestHandler):
def ok(self, data): def ok(self, data):
@ -56,9 +57,7 @@ class JSONRequestHandler(BaseHTTPRequestHandler):
self.send_response(400) self.send_response(400)
self.send_header('Content-Type', 'application/json; charset=utf-8') self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers() self.end_headers()
data = { data = {"err_msg": err_msg}
"err_msg": err_msg
}
self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8')) self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8'))
def do_GET(self): def do_GET(self):
@ -67,50 +66,66 @@ class JSONRequestHandler(BaseHTTPRequestHandler):
host = query_params.get('host', ['127.0.0.1'])[0] host = query_params.get('host', ['127.0.0.1'])[0]
port = query_params.get('port', ['6000'])[0] port = query_params.get('port', ['6000'])[0]
addr = f'{host}_{port}' addr = f'{host}_{port}'
if addr not in sc_all: if addr not in sc_all:
self.error(f'{addr} 未找到') self.error(f'{addr} 未找到')
return return
def connect_and_send(): def connect_and_send():
sc = sc_all[addr] with sc_lock: # 加锁,防止竞争
sc = sc_all[addr]
try: try:
if sc is None: if sc is None:
sc = socket.socket() sc = socket.socket()
sc.settimeout(5) # 设置超时 sc.settimeout(5)
sc.connect((host, int(port))) sc.connect((host, int(port)))
sc_all[addr] = sc with sc_lock: # 再次加锁,更新 sc_all
# 清空接收缓冲区 sc_all[addr] = sc
sc.settimeout(0.1) # 设置短暂超时
for _ in range(5): sc.settimeout(0.5)
while True:
try: try:
data = sc.recv(65536) data = sc.recv(65536)
if not data: if not data:
break break
except (socket.timeout, BlockingIOError): except (socket.timeout, BlockingIOError):
break break
sc.settimeout(5) # 恢复原超时设置
sc.settimeout(5)
sc.sendall(b"R") sc.sendall(b"R")
return sc
data = bytearray()
while len(data) < 8:
chunk = sc.recv(1024)
if not chunk:
raise ConnectionError("连接中断")
data.extend(chunk)
return sc, bytes(data)
except Exception as e: except Exception as e:
if sc is not None: if sc is not None:
try: try:
sc.close() sc.close()
except Exception: except Exception:
pass pass
sc_all[addr] = None with sc_lock: # 加锁,清除无效连接
self.error(f'采集器连接失败: {e}') sc_all[addr] = None
self.error(f'采集器通信失败: {e}')
print(traceback.format_exc()) print(traceback.format_exc())
return None return None, None
sc = connect_and_send() sc, resp = connect_and_send()
if sc is None: if sc is None or resp is None:
return return
resp = sc.recv(1024)
res = handle_bytes(resp) res = handle_bytes(resp)
if isinstance(res, str): if isinstance(res, str):
self.error(res) self.error(res)
else: else:
self.ok(res) self.ok(res)
def run(server_class=HTTPServer, handler_class=JSONRequestHandler, port=2300): def run(server_class=HTTPServer, handler_class=JSONRequestHandler, port=2300):
server_address = ('', port) server_address = ('', port)
httpd = server_class(server_address, handler_class) httpd = server_class(server_address, handler_class)