from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer import json from urllib.parse import urlparse, parse_qs import socket import threading sc_all = {} sc_locks = {} registry_lock = threading.Lock() def get_lock(addr): lock = sc_locks.get(addr) if lock is None: with registry_lock: lock = sc_locks.get(addr) if lock is None: lock = threading.Lock() sc_locks[addr] = lock sc_all.setdefault(addr, None) return lock def get_checksum(body_msg): return sum(body_msg) & 0xFF def handle_bytes(arr): if len(arr) < 8: return f"返回数据长度错误-{arr}" if arr[0] != 0xEB or arr[1] != 0x90: return "数据头不正确" length_arr = arr[2:4][::-1] length = int.from_bytes(length_arr, byteorder='little', signed=True) if len(arr) < length + 4: return f"数据不完整,期望长度:{length+4},实际长度:{len(arr)}" body = arr[4:4 + length - 3] check_sum = get_checksum(body) if check_sum != arr[length + 1]: return "校验错误" if arr[length + 2] != 0xFF or arr[length + 3] != 0xFE: return "尾错误" try: content = body.decode('utf-8') res = json.loads(content) return res[0] except Exception as e: return f"数据解析错误: {str(e)}" def drain_socket(sc): sc.setblocking(False) try: while True: data = sc.recv(65536) if not data: break except BlockingIOError: pass finally: sc.settimeout(5) def recv_full_message(sc): data = bytearray() while len(data) < 4: chunk = sc.recv(1024) if not chunk: raise ConnectionError("连接中断") data.extend(chunk) if data[0] != 0xEB or data[1] != 0x90: return bytes(data) length = int.from_bytes(bytes(data[2:4])[::-1], byteorder='little', signed=True) if length <= 0 or length > 65536: return bytes(data) total_needed = 4 + length while len(data) < total_needed: chunk = sc.recv(1024) if not chunk: raise ConnectionError("连接中断") data.extend(chunk) return bytes(data) class JSONRequestHandler(BaseHTTPRequestHandler): def ok(self, data): self.send_response(200) self.send_header('Content-Type', 'application/json; charset=utf-8') self.end_headers() self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8')) def error(self, err_msg): self.send_response(400) self.send_header('Content-Type', 'application/json; charset=utf-8') self.end_headers() data = {"err_msg": err_msg} self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8')) def log_message(self, format, *args): return def do_GET(self): parsed_url = urlparse(self.path) query_params = parse_qs(parsed_url.query) host = query_params.get('host', ['127.0.0.1'])[0] port = query_params.get('port', ['6000'])[0] addr = f'{host}_{port}' lock = get_lock(addr) def request_once(): with lock: sc = sc_all[addr] try: if sc is None: sc = socket.socket() sc.settimeout(5) sc.connect((host, int(port))) sc_all[addr] = sc else: drain_socket(sc) sc.sendall(b"R") return recv_full_message(sc) except Exception as e: if sc is not None: try: sc.close() except Exception: pass sc_all[addr] = None raise e try: resp = request_once() except Exception as e: self.error(f'采集器通信失败: {e}') return res = handle_bytes(resp) if isinstance(res, str) and res == "数据头不正确": try: resp = request_once() except Exception as e: self.error(f'采集器通信失败: {e}') return res = handle_bytes(resp) if isinstance(res, str): self.error(res) else: self.ok(res) def run(server_class=ThreadingHTTPServer, handler_class=JSONRequestHandler, port=2300): server_address = ('', port) httpd = server_class(server_address, handler_class) print(f'Starting httpd server on port {port}...') httpd.serve_forever() if __name__ == '__main__': run()