197 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			197 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
| import socket
 | |
| from rest_framework.exceptions import ParseError
 | |
| import json
 | |
| import time
 | |
| from django.core.cache import cache
 | |
| from apps.utils.thread import MyThread
 | |
| import struct
 | |
| import uuid
 | |
| import logging
 | |
| 
 | |
| myLogger = logging.getLogger('log')
 | |
| 
 | |
| def get_checksum(body_msg):
 | |
|     return sum(body_msg) & 0xFF
 | |
| 
 | |
| def handle_bytes(arr):
 | |
|     if len(arr) < 8:
 | |
|         return "返回数据长度错误"
 | |
|     
 | |
|     if arr[0] != 0xEB or arr[1] != 0x90:
 | |
|         return "数据头不正确"
 | |
| 
 | |
| 
 | |
|     # 读取长度信息
 | |
|     length_arr = arr[2:4][::-1]  # 反转字节
 | |
|     length = int.from_bytes(length_arr, byteorder='little', signed=True)  # 小端格式
 | |
|     
 | |
|     # 提取内容
 | |
|     body = arr[4:4 + length - 3]
 | |
| 
 | |
|     # 校验和检查
 | |
|     check_sum = get_checksum(body)
 | |
|     if check_sum != arr[length + 1]:
 | |
|         return "校验错误"
 | |
|         
 | |
| 
 | |
|     # 尾部标识检查
 | |
|     if arr[length + 2] != 0xFF or arr[length + 3] != 0xFE:
 | |
|         return "尾错误"
 | |
| 
 | |
|     content = body.decode('utf-8')
 | |
| 
 | |
|     res = json.loads(content)
 | |
|     
 | |
|     return res[0]
 | |
| 
 | |
| def get_tyy_data_t(host, port, tid):
 | |
|     cd_thread_key_id = f"cd_thread_{host}_{port}_id"
 | |
|     cd_thread_key_val = f"cd_thread_{host}_{port}_val"
 | |
|     sc = None
 | |
|     def connect_and_send(retry=1):
 | |
|         nonlocal sc
 | |
|         try:
 | |
|             if sc is None:
 | |
|                 sc = socket.socket()
 | |
|                 sc.connect((host, int(port)))
 | |
|             sc.sendall(b"R")
 | |
|         except BrokenPipeError:
 | |
|             if retry > 0:
 | |
|                 connect_and_send(retry-1)
 | |
|             else:
 | |
|                 if sc:
 | |
|                     try:
 | |
|                         sc.close()
 | |
|                     except Exception:
 | |
|                         pass
 | |
|                 sc = None
 | |
|         except OSError as e:
 | |
|             sc = None
 | |
|             cache.set(cd_thread_key_val, {"err_msg": f"采集器连接失败-{str(e)}"})
 | |
|         except ConnectionResetError:
 | |
|             sc = None
 | |
|             cache.set(cd_thread_key_val, {"err_msg": "采集器重置了连接"})
 | |
|         except socket.timeout:
 | |
|             sc = None
 | |
|             cache.set(cd_thread_key_val, {"err_msg": "采集器连接超时"})
 | |
|         except Exception as e:
 | |
|             sc = None
 | |
|             cache.set(cd_thread_key_val, {"err_msg": f"采集器连接失败-{str(e)}"})
 | |
| 
 | |
|     while cache.get(cd_thread_key_id) == tid:
 | |
|         if cache.get(cd_thread_key_val) == "get":
 | |
|             cache.set(cd_thread_key_val, "working")
 | |
|             connect_and_send()
 | |
|             if sc is None:
 | |
|                 continue
 | |
|             resp = sc.recv(1024)
 | |
|             res = handle_bytes(resp)
 | |
|             if isinstance(res, str):
 | |
|                 cache.set(cd_thread_key_val, {"err_msg": f'采集器返回数据错误-{res}'})
 | |
|             elif not res:
 | |
|                 cache.set(cd_thread_key_val, {"err_msg": f"采集器返回数据为空-{str(res)}"})
 | |
|             else:
 | |
|                 myLogger.info(f"采集器返回数据-{res}")
 | |
|                 cache.set(cd_thread_key_val, res)
 | |
|         time.sleep(0.3)
 | |
| 
 | |
|     if sc:
 | |
|         try:
 | |
|             sc.close()
 | |
|         except Exception:
 | |
|             pass
 | |
| 
 | |
| def get_tyy_data_2(*args, retry=1):
 | |
|     host, port = args[0], int(args[1])
 | |
|     cd_thread_key_id = f"cd_thread_{host}_{port}_id"
 | |
|     cd_thread_key_val = f"cd_thread_{host}_{port}_val"
 | |
|     cd_thread_val_id = cache.get(cd_thread_key_id, default=None)
 | |
|     if cd_thread_val_id is None:
 | |
|         tid = uuid.uuid4()
 | |
|         cache.set(cd_thread_key_id, tid, timeout=10800)
 | |
|         cd_thread = MyThread(target=get_tyy_data_t, args=(host, port, tid), daemon=True)
 | |
|         cd_thread.start()
 | |
|     cache.set(cd_thread_key_val, "get")
 | |
|     num = 0
 | |
|     get_val = False
 | |
| 
 | |
|     while True:
 | |
|         num += 1
 | |
|         if num > 8:
 | |
|             break
 | |
|         val = cache.get(cd_thread_key_val)
 | |
|         if isinstance(val, dict):
 | |
|             get_val = True
 | |
|             if "err_msg" in val:
 | |
|                 raise ParseError(val["err_msg"])
 | |
|             return val
 | |
|         time.sleep(0.3)
 | |
| 
 | |
|     if not get_val and retry > 0:
 | |
|         cache.set(cd_thread_key_id, None)
 | |
|         get_tyy_data_2(*args, retry=retry-1)
 | |
| 
 | |
| 
 | |
| sc_all = {}
 | |
| 
 | |
| def get_tyy_data(*args):
 | |
|     host, port = args[0], int(args[1])
 | |
|     global sc_all
 | |
|     sc = None
 | |
|     
 | |
|     def connect_and_send(retry=1):
 | |
|         nonlocal sc
 | |
|         sc = sc_all.get(f"{host}_{port}", None)
 | |
|         try:
 | |
|             if sc is None:
 | |
|                 sc = socket.socket()
 | |
|                 sc.settimeout(5)  # 设置超时
 | |
|                 sc.connect((host, port))
 | |
|                 sc_all[f"{host}_{port}"] = sc
 | |
|             sc.sendall(b"R")
 | |
|         except BrokenPipeError:
 | |
|             if retry > 0:
 | |
|                 if sc:
 | |
|                     try:
 | |
|                         sc.close()
 | |
|                     except Exception:
 | |
|                         pass
 | |
|                     sc_all.pop(f"{host}_{port}", None)
 | |
|                 return connect_and_send(retry-1)
 | |
|             else:
 | |
|                 if sc:
 | |
|                     try:
 | |
|                         sc.close()
 | |
|                     except Exception:
 | |
|                         pass
 | |
|                     sc_all.pop(f"{host}_{port}", None)
 | |
|                 sc = None
 | |
|                 raise ParseError("采集器连接失败-管道重置")
 | |
|         except OSError as e:
 | |
|             if sc:
 | |
|                 try:
 | |
|                     sc.close()
 | |
|                 except Exception:
 | |
|                     pass
 | |
|                 sc_all.pop(f"{host}_{port}", None)
 | |
|             sc = None
 | |
|             raise ParseError(f"采集器连接失败-{str(e)}")
 | |
|         except TimeoutError as e:
 | |
|             if sc:
 | |
|                 try:
 | |
|                     sc.close()
 | |
|                 except Exception:
 | |
|                     pass
 | |
|                 sc_all.pop(f"{host}_{port}", None)
 | |
|             sc = None
 | |
|             raise ParseError(f"采集器连接超时-{str(e)}")
 | |
|         
 | |
|     connect_and_send()
 | |
|     resp = sc.recv(1024)
 | |
|     res = handle_bytes(resp)
 | |
|     # myLogger.error(res)
 | |
|     if isinstance(res, str):
 | |
|         raise ParseError(f'采集器返回数据错误-{res}')
 | |
|     else:
 | |
|         return res
 |