From 1317271e16c740f15a93281b0fb723b4adf69394 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Tue, 21 Apr 2026 15:30:45 +0800 Subject: [PATCH] refactor(core): centralize telemetry, connection management, and event sink in platform Move TelemetryProcessor (PointNewValue batching/dedup/broadcast) from feeder to plc_platform_core. PlatformBuilder.build() now auto-wires telemetry processing and reconnect task. PlatformContext.emit_event() handles connection management side effects (connect/reconnect/disconnect/subscribe/unsubscribe) directly. Simplify PlatformEventSink trait from 6 methods to single on_event(). Feeder's AppEvent now only contains business events; FeederPlatformEventSink only handles UnitsChanged for control runtime. Co-Authored-By: Claude Opus 4.6 --- crates/app_feeder_distributor/src/app.rs | 18 +- crates/app_feeder_distributor/src/event.rs | 316 ++---------------- crates/plc_platform_core/src/bootstrap.rs | 17 +- crates/plc_platform_core/src/lib.rs | 1 + .../plc_platform_core/src/platform_context.rs | 101 ++++-- .../src/telemetry_processor.rs | 148 ++++++++ 6 files changed, 273 insertions(+), 328 deletions(-) create mode 100644 crates/plc_platform_core/src/telemetry_processor.rs diff --git a/crates/app_feeder_distributor/src/app.rs b/crates/app_feeder_distributor/src/app.rs index 04e8c81..9e7c1ca 100644 --- a/crates/app_feeder_distributor/src/app.rs +++ b/crates/app_feeder_distributor/src/app.rs @@ -44,27 +44,22 @@ pub async fn run() { }; let config = AppConfig::from_env().expect("Failed to load configuration"); - let mut builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url) + let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url) .await .expect("Failed to bootstrap platform"); - let event_manager = Arc::new(EventManager::new( - builder.pool.clone(), - Arc::new(builder.connection_manager.clone()), - Some(builder.ws_manager.clone()), - )); - - builder.connection_manager.set_event_manager(event_manager.clone()); - builder.connection_manager.set_pool_and_start_reconnect_task(Arc::new(builder.pool.clone())); - let control_runtime = Arc::new(control::runtime::ControlRuntimeStore::new()); let event_sink = Arc::new(crate::event::FeederPlatformEventSink::new( - event_manager.clone(), control_runtime.clone(), )); let platform = builder.build().with_event_sink(event_sink); + let event_manager = Arc::new(EventManager::new( + platform.pool.clone(), + Some(platform.ws_manager.clone()), + )); + let sources = crate::service::get_all_enabled_sources(&platform.pool) .await .expect("Failed to fetch sources"); @@ -143,7 +138,6 @@ pub fn test_state() -> AppState { let ws_manager = Arc::new(WebSocketManager::new()); let event_manager = Arc::new(EventManager::new( pool.clone(), - connection_manager.clone(), Some(ws_manager.clone()), )); let platform = PlatformContext::new(pool, connection_manager, ws_manager); diff --git a/crates/app_feeder_distributor/src/event.rs b/crates/app_feeder_distributor/src/event.rs index af5d00c..64eddb2 100644 --- a/crates/app_feeder_distributor/src/event.rs +++ b/crates/app_feeder_distributor/src/event.rs @@ -1,32 +1,14 @@ -use std::collections::HashMap; use plc_platform_core::event::EventEnvelope; use tokio::sync::mpsc; use uuid::Uuid; use plc_platform_core::model::EventRecord; const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024; -const TELEMETRY_EVENT_CHANNEL_CAPACITY: usize = 4096; +/// Feeder-specific business events only. +/// Platform events (source/point lifecycle) are handled by core's emit_event(). #[derive(Debug, Clone)] pub enum AppEvent { - SourceCreate { - source_id: Uuid, - }, - SourceUpdate { - source_id: Uuid, - }, - SourceDelete { - source_id: Uuid, - source_name: String, - }, - PointCreateBatch { - source_id: Uuid, - point_ids: Vec, - }, - PointDeleteBatch { - source_id: Uuid, - point_ids: Vec, - }, EquipmentStartCommandSent { equipment_id: Uuid, unit_id: Option, @@ -46,244 +28,98 @@ pub enum AppEvent { RemLocal { unit_id: Uuid, equipment_id: Uuid }, RemRecovered { unit_id: Uuid }, UnitStateChanged { unit_id: Uuid, from_state: String, to_state: String }, - PointNewValue(crate::telemetry::PointNewValue), } pub struct EventManager { control_sender: mpsc::Sender, - telemetry_sender: mpsc::Sender, } impl EventManager { pub fn new( pool: sqlx::PgPool, - connection_manager: std::sync::Arc, ws_manager: Option>, ) -> Self { let (control_sender, mut control_receiver) = mpsc::channel::(CONTROL_EVENT_CHANNEL_CAPACITY); - let (telemetry_sender, mut telemetry_receiver) = - mpsc::channel::(TELEMETRY_EVENT_CHANNEL_CAPACITY); - let control_cm = connection_manager.clone(); let control_pool = pool.clone(); let control_ws_manager = ws_manager.clone(); tokio::spawn(async move { while let Some(event) = control_receiver.recv().await { - handle_control_event(event, &control_pool, &control_cm, control_ws_manager.as_ref()) - .await; + handle_control_event(event, &control_pool, control_ws_manager.as_ref()).await; } }); - let ws_manager_clone = ws_manager.clone(); - let telemetry_cm = connection_manager.clone(); - tokio::spawn(async move { - while let Some(payload) = telemetry_receiver.recv().await { - let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> = - HashMap::new(); - latest_by_key.insert((payload.source_id, payload.client_handle), payload); - - loop { - match telemetry_receiver.try_recv() { - Ok(next_payload) => { - latest_by_key.insert( - (next_payload.source_id, next_payload.client_handle), - next_payload, - ); - } - Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { - break; - } - Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { - break; - } - } - } - - for point_payload in latest_by_key.into_values() { - process_point_new_value(point_payload, &telemetry_cm, ws_manager_clone.as_ref()) - .await; - } - } - }); - - Self { - control_sender, - telemetry_sender, - } + Self { control_sender } } pub fn send(&self, event: AppEvent) -> Result<(), String> { - match event { - AppEvent::PointNewValue(payload) => match self.telemetry_sender.try_send(payload) { - Ok(()) => Ok(()), - Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => { - Err(format!("Failed to send telemetry event: channel closed ({e:?})")) - } - Err(tokio::sync::mpsc::error::TrySendError::Full(payload)) => { - // High-frequency telemetry is lossy by design under sustained pressure. - tracing::warn!( - "Dropping PointNewValue due to full telemetry queue: source={}, client_handle={}", - payload.source_id, - payload.client_handle - ); - Ok(()) - } - }, - control_event => match self.control_sender.try_send(control_event) { - Ok(()) => Ok(()), - Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => { - Err(format!("Failed to send control event: channel closed ({e:?})")) - } - Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => { - Err(format!("Failed to send control event: queue full ({e:?})")) - } - }, + match self.control_sender.try_send(event) { + Ok(()) => Ok(()), + Err(mpsc::error::TrySendError::Closed(e)) => { + Err(format!("Failed to send control event: channel closed ({e:?})")) + } + Err(mpsc::error::TrySendError::Full(e)) => { + Err(format!("Failed to send control event: queue full ({e:?})")) + } } } } -/// Adapter that bridges platform handler events to feeder's EventManager + ControlRuntime. +/// Bridges platform events to feeder-specific side effects. +/// Connection management and telemetry are handled by core automatically. pub struct FeederPlatformEventSink { - event_manager: std::sync::Arc, control_runtime: std::sync::Arc, } impl FeederPlatformEventSink { pub fn new( - event_manager: std::sync::Arc, control_runtime: std::sync::Arc, ) -> Self { - Self { event_manager, control_runtime } + Self { control_runtime } } } impl plc_platform_core::platform_context::PlatformEventSink for FeederPlatformEventSink { - fn on_source_created(&self, source_id: Uuid) { - let _ = self.event_manager.send(AppEvent::SourceCreate { source_id }); - } - - fn on_source_updated(&self, source_id: Uuid) { - let _ = self.event_manager.send(AppEvent::SourceUpdate { source_id }); - } - - fn on_source_deleted(&self, source_id: Uuid, source_name: String) { - let _ = self.event_manager.send(AppEvent::SourceDelete { source_id, source_name }); - } - - fn on_points_created(&self, source_id: Uuid, point_ids: Vec) { - let _ = self.event_manager.send(AppEvent::PointCreateBatch { source_id, point_ids }); - } - - fn on_points_deleted(&self, source_id: Uuid, point_ids: Vec) { - let _ = self.event_manager.send(AppEvent::PointDeleteBatch { source_id, point_ids }); - } - - fn on_units_changed(&self, unit_ids: Vec) { - let runtime = self.control_runtime.clone(); - tokio::spawn(async move { - for unit_id in unit_ids { - runtime.notify_unit(unit_id).await; + fn on_event(&self, event: &plc_platform_core::event::PlatformEvent) { + match event { + plc_platform_core::event::PlatformEvent::UnitsChanged { unit_ids } => { + let runtime = self.control_runtime.clone(); + let ids = unit_ids.clone(); + tokio::spawn(async move { + for unit_id in ids { + runtime.notify_unit(unit_id).await; + } + }); } - }); - } -} - -impl plc_platform_core::connection::PointEventSink for EventManager { - fn send_point_new_value( - &self, - payload: plc_platform_core::telemetry::PointNewValue, - ) -> Result<(), String> { - self.send(AppEvent::PointNewValue(payload)) + // Other platform events: connection management handled by core. + _ => {} + } } } async fn handle_control_event( event: AppEvent, pool: &sqlx::PgPool, - connection_manager: &std::sync::Arc, ws_manager: Option<&std::sync::Arc>, ) { persist_event_if_needed(&event, pool, ws_manager).await; match event { - AppEvent::SourceCreate { source_id } => { - tracing::info!("Processing SourceCreate event for {}", source_id); - if let Err(e) = connection_manager.connect_from_source(pool, source_id).await { - tracing::error!("Failed to connect to source {}: {}", source_id, e); - } - } - AppEvent::SourceUpdate { source_id } => { - tracing::info!("Processing SourceUpdate event for {}", source_id); - if let Err(e) = connection_manager.reconnect(pool, source_id).await { - tracing::error!("Failed to reconnect source {}: {}", source_id, e); - } - } - AppEvent::SourceDelete { source_id, .. } => { - tracing::info!("Processing SourceDelete event for {}", source_id); - if let Err(e) = connection_manager.disconnect(source_id).await { - tracing::error!("Failed to disconnect from source {}: {}", source_id, e); - } - } - AppEvent::PointCreateBatch { source_id, point_ids } => { - let requested_count = point_ids.len(); - match connection_manager - .subscribe_points_from_source(source_id, Some(point_ids), pool) - .await - { - Ok(stats) => { - let subscribed = *stats.get("subscribed").unwrap_or(&0); - let polled = *stats.get("polled").unwrap_or(&0); - let total = *stats.get("total").unwrap_or(&0); - tracing::info!( - "PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}", - source_id, - requested_count, - subscribed, - polled, - total - ); - } - Err(e) => { - tracing::error!("Failed to subscribe to points: {}", e); - } - } - } - AppEvent::PointDeleteBatch { source_id, point_ids } => { - tracing::info!( - "Processing PointDeleteBatch event for source {} with {} points", - source_id, - point_ids.len() - ); - if let Err(e) = connection_manager - .unsubscribe_points_from_source(source_id, point_ids) - .await - { - tracing::error!("Failed to unsubscribe points: {}", e); - } - } AppEvent::EquipmentStartCommandSent { - equipment_id, - unit_id, - point_id, + equipment_id, unit_id, point_id, } => { tracing::info!( "Equipment start command sent: equipment={}, unit={:?}, point={}", - equipment_id, - unit_id, - point_id + equipment_id, unit_id, point_id ); } AppEvent::EquipmentStopCommandSent { - equipment_id, - unit_id, - point_id, + equipment_id, unit_id, point_id, } => { tracing::info!( "Equipment stop command sent: equipment={}, unit={:?}, point={}", - equipment_id, - unit_id, - point_id + equipment_id, unit_id, point_id ); } AppEvent::AutoControlStarted { unit_id } => { @@ -313,9 +149,6 @@ async fn handle_control_event( AppEvent::UnitStateChanged { unit_id, from_state, to_state } => { tracing::info!("Unit {} state: {} -> {}", unit_id, from_state, to_state); } - AppEvent::PointNewValue(_) => { - tracing::warn!("PointNewValue routed to control worker unexpectedly"); - } } } @@ -345,13 +178,6 @@ async fn persist_event_if_needed( ws_manager: Option<&std::sync::Arc>, ) { let record: Option<(&str, &str, Option, Option, Option, String, serde_json::Value)> = match event { - // Platform events — persistence is handled by core's emit_event(). - AppEvent::SourceCreate { .. } => None, - AppEvent::SourceUpdate { .. } => None, - AppEvent::SourceDelete { .. } => None, - AppEvent::PointCreateBatch { .. } => None, - AppEvent::PointDeleteBatch { .. } => None, - // Feeder-specific events — persisted here. AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => { let code = fetch_equipment_code(pool, *equipment_id).await; Some(( @@ -453,10 +279,10 @@ async fn persist_event_if_needed( )) } AppEvent::UnitStateChanged { .. } => None, - AppEvent::PointNewValue(_) => None, }; - let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record else { + let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record + else { return; }; let envelope = EventEnvelope::new(event_type, payload); @@ -492,77 +318,3 @@ async fn persist_event_if_needed( } } } - -async fn process_point_new_value( - payload: crate::telemetry::PointNewValue, - connection_manager: &std::sync::Arc, - ws_manager: Option<&std::sync::Arc>, -) { - let source_id = payload.source_id; - let client_handle = payload.client_handle; - let point_id = if let Some(point_id) = payload.point_id { - Some(point_id) - } else { - let status = connection_manager.get_status_read_guard().await; - status - .get(&source_id) - .and_then(|s| s.client_handle_map.get(&client_handle).copied()) - }; - if let Some(point_id) = point_id { - // Read the previous value from the in-memory cache. - let (old_value, old_timestamp, value_changed) = { - let monitor_data = connection_manager.get_point_monitor_data_read_guard().await; - let old_monitor_info = monitor_data.get(&point_id); - - if let Some(old_info) = old_monitor_info { - let changed = old_info.value != payload.value || old_info.timestamp != payload.timestamp; - (old_info.value.clone(), old_info.timestamp, changed) - } else { - (None, None, false) - } - }; - - let monitor = crate::telemetry::PointMonitorInfo { - protocol: payload.protocol, - source_id, - point_id, - client_handle, - scan_mode: payload.scan_mode, - timestamp: payload.timestamp, - quality: payload.quality, - value: payload.value, - value_type: payload.value_type, - value_text: payload.value_text, - old_value, - old_timestamp, - value_changed, - }; - - if let Err(e) = connection_manager - .update_point_monitor_data(monitor.clone()) - .await - { - tracing::error!( - "Failed to update point monitor data for point {}: {}", - point_id, - e - ); - } - - if let Some(ws_manager) = ws_manager { - let ws_message = crate::websocket::WsMessage::PointNewValue(monitor); - if let Err(e) = ws_manager.send_to_public(ws_message).await { - tracing::warn!( - "Failed to send WebSocket message to public room: {}", - e - ); - } - } - } else { - tracing::warn!( - "Point not found for source {} client_handle {}", - source_id, - client_handle - ); - } -} diff --git a/crates/plc_platform_core/src/bootstrap.rs b/crates/plc_platform_core/src/bootstrap.rs index 66506b9..9cba1a8 100644 --- a/crates/plc_platform_core/src/bootstrap.rs +++ b/crates/plc_platform_core/src/bootstrap.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use crate::connection::ConnectionManager; use crate::db::init_database; use crate::platform_context::PlatformContext; +use crate::telemetry_processor::TelemetryProcessor; use crate::websocket::WebSocketManager; pub struct PlatformBuilder { @@ -12,7 +13,21 @@ pub struct PlatformBuilder { } impl PlatformBuilder { - pub fn build(self) -> PlatformContext { + /// Finalize the platform: wire up telemetry processing, reconnect task, + /// and wrap everything into `PlatformContext`. + pub fn build(mut self) -> PlatformContext { + // Telemetry processor: handles PointNewValue batching, monitor updates, WS broadcast. + let cm_for_telemetry = Arc::new(self.connection_manager.clone()); + let telemetry_processor = Arc::new(TelemetryProcessor::new( + cm_for_telemetry, + self.ws_manager.clone(), + )); + self.connection_manager.set_event_manager(telemetry_processor); + + // Start reconnect task (auto-reconnects on connection loss). + self.connection_manager + .set_pool_and_start_reconnect_task(Arc::new(self.pool.clone())); + PlatformContext::new( self.pool, Arc::new(self.connection_manager), diff --git a/crates/plc_platform_core/src/lib.rs b/crates/plc_platform_core/src/lib.rs index c796b97..b748bc1 100644 --- a/crates/plc_platform_core/src/lib.rs +++ b/crates/plc_platform_core/src/lib.rs @@ -8,6 +8,7 @@ pub mod model; pub mod platform_context; pub mod service; pub mod telemetry; +pub mod telemetry_processor; pub mod util; pub mod websocket; diff --git a/crates/plc_platform_core/src/platform_context.rs b/crates/plc_platform_core/src/platform_context.rs index 31f10ad..43c3fc9 100644 --- a/crates/plc_platform_core/src/platform_context.rs +++ b/crates/plc_platform_core/src/platform_context.rs @@ -1,24 +1,18 @@ use std::sync::Arc; -use uuid::Uuid; - use crate::connection::ConnectionManager; use crate::event::PlatformEvent; use crate::websocket::WebSocketManager; /// Callback interface for app-specific side effects on platform events. /// -/// Platform-level concerns (event persistence, WebSocket broadcast) are handled -/// automatically by `PlatformContext::emit_event()`. Implementations of this trait -/// only need to handle app-specific behavior (e.g. connection management, -/// control runtime notifications). +/// Platform-level concerns (event persistence, WebSocket broadcast, connection +/// management) are handled automatically by `PlatformContext::emit_event()`. +/// Implementations only need to handle truly app-specific behavior +/// (e.g. feeder's control runtime notifications). pub trait PlatformEventSink: Send + Sync { - fn on_source_created(&self, source_id: Uuid); - fn on_source_updated(&self, source_id: Uuid); - fn on_source_deleted(&self, source_id: Uuid, source_name: String); - fn on_points_created(&self, source_id: Uuid, point_ids: Vec); - fn on_points_deleted(&self, source_id: Uuid, point_ids: Vec); - fn on_units_changed(&self, unit_ids: Vec); + /// Called for every platform event. Override to add app-specific side effects. + fn on_event(&self, event: &PlatformEvent); } #[derive(Clone)] @@ -48,47 +42,88 @@ impl PlatformContext { self } - /// Emit a platform event: persists to DB, broadcasts via WebSocket, - /// then notifies the app-specific event sink for side effects. + /// Emit a platform event. + /// + /// Synchronously notifies the app-specific sink, then spawns async work for: + /// - Event persistence to DB + WebSocket broadcast + /// - Connection management side effects (connect, subscribe, etc.) pub fn emit_event(&self, event: PlatformEvent) { - // Notify app-specific sink synchronously (fire-and-forget via channels). + // 1. Notify app-specific sink synchronously (fire-and-forget via channels). if let Some(sink) = &self.event_sink { + sink.on_event(&event); + } + + // 2. Spawn async: persistence + WS broadcast + connection management. + let pool = self.pool.clone(); + let ws_manager = self.ws_manager.clone(); + let cm = self.connection_manager.clone(); + tokio::spawn(async move { + // Persist + broadcast. + crate::event::persist_and_broadcast(&event, &pool, &ws_manager).await; + + // Connection management side effects. match &event { PlatformEvent::SourceCreated { source_id } => { - sink.on_source_created(*source_id); + tracing::info!("Processing SourceCreated for {}", source_id); + if let Err(e) = cm.connect_from_source(&pool, *source_id).await { + tracing::error!("Failed to connect to source {}: {}", source_id, e); + } } PlatformEvent::SourceUpdated { source_id } => { - sink.on_source_updated(*source_id); + tracing::info!("Processing SourceUpdated for {}", source_id); + if let Err(e) = cm.reconnect(&pool, *source_id).await { + tracing::error!("Failed to reconnect source {}: {}", source_id, e); + } } - PlatformEvent::SourceDeleted { - source_id, - source_name, - } => { - sink.on_source_deleted(*source_id, source_name.clone()); + PlatformEvent::SourceDeleted { source_id, .. } => { + tracing::info!("Processing SourceDeleted for {}", source_id); + if let Err(e) = cm.disconnect(*source_id).await { + tracing::error!("Failed to disconnect source {}: {}", source_id, e); + } } PlatformEvent::PointsCreated { source_id, point_ids, } => { - sink.on_points_created(*source_id, point_ids.clone()); + let requested_count = point_ids.len(); + match cm + .subscribe_points_from_source(*source_id, Some(point_ids.clone()), &pool) + .await + { + Ok(stats) => { + let subscribed = *stats.get("subscribed").unwrap_or(&0); + let polled = *stats.get("polled").unwrap_or(&0); + let total = *stats.get("total").unwrap_or(&0); + tracing::info!( + "PointsCreated subscribe for source {}: requested={}, subscribed={}, polled={}, total={}", + source_id, requested_count, subscribed, polled, total + ); + } + Err(e) => { + tracing::error!("Failed to subscribe points: {}", e); + } + } } PlatformEvent::PointsDeleted { source_id, point_ids, } => { - sink.on_points_deleted(*source_id, point_ids.clone()); + tracing::info!( + "Processing PointsDeleted for source {} ({} points)", + source_id, + point_ids.len() + ); + if let Err(e) = cm + .unsubscribe_points_from_source(*source_id, point_ids.clone()) + .await + { + tracing::error!("Failed to unsubscribe points: {}", e); + } } - PlatformEvent::UnitsChanged { unit_ids } => { - sink.on_units_changed(unit_ids.clone()); + PlatformEvent::UnitsChanged { .. } => { + // No platform-level side effect; app handles via on_event(). } } - } - - // Spawn async persistence + WebSocket broadcast. - let pool = self.pool.clone(); - let ws_manager = self.ws_manager.clone(); - tokio::spawn(async move { - crate::event::persist_and_broadcast(&event, &pool, &ws_manager).await; }); } } diff --git a/crates/plc_platform_core/src/telemetry_processor.rs b/crates/plc_platform_core/src/telemetry_processor.rs new file mode 100644 index 0000000..a28ee06 --- /dev/null +++ b/crates/plc_platform_core/src/telemetry_processor.rs @@ -0,0 +1,148 @@ +//! Platform-level telemetry event processor. +//! +//! Handles `PointNewValue` events from `ConnectionManager`: +//! - Batches/deduplicates high-frequency telemetry data +//! - Updates point monitor data in `ConnectionManager` +//! - Broadcasts changes via WebSocket + +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::mpsc; +use uuid::Uuid; + +use crate::connection::{ConnectionManager, PointEventSink}; +use crate::telemetry::{PointMonitorInfo, PointNewValue}; +use crate::websocket::{WebSocketManager, WsMessage}; + +const TELEMETRY_CHANNEL_CAPACITY: usize = 4096; + +pub struct TelemetryProcessor { + sender: mpsc::Sender, +} + +impl TelemetryProcessor { + pub fn new( + connection_manager: Arc, + ws_manager: Arc, + ) -> Self { + let (sender, mut receiver) = mpsc::channel::(TELEMETRY_CHANNEL_CAPACITY); + + tokio::spawn(async move { + while let Some(payload) = receiver.recv().await { + // Batch: drain all pending messages, keeping only the latest per (source, handle). + let mut latest_by_key: HashMap<(Uuid, u32), PointNewValue> = HashMap::new(); + latest_by_key.insert((payload.source_id, payload.client_handle), payload); + + loop { + match receiver.try_recv() { + Ok(next) => { + latest_by_key.insert((next.source_id, next.client_handle), next); + } + Err(mpsc::error::TryRecvError::Empty) => break, + Err(mpsc::error::TryRecvError::Disconnected) => break, + } + } + + for point_payload in latest_by_key.into_values() { + process_point_new_value(point_payload, &connection_manager, &ws_manager).await; + } + } + }); + + Self { sender } + } +} + +impl PointEventSink for TelemetryProcessor { + fn send_point_new_value(&self, payload: PointNewValue) -> Result<(), String> { + match self.sender.try_send(payload) { + Ok(()) => Ok(()), + Err(mpsc::error::TrySendError::Closed(e)) => { + Err(format!( + "Telemetry channel closed ({e:?})" + )) + } + Err(mpsc::error::TrySendError::Full(payload)) => { + tracing::warn!( + "Dropping PointNewValue due to full telemetry queue: source={}, handle={}", + payload.source_id, + payload.client_handle + ); + Ok(()) + } + } + } +} + +async fn process_point_new_value( + payload: PointNewValue, + connection_manager: &Arc, + ws_manager: &Arc, +) { + let source_id = payload.source_id; + let client_handle = payload.client_handle; + + let point_id = if let Some(point_id) = payload.point_id { + Some(point_id) + } else { + let status = connection_manager.get_status_read_guard().await; + status + .get(&source_id) + .and_then(|s| s.client_handle_map.get(&client_handle).copied()) + }; + + let Some(point_id) = point_id else { + tracing::warn!( + "Point not found for source {} client_handle {}", + source_id, + client_handle + ); + return; + }; + + // Read the previous value from the in-memory cache. + let (old_value, old_timestamp, value_changed) = { + let monitor_data = connection_manager.get_point_monitor_data_read_guard().await; + let old_monitor_info = monitor_data.get(&point_id); + + if let Some(old_info) = old_monitor_info { + let changed = + old_info.value != payload.value || old_info.timestamp != payload.timestamp; + (old_info.value.clone(), old_info.timestamp, changed) + } else { + (None, None, false) + } + }; + + let monitor = PointMonitorInfo { + protocol: payload.protocol, + source_id, + point_id, + client_handle, + scan_mode: payload.scan_mode, + timestamp: payload.timestamp, + quality: payload.quality, + value: payload.value, + value_type: payload.value_type, + value_text: payload.value_text, + old_value, + old_timestamp, + value_changed, + }; + + if let Err(e) = connection_manager + .update_point_monitor_data(monitor.clone()) + .await + { + tracing::error!( + "Failed to update point monitor data for point {}: {}", + point_id, + e + ); + } + + let ws_message = WsMessage::PointNewValue(monitor); + if let Err(e) = ws_manager.send_to_public(ws_message).await { + tracing::warn!("Failed to send WebSocket message to public room: {}", e); + } +}