Extract event persistence primitive to platform core

Move the INSERT + WebSocket broadcast mechanism out of the feeder app
and into plc_platform_core as pub record_event(pool, ws, EventInsert).
The event table schema is owned by core, so writing to it is a platform
capability — apps (feeder, future ops) should only decide what to emit,
not how to persist it. Also replaces the 7-tuple in core's
persist_and_broadcast with the named EventInsert struct for readability.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-04-21 20:04:25 +08:00
parent 58fdb9f58e
commit 1c646dfaa7
2 changed files with 127 additions and 146 deletions

View File

@ -1,7 +1,6 @@
use plc_platform_core::model::EventRecord;
use plc_platform_core::{ use plc_platform_core::{
event::EventEnvelope, event::{record_event, EventInsert},
websocket::{WebSocketManager, WsMessage}, websocket::WebSocketManager,
}; };
use tokio::sync::mpsc; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
@ -180,29 +179,19 @@ async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String {
.unwrap_or_else(|| id.to_string()) .unwrap_or_else(|| id.to_string())
} }
struct PersistableEvent {
event_type: &'static str,
level: &'static str,
unit_id: Option<Uuid>,
equipment_id: Option<Uuid>,
source_id: Option<Uuid>,
message: String,
payload: serde_json::Value,
}
async fn persist_event_if_needed( async fn persist_event_if_needed(
event: &AppEvent, event: &AppEvent,
pool: &sqlx::PgPool, pool: &sqlx::PgPool,
ws_manager: Option<&std::sync::Arc<WebSocketManager>>, ws_manager: Option<&std::sync::Arc<WebSocketManager>>,
) { ) {
let record: Option<PersistableEvent> = match event { let record: Option<EventInsert> = match event {
AppEvent::EquipmentStartCommandSent { AppEvent::EquipmentStartCommandSent {
equipment_id, equipment_id,
unit_id, unit_id,
point_id, point_id,
} => { } => {
let code = fetch_equipment_code(pool, *equipment_id).await; let code = fetch_equipment_code(pool, *equipment_id).await;
Some(PersistableEvent { Some(EventInsert {
event_type: "feeder.equipment.start_command_sent", event_type: "feeder.equipment.start_command_sent",
level: "info", level: "info",
unit_id: *unit_id, unit_id: *unit_id,
@ -222,7 +211,7 @@ async fn persist_event_if_needed(
point_id, point_id,
} => { } => {
let code = fetch_equipment_code(pool, *equipment_id).await; let code = fetch_equipment_code(pool, *equipment_id).await;
Some(PersistableEvent { Some(EventInsert {
event_type: "feeder.equipment.stop_command_sent", event_type: "feeder.equipment.stop_command_sent",
level: "info", level: "info",
unit_id: *unit_id, unit_id: *unit_id,
@ -238,7 +227,7 @@ async fn persist_event_if_needed(
} }
AppEvent::AutoControlStarted { unit_id } => { AppEvent::AutoControlStarted { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await; let code = fetch_unit_code(pool, *unit_id).await;
Some(PersistableEvent { Some(EventInsert {
event_type: "feeder.unit.auto_control_started", event_type: "feeder.unit.auto_control_started",
level: "info", level: "info",
unit_id: Some(*unit_id), unit_id: Some(*unit_id),
@ -250,7 +239,7 @@ async fn persist_event_if_needed(
} }
AppEvent::AutoControlStopped { unit_id } => { AppEvent::AutoControlStopped { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await; let code = fetch_unit_code(pool, *unit_id).await;
Some(PersistableEvent { Some(EventInsert {
event_type: "feeder.unit.auto_control_stopped", event_type: "feeder.unit.auto_control_stopped",
level: "info", level: "info",
unit_id: Some(*unit_id), unit_id: Some(*unit_id),
@ -266,7 +255,7 @@ async fn persist_event_if_needed(
} => { } => {
let unit_code = fetch_unit_code(pool, *unit_id).await; let unit_code = fetch_unit_code(pool, *unit_id).await;
let eq_code = fetch_equipment_code(pool, *equipment_id).await; let eq_code = fetch_equipment_code(pool, *equipment_id).await;
Some(PersistableEvent { Some(EventInsert {
event_type: "feeder.unit.fault_locked", event_type: "feeder.unit.fault_locked",
level: "error", level: "error",
unit_id: Some(*unit_id), unit_id: Some(*unit_id),
@ -281,7 +270,7 @@ async fn persist_event_if_needed(
} }
AppEvent::FaultAcked { unit_id } => { AppEvent::FaultAcked { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await; let code = fetch_unit_code(pool, *unit_id).await;
Some(PersistableEvent { Some(EventInsert {
event_type: "feeder.unit.fault_acked", event_type: "feeder.unit.fault_acked",
level: "info", level: "info",
unit_id: Some(*unit_id), unit_id: Some(*unit_id),
@ -293,7 +282,7 @@ async fn persist_event_if_needed(
} }
AppEvent::CommLocked { unit_id } => { AppEvent::CommLocked { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await; let code = fetch_unit_code(pool, *unit_id).await;
Some(PersistableEvent { Some(EventInsert {
event_type: "feeder.unit.comm_locked", event_type: "feeder.unit.comm_locked",
level: "warn", level: "warn",
unit_id: Some(*unit_id), unit_id: Some(*unit_id),
@ -305,7 +294,7 @@ async fn persist_event_if_needed(
} }
AppEvent::CommRecovered { unit_id } => { AppEvent::CommRecovered { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await; let code = fetch_unit_code(pool, *unit_id).await;
Some(PersistableEvent { Some(EventInsert {
event_type: "feeder.unit.comm_recovered", event_type: "feeder.unit.comm_recovered",
level: "info", level: "info",
unit_id: Some(*unit_id), unit_id: Some(*unit_id),
@ -321,7 +310,7 @@ async fn persist_event_if_needed(
} => { } => {
let unit_code = fetch_unit_code(pool, *unit_id).await; let unit_code = fetch_unit_code(pool, *unit_id).await;
let eq_code = fetch_equipment_code(pool, *equipment_id).await; let eq_code = fetch_equipment_code(pool, *equipment_id).await;
Some(PersistableEvent { Some(EventInsert {
event_type: "feeder.unit.rem_local", event_type: "feeder.unit.rem_local",
level: "warn", level: "warn",
unit_id: Some(*unit_id), unit_id: Some(*unit_id),
@ -336,7 +325,7 @@ async fn persist_event_if_needed(
} }
AppEvent::RemRecovered { unit_id } => { AppEvent::RemRecovered { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await; let code = fetch_unit_code(pool, *unit_id).await;
Some(PersistableEvent { Some(EventInsert {
event_type: "feeder.unit.rem_recovered", event_type: "feeder.unit.rem_recovered",
level: "warn", level: "warn",
unit_id: Some(*unit_id), unit_id: Some(*unit_id),
@ -352,40 +341,8 @@ async fn persist_event_if_needed(
AppEvent::UnitStateChanged { .. } => None, AppEvent::UnitStateChanged { .. } => None,
}; };
let Some(record) = record let Some(record) = record else {
else {
return; return;
}; };
let envelope = EventEnvelope::new(record.event_type, record.payload); record_event(pool, ws_manager.map(std::sync::Arc::as_ref), record).await;
let inserted = sqlx::query_as::<_, EventRecord>(
r#"
INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
"#,
)
.bind(envelope.event_type)
.bind(record.level)
.bind(record.unit_id)
.bind(record.equipment_id)
.bind(record.source_id)
.bind(record.message)
.bind(sqlx::types::Json(envelope.payload))
.fetch_one(pool)
.await;
match inserted {
Ok(record) => {
if let Some(ws_manager) = ws_manager {
let ws_message = WsMessage::EventCreated(record);
if let Err(err) = ws_manager.send_to_public(ws_message).await {
tracing::warn!("Failed to broadcast event websocket message: {}", err);
}
}
}
Err(err) => {
tracing::warn!("Failed to persist event: {}", err);
}
}
} }

View File

@ -44,86 +44,27 @@ pub enum PlatformEvent {
}, },
} }
/// Persists platform events to the `event` table and broadcasts via WebSocket. /// Platform-owned row for the `event` table.
pub async fn persist_and_broadcast( /// Apps construct this to write business events through [`record_event`].
event: &PlatformEvent, pub struct EventInsert {
pool: &sqlx::PgPool, pub event_type: &'static str,
ws_manager: &WebSocketManager, pub level: &'static str,
) { pub unit_id: Option<Uuid>,
let record = match event { pub equipment_id: Option<Uuid>,
PlatformEvent::SourceCreated { source_id } => { pub source_id: Option<Uuid>,
let name = fetch_source_name(pool, *source_id).await; pub message: String,
Some(( pub payload: Value,
"platform.source.created", }
"info",
None,
None,
Some(*source_id),
format!("Source {} created", name),
serde_json::json!({ "source_id": source_id }),
))
}
PlatformEvent::SourceUpdated { source_id } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"platform.source.updated",
"info",
None,
None,
Some(*source_id),
format!("Source {} updated", name),
serde_json::json!({ "source_id": source_id }),
))
}
PlatformEvent::SourceDeleted {
source_id,
source_name,
} => Some((
"platform.source.deleted",
"warn",
None,
None,
None,
format!("Source {} deleted", source_name),
serde_json::json!({ "source_id": source_id }),
)),
PlatformEvent::PointsCreated {
source_id,
point_ids,
} => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"platform.point.batch_created",
"info",
None,
None,
Some(*source_id),
format!("Created {} points for source {}", point_ids.len(), name),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
))
}
PlatformEvent::PointsDeleted {
source_id,
point_ids,
} => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"platform.point.batch_deleted",
"warn",
None,
None,
Some(*source_id),
format!("Deleted {} points for source {}", point_ids.len(), name),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
))
}
};
let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record /// Inserts an event into the `event` table and optionally broadcasts via WebSocket.
else { /// This is the platform primitive used by both core platform events and app business events.
return; pub async fn record_event(
}; pool: &sqlx::PgPool,
let envelope = EventEnvelope::new(event_type, payload); ws_manager: Option<&WebSocketManager>,
event: EventInsert,
) {
let event_type = event.event_type;
let envelope = EventEnvelope::new(event_type, event.payload);
let inserted = sqlx::query_as::<_, EventRecord>( let inserted = sqlx::query_as::<_, EventRecord>(
r#" r#"
@ -133,28 +74,111 @@ pub async fn persist_and_broadcast(
"#, "#,
) )
.bind(envelope.event_type) .bind(envelope.event_type)
.bind(level) .bind(event.level)
.bind(unit_id as Option<Uuid>) .bind(event.unit_id)
.bind(equipment_id as Option<Uuid>) .bind(event.equipment_id)
.bind(source_id) .bind(event.source_id)
.bind(message) .bind(event.message)
.bind(sqlx::types::Json(envelope.payload)) .bind(sqlx::types::Json(envelope.payload))
.fetch_one(pool) .fetch_one(pool)
.await; .await;
match inserted { match inserted {
Ok(record) => { Ok(record) => {
let ws_message = WsMessage::EventCreated(record); if let Some(ws_manager) = ws_manager {
if let Err(err) = ws_manager.send_to_public(ws_message).await { let ws_message = WsMessage::EventCreated(record);
tracing::warn!("Failed to broadcast platform event: {}", err); if let Err(err) = ws_manager.send_to_public(ws_message).await {
tracing::warn!("Failed to broadcast event {}: {}", event_type, err);
}
} }
} }
Err(err) => { Err(err) => {
tracing::warn!("Failed to persist platform event: {}", err); tracing::warn!("Failed to persist event {}: {}", event_type, err);
} }
} }
} }
/// Persists platform events to the `event` table and broadcasts via WebSocket.
pub async fn persist_and_broadcast(
event: &PlatformEvent,
pool: &sqlx::PgPool,
ws_manager: &WebSocketManager,
) {
let record = match event {
PlatformEvent::SourceCreated { source_id } => {
let name = fetch_source_name(pool, *source_id).await;
Some(EventInsert {
event_type: "platform.source.created",
level: "info",
unit_id: None,
equipment_id: None,
source_id: Some(*source_id),
message: format!("Source {} created", name),
payload: serde_json::json!({ "source_id": source_id }),
})
}
PlatformEvent::SourceUpdated { source_id } => {
let name = fetch_source_name(pool, *source_id).await;
Some(EventInsert {
event_type: "platform.source.updated",
level: "info",
unit_id: None,
equipment_id: None,
source_id: Some(*source_id),
message: format!("Source {} updated", name),
payload: serde_json::json!({ "source_id": source_id }),
})
}
PlatformEvent::SourceDeleted {
source_id,
source_name,
} => Some(EventInsert {
event_type: "platform.source.deleted",
level: "warn",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!("Source {} deleted", source_name),
payload: serde_json::json!({ "source_id": source_id }),
}),
PlatformEvent::PointsCreated {
source_id,
point_ids,
} => {
let name = fetch_source_name(pool, *source_id).await;
Some(EventInsert {
event_type: "platform.point.batch_created",
level: "info",
unit_id: None,
equipment_id: None,
source_id: Some(*source_id),
message: format!("Created {} points for source {}", point_ids.len(), name),
payload: serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
})
}
PlatformEvent::PointsDeleted {
source_id,
point_ids,
} => {
let name = fetch_source_name(pool, *source_id).await;
Some(EventInsert {
event_type: "platform.point.batch_deleted",
level: "warn",
unit_id: None,
equipment_id: None,
source_id: Some(*source_id),
message: format!("Deleted {} points for source {}", point_ids.len(), name),
payload: serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
})
}
};
let Some(record) = record else {
return;
};
record_event(pool, Some(ws_manager), record).await;
}
async fn fetch_source_name(pool: &sqlx::PgPool, id: Uuid) -> String { async fn fetch_source_name(pool: &sqlx::PgPool, id: Uuid) -> String {
sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1") sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1")
.bind(id) .bind(id)