factory/apps/em/cd.py

197 lines
5.8 KiB
Python

import socket
from rest_framework.exceptions import ParseError
import json
import time
from django.core.cache import cache
from apps.utils.thread import MyThread
import struct
import uuid
import logging
myLogger = logging.getLogger('log')
def get_checksum(body_msg):
    """Return the low 8 bits of the sum of all items in ``body_msg``."""
    total = sum(body_msg)
    return total & 0xFF
def handle_bytes(arr):
    """Parse one collector response frame and return its first JSON element.

    Frame layout, as implied by the index arithmetic below::

        bytes 0-1             header 0xEB 0x90
        bytes 2-3             length, big-endian signed 16-bit
        bytes 4 .. length     UTF-8 JSON body (length - 3 bytes)
        byte  length + 1      checksum: low 8 bits of the sum of the body
        bytes length + 2, +3  tail 0xFF 0xFE

    so a complete frame is ``length + 4`` bytes long.

    Args:
        arr: raw bytes received from the collector.

    Returns:
        Element ``[0]`` of the decoded JSON body on success, or a ``str``
        describing the problem on failure. Callers tell the two apart with
        ``isinstance(result, str)``.
    """
    if len(arr) < 8:
        return "返回数据长度错误"
    if arr[0] != 0xEB or arr[1] != 0x90:
        return "数据头不正确"
    # The two length bytes are stored most-significant byte first.
    length = int.from_bytes(arr[2:4], byteorder='big', signed=True)
    # Reject lengths that cannot describe a frame, and frames shorter than
    # declared (e.g. a partial read); otherwise the fixed-offset lookups
    # below would raise IndexError instead of reporting an error string.
    if length < 3 or len(arr) < length + 4:
        return "返回数据长度错误"
    body = arr[4:length + 1]
    # Checksum check
    if get_checksum(body) != arr[length + 1]:
        return "校验错误"
    # Tail marker check
    if arr[length + 2] != 0xFF or arr[length + 3] != 0xFE:
        return "尾错误"
    try:
        return json.loads(body.decode('utf-8'))[0]
    except (ValueError, LookupError, TypeError):
        # ValueError covers invalid UTF-8 and invalid JSON; LookupError and
        # TypeError cover a payload that has no element [0].
        return "数据解析错误"
def get_tyy_data_t(host, port, tid):
    """Worker loop that polls the collector at ``host:port`` on request.

    The worker and its requester talk through two cache keys:

    * ``cd_thread_{host}_{port}_id`` holds the id of the worker that owns
      this endpoint. The loop runs only while it equals ``tid``.
    * ``cd_thread_{host}_{port}_val`` is a mailbox. When it holds ``"get"``
      the worker sets it to ``"working"``, sends ``b"R"`` to the collector,
      parses the reply with ``handle_bytes`` and stores either the parsed
      value or ``{"err_msg": ...}`` back into the mailbox.

    The socket is kept open between requests. It is closed and discarded
    after any connection error and when the loop ends.

    Args:
        host: collector host.
        port: collector port (converted with ``int`` before connecting).
        tid: id of this worker, compared against the cached owner id.
    """
    cd_thread_key_id = f"cd_thread_{host}_{port}_id"
    cd_thread_key_val = f"cd_thread_{host}_{port}_val"
    sc = None

    def drop_socket():
        """Close the current socket (best effort) and forget it."""
        nonlocal sc
        if sc is not None:
            try:
                sc.close()
            except Exception:
                pass  # the socket is being discarded anyway
        sc = None

    def report_error(msg):
        """Publish an error result to the mailbox."""
        cache.set(cd_thread_key_val, {"err_msg": msg})

    def connect_and_send(retry=1):
        """Ensure a connection and send the read command ``b"R"``.

        On failure the socket is dropped (``sc`` becomes None) and an error
        is published to the mailbox.
        """
        nonlocal sc
        try:
            if sc is None:
                sc = socket.socket()
                # Same timeout get_tyy_data uses; without one, connect/recv
                # could block this worker indefinitely.
                sc.settimeout(5)
                sc.connect((host, int(port)))
            sc.sendall(b"R")
        except BrokenPipeError:
            # Drop the dead socket first so the retry reconnects instead of
            # re-sending on the same broken connection.
            drop_socket()
            if retry > 0:
                connect_and_send(retry - 1)
            else:
                report_error("采集器连接失败-管道重置")
        # The specific OSError subclasses must be listed before the generic
        # handler, otherwise they are never reached.
        except ConnectionResetError:
            drop_socket()
            report_error("采集器重置了连接")
        except socket.timeout:
            drop_socket()
            report_error("采集器连接超时")
        except Exception as e:
            drop_socket()
            report_error(f"采集器连接失败-{str(e)}")

    while cache.get(cd_thread_key_id) == tid:
        if cache.get(cd_thread_key_val) == "get":
            cache.set(cd_thread_key_val, "working")
            connect_and_send()
            if sc is None:
                continue
            try:
                resp = sc.recv(1024)
            except socket.timeout:
                drop_socket()
                report_error("采集器连接超时")
                continue
            except OSError as e:
                drop_socket()
                report_error(f"采集器连接失败-{str(e)}")
                continue
            if not resp:
                # An empty read means the peer closed the connection; make
                # the next request reconnect.
                drop_socket()
            try:
                res = handle_bytes(resp)
            except (ValueError, LookupError, TypeError) as e:
                # A malformed frame must not kill the worker thread.
                res = str(e)
            if isinstance(res, str):
                report_error(f'采集器返回数据错误-{res}')
            elif not res:
                report_error(f"采集器返回数据为空-{str(res)}")
            else:
                myLogger.info(f"采集器返回数据-{res}")
                cache.set(cd_thread_key_val, res)
        time.sleep(0.3)
    drop_socket()
def get_tyy_data_2(*args, retry=1):
    """Fetch one reading from the collector through its worker thread.

    If no worker id is cached for the endpoint, a new ``get_tyy_data_t``
    worker is started. A ``"get"`` request is then posted to the
    ``cd_thread_{host}_{port}_val`` mailbox, which is polled up to 8 times,
    0.3 s apart, for a ``dict`` answer.

    Args:
        *args: ``args[0]`` is the collector host, ``args[1]`` its port.
        retry: how many more times to retry with a fresh worker when the
            mailbox never receives an answer.

    Returns:
        The ``dict`` published by the worker, or ``None`` when no answer
        arrived and all retries are used up.
        NOTE(review): returning ``None`` here is kept for backward
        compatibility; confirm whether callers would prefer a ``ParseError``.

    Raises:
        ParseError: the worker published ``{"err_msg": ...}``.
    """
    host, port = args[0], int(args[1])
    cd_thread_key_id = f"cd_thread_{host}_{port}_id"
    cd_thread_key_val = f"cd_thread_{host}_{port}_val"
    if cache.get(cd_thread_key_id, default=None) is None:
        tid = uuid.uuid4()
        cache.set(cd_thread_key_id, tid, timeout=10800)
        cd_thread = MyThread(target=get_tyy_data_t, args=(host, port, tid), daemon=True)
        cd_thread.start()
    cache.set(cd_thread_key_val, "get")
    for _ in range(8):
        val = cache.get(cd_thread_key_val)
        if isinstance(val, dict):
            if "err_msg" in val:
                raise ParseError(val["err_msg"])
            return val
        time.sleep(0.3)
    if retry > 0:
        # Clearing the owner id makes the current worker's loop end and lets
        # the retry start a fresh one.
        cache.set(cd_thread_key_id, None)
        # The retry's result must be returned; without `return` a successful
        # retry would still hand None to the caller.
        return get_tyy_data_2(*args, retry=retry - 1)
    return None
# Connected collector sockets reused across get_tyy_data calls,
# keyed by "{host}_{port}".
sc_all = {}


def get_tyy_data(*args):
    """Request one reading from the collector and return the parsed result.

    The socket for each endpoint is cached in ``sc_all`` and reused by later
    calls. It is closed and removed from the cache after any connection
    error, so the next call reconnects.

    Args:
        *args: ``args[0]`` is the collector host, ``args[1]`` its port.

    Returns:
        The value produced by ``handle_bytes`` for the collector's reply.

    Raises:
        ParseError: connecting, sending or receiving failed, or the reply
            could not be parsed.
    """
    host, port = args[0], int(args[1])
    sc_key = f"{host}_{port}"
    sc = None

    def discard_socket():
        """Close the current socket (best effort) and drop the cached one."""
        nonlocal sc
        if sc is not None:
            try:
                sc.close()
            except Exception:
                pass  # the socket is being discarded anyway
        sc_all.pop(sc_key, None)
        sc = None

    def connect_and_send(retry=1):
        """Reuse or open the endpoint's socket and send ``b"R"``."""
        nonlocal sc
        sc = sc_all.get(sc_key)
        try:
            if sc is None:
                sc = socket.socket()
                sc.settimeout(5)  # set timeout
                sc.connect((host, port))
                sc_all[sc_key] = sc
            sc.sendall(b"R")
        except BrokenPipeError:
            # A cached socket may have been closed by the peer; reconnect
            # once before giving up.
            discard_socket()
            if retry > 0:
                return connect_and_send(retry - 1)
            raise ParseError("采集器连接失败-管道重置")
        except Exception as e:
            discard_socket()
            raise ParseError(f"采集器连接失败-{str(e)}") from e

    connect_and_send()
    try:
        resp = sc.recv(1024)
    except OSError as e:
        # Timeout or reset while reading: drop the socket so the next call
        # does not reuse a dead connection or read a late reply.
        discard_socket()
        raise ParseError(f"采集器连接失败-{str(e)}") from e
    if not resp:
        # An empty read means the peer closed the connection.
        discard_socket()
    try:
        res = handle_bytes(resp)
    except (ValueError, LookupError, TypeError) as e:
        raise ParseError(f'采集器返回数据错误-{str(e)}') from e
    if isinstance(res, str):
        raise ParseError(f'采集器返回数据错误-{res}')
    return res