diff --git a/src/connection.rs b/src/connection.rs index 0ccf42e..6f2b34f 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -110,6 +110,7 @@ pub struct ConnectionManager { pool: Option>, reconnect_tx: Option>, reconnect_rx: Arc>>>, + reconnecting: Arc>>, } @@ -169,6 +170,7 @@ impl ConnectionManager { pool: None, reconnect_tx: Some(reconnect_tx), reconnect_rx: Arc::new(std::sync::Mutex::new(Some(reconnect_rx))), + reconnecting: Arc::new(RwLock::new(HashSet::new())), } } @@ -301,18 +303,27 @@ impl ConnectionManager { }; if !session_valid { - tracing::warn!( - "Heartbeat detected invalid session for source {}, triggering reconnection", - source_id - ); + // 检查是否已经在重连中 + let mut reconnecting = manager.reconnecting.write().await; + if !reconnecting.contains(&source_id) { + reconnecting.insert(source_id); + drop(reconnecting); + + tracing::warn!( + "Heartbeat detected invalid session for source {}, triggering reconnection", + source_id + ); - // 通过通道发送重连请求 - if let Some(tx) = manager.reconnect_tx.as_ref() { - if let Err(e) = tx.send(source_id) { - tracing::error!("Failed to send reconnect request for source {}: {}", source_id, e); + // 通过通道发送重连请求 + if let Some(tx) = manager.reconnect_tx.as_ref() { + if let Err(e) = tx.send(source_id) { + tracing::error!("Failed to send reconnect request for source {}: {}", source_id, e); + } + } else { + tracing::warn!("Reconnect channel not available for source {}", source_id); } } else { - tracing::warn!("Reconnect channel not available for source {}", source_id); + drop(reconnecting); } } } @@ -636,7 +647,15 @@ impl ConnectionManager { } // 再重新连接 - self.connect_from_source(pool, source_id).await + let result = self.connect_from_source(pool, source_id).await; + + // 清除重连标记 + if result.is_ok() { + let mut reconnecting = self.reconnecting.write().await; + reconnecting.remove(&source_id); + } + + result } pub async fn disconnect(&self, source_id: Uuid) -> Result<(), String> { diff --git a/src/event.rs b/src/event.rs index 10319b3..270d2ac 100644 --- a/src/event.rs +++ b/src/event.rs @@ -37,11 +37,13 @@ impl EventManager { let ws_manager_clone = ws_manager.clone(); tokio::spawn(async move { + // 在循环外克隆,避免在循环中移动 + let connection_manager_clone = connection_manager.clone(); while let Some(event) = receiver.recv().await { match event { ReloadEvent::SourceCreate { source_id } => { tracing::info!("Processing SourceCreate event for {}", source_id); - if let Err(e) = connection_manager.connect_from_source(&pool, source_id).await { + if let Err(e) = connection_manager_clone.connect_from_source(&pool, source_id).await { tracing::error!("Failed to connect to source {}: {}", source_id, e); } } @@ -50,13 +52,13 @@ impl EventManager { } ReloadEvent::SourceDelete { source_id } => { tracing::info!("Processing SourceDelete event for {}", source_id); - if let Err(e) = connection_manager.disconnect(source_id).await { + if let Err(e) = connection_manager_clone.disconnect(source_id).await { tracing::error!("Failed to disconnect from source {}: {}", source_id, e); } } ReloadEvent::PointCreateBatch { source_id, point_ids } => { let requested_count = point_ids.len(); - match connection_manager + match connection_manager_clone .subscribe_points_from_source(source_id, Some(point_ids), &pool) .await { @@ -84,7 +86,7 @@ impl EventManager { source_id, point_ids.len() ); - if let Err(e) = connection_manager + if let Err(e) = connection_manager_clone .unsubscribe_points_from_source(source_id, point_ids) .await { @@ -97,7 +99,7 @@ impl EventManager { let point_id = if let Some(point_id) = payload.point_id { Some(point_id) } else { - let status = connection_manager.get_status_read_guard().await; + let status = connection_manager_clone.get_status_read_guard().await; status .get(&source_id) .and_then(|s| s.client_handle_map.get(&client_handle).copied()) @@ -105,7 +107,7 @@ impl EventManager { if let Some(point_id) = point_id { // 从缓存中读取旧值 let (old_value, old_timestamp, value_changed) = { - let monitor_data = connection_manager.get_point_monitor_data_read_guard().await; + let monitor_data = connection_manager_clone.get_point_monitor_data_read_guard().await; let old_monitor_info = monitor_data.get(&point_id); if let Some(old_info) = old_monitor_info { @@ -138,16 +140,18 @@ impl EventManager { let monitor_for_db = monitor.clone(); // 并行执行 update_point_monitor_data 和 send_to_public,不等待完成 + let cm_clone = connection_manager_clone.clone(); tokio::spawn(async move { // 更新监控数据 - if let Err(e) = connection_manager.update_point_monitor_data(monitor_for_db).await { + if let Err(e) = cm_clone.update_point_monitor_data(monitor_for_db).await { tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e); } }); + let ws_clone = ws_manager_clone.clone(); tokio::spawn(async move { // 发送WebSocket消息 - if let Some(ws_manager) = &ws_manager_clone { + if let Some(ws_manager) = ws_clone { let ws_message = crate::websocket::WsMessage::PointNewValue(monitor_for_ws); if let Err(e) = ws_manager.send_to_public(ws_message).await { tracing::error!("Failed to send WebSocket message to public room: {}", e);