// Application event bus: control events and point telemetry are queued on two
// separate bounded Tokio channels, each drained by its own background task.
//
// NOTE(review): several generic argument lists look like they are missing from
// this text (e.g. `Vec,`, `Option,`, `mpsc::Sender,`, `std::sync::Arc,`,
// `mpsc::channel::(..)`, `as Option)`). Presumably they were lost when the file
// was extracted — confirm against the original source before building.
use std::collections::HashMap;
use plc_platform_core::event::EventEnvelope;
use tokio::sync::mpsc;
use uuid::Uuid;
use plc_platform_core::model::EventRecord;

/// Capacity of the bounded channel that carries control events (every
/// [`AppEvent`] other than `PointNewValue`).
const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024;
/// Capacity of the bounded channel that carries `PointNewValue` telemetry payloads.
const TELEMETRY_EVENT_CHANNEL_CAPACITY: usize = 4096;

/// Events accepted by [`EventManager::send`].
///
/// `PointNewValue` is routed to the telemetry channel; every other variant is
/// routed to the control channel and handled by `handle_control_event`.
#[derive(Debug, Clone)]
pub enum AppEvent {
    /// Handled by calling `connect_from_source` for `source_id`.
    SourceCreate {
        source_id: Uuid,
    },
    /// Handled by calling `reconnect` for `source_id`.
    SourceUpdate {
        source_id: Uuid,
    },
    /// Handled by calling `disconnect` for `source_id`. `source_name` is used
    /// for the persisted message instead of a database lookup.
    SourceDelete {
        source_id: Uuid,
        source_name: String,
    },
    /// Handled by calling `subscribe_points_from_source` with `point_ids`.
    PointCreateBatch {
        source_id: Uuid,
        point_ids: Vec,
    },
    /// Handled by calling `unsubscribe_points_from_source` with `point_ids`.
    PointDeleteBatch {
        source_id: Uuid,
        point_ids: Vec,
    },
    /// Logged and persisted only; no connection-manager call is made.
    EquipmentStartCommandSent {
        equipment_id: Uuid,
        unit_id: Option,
        point_id: Uuid,
    },
    /// Logged and persisted only; no connection-manager call is made.
    EquipmentStopCommandSent {
        equipment_id: Uuid,
        unit_id: Option,
        point_id: Uuid,
    },
    // The unit-level variants below are logged by the control worker; all of
    // them except `UnitStateChanged` are also persisted to the `event` table.
    AutoControlStarted { unit_id: Uuid },
    AutoControlStopped { unit_id: Uuid },
    FaultLocked { unit_id: Uuid, equipment_id: Uuid },
    FaultAcked { unit_id: Uuid },
    CommLocked { unit_id: Uuid },
    CommRecovered { unit_id: Uuid },
    RemLocal { unit_id: Uuid, equipment_id: Uuid },
    RemRecovered { unit_id: Uuid },
    /// Logged only; `persist_event_if_needed` produces no record for it.
    UnitStateChanged { unit_id: Uuid, from_state: String, to_state: String },
    /// A new point value; queued on the telemetry channel and never persisted.
    PointNewValue(crate::telemetry::PointNewValue),
}

/// Front end of the event bus: owns the sending halves of the control and
/// telemetry channels. The receiving halves live in the tasks spawned by
/// [`EventManager::new`].
pub struct EventManager {
    control_sender: mpsc::Sender,
    telemetry_sender: mpsc::Sender,
}

impl EventManager {
    /// Creates both bounded channels and spawns one worker task per channel.
    ///
    /// * `pool` - cloned into the control worker for persistence and source lookups.
    /// * `connection_manager` - cloned into both workers.
    /// * `ws_manager` - optional WebSocket manager, cloned into both workers;
    ///   when `None` no WebSocket messages are sent.
    ///
    /// Calls `tokio::spawn`, so this presumably has to run inside a Tokio
    /// runtime context — confirm at the call site.
    pub fn new(
        pool: sqlx::PgPool,
        connection_manager: std::sync::Arc,
        ws_manager: Option>,
    ) -> Self {
        let (control_sender, mut control_receiver) =
            mpsc::channel::(CONTROL_EVENT_CHANNEL_CAPACITY);
        let (telemetry_sender, mut telemetry_receiver) =
            mpsc::channel::(TELEMETRY_EVENT_CHANNEL_CAPACITY);

        // Control worker: handles one event at a time, in receive order, until
        // `recv()` yields `None`.
        let control_cm = connection_manager.clone();
        let control_pool = pool.clone();
        let control_ws_manager = ws_manager.clone();
        tokio::spawn(async move {
            while let Some(event) = control_receiver.recv().await {
                handle_control_event(event, &control_pool, &control_cm, control_ws_manager.as_ref())
                    .await;
            }
        });

        // Telemetry worker: waits for one payload, then drains everything that
        // is immediately available and keeps only the most recently received
        // payload per (source_id, client_handle) before processing the batch.
        let ws_manager_clone = ws_manager.clone();
        let telemetry_cm = connection_manager.clone();
        tokio::spawn(async move {
            while let Some(payload) = telemetry_receiver.recv().await {
                let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> =
                    HashMap::new();
                latest_by_key.insert((payload.source_id, payload.client_handle), payload);

                // Non-blocking drain: a later insert for the same key replaces
                // the earlier payload, so intermediate values are discarded.
                loop {
                    match telemetry_receiver.try_recv() {
                        Ok(next_payload) => {
                            latest_by_key.insert(
                                (next_payload.source_id, next_payload.client_handle),
                                next_payload,
                            );
                        }
                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
                            break;
                        }
                        // Also just ends the drain; the batch collected so far
                        // is still processed below.
                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
                            break;
                        }
                    }
                }

                // `HashMap::into_values` yields in arbitrary order, so payloads
                // for different keys are not processed in arrival order.
                for point_payload in latest_by_key.into_values() {
                    process_point_new_value(point_payload, &telemetry_cm, ws_manager_clone.as_ref())
                        .await;
                }
            }
        });

        Self {
            control_sender,
            telemetry_sender,
        }
    }

    /// Queues `event` without blocking (`try_send`).
    ///
    /// * `PointNewValue`: a full telemetry queue drops the payload, logs a
    ///   warning and still returns `Ok(())`; a closed channel returns `Err`.
    /// * Any other variant: both a full and a closed control queue return `Err`.
    ///
    /// # Errors
    /// Returns a human-readable message describing why the event was not queued.
    pub fn send(&self, event: AppEvent) -> Result<(), String> {
        match event {
            AppEvent::PointNewValue(payload) => match self.telemetry_sender.try_send(payload) {
                Ok(()) => Ok(()),
                Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
                    Err(format!("Failed to send telemetry event: channel closed ({e:?})"))
                }
                Err(tokio::sync::mpsc::error::TrySendError::Full(payload)) => {
                    // High-frequency telemetry is lossy by design under sustained pressure.
                    tracing::warn!(
                        "Dropping PointNewValue due to full telemetry queue: source={}, client_handle={}",
                        payload.source_id,
                        payload.client_handle
                    );
                    Ok(())
                }
            },
            // Control events are never dropped silently: the caller gets an error.
            control_event => match self.control_sender.try_send(control_event) {
                Ok(()) => Ok(()),
                Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
                    Err(format!("Failed to send control event: channel closed ({e:?})"))
                }
                Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => {
                    Err(format!("Failed to send control event: queue full ({e:?})"))
                }
            },
        }
    }
}

/// Adapter that lets the connection layer push point values into the bus.
impl plc_platform_core::connection::PointEventSink for EventManager {
    /// Wraps `payload` in [`AppEvent::PointNewValue`] and forwards it to
    /// [`EventManager::send`], inheriting its drop-on-full behaviour.
    // NOTE(review): the enum variant is declared with `crate::telemetry::PointNewValue`
    // while this takes `plc_platform_core::telemetry::PointNewValue`; presumably
    // one re-exports the other — confirm.
    fn send_point_new_value(
        &self,
        payload: plc_platform_core::telemetry::PointNewValue,
    ) -> Result<(), String> {
        self.send(AppEvent::PointNewValue(payload))
    }
}

/// Handles one control event: first persists/broadcasts it via
/// `persist_event_if_needed`, then performs the variant-specific action.
///
/// Failures of connection-manager calls are logged and not propagated.
async fn handle_control_event(
    event: AppEvent,
    pool: &sqlx::PgPool,
    connection_manager: &std::sync::Arc,
    ws_manager: Option<&std::sync::Arc>,
) {
    // Persistence runs before the action, so the event record is written even
    // if the action below fails.
    persist_event_if_needed(&event, pool, ws_manager).await;

    match event {
        AppEvent::SourceCreate { source_id } => {
            tracing::info!("Processing SourceCreate event for {}", source_id);
            if let Err(e) = connection_manager.connect_from_source(pool, source_id).await {
                tracing::error!("Failed to connect to source {}: {}", source_id, e);
            }
        }
        AppEvent::SourceUpdate { source_id } => {
            tracing::info!("Processing SourceUpdate event for {}", source_id);
            if let Err(e) = connection_manager.reconnect(pool, source_id).await {
                tracing::error!("Failed to reconnect source {}: {}", source_id, e);
            }
        }
        AppEvent::SourceDelete { source_id, .. } => {
            tracing::info!("Processing SourceDelete event for {}", source_id);
            if let Err(e) = connection_manager.disconnect(source_id).await {
                tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
            }
        }
        AppEvent::PointCreateBatch { source_id, point_ids } => {
            // Captured up front because `point_ids` is moved into the call below.
            let requested_count = point_ids.len();
            match connection_manager
                .subscribe_points_from_source(source_id, Some(point_ids), pool)
                .await
            {
                Ok(stats) => {
                    // Missing counters are reported as 0.
                    let subscribed = *stats.get("subscribed").unwrap_or(&0);
                    let polled = *stats.get("polled").unwrap_or(&0);
                    let total = *stats.get("total").unwrap_or(&0);
                    tracing::info!(
                        "PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}",
                        source_id,
                        requested_count,
                        subscribed,
                        polled,
                        total
                    );
                }
                Err(e) => {
                    tracing::error!("Failed to subscribe to points: {}", e);
                }
            }
        }
        AppEvent::PointDeleteBatch { source_id, point_ids } => {
            tracing::info!(
                "Processing PointDeleteBatch event for source {} with {} points",
                source_id,
                point_ids.len()
            );
            if let Err(e) = connection_manager
                .unsubscribe_points_from_source(source_id, point_ids)
                .await
            {
                tracing::error!("Failed to unsubscribe points: {}", e);
            }
        }
        // The remaining control events only produce a log line here.
        AppEvent::EquipmentStartCommandSent {
            equipment_id,
            unit_id,
            point_id,
        } => {
            tracing::info!(
                "Equipment start command sent: equipment={}, unit={:?}, point={}",
                equipment_id,
                unit_id,
                point_id
            );
        }
        AppEvent::EquipmentStopCommandSent {
            equipment_id,
            unit_id,
            point_id,
        } => {
            tracing::info!(
                "Equipment stop command sent: equipment={}, unit={:?}, point={}",
                equipment_id,
                unit_id,
                point_id
            );
        }
        AppEvent::AutoControlStarted { unit_id } => {
            tracing::info!("Auto control started for unit {}", unit_id);
        }
        AppEvent::AutoControlStopped { unit_id } => {
            tracing::info!("Auto control stopped for unit {}", unit_id);
        }
        AppEvent::FaultLocked { unit_id, equipment_id } => {
            tracing::warn!("Fault locked: unit={}, equipment={}", unit_id, equipment_id);
        }
        AppEvent::FaultAcked { unit_id } => {
            tracing::info!("Fault acked for unit {}", unit_id);
        }
        AppEvent::CommLocked { unit_id } => {
            tracing::warn!("Comm locked for unit {}", unit_id);
        }
        AppEvent::CommRecovered { unit_id } => {
            tracing::info!("Comm recovered for unit {}", unit_id);
        }
        AppEvent::RemLocal { unit_id, equipment_id } => {
            tracing::warn!("REM local: unit={}, equipment={}", unit_id, equipment_id);
        }
        AppEvent::RemRecovered { unit_id } => {
            tracing::info!("REM recovered for unit {}", unit_id);
        }
        AppEvent::UnitStateChanged { unit_id, from_state, to_state } => {
            tracing::info!("Unit {} state: {} -> {}", unit_id, from_state, to_state);
        }
        // `send` routes this variant to the telemetry channel, so reaching this
        // arm means an event arrived on the control channel by another path.
        AppEvent::PointNewValue(_) => {
            tracing::warn!("PointNewValue routed to control worker unexpectedly");
        }
    }
}

/// Looks up `source.name` for `id`.
///
/// Falls back to the textual UUID when no row matches or the query fails; the
/// query error is discarded by `.ok()` and is not logged.
async fn fetch_source_name(pool: &sqlx::PgPool, id: Uuid) -> String {
    sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1")
        .bind(id)
        .fetch_optional(pool)
        .await
        .ok()
        .flatten()
        .unwrap_or_else(|| id.to_string())
}

/// Looks up `unit.code` for `id`.
///
/// Falls back to the textual UUID when no row matches or the query fails; the
/// query error is discarded by `.ok()` and is not logged.
async fn fetch_unit_code(pool: &sqlx::PgPool, id: Uuid) -> String {
    sqlx::query_scalar::<_, String>("SELECT code FROM unit WHERE id = $1")
        .bind(id)
        .fetch_optional(pool)
        .await
        .ok()
        .flatten()
        .unwrap_or_else(|| id.to_string())
}

/// Looks up `equipment.code` for `id`.
///
/// Falls back to the textual UUID when no row matches or the query fails; the
/// query error is discarded by `.ok()` and is not logged.
async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String {
    sqlx::query_scalar::<_, String>("SELECT code FROM equipment WHERE id = $1")
        .bind(id)
        .fetch_optional(pool)
        .await
        .ok()
        .flatten()
        .unwrap_or_else(|| id.to_string())
}

/// Writes an audit row for `event` into the `event` table and, on success,
/// broadcasts the inserted row as `WsMessage::EventCreated` when `ws_manager`
/// is `Some`.
///
/// `UnitStateChanged` and `PointNewValue` produce no row. Insert and broadcast
/// failures are logged at `warn` level and otherwise ignored.
async fn persist_event_if_needed(
    event: &AppEvent,
    pool: &sqlx::PgPool,
    ws_manager: Option<&std::sync::Arc>,
) {
    // Each arm yields
    // (event_type, level, unit_id, equipment_id, source_id, message, payload),
    // matching the destructuring and bind order further down.
    let record = match event {
        AppEvent::SourceCreate { source_id } => {
            let name = fetch_source_name(pool, *source_id).await;
            Some((
                "source.created",
                "info",
                None,
                None,
                Some(*source_id),
                format!("Source {} created", name),
                serde_json::json!({ "source_id": source_id }),
            ))
        }
        AppEvent::SourceUpdate { source_id } => {
            let name = fetch_source_name(pool, *source_id).await;
            Some((
                "source.updated",
                "info",
                None,
                None,
                Some(*source_id),
                format!("Source {} updated", name),
                serde_json::json!({ "source_id": source_id }),
            ))
        }
        // Uses the name carried by the event and stores no `source_id` column
        // value — presumably because the source row no longer exists; confirm.
        AppEvent::SourceDelete { source_id, source_name } => Some((
            "source.deleted",
            "warn",
            None,
            None,
            None,
            format!("Source {} deleted", source_name),
            serde_json::json!({ "source_id": source_id }),
        )),
        AppEvent::PointCreateBatch { source_id, point_ids } => {
            let name = fetch_source_name(pool, *source_id).await;
            Some((
                "point.batch_created",
                "info",
                None,
                None,
                Some(*source_id),
                format!("Created {} points for source {}", point_ids.len(), name),
                serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
            ))
        }
        AppEvent::PointDeleteBatch { source_id, point_ids } => {
            let name = fetch_source_name(pool, *source_id).await;
            Some((
                "point.batch_deleted",
                "warn",
                None,
                None,
                Some(*source_id),
                format!("Deleted {} points for source {}", point_ids.len(), name),
                serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
            ))
        }
        AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => {
            let code = fetch_equipment_code(pool, *equipment_id).await;
            Some((
                "equipment.start_command_sent",
                "info",
                *unit_id,
                Some(*equipment_id),
                None,
                format!("Start command sent to equipment {}", code),
                serde_json::json!({ "equipment_id": equipment_id, "unit_id": unit_id, "point_id": point_id }),
            ))
        }
        AppEvent::EquipmentStopCommandSent { equipment_id, unit_id, point_id } => {
            let code = fetch_equipment_code(pool, *equipment_id).await;
            Some((
                "equipment.stop_command_sent",
                "info",
                *unit_id,
                Some(*equipment_id),
                None,
                format!("Stop command sent to equipment {}", code),
                serde_json::json!({ "equipment_id": equipment_id, "unit_id": unit_id, "point_id": point_id }),
            ))
        }
        AppEvent::AutoControlStarted { unit_id } => {
            let code = fetch_unit_code(pool, *unit_id).await;
            Some((
                "unit.auto_control_started",
                "info",
                Some(*unit_id),
                None,
                None,
                format!("Auto control started for unit {}", code),
                serde_json::json!({ "unit_id": unit_id }),
            ))
        }
        AppEvent::AutoControlStopped { unit_id } => {
            let code = fetch_unit_code(pool, *unit_id).await;
            Some((
                "unit.auto_control_stopped",
                "info",
                Some(*unit_id),
                None,
                None,
                format!("Auto control stopped for unit {}", code),
                serde_json::json!({ "unit_id": unit_id }),
            ))
        }
        AppEvent::FaultLocked { unit_id, equipment_id } => {
            let unit_code = fetch_unit_code(pool, *unit_id).await;
            let eq_code = fetch_equipment_code(pool, *equipment_id).await;
            Some((
                "unit.fault_locked",
                "error",
                Some(*unit_id),
                Some(*equipment_id),
                None,
                format!("Fault locked for unit {} by equipment {}", unit_code, eq_code),
                serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }),
            ))
        }
        AppEvent::FaultAcked { unit_id } => {
            let code = fetch_unit_code(pool, *unit_id).await;
            Some((
                "unit.fault_acked",
                "info",
                Some(*unit_id),
                None,
                None,
                format!("Fault acknowledged for unit {}", code),
                serde_json::json!({ "unit_id": unit_id }),
            ))
        }
        AppEvent::CommLocked { unit_id } => {
            let code = fetch_unit_code(pool, *unit_id).await;
            Some((
                "unit.comm_locked",
                "warn",
                Some(*unit_id),
                None,
                None,
                format!("Communication locked for unit {}", code),
                serde_json::json!({ "unit_id": unit_id }),
            ))
        }
        AppEvent::CommRecovered { unit_id } => {
            let code = fetch_unit_code(pool, *unit_id).await;
            Some((
                "unit.comm_recovered",
                "info",
                Some(*unit_id),
                None,
                None,
                format!("Communication recovered for unit {}", code),
                serde_json::json!({ "unit_id": unit_id }),
            ))
        }
        AppEvent::RemLocal { unit_id, equipment_id } => {
            let unit_code = fetch_unit_code(pool, *unit_id).await;
            let eq_code = fetch_equipment_code(pool, *equipment_id).await;
            Some((
                "unit.rem_local",
                "warn",
                Some(*unit_id),
                Some(*equipment_id),
                None,
                format!("Unit {} switched to local control via equipment {}", unit_code, eq_code),
                serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }),
            ))
        }
        // Recorded at "warn" even though it is a recovery; the message states
        // that auto control requires a manual restart.
        AppEvent::RemRecovered { unit_id } => {
            let code = fetch_unit_code(pool, *unit_id).await;
            Some((
                "unit.rem_recovered",
                "warn",
                Some(*unit_id),
                None,
                None,
                format!("Unit {} returned to remote control; auto control requires manual restart", code),
                serde_json::json!({ "unit_id": unit_id }),
            ))
        }
        // Not persisted.
        AppEvent::UnitStateChanged { .. } => None,
        AppEvent::PointNewValue(_) => None,
    };

    let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record else {
        return;
    };

    // The bound `event_type` and `payload` are read back from the envelope
    // rather than from the local variables.
    let envelope = EventEnvelope::new(event_type, payload);
    let inserted = sqlx::query_as::<_, EventRecord>(
        r#" INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING * "#,
    )
    .bind(envelope.event_type)
    .bind(level)
    .bind(unit_id as Option)
    .bind(equipment_id as Option)
    .bind(source_id)
    .bind(message)
    .bind(sqlx::types::Json(envelope.payload))
    .fetch_one(pool)
    .await;

    match inserted {
        Ok(record) => {
            // Only rows that were actually stored are announced to clients.
            if let Some(ws_manager) = ws_manager {
                let ws_message = crate::websocket::WsMessage::EventCreated(record);
                if let Err(err) = ws_manager.send_to_public(ws_message).await {
                    tracing::warn!("Failed to broadcast event websocket message: {}", err);
                }
            }
        }
        Err(err) => {
            tracing::warn!("Failed to persist event: {}", err);
        }
    }
}

/// Applies one telemetry payload: resolves its point id, stores the new
/// monitor snapshot through `update_point_monitor_data`, and broadcasts it as
/// `WsMessage::PointNewValue` when `ws_manager` is `Some`.
///
/// If the point id cannot be resolved, a warning is logged and nothing else
/// happens. A failed cache update is logged, but the broadcast is still sent.
async fn process_point_new_value(
    payload: crate::telemetry::PointNewValue,
    connection_manager: &std::sync::Arc,
    ws_manager: Option<&std::sync::Arc>,
) {
    let source_id = payload.source_id;
    let client_handle = payload.client_handle;
    // Prefer the point id carried by the payload; otherwise map the client
    // handle to a point id through the source's `client_handle_map`.
    let point_id = if let Some(point_id) = payload.point_id {
        Some(point_id)
    } else {
        let status = connection_manager.get_status_read_guard().await;
        status
            .get(&source_id)
            .and_then(|s| s.client_handle_map.get(&client_handle).copied())
    };

    if let Some(point_id) = point_id {
        // Read the previous value from the in-memory cache.
        // The inner block ends the guard's scope before the update call below.
        let (old_value, old_timestamp, value_changed) = {
            let monitor_data = connection_manager.get_point_monitor_data_read_guard().await;
            let old_monitor_info = monitor_data.get(&point_id);

            if let Some(old_info) = old_monitor_info {
                // A differing timestamp alone also counts as a change.
                let changed = old_info.value != payload.value
                    || old_info.timestamp != payload.timestamp;
                (old_info.value.clone(), old_info.timestamp, changed)
            } else {
                // First value seen for this point: reported as not changed.
                (None, None, false)
            }
        };

        let monitor = crate::telemetry::PointMonitorInfo {
            protocol: payload.protocol,
            source_id,
            point_id,
            client_handle,
            scan_mode: payload.scan_mode,
            timestamp: payload.timestamp,
            quality: payload.quality,
            value: payload.value,
            value_type: payload.value_type,
            value_text: payload.value_text,
            old_value,
            old_timestamp,
            value_changed,
        };

        // Cloned because `monitor` is moved into the WebSocket message below.
        if let Err(e) = connection_manager
            .update_point_monitor_data(monitor.clone())
            .await
        {
            tracing::error!(
                "Failed to update point monitor data for point {}: {}",
                point_id,
                e
            );
        }

        if let Some(ws_manager) = ws_manager {
            let ws_message = crate::websocket::WsMessage::PointNewValue(monitor);
            if let Err(e) = ws_manager.send_to_public(ws_message).await {
                tracing::warn!(
                    "Failed to send WebSocket message to public room: {}",
                    e
                );
            }
        }
    } else {
        tracing::warn!(
            "Point not found for source {} client_handle {}",
            source_id,
            client_handle
        );
    }
}