diff --git a/src/connection.rs b/src/connection.rs index f1635b7..b1e4210 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -97,6 +97,7 @@ pub struct ConnectionStatus { pub monitored_item_map: HashMap, // point_id -> monitored_item_id pub poll_points: Vec, // 正在轮询的点集合 poll_handle: Option>, // 统一的轮询任务句柄 + heartbeat_handle: Option>, // 心跳任务句柄 } #[derive(Clone)] @@ -106,6 +107,9 @@ pub struct ConnectionManager { point_history_data: Arc>>>, point_write_target_cache: Arc>>, event_manager: Option>, + pool: Option>, + reconnect_tx: Option>, + reconnect_rx: Arc>>>, } @@ -155,12 +159,16 @@ impl ConnectionManager { } pub fn new() -> Self { + let (reconnect_tx, reconnect_rx) = tokio::sync::mpsc::unbounded_channel::(); Self { status: Arc::new(RwLock::new(HashMap::new())), point_monitor_data: Arc::new(RwLock::new(HashMap::new())), point_history_data: Arc::new(RwLock::new(HashMap::new())), point_write_target_cache: Arc::new(RwLock::new(HashMap::new())), event_manager: None, + pool: None, + reconnect_tx: Some(reconnect_tx), + reconnect_rx: Arc::new(std::sync::Mutex::new(Some(reconnect_rx))), } } @@ -168,6 +176,38 @@ impl ConnectionManager { self.event_manager = Some(event_manager); } + pub fn set_pool(&mut self, pool: Arc) { + self.pool = Some(pool); + } + + pub fn set_reconnect_tx(&mut self, tx: tokio::sync::mpsc::UnboundedSender) { + self.reconnect_tx = Some(tx); + } + + pub fn get_reconnect_rx(&self) -> Option> { + self.reconnect_rx.lock().unwrap().take() + } + + pub fn start_reconnect_task(&self) { + let manager = self.clone(); + let pool = manager.pool.clone(); + tokio::spawn(async move { + // 获取重连通道的接收端 + let mut reconnect_rx = manager.get_reconnect_rx().expect("Failed to get reconnect receiver"); + + while let Some(source_id) = reconnect_rx.recv().await { + tracing::info!("Received reconnect request for source {}", source_id); + if let Some(ref pool) = pool { + if let Err(e) = manager.reconnect(pool, source_id).await { + tracing::error!("Failed to reconnect source {}: {}", source_id, e); + } + } else { + tracing::warn!("Pool not available for reconnection of source {}", source_id); + } + } + }); + } + pub async fn remove_point_write_target_cache_by_point_ids( &self, point_ids: &[Uuid], @@ -217,6 +257,67 @@ impl ConnectionManager { self.point_monitor_data.read().await } + async fn start_heartbeat_task(&self, source_id: Uuid) { + let manager = self.clone(); + + let handle = tokio::spawn(async move { + let mut ticker = tokio::time::interval(Duration::from_secs(30)); // 每30秒检测一次心跳 + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + ticker.tick().await; + + // 检查session是否有效 + let session_valid = { + let status = manager.status.read().await; + if let Some(conn_status) = status.get(&source_id) { + if let Some(session) = conn_status.session.as_ref() { + // 尝试读取当前时间来验证连接 + let node_id = NodeId::new(0, 2258); // ServerCurrentTime节点 + let read_request = ReadValueId { + node_id, + attribute_id: AttributeId::Value as u32, + index_range: NumericRange::None, + data_encoding: Default::default(), + }; + + match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await { + Ok(_) => true, + Err(_) => false, + } + } else { + false + } + } else { + false + } + }; + + if !session_valid { + tracing::warn!( + "Heartbeat detected invalid session for source {}, triggering reconnection", + source_id + ); + + // 通过通道发送重连请求 + if let Some(tx) = manager.reconnect_tx.as_ref() { + if let Err(e) = tx.send(source_id) { + tracing::error!("Failed to send reconnect request for source {}: {}", source_id, e); + } + } else { + tracing::warn!("Reconnect channel not available for source {}", source_id); + } + } + } + }); + + // 保存心跳任务句柄 + let mut status = self.status.write().await; + if let Some(conn_status) = status.get_mut(&source_id) { + conn_status.heartbeat_handle = Some(handle); + } + } + async fn start_unified_poll_task(&self, source_id: Uuid, session: Arc) { let event_manager = match self.event_manager.clone() { Some(em) => em, @@ -485,12 +586,16 @@ impl ConnectionManager { monitored_item_map: HashMap::new(), poll_points: Vec::new(), poll_handle: None, + heartbeat_handle: None, }, ); drop(status); // 显式释放锁,在调用 start_unified_poll_task 之前 // 启动统一的轮询任务 self.start_unified_poll_task(source_id, session).await; + + // 启动心跳任务 + self.start_heartbeat_task(source_id).await; tracing::info!("Successfully connected to source {}", source_id); Ok(()) @@ -511,9 +616,22 @@ impl ConnectionManager { next_client_handle: 1000, poll_points: Vec::new(), poll_handle: None, + heartbeat_handle: None, }, ); } + pub async fn reconnect(&self, pool: &sqlx::PgPool, source_id: Uuid) -> Result<(), String> { + tracing::info!("Reconnecting to source {}", source_id); + + // 先断开连接 + if let Err(e) = self.disconnect(source_id).await { + tracing::error!("Failed to disconnect source {}: {}", source_id, e); + } + + // 再重新连接 + self.connect_from_source(pool, source_id).await + } + pub async fn disconnect(&self, source_id: Uuid) -> Result<(), String> { // 停止轮询任务并清空轮询点列表 { @@ -523,6 +641,10 @@ impl ConnectionManager { if let Some(handle) = conn_status.poll_handle.take() { handle.abort(); } + // 停止心跳任务 + if let Some(handle) = conn_status.heartbeat_handle.take() { + handle.abort(); + } } } @@ -551,6 +673,10 @@ impl ConnectionManager { if let Some(handle) = conn_status.poll_handle.take() { handle.abort(); } + // 停止心跳任务 + if let Some(handle) = conn_status.heartbeat_handle.take() { + handle.abort(); + } } } diff --git a/src/main.rs b/src/main.rs index 57c789e..4665288 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,7 +46,12 @@ async fn main() { Some(ws_manager.clone()), )); connection_manager.set_event_manager(event_manager.clone()); + connection_manager.set_pool(Arc::new(pool.clone())); + let connection_manager = Arc::new(connection_manager); + + // 启动重连任务 + connection_manager.start_reconnect_task(); // Connect to all enabled sources concurrently let sources = service::get_all_enabled_sources(&pool)