feat: 优化设备采集cd

This commit is contained in:
caoqianming 2025-06-09 13:03:03 +08:00
parent b9cad6bbd8
commit 11f06c36f7
1 changed files with 56 additions and 18 deletions

View File

@ -4,6 +4,41 @@ import json
import time import time
from django.core.cache import cache from django.core.cache import cache
from apps.utils.thread import MyThread from apps.utils.thread import MyThread
import struct
def get_checksum(body_msg):
return sum(body_msg) & 0xFF
def handle_bytes(arr):
if len(arr) < 8:
return "返回数据长度错误"
if arr[0] != 0xEB or arr[1] != 0x90:
return "数据头不正确"
# 读取长度信息
length_arr = arr[2:4][::-1] # 反转字节
length = struct.unpack('<H', bytes(length_arr))[0] # 小端格式
# 提取内容
body = arr[4:4 + length - 3]
# 校验和检查
check_sum = get_checksum(body)
if check_sum != arr[length + 1]:
return "校验错误"
# 尾部标识检查
if arr[length + 2] != 0xFF or arr[length + 3] != 0xFE:
return "尾错误"
content = body.decode('utf-8')
res = json.loads(content)
return res[0]
def get_tyy_data_t(host, port): def get_tyy_data_t(host, port):
cd_thread_key = f"cd_thread_{host}_{port}" cd_thread_key = f"cd_thread_{host}_{port}"
@ -21,18 +56,17 @@ def get_tyy_data_t(host, port):
connect_and_send(retry-1) connect_and_send(retry-1)
except OSError as e: except OSError as e:
sc = None sc = None
if retry > 0: cache.set(cd_thread_key, {"err_msg": f"采集器连接失败-{str(e)}"})
connect_and_send(retry-1)
else:
cache.set(cd_thread_key, {"err_msg": f"采集器连接失败-{str(e)}"})
except ConnectionResetError: except ConnectionResetError:
sc = None sc = None
if retry > 0: cache.set(cd_thread_key, {"err_msg": "采集器重置了连接"})
connect_and_send(retry-1)
else:
cache.set(cd_thread_key, {"err_msg": "采集器重置了连接"})
except socket.timeout: except socket.timeout:
sc = None
cache.set(cd_thread_key, {"err_msg": "采集器连接超时"}) cache.set(cd_thread_key, {"err_msg": "采集器连接超时"})
except Exception as e:
sc = None
cache.set(cd_thread_key, {"err_msg": f"采集器连接失败-{str(e)}"})
while True: while True:
cd_thread_val = cache.get(cd_thread_key, default=None) cd_thread_val = cache.get(cd_thread_key, default=None)
if cd_thread_val is None: if cd_thread_val is None:
@ -43,23 +77,21 @@ def get_tyy_data_t(host, port):
pass pass
break break
elif cd_thread_val == "get": elif cd_thread_val == "get":
cache.set(cd_thread_key, "working")
connect_and_send() connect_and_send()
if sc is None: if sc is None:
cache.set(cd_thread_key, {"err_msg": "采集器连接失败"})
continue continue
resp = sc.recv(1024) resp = sc.recv(1024)
if not resp: res = handle_bytes(resp)
cache.set(cd_thread_key, {"err_msg": f"采集器返回空数据-{str(resp)}"}) if isinstance(res, str):
elif len(resp) < 8: cache.set(cd_thread_key, {"err_msg": f'采集器返回数据错误-{res}'})
cache.set(cd_thread_key, {"err_msg": f"设备未启动-{str(resp)}"})
else: else:
json_data = resp[5:-4]
json_str = json_data.decode('utf-8')
res = json.loads(json_str)
cache.set(cd_thread_key, res) cache.set(cd_thread_key, res)
time.sleep(0.3) time.sleep(0.3)
def get_tyy_data(*args): def get_tyy_data(*args, sleep=0):
if sleep > 0:
time.sleep(sleep)
host, port = args[0], int(args[1]) host, port = args[0], int(args[1])
cd_thread_key = f"cd_thread_{host}_{port}" cd_thread_key = f"cd_thread_{host}_{port}"
cd_thread_val = cache.get(cd_thread_key, default=None) cd_thread_val = cache.get(cd_thread_key, default=None)
@ -69,17 +101,23 @@ def get_tyy_data(*args):
cd_thread.start() cd_thread.start()
cache.set(cd_thread_key, "get") cache.set(cd_thread_key, "get")
num = 0 num = 0
get_val = False
while True: while True:
num += 1 num += 1
if num > 8: if num > 8:
break break
val = cache.get(cd_thread_key) val = cache.get(cd_thread_key)
if isinstance(val, dict): if isinstance(val, dict):
get_val = True
if "err_msg" in val: if "err_msg" in val:
raise ParseError(val["err_msg"]) raise ParseError(val["err_msg"])
return val return val
time.sleep(0.3) time.sleep(0.3)
raise ParseError("获取数据超时")
if not get_val:
cache.set(cd_thread_key, None)
get_tyy_data(*args, sleep=2)
if __name__ == '__main__': if __name__ == '__main__':