diff --git a/src/connection.rs b/src/connection.rs index 38349b9..9782d0e 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -436,7 +436,7 @@ impl ConnectionManager { .map(crate::telemetry::PointQuality::from_status_code) .unwrap_or(crate::telemetry::PointQuality::Unknown); - let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue( + let _ = event_manager.send(crate::event::AppEvent::PointNewValue( crate::telemetry::PointNewValue { source_id, point_id: Some(poll_point.point_id), @@ -986,7 +986,7 @@ impl ConnectionManager { // Emit local updates only when the full batch succeeds. if let Some(event_manager) = &self.event_manager { for (source_id, point_id, variant) in success_events { - if let Err(e) = event_manager.send(crate::event::ReloadEvent::PointNewValue( + if let Err(e) = event_manager.send(crate::event::AppEvent::PointNewValue( crate::telemetry::PointNewValue { source_id, point_id: Some(point_id), @@ -1126,7 +1126,7 @@ impl ConnectionManager { .unwrap_or(crate::telemetry::PointQuality::Unknown); if let Some(event_manager) = &data_manager.event_manager { - let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue( + let _ = event_manager.send(crate::event::AppEvent::PointNewValue( crate::telemetry::PointNewValue { source_id: current_source_id, point_id: None, diff --git a/src/event.rs b/src/event.rs index 5c11c52..7811af2 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,11 +1,12 @@ +use std::collections::HashMap; use tokio::sync::mpsc; use uuid::Uuid; -use std::collections::HashMap; -const EVENT_CHANNEL_CAPACITY: usize = 4096; +const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024; +const TELEMETRY_EVENT_CHANNEL_CAPACITY: usize = 4096; #[derive(Debug, Clone)] -pub enum ReloadEvent { +pub enum AppEvent { SourceCreate { source_id: Uuid, }, @@ -27,7 +28,8 @@ pub enum ReloadEvent { } pub struct EventManager { - sender: mpsc::Sender, + control_sender: mpsc::Sender, + telemetry_sender: mpsc::Sender, } impl EventManager { @@ -36,110 +38,112 @@ impl EventManager { connection_manager: std::sync::Arc, ws_manager: Option>, ) -> Self { - let (sender, mut receiver) = mpsc::channel::(EVENT_CHANNEL_CAPACITY); - let ws_manager_clone = ws_manager.clone(); + let (control_sender, mut control_receiver) = + mpsc::channel::(CONTROL_EVENT_CHANNEL_CAPACITY); + let (telemetry_sender, mut telemetry_receiver) = + mpsc::channel::(TELEMETRY_EVENT_CHANNEL_CAPACITY); + let control_cm = connection_manager.clone(); + let control_pool = pool.clone(); tokio::spawn(async move { - // 在循环外克隆,避免在循环中移动 - let connection_manager_clone = connection_manager.clone(); - while let Some(event) = receiver.recv().await { - match event { - ReloadEvent::PointNewValue(payload) => { - let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> = - HashMap::new(); - latest_by_key.insert((payload.source_id, payload.client_handle), payload); + while let Some(event) = control_receiver.recv().await { + handle_control_event(event, &control_pool, &control_cm).await; + } + }); - loop { - match receiver.try_recv() { - Ok(ReloadEvent::PointNewValue(next_payload)) => { - latest_by_key.insert( - (next_payload.source_id, next_payload.client_handle), - next_payload, - ); - } - Ok(other_event) => { - handle_control_event( - other_event, - &pool, - &connection_manager_clone, - ) - .await; - } - Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { - break; - } - Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { - break; - } - } + let ws_manager_clone = ws_manager.clone(); + let telemetry_cm = connection_manager.clone(); + tokio::spawn(async move { + while let Some(payload) = telemetry_receiver.recv().await { + let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> = + HashMap::new(); + latest_by_key.insert((payload.source_id, payload.client_handle), payload); + + loop { + match telemetry_receiver.try_recv() { + Ok(next_payload) => { + latest_by_key.insert( + (next_payload.source_id, next_payload.client_handle), + next_payload, + ); } - - for point_payload in latest_by_key.into_values() { - process_point_new_value( - point_payload, - &connection_manager_clone, - ws_manager_clone.as_ref(), - ) - .await; + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { + break; + } + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { + break; } } - other_event => { - handle_control_event(other_event, &pool, &connection_manager_clone).await; - } + } + + for point_payload in latest_by_key.into_values() { + process_point_new_value(point_payload, &telemetry_cm, ws_manager_clone.as_ref()) + .await; } } }); - Self { sender } + Self { + control_sender, + telemetry_sender, + } } - pub fn send(&self, event: ReloadEvent) -> Result<(), String> { - match self.sender.try_send(event) { - Ok(()) => Ok(()), - Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => { - Err(format!("Failed to send event: channel closed ({e:?})")) - } - Err(tokio::sync::mpsc::error::TrySendError::Full(ReloadEvent::PointNewValue(payload))) => { - // High-frequency telemetry is lossy by design under sustained pressure. - tracing::warn!( - "Dropping PointNewValue due to full event queue: source={}, client_handle={}", - payload.source_id, - payload.client_handle - ); - Ok(()) - } - Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => { - Err(format!("Failed to send event: queue full ({e:?})")) - } + pub fn send(&self, event: AppEvent) -> Result<(), String> { + match event { + AppEvent::PointNewValue(payload) => match self.telemetry_sender.try_send(payload) { + Ok(()) => Ok(()), + Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => { + Err(format!("Failed to send telemetry event: channel closed ({e:?})")) + } + Err(tokio::sync::mpsc::error::TrySendError::Full(payload)) => { + // High-frequency telemetry is lossy by design under sustained pressure. + tracing::warn!( + "Dropping PointNewValue due to full telemetry queue: source={}, client_handle={}", + payload.source_id, + payload.client_handle + ); + Ok(()) + } + }, + control_event => match self.control_sender.try_send(control_event) { + Ok(()) => Ok(()), + Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => { + Err(format!("Failed to send control event: channel closed ({e:?})")) + } + Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => { + Err(format!("Failed to send control event: queue full ({e:?})")) + } + }, } } } async fn handle_control_event( - event: ReloadEvent, + event: AppEvent, pool: &sqlx::PgPool, connection_manager: &std::sync::Arc, ) { match event { - ReloadEvent::SourceCreate { source_id } => { + AppEvent::SourceCreate { source_id } => { tracing::info!("Processing SourceCreate event for {}", source_id); if let Err(e) = connection_manager.connect_from_source(pool, source_id).await { tracing::error!("Failed to connect to source {}: {}", source_id, e); } } - ReloadEvent::SourceUpdate { source_id } => { + AppEvent::SourceUpdate { source_id } => { tracing::info!("Processing SourceUpdate event for {}", source_id); if let Err(e) = connection_manager.reconnect(pool, source_id).await { tracing::error!("Failed to reconnect source {}: {}", source_id, e); } } - ReloadEvent::SourceDelete { source_id } => { + AppEvent::SourceDelete { source_id } => { tracing::info!("Processing SourceDelete event for {}", source_id); if let Err(e) = connection_manager.disconnect(source_id).await { tracing::error!("Failed to disconnect from source {}: {}", source_id, e); } } - ReloadEvent::PointCreateBatch { source_id, point_ids } => { + AppEvent::PointCreateBatch { source_id, point_ids } => { let requested_count = point_ids.len(); match connection_manager .subscribe_points_from_source(source_id, Some(point_ids), pool) @@ -163,7 +167,7 @@ async fn handle_control_event( } } } - ReloadEvent::PointDeleteBatch { source_id, point_ids } => { + AppEvent::PointDeleteBatch { source_id, point_ids } => { tracing::info!( "Processing PointDeleteBatch event for source {} with {} points", source_id, @@ -176,7 +180,9 @@ async fn handle_control_event( tracing::error!("Failed to unsubscribe points: {}", e); } } - ReloadEvent::PointNewValue(_) => {} + AppEvent::PointNewValue(_) => { + tracing::warn!("PointNewValue routed to control worker unexpectedly"); + } } } @@ -239,7 +245,7 @@ async fn process_point_new_value( if let Some(ws_manager) = ws_manager { let ws_message = crate::websocket::WsMessage::PointNewValue(monitor); if let Err(e) = ws_manager.send_to_public(ws_message).await { - tracing::error!( + tracing::warn!( "Failed to send WebSocket message to public room: {}", e ); diff --git a/src/handler/point.rs b/src/handler/point.rs index 51e6899..d8d6835 100644 --- a/src/handler/point.rs +++ b/src/handler/point.rs @@ -247,7 +247,7 @@ pub async fn delete_point( if let Some(source_id) = source_id { if let Err(e) = state .event_manager - .send(crate::event::ReloadEvent::PointDeleteBatch { + .send(crate::event::AppEvent::PointDeleteBatch { source_id, point_ids: vec![point_id], }) @@ -365,7 +365,7 @@ pub async fn batch_create_points( let point_ids: Vec = points.into_iter().map(|p| p.point_id).collect(); if let Err(e) = state .event_manager - .send(crate::event::ReloadEvent::PointCreateBatch { source_id, point_ids }) + .send(crate::event::AppEvent::PointCreateBatch { source_id, point_ids }) { tracing::error!("Failed to send PointCreateBatch event: {}", e); } @@ -427,7 +427,7 @@ pub async fn batch_delete_points( let ids: Vec = points.into_iter().map(|p| p.point_id).collect(); if let Err(e) = state .event_manager - .send(crate::event::ReloadEvent::PointDeleteBatch { + .send(crate::event::AppEvent::PointDeleteBatch { source_id, point_ids: ids, }) diff --git a/src/handler/source.rs b/src/handler/source.rs index effa3fd..069f044 100644 --- a/src/handler/source.rs +++ b/src/handler/source.rs @@ -225,7 +225,7 @@ pub async fn create_source( .await?; // 触发 SourceCreate 事件 - let _ = state.event_manager.send(crate::event::ReloadEvent::SourceCreate { source_id: new_id }); + let _ = state.event_manager.send(crate::event::AppEvent::SourceCreate { source_id: new_id }); Ok((StatusCode::CREATED, Json(CreateSourceRes { id: new_id }))) } @@ -300,7 +300,7 @@ pub async fn update_source( qb.push(" WHERE id = ").push_bind(source_id); qb.build().execute(pool).await?; - let _ = state.event_manager.send(crate::event::ReloadEvent::SourceUpdate { source_id }); + let _ = state.event_manager.send(crate::event::AppEvent::SourceUpdate { source_id }); Ok(Json(serde_json::json!({"ok_msg": "Source updated successfully"}))) } @@ -323,7 +323,7 @@ pub async fn delete_source( } // 触发 SourceDelete 事件 - let _ = state.event_manager.send(crate::event::ReloadEvent::SourceDelete { source_id }); + let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id }); Ok(StatusCode::NO_CONTENT) }