From 8efd28633ca94d179b2a56382036a5cb4530fe7a Mon Sep 17 00:00:00 2001 From: caoqianming Date: Tue, 10 Jun 2025 12:42:02 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96get=5Ftyy=5Fdata?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/em/cd.py | 144 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 108 insertions(+), 36 deletions(-) diff --git a/apps/em/cd.py b/apps/em/cd.py index ce544053..905ad23a 100644 --- a/apps/em/cd.py +++ b/apps/em/cd.py @@ -5,6 +5,10 @@ import time from django.core.cache import cache from apps.utils.thread import MyThread import struct +import uuid +import logging + +myLogger = logging.getLogger('log') def get_checksum(body_msg): return sum(body_msg) & 0xFF @@ -19,7 +23,7 @@ def handle_bytes(arr): # 读取长度信息 length_arr = arr[2:4][::-1] # 反转字节 - length = struct.unpack(' 0: connect_and_send(retry-1) + else: + if sc: + try: + sc.close() + except Exception: + pass + sc = None except OSError as e: sc = None - cache.set(cd_thread_key, {"err_msg": f"采集器连接失败-{str(e)}"}) + cache.set(cd_thread_key_val, {"err_msg": f"采集器连接失败-{str(e)}"}) except ConnectionResetError: sc = None - cache.set(cd_thread_key, {"err_msg": "采集器重置了连接"}) + cache.set(cd_thread_key_val, {"err_msg": "采集器重置了连接"}) except socket.timeout: sc = None - cache.set(cd_thread_key, {"err_msg": "采集器连接超时"}) + cache.set(cd_thread_key_val, {"err_msg": "采集器连接超时"}) except Exception as e: sc = None - cache.set(cd_thread_key, {"err_msg": f"采集器连接失败-{str(e)}"}) + cache.set(cd_thread_key_val, {"err_msg": f"采集器连接失败-{str(e)}"}) - while True: - cd_thread_val = cache.get(cd_thread_key, default=None) - if cd_thread_val is None: - if sc: - try: - sc.close() - except Exception: - pass - break - elif cd_thread_val == "get": - cache.set(cd_thread_key, "working") + while cache.get(cd_thread_key_id) == tid: + if cache.get(cd_thread_key_val) == "get": + cache.set(cd_thread_key_val, "working") connect_and_send() if sc is None: continue resp = sc.recv(1024) res = handle_bytes(resp) if isinstance(res, str): - cache.set(cd_thread_key, {"err_msg": f'采集器返回数据错误-{res}'}) + cache.set(cd_thread_key_val, {"err_msg": f'采集器返回数据错误-{res}'}) + elif not res: + cache.set(cd_thread_key_val, {"err_msg": f"采集器返回数据为空-{str(res)}"}) else: - cache.set(cd_thread_key, res) + myLogger.info(f"采集器返回数据-{res}") + cache.set(cd_thread_key_val, res) time.sleep(0.3) -def get_tyy_data(*args, sleep=0): - if sleep > 0: - time.sleep(sleep) + if sc: + try: + sc.close() + except Exception: + pass + +def get_tyy_data_2(*args, retry=1): host, port = args[0], int(args[1]) - cd_thread_key = f"cd_thread_{host}_{port}" - cd_thread_val = cache.get(cd_thread_key, default=None) - if cd_thread_val is None: - cache.set(cd_thread_key, "start") - cd_thread = MyThread(target=get_tyy_data_t, args=(host, port), daemon=True) + cd_thread_key_id = f"cd_thread_{host}_{port}_id" + cd_thread_key_val = f"cd_thread_{host}_{port}_val" + cd_thread_val_id = cache.get(cd_thread_key_id, default=None) + if cd_thread_val_id is None: + tid = uuid.uuid4() + cache.set(cd_thread_key_id, tid, timeout=10800) + cd_thread = MyThread(target=get_tyy_data_t, args=(host, port, tid), daemon=True) cd_thread.start() - cache.set(cd_thread_key, "get") + cache.set(cd_thread_key_val, "get") num = 0 get_val = False @@ -107,7 +119,7 @@ def get_tyy_data(*args, sleep=0): num += 1 if num > 8: break - val = cache.get(cd_thread_key) + val = cache.get(cd_thread_key_val) if isinstance(val, dict): get_val = True if "err_msg" in val: @@ -115,10 +127,70 @@ def get_tyy_data(*args, sleep=0): return val time.sleep(0.3) - if not get_val: - cache.set(cd_thread_key, None) - get_tyy_data(*args, sleep=2) + if not get_val and retry > 0: + cache.set(cd_thread_key_id, None) + get_tyy_data_2(*args, retry=retry-1) -if __name__ == '__main__': - print(get_tyy_data()) \ No newline at end of file +sc_all = {} + +def get_tyy_data(*args): + host, port = args[0], int(args[1]) + global sc_all + sc = None + + def connect_and_send(retry=1): + nonlocal sc + sc = sc_all.get(f"{host}_{port}", None) + try: + if sc is None: + sc = socket.socket() + sc.settimeout(5) # 设置超时 + sc.connect((host, port)) + sc_all[f"{host}_{port}"] = sc + sc.sendall(b"R") + except BrokenPipeError: + if retry > 0: + if sc: + try: + sc.close() + except Exception: + pass + sc_all.pop(f"{host}_{port}", None) + return connect_and_send(retry-1) + else: + if sc: + try: + sc.close() + except Exception: + pass + sc_all.pop(f"{host}_{port}", None) + sc = None + raise ParseError("采集器连接失败-管道重置") + except OSError as e: + if sc: + try: + sc.close() + except Exception: + pass + sc_all.pop(f"{host}_{port}", None) + sc = None + raise ParseError(f"采集器连接失败-{str(e)}") + except Exception as e: + if sc: + try: + sc.close() + except Exception: + pass + sc_all.pop(f"{host}_{port}", None) + sc = None + raise ParseError(f"采集器连接失败-{str(e)}") + + connect_and_send() + resp = sc.recv(1024) + res = handle_bytes(resp) + # myLogger.error(res) + if isinstance(res, str): + raise ParseError(f'采集器返回数据错误-{res}') + else: + return res