use tokio::sync::mpsc; use uuid::Uuid; #[derive(Debug, Clone)] pub enum ReloadEvent { SourceCreate { source_id: Uuid, }, SourceUpdate { source_id: Uuid, }, SourceDelete { source_id: Uuid, }, PointCreate { source_id: Uuid, point_id: Uuid, }, PointCreateBatch { source_id: Uuid, point_ids: Vec, }, PointDeleteBatch { source_id: Uuid, point_ids: Vec, }, PointNewValue(crate::telemetry::PointNewValue), } pub struct EventManager { sender: mpsc::UnboundedSender, } impl EventManager { pub fn new( pool: sqlx::PgPool, connection_manager: std::sync::Arc, ws_manager: Option>, ) -> Self { let (sender, mut receiver) = mpsc::unbounded_channel::(); let ws_manager_clone = ws_manager.clone(); tokio::spawn(async move { while let Some(event) = receiver.recv().await { match event { ReloadEvent::SourceCreate { source_id } => { tracing::info!("Processing SourceCreate event for {}", source_id); if let Err(e) = connection_manager.connect_from_source(&pool, source_id).await { tracing::error!("Failed to connect to source {}: {}", source_id, e); } } ReloadEvent::SourceUpdate { source_id } => { tracing::info!("SourceUpdate event for {}: not implemented yet", source_id); } ReloadEvent::SourceDelete { source_id } => { tracing::info!("Processing SourceDelete event for {}", source_id); if let Err(e) = connection_manager.disconnect(source_id).await { tracing::error!("Failed to disconnect from source {}: {}", source_id, e); } } ReloadEvent::PointCreate { source_id, point_id } => { match connection_manager .subscribe_points_from_source(source_id, Some(vec![point_id]), &pool) .await { Ok(stats) => { let subscribed = *stats.get("subscribed").unwrap_or(&0); let polled = *stats.get("polled").unwrap_or(&0); let total = *stats.get("total").unwrap_or(&0); tracing::info!( "PointCreate subscribe finished for source {} point {}: subscribed={}, polled={}, total={}", source_id, point_id, subscribed, polled, total ); } Err(e) => { tracing::error!("Failed to subscribe to point {}: {}", point_id, e); } } } ReloadEvent::PointCreateBatch { source_id, point_ids } => { let requested_count = point_ids.len(); match connection_manager .subscribe_points_from_source(source_id, Some(point_ids), &pool) .await { Ok(stats) => { let subscribed = *stats.get("subscribed").unwrap_or(&0); let polled = *stats.get("polled").unwrap_or(&0); let total = *stats.get("total").unwrap_or(&0); tracing::info!( "PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}", source_id, requested_count, subscribed, polled, total ); } Err(e) => { tracing::error!("Failed to subscribe to points: {}", e); } } } ReloadEvent::PointDeleteBatch { source_id, point_ids } => { tracing::info!( "Processing PointDeleteBatch event for source {} with {} points", source_id, point_ids.len() ); if let Err(e) = connection_manager .unsubscribe_points_from_source(source_id, point_ids) .await { tracing::error!("Failed to unsubscribe points: {}", e); } } ReloadEvent::PointNewValue(payload) => { let source_id = payload.source_id; let client_handle = payload.client_handle; let point_id = if let Some(point_id) = payload.point_id { Some(point_id) } else { let status = connection_manager.get_status_read_guard().await; status .get(&source_id) .and_then(|s| s.client_handle_map.get(&client_handle).copied()) }; if let Some(point_id) = point_id { // 从缓存中读取旧值 let (old_value, old_timestamp, value_changed) = { let monitor_data = connection_manager.get_point_monitor_data_read_guard().await; let old_monitor_info = monitor_data.get(&point_id); if let Some(old_info) = old_monitor_info { let changed = old_info.value != payload.value || old_info.timestamp != payload.timestamp; (old_info.value.clone(), old_info.timestamp, changed) } else { (None, None, false) } }; let monitor = crate::telemetry::PointMonitorInfo { protocol: payload.protocol.clone(), source_id, point_id, client_handle, scan_mode: payload.scan_mode.clone(), timestamp: payload.timestamp, quality: payload.quality.clone(), value: payload.value.clone(), value_type: payload.value_type.clone(), value_text: payload.value_text.clone(), old_value, old_timestamp, value_changed, }; // 只克隆一次 monitor,减少内存分配 let monitor_clone = monitor.clone(); if let Err(e) = connection_manager.update_point_monitor_data(monitor_clone).await { tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e); } if let Some(ws_manager) = &ws_manager_clone { let ws_message = crate::websocket::WsMessage::PointNewValue(monitor); if let Err(e) = ws_manager.send_to_public(ws_message).await { tracing::error!("Failed to send WebSocket message to public room: {}", e); } // 暂时注释掉 send_to_client,因为现在信息只需发送到 public // if let Err(e) = ws_manager.send_to_client(point_id, ws_message).await { // tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e); // } } } else { tracing::warn!("Point not found for source {} client_handle {}", source_id, client_handle); } } } } }); Self { sender } } pub fn send(&self, event: ReloadEvent) -> Result<(), String> { self.sender .send(event) .map_err(|e| format!("Failed to send event: {}", e)) } }