From 503aefc4cbf816c40ba8717c28ca73d1492f2d59 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Fri, 13 Mar 2026 14:44:30 +0800 Subject: [PATCH] refactor(event): rename ReloadEvent to AppEvent and split event channels Clarify event semantics by renaming ReloadEvent to AppEvent and route control vs telemetry traffic through dedicated channels. This keeps control events isolated from high-frequency PointNewValue updates while preserving the existing send() call pattern. Made-with: Cursor --- src/connection.rs | 6 +- src/event.rs | 156 ++++++++++++++++++++++-------------------- src/handler/point.rs | 6 +- src/handler/source.rs | 6 +- 4 files changed, 90 insertions(+), 84 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 38349b9..9782d0e 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -436,7 +436,7 @@ impl ConnectionManager { .map(crate::telemetry::PointQuality::from_status_code) .unwrap_or(crate::telemetry::PointQuality::Unknown); - let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue( + let _ = event_manager.send(crate::event::AppEvent::PointNewValue( crate::telemetry::PointNewValue { source_id, point_id: Some(poll_point.point_id), @@ -986,7 +986,7 @@ impl ConnectionManager { // Emit local updates only when the full batch succeeds. if let Some(event_manager) = &self.event_manager { for (source_id, point_id, variant) in success_events { - if let Err(e) = event_manager.send(crate::event::ReloadEvent::PointNewValue( + if let Err(e) = event_manager.send(crate::event::AppEvent::PointNewValue( crate::telemetry::PointNewValue { source_id, point_id: Some(point_id), @@ -1126,7 +1126,7 @@ impl ConnectionManager { .unwrap_or(crate::telemetry::PointQuality::Unknown); if let Some(event_manager) = &data_manager.event_manager { - let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue( + let _ = event_manager.send(crate::event::AppEvent::PointNewValue( crate::telemetry::PointNewValue { source_id: current_source_id, point_id: None, diff --git a/src/event.rs b/src/event.rs index 5c11c52..7811af2 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,11 +1,12 @@ +use std::collections::HashMap; use tokio::sync::mpsc; use uuid::Uuid; -use std::collections::HashMap; -const EVENT_CHANNEL_CAPACITY: usize = 4096; +const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024; +const TELEMETRY_EVENT_CHANNEL_CAPACITY: usize = 4096; #[derive(Debug, Clone)] -pub enum ReloadEvent { +pub enum AppEvent { SourceCreate { source_id: Uuid, }, @@ -27,7 +28,8 @@ pub enum ReloadEvent { } pub struct EventManager { - sender: mpsc::Sender, + control_sender: mpsc::Sender, + telemetry_sender: mpsc::Sender, } impl EventManager { @@ -36,110 +38,112 @@ impl EventManager { connection_manager: std::sync::Arc, ws_manager: Option>, ) -> Self { - let (sender, mut receiver) = mpsc::channel::(EVENT_CHANNEL_CAPACITY); - let ws_manager_clone = ws_manager.clone(); + let (control_sender, mut control_receiver) = + mpsc::channel::(CONTROL_EVENT_CHANNEL_CAPACITY); + let (telemetry_sender, mut telemetry_receiver) = + mpsc::channel::(TELEMETRY_EVENT_CHANNEL_CAPACITY); + let control_cm = connection_manager.clone(); + let control_pool = pool.clone(); tokio::spawn(async move { - // 在循环外克隆,避免在循环中移动 - let connection_manager_clone = connection_manager.clone(); - while let Some(event) = receiver.recv().await { - match event { - ReloadEvent::PointNewValue(payload) => { - let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> = - HashMap::new(); - latest_by_key.insert((payload.source_id, payload.client_handle), payload); + while let Some(event) = control_receiver.recv().await { + handle_control_event(event, &control_pool, &control_cm).await; + } + }); - loop { - match receiver.try_recv() { - Ok(ReloadEvent::PointNewValue(next_payload)) => { - latest_by_key.insert( - (next_payload.source_id, next_payload.client_handle), - next_payload, - ); - } - Ok(other_event) => { - handle_control_event( - other_event, - &pool, - &connection_manager_clone, - ) - .await; - } - Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { - break; - } - Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { - break; - } - } + let ws_manager_clone = ws_manager.clone(); + let telemetry_cm = connection_manager.clone(); + tokio::spawn(async move { + while let Some(payload) = telemetry_receiver.recv().await { + let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> = + HashMap::new(); + latest_by_key.insert((payload.source_id, payload.client_handle), payload); + + loop { + match telemetry_receiver.try_recv() { + Ok(next_payload) => { + latest_by_key.insert( + (next_payload.source_id, next_payload.client_handle), + next_payload, + ); } - - for point_payload in latest_by_key.into_values() { - process_point_new_value( - point_payload, - &connection_manager_clone, - ws_manager_clone.as_ref(), - ) - .await; + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { + break; + } + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { + break; } } - other_event => { - handle_control_event(other_event, &pool, &connection_manager_clone).await; - } + } + + for point_payload in latest_by_key.into_values() { + process_point_new_value(point_payload, &telemetry_cm, ws_manager_clone.as_ref()) + .await; } } }); - Self { sender } + Self { + control_sender, + telemetry_sender, + } } - pub fn send(&self, event: ReloadEvent) -> Result<(), String> { - match self.sender.try_send(event) { - Ok(()) => Ok(()), - Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => { - Err(format!("Failed to send event: channel closed ({e:?})")) - } - Err(tokio::sync::mpsc::error::TrySendError::Full(ReloadEvent::PointNewValue(payload))) => { - // High-frequency telemetry is lossy by design under sustained pressure. - tracing::warn!( - "Dropping PointNewValue due to full event queue: source={}, client_handle={}", - payload.source_id, - payload.client_handle - ); - Ok(()) - } - Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => { - Err(format!("Failed to send event: queue full ({e:?})")) - } + pub fn send(&self, event: AppEvent) -> Result<(), String> { + match event { + AppEvent::PointNewValue(payload) => match self.telemetry_sender.try_send(payload) { + Ok(()) => Ok(()), + Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => { + Err(format!("Failed to send telemetry event: channel closed ({e:?})")) + } + Err(tokio::sync::mpsc::error::TrySendError::Full(payload)) => { + // High-frequency telemetry is lossy by design under sustained pressure. + tracing::warn!( + "Dropping PointNewValue due to full telemetry queue: source={}, client_handle={}", + payload.source_id, + payload.client_handle + ); + Ok(()) + } + }, + control_event => match self.control_sender.try_send(control_event) { + Ok(()) => Ok(()), + Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => { + Err(format!("Failed to send control event: channel closed ({e:?})")) + } + Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => { + Err(format!("Failed to send control event: queue full ({e:?})")) + } + }, } } } async fn handle_control_event( - event: ReloadEvent, + event: AppEvent, pool: &sqlx::PgPool, connection_manager: &std::sync::Arc, ) { match event { - ReloadEvent::SourceCreate { source_id } => { + AppEvent::SourceCreate { source_id } => { tracing::info!("Processing SourceCreate event for {}", source_id); if let Err(e) = connection_manager.connect_from_source(pool, source_id).await { tracing::error!("Failed to connect to source {}: {}", source_id, e); } } - ReloadEvent::SourceUpdate { source_id } => { + AppEvent::SourceUpdate { source_id } => { tracing::info!("Processing SourceUpdate event for {}", source_id); if let Err(e) = connection_manager.reconnect(pool, source_id).await { tracing::error!("Failed to reconnect source {}: {}", source_id, e); } } - ReloadEvent::SourceDelete { source_id } => { + AppEvent::SourceDelete { source_id } => { tracing::info!("Processing SourceDelete event for {}", source_id); if let Err(e) = connection_manager.disconnect(source_id).await { tracing::error!("Failed to disconnect from source {}: {}", source_id, e); } } - ReloadEvent::PointCreateBatch { source_id, point_ids } => { + AppEvent::PointCreateBatch { source_id, point_ids } => { let requested_count = point_ids.len(); match connection_manager .subscribe_points_from_source(source_id, Some(point_ids), pool) @@ -163,7 +167,7 @@ async fn handle_control_event( } } } - ReloadEvent::PointDeleteBatch { source_id, point_ids } => { + AppEvent::PointDeleteBatch { source_id, point_ids } => { tracing::info!( "Processing PointDeleteBatch event for source {} with {} points", source_id, @@ -176,7 +180,9 @@ async fn handle_control_event( tracing::error!("Failed to unsubscribe points: {}", e); } } - ReloadEvent::PointNewValue(_) => {} + AppEvent::PointNewValue(_) => { + tracing::warn!("PointNewValue routed to control worker unexpectedly"); + } } } @@ -239,7 +245,7 @@ async fn process_point_new_value( if let Some(ws_manager) = ws_manager { let ws_message = crate::websocket::WsMessage::PointNewValue(monitor); if let Err(e) = ws_manager.send_to_public(ws_message).await { - tracing::error!( + tracing::warn!( "Failed to send WebSocket message to public room: {}", e ); diff --git a/src/handler/point.rs b/src/handler/point.rs index 51e6899..d8d6835 100644 --- a/src/handler/point.rs +++ b/src/handler/point.rs @@ -247,7 +247,7 @@ pub async fn delete_point( if let Some(source_id) = source_id { if let Err(e) = state .event_manager - .send(crate::event::ReloadEvent::PointDeleteBatch { + .send(crate::event::AppEvent::PointDeleteBatch { source_id, point_ids: vec![point_id], }) @@ -365,7 +365,7 @@ pub async fn batch_create_points( let point_ids: Vec = points.into_iter().map(|p| p.point_id).collect(); if let Err(e) = state .event_manager - .send(crate::event::ReloadEvent::PointCreateBatch { source_id, point_ids }) + .send(crate::event::AppEvent::PointCreateBatch { source_id, point_ids }) { tracing::error!("Failed to send PointCreateBatch event: {}", e); } @@ -427,7 +427,7 @@ pub async fn batch_delete_points( let ids: Vec = points.into_iter().map(|p| p.point_id).collect(); if let Err(e) = state .event_manager - .send(crate::event::ReloadEvent::PointDeleteBatch { + .send(crate::event::AppEvent::PointDeleteBatch { source_id, point_ids: ids, }) diff --git a/src/handler/source.rs b/src/handler/source.rs index effa3fd..069f044 100644 --- a/src/handler/source.rs +++ b/src/handler/source.rs @@ -225,7 +225,7 @@ pub async fn create_source( .await?; // 触发 SourceCreate 事件 - let _ = state.event_manager.send(crate::event::ReloadEvent::SourceCreate { source_id: new_id }); + let _ = state.event_manager.send(crate::event::AppEvent::SourceCreate { source_id: new_id }); Ok((StatusCode::CREATED, Json(CreateSourceRes { id: new_id }))) } @@ -300,7 +300,7 @@ pub async fn update_source( qb.push(" WHERE id = ").push_bind(source_id); qb.build().execute(pool).await?; - let _ = state.event_manager.send(crate::event::ReloadEvent::SourceUpdate { source_id }); + let _ = state.event_manager.send(crate::event::AppEvent::SourceUpdate { source_id }); Ok(Json(serde_json::json!({"ok_msg": "Source updated successfully"}))) } @@ -323,7 +323,7 @@ pub async fn delete_source( } // 触发 SourceDelete 事件 - let _ = state.event_manager.send(crate::event::ReloadEvent::SourceDelete { source_id }); + let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id }); Ok(StatusCode::NO_CONTENT) }