diff --git a/apps/em/cd.py b/apps/em/cd.py index e2d209ca..11683729 100644 --- a/apps/em/cd.py +++ b/apps/em/cd.py @@ -2,41 +2,66 @@ import socket from rest_framework.exceptions import ParseError import json import time +from django.core.cache import cache +from apps.utils.thread import MyThread -def get_tyy_data(*args): - host, port = args[0], args[1] - max_retries = 3 - retry_delay = 0.5 - - for attempt in range(max_retries): - sc = None - try: - sc = socket.socket() - sc.connect((host, int(port))) - sc.sendall(b"R") - resp = sc.recv(1024) - if not resp: - raise ParseError("采集器返回空数据") - elif len(resp) < 8: - raise ParseError("设备未启动") - json_data = resp[5:-4] - json_str = json_data.decode('utf-8') - res = json.loads(json_str) - return res - except ConnectionResetError: - if attempt == max_retries - 1: - raise ParseError("采集器重置了连接,重试次数已达上限") - time.sleep(retry_delay) - except socket.timeout: - raise ParseError("采集器连接超时") - except Exception as e: - raise ParseError(f"其他错误: {str(e)}") - finally: +def get_tyy_data_t(host, port): + cd_thread_key = f"cd_thread_{host}_{port}" + sc = None + while True: + cd_thread_val = cache.get(cd_thread_key, default=None) + if cd_thread_val is None: if sc: try: sc.close() except Exception: pass + break + elif cd_thread_val == "get": + if sc is None: + try: + sc = socket.socket() + sc.connect((host, int(port))) + except OSError as e: + cache.set(cd_thread_key, {"err_msg": f"采集器连接失败-{str(e)}"}) + except ConnectionResetError: + cache.set(cd_thread_key, {"err_msg": "采集器重置了连接"}) + except socket.timeout: + cache.set(cd_thread_key, {"err_msg": "采集器连接超时"}) + sc.sendall(b"R") + resp = sc.recv(1024) + if not resp: + cache.set(cd_thread_key, {"err_msg": f"采集器返回空数据-{str(resp)}"}) + elif len(resp) < 8: + cache.set(cd_thread_key, {"err_msg": f"设备未启动-{str(resp)}"}) + else: + json_data = resp[5:-4] + json_str = json_data.decode('utf-8') + res = json.loads(json_str) + cache.set(cd_thread_key, res) + time.sleep(0.3) + +def get_tyy_data(*args): + host, port = args[0], int(args[1]) + cd_thread_key = f"cd_thread_{host}_{port}" + cd_thread_val = cache.get(cd_thread_key, default=None) + if cd_thread_val is None: + cache.set(cd_thread_key, "get") + cd_thread = MyThread(target=get_tyy_data_t, args=(host, port), daemon=True) + cd_thread.start() + num = 0 + while True: + num += 1 + if num > 6: + break + val = cache.get(cd_thread_key) + if isinstance(val, dict): + if "err_msg" in val: + raise ParseError(val["err_msg"]) + return val + time.sleep(0.3) + raise ParseError("获取数据超时") + if __name__ == '__main__': print(get_tyy_data()) \ No newline at end of file