252 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			252 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
	
| import socket
 | |
| from rest_framework.exceptions import ParseError
 | |
| import json
 | |
| import time
 | |
| from django.core.cache import cache
 | |
| from apps.utils.thread import MyThread
 | |
| import uuid
 | |
| import logging
 | |
| import threading
 | |
| import requests
 | |
| 
 | |
# Module-wide logger; fetched by the fixed name 'log'.
myLogger = logging.getLogger('log')
 | |
| 
 | |
def get_checksum(body_msg):
    """Return the low 8 bits of the sum of every value in ``body_msg``.

    ``body_msg`` is an iterable of integers (``bytes`` works directly).
    """
    total = sum(body_msg)
    return total & 0xFF
 | |
| 
 | |
def handle_bytes(arr):
    """Validate one collector frame and return the first item of its JSON body.

    Frame layout, as checked by this function::

        0xEB 0x90 | length (2 bytes, big-endian) | body | checksum | 0xFF 0xFE

    ``length`` covers the body plus the checksum byte and the two tail
    bytes, so a complete frame occupies ``length + 4`` bytes.

    Args:
        arr: Raw bytes received from the collector.

    Returns:
        ``json.loads(body)[0]`` on success.  On any validation or parsing
        failure a ``str`` describing the problem is returned instead of
        raising; callers detect failure with ``isinstance(result, str)``.
    """
    if len(arr) < 8:
        return f"返回数据长度错误-{arr}"

    if arr[0] != 0xEB or arr[1] != 0x90:
        return "数据头不正确"

    # Big-endian signed 16-bit length (same value the reversed/little-endian
    # read produced before).
    length = int.from_bytes(arr[2:4], byteorder='big', signed=True)

    # Reject nonsensical lengths and truncated frames up front; otherwise the
    # checksum/tail lookups below would index outside the buffer.
    if length < 3 or length + 4 > len(arr):
        return f"返回数据长度错误-{arr}"

    checksum_pos = length + 1
    body = arr[4:checksum_pos]

    if get_checksum(body) != arr[checksum_pos]:
        return "校验错误"

    if arr[length + 2] != 0xFF or arr[length + 3] != 0xFE:
        return "尾错误"

    try:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueError.
        res = json.loads(body.decode('utf-8'))
    except ValueError as e:
        return f"数据解析错误-{e}"

    try:
        return res[0]
    except (IndexError, KeyError, TypeError):
        # Empty list, JSON object, or scalar: there is no first item.
        return f"数据内容错误-{res}"
 | |
| 
 | |
def get_tyy_data_t(host, port, tid):
    """Worker loop that serves collector reads for one ``host``/``port``.

    The worker talks to requesters through two cache entries:

    * ``cd_thread_{host}_{port}_id`` -- the loop keeps running while this
      entry equals ``tid``.
    * ``cd_thread_{host}_{port}_val`` -- when it holds ``"get"`` the worker
      overwrites it with ``"working"``, sends ``b"R"`` to the collector and
      then stores either the parsed reading or ``{"err_msg": <text>}``.

    The socket is reused between requests and is closed (and re-opened on
    the next request) after any I/O failure.

    Args:
        host: Collector address.
        port: Collector TCP port (converted with ``int``).
        tid: Identifier of this worker, compared with the ``_id`` entry.
    """
    cd_thread_key_id = f"cd_thread_{host}_{port}_id"
    cd_thread_key_val = f"cd_thread_{host}_{port}_val"
    # Seconds before a connect/send/recv gives up, so a silent collector
    # cannot block this thread forever (same value get_tyy_data_1 uses).
    io_timeout = 5
    sc = None

    def close_socket():
        """Close the current socket (best effort) and forget it."""
        nonlocal sc
        if sc is not None:
            try:
                sc.close()
            except OSError:
                pass
        sc = None

    def report_error(msg):
        """Publish an error result for the waiting requester."""
        cache.set(cd_thread_key_val, {"err_msg": msg})

    def connect_and_send(retry=1):
        """Ensure a connection exists and send the read command.

        On failure the socket is closed, ``sc`` is left as ``None`` and an
        error is published.
        """
        nonlocal sc
        try:
            if sc is None:
                sc = socket.socket()
                sc.settimeout(io_timeout)
                sc.connect((host, int(port)))
            sc.sendall(b"R")
        # The specific OSError subclasses must come before the generic
        # handler, otherwise they are never reached.
        except BrokenPipeError:
            # The kept-alive connection is dead: drop it so that the retry
            # really reconnects instead of re-sending on the same socket.
            close_socket()
            if retry > 0:
                connect_and_send(retry - 1)
            else:
                report_error("采集器连接失败-管道重置")
        except ConnectionResetError:
            close_socket()
            report_error("采集器重置了连接")
        except (socket.timeout, TimeoutError):
            close_socket()
            report_error("采集器连接超时")
        except Exception as e:
            close_socket()
            report_error(f"采集器连接失败-{str(e)}")

    def serve_request():
        """Perform one request/response exchange and publish its outcome."""
        connect_and_send()
        if sc is None:
            return
        try:
            resp = sc.recv(1024)
        except (socket.timeout, TimeoutError):
            close_socket()
            report_error("采集器连接超时")
            return
        except OSError as e:
            close_socket()
            report_error(f"采集器连接失败-{str(e)}")
            return

        if not resp:
            # Empty read: the peer closed the connection; reconnect next time.
            close_socket()

        try:
            res = handle_bytes(resp)
        except Exception as e:
            # Never let a parsing failure kill the worker thread.
            report_error(f'采集器返回数据错误-{str(e)}')
            return

        if isinstance(res, str):
            report_error(f'采集器返回数据错误-{res}')
        elif not res:
            report_error(f"采集器返回数据为空-{str(res)}")
        else:
            myLogger.info(f"采集器返回数据-{res}")
            cache.set(cd_thread_key_val, res)

    try:
        while cache.get(cd_thread_key_id) == tid:
            if cache.get(cd_thread_key_val) == "get":
                cache.set(cd_thread_key_val, "working")
                serve_request()
            time.sleep(0.3)
    finally:
        close_socket()
 | |
| 
 | |
def get_tyy_data_2(*args, retry=1):
    """Request one reading through the per-device worker thread.

    Starts a ``get_tyy_data_t`` worker when the cache holds no worker id for
    this device, writes ``"get"`` to the shared value entry, then polls that
    entry (8 times, 0.3 s apart) until the worker stores a ``dict``.

    Args:
        *args: ``args[0]`` is the host, ``args[1]`` the port.
        retry: How many times to clear the worker id and start over when no
            answer arrives within the polling window.

    Returns:
        The ``dict`` stored by the worker.

    Raises:
        ParseError: The worker reported ``err_msg``, or no answer arrived
            after all retries.
    """
    host, port = args[0], int(args[1])
    cd_thread_key_id = f"cd_thread_{host}_{port}_id"
    cd_thread_key_val = f"cd_thread_{host}_{port}_val"

    if cache.get(cd_thread_key_id, default=None) is None:
        tid = uuid.uuid4()
        cache.set(cd_thread_key_id, tid, timeout=10800)
        cd_thread = MyThread(target=get_tyy_data_t, args=(host, port, tid), daemon=True)
        cd_thread.start()
    cache.set(cd_thread_key_val, "get")

    for _ in range(8):
        val = cache.get(cd_thread_key_val)
        if isinstance(val, dict):
            if "err_msg" in val:
                raise ParseError(val["err_msg"])
            return val
        time.sleep(0.3)

    if retry > 0:
        # Clearing the id makes a running worker leave its loop and lets the
        # recursive call start a fresh one.
        cache.set(cd_thread_key_id, None)
        # The result must be returned; previously it was discarded and the
        # caller always received None after a retry.
        return get_tyy_data_2(*args, retry=retry - 1)

    raise ParseError("采集器连接超时")
 | |
| 
 | |
| 
 | |
# Open collector sockets kept for reuse; get_tyy_data_1 keys them by "{host}_{port}".
sc_all = {}
# Held by get_tyy_data_1 for the whole request/response exchange.
sc_lock = threading.Lock()
 | |
| 
 | |
def get_tyy_data_1(*args, retry=1):
    """Read one value from the collector over a pooled, reused socket.

    The socket for ``host``/``port`` is kept in ``sc_all``.  A reused socket
    is first drained of stale bytes; then ``b"R"`` is sent and one response
    is read and parsed with ``handle_bytes``.  The whole exchange runs under
    ``sc_lock``.  Any failing socket is closed and removed from the pool so
    the next call reconnects.

    Args:
        *args: ``args[0]`` is the host, ``args[1]`` the port.
        retry: How many times to reconnect and resend after a broken pipe.

    Returns:
        The value produced by ``handle_bytes``.

    Raises:
        ParseError: Connection failure, timeout, or an invalid response.
    """
    host, port = args[0], int(args[1])
    conn_key = f"{host}_{port}"

    def discard():
        """Remove this device's socket from the pool and close it."""
        stale = sc_all.pop(conn_key, None)
        if stale is not None:
            try:
                stale.close()
            except OSError:
                pass

    def open_connection():
        """Connect a new socket and register it in the pool."""
        sc = socket.socket()
        try:
            sc.settimeout(5)
            sc.connect((host, port))
        except BaseException:
            sc.close()
            raise
        sc_all[conn_key] = sc
        return sc

    def drain(sc):
        """Discard unread bytes; return False when the peer has closed."""
        alive = True
        sc.settimeout(0.1)  # short timeout: only consume what is already there
        for _ in range(5):
            try:
                if not sc.recv(65536):
                    alive = False
                    break
            except (socket.timeout, BlockingIOError):
                break
        sc.settimeout(5)  # restore the normal timeout
        return alive

    def exchange():
        """Send the read command and return the raw response bytes."""
        sc = sc_all.get(conn_key)
        if sc is not None and not drain(sc):
            # Peer closed the pooled connection: replace it.
            discard()
            sc = None
        if sc is None:
            sc = open_connection()
        sc.sendall(b"R")
        return sc.recv(1024)

    with sc_lock:
        attempts_left = retry
        while True:
            try:
                resp = exchange()
                break
            # Specific OSError subclasses first; a leading ``except OSError``
            # would make the handlers below it unreachable.
            except BrokenPipeError:
                discard()
                if attempts_left <= 0:
                    raise ParseError("采集器连接失败-管道重置")
                attempts_left -= 1
            except (socket.timeout, TimeoutError) as e:
                discard()
                raise ParseError(f"采集器连接超时-{str(e)}")
            except OSError as e:
                discard()
                raise ParseError(f"采集器连接失败-{str(e)}")

        if not resp:
            # Empty read means the peer closed; do not keep the dead socket.
            discard()

        res = handle_bytes(resp)
        if isinstance(res, str):
            raise ParseError(f'采集器返回数据错误-{res}')
        return res
 | |
| 
 | |
| 
 | |
def _recv_tyy_frame(sc, bufsize=4096):
    """Read from ``sc`` until one whole collector frame has arrived.

    TCP may deliver a frame in several pieces.  When the first chunk starts
    with the ``EB 90`` header, keep reading until the size announced by the
    big-endian length at offsets 2-3 (plus the 4 leading bytes) is available
    or the peer closes.  Anything else is returned unchanged so that the
    caller's validation can report it.
    """
    buf = sc.recv(bufsize)
    if len(buf) < 4 or buf[0] != 0xEB or buf[1] != 0x90:
        return buf
    expected = int.from_bytes(buf[2:4], byteorder='big', signed=True) + 4
    while len(buf) < expected:
        chunk = sc.recv(bufsize)
        if not chunk:
            break
        buf += chunk
    return buf


def get_tyy_data_3(*args, retry=2):
    """Read one value from the collector using a fresh connection per try.

    Args:
        *args: ``args[0]`` is the host, ``args[1]`` the port.
        retry: Total number of connection attempts (at least one is made).

    Returns:
        The value produced by ``handle_bytes``.

    Raises:
        ParseError: The device sent nothing, sent an invalid frame, could
            not be reached after all attempts, or an unexpected error
            occurred.
    """
    host, port = args[0], int(args[1])
    attempts = max(1, retry)
    for attempt in range(attempts):
        try:
            # New connection for every request: no state is shared.
            with socket.create_connection((host, port), timeout=10) as sc:
                sc.sendall(b"R")
                resp = _recv_tyy_frame(sc)

            if not resp:
                raise ParseError("设备未启动")

            res = handle_bytes(resp)
            if isinstance(res, str):
                raise ParseError(f"采集器返回数据错误: {res}")
            return res
        except ParseError:
            raise
        except OSError as e:
            # Covers timeouts, refused/reset connections and unreachable
            # hosts; only the last failed attempt is reported.
            if attempt == attempts - 1:
                raise ParseError(f"采集器连接失败: {str(e)}") from e
            time.sleep(0.5)  # wait 0.5 s before the next attempt
        except Exception as e:
            raise ParseError(f"未知错误: {str(e)}") from e
 | |
|         
 | |
| 
 | |
def get_tyy_data(*args):
    """Fetch one collector reading through the local HTTP service on port 2300.

    Args:
        *args: ``args[0]`` is the collector host, ``args[1]`` its port.

    Returns:
        The decoded JSON response.

    Raises:
        ParseError: The service could not be reached, did not answer with
            JSON, or answered with an ``err_msg`` entry.
    """
    host, port = args[0], int(args[1])
    try:
        r = requests.get(
            "http://127.0.0.1:2300",
            # Let requests build and escape the query string.
            params={"host": host, "port": port},
            # NOTE(review): without a timeout requests can wait forever;
            # 30 s is an assumed generous upper bound -- tune to the service.
            timeout=30,
        )
        res = r.json()
    except (requests.RequestException, ValueError) as e:
        raise ParseError(f"采集器连接失败-{str(e)}") from e

    if isinstance(res, dict) and "err_msg" in res:
        raise ParseError(res["err_msg"])
    return res