diff --git a/crates/app_feeder_distributor/src/app.rs b/crates/app_feeder_distributor/src/app.rs index 6143b30..49646f7 100644 --- a/crates/app_feeder_distributor/src/app.rs +++ b/crates/app_feeder_distributor/src/app.rs @@ -42,6 +42,7 @@ pub async fn run() { let event_manager = Arc::new(EventManager::new( platform.pool.clone(), Some(platform.ws_manager.clone()), + platform.metadata.clone(), )); bootstrap::connect_all_enabled_sources(&platform) @@ -86,8 +87,12 @@ pub fn test_state() -> AppState { .expect("lazy pool should build"); let connection_manager = Arc::new(ConnectionManager::new()); let ws_manager = Arc::new(WebSocketManager::new()); - let event_manager = Arc::new(EventManager::new(pool.clone(), Some(ws_manager.clone()))); - let platform = PlatformContext::new(pool, connection_manager, ws_manager); + let platform = PlatformContext::new(pool.clone(), connection_manager, ws_manager.clone()); + let event_manager = Arc::new(EventManager::new( + pool, + Some(ws_manager), + platform.metadata.clone(), + )); AppState { config: ServerConfig { diff --git a/crates/app_feeder_distributor/src/event.rs b/crates/app_feeder_distributor/src/event.rs index d957f37..a712509 100644 --- a/crates/app_feeder_distributor/src/event.rs +++ b/crates/app_feeder_distributor/src/event.rs @@ -1,5 +1,7 @@ +use std::sync::Arc; + use plc_platform_core::{ - event::{record_event, EventInsert}, + event::{record_event, EventInsert, MetadataCache}, websocket::WebSocketManager, }; use tokio::sync::mpsc; @@ -59,7 +61,11 @@ pub struct EventManager { } impl EventManager { - pub fn new(pool: sqlx::PgPool, ws_manager: Option>) -> Self { + pub fn new( + pool: sqlx::PgPool, + ws_manager: Option>, + metadata: Arc, + ) -> Self { let (control_sender, mut control_receiver) = mpsc::channel::(CONTROL_EVENT_CHANNEL_CAPACITY); @@ -67,7 +73,13 @@ impl EventManager { let control_ws_manager = ws_manager.clone(); tokio::spawn(async move { while let Some(event) = control_receiver.recv().await { - handle_control_event(event, &control_pool, control_ws_manager.as_ref()).await; + handle_control_event( + event, + &control_pool, + control_ws_manager.as_ref(), + &metadata, + ) + .await; } }); @@ -90,7 +102,8 @@ impl EventManager { async fn handle_control_event( event: AppEvent, pool: &sqlx::PgPool, - ws_manager: Option<&std::sync::Arc>, + ws_manager: Option<&Arc>, + metadata: &MetadataCache, ) { // UnitStateChanged is high-frequency and intentionally not persisted; // it still needs tracing for local observability. All other events are @@ -104,33 +117,14 @@ async fn handle_control_event( tracing::info!("Unit {} state: {} -> {}", unit_id, from_state, to_state); } - persist_event_if_needed(&event, pool, ws_manager).await; -} - -async fn fetch_unit_code(pool: &sqlx::PgPool, id: Uuid) -> String { - sqlx::query_scalar::<_, String>("SELECT code FROM unit WHERE id = $1") - .bind(id) - .fetch_optional(pool) - .await - .ok() - .flatten() - .unwrap_or_else(|| id.to_string()) -} - -async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String { - sqlx::query_scalar::<_, String>("SELECT code FROM equipment WHERE id = $1") - .bind(id) - .fetch_optional(pool) - .await - .ok() - .flatten() - .unwrap_or_else(|| id.to_string()) + persist_event_if_needed(&event, pool, ws_manager, metadata).await; } async fn persist_event_if_needed( event: &AppEvent, pool: &sqlx::PgPool, - ws_manager: Option<&std::sync::Arc>, + ws_manager: Option<&Arc>, + metadata: &MetadataCache, ) { let record: Option = match event { AppEvent::EquipmentStartCommandSent { @@ -138,7 +132,7 @@ async fn persist_event_if_needed( unit_id, point_id, } => { - let code = fetch_equipment_code(pool, *equipment_id).await; + let code = metadata.equipment_code(pool, *equipment_id).await; Some(EventInsert { event_type: "feeder.equipment.start_command_sent", level: "info", @@ -158,7 +152,7 @@ async fn persist_event_if_needed( unit_id, point_id, } => { - let code = fetch_equipment_code(pool, *equipment_id).await; + let code = metadata.equipment_code(pool, *equipment_id).await; Some(EventInsert { event_type: "feeder.equipment.stop_command_sent", level: "info", @@ -174,7 +168,7 @@ async fn persist_event_if_needed( }) } AppEvent::AutoControlStarted { unit_id } => { - let code = fetch_unit_code(pool, *unit_id).await; + let code = metadata.unit_code(pool, *unit_id).await; Some(EventInsert { event_type: "feeder.unit.auto_control_started", level: "info", @@ -186,7 +180,7 @@ async fn persist_event_if_needed( }) } AppEvent::AutoControlStopped { unit_id } => { - let code = fetch_unit_code(pool, *unit_id).await; + let code = metadata.unit_code(pool, *unit_id).await; Some(EventInsert { event_type: "feeder.unit.auto_control_stopped", level: "info", @@ -201,8 +195,8 @@ async fn persist_event_if_needed( unit_id, equipment_id, } => { - let unit_code = fetch_unit_code(pool, *unit_id).await; - let eq_code = fetch_equipment_code(pool, *equipment_id).await; + let unit_code = metadata.unit_code(pool, *unit_id).await; + let eq_code = metadata.equipment_code(pool, *equipment_id).await; Some(EventInsert { event_type: "feeder.unit.fault_locked", level: "error", @@ -217,7 +211,7 @@ async fn persist_event_if_needed( }) } AppEvent::FaultAcked { unit_id } => { - let code = fetch_unit_code(pool, *unit_id).await; + let code = metadata.unit_code(pool, *unit_id).await; Some(EventInsert { event_type: "feeder.unit.fault_acked", level: "info", @@ -229,7 +223,7 @@ async fn persist_event_if_needed( }) } AppEvent::CommLocked { unit_id } => { - let code = fetch_unit_code(pool, *unit_id).await; + let code = metadata.unit_code(pool, *unit_id).await; Some(EventInsert { event_type: "feeder.unit.comm_locked", level: "warn", @@ -241,7 +235,7 @@ async fn persist_event_if_needed( }) } AppEvent::CommRecovered { unit_id } => { - let code = fetch_unit_code(pool, *unit_id).await; + let code = metadata.unit_code(pool, *unit_id).await; Some(EventInsert { event_type: "feeder.unit.comm_recovered", level: "info", @@ -256,8 +250,8 @@ async fn persist_event_if_needed( unit_id, equipment_id, } => { - let unit_code = fetch_unit_code(pool, *unit_id).await; - let eq_code = fetch_equipment_code(pool, *equipment_id).await; + let unit_code = metadata.unit_code(pool, *unit_id).await; + let eq_code = metadata.equipment_code(pool, *equipment_id).await; Some(EventInsert { event_type: "feeder.unit.rem_local", level: "warn", @@ -272,7 +266,7 @@ async fn persist_event_if_needed( }) } AppEvent::RemRecovered { unit_id } => { - let code = fetch_unit_code(pool, *unit_id).await; + let code = metadata.unit_code(pool, *unit_id).await; Some(EventInsert { event_type: "feeder.unit.rem_recovered", level: "warn", @@ -292,5 +286,5 @@ async fn persist_event_if_needed( let Some(record) = record else { return; }; - record_event(pool, ws_manager.map(std::sync::Arc::as_ref), record).await; + record_event(pool, ws_manager.map(Arc::as_ref), record).await; } diff --git a/crates/app_feeder_distributor/src/handler/control.rs b/crates/app_feeder_distributor/src/handler/control.rs index dee91f5..b328ee5 100644 --- a/crates/app_feeder_distributor/src/handler/control.rs +++ b/crates/app_feeder_distributor/src/handler/control.rs @@ -498,6 +498,8 @@ pub async fn update_unit( ) .await?; + state.platform.metadata.invalidate_unit(unit_id).await; + Ok(Json(serde_json::json!({ "ok_msg": "Unit updated successfully" }))) @@ -512,6 +514,8 @@ pub async fn delete_unit( return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } + state.platform.metadata.invalidate_unit(unit_id).await; + Ok(StatusCode::NO_CONTENT) } diff --git a/crates/plc_platform_core/src/event.rs b/crates/plc_platform_core/src/event.rs index 344b1f2..59ae745 100644 --- a/crates/plc_platform_core/src/event.rs +++ b/crates/plc_platform_core/src/event.rs @@ -1,10 +1,66 @@ +use std::collections::HashMap; + use serde::{Deserialize, Serialize}; use serde_json::Value; +use tokio::sync::RwLock; use uuid::Uuid; use crate::model::EventRecord; use crate::websocket::{WebSocketManager, WsMessage}; +/// In-memory cache for unit/equipment `code` fields used in event messages. +/// Lazily populated on first access; entries are invalidated when the +/// corresponding row is updated or deleted (see invalidate_* methods). +#[derive(Default)] +pub struct MetadataCache { + unit_codes: RwLock>, + equipment_codes: RwLock>, +} + +impl MetadataCache { + pub fn new() -> Self { + Self::default() + } + + pub async fn unit_code(&self, pool: &sqlx::PgPool, id: Uuid) -> String { + if let Some(code) = self.unit_codes.read().await.get(&id) { + return code.clone(); + } + let code = sqlx::query_scalar::<_, String>("SELECT code FROM unit WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await + .ok() + .flatten() + .unwrap_or_else(|| id.to_string()); + self.unit_codes.write().await.insert(id, code.clone()); + code + } + + pub async fn equipment_code(&self, pool: &sqlx::PgPool, id: Uuid) -> String { + if let Some(code) = self.equipment_codes.read().await.get(&id) { + return code.clone(); + } + let code = sqlx::query_scalar::<_, String>("SELECT code FROM equipment WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await + .ok() + .flatten() + .unwrap_or_else(|| id.to_string()); + self.equipment_codes.write().await.insert(id, code.clone()); + code + } + + pub async fn invalidate_unit(&self, id: Uuid) { + self.unit_codes.write().await.remove(&id); + } + + pub async fn invalidate_equipment(&self, id: Uuid) { + self.equipment_codes.write().await.remove(&id); + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct EventEnvelope { pub event_type: String, diff --git a/crates/plc_platform_core/src/handler/equipment.rs b/crates/plc_platform_core/src/handler/equipment.rs index a7afc60..f4cb5fb 100644 --- a/crates/plc_platform_core/src/handler/equipment.rs +++ b/crates/plc_platform_core/src/handler/equipment.rs @@ -241,6 +241,8 @@ pub async fn update_equipment( ) .await?; + state.metadata.invalidate_equipment(equipment_id).await; + Ok(Json(serde_json::json!({ "ok_msg": "Equipment updated successfully" }))) @@ -288,5 +290,7 @@ pub async fn delete_equipment( return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); } + state.metadata.invalidate_equipment(equipment_id).await; + Ok(StatusCode::NO_CONTENT) } diff --git a/crates/plc_platform_core/src/platform_context.rs b/crates/plc_platform_core/src/platform_context.rs index 13dfb76..2276ee3 100644 --- a/crates/plc_platform_core/src/platform_context.rs +++ b/crates/plc_platform_core/src/platform_context.rs @@ -1,5 +1,5 @@ use crate::connection::ConnectionManager; -use crate::event::PlatformEvent; +use crate::event::{MetadataCache, PlatformEvent}; use crate::websocket::WebSocketManager; use std::sync::Arc; @@ -8,6 +8,7 @@ pub struct PlatformContext { pub pool: sqlx::PgPool, pub connection_manager: Arc, pub ws_manager: Arc, + pub metadata: Arc, } impl PlatformContext { @@ -20,6 +21,7 @@ impl PlatformContext { pool, connection_manager, ws_manager, + metadata: Arc::new(MetadataCache::new()), } }