From 11f06c36f78786e4738f500bf17f50ec11b7ea1b Mon Sep 17 00:00:00 2001 From: caoqianming Date: Mon, 9 Jun 2025 13:03:03 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E9=87=87=E9=9B=86cd?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/em/cd.py | 74 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 18 deletions(-) diff --git a/apps/em/cd.py b/apps/em/cd.py index 70a4ec77..ce544053 100644 --- a/apps/em/cd.py +++ b/apps/em/cd.py @@ -4,6 +4,41 @@ import json import time from django.core.cache import cache from apps.utils.thread import MyThread +import struct + +def get_checksum(body_msg): + return sum(body_msg) & 0xFF + +def handle_bytes(arr): + if len(arr) < 8: + return "返回数据长度错误" + + if arr[0] != 0xEB or arr[1] != 0x90: + return "数据头不正确" + + + # 读取长度信息 + length_arr = arr[2:4][::-1] # 反转字节 + length = struct.unpack(' 0: - connect_and_send(retry-1) - else: - cache.set(cd_thread_key, {"err_msg": f"采集器连接失败-{str(e)}"}) + cache.set(cd_thread_key, {"err_msg": f"采集器连接失败-{str(e)}"}) except ConnectionResetError: sc = None - if retry > 0: - connect_and_send(retry-1) - else: - cache.set(cd_thread_key, {"err_msg": "采集器重置了连接"}) + cache.set(cd_thread_key, {"err_msg": "采集器重置了连接"}) except socket.timeout: + sc = None cache.set(cd_thread_key, {"err_msg": "采集器连接超时"}) + except Exception as e: + sc = None + cache.set(cd_thread_key, {"err_msg": f"采集器连接失败-{str(e)}"}) + while True: cd_thread_val = cache.get(cd_thread_key, default=None) if cd_thread_val is None: @@ -43,23 +77,21 @@ def get_tyy_data_t(host, port): pass break elif cd_thread_val == "get": + cache.set(cd_thread_key, "working") connect_and_send() if sc is None: - cache.set(cd_thread_key, {"err_msg": "采集器连接失败"}) continue resp = sc.recv(1024) - if not resp: - cache.set(cd_thread_key, {"err_msg": f"采集器返回空数据-{str(resp)}"}) - elif len(resp) < 8: - cache.set(cd_thread_key, {"err_msg": f"设备未启动-{str(resp)}"}) + res = handle_bytes(resp) + if isinstance(res, str): + cache.set(cd_thread_key, {"err_msg": f'采集器返回数据错误-{res}'}) else: - json_data = resp[5:-4] - json_str = json_data.decode('utf-8') - res = json.loads(json_str) cache.set(cd_thread_key, res) time.sleep(0.3) -def get_tyy_data(*args): +def get_tyy_data(*args, sleep=0): + if sleep > 0: + time.sleep(sleep) host, port = args[0], int(args[1]) cd_thread_key = f"cd_thread_{host}_{port}" cd_thread_val = cache.get(cd_thread_key, default=None) @@ -69,17 +101,23 @@ def get_tyy_data(*args): cd_thread.start() cache.set(cd_thread_key, "get") num = 0 + get_val = False + while True: num += 1 if num > 8: break val = cache.get(cd_thread_key) if isinstance(val, dict): + get_val = True if "err_msg" in val: raise ParseError(val["err_msg"]) return val time.sleep(0.3) - raise ParseError("获取数据超时") + + if not get_val: + cache.set(cd_thread_key, None) + get_tyy_data(*args, sleep=2) if __name__ == '__main__':