diff --git a/src/connection.rs b/src/connection.rs index 266b352..38349b9 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -702,11 +702,9 @@ impl ConnectionManager { // 再重新连接 let result = self.connect_from_source(pool, source_id).await; - // 清除重连标记 - if result.is_ok() { - let mut reconnecting = self.reconnecting.write().await; - reconnecting.remove(&source_id); - } + // 无论成功还是失败都清除重连标记,以便心跳检测到问题后可以再次触发重连 + let mut reconnecting = self.reconnecting.write().await; + reconnecting.remove(&source_id); result } diff --git a/src/event.rs b/src/event.rs index b8bc298..5c11c52 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,5 +1,6 @@ use tokio::sync::mpsc; use uuid::Uuid; +use std::collections::HashMap; const EVENT_CHANNEL_CAPACITY: usize = 4096; @@ -43,127 +44,47 @@ impl EventManager { let connection_manager_clone = connection_manager.clone(); while let Some(event) = receiver.recv().await { match event { - ReloadEvent::SourceCreate { source_id } => { - tracing::info!("Processing SourceCreate event for {}", source_id); - if let Err(e) = connection_manager_clone.connect_from_source(&pool, source_id).await { - tracing::error!("Failed to connect to source {}: {}", source_id, e); - } - } - ReloadEvent::SourceUpdate { source_id } => { - tracing::info!("Processing SourceUpdate event for {}", source_id); - if let Err(e) = connection_manager_clone.reconnect(&pool, source_id).await { - tracing::error!("Failed to reconnect source {}: {}", source_id, e); - } - } - ReloadEvent::SourceDelete { source_id } => { - tracing::info!("Processing SourceDelete event for {}", source_id); - if let Err(e) = connection_manager_clone.disconnect(source_id).await { - tracing::error!("Failed to disconnect from source {}: {}", source_id, e); - } - } - ReloadEvent::PointCreateBatch { source_id, point_ids } => { - let requested_count = point_ids.len(); - match connection_manager_clone - .subscribe_points_from_source(source_id, Some(point_ids), &pool) - .await - { - Ok(stats) => { - let subscribed = *stats.get("subscribed").unwrap_or(&0); - let polled = *stats.get("polled").unwrap_or(&0); - let total = *stats.get("total").unwrap_or(&0); - tracing::info!( - "PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}", - source_id, - requested_count, - subscribed, - polled, - total - ); - } - Err(e) => { - tracing::error!("Failed to subscribe to points: {}", e); - } - } - } - ReloadEvent::PointDeleteBatch { source_id, point_ids } => { - tracing::info!( - "Processing PointDeleteBatch event for source {} with {} points", - source_id, - point_ids.len() - ); - if let Err(e) = connection_manager_clone - .unsubscribe_points_from_source(source_id, point_ids) - .await - { - tracing::error!("Failed to unsubscribe points: {}", e); - } - } ReloadEvent::PointNewValue(payload) => { - let source_id = payload.source_id; - let client_handle = payload.client_handle; - let point_id = if let Some(point_id) = payload.point_id { - Some(point_id) - } else { - let status = connection_manager_clone.get_status_read_guard().await; - status - .get(&source_id) - .and_then(|s| s.client_handle_map.get(&client_handle).copied()) - }; - if let Some(point_id) = point_id { - // 从缓存中读取旧值 - let (old_value, old_timestamp, value_changed) = { - let monitor_data = connection_manager_clone.get_point_monitor_data_read_guard().await; - let old_monitor_info = monitor_data.get(&point_id); + let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> = + HashMap::new(); + latest_by_key.insert((payload.source_id, payload.client_handle), payload); - if let Some(old_info) = old_monitor_info { - let changed = old_info.value != payload.value || - old_info.timestamp != payload.timestamp; - (old_info.value.clone(), old_info.timestamp, changed) - } else { - (None, None, false) - } - }; - - let monitor = crate::telemetry::PointMonitorInfo { - protocol: payload.protocol.clone(), - source_id, - point_id, - client_handle, - scan_mode: payload.scan_mode.clone(), - timestamp: payload.timestamp, - quality: payload.quality.clone(), - value: payload.value.clone(), - value_type: payload.value_type.clone(), - value_text: payload.value_text.clone(), - old_value, - old_timestamp, - value_changed, - }; - - // Process in event worker directly to avoid per-point spawn overhead. - if let Err(e) = connection_manager_clone - .update_point_monitor_data(monitor.clone()) - .await - { - tracing::error!( - "Failed to update point monitor data for point {}: {}", - point_id, - e - ); - } - - if let Some(ws_manager) = &ws_manager_clone { - let ws_message = crate::websocket::WsMessage::PointNewValue(monitor); - if let Err(e) = ws_manager.send_to_public(ws_message).await { - tracing::error!( - "Failed to send WebSocket message to public room: {}", - e + loop { + match receiver.try_recv() { + Ok(ReloadEvent::PointNewValue(next_payload)) => { + latest_by_key.insert( + (next_payload.source_id, next_payload.client_handle), + next_payload, ); } + Ok(other_event) => { + handle_control_event( + other_event, + &pool, + &connection_manager_clone, + ) + .await; + } + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { + break; + } + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { + break; + } } - } else { - tracing::warn!("Point not found for source {} client_handle {}", source_id, client_handle); } + + for point_payload in latest_by_key.into_values() { + process_point_new_value( + point_payload, + &connection_manager_clone, + ws_manager_clone.as_ref(), + ) + .await; + } + } + other_event => { + handle_control_event(other_event, &pool, &connection_manager_clone).await; } } } @@ -193,3 +114,142 @@ impl EventManager { } } } + +async fn handle_control_event( + event: ReloadEvent, + pool: &sqlx::PgPool, + connection_manager: &std::sync::Arc, +) { + match event { + ReloadEvent::SourceCreate { source_id } => { + tracing::info!("Processing SourceCreate event for {}", source_id); + if let Err(e) = connection_manager.connect_from_source(pool, source_id).await { + tracing::error!("Failed to connect to source {}: {}", source_id, e); + } + } + ReloadEvent::SourceUpdate { source_id } => { + tracing::info!("Processing SourceUpdate event for {}", source_id); + if let Err(e) = connection_manager.reconnect(pool, source_id).await { + tracing::error!("Failed to reconnect source {}: {}", source_id, e); + } + } + ReloadEvent::SourceDelete { source_id } => { + tracing::info!("Processing SourceDelete event for {}", source_id); + if let Err(e) = connection_manager.disconnect(source_id).await { + tracing::error!("Failed to disconnect from source {}: {}", source_id, e); + } + } + ReloadEvent::PointCreateBatch { source_id, point_ids } => { + let requested_count = point_ids.len(); + match connection_manager + .subscribe_points_from_source(source_id, Some(point_ids), pool) + .await + { + Ok(stats) => { + let subscribed = *stats.get("subscribed").unwrap_or(&0); + let polled = *stats.get("polled").unwrap_or(&0); + let total = *stats.get("total").unwrap_or(&0); + tracing::info!( + "PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}", + source_id, + requested_count, + subscribed, + polled, + total + ); + } + Err(e) => { + tracing::error!("Failed to subscribe to points: {}", e); + } + } + } + ReloadEvent::PointDeleteBatch { source_id, point_ids } => { + tracing::info!( + "Processing PointDeleteBatch event for source {} with {} points", + source_id, + point_ids.len() + ); + if let Err(e) = connection_manager + .unsubscribe_points_from_source(source_id, point_ids) + .await + { + tracing::error!("Failed to unsubscribe points: {}", e); + } + } + ReloadEvent::PointNewValue(_) => {} + } +} + +async fn process_point_new_value( + payload: crate::telemetry::PointNewValue, + connection_manager: &std::sync::Arc, + ws_manager: Option<&std::sync::Arc>, +) { + let source_id = payload.source_id; + let client_handle = payload.client_handle; + let point_id = if let Some(point_id) = payload.point_id { + Some(point_id) + } else { + let status = connection_manager.get_status_read_guard().await; + status + .get(&source_id) + .and_then(|s| s.client_handle_map.get(&client_handle).copied()) + }; + if let Some(point_id) = point_id { + // 从缓存中读取旧值 + let (old_value, old_timestamp, value_changed) = { + let monitor_data = connection_manager.get_point_monitor_data_read_guard().await; + let old_monitor_info = monitor_data.get(&point_id); + + if let Some(old_info) = old_monitor_info { + let changed = old_info.value != payload.value || old_info.timestamp != payload.timestamp; + (old_info.value.clone(), old_info.timestamp, changed) + } else { + (None, None, false) + } + }; + + let monitor = crate::telemetry::PointMonitorInfo { + protocol: payload.protocol, + source_id, + point_id, + client_handle, + scan_mode: payload.scan_mode, + timestamp: payload.timestamp, + quality: payload.quality, + value: payload.value, + value_type: payload.value_type, + value_text: payload.value_text, + old_value, + old_timestamp, + value_changed, + }; + + if let Err(e) = connection_manager + .update_point_monitor_data(monitor.clone()) + .await + { + tracing::error!( + "Failed to update point monitor data for point {}: {}", + point_id, + e + ); + } + + if let Some(ws_manager) = ws_manager { + let ws_message = crate::websocket::WsMessage::PointNewValue(monitor); + if let Err(e) = ws_manager.send_to_public(ws_message).await { + tracing::error!( + "Failed to send WebSocket message to public room: {}", + e + ); + } + } + } else { + tracing::warn!( + "Point not found for source {} client_handle {}", + source_id, + client_handle + ); + } +}