use std::sync::Arc; use plc_platform_core::{ event::{record_event, EventInsert, MetadataCache}, websocket::WebSocketManager, }; use tokio::sync::mpsc; use uuid::Uuid; const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024; /// Operation-system business events. /// /// Each variant maps to a row in the `event` table (via `record_event`) and /// follows the `ops.*` namespace from design doc ยง8.1. Every record carries /// `subject_type` + `subject_id` so the front-end can filter the timeline /// for one segment / station without joining on event_type strings. #[derive(Debug, Clone)] pub enum AppEvent { SegmentAutoStarted { segment_id: Uuid, }, SegmentAutoStopped { segment_id: Uuid, }, SegmentStepAdvanced { segment_id: Uuid, step_no: i32, }, SegmentCompleted { segment_id: Uuid, }, SegmentBlocked { segment_id: Uuid, reason: String, }, SegmentFaultLocked { segment_id: Uuid, message: String, }, SegmentFaultAcked { segment_id: Uuid, }, SegmentCommLocked { segment_id: Uuid, }, SegmentCommRecovered { segment_id: Uuid, }, StationStateChanged { station_id: Uuid, presence: bool, vacancy: bool, }, AlarmActionTimeout { segment_id: Uuid, step_no: i32, }, AlarmSignalConflict { segment_id: Uuid, message: String, }, AlarmResourceBusy { segment_id: Uuid, resource_key: String, }, } pub struct EventManager { sender: mpsc::Sender, } impl EventManager { pub fn new( pool: sqlx::PgPool, ws_manager: Option>, metadata: Arc, ) -> Self { let (sender, mut receiver) = mpsc::channel::(CONTROL_EVENT_CHANNEL_CAPACITY); let pool_for_task = pool.clone(); let ws_for_task = ws_manager.clone(); tokio::spawn(async move { while let Some(event) = receiver.recv().await { handle_event(event, &pool_for_task, ws_for_task.as_ref(), &metadata).await; } }); Self { sender } } pub fn send(&self, event: AppEvent) -> Result<(), String> { match self.sender.try_send(event) { Ok(()) => Ok(()), Err(mpsc::error::TrySendError::Closed(e)) => { Err(format!("ops event channel closed ({e:?})")) } Err(mpsc::error::TrySendError::Full(e)) => Err(format!("ops event queue full ({e:?})")), } } } fn segment_event( event_type: &'static str, level: &'static str, segment_id: Uuid, message: String, payload: serde_json::Value, ) -> EventInsert { EventInsert { event_type, level, unit_id: None, equipment_id: None, source_id: None, subject_type: Some("segment"), subject_id: Some(segment_id), message, payload, } } async fn handle_event( event: AppEvent, pool: &sqlx::PgPool, ws_manager: Option<&Arc>, _metadata: &MetadataCache, ) { let record: Option = match &event { AppEvent::SegmentAutoStarted { segment_id } => Some(segment_event( "ops.segment.auto_started", "info", *segment_id, format!("Segment {} auto control started", segment_id), serde_json::json!({ "segment_id": segment_id }), )), AppEvent::SegmentAutoStopped { segment_id } => Some(segment_event( "ops.segment.auto_stopped", "info", *segment_id, format!("Segment {} auto control stopped", segment_id), serde_json::json!({ "segment_id": segment_id }), )), AppEvent::SegmentStepAdvanced { segment_id, step_no, } => Some(segment_event( "ops.segment.step_advanced", "info", *segment_id, format!("Segment {} advanced to step {}", segment_id, step_no), serde_json::json!({ "segment_id": segment_id, "step_no": step_no }), )), AppEvent::SegmentCompleted { segment_id } => Some(segment_event( "ops.segment.completed", "info", *segment_id, format!("Segment {} completed", segment_id), serde_json::json!({ "segment_id": segment_id }), )), AppEvent::SegmentBlocked { segment_id, reason } => Some(segment_event( "ops.segment.blocked", "warn", *segment_id, format!("Segment {} blocked: {}", segment_id, reason), serde_json::json!({ "segment_id": segment_id, "reason": reason }), )), AppEvent::SegmentFaultLocked { segment_id, message, } => Some(segment_event( "ops.segment.fault_locked", "error", *segment_id, format!("Segment {} fault locked: {}", segment_id, message), serde_json::json!({ "segment_id": segment_id, "message": message }), )), AppEvent::SegmentFaultAcked { segment_id } => Some(segment_event( "ops.segment.fault_acked", "info", *segment_id, format!("Segment {} fault acknowledged", segment_id), serde_json::json!({ "segment_id": segment_id }), )), AppEvent::SegmentCommLocked { segment_id } => Some(segment_event( "ops.segment.comm_locked", "warn", *segment_id, format!("Segment {} communication locked", segment_id), serde_json::json!({ "segment_id": segment_id }), )), AppEvent::SegmentCommRecovered { segment_id } => Some(segment_event( "ops.segment.comm_recovered", "info", *segment_id, format!("Segment {} communication recovered", segment_id), serde_json::json!({ "segment_id": segment_id }), )), AppEvent::StationStateChanged { station_id, presence, vacancy, } => Some(EventInsert { event_type: "ops.station.state_changed", level: "info", unit_id: None, equipment_id: None, source_id: None, subject_type: Some("station"), subject_id: Some(*station_id), message: format!( "Station {} state changed (presence={}, vacancy={})", station_id, presence, vacancy ), payload: serde_json::json!({ "station_id": station_id, "presence": presence, "vacancy": vacancy }), }), AppEvent::AlarmActionTimeout { segment_id, step_no, } => Some(segment_event( "ops.alarm.action_timeout", "error", *segment_id, format!("Action timeout on segment {} step {}", segment_id, step_no), serde_json::json!({ "segment_id": segment_id, "step_no": step_no }), )), AppEvent::AlarmSignalConflict { segment_id, message, } => Some(segment_event( "ops.alarm.signal_conflict", "error", *segment_id, format!("Signal conflict on segment {}: {}", segment_id, message), serde_json::json!({ "segment_id": segment_id, "message": message }), )), AppEvent::AlarmResourceBusy { segment_id, resource_key, } => Some(segment_event( "ops.alarm.resource_busy", "warn", *segment_id, format!("Resource {} busy for segment {}", resource_key, segment_id), serde_json::json!({ "segment_id": segment_id, "resource_key": resource_key }), )), }; if let Some(record) = record { record_event(pool, ws_manager.map(Arc::as_ref), record).await; } }