diff --git a/crates/app_feeder_distributor/src/app.rs b/crates/app_feeder_distributor/src/app.rs index 0dc52e5..6143b30 100644 --- a/crates/app_feeder_distributor/src/app.rs +++ b/crates/app_feeder_distributor/src/app.rs @@ -37,11 +37,7 @@ pub async fn run() { .expect("Failed to bootstrap platform"); let control_runtime = Arc::new(control::runtime::ControlRuntimeStore::new()); - - let event_sink = Arc::new(crate::event::FeederPlatformEventSink::new( - control_runtime.clone(), - )); - let platform = builder.build().with_event_sink(event_sink); + let platform = builder.build(); let event_manager = Arc::new(EventManager::new( platform.pool.clone(), diff --git a/crates/app_feeder_distributor/src/event.rs b/crates/app_feeder_distributor/src/event.rs index 517e479..9e32dc7 100644 --- a/crates/app_feeder_distributor/src/event.rs +++ b/crates/app_feeder_distributor/src/event.rs @@ -88,38 +88,6 @@ impl EventManager { } } -/// Bridges platform events to feeder-specific side effects. -/// Connection management and telemetry are handled by core automatically. -pub struct FeederPlatformEventSink { - control_runtime: std::sync::Arc, -} - -impl FeederPlatformEventSink { - pub fn new( - control_runtime: std::sync::Arc, - ) -> Self { - Self { control_runtime } - } -} - -impl plc_platform_core::platform_context::PlatformEventSink for FeederPlatformEventSink { - fn on_event(&self, event: &plc_platform_core::event::PlatformEvent) { - match event { - plc_platform_core::event::PlatformEvent::UnitsChanged { unit_ids } => { - let runtime = self.control_runtime.clone(); - let ids = unit_ids.clone(); - tokio::spawn(async move { - for unit_id in ids { - runtime.notify_unit(unit_id).await; - } - }); - } - // Other platform events: connection management handled by core. - _ => {} - } - } -} - async fn handle_control_event( event: AppEvent, pool: &sqlx::PgPool, diff --git a/crates/plc_platform_core/src/event.rs b/crates/plc_platform_core/src/event.rs index 562d0a4..7e8da4f 100644 --- a/crates/plc_platform_core/src/event.rs +++ b/crates/plc_platform_core/src/event.rs @@ -21,21 +21,30 @@ impl EventEnvelope { } /// Platform-level events emitted by core handlers. -/// Each variant carries enough context for persistence + app-specific side effects. +/// Each variant carries enough context for persistence and platform side effects. #[derive(Debug, Clone)] pub enum PlatformEvent { - SourceCreated { source_id: Uuid }, - SourceUpdated { source_id: Uuid }, - SourceDeleted { source_id: Uuid, source_name: String }, - PointsCreated { source_id: Uuid, point_ids: Vec }, - PointsDeleted { source_id: Uuid, point_ids: Vec }, - UnitsChanged { unit_ids: Vec }, + SourceCreated { + source_id: Uuid, + }, + SourceUpdated { + source_id: Uuid, + }, + SourceDeleted { + source_id: Uuid, + source_name: String, + }, + PointsCreated { + source_id: Uuid, + point_ids: Vec, + }, + PointsDeleted { + source_id: Uuid, + point_ids: Vec, + }, } /// Persists platform events to the `event` table and broadcasts via WebSocket. -/// -/// Apps get notified via `PlatformEventSink` for their own side effects -/// (connection management, control runtime, etc.). pub async fn persist_and_broadcast( event: &PlatformEvent, pool: &sqlx::PgPool, @@ -45,8 +54,11 @@ pub async fn persist_and_broadcast( PlatformEvent::SourceCreated { source_id } => { let name = fetch_source_name(pool, *source_id).await; Some(( - "platform.source.created", "info", - None, None, Some(*source_id), + "platform.source.created", + "info", + None, + None, + Some(*source_id), format!("Source {} created", name), serde_json::json!({ "source_id": source_id }), )) @@ -54,37 +66,57 @@ pub async fn persist_and_broadcast( PlatformEvent::SourceUpdated { source_id } => { let name = fetch_source_name(pool, *source_id).await; Some(( - "platform.source.updated", "info", - None, None, Some(*source_id), + "platform.source.updated", + "info", + None, + None, + Some(*source_id), format!("Source {} updated", name), serde_json::json!({ "source_id": source_id }), )) } - PlatformEvent::SourceDeleted { source_id, source_name } => Some(( - "platform.source.deleted", "warn", - None, None, None, + PlatformEvent::SourceDeleted { + source_id, + source_name, + } => Some(( + "platform.source.deleted", + "warn", + None, + None, + None, format!("Source {} deleted", source_name), serde_json::json!({ "source_id": source_id }), )), - PlatformEvent::PointsCreated { source_id, point_ids } => { + PlatformEvent::PointsCreated { + source_id, + point_ids, + } => { let name = fetch_source_name(pool, *source_id).await; Some(( - "platform.point.batch_created", "info", - None, None, Some(*source_id), + "platform.point.batch_created", + "info", + None, + None, + Some(*source_id), format!("Created {} points for source {}", point_ids.len(), name), serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), )) } - PlatformEvent::PointsDeleted { source_id, point_ids } => { + PlatformEvent::PointsDeleted { + source_id, + point_ids, + } => { let name = fetch_source_name(pool, *source_id).await; Some(( - "platform.point.batch_deleted", "warn", - None, None, Some(*source_id), + "platform.point.batch_deleted", + "warn", + None, + None, + Some(*source_id), format!("Deleted {} points for source {}", point_ids.len(), name), serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), )) } - PlatformEvent::UnitsChanged { .. } => None, }; let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record diff --git a/crates/plc_platform_core/src/handler/equipment.rs b/crates/plc_platform_core/src/handler/equipment.rs index 6c6d8dd..a7afc60 100644 --- a/crates/plc_platform_core/src/handler/equipment.rs +++ b/crates/plc_platform_core/src/handler/equipment.rs @@ -1,4 +1,4 @@ -use axum::{ +use axum::{ extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, @@ -8,21 +8,11 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; use validator::Validate; +use crate::platform_context::PlatformContext; use crate::util::{ pagination::{PaginatedResponse, PaginationParams}, response::ApiErr, }; -use crate::platform_context::PlatformContext; - -fn notify_units(state: &PlatformContext, unit_ids: impl IntoIterator) { - let ids: Vec = { - let mut seen = std::collections::HashSet::new(); - unit_ids.into_iter().filter(|id| seen.insert(*id)).collect() - }; - if !ids.is_empty() { - state.emit_event(crate::event::PlatformEvent::UnitsChanged { unit_ids: ids }); - } -} #[derive(Deserialize, Validate)] pub struct GetEquipmentListQuery { @@ -188,10 +178,6 @@ pub async fn create_equipment( ) .await?; - if let Some(unit_id) = payload.unit_id { - notify_units(&state, [unit_id]); - } - Ok(( StatusCode::CREATED, Json(serde_json::json!({ @@ -217,12 +203,12 @@ pub async fn update_equipment( return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); } - let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?; - let existing_equipment = if let Some(equipment) = exists { - equipment - } else { + if crate::service::get_equipment_by_id(&state.pool, equipment_id) + .await? + .is_none() + { return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); - }; + } if let Some(Some(unit_id)) = payload.unit_id { let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; @@ -255,19 +241,6 @@ pub async fn update_equipment( ) .await?; - let mut unit_ids = Vec::new(); - if let Some(unit_id) = existing_equipment.unit_id { - unit_ids.push(unit_id); - } - let next_unit_id = match payload.unit_id { - Some(next) => next, - None => existing_equipment.unit_id, - }; - if let Some(unit_id) = next_unit_id { - unit_ids.push(unit_id); - } - notify_units(&state, unit_ids); - Ok(Json(serde_json::json!({ "ok_msg": "Equipment updated successfully" }))) @@ -293,9 +266,6 @@ pub async fn batch_set_equipment_unit( } } - let before_unit_ids = - crate::service::get_unit_ids_by_equipment_ids(&state.pool, &payload.equipment_ids).await?; - let updated_count = crate::service::batch_set_equipment_unit( &state.pool, &payload.equipment_ids, @@ -303,12 +273,6 @@ pub async fn batch_set_equipment_unit( ) .await?; - let mut unit_ids = before_unit_ids; - if let Some(unit_id) = payload.unit_id { - unit_ids.push(unit_id); - } - notify_units(&state, unit_ids); - Ok(Json(serde_json::json!({ "ok_msg": "Equipment unit updated successfully", "updated_count": updated_count @@ -319,15 +283,10 @@ pub async fn delete_equipment( State(state): State, Path(equipment_id): Path, ) -> Result { - let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.pool, &[equipment_id]).await?; let deleted = crate::service::delete_equipment(&state.pool, equipment_id).await?; if !deleted { return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); } - notify_units(&state, unit_ids); - Ok(StatusCode::NO_CONTENT) } - - diff --git a/crates/plc_platform_core/src/handler/point.rs b/crates/plc_platform_core/src/handler/point.rs index 01c86f1..0d3d4ff 100644 --- a/crates/plc_platform_core/src/handler/point.rs +++ b/crates/plc_platform_core/src/handler/point.rs @@ -17,16 +17,6 @@ use crate::util::{ response::ApiErr, }; -fn notify_units(state: &PlatformContext, unit_ids: impl IntoIterator) { - let ids: Vec = { - let mut seen = std::collections::HashSet::new(); - unit_ids.into_iter().filter(|id| seen.insert(*id)).collect() - }; - if !ids.is_empty() { - state.emit_event(crate::event::PlatformEvent::UnitsChanged { unit_ids: ids }); - } -} - /// List all points. #[derive(Deserialize, Validate)] pub struct GetPointListQuery { @@ -245,8 +235,6 @@ pub async fn update_point( if existing_point.is_none() { return Err(ApiErr::NotFound("Point not found".to_string(), None)); } - let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; - let mut qb: QueryBuilder = QueryBuilder::new("UPDATE point SET "); let mut wrote_field = false; @@ -301,9 +289,6 @@ pub async fn update_point( qb.push(" WHERE id = ").push_bind(point_id); qb.build().execute(pool).await?; - let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; - notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)); - Ok(Json( serde_json::json!({"ok_msg": "Point updated successfully"}), )) @@ -404,8 +389,6 @@ pub async fn batch_set_point_equipment( return Err(ApiErr::NotFound("No valid points found".to_string(), None)); } - let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?; - let result = sqlx::query( r#" UPDATE point @@ -421,9 +404,6 @@ pub async fn batch_set_point_equipment( .execute(pool) .await?; - let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?; - notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)); - Ok(Json(serde_json::json!({ "ok_msg": "Point equipment updated successfully", "updated_count": result.rows_affected() @@ -436,8 +416,6 @@ pub async fn delete_point( Path(point_id): Path, ) -> Result { let pool = &state.pool; - let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; - let source_id = { let grouped = crate::service::get_points_grouped_by_source(pool, &[point_id]).await?; grouped.keys().next().copied() @@ -465,8 +443,6 @@ pub async fn delete_point( }); } - notify_units(&state, affected_unit_ids); - Ok(Json( serde_json::json!({"ok_msg": "Point deleted successfully"}), )) @@ -614,7 +590,6 @@ pub async fn batch_delete_points( let point_ids = payload.point_ids; let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?; - let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &point_ids).await?; let existing_point_ids: Vec = grouped .values() .flat_map(|points| points.iter().map(|p| p.point_id)) @@ -637,8 +612,6 @@ pub async fn batch_delete_points( }); } - notify_units(&state, affected_unit_ids); - Ok(Json(BatchDeletePointsRes { deleted_count: result.rows_affected(), })) diff --git a/crates/plc_platform_core/src/platform_context.rs b/crates/plc_platform_core/src/platform_context.rs index 43c3fc9..13dfb76 100644 --- a/crates/plc_platform_core/src/platform_context.rs +++ b/crates/plc_platform_core/src/platform_context.rs @@ -1,26 +1,13 @@ -use std::sync::Arc; - use crate::connection::ConnectionManager; use crate::event::PlatformEvent; use crate::websocket::WebSocketManager; - -/// Callback interface for app-specific side effects on platform events. -/// -/// Platform-level concerns (event persistence, WebSocket broadcast, connection -/// management) are handled automatically by `PlatformContext::emit_event()`. -/// Implementations only need to handle truly app-specific behavior -/// (e.g. feeder's control runtime notifications). -pub trait PlatformEventSink: Send + Sync { - /// Called for every platform event. Override to add app-specific side effects. - fn on_event(&self, event: &PlatformEvent); -} +use std::sync::Arc; #[derive(Clone)] pub struct PlatformContext { pub pool: sqlx::PgPool, pub connection_manager: Arc, pub ws_manager: Arc, - pub event_sink: Option>, } impl PlatformContext { @@ -33,27 +20,14 @@ impl PlatformContext { pool, connection_manager, ws_manager, - event_sink: None, } } - pub fn with_event_sink(mut self, sink: Arc) -> Self { - self.event_sink = Some(sink); - self - } - /// Emit a platform event. /// - /// Synchronously notifies the app-specific sink, then spawns async work for: - /// - Event persistence to DB + WebSocket broadcast - /// - Connection management side effects (connect, subscribe, etc.) + /// Spawns async work for event persistence, WebSocket broadcast, and + /// connection management side effects (connect, subscribe, etc.). pub fn emit_event(&self, event: PlatformEvent) { - // 1. Notify app-specific sink synchronously (fire-and-forget via channels). - if let Some(sink) = &self.event_sink { - sink.on_event(&event); - } - - // 2. Spawn async: persistence + WS broadcast + connection management. let pool = self.pool.clone(); let ws_manager = self.ws_manager.clone(); let cm = self.connection_manager.clone(); @@ -120,9 +94,6 @@ impl PlatformContext { tracing::error!("Failed to unsubscribe points: {}", e); } } - PlatformEvent::UnitsChanged { .. } => { - // No platform-level side effect; app handles via on_event(). - } } }); } diff --git a/crates/plc_platform_core/src/service/control.rs b/crates/plc_platform_core/src/service/control.rs index 907de78..530ea14 100644 --- a/crates/plc_platform_core/src/service/control.rs +++ b/crates/plc_platform_core/src/service/control.rs @@ -1,4 +1,4 @@ -use crate::model::{ControlUnit, EventRecord}; +use crate::model::{ControlUnit, EventRecord}; use sqlx::{PgPool, QueryBuilder, Row}; use uuid::Uuid; @@ -31,9 +31,11 @@ pub async fn get_units_count(pool: &PgPool, keyword: Option<&str>) -> Result sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM unit"#) - .fetch_one(pool) - .await, + None => { + sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM unit"#) + .fetch_one(pool) + .await + } } } @@ -58,9 +60,9 @@ pub async fn get_units_paginated( unit_order ); sqlx::query_as::<_, ControlUnit>(&sql) - .bind(like) - .fetch_all(pool) - .await + .bind(like) + .fetch_all(pool) + .await } else { let sql = format!( r#" @@ -73,19 +75,17 @@ pub async fn get_units_paginated( unit_order ); sqlx::query_as::<_, ControlUnit>(&sql) - .bind(like) - .bind(page_size as i64) - .bind(offset as i64) - .fetch_all(pool) - .await + .bind(like) + .bind(page_size as i64) + .bind(offset as i64) + .fetch_all(pool) + .await } } None => { if page_size == -1 { let sql = format!("SELECT * FROM unit ORDER BY {}", unit_order); - sqlx::query_as::<_, ControlUnit>(&sql) - .fetch_all(pool) - .await + sqlx::query_as::<_, ControlUnit>(&sql).fetch_all(pool).await } else { let sql = format!( r#" @@ -97,10 +97,10 @@ pub async fn get_units_paginated( unit_order ); sqlx::query_as::<_, ControlUnit>(&sql) - .bind(page_size as i64) - .bind(offset as i64) - .fetch_all(pool) - .await + .bind(page_size as i64) + .bind(offset as i64) + .fetch_all(pool) + .await } } } @@ -138,10 +138,7 @@ pub struct CreateUnitParams<'a> { pub require_manual_ack_after_fault: bool, } -pub async fn create_unit( - pool: &PgPool, - params: CreateUnitParams<'_>, -) -> Result { +pub async fn create_unit(pool: &PgPool, params: CreateUnitParams<'_>) -> Result { let unit_id = Uuid::new_v4(); sqlx::query( r#" @@ -222,10 +219,7 @@ pub async fn update_unit( param_count += 1; } if params.require_manual_ack_after_fault.is_some() { - updates.push(format!( - "require_manual_ack_after_fault = ${}", - param_count - )); + updates.push(format!("require_manual_ack_after_fault = ${}", param_count)); param_count += 1; } @@ -285,8 +279,7 @@ pub async fn get_events_count( unit_id: Option, event_type: Option<&str>, ) -> Result { - let mut qb = - QueryBuilder::new("SELECT COUNT(*)::BIGINT FROM event WHERE 1 = 1"); + let mut qb = QueryBuilder::new("SELECT COUNT(*)::BIGINT FROM event WHERE 1 = 1"); if let Some(unit_id) = unit_id { qb.push(" AND unit_id = ").push_bind(unit_id); @@ -329,9 +322,7 @@ pub async fn get_all_enabled_units(pool: &PgPool) -> Result, sq "SELECT * FROM unit WHERE enabled = TRUE ORDER BY {}", unit_order_clause() ); - sqlx::query_as::<_, ControlUnit>(&sql) - .fetch_all(pool) - .await + sqlx::query_as::<_, ControlUnit>(&sql).fetch_all(pool).await } pub async fn get_equipment_by_unit_ids( @@ -346,9 +337,9 @@ pub async fn get_equipment_by_unit_ids( equipment_order_clause_with_unit() ); sqlx::query_as::<_, crate::model::Equipment>(&sql) - .bind(unit_ids) - .fetch_all(pool) - .await + .bind(unit_ids) + .fetch_all(pool) + .await } pub async fn get_equipment_by_unit_id( @@ -360,9 +351,9 @@ pub async fn get_equipment_by_unit_id( unit_order_clause() ); sqlx::query_as::<_, crate::model::Equipment>(&sql) - .bind(unit_id) - .fetch_all(pool) - .await + .bind(unit_id) + .fetch_all(pool) + .await } pub async fn get_points_by_equipment_ids( @@ -403,30 +394,6 @@ pub async fn get_unit_ids_by_equipment_ids( Ok(rows) } -pub async fn get_unit_ids_by_point_ids( - pool: &PgPool, - point_ids: &[Uuid], -) -> Result, sqlx::Error> { - if point_ids.is_empty() { - return Ok(vec![]); - } - - let rows = sqlx::query_scalar::<_, Uuid>( - r#" - SELECT DISTINCT e.unit_id - FROM point p - INNER JOIN equipment e ON e.id = p.equipment_id - WHERE p.id = ANY($1) - AND e.unit_id IS NOT NULL - "#, - ) - .bind(point_ids) - .fetch_all(pool) - .await?; - - Ok(rows) -} - pub struct EquipmentSignalRole { pub equipment_id: Uuid, pub point_id: Uuid, @@ -507,4 +474,3 @@ pub async fn get_equipment_role_points( }) .collect()) } -