feat: get_tyy_data优化保持socket连接

This commit is contained in:
caoqianming 2025-05-29 10:20:46 +08:00
parent 725a471fd8
commit 4cd4ba3c9d
1 changed files with 54 additions and 29 deletions

View File

@ -2,41 +2,66 @@ import socket
from rest_framework.exceptions import ParseError from rest_framework.exceptions import ParseError
import json import json
import time import time
from django.core.cache import cache
from apps.utils.thread import MyThread
def get_tyy_data(*args): def get_tyy_data_t(host, port):
host, port = args[0], args[1] cd_thread_key = f"cd_thread_{host}_{port}"
max_retries = 3 sc = None
retry_delay = 0.5 while True:
cd_thread_val = cache.get(cd_thread_key, default=None)
for attempt in range(max_retries): if cd_thread_val is None:
sc = None
try:
sc = socket.socket()
sc.connect((host, int(port)))
sc.sendall(b"R")
resp = sc.recv(1024)
if not resp:
raise ParseError("采集器返回空数据")
elif len(resp) < 8:
raise ParseError("设备未启动")
json_data = resp[5:-4]
json_str = json_data.decode('utf-8')
res = json.loads(json_str)
return res
except ConnectionResetError:
if attempt == max_retries - 1:
raise ParseError("采集器重置了连接,重试次数已达上限")
time.sleep(retry_delay)
except socket.timeout:
raise ParseError("采集器连接超时")
except Exception as e:
raise ParseError(f"其他错误: {str(e)}")
finally:
if sc: if sc:
try: try:
sc.close() sc.close()
except Exception: except Exception:
pass pass
break
elif cd_thread_val == "get":
if sc is None:
try:
sc = socket.socket()
sc.connect((host, int(port)))
except OSError as e:
cache.set(cd_thread_key, {"err_msg": f"采集器连接失败-{str(e)}"})
except ConnectionResetError:
cache.set(cd_thread_key, {"err_msg": "采集器重置了连接"})
except socket.timeout:
cache.set(cd_thread_key, {"err_msg": "采集器连接超时"})
sc.sendall(b"R")
resp = sc.recv(1024)
if not resp:
cache.set(cd_thread_key, {"err_msg": f"采集器返回空数据-{str(resp)}"})
elif len(resp) < 8:
cache.set(cd_thread_key, {"err_msg": f"设备未启动-{str(resp)}"})
else:
json_data = resp[5:-4]
json_str = json_data.decode('utf-8')
res = json.loads(json_str)
cache.set(cd_thread_key, res)
time.sleep(0.3)
def get_tyy_data(*args):
host, port = args[0], int(args[1])
cd_thread_key = f"cd_thread_{host}_{port}"
cd_thread_val = cache.get(cd_thread_key, default=None)
if cd_thread_val is None:
cache.set(cd_thread_key, "get")
cd_thread = MyThread(target=get_tyy_data_t, args=(host, port), daemon=True)
cd_thread.start()
num = 0
while True:
num += 1
if num > 6:
break
val = cache.get(cd_thread_key)
if isinstance(val, dict):
if "err_msg" in val:
raise ParseError(val["err_msg"])
return val
time.sleep(0.3)
raise ParseError("获取数据超时")
if __name__ == '__main__': if __name__ == '__main__':
print(get_tyy_data()) print(get_tyy_data())