feat: 实现心跳检测和自动重连功能

This commit is contained in:
caoqianming 2026-03-05 13:27:13 +08:00
parent 494cf1d656
commit 0f37c9435e
2 changed files with 131 additions and 0 deletions

View File

@ -97,6 +97,7 @@ pub struct ConnectionStatus {
pub monitored_item_map: HashMap<Uuid, u32>, // point_id -> monitored_item_id pub monitored_item_map: HashMap<Uuid, u32>, // point_id -> monitored_item_id
pub poll_points: Vec<PollPointInfo>, // 正在轮询的点集合 pub poll_points: Vec<PollPointInfo>, // 正在轮询的点集合
poll_handle: Option<JoinHandle<()>>, // 统一的轮询任务句柄 poll_handle: Option<JoinHandle<()>>, // 统一的轮询任务句柄
heartbeat_handle: Option<JoinHandle<()>>, // 心跳任务句柄
} }
#[derive(Clone)] #[derive(Clone)]
@ -106,6 +107,9 @@ pub struct ConnectionManager {
point_history_data: Arc<RwLock<HashMap<Uuid, VecDeque<PointMonitorInfo>>>>, point_history_data: Arc<RwLock<HashMap<Uuid, VecDeque<PointMonitorInfo>>>>,
point_write_target_cache: Arc<RwLock<HashMap<Uuid, PointWriteTarget>>>, point_write_target_cache: Arc<RwLock<HashMap<Uuid, PointWriteTarget>>>,
event_manager: Option<std::sync::Arc<crate::event::EventManager>>, event_manager: Option<std::sync::Arc<crate::event::EventManager>>,
pool: Option<Arc<sqlx::PgPool>>,
reconnect_tx: Option<tokio::sync::mpsc::UnboundedSender<Uuid>>,
reconnect_rx: Arc<std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<Uuid>>>>,
} }
@ -155,12 +159,16 @@ impl ConnectionManager {
} }
pub fn new() -> Self { pub fn new() -> Self {
let (reconnect_tx, reconnect_rx) = tokio::sync::mpsc::unbounded_channel::<Uuid>();
Self { Self {
status: Arc::new(RwLock::new(HashMap::new())), status: Arc::new(RwLock::new(HashMap::new())),
point_monitor_data: Arc::new(RwLock::new(HashMap::new())), point_monitor_data: Arc::new(RwLock::new(HashMap::new())),
point_history_data: Arc::new(RwLock::new(HashMap::new())), point_history_data: Arc::new(RwLock::new(HashMap::new())),
point_write_target_cache: Arc::new(RwLock::new(HashMap::new())), point_write_target_cache: Arc::new(RwLock::new(HashMap::new())),
event_manager: None, event_manager: None,
pool: None,
reconnect_tx: Some(reconnect_tx),
reconnect_rx: Arc::new(std::sync::Mutex::new(Some(reconnect_rx))),
} }
} }
@ -168,6 +176,38 @@ impl ConnectionManager {
self.event_manager = Some(event_manager); self.event_manager = Some(event_manager);
} }
pub fn set_pool(&mut self, pool: Arc<sqlx::PgPool>) {
self.pool = Some(pool);
}
pub fn set_reconnect_tx(&mut self, tx: tokio::sync::mpsc::UnboundedSender<Uuid>) {
self.reconnect_tx = Some(tx);
}
pub fn get_reconnect_rx(&self) -> Option<tokio::sync::mpsc::UnboundedReceiver<Uuid>> {
self.reconnect_rx.lock().unwrap().take()
}
pub fn start_reconnect_task(&self) {
let manager = self.clone();
let pool = manager.pool.clone();
tokio::spawn(async move {
// 获取重连通道的接收端
let mut reconnect_rx = manager.get_reconnect_rx().expect("Failed to get reconnect receiver");
while let Some(source_id) = reconnect_rx.recv().await {
tracing::info!("Received reconnect request for source {}", source_id);
if let Some(ref pool) = pool {
if let Err(e) = manager.reconnect(pool, source_id).await {
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
}
} else {
tracing::warn!("Pool not available for reconnection of source {}", source_id);
}
}
});
}
pub async fn remove_point_write_target_cache_by_point_ids( pub async fn remove_point_write_target_cache_by_point_ids(
&self, &self,
point_ids: &[Uuid], point_ids: &[Uuid],
@ -217,6 +257,67 @@ impl ConnectionManager {
self.point_monitor_data.read().await self.point_monitor_data.read().await
} }
async fn start_heartbeat_task(&self, source_id: Uuid) {
let manager = self.clone();
let handle = tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_secs(30)); // 每30秒检测一次心跳
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
ticker.tick().await;
// 检查session是否有效
let session_valid = {
let status = manager.status.read().await;
if let Some(conn_status) = status.get(&source_id) {
if let Some(session) = conn_status.session.as_ref() {
// 尝试读取当前时间来验证连接
let node_id = NodeId::new(0, 2258); // ServerCurrentTime节点
let read_request = ReadValueId {
node_id,
attribute_id: AttributeId::Value as u32,
index_range: NumericRange::None,
data_encoding: Default::default(),
};
match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await {
Ok(_) => true,
Err(_) => false,
}
} else {
false
}
} else {
false
}
};
if !session_valid {
tracing::warn!(
"Heartbeat detected invalid session for source {}, triggering reconnection",
source_id
);
// 通过通道发送重连请求
if let Some(tx) = manager.reconnect_tx.as_ref() {
if let Err(e) = tx.send(source_id) {
tracing::error!("Failed to send reconnect request for source {}: {}", source_id, e);
}
} else {
tracing::warn!("Reconnect channel not available for source {}", source_id);
}
}
}
});
// 保存心跳任务句柄
let mut status = self.status.write().await;
if let Some(conn_status) = status.get_mut(&source_id) {
conn_status.heartbeat_handle = Some(handle);
}
}
async fn start_unified_poll_task(&self, source_id: Uuid, session: Arc<Session>) { async fn start_unified_poll_task(&self, source_id: Uuid, session: Arc<Session>) {
let event_manager = match self.event_manager.clone() { let event_manager = match self.event_manager.clone() {
Some(em) => em, Some(em) => em,
@ -485,12 +586,16 @@ impl ConnectionManager {
monitored_item_map: HashMap::new(), monitored_item_map: HashMap::new(),
poll_points: Vec::new(), poll_points: Vec::new(),
poll_handle: None, poll_handle: None,
heartbeat_handle: None,
}, },
); );
drop(status); // 显式释放锁,在调用 start_unified_poll_task 之前 drop(status); // 显式释放锁,在调用 start_unified_poll_task 之前
// 启动统一的轮询任务 // 启动统一的轮询任务
self.start_unified_poll_task(source_id, session).await; self.start_unified_poll_task(source_id, session).await;
// 启动心跳任务
self.start_heartbeat_task(source_id).await;
tracing::info!("Successfully connected to source {}", source_id); tracing::info!("Successfully connected to source {}", source_id);
Ok(()) Ok(())
@ -511,9 +616,22 @@ impl ConnectionManager {
next_client_handle: 1000, next_client_handle: 1000,
poll_points: Vec::new(), poll_points: Vec::new(),
poll_handle: None, poll_handle: None,
heartbeat_handle: None,
}, },
); );
} }
pub async fn reconnect(&self, pool: &sqlx::PgPool, source_id: Uuid) -> Result<(), String> {
tracing::info!("Reconnecting to source {}", source_id);
// 先断开连接
if let Err(e) = self.disconnect(source_id).await {
tracing::error!("Failed to disconnect source {}: {}", source_id, e);
}
// 再重新连接
self.connect_from_source(pool, source_id).await
}
pub async fn disconnect(&self, source_id: Uuid) -> Result<(), String> { pub async fn disconnect(&self, source_id: Uuid) -> Result<(), String> {
// 停止轮询任务并清空轮询点列表 // 停止轮询任务并清空轮询点列表
{ {
@ -523,6 +641,10 @@ impl ConnectionManager {
if let Some(handle) = conn_status.poll_handle.take() { if let Some(handle) = conn_status.poll_handle.take() {
handle.abort(); handle.abort();
} }
// 停止心跳任务
if let Some(handle) = conn_status.heartbeat_handle.take() {
handle.abort();
}
} }
} }
@ -551,6 +673,10 @@ impl ConnectionManager {
if let Some(handle) = conn_status.poll_handle.take() { if let Some(handle) = conn_status.poll_handle.take() {
handle.abort(); handle.abort();
} }
// 停止心跳任务
if let Some(handle) = conn_status.heartbeat_handle.take() {
handle.abort();
}
} }
} }

View File

@ -46,7 +46,12 @@ async fn main() {
Some(ws_manager.clone()), Some(ws_manager.clone()),
)); ));
connection_manager.set_event_manager(event_manager.clone()); connection_manager.set_event_manager(event_manager.clone());
connection_manager.set_pool(Arc::new(pool.clone()));
let connection_manager = Arc::new(connection_manager); let connection_manager = Arc::new(connection_manager);
// 启动重连任务
connection_manager.start_reconnect_task();
// Connect to all enabled sources concurrently // Connect to all enabled sources concurrently
let sources = service::get_all_enabled_sources(&pool) let sources = service::get_all_enabled_sources(&pool)