use plc_platform_core::model::EventRecord;
use plc_platform_core::{
    event::EventEnvelope,
    websocket::{WebSocketManager, WsMessage},
};
use tokio::sync::mpsc;
use uuid::Uuid;

/// Capacity of the bounded channel that sits between [`EventManager::send`]
/// and the background task that handles control events.
const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024;

/// Feeder-specific business events only.
/// Platform events (source/point lifecycle) are handled by core's emit_event().
#[derive(Debug, Clone)]
pub enum AppEvent {
    /// A start command was sent to a piece of equipment.
    EquipmentStartCommandSent {
        equipment_id: Uuid,
        /// Unit the equipment belongs to, when known.
        unit_id: Option<Uuid>,
        point_id: Uuid,
    },
    /// A stop command was sent to a piece of equipment.
    EquipmentStopCommandSent {
        equipment_id: Uuid,
        /// Unit the equipment belongs to, when known.
        unit_id: Option<Uuid>,
        point_id: Uuid,
    },
    /// Auto control was started for a unit.
    AutoControlStarted {
        unit_id: Uuid,
    },
    /// Auto control was stopped for a unit.
    AutoControlStopped {
        unit_id: Uuid,
    },
    /// A unit was fault-locked by one of its equipment.
    FaultLocked {
        unit_id: Uuid,
        equipment_id: Uuid,
    },
    /// A unit's fault was acknowledged.
    FaultAcked {
        unit_id: Uuid,
    },
    /// A unit was locked because of communication problems.
    CommLocked {
        unit_id: Uuid,
    },
    /// Communication with a unit recovered.
    CommRecovered {
        unit_id: Uuid,
    },
    /// A unit switched to local control via the given equipment.
    RemLocal {
        unit_id: Uuid,
        equipment_id: Uuid,
    },
    /// A unit returned to remote control.
    RemRecovered {
        unit_id: Uuid,
    },
    /// A unit moved from one state to another.
    UnitStateChanged {
        unit_id: Uuid,
        from_state: String,
        to_state: String,
    },
}

/// Entry point for publishing [`AppEvent`]s.
///
/// Events are queued on a bounded channel and consumed one at a time by a
/// background Tokio task spawned in [`EventManager::new`].
pub struct EventManager {
    control_sender: mpsc::Sender<AppEvent>,
}

impl EventManager {
    /// Creates the manager and spawns the background task that drains the
    /// event queue, handing every event to `handle_control_event`.
    ///
    /// The task ends once `recv` yields `None`, i.e. when the channel is closed.
    /// Because this calls `tokio::spawn`, it has to run inside a Tokio runtime.
    ///
    /// * `pool` - database pool used by the handler.
    /// * `ws_manager` - optional WebSocket manager forwarded to the handler.
    pub fn new(pool: sqlx::PgPool, ws_manager: Option<std::sync::Arc<WebSocketManager>>) -> Self {
        let (control_sender, mut control_receiver) =
            mpsc::channel::<AppEvent>(CONTROL_EVENT_CHANNEL_CAPACITY);

        // `pool` and `ws_manager` are owned and not used again here, so they
        // are moved straight into the task instead of being cloned first.
        tokio::spawn(async move {
            while let Some(event) = control_receiver.recv().await {
                handle_control_event(event, &pool, ws_manager.as_ref()).await;
            }
        });

        Self { control_sender }
    }

    /// Queues `event` for the background handler without waiting.
    ///
    /// # Errors
    ///
    /// Returns a descriptive message when the channel is closed or when the
    /// queue is full. The message embeds the `Debug` rendering of the rejected
    /// event.
    pub fn send(&self, event: AppEvent) -> Result<(), String> {
        self.control_sender
            .try_send(event)
            .map_err(|err| match err {
                mpsc::error::TrySendError::Closed(e) => {
                    format!("Failed to send control event: channel closed ({e:?})")
                }
                mpsc::error::TrySendError::Full(e) => {
                    format!("Failed to send control event: queue full ({e:?})")
                }
            })
    }
}

/// Bridges platform events to feeder-specific side effects.
/// Connection management and telemetry are handled by core automatically.
pub struct FeederPlatformEventSink { control_runtime: std::sync::Arc, } impl FeederPlatformEventSink { pub fn new( control_runtime: std::sync::Arc, ) -> Self { Self { control_runtime } } } impl plc_platform_core::platform_context::PlatformEventSink for FeederPlatformEventSink { fn on_event(&self, event: &plc_platform_core::event::PlatformEvent) { match event { plc_platform_core::event::PlatformEvent::UnitsChanged { unit_ids } => { let runtime = self.control_runtime.clone(); let ids = unit_ids.clone(); tokio::spawn(async move { for unit_id in ids { runtime.notify_unit(unit_id).await; } }); } // Other platform events: connection management handled by core. _ => {} } } } async fn handle_control_event( event: AppEvent, pool: &sqlx::PgPool, ws_manager: Option<&std::sync::Arc>, ) { persist_event_if_needed(&event, pool, ws_manager).await; match event { AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id, } => { tracing::info!( "Equipment start command sent: equipment={}, unit={:?}, point={}", equipment_id, unit_id, point_id ); } AppEvent::EquipmentStopCommandSent { equipment_id, unit_id, point_id, } => { tracing::info!( "Equipment stop command sent: equipment={}, unit={:?}, point={}", equipment_id, unit_id, point_id ); } AppEvent::AutoControlStarted { unit_id } => { tracing::info!("Auto control started for unit {}", unit_id); } AppEvent::AutoControlStopped { unit_id } => { tracing::info!("Auto control stopped for unit {}", unit_id); } AppEvent::FaultLocked { unit_id, equipment_id, } => { tracing::warn!("Fault locked: unit={}, equipment={}", unit_id, equipment_id); } AppEvent::FaultAcked { unit_id } => { tracing::info!("Fault acked for unit {}", unit_id); } AppEvent::CommLocked { unit_id } => { tracing::warn!("Comm locked for unit {}", unit_id); } AppEvent::CommRecovered { unit_id } => { tracing::info!("Comm recovered for unit {}", unit_id); } AppEvent::RemLocal { unit_id, equipment_id, } => { tracing::warn!("REM local: unit={}, equipment={}", unit_id, 
equipment_id); } AppEvent::RemRecovered { unit_id } => { tracing::info!("REM recovered for unit {}", unit_id); } AppEvent::UnitStateChanged { unit_id, from_state, to_state, } => { tracing::info!("Unit {} state: {} -> {}", unit_id, from_state, to_state); } } } async fn fetch_unit_code(pool: &sqlx::PgPool, id: Uuid) -> String { sqlx::query_scalar::<_, String>("SELECT code FROM unit WHERE id = $1") .bind(id) .fetch_optional(pool) .await .ok() .flatten() .unwrap_or_else(|| id.to_string()) } async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String { sqlx::query_scalar::<_, String>("SELECT code FROM equipment WHERE id = $1") .bind(id) .fetch_optional(pool) .await .ok() .flatten() .unwrap_or_else(|| id.to_string()) } async fn persist_event_if_needed( event: &AppEvent, pool: &sqlx::PgPool, ws_manager: Option<&std::sync::Arc>, ) { let record: Option<( &str, &str, Option, Option, Option, String, serde_json::Value, )> = match event { AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id, } => { let code = fetch_equipment_code(pool, *equipment_id).await; Some(( "feeder.equipment.start_command_sent", "info", *unit_id, Some(*equipment_id), None, format!("Start command sent to equipment {}", code), serde_json::json!({ "equipment_id": equipment_id, "unit_id": unit_id, "point_id": point_id }), )) } AppEvent::EquipmentStopCommandSent { equipment_id, unit_id, point_id, } => { let code = fetch_equipment_code(pool, *equipment_id).await; Some(( "feeder.equipment.stop_command_sent", "info", *unit_id, Some(*equipment_id), None, format!("Stop command sent to equipment {}", code), serde_json::json!({ "equipment_id": equipment_id, "unit_id": unit_id, "point_id": point_id }), )) } AppEvent::AutoControlStarted { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( "feeder.unit.auto_control_started", "info", Some(*unit_id), None, None, format!("Auto control started for unit {}", code), serde_json::json!({ "unit_id": unit_id }), )) } 
AppEvent::AutoControlStopped { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( "feeder.unit.auto_control_stopped", "info", Some(*unit_id), None, None, format!("Auto control stopped for unit {}", code), serde_json::json!({ "unit_id": unit_id }), )) } AppEvent::FaultLocked { unit_id, equipment_id, } => { let unit_code = fetch_unit_code(pool, *unit_id).await; let eq_code = fetch_equipment_code(pool, *equipment_id).await; Some(( "feeder.unit.fault_locked", "error", Some(*unit_id), Some(*equipment_id), None, format!( "Fault locked for unit {} by equipment {}", unit_code, eq_code ), serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), )) } AppEvent::FaultAcked { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( "feeder.unit.fault_acked", "info", Some(*unit_id), None, None, format!("Fault acknowledged for unit {}", code), serde_json::json!({ "unit_id": unit_id }), )) } AppEvent::CommLocked { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( "feeder.unit.comm_locked", "warn", Some(*unit_id), None, None, format!("Communication locked for unit {}", code), serde_json::json!({ "unit_id": unit_id }), )) } AppEvent::CommRecovered { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( "feeder.unit.comm_recovered", "info", Some(*unit_id), None, None, format!("Communication recovered for unit {}", code), serde_json::json!({ "unit_id": unit_id }), )) } AppEvent::RemLocal { unit_id, equipment_id, } => { let unit_code = fetch_unit_code(pool, *unit_id).await; let eq_code = fetch_equipment_code(pool, *equipment_id).await; Some(( "feeder.unit.rem_local", "warn", Some(*unit_id), Some(*equipment_id), None, format!( "Unit {} switched to local control via equipment {}", unit_code, eq_code ), serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), )) } AppEvent::RemRecovered { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( 
"feeder.unit.rem_recovered", "warn", Some(*unit_id), None, None, format!( "Unit {} returned to remote control; auto control requires manual restart", code ), serde_json::json!({ "unit_id": unit_id }), )) } AppEvent::UnitStateChanged { .. } => None, }; let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record else { return; }; let envelope = EventEnvelope::new(event_type, payload); let inserted = sqlx::query_as::<_, EventRecord>( r#" INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING * "#, ) .bind(envelope.event_type) .bind(level) .bind(unit_id as Option) .bind(equipment_id as Option) .bind(source_id) .bind(message) .bind(sqlx::types::Json(envelope.payload)) .fetch_one(pool) .await; match inserted { Ok(record) => { if let Some(ws_manager) = ws_manager { let ws_message = WsMessage::EventCreated(record); if let Err(err) = ws_manager.send_to_public(ws_message).await { tracing::warn!("Failed to broadcast event websocket message: {}", err); } } } Err(err) => { tracing::warn!("Failed to persist event: {}", err); } } }