Cache unit/equipment codes for event messages

Add MetadataCache to PlatformContext — a lazy-loaded, cross-app cache
of code fields used when formatting event messages. Each persisted
AppEvent previously did 1-2 extra SELECTs to look up the code for its
human-readable message; after this change the same id hits the cache
on all subsequent events.

Invalidation: the platform-owned equipment handler invalidates its
entry on update/delete; feeder's unit handler does the same for
units. Deletes are invalidated for hygiene only — no further events
should target a deleted id.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-04-22 08:21:46 +08:00
parent 3e0d4c242b
commit 6c8e5561dc
6 changed files with 107 additions and 42 deletions

View File

@ -42,6 +42,7 @@ pub async fn run() {
let event_manager = Arc::new(EventManager::new( let event_manager = Arc::new(EventManager::new(
platform.pool.clone(), platform.pool.clone(),
Some(platform.ws_manager.clone()), Some(platform.ws_manager.clone()),
platform.metadata.clone(),
)); ));
bootstrap::connect_all_enabled_sources(&platform) bootstrap::connect_all_enabled_sources(&platform)
@ -86,8 +87,12 @@ pub fn test_state() -> AppState {
.expect("lazy pool should build"); .expect("lazy pool should build");
let connection_manager = Arc::new(ConnectionManager::new()); let connection_manager = Arc::new(ConnectionManager::new());
let ws_manager = Arc::new(WebSocketManager::new()); let ws_manager = Arc::new(WebSocketManager::new());
let event_manager = Arc::new(EventManager::new(pool.clone(), Some(ws_manager.clone()))); let platform = PlatformContext::new(pool.clone(), connection_manager, ws_manager.clone());
let platform = PlatformContext::new(pool, connection_manager, ws_manager); let event_manager = Arc::new(EventManager::new(
pool,
Some(ws_manager),
platform.metadata.clone(),
));
AppState { AppState {
config: ServerConfig { config: ServerConfig {

View File

@ -1,5 +1,7 @@
use std::sync::Arc;
use plc_platform_core::{ use plc_platform_core::{
event::{record_event, EventInsert}, event::{record_event, EventInsert, MetadataCache},
websocket::WebSocketManager, websocket::WebSocketManager,
}; };
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -59,7 +61,11 @@ pub struct EventManager {
} }
impl EventManager { impl EventManager {
pub fn new(pool: sqlx::PgPool, ws_manager: Option<std::sync::Arc<WebSocketManager>>) -> Self { pub fn new(
pool: sqlx::PgPool,
ws_manager: Option<Arc<WebSocketManager>>,
metadata: Arc<MetadataCache>,
) -> Self {
let (control_sender, mut control_receiver) = let (control_sender, mut control_receiver) =
mpsc::channel::<AppEvent>(CONTROL_EVENT_CHANNEL_CAPACITY); mpsc::channel::<AppEvent>(CONTROL_EVENT_CHANNEL_CAPACITY);
@ -67,7 +73,13 @@ impl EventManager {
let control_ws_manager = ws_manager.clone(); let control_ws_manager = ws_manager.clone();
tokio::spawn(async move { tokio::spawn(async move {
while let Some(event) = control_receiver.recv().await { while let Some(event) = control_receiver.recv().await {
handle_control_event(event, &control_pool, control_ws_manager.as_ref()).await; handle_control_event(
event,
&control_pool,
control_ws_manager.as_ref(),
&metadata,
)
.await;
} }
}); });
@ -90,7 +102,8 @@ impl EventManager {
async fn handle_control_event( async fn handle_control_event(
event: AppEvent, event: AppEvent,
pool: &sqlx::PgPool, pool: &sqlx::PgPool,
ws_manager: Option<&std::sync::Arc<WebSocketManager>>, ws_manager: Option<&Arc<WebSocketManager>>,
metadata: &MetadataCache,
) { ) {
// UnitStateChanged is high-frequency and intentionally not persisted; // UnitStateChanged is high-frequency and intentionally not persisted;
// it still needs tracing for local observability. All other events are // it still needs tracing for local observability. All other events are
@ -104,33 +117,14 @@ async fn handle_control_event(
tracing::info!("Unit {} state: {} -> {}", unit_id, from_state, to_state); tracing::info!("Unit {} state: {} -> {}", unit_id, from_state, to_state);
} }
persist_event_if_needed(&event, pool, ws_manager).await; persist_event_if_needed(&event, pool, ws_manager, metadata).await;
}
async fn fetch_unit_code(pool: &sqlx::PgPool, id: Uuid) -> String {
sqlx::query_scalar::<_, String>("SELECT code FROM unit WHERE id = $1")
.bind(id)
.fetch_optional(pool)
.await
.ok()
.flatten()
.unwrap_or_else(|| id.to_string())
}
async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String {
sqlx::query_scalar::<_, String>("SELECT code FROM equipment WHERE id = $1")
.bind(id)
.fetch_optional(pool)
.await
.ok()
.flatten()
.unwrap_or_else(|| id.to_string())
} }
async fn persist_event_if_needed( async fn persist_event_if_needed(
event: &AppEvent, event: &AppEvent,
pool: &sqlx::PgPool, pool: &sqlx::PgPool,
ws_manager: Option<&std::sync::Arc<WebSocketManager>>, ws_manager: Option<&Arc<WebSocketManager>>,
metadata: &MetadataCache,
) { ) {
let record: Option<EventInsert> = match event { let record: Option<EventInsert> = match event {
AppEvent::EquipmentStartCommandSent { AppEvent::EquipmentStartCommandSent {
@ -138,7 +132,7 @@ async fn persist_event_if_needed(
unit_id, unit_id,
point_id, point_id,
} => { } => {
let code = fetch_equipment_code(pool, *equipment_id).await; let code = metadata.equipment_code(pool, *equipment_id).await;
Some(EventInsert { Some(EventInsert {
event_type: "feeder.equipment.start_command_sent", event_type: "feeder.equipment.start_command_sent",
level: "info", level: "info",
@ -158,7 +152,7 @@ async fn persist_event_if_needed(
unit_id, unit_id,
point_id, point_id,
} => { } => {
let code = fetch_equipment_code(pool, *equipment_id).await; let code = metadata.equipment_code(pool, *equipment_id).await;
Some(EventInsert { Some(EventInsert {
event_type: "feeder.equipment.stop_command_sent", event_type: "feeder.equipment.stop_command_sent",
level: "info", level: "info",
@ -174,7 +168,7 @@ async fn persist_event_if_needed(
}) })
} }
AppEvent::AutoControlStarted { unit_id } => { AppEvent::AutoControlStarted { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await; let code = metadata.unit_code(pool, *unit_id).await;
Some(EventInsert { Some(EventInsert {
event_type: "feeder.unit.auto_control_started", event_type: "feeder.unit.auto_control_started",
level: "info", level: "info",
@ -186,7 +180,7 @@ async fn persist_event_if_needed(
}) })
} }
AppEvent::AutoControlStopped { unit_id } => { AppEvent::AutoControlStopped { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await; let code = metadata.unit_code(pool, *unit_id).await;
Some(EventInsert { Some(EventInsert {
event_type: "feeder.unit.auto_control_stopped", event_type: "feeder.unit.auto_control_stopped",
level: "info", level: "info",
@ -201,8 +195,8 @@ async fn persist_event_if_needed(
unit_id, unit_id,
equipment_id, equipment_id,
} => { } => {
let unit_code = fetch_unit_code(pool, *unit_id).await; let unit_code = metadata.unit_code(pool, *unit_id).await;
let eq_code = fetch_equipment_code(pool, *equipment_id).await; let eq_code = metadata.equipment_code(pool, *equipment_id).await;
Some(EventInsert { Some(EventInsert {
event_type: "feeder.unit.fault_locked", event_type: "feeder.unit.fault_locked",
level: "error", level: "error",
@ -217,7 +211,7 @@ async fn persist_event_if_needed(
}) })
} }
AppEvent::FaultAcked { unit_id } => { AppEvent::FaultAcked { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await; let code = metadata.unit_code(pool, *unit_id).await;
Some(EventInsert { Some(EventInsert {
event_type: "feeder.unit.fault_acked", event_type: "feeder.unit.fault_acked",
level: "info", level: "info",
@ -229,7 +223,7 @@ async fn persist_event_if_needed(
}) })
} }
AppEvent::CommLocked { unit_id } => { AppEvent::CommLocked { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await; let code = metadata.unit_code(pool, *unit_id).await;
Some(EventInsert { Some(EventInsert {
event_type: "feeder.unit.comm_locked", event_type: "feeder.unit.comm_locked",
level: "warn", level: "warn",
@ -241,7 +235,7 @@ async fn persist_event_if_needed(
}) })
} }
AppEvent::CommRecovered { unit_id } => { AppEvent::CommRecovered { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await; let code = metadata.unit_code(pool, *unit_id).await;
Some(EventInsert { Some(EventInsert {
event_type: "feeder.unit.comm_recovered", event_type: "feeder.unit.comm_recovered",
level: "info", level: "info",
@ -256,8 +250,8 @@ async fn persist_event_if_needed(
unit_id, unit_id,
equipment_id, equipment_id,
} => { } => {
let unit_code = fetch_unit_code(pool, *unit_id).await; let unit_code = metadata.unit_code(pool, *unit_id).await;
let eq_code = fetch_equipment_code(pool, *equipment_id).await; let eq_code = metadata.equipment_code(pool, *equipment_id).await;
Some(EventInsert { Some(EventInsert {
event_type: "feeder.unit.rem_local", event_type: "feeder.unit.rem_local",
level: "warn", level: "warn",
@ -272,7 +266,7 @@ async fn persist_event_if_needed(
}) })
} }
AppEvent::RemRecovered { unit_id } => { AppEvent::RemRecovered { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await; let code = metadata.unit_code(pool, *unit_id).await;
Some(EventInsert { Some(EventInsert {
event_type: "feeder.unit.rem_recovered", event_type: "feeder.unit.rem_recovered",
level: "warn", level: "warn",
@ -292,5 +286,5 @@ async fn persist_event_if_needed(
let Some(record) = record else { let Some(record) = record else {
return; return;
}; };
record_event(pool, ws_manager.map(std::sync::Arc::as_ref), record).await; record_event(pool, ws_manager.map(Arc::as_ref), record).await;
} }

View File

@ -498,6 +498,8 @@ pub async fn update_unit(
) )
.await?; .await?;
state.platform.metadata.invalidate_unit(unit_id).await;
Ok(Json(serde_json::json!({ Ok(Json(serde_json::json!({
"ok_msg": "Unit updated successfully" "ok_msg": "Unit updated successfully"
}))) })))
@ -512,6 +514,8 @@ pub async fn delete_unit(
return Err(ApiErr::NotFound("Unit not found".to_string(), None)); return Err(ApiErr::NotFound("Unit not found".to_string(), None));
} }
state.platform.metadata.invalidate_unit(unit_id).await;
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }

View File

@ -1,10 +1,66 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use tokio::sync::RwLock;
use uuid::Uuid; use uuid::Uuid;
use crate::model::EventRecord; use crate::model::EventRecord;
use crate::websocket::{WebSocketManager, WsMessage}; use crate::websocket::{WebSocketManager, WsMessage};
/// In-memory cache for unit/equipment `code` fields used in event messages.
/// Lazily populated on first access; entries are invalidated when the
/// corresponding row is updated or deleted (see invalidate_* methods).
#[derive(Default)]
pub struct MetadataCache {
unit_codes: RwLock<HashMap<Uuid, String>>,
equipment_codes: RwLock<HashMap<Uuid, String>>,
}
impl MetadataCache {
pub fn new() -> Self {
Self::default()
}
pub async fn unit_code(&self, pool: &sqlx::PgPool, id: Uuid) -> String {
if let Some(code) = self.unit_codes.read().await.get(&id) {
return code.clone();
}
let code = sqlx::query_scalar::<_, String>("SELECT code FROM unit WHERE id = $1")
.bind(id)
.fetch_optional(pool)
.await
.ok()
.flatten()
.unwrap_or_else(|| id.to_string());
self.unit_codes.write().await.insert(id, code.clone());
code
}
pub async fn equipment_code(&self, pool: &sqlx::PgPool, id: Uuid) -> String {
if let Some(code) = self.equipment_codes.read().await.get(&id) {
return code.clone();
}
let code = sqlx::query_scalar::<_, String>("SELECT code FROM equipment WHERE id = $1")
.bind(id)
.fetch_optional(pool)
.await
.ok()
.flatten()
.unwrap_or_else(|| id.to_string());
self.equipment_codes.write().await.insert(id, code.clone());
code
}
pub async fn invalidate_unit(&self, id: Uuid) {
self.unit_codes.write().await.remove(&id);
}
pub async fn invalidate_equipment(&self, id: Uuid) {
self.equipment_codes.write().await.remove(&id);
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EventEnvelope { pub struct EventEnvelope {
pub event_type: String, pub event_type: String,

View File

@ -241,6 +241,8 @@ pub async fn update_equipment(
) )
.await?; .await?;
state.metadata.invalidate_equipment(equipment_id).await;
Ok(Json(serde_json::json!({ Ok(Json(serde_json::json!({
"ok_msg": "Equipment updated successfully" "ok_msg": "Equipment updated successfully"
}))) })))
@ -288,5 +290,7 @@ pub async fn delete_equipment(
return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
} }
state.metadata.invalidate_equipment(equipment_id).await;
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }

View File

@ -1,5 +1,5 @@
use crate::connection::ConnectionManager; use crate::connection::ConnectionManager;
use crate::event::PlatformEvent; use crate::event::{MetadataCache, PlatformEvent};
use crate::websocket::WebSocketManager; use crate::websocket::WebSocketManager;
use std::sync::Arc; use std::sync::Arc;
@ -8,6 +8,7 @@ pub struct PlatformContext {
pub pool: sqlx::PgPool, pub pool: sqlx::PgPool,
pub connection_manager: Arc<ConnectionManager>, pub connection_manager: Arc<ConnectionManager>,
pub ws_manager: Arc<WebSocketManager>, pub ws_manager: Arc<WebSocketManager>,
pub metadata: Arc<MetadataCache>,
} }
impl PlatformContext { impl PlatformContext {
@ -20,6 +21,7 @@ impl PlatformContext {
pool, pool,
connection_manager, connection_manager,
ws_manager, ws_manager,
metadata: Arc::new(MetadataCache::new()),
} }
} }