import json
import socket
import time

from django.core.cache import cache
from rest_framework.exceptions import ParseError

from apps.utils.thread import MyThread

# Seconds between two polls of the shared cache key (worker and caller).
_POLL_INTERVAL = 0.3
# Number of times the caller checks the cache before giving up.
_POLL_ATTEMPTS = 8


def _io_error_message(exc):
    """Map a socket-level exception to the error message published in the cache.

    ``socket.timeout`` and ``ConnectionResetError`` are both subclasses of
    ``OSError``, so they must be tested before the generic fallback.
    """
    if isinstance(exc, socket.timeout):
        return "采集器连接超时"
    if isinstance(exc, ConnectionResetError):
        return "采集器重置了连接"
    return f"采集器连接失败-{str(exc)}"


def get_tyy_data_t(host, port):
    """Worker loop that queries the collector at ``host:port`` on demand.

    The loop is driven by the cache entry ``cd_thread_{host}_{port}``:

    * entry missing (``None``): close the socket and leave the loop;
    * entry equal to ``"get"``: send ``b"R"``, read one reply and store either
      the decoded JSON payload or ``{"err_msg": ...}`` under the same key;
    * any other value: do nothing until the entry changes.

    Args:
        host: Collector host name or address.
        port: Collector TCP port (converted with ``int``).

    NOTE(review): no timeout is set on the socket here, so a timeout error can
    only occur if a default socket timeout is configured elsewhere.
    """
    cd_thread_key = f"cd_thread_{host}_{port}"
    sc = None

    def report(msg):
        """Publish an error for the waiting caller."""
        cache.set(cd_thread_key, {"err_msg": msg})

    def disconnect():
        """Close the current socket (best effort) and forget it."""
        nonlocal sc
        if sc is not None:
            try:
                sc.close()
            except OSError:
                # Closing is best effort; the socket is dropped either way.
                pass
            sc = None

    def connect():
        """Open a fresh connection; return True on success, False after reporting."""
        nonlocal sc
        disconnect()
        candidate = None
        try:
            candidate = socket.socket()
            candidate.connect((host, int(port)))
        except OSError as e:
            # Never keep a half-open socket around: later polls must reconnect.
            if candidate is not None:
                candidate.close()
            report(_io_error_message(e))
            return False
        sc = candidate
        return True

    def poll_once():
        """Perform one request/response exchange and publish its outcome."""
        if sc is None and not connect():
            return
        try:
            try:
                sc.sendall(b"R")
            except BrokenPipeError:
                # The old connection is dead: reconnect and repeat the request,
                # exactly as a first-time connection would do.
                if not connect():
                    return
                sc.sendall(b"R")
            resp = sc.recv(1024)
        except OSError as e:
            # Report instead of letting the thread die with the key stuck.
            disconnect()
            report(_io_error_message(e))
            return

        if not resp:
            # An empty read means the peer closed the connection.
            disconnect()
            report(f"采集器返回空数据-{str(resp)}")
        elif len(resp) < 8:
            report(f"设备未启动-{str(resp)}")
        else:
            # The JSON payload sits between a 5-byte prefix and a 4-byte suffix.
            try:
                res = json.loads(resp[5:-4].decode('utf-8'))
            except ValueError as e:
                # Covers both UnicodeDecodeError and json.JSONDecodeError.
                report(f"采集器返回数据解析失败-{str(e)}")
            else:
                cache.set(cd_thread_key, res)

    try:
        while True:
            cd_thread_val = cache.get(cd_thread_key, default=None)
            if cd_thread_val is None:
                break
            if cd_thread_val == "get":
                poll_once()
            time.sleep(_POLL_INTERVAL)
    finally:
        # Release the socket however the loop ends.
        disconnect()


def get_tyy_data(*args):
    """Return the collector data for the given host and port.

    If the cache entry ``cd_thread_{host}_{port}`` is missing, it is set to
    ``"get"`` and a daemon ``MyThread`` running ``get_tyy_data_t`` is started.
    The entry is then checked up to ``_POLL_ATTEMPTS`` times, sleeping
    ``_POLL_INTERVAL`` seconds between checks, until it holds a dict.

    Args:
        *args: ``args[0]`` is the host, ``args[1]`` the port (converted with
            ``int``).

    Returns:
        The dict found in the cache entry.

    Raises:
        ParseError: If the cached dict contains ``"err_msg"``, or if no dict
            appears within the polling window.
    """
    host, port = args[0], int(args[1])
    cd_thread_key = f"cd_thread_{host}_{port}"
    if cache.get(cd_thread_key, default=None) is None:
        cache.set(cd_thread_key, "get")
        cd_thread = MyThread(target=get_tyy_data_t, args=(host, port), daemon=True)
        cd_thread.start()
    for _ in range(_POLL_ATTEMPTS):
        val = cache.get(cd_thread_key)
        if isinstance(val, dict):
            if "err_msg" in val:
                raise ParseError(val["err_msg"])
            return val
        time.sleep(_POLL_INTERVAL)
    raise ParseError("获取数据超时")


if __name__ == '__main__':
    import sys

    # Host and port come from the command line; calling get_tyy_data() with no
    # arguments can only fail.
    if len(sys.argv) != 3:
        sys.exit(f"usage: {sys.argv[0]} HOST PORT")
    print(get_tyy_data(sys.argv[1], sys.argv[2]))