diff --git a/crates/app_feeder_distributor/src/event.rs b/crates/app_feeder_distributor/src/event.rs index 19d5498..60c36b2 100644 --- a/crates/app_feeder_distributor/src/event.rs +++ b/crates/app_feeder_distributor/src/event.rs @@ -1,7 +1,6 @@ -use plc_platform_core::model::EventRecord; use plc_platform_core::{ - event::EventEnvelope, - websocket::{WebSocketManager, WsMessage}, + event::{record_event, EventInsert}, + websocket::WebSocketManager, }; use tokio::sync::mpsc; use uuid::Uuid; @@ -180,29 +179,19 @@ async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String { .unwrap_or_else(|| id.to_string()) } -struct PersistableEvent { - event_type: &'static str, - level: &'static str, - unit_id: Option, - equipment_id: Option, - source_id: Option, - message: String, - payload: serde_json::Value, -} - async fn persist_event_if_needed( event: &AppEvent, pool: &sqlx::PgPool, ws_manager: Option<&std::sync::Arc>, ) { - let record: Option = match event { + let record: Option = match event { AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id, } => { let code = fetch_equipment_code(pool, *equipment_id).await; - Some(PersistableEvent { + Some(EventInsert { event_type: "feeder.equipment.start_command_sent", level: "info", unit_id: *unit_id, @@ -222,7 +211,7 @@ async fn persist_event_if_needed( point_id, } => { let code = fetch_equipment_code(pool, *equipment_id).await; - Some(PersistableEvent { + Some(EventInsert { event_type: "feeder.equipment.stop_command_sent", level: "info", unit_id: *unit_id, @@ -238,7 +227,7 @@ async fn persist_event_if_needed( } AppEvent::AutoControlStarted { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; - Some(PersistableEvent { + Some(EventInsert { event_type: "feeder.unit.auto_control_started", level: "info", unit_id: Some(*unit_id), @@ -250,7 +239,7 @@ async fn persist_event_if_needed( } AppEvent::AutoControlStopped { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; - Some(PersistableEvent { + Some(EventInsert { event_type: "feeder.unit.auto_control_stopped", level: "info", unit_id: Some(*unit_id), @@ -266,7 +255,7 @@ async fn persist_event_if_needed( } => { let unit_code = fetch_unit_code(pool, *unit_id).await; let eq_code = fetch_equipment_code(pool, *equipment_id).await; - Some(PersistableEvent { + Some(EventInsert { event_type: "feeder.unit.fault_locked", level: "error", unit_id: Some(*unit_id), @@ -281,7 +270,7 @@ async fn persist_event_if_needed( } AppEvent::FaultAcked { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; - Some(PersistableEvent { + Some(EventInsert { event_type: "feeder.unit.fault_acked", level: "info", unit_id: Some(*unit_id), @@ -293,7 +282,7 @@ async fn persist_event_if_needed( } AppEvent::CommLocked { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; - Some(PersistableEvent { + Some(EventInsert { event_type: "feeder.unit.comm_locked", level: "warn", unit_id: Some(*unit_id), @@ -305,7 +294,7 @@ async fn persist_event_if_needed( } AppEvent::CommRecovered { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; - Some(PersistableEvent { + Some(EventInsert { event_type: "feeder.unit.comm_recovered", level: "info", unit_id: Some(*unit_id), @@ -321,7 +310,7 @@ async fn persist_event_if_needed( } => { let unit_code = fetch_unit_code(pool, *unit_id).await; let eq_code = fetch_equipment_code(pool, *equipment_id).await; - Some(PersistableEvent { + Some(EventInsert { event_type: "feeder.unit.rem_local", level: "warn", unit_id: Some(*unit_id), @@ -336,7 +325,7 @@ async fn persist_event_if_needed( } AppEvent::RemRecovered { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; - Some(PersistableEvent { + Some(EventInsert { event_type: "feeder.unit.rem_recovered", level: "warn", unit_id: Some(*unit_id), @@ -352,40 +341,8 @@ async fn persist_event_if_needed( AppEvent::UnitStateChanged { .. } => None, }; - let Some(record) = record - else { + let Some(record) = record else { return; }; - let envelope = EventEnvelope::new(record.event_type, record.payload); - - let inserted = sqlx::query_as::<_, EventRecord>( - r#" - INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload) - VALUES ($1, $2, $3, $4, $5, $6, $7) - RETURNING * - "#, - ) - .bind(envelope.event_type) - .bind(record.level) - .bind(record.unit_id) - .bind(record.equipment_id) - .bind(record.source_id) - .bind(record.message) - .bind(sqlx::types::Json(envelope.payload)) - .fetch_one(pool) - .await; - - match inserted { - Ok(record) => { - if let Some(ws_manager) = ws_manager { - let ws_message = WsMessage::EventCreated(record); - if let Err(err) = ws_manager.send_to_public(ws_message).await { - tracing::warn!("Failed to broadcast event websocket message: {}", err); - } - } - } - Err(err) => { - tracing::warn!("Failed to persist event: {}", err); - } - } + record_event(pool, ws_manager.map(std::sync::Arc::as_ref), record).await; } diff --git a/crates/plc_platform_core/src/event.rs b/crates/plc_platform_core/src/event.rs index 7e8da4f..a26ceca 100644 --- a/crates/plc_platform_core/src/event.rs +++ b/crates/plc_platform_core/src/event.rs @@ -44,86 +44,27 @@ pub enum PlatformEvent { }, } -/// Persists platform events to the `event` table and broadcasts via WebSocket. -pub async fn persist_and_broadcast( - event: &PlatformEvent, - pool: &sqlx::PgPool, - ws_manager: &WebSocketManager, -) { - let record = match event { - PlatformEvent::SourceCreated { source_id } => { - let name = fetch_source_name(pool, *source_id).await; - Some(( - "platform.source.created", - "info", - None, - None, - Some(*source_id), - format!("Source {} created", name), - serde_json::json!({ "source_id": source_id }), - )) - } - PlatformEvent::SourceUpdated { source_id } => { - let name = fetch_source_name(pool, *source_id).await; - Some(( - "platform.source.updated", - "info", - None, - None, - Some(*source_id), - format!("Source {} updated", name), - serde_json::json!({ "source_id": source_id }), - )) - } - PlatformEvent::SourceDeleted { - source_id, - source_name, - } => Some(( - "platform.source.deleted", - "warn", - None, - None, - None, - format!("Source {} deleted", source_name), - serde_json::json!({ "source_id": source_id }), - )), - PlatformEvent::PointsCreated { - source_id, - point_ids, - } => { - let name = fetch_source_name(pool, *source_id).await; - Some(( - "platform.point.batch_created", - "info", - None, - None, - Some(*source_id), - format!("Created {} points for source {}", point_ids.len(), name), - serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), - )) - } - PlatformEvent::PointsDeleted { - source_id, - point_ids, - } => { - let name = fetch_source_name(pool, *source_id).await; - Some(( - "platform.point.batch_deleted", - "warn", - None, - None, - Some(*source_id), - format!("Deleted {} points for source {}", point_ids.len(), name), - serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), - )) - } - }; +/// Platform-owned row for the `event` table. +/// Apps construct this to write business events through [`record_event`]. +pub struct EventInsert { + pub event_type: &'static str, + pub level: &'static str, + pub unit_id: Option, + pub equipment_id: Option, + pub source_id: Option, + pub message: String, + pub payload: Value, +} - let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record - else { - return; - }; - let envelope = EventEnvelope::new(event_type, payload); +/// Inserts an event into the `event` table and optionally broadcasts via WebSocket. +/// This is the platform primitive used by both core platform events and app business events. +pub async fn record_event( + pool: &sqlx::PgPool, + ws_manager: Option<&WebSocketManager>, + event: EventInsert, +) { + let event_type = event.event_type; + let envelope = EventEnvelope::new(event_type, event.payload); let inserted = sqlx::query_as::<_, EventRecord>( r#" @@ -133,28 +74,111 @@ pub async fn persist_and_broadcast( "#, ) .bind(envelope.event_type) - .bind(level) - .bind(unit_id as Option) - .bind(equipment_id as Option) - .bind(source_id) - .bind(message) + .bind(event.level) + .bind(event.unit_id) + .bind(event.equipment_id) + .bind(event.source_id) + .bind(event.message) .bind(sqlx::types::Json(envelope.payload)) .fetch_one(pool) .await; match inserted { Ok(record) => { - let ws_message = WsMessage::EventCreated(record); - if let Err(err) = ws_manager.send_to_public(ws_message).await { - tracing::warn!("Failed to broadcast platform event: {}", err); + if let Some(ws_manager) = ws_manager { + let ws_message = WsMessage::EventCreated(record); + if let Err(err) = ws_manager.send_to_public(ws_message).await { + tracing::warn!("Failed to broadcast event {}: {}", event_type, err); + } } } Err(err) => { - tracing::warn!("Failed to persist platform event: {}", err); + tracing::warn!("Failed to persist event {}: {}", event_type, err); } } } +/// Persists platform events to the `event` table and broadcasts via WebSocket. +pub async fn persist_and_broadcast( + event: &PlatformEvent, + pool: &sqlx::PgPool, + ws_manager: &WebSocketManager, +) { + let record = match event { + PlatformEvent::SourceCreated { source_id } => { + let name = fetch_source_name(pool, *source_id).await; + Some(EventInsert { + event_type: "platform.source.created", + level: "info", + unit_id: None, + equipment_id: None, + source_id: Some(*source_id), + message: format!("Source {} created", name), + payload: serde_json::json!({ "source_id": source_id }), + }) + } + PlatformEvent::SourceUpdated { source_id } => { + let name = fetch_source_name(pool, *source_id).await; + Some(EventInsert { + event_type: "platform.source.updated", + level: "info", + unit_id: None, + equipment_id: None, + source_id: Some(*source_id), + message: format!("Source {} updated", name), + payload: serde_json::json!({ "source_id": source_id }), + }) + } + PlatformEvent::SourceDeleted { + source_id, + source_name, + } => Some(EventInsert { + event_type: "platform.source.deleted", + level: "warn", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!("Source {} deleted", source_name), + payload: serde_json::json!({ "source_id": source_id }), + }), + PlatformEvent::PointsCreated { + source_id, + point_ids, + } => { + let name = fetch_source_name(pool, *source_id).await; + Some(EventInsert { + event_type: "platform.point.batch_created", + level: "info", + unit_id: None, + equipment_id: None, + source_id: Some(*source_id), + message: format!("Created {} points for source {}", point_ids.len(), name), + payload: serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), + }) + } + PlatformEvent::PointsDeleted { + source_id, + point_ids, + } => { + let name = fetch_source_name(pool, *source_id).await; + Some(EventInsert { + event_type: "platform.point.batch_deleted", + level: "warn", + unit_id: None, + equipment_id: None, + source_id: Some(*source_id), + message: format!("Deleted {} points for source {}", point_ids.len(), name), + payload: serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), + }) + } + }; + + let Some(record) = record else { + return; + }; + record_event(pool, Some(ws_manager), record).await; +} + async fn fetch_source_name(pool: &sqlx::PgPool, id: Uuid) -> String { sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1") .bind(id)