Remove unit change platform event plumbing

This commit is contained in:
caoqianming 2026-04-21 19:12:09 +08:00
parent a49f6adf9b
commit dabcde1fca
7 changed files with 96 additions and 231 deletions

View File

@ -37,11 +37,7 @@ pub async fn run() {
.expect("Failed to bootstrap platform");
let control_runtime = Arc::new(control::runtime::ControlRuntimeStore::new());
let event_sink = Arc::new(crate::event::FeederPlatformEventSink::new(
control_runtime.clone(),
));
let platform = builder.build().with_event_sink(event_sink);
let platform = builder.build();
let event_manager = Arc::new(EventManager::new(
platform.pool.clone(),

View File

@ -88,38 +88,6 @@ impl EventManager {
}
}
/// Bridges platform events to feeder-specific side effects.
/// Connection management and telemetry are handled by core automatically.
pub struct FeederPlatformEventSink {
control_runtime: std::sync::Arc<crate::control::runtime::ControlRuntimeStore>,
}
impl FeederPlatformEventSink {
pub fn new(
control_runtime: std::sync::Arc<crate::control::runtime::ControlRuntimeStore>,
) -> Self {
Self { control_runtime }
}
}
impl plc_platform_core::platform_context::PlatformEventSink for FeederPlatformEventSink {
fn on_event(&self, event: &plc_platform_core::event::PlatformEvent) {
match event {
plc_platform_core::event::PlatformEvent::UnitsChanged { unit_ids } => {
let runtime = self.control_runtime.clone();
let ids = unit_ids.clone();
tokio::spawn(async move {
for unit_id in ids {
runtime.notify_unit(unit_id).await;
}
});
}
// Other platform events: connection management handled by core.
_ => {}
}
}
}
async fn handle_control_event(
event: AppEvent,
pool: &sqlx::PgPool,

View File

@ -21,21 +21,30 @@ impl EventEnvelope {
}
/// Platform-level events emitted by core handlers.
/// Each variant carries enough context for persistence + app-specific side effects.
/// Each variant carries enough context for persistence and platform side effects.
#[derive(Debug, Clone)]
pub enum PlatformEvent {
SourceCreated { source_id: Uuid },
SourceUpdated { source_id: Uuid },
SourceDeleted { source_id: Uuid, source_name: String },
PointsCreated { source_id: Uuid, point_ids: Vec<Uuid> },
PointsDeleted { source_id: Uuid, point_ids: Vec<Uuid> },
UnitsChanged { unit_ids: Vec<Uuid> },
SourceCreated {
source_id: Uuid,
},
SourceUpdated {
source_id: Uuid,
},
SourceDeleted {
source_id: Uuid,
source_name: String,
},
PointsCreated {
source_id: Uuid,
point_ids: Vec<Uuid>,
},
PointsDeleted {
source_id: Uuid,
point_ids: Vec<Uuid>,
},
}
/// Persists platform events to the `event` table and broadcasts via WebSocket.
///
/// Apps get notified via `PlatformEventSink` for their own side effects
/// (connection management, control runtime, etc.).
pub async fn persist_and_broadcast(
event: &PlatformEvent,
pool: &sqlx::PgPool,
@ -45,8 +54,11 @@ pub async fn persist_and_broadcast(
PlatformEvent::SourceCreated { source_id } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"platform.source.created", "info",
None, None, Some(*source_id),
"platform.source.created",
"info",
None,
None,
Some(*source_id),
format!("Source {} created", name),
serde_json::json!({ "source_id": source_id }),
))
@ -54,37 +66,57 @@ pub async fn persist_and_broadcast(
PlatformEvent::SourceUpdated { source_id } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"platform.source.updated", "info",
None, None, Some(*source_id),
"platform.source.updated",
"info",
None,
None,
Some(*source_id),
format!("Source {} updated", name),
serde_json::json!({ "source_id": source_id }),
))
}
PlatformEvent::SourceDeleted { source_id, source_name } => Some((
"platform.source.deleted", "warn",
None, None, None,
PlatformEvent::SourceDeleted {
source_id,
source_name,
} => Some((
"platform.source.deleted",
"warn",
None,
None,
None,
format!("Source {} deleted", source_name),
serde_json::json!({ "source_id": source_id }),
)),
PlatformEvent::PointsCreated { source_id, point_ids } => {
PlatformEvent::PointsCreated {
source_id,
point_ids,
} => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"platform.point.batch_created", "info",
None, None, Some(*source_id),
"platform.point.batch_created",
"info",
None,
None,
Some(*source_id),
format!("Created {} points for source {}", point_ids.len(), name),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
))
}
PlatformEvent::PointsDeleted { source_id, point_ids } => {
PlatformEvent::PointsDeleted {
source_id,
point_ids,
} => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"platform.point.batch_deleted", "warn",
None, None, Some(*source_id),
"platform.point.batch_deleted",
"warn",
None,
None,
Some(*source_id),
format!("Deleted {} points for source {}", point_ids.len(), name),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
))
}
PlatformEvent::UnitsChanged { .. } => None,
};
let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record

View File

@ -1,4 +1,4 @@
use axum::{
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
@ -8,21 +8,11 @@ use serde::{Deserialize, Serialize};
use uuid::Uuid;
use validator::Validate;
use crate::platform_context::PlatformContext;
use crate::util::{
pagination::{PaginatedResponse, PaginationParams},
response::ApiErr,
};
use crate::platform_context::PlatformContext;
fn notify_units(state: &PlatformContext, unit_ids: impl IntoIterator<Item = Uuid>) {
let ids: Vec<Uuid> = {
let mut seen = std::collections::HashSet::new();
unit_ids.into_iter().filter(|id| seen.insert(*id)).collect()
};
if !ids.is_empty() {
state.emit_event(crate::event::PlatformEvent::UnitsChanged { unit_ids: ids });
}
}
#[derive(Deserialize, Validate)]
pub struct GetEquipmentListQuery {
@ -188,10 +178,6 @@ pub async fn create_equipment(
)
.await?;
if let Some(unit_id) = payload.unit_id {
notify_units(&state, [unit_id]);
}
Ok((
StatusCode::CREATED,
Json(serde_json::json!({
@ -217,12 +203,12 @@ pub async fn update_equipment(
return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
}
let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?;
let existing_equipment = if let Some(equipment) = exists {
equipment
} else {
if crate::service::get_equipment_by_id(&state.pool, equipment_id)
.await?
.is_none()
{
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
};
}
if let Some(Some(unit_id)) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?;
@ -255,19 +241,6 @@ pub async fn update_equipment(
)
.await?;
let mut unit_ids = Vec::new();
if let Some(unit_id) = existing_equipment.unit_id {
unit_ids.push(unit_id);
}
let next_unit_id = match payload.unit_id {
Some(next) => next,
None => existing_equipment.unit_id,
};
if let Some(unit_id) = next_unit_id {
unit_ids.push(unit_id);
}
notify_units(&state, unit_ids);
Ok(Json(serde_json::json!({
"ok_msg": "Equipment updated successfully"
})))
@ -293,9 +266,6 @@ pub async fn batch_set_equipment_unit(
}
}
let before_unit_ids =
crate::service::get_unit_ids_by_equipment_ids(&state.pool, &payload.equipment_ids).await?;
let updated_count = crate::service::batch_set_equipment_unit(
&state.pool,
&payload.equipment_ids,
@ -303,12 +273,6 @@ pub async fn batch_set_equipment_unit(
)
.await?;
let mut unit_ids = before_unit_ids;
if let Some(unit_id) = payload.unit_id {
unit_ids.push(unit_id);
}
notify_units(&state, unit_ids);
Ok(Json(serde_json::json!({
"ok_msg": "Equipment unit updated successfully",
"updated_count": updated_count
@ -319,15 +283,10 @@ pub async fn delete_equipment(
State(state): State<PlatformContext>,
Path(equipment_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.pool, &[equipment_id]).await?;
let deleted = crate::service::delete_equipment(&state.pool, equipment_id).await?;
if !deleted {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}
notify_units(&state, unit_ids);
Ok(StatusCode::NO_CONTENT)
}

View File

@ -17,16 +17,6 @@ use crate::util::{
response::ApiErr,
};
fn notify_units(state: &PlatformContext, unit_ids: impl IntoIterator<Item = Uuid>) {
let ids: Vec<Uuid> = {
let mut seen = std::collections::HashSet::new();
unit_ids.into_iter().filter(|id| seen.insert(*id)).collect()
};
if !ids.is_empty() {
state.emit_event(crate::event::PlatformEvent::UnitsChanged { unit_ids: ids });
}
}
/// List all points.
#[derive(Deserialize, Validate)]
pub struct GetPointListQuery {
@ -245,8 +235,6 @@ pub async fn update_point(
if existing_point.is_none() {
return Err(ApiErr::NotFound("Point not found".to_string(), None));
}
let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
let mut qb: QueryBuilder<sqlx::Postgres> = QueryBuilder::new("UPDATE point SET ");
let mut wrote_field = false;
@ -301,9 +289,6 @@ pub async fn update_point(
qb.push(" WHERE id = ").push_bind(point_id);
qb.build().execute(pool).await?;
let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids));
Ok(Json(
serde_json::json!({"ok_msg": "Point updated successfully"}),
))
@ -404,8 +389,6 @@ pub async fn batch_set_point_equipment(
return Err(ApiErr::NotFound("No valid points found".to_string(), None));
}
let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?;
let result = sqlx::query(
r#"
UPDATE point
@ -421,9 +404,6 @@ pub async fn batch_set_point_equipment(
.execute(pool)
.await?;
let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?;
notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids));
Ok(Json(serde_json::json!({
"ok_msg": "Point equipment updated successfully",
"updated_count": result.rows_affected()
@ -436,8 +416,6 @@ pub async fn delete_point(
Path(point_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
let source_id = {
let grouped = crate::service::get_points_grouped_by_source(pool, &[point_id]).await?;
grouped.keys().next().copied()
@ -465,8 +443,6 @@ pub async fn delete_point(
});
}
notify_units(&state, affected_unit_ids);
Ok(Json(
serde_json::json!({"ok_msg": "Point deleted successfully"}),
))
@ -614,7 +590,6 @@ pub async fn batch_delete_points(
let point_ids = payload.point_ids;
let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?;
let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &point_ids).await?;
let existing_point_ids: Vec<Uuid> = grouped
.values()
.flat_map(|points| points.iter().map(|p| p.point_id))
@ -637,8 +612,6 @@ pub async fn batch_delete_points(
});
}
notify_units(&state, affected_unit_ids);
Ok(Json(BatchDeletePointsRes {
deleted_count: result.rows_affected(),
}))

View File

@ -1,26 +1,13 @@
use std::sync::Arc;
use crate::connection::ConnectionManager;
use crate::event::PlatformEvent;
use crate::websocket::WebSocketManager;
/// Callback interface for app-specific side effects on platform events.
///
/// Platform-level concerns (event persistence, WebSocket broadcast, connection
/// management) are handled automatically by `PlatformContext::emit_event()`.
/// Implementations only need to handle truly app-specific behavior
/// (e.g. feeder's control runtime notifications).
pub trait PlatformEventSink: Send + Sync {
/// Called for every platform event. Override to add app-specific side effects.
fn on_event(&self, event: &PlatformEvent);
}
use std::sync::Arc;
#[derive(Clone)]
pub struct PlatformContext {
pub pool: sqlx::PgPool,
pub connection_manager: Arc<ConnectionManager>,
pub ws_manager: Arc<WebSocketManager>,
pub event_sink: Option<Arc<dyn PlatformEventSink>>,
}
impl PlatformContext {
@ -33,27 +20,14 @@ impl PlatformContext {
pool,
connection_manager,
ws_manager,
event_sink: None,
}
}
pub fn with_event_sink(mut self, sink: Arc<dyn PlatformEventSink>) -> Self {
self.event_sink = Some(sink);
self
}
/// Emit a platform event.
///
/// Synchronously notifies the app-specific sink, then spawns async work for:
/// - Event persistence to DB + WebSocket broadcast
/// - Connection management side effects (connect, subscribe, etc.)
/// Spawns async work for event persistence, WebSocket broadcast, and
/// connection management side effects (connect, subscribe, etc.).
pub fn emit_event(&self, event: PlatformEvent) {
// 1. Notify app-specific sink synchronously (fire-and-forget via channels).
if let Some(sink) = &self.event_sink {
sink.on_event(&event);
}
// 2. Spawn async: persistence + WS broadcast + connection management.
let pool = self.pool.clone();
let ws_manager = self.ws_manager.clone();
let cm = self.connection_manager.clone();
@ -120,9 +94,6 @@ impl PlatformContext {
tracing::error!("Failed to unsubscribe points: {}", e);
}
}
PlatformEvent::UnitsChanged { .. } => {
// No platform-level side effect; app handles via on_event().
}
}
});
}

View File

@ -1,4 +1,4 @@
use crate::model::{ControlUnit, EventRecord};
use crate::model::{ControlUnit, EventRecord};
use sqlx::{PgPool, QueryBuilder, Row};
use uuid::Uuid;
@ -31,9 +31,11 @@ pub async fn get_units_count(pool: &PgPool, keyword: Option<&str>) -> Result<i64
.fetch_one(pool)
.await
}
None => sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM unit"#)
.fetch_one(pool)
.await,
None => {
sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM unit"#)
.fetch_one(pool)
.await
}
}
}
@ -58,9 +60,9 @@ pub async fn get_units_paginated(
unit_order
);
sqlx::query_as::<_, ControlUnit>(&sql)
.bind(like)
.fetch_all(pool)
.await
.bind(like)
.fetch_all(pool)
.await
} else {
let sql = format!(
r#"
@ -73,19 +75,17 @@ pub async fn get_units_paginated(
unit_order
);
sqlx::query_as::<_, ControlUnit>(&sql)
.bind(like)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
.await
.bind(like)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
.await
}
}
None => {
if page_size == -1 {
let sql = format!("SELECT * FROM unit ORDER BY {}", unit_order);
sqlx::query_as::<_, ControlUnit>(&sql)
.fetch_all(pool)
.await
sqlx::query_as::<_, ControlUnit>(&sql).fetch_all(pool).await
} else {
let sql = format!(
r#"
@ -97,10 +97,10 @@ pub async fn get_units_paginated(
unit_order
);
sqlx::query_as::<_, ControlUnit>(&sql)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
.await
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
.await
}
}
}
@ -138,10 +138,7 @@ pub struct CreateUnitParams<'a> {
pub require_manual_ack_after_fault: bool,
}
pub async fn create_unit(
pool: &PgPool,
params: CreateUnitParams<'_>,
) -> Result<Uuid, sqlx::Error> {
pub async fn create_unit(pool: &PgPool, params: CreateUnitParams<'_>) -> Result<Uuid, sqlx::Error> {
let unit_id = Uuid::new_v4();
sqlx::query(
r#"
@ -222,10 +219,7 @@ pub async fn update_unit(
param_count += 1;
}
if params.require_manual_ack_after_fault.is_some() {
updates.push(format!(
"require_manual_ack_after_fault = ${}",
param_count
));
updates.push(format!("require_manual_ack_after_fault = ${}", param_count));
param_count += 1;
}
@ -285,8 +279,7 @@ pub async fn get_events_count(
unit_id: Option<Uuid>,
event_type: Option<&str>,
) -> Result<i64, sqlx::Error> {
let mut qb =
QueryBuilder::new("SELECT COUNT(*)::BIGINT FROM event WHERE 1 = 1");
let mut qb = QueryBuilder::new("SELECT COUNT(*)::BIGINT FROM event WHERE 1 = 1");
if let Some(unit_id) = unit_id {
qb.push(" AND unit_id = ").push_bind(unit_id);
@ -329,9 +322,7 @@ pub async fn get_all_enabled_units(pool: &PgPool) -> Result<Vec<ControlUnit>, sq
"SELECT * FROM unit WHERE enabled = TRUE ORDER BY {}",
unit_order_clause()
);
sqlx::query_as::<_, ControlUnit>(&sql)
.fetch_all(pool)
.await
sqlx::query_as::<_, ControlUnit>(&sql).fetch_all(pool).await
}
pub async fn get_equipment_by_unit_ids(
@ -346,9 +337,9 @@ pub async fn get_equipment_by_unit_ids(
equipment_order_clause_with_unit()
);
sqlx::query_as::<_, crate::model::Equipment>(&sql)
.bind(unit_ids)
.fetch_all(pool)
.await
.bind(unit_ids)
.fetch_all(pool)
.await
}
pub async fn get_equipment_by_unit_id(
@ -360,9 +351,9 @@ pub async fn get_equipment_by_unit_id(
unit_order_clause()
);
sqlx::query_as::<_, crate::model::Equipment>(&sql)
.bind(unit_id)
.fetch_all(pool)
.await
.bind(unit_id)
.fetch_all(pool)
.await
}
pub async fn get_points_by_equipment_ids(
@ -403,30 +394,6 @@ pub async fn get_unit_ids_by_equipment_ids(
Ok(rows)
}
pub async fn get_unit_ids_by_point_ids(
pool: &PgPool,
point_ids: &[Uuid],
) -> Result<Vec<Uuid>, sqlx::Error> {
if point_ids.is_empty() {
return Ok(vec![]);
}
let rows = sqlx::query_scalar::<_, Uuid>(
r#"
SELECT DISTINCT e.unit_id
FROM point p
INNER JOIN equipment e ON e.id = p.equipment_id
WHERE p.id = ANY($1)
AND e.unit_id IS NOT NULL
"#,
)
.bind(point_ids)
.fetch_all(pool)
.await?;
Ok(rows)
}
pub struct EquipmentSignalRole {
pub equipment_id: Uuid,
pub point_id: Uuid,
@ -507,4 +474,3 @@ pub async fn get_equipment_role_points(
})
.collect())
}