diff --git a/crates/app_feeder_distributor/src/app.rs b/crates/app_feeder_distributor/src/app.rs index 1e3364a..04e8c81 100644 --- a/crates/app_feeder_distributor/src/app.rs +++ b/crates/app_feeder_distributor/src/app.rs @@ -8,6 +8,7 @@ use crate::{ router::build_router, websocket::WebSocketManager, }; +use axum::extract::FromRef; use plc_platform_core::platform_context::PlatformContext; use tokio::sync::mpsc; @@ -19,6 +20,12 @@ pub struct AppState { pub control_runtime: Arc, } +impl FromRef for PlatformContext { + fn from_ref(state: &AppState) -> Self { + state.platform.clone() + } +} + pub async fn run() { dotenv::dotenv().ok(); @@ -50,10 +57,14 @@ pub async fn run() { builder.connection_manager.set_event_manager(event_manager.clone()); builder.connection_manager.set_pool_and_start_reconnect_task(Arc::new(builder.pool.clone())); - let platform = builder.build(); - let control_runtime = Arc::new(control::runtime::ControlRuntimeStore::new()); + let event_sink = Arc::new(crate::event::FeederPlatformEventSink::new( + event_manager.clone(), + control_runtime.clone(), + )); + let platform = builder.build().with_event_sink(event_sink); + let sources = crate::service::get_all_enabled_sources(&platform.pool) .await .expect("Failed to fetch sources"); diff --git a/crates/app_feeder_distributor/src/event.rs b/crates/app_feeder_distributor/src/event.rs index 7f68d22..af5d00c 100644 --- a/crates/app_feeder_distributor/src/event.rs +++ b/crates/app_feeder_distributor/src/event.rs @@ -143,6 +143,52 @@ impl EventManager { } } +/// Adapter that bridges platform handler events to feeder's EventManager + ControlRuntime. +pub struct FeederPlatformEventSink { + event_manager: std::sync::Arc, + control_runtime: std::sync::Arc, +} + +impl FeederPlatformEventSink { + pub fn new( + event_manager: std::sync::Arc, + control_runtime: std::sync::Arc, + ) -> Self { + Self { event_manager, control_runtime } + } +} + +impl plc_platform_core::platform_context::PlatformEventSink for FeederPlatformEventSink { + fn on_source_created(&self, source_id: Uuid) { + let _ = self.event_manager.send(AppEvent::SourceCreate { source_id }); + } + + fn on_source_updated(&self, source_id: Uuid) { + let _ = self.event_manager.send(AppEvent::SourceUpdate { source_id }); + } + + fn on_source_deleted(&self, source_id: Uuid, source_name: String) { + let _ = self.event_manager.send(AppEvent::SourceDelete { source_id, source_name }); + } + + fn on_points_created(&self, source_id: Uuid, point_ids: Vec) { + let _ = self.event_manager.send(AppEvent::PointCreateBatch { source_id, point_ids }); + } + + fn on_points_deleted(&self, source_id: Uuid, point_ids: Vec) { + let _ = self.event_manager.send(AppEvent::PointDeleteBatch { source_id, point_ids }); + } + + fn on_units_changed(&self, unit_ids: Vec) { + let runtime = self.control_runtime.clone(); + tokio::spawn(async move { + for unit_id in unit_ids { + runtime.notify_unit(unit_id).await; + } + }); + } +} + impl plc_platform_core::connection::PointEventSink for EventManager { fn send_point_new_value( &self, @@ -273,16 +319,6 @@ async fn handle_control_event( } } -async fn fetch_source_name(pool: &sqlx::PgPool, id: Uuid) -> String { - sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1") - .bind(id) - .fetch_optional(pool) - .await - .ok() - .flatten() - .unwrap_or_else(|| id.to_string()) -} - async fn fetch_unit_code(pool: &sqlx::PgPool, id: Uuid) -> String { sqlx::query_scalar::<_, String>("SELECT code FROM unit WHERE id = $1") .bind(id) @@ -308,53 +344,18 @@ async fn persist_event_if_needed( pool: &sqlx::PgPool, ws_manager: Option<&std::sync::Arc>, ) { - let record = match event { - AppEvent::SourceCreate { source_id } => { - let name = fetch_source_name(pool, *source_id).await; - Some(( - "source.created", "info", - None, None, Some(*source_id), - format!("Source {} created", name), - serde_json::json!({ "source_id": source_id }), - )) - } - AppEvent::SourceUpdate { source_id } => { - let name = fetch_source_name(pool, *source_id).await; - Some(( - "source.updated", "info", - None, None, Some(*source_id), - format!("Source {} updated", name), - serde_json::json!({ "source_id": source_id }), - )) - } - AppEvent::SourceDelete { source_id, source_name } => Some(( - "source.deleted", "warn", - None, None, None, - format!("Source {} deleted", source_name), - serde_json::json!({ "source_id": source_id }), - )), - AppEvent::PointCreateBatch { source_id, point_ids } => { - let name = fetch_source_name(pool, *source_id).await; - Some(( - "point.batch_created", "info", - None, None, Some(*source_id), - format!("Created {} points for source {}", point_ids.len(), name), - serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), - )) - } - AppEvent::PointDeleteBatch { source_id, point_ids } => { - let name = fetch_source_name(pool, *source_id).await; - Some(( - "point.batch_deleted", "warn", - None, None, Some(*source_id), - format!("Deleted {} points for source {}", point_ids.len(), name), - serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), - )) - } + let record: Option<(&str, &str, Option, Option, Option, String, serde_json::Value)> = match event { + // Platform events — persistence is handled by core's emit_event(). + AppEvent::SourceCreate { .. } => None, + AppEvent::SourceUpdate { .. } => None, + AppEvent::SourceDelete { .. } => None, + AppEvent::PointCreateBatch { .. } => None, + AppEvent::PointDeleteBatch { .. } => None, + // Feeder-specific events — persisted here. AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => { let code = fetch_equipment_code(pool, *equipment_id).await; Some(( - "equipment.start_command_sent", "info", + "feeder.equipment.start_command_sent", "info", *unit_id, Some(*equipment_id), None, format!("Start command sent to equipment {}", code), serde_json::json!({ @@ -367,7 +368,7 @@ async fn persist_event_if_needed( AppEvent::EquipmentStopCommandSent { equipment_id, unit_id, point_id } => { let code = fetch_equipment_code(pool, *equipment_id).await; Some(( - "equipment.stop_command_sent", "info", + "feeder.equipment.stop_command_sent", "info", *unit_id, Some(*equipment_id), None, format!("Stop command sent to equipment {}", code), serde_json::json!({ @@ -380,7 +381,7 @@ async fn persist_event_if_needed( AppEvent::AutoControlStarted { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( - "unit.auto_control_started", "info", + "feeder.unit.auto_control_started", "info", Some(*unit_id), None, None, format!("Auto control started for unit {}", code), serde_json::json!({ "unit_id": unit_id }), @@ -389,7 +390,7 @@ async fn persist_event_if_needed( AppEvent::AutoControlStopped { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( - "unit.auto_control_stopped", "info", + "feeder.unit.auto_control_stopped", "info", Some(*unit_id), None, None, format!("Auto control stopped for unit {}", code), serde_json::json!({ "unit_id": unit_id }), @@ -399,7 +400,7 @@ async fn persist_event_if_needed( let unit_code = fetch_unit_code(pool, *unit_id).await; let eq_code = fetch_equipment_code(pool, *equipment_id).await; Some(( - "unit.fault_locked", "error", + "feeder.unit.fault_locked", "error", Some(*unit_id), Some(*equipment_id), None, format!("Fault locked for unit {} by equipment {}", unit_code, eq_code), serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), @@ -408,7 +409,7 @@ async fn persist_event_if_needed( AppEvent::FaultAcked { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( - "unit.fault_acked", "info", + "feeder.unit.fault_acked", "info", Some(*unit_id), None, None, format!("Fault acknowledged for unit {}", code), serde_json::json!({ "unit_id": unit_id }), @@ -417,7 +418,7 @@ async fn persist_event_if_needed( AppEvent::CommLocked { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( - "unit.comm_locked", "warn", + "feeder.unit.comm_locked", "warn", Some(*unit_id), None, None, format!("Communication locked for unit {}", code), serde_json::json!({ "unit_id": unit_id }), @@ -426,7 +427,7 @@ async fn persist_event_if_needed( AppEvent::CommRecovered { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( - "unit.comm_recovered", "info", + "feeder.unit.comm_recovered", "info", Some(*unit_id), None, None, format!("Communication recovered for unit {}", code), serde_json::json!({ "unit_id": unit_id }), @@ -436,7 +437,7 @@ async fn persist_event_if_needed( let unit_code = fetch_unit_code(pool, *unit_id).await; let eq_code = fetch_equipment_code(pool, *equipment_id).await; Some(( - "unit.rem_local", "warn", + "feeder.unit.rem_local", "warn", Some(*unit_id), Some(*equipment_id), None, format!("Unit {} switched to local control via equipment {}", unit_code, eq_code), serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), @@ -445,7 +446,7 @@ async fn persist_event_if_needed( AppEvent::RemRecovered { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( - "unit.rem_recovered", "warn", + "feeder.unit.rem_recovered", "warn", Some(*unit_id), None, None, format!("Unit {} returned to remote control; auto control requires manual restart", code), serde_json::json!({ "unit_id": unit_id }), diff --git a/crates/app_feeder_distributor/src/handler.rs b/crates/app_feeder_distributor/src/handler.rs index 4155c59..dc08b75 100644 --- a/crates/app_feeder_distributor/src/handler.rs +++ b/crates/app_feeder_distributor/src/handler.rs @@ -1,10 +1,18 @@ -pub mod control; +pub mod control; pub mod doc; -pub mod equipment; +pub mod equipment { + pub use plc_platform_core::handler::equipment::*; +} pub mod log { pub use plc_platform_core::handler::log::*; } -pub mod page; +pub mod page { + pub use plc_platform_core::handler::page::*; +} pub mod point; -pub mod source; -pub mod tag; +pub mod source { + pub use plc_platform_core::handler::source::*; +} +pub mod tag { + pub use plc_platform_core::handler::tag::*; +} diff --git a/crates/app_feeder_distributor/src/handler/point.rs b/crates/app_feeder_distributor/src/handler/point.rs index 9f500a9..1b72b43 100644 --- a/crates/app_feeder_distributor/src/handler/point.rs +++ b/crates/app_feeder_distributor/src/handler/point.rs @@ -1,662 +1,21 @@ +// Re-export all platform point handlers from core. +pub use plc_platform_core::handler::point::*; + use axum::{ - extract::{Path, Query, State}, + extract::State, http::HeaderMap, response::IntoResponse, Json, }; -use serde::{Deserialize, Serialize}; -use serde_with::rust::double_option; -use sqlx::{QueryBuilder, Row}; -use std::collections::{HashMap, HashSet}; -use uuid::Uuid; -use validator::Validate; +use plc_platform_core::util::response::ApiErr; -use plc_platform_core::util::{ - pagination::{PaginatedResponse, PaginationParams}, - response::ApiErr, -}; - -use crate::{ - AppState, -}; -use plc_platform_core::model::{Node, Point}; - -async fn notify_units( - state: &AppState, - unit_ids: impl IntoIterator, -) { - let mut seen = std::collections::HashSet::new(); - for unit_id in unit_ids { - if seen.insert(unit_id) { - state.control_runtime.notify_unit(unit_id).await; - } - } -} - -/// List all points. -#[derive(Deserialize, Validate)] -pub struct GetPointListQuery { - pub source_id: Option, - pub equipment_id: Option, - #[serde(flatten)] - pub pagination: PaginationParams, -} - -#[derive(Serialize)] -pub struct PointWithMonitor { - #[serde(flatten)] - pub point: Point, - pub point_monitor: Option, -} - -#[derive(Deserialize, Validate)] -pub struct GetPointHistoryQuery { - pub limit: Option, -} - -#[derive(Serialize)] -pub struct PointHistoryItem { - #[serde(serialize_with = "plc_platform_core::util::datetime::option_utc_to_local_str")] - pub timestamp: Option>, - pub quality: crate::telemetry::PointQuality, - pub value: Option, - pub value_text: Option, - pub value_number: Option, -} - -pub async fn get_point_list( - State(state): State, - Query(query): Query, -) -> Result { - query.validate()?; - let pool = &state.platform.pool; - - // Count total rows. - let total = crate::service::get_points_count(pool, query.source_id, query.equipment_id).await?; - - // Load current page rows. - let points = crate::service::get_points_paginated( - pool, - query.source_id, - query.equipment_id, - query.pagination.page_size, - query.pagination.offset(), - ) - .await?; - - let monitor_guard = state - .platform.connection_manager - .get_point_monitor_data_read_guard() - .await; - - let data: Vec = points - .into_iter() - .map(|point| { - let point_monitor = monitor_guard.get(&point.id).cloned(); - PointWithMonitor { - point, - point_monitor, - } - }) - .collect(); - - let response = PaginatedResponse::new( - data, - total, - query.pagination.page, - query.pagination.page_size, - ); - - Ok(Json(response)) -} -/// Get a point by id. -pub async fn get_point( - State(state): State, - Path(point_id): Path, -) -> Result { - let pool = &state.platform.pool; - let point = crate::service::get_point_by_id(pool, point_id).await?; - - Ok(Json(point)) -} - -pub async fn get_point_history( - State(state): State, - Path(point_id): Path, - Query(query): Query, -) -> Result { - let pool = &state.platform.pool; - let point = crate::service::get_point_by_id(pool, point_id).await?; - if point.is_none() { - return Err(ApiErr::NotFound("Point not found".to_string(), None)); - } - - let limit = query.limit.unwrap_or(120).clamp(1, 1000); - let history = state - .platform.connection_manager - .get_point_history(point_id, limit) - .await; - - let items: Vec = history - .into_iter() - .map(|item| { - let value_number = monitor_value_to_number(&item); - PointHistoryItem { - timestamp: item.timestamp, - quality: item.quality, - value_number, - value: item.value, - value_text: item.value_text, - } - }) - .collect(); - - Ok(Json(items)) -} - -/// Request payload for updating editable point fields. -#[derive(Deserialize, Validate)] -pub struct UpdatePointReq { - pub name: Option, - #[serde(default, with = "double_option")] - pub description: Option>, - #[serde(default, with = "double_option")] - pub unit: Option>, - #[serde(default, with = "double_option")] - pub tag_id: Option>, - #[serde(default, with = "double_option")] - pub equipment_id: Option>, - #[serde(default, with = "double_option")] - pub signal_role: Option>, -} - -/// Request payload for batch setting point tags. -#[derive(Deserialize, Validate)] -pub struct BatchSetPointTagsReq { - #[validate(length(min = 1, max = 500))] - pub point_ids: Vec, - pub tag_id: Option, -} - -#[derive(Deserialize, Validate)] -pub struct BatchSetPointEquipmentReq { - #[validate(length(min = 1, max = 500))] - pub point_ids: Vec, - pub equipment_id: Option, - pub signal_role: Option, -} - -/// Update point metadata (name/description/unit only). -pub async fn update_point( - State(state): State, - Path(point_id): Path, - Json(payload): Json, -) -> Result { - payload.validate()?; - - let pool = &state.platform.pool; - - if payload.name.is_none() - && payload.description.is_none() - && payload.unit.is_none() - && payload.tag_id.is_none() - && payload.equipment_id.is_none() - && payload.signal_role.is_none() - { - return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); - } - - // If tag_id is provided, ensure tag exists. - if let Some(Some(tag_id)) = payload.tag_id { - let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#) - .bind(tag_id) - .fetch_optional(pool) - .await? - .is_some(); - - if !tag_exists { - return Err(ApiErr::NotFound("Tag not found".to_string(), None)); - } - } - - if let Some(Some(equipment_id)) = payload.equipment_id { - let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#) - .bind(equipment_id) - .fetch_optional(pool) - .await? - .is_some(); - - if !equipment_exists { - return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); - } - } - - // Ensure target point exists. - let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#) - .bind(point_id) - .fetch_optional(pool) - .await?; - if existing_point.is_none() { - return Err(ApiErr::NotFound("Point not found".to_string(), None)); - } - let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; - - let mut qb: QueryBuilder = QueryBuilder::new("UPDATE point SET "); - let mut wrote_field = false; - - if let Some(name) = &payload.name { - if wrote_field { - qb.push(", "); - } - qb.push("name = ").push_bind(name); - wrote_field = true; - } - if let Some(description) = &payload.description { - if wrote_field { - qb.push(", "); - } - qb.push("description = ").push_bind(description.as_deref()); - wrote_field = true; - } - if let Some(unit) = &payload.unit { - if wrote_field { - qb.push(", "); - } - qb.push("unit = ").push_bind(unit.as_deref()); - wrote_field = true; - } - if let Some(tag_id) = &payload.tag_id { - if wrote_field { - qb.push(", "); - } - qb.push("tag_id = ").push_bind(tag_id.as_ref()); - wrote_field = true; - } - if let Some(equipment_id) = &payload.equipment_id { - if wrote_field { - qb.push(", "); - } - qb.push("equipment_id = ").push_bind(equipment_id.as_ref()); - wrote_field = true; - } - if let Some(signal_role) = &payload.signal_role { - if wrote_field { - qb.push(", "); - } - qb.push("signal_role = ").push_bind(signal_role.as_deref()); - wrote_field = true; - } - - if wrote_field { - qb.push(", "); - } - qb.push("updated_at = NOW()"); - - qb.push(" WHERE id = ").push_bind(point_id); - qb.build().execute(pool).await?; - - let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; - notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await; - - Ok(Json( - serde_json::json!({"ok_msg": "Point updated successfully"}), - )) -} - -/// Batch set point tags. -pub async fn batch_set_point_tags( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - if payload.point_ids.is_empty() { - return Err(ApiErr::BadRequest( - "point_ids cannot be empty".to_string(), - None, - )); - } - - let pool = &state.platform.pool; - - // If tag_id is provided, ensure tag exists. - if let Some(tag_id) = payload.tag_id { - let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#) - .bind(tag_id) - .fetch_optional(pool) - .await? - .is_some(); - - if !tag_exists { - return Err(ApiErr::NotFound("Tag not found".to_string(), None)); - } - } - - // Check which points exist - let existing_points: Vec = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#) - .bind(&payload.point_ids) - .fetch_all(pool) - .await? - .into_iter() - .map(|row: sqlx::postgres::PgRow| row.get::("id")) - .collect(); - - if existing_points.is_empty() { - return Err(ApiErr::NotFound("No valid points found".to_string(), None)); - } - - // Update tag_id for all existing points - let result = - sqlx::query(r#"UPDATE point SET tag_id = $1, updated_at = NOW() WHERE id = ANY($2)"#) - .bind(payload.tag_id) - .bind(&existing_points) - .execute(pool) - .await?; - - Ok(Json(serde_json::json!({ - "ok_msg": "Point tags updated successfully", - "updated_count": result.rows_affected() - }))) -} - -pub async fn batch_set_point_equipment( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - if payload.point_ids.is_empty() { - return Err(ApiErr::BadRequest( - "point_ids cannot be empty".to_string(), - None, - )); - } - - let pool = &state.platform.pool; - - if let Some(equipment_id) = payload.equipment_id { - let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#) - .bind(equipment_id) - .fetch_optional(pool) - .await? - .is_some(); - - if !equipment_exists { - return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); - } - } - - let existing_points: Vec = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#) - .bind(&payload.point_ids) - .fetch_all(pool) - .await? - .into_iter() - .map(|row: sqlx::postgres::PgRow| row.get::("id")) - .collect(); - - if existing_points.is_empty() { - return Err(ApiErr::NotFound("No valid points found".to_string(), None)); - } - - let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?; - - let result = sqlx::query( - r#" - UPDATE point - SET equipment_id = $1, - signal_role = $2, - updated_at = NOW() - WHERE id = ANY($3) - "#, - ) - .bind(payload.equipment_id) - .bind(payload.signal_role.as_deref()) - .bind(&existing_points) - .execute(pool) - .await?; - - let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?; - notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await; - - Ok(Json(serde_json::json!({ - "ok_msg": "Point equipment updated successfully", - "updated_count": result.rows_affected() - }))) -} - -/// Delete one point by id. -pub async fn delete_point( - State(state): State, - Path(point_id): Path, -) -> Result { - let pool = &state.platform.pool; - let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; - - let source_id = { - let grouped = crate::service::get_points_grouped_by_source(pool, &[point_id]).await?; - grouped.keys().next().copied() - }; - - // Ensure target point exists. - let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#) - .bind(point_id) - .fetch_optional(pool) - .await?; - if existing_point.is_none() { - return Err(ApiErr::NotFound("Point not found".to_string(), None)); - } - - // Delete point. - sqlx::query(r#"delete from point WHERE id = $1"#) - .bind(point_id) - .execute(pool) - .await?; - - if let Some(source_id) = source_id { - if let Err(e) = state - .event_manager - .send(crate::event::AppEvent::PointDeleteBatch { - source_id, - point_ids: vec![point_id], - }) - { - tracing::error!("Failed to send PointDeleteBatch event: {}", e); - } - } - - notify_units(&state, affected_unit_ids).await; - - Ok(Json( - serde_json::json!({"ok_msg": "Point deleted successfully"}), - )) -} - -#[derive(Deserialize, Validate)] -/// Request payload for batch point creation from node ids. -pub struct BatchCreatePointsReq { - #[validate(length(min = 1, max = 500))] - pub node_ids: Vec, -} - -#[derive(Serialize)] -/// Response payload for batch point creation. -pub struct BatchCreatePointsRes { - pub success_count: usize, - pub failed_count: usize, - pub failed_node_ids: Vec, - pub created_point_ids: Vec, -} - -/// Batch create points by node ids. -pub async fn batch_create_points( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - let pool = &state.platform.pool; - - if payload.node_ids.is_empty() { - return Err(ApiErr::BadRequest( - "node_ids cannot be empty".to_string(), - None, - )); - } - - // Use one transaction for the full batch. - let mut tx = pool.begin().await?; - let node_ids = payload.node_ids; - - let nodes: Vec = sqlx::query_as::<_, Node>(r#"SELECT * FROM node WHERE id = ANY($1)"#) - .bind(&node_ids) - .fetch_all(&mut *tx) - .await?; - - let node_map: HashMap = nodes.into_iter().map(|node| (node.id, node)).collect(); - - let existing_node_ids: HashSet = node_map.keys().copied().collect(); - let mut failed_node_ids = Vec::new(); - for node_id in &node_ids { - if !existing_node_ids.contains(node_id) { - failed_node_ids.push(*node_id); - } - } - - let existing_point_node_ids: HashSet = - sqlx::query_scalar::<_, Uuid>(r#"SELECT node_id FROM point WHERE node_id = ANY($1)"#) - .bind(&node_ids) - .fetch_all(&mut *tx) - .await? - .into_iter() - .collect(); - - let mut to_create = Vec::new(); - let mut seen_creatable = HashSet::new(); - for node_id in node_ids { - if !existing_node_ids.contains(&node_id) || existing_point_node_ids.contains(&node_id) { - continue; - } - - if !seen_creatable.insert(node_id) { - continue; - } - - let name = node_map - .get(&node_id) - .map(|node| node.browse_name.clone()) - .unwrap_or_else(|| format!("Point_{}", node_id)); - to_create.push((Uuid::new_v4(), node_id, name)); - } - - let mut created_point_ids = Vec::with_capacity(to_create.len()); - if !to_create.is_empty() { - let mut qb = QueryBuilder::new("INSERT INTO point (id, node_id, name) "); - qb.push_values(to_create.iter(), |mut b, (id, node_id, name)| { - b.push_bind(*id).push_bind(*node_id).push_bind(name); - }); - qb.build().execute(&mut *tx).await?; - created_point_ids.extend(to_create.into_iter().map(|(id, _, _)| id)); - } - - // Commit the transaction. - tx.commit().await?; - - // Emit grouped create events by source. - if !created_point_ids.is_empty() { - let grouped = - crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?; - for (source_id, points) in grouped { - let point_ids: Vec = points.into_iter().map(|p| p.point_id).collect(); - if let Err(e) = state - .event_manager - .send(crate::event::AppEvent::PointCreateBatch { - source_id, - point_ids, - }) - { - tracing::error!("Failed to send PointCreateBatch event: {}", e); - } - } - } - - Ok(Json(BatchCreatePointsRes { - success_count: created_point_ids.len(), - failed_count: failed_node_ids.len(), - failed_node_ids, - created_point_ids, - })) -} - -#[derive(Deserialize, Validate)] -/// Request payload for batch point deletion. -pub struct BatchDeletePointsReq { - #[validate(length(min = 1, max = 500))] - pub point_ids: Vec, -} - -#[derive(Serialize)] -/// Response payload for batch point deletion. -pub struct BatchDeletePointsRes { - pub deleted_count: u64, -} - -/// Batch delete points and emit grouped delete events by source. -pub async fn batch_delete_points( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - if payload.point_ids.is_empty() { - return Err(ApiErr::BadRequest( - "point_ids cannot be empty".to_string(), - None, - )); - } - - let pool = &state.platform.pool; - let point_ids = payload.point_ids; - - let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?; - let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &point_ids).await?; - let existing_point_ids: Vec = grouped - .values() - .flat_map(|points| points.iter().map(|p| p.point_id)) - .collect(); - - if existing_point_ids.is_empty() { - return Ok(Json(BatchDeletePointsRes { deleted_count: 0 })); - } - - let result = sqlx::query(r#"DELETE FROM point WHERE id = ANY($1)"#) - .bind(&existing_point_ids) - .execute(pool) - .await?; - - for (source_id, points) in grouped { - let ids: Vec = points.into_iter().map(|p| p.point_id).collect(); - if let Err(e) = state - .event_manager - .send(crate::event::AppEvent::PointDeleteBatch { - source_id, - point_ids: ids, - }) - { - tracing::error!("Failed to send PointDeleteBatch event: {}", e); - } - } - - notify_units(&state, affected_unit_ids).await; - - Ok(Json(BatchDeletePointsRes { - deleted_count: result.rows_affected(), - })) -} +use crate::AppState; +/// Feeder-specific: batch set point values (requires write key auth from app config). pub async fn batch_set_point_value( State(state): State, headers: HeaderMap, - Json(payload): Json, + Json(payload): Json, ) -> Result { let write_key = headers .get("X-Write-Key") @@ -673,21 +32,10 @@ pub async fn batch_set_point_value( } let result = state - .platform.connection_manager + .platform + .connection_manager .write_point_values_batch(payload) .await .map_err(|e| ApiErr::Internal(e, None))?; Ok(Json(result)) } - -fn monitor_value_to_number(item: &crate::telemetry::PointMonitorInfo) -> Option { - match item.value.as_ref()? { - crate::telemetry::DataValue::Int(v) => Some(*v as f64), - crate::telemetry::DataValue::UInt(v) => Some(*v as f64), - crate::telemetry::DataValue::Float(v) => Some(*v), - crate::telemetry::DataValue::Bool(v) => Some(if *v { 1.0 } else { 0.0 }), - crate::telemetry::DataValue::Text(v) => v.parse::().ok(), - _ => None, - } -} - diff --git a/crates/app_feeder_distributor/src/router.rs b/crates/app_feeder_distributor/src/router.rs index 4649c07..3017ef3 100644 --- a/crates/app_feeder_distributor/src/router.rs +++ b/crates/app_feeder_distributor/src/router.rs @@ -2,7 +2,7 @@ use axum::{ extract::Request, middleware::Next, response::Response, - routing::{get, post, put}, + routing::{get, post}, Router, }; use tower_http::cors::{Any, CorsLayer}; @@ -20,74 +20,17 @@ async fn no_cache(req: Request, next: Next) -> Response { } pub fn build_router(state: AppState) -> Router { - let all_route = Router::new() - .route( - "/api/source", - get(handler::source::get_source_list).post(handler::source::create_source), - ) - .route( - "/api/source/{source_id}", - axum::routing::delete(handler::source::delete_source) - .put(handler::source::update_source), - ) - .route( - "/api/source/{source_id}/reconnect", - axum::routing::post(handler::source::reconnect_source), - ) - .route( - "/api/source/{source_id}/browse", - axum::routing::post(handler::source::browse_and_save_nodes), - ) - .route( - "/api/source/{source_id}/node-tree", - get(handler::source::get_node_tree), - ) - .route("/api/point", get(handler::point::get_point_list)) + // Platform routes (source, point, equipment, tag, page, logs) from core. + let platform = plc_platform_core::handler::platform_routes::(); + + // Feeder-specific routes. + let feeder_routes = Router::new() + // Feeder-only: batch set point values (requires write key auth). .route( "/api/point/value/batch", - axum::routing::post(handler::point::batch_set_point_value), - ) - .route( - "/api/point/batch", - axum::routing::post(handler::point::batch_create_points) - .delete(handler::point::batch_delete_points), - ) - .route( - "/api/point/{point_id}/history", - get(handler::point::get_point_history), - ) - .route( - "/api/point/{point_id}", - get(handler::point::get_point) - .put(handler::point::update_point) - .delete(handler::point::delete_point), - ) - .route( - "/api/point/batch/set-tags", - put(handler::point::batch_set_point_tags), - ) - .route( - "/api/point/batch/set-equipment", - put(handler::point::batch_set_point_equipment), - ) - .route( - "/api/equipment", - get(handler::equipment::get_equipment_list).post(handler::equipment::create_equipment), - ) - .route( - "/api/equipment/{equipment_id}", - get(handler::equipment::get_equipment) - .put(handler::equipment::update_equipment) - .delete(handler::equipment::delete_equipment), - ) - .route( - "/api/equipment/batch/set-unit", - put(handler::equipment::batch_set_equipment_unit), - ) - .route( - "/api/equipment/{equipment_id}/points", - get(handler::equipment::get_equipment_points), + post(handler::point::batch_set_point_value), ) + // Unit / control routes (feeder-specific). .route( "/api/unit", get(handler::control::get_unit_list).post(handler::control::create_unit), @@ -135,33 +78,13 @@ pub fn build_router(state: AppState) -> Router { "/api/unit/{unit_id}/detail", get(handler::control::get_unit_detail), ) - .route( - "/api/tag", - get(handler::tag::get_tag_list).post(handler::tag::create_tag), - ) - .route( - "/api/tag/{tag_id}", - get(handler::tag::get_tag_points) - .put(handler::tag::update_tag) - .delete(handler::tag::delete_tag), - ) - .route( - "/api/page", - get(handler::page::get_page_list).post(handler::page::create_page), - ) - .route( - "/api/page/{page_id}", - get(handler::page::get_page) - .put(handler::page::update_page) - .delete(handler::page::delete_page), - ) - .route("/api/logs", get(handler::log::get_logs)) - .route("/api/logs/stream", get(handler::log::stream_logs)) + // Doc routes (feeder-specific doc paths). .route("/api/docs/api-md", get(handler::doc::get_api_md)) .route("/api/docs/readme-md", get(handler::doc::get_readme_md)); Router::new() - .merge(all_route) + .merge(platform) + .merge(feeder_routes) .nest( "/ui", Router::new() diff --git a/crates/app_operation_system/src/app.rs b/crates/app_operation_system/src/app.rs index 6a4b44a..efd1591 100644 --- a/crates/app_operation_system/src/app.rs +++ b/crates/app_operation_system/src/app.rs @@ -1,3 +1,4 @@ +use axum::extract::FromRef; use crate::router::build_router; use plc_platform_core::platform_context::PlatformContext; @@ -30,6 +31,12 @@ pub struct AppState { pub platform: PlatformContext, } +impl FromRef for PlatformContext { + fn from_ref(state: &AppState) -> Self { + state.platform.clone() + } +} + pub async fn run() { dotenv::dotenv().ok(); plc_platform_core::util::log::init_logger(); diff --git a/crates/app_operation_system/src/router.rs b/crates/app_operation_system/src/router.rs index 0a6cc6a..368e49c 100644 --- a/crates/app_operation_system/src/router.rs +++ b/crates/app_operation_system/src/router.rs @@ -16,12 +16,18 @@ async fn no_cache( } pub fn build_router(state: AppState) -> Router { - Router::new() + // Platform routes (source, point, equipment, tag, page, logs) from core. + let platform = plc_platform_core::handler::platform_routes::(); + + // Ops-specific routes. + let ops_routes = Router::new() .route("/api/health", get(health_check)) - .route("/api/logs", get(plc_platform_core::handler::log::get_logs)) - .route("/api/logs/stream", get(plc_platform_core::handler::log::stream_logs)) .route("/api/docs/api-md", get(crate::handler::doc::get_api_md)) - .route("/api/docs/readme-md", get(crate::handler::doc::get_readme_md)) + .route("/api/docs/readme-md", get(crate::handler::doc::get_readme_md)); + + Router::new() + .merge(platform) + .merge(ops_routes) .nest( "/ui", Router::new() diff --git a/crates/plc_platform_core/src/event.rs b/crates/plc_platform_core/src/event.rs index d218971..562d0a4 100644 --- a/crates/plc_platform_core/src/event.rs +++ b/crates/plc_platform_core/src/event.rs @@ -1,5 +1,9 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; +use uuid::Uuid; + +use crate::model::EventRecord; +use crate::websocket::{WebSocketManager, WsMessage}; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct EventEnvelope { @@ -15,3 +19,116 @@ impl EventEnvelope { } } } + +/// Platform-level events emitted by core handlers. +/// Each variant carries enough context for persistence + app-specific side effects. +#[derive(Debug, Clone)] +pub enum PlatformEvent { + SourceCreated { source_id: Uuid }, + SourceUpdated { source_id: Uuid }, + SourceDeleted { source_id: Uuid, source_name: String }, + PointsCreated { source_id: Uuid, point_ids: Vec }, + PointsDeleted { source_id: Uuid, point_ids: Vec }, + UnitsChanged { unit_ids: Vec }, +} + +/// Persists platform events to the `event` table and broadcasts via WebSocket. +/// +/// Apps get notified via `PlatformEventSink` for their own side effects +/// (connection management, control runtime, etc.). +pub async fn persist_and_broadcast( + event: &PlatformEvent, + pool: &sqlx::PgPool, + ws_manager: &WebSocketManager, +) { + let record = match event { + PlatformEvent::SourceCreated { source_id } => { + let name = fetch_source_name(pool, *source_id).await; + Some(( + "platform.source.created", "info", + None, None, Some(*source_id), + format!("Source {} created", name), + serde_json::json!({ "source_id": source_id }), + )) + } + PlatformEvent::SourceUpdated { source_id } => { + let name = fetch_source_name(pool, *source_id).await; + Some(( + "platform.source.updated", "info", + None, None, Some(*source_id), + format!("Source {} updated", name), + serde_json::json!({ "source_id": source_id }), + )) + } + PlatformEvent::SourceDeleted { source_id, source_name } => Some(( + "platform.source.deleted", "warn", + None, None, None, + format!("Source {} deleted", source_name), + serde_json::json!({ "source_id": source_id }), + )), + PlatformEvent::PointsCreated { source_id, point_ids } => { + let name = fetch_source_name(pool, *source_id).await; + Some(( + "platform.point.batch_created", "info", + None, None, Some(*source_id), + format!("Created {} points for source {}", point_ids.len(), name), + serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), + )) + } + PlatformEvent::PointsDeleted { source_id, point_ids } => { + let name = fetch_source_name(pool, *source_id).await; + Some(( + "platform.point.batch_deleted", "warn", + None, None, Some(*source_id), + format!("Deleted {} points for source {}", point_ids.len(), name), + serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), + )) + } + PlatformEvent::UnitsChanged { .. } => None, + }; + + let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record + else { + return; + }; + let envelope = EventEnvelope::new(event_type, payload); + + let inserted = sqlx::query_as::<_, EventRecord>( + r#" + INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING * + "#, + ) + .bind(envelope.event_type) + .bind(level) + .bind(unit_id as Option) + .bind(equipment_id as Option) + .bind(source_id) + .bind(message) + .bind(sqlx::types::Json(envelope.payload)) + .fetch_one(pool) + .await; + + match inserted { + Ok(record) => { + let ws_message = WsMessage::EventCreated(record); + if let Err(err) = ws_manager.send_to_public(ws_message).await { + tracing::warn!("Failed to broadcast platform event: {}", err); + } + } + Err(err) => { + tracing::warn!("Failed to persist platform event: {}", err); + } + } +} + +async fn fetch_source_name(pool: &sqlx::PgPool, id: Uuid) -> String { + sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await + .ok() + .flatten() + .unwrap_or_else(|| id.to_string()) +} diff --git a/crates/plc_platform_core/src/handler.rs b/crates/plc_platform_core/src/handler.rs index 7031130..d602022 100644 --- a/crates/plc_platform_core/src/handler.rs +++ b/crates/plc_platform_core/src/handler.rs @@ -1,2 +1,10 @@ pub mod doc; +pub mod equipment; pub mod log; +pub mod page; +pub mod point; +pub mod router; +pub mod source; +pub mod tag; + +pub use router::platform_routes; diff --git a/crates/app_feeder_distributor/src/handler/equipment.rs b/crates/plc_platform_core/src/handler/equipment.rs similarity index 80% rename from crates/app_feeder_distributor/src/handler/equipment.rs rename to crates/plc_platform_core/src/handler/equipment.rs index 3d7f628..6c6d8dd 100644 --- a/crates/app_feeder_distributor/src/handler/equipment.rs +++ b/crates/plc_platform_core/src/handler/equipment.rs @@ -8,21 +8,19 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; use validator::Validate; -use plc_platform_core::util::{ +use crate::util::{ pagination::{PaginatedResponse, PaginationParams}, response::ApiErr, }; -use crate::AppState; +use crate::platform_context::PlatformContext; -async fn notify_units( - state: &AppState, - unit_ids: impl IntoIterator, -) { - let mut seen = std::collections::HashSet::new(); - for unit_id in unit_ids { - if seen.insert(unit_id) { - state.control_runtime.notify_unit(unit_id).await; - } +fn notify_units(state: &PlatformContext, unit_ids: impl IntoIterator) { + let ids: Vec = { + let mut seen = std::collections::HashSet::new(); + unit_ids.into_iter().filter(|id| seen.insert(*id)).collect() + }; + if !ids.is_empty() { + state.emit_event(crate::event::PlatformEvent::UnitsChanged { unit_ids: ids }); } } @@ -44,20 +42,20 @@ pub struct SignalRolePoint { #[derive(Serialize)] pub struct EquipmentListItem { #[serde(flatten)] - pub equipment: plc_platform_core::model::Equipment, + pub equipment: crate::model::Equipment, pub point_count: i64, pub role_points: Vec, } pub async fn get_equipment_list( - State(state): State, + State(state): State, Query(query): Query, ) -> Result { query.validate()?; - let total = crate::service::get_equipment_count(&state.platform.pool, query.keyword.as_deref()).await?; + let total = crate::service::get_equipment_count(&state.pool, query.keyword.as_deref()).await?; let items = crate::service::get_equipment_paginated( - &state.platform.pool, + &state.pool, query.keyword.as_deref(), query.pagination.page_size, query.pagination.offset(), @@ -66,10 +64,10 @@ pub async fn get_equipment_list( let equipment_ids: Vec = items.iter().map(|item| item.equipment.id).collect(); let role_point_rows = - crate::service::get_signal_role_points_batch(&state.platform.pool, &equipment_ids).await?; + crate::service::get_signal_role_points_batch(&state.pool, &equipment_ids).await?; let monitor_guard = state - .platform.connection_manager + .connection_manager .get_point_monitor_data_read_guard() .await; @@ -107,10 +105,10 @@ pub async fn get_equipment_list( } pub async fn get_equipment( - State(state): State, + State(state): State, Path(equipment_id): Path, ) -> Result { - let equipment = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id).await?; + let equipment = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?; match equipment { Some(item) => Ok(Json(item)), @@ -119,15 +117,15 @@ pub async fn get_equipment( } pub async fn get_equipment_points( - State(state): State, + State(state): State, Path(equipment_id): Path, ) -> Result { - let exists = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id).await?; + let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?; if exists.is_none() { return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); } - let points = crate::service::get_points_by_equipment_id(&state.platform.pool, equipment_id).await?; + let points = crate::service::get_points_by_equipment_id(&state.pool, equipment_id).await?; Ok(Json(points)) } @@ -160,12 +158,12 @@ pub struct BatchSetEquipmentUnitReq { } pub async fn create_equipment( - State(state): State, + State(state): State, Json(payload): Json, ) -> Result { payload.validate()?; - let exists = crate::service::get_equipment_by_code(&state.platform.pool, &payload.code).await?; + let exists = crate::service::get_equipment_by_code(&state.pool, &payload.code).await?; if exists.is_some() { return Err(ApiErr::BadRequest( "Equipment code already exists".to_string(), @@ -174,14 +172,14 @@ pub async fn create_equipment( } if let Some(unit_id) = payload.unit_id { - let unit_exists = crate::service::get_unit_by_id(&state.platform.pool, unit_id).await?; + let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; if unit_exists.is_none() { return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } } let equipment_id = crate::service::create_equipment( - &state.platform.pool, + &state.pool, payload.unit_id, &payload.code, &payload.name, @@ -191,7 +189,7 @@ pub async fn create_equipment( .await?; if let Some(unit_id) = payload.unit_id { - notify_units(&state, [unit_id]).await; + notify_units(&state, [unit_id]); } Ok(( @@ -204,7 +202,7 @@ pub async fn create_equipment( } pub async fn update_equipment( - State(state): State, + State(state): State, Path(equipment_id): Path, Json(payload): Json, ) -> Result { @@ -219,7 +217,7 @@ pub async fn update_equipment( return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); } - let exists = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id).await?; + let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?; let existing_equipment = if let Some(equipment) = exists { equipment } else { @@ -227,14 +225,14 @@ pub async fn update_equipment( }; if let Some(Some(unit_id)) = payload.unit_id { - let unit_exists = crate::service::get_unit_by_id(&state.platform.pool, unit_id).await?; + let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; if unit_exists.is_none() { return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } } if let Some(code) = payload.code.as_deref() { - let duplicate = crate::service::get_equipment_by_code(&state.platform.pool, code).await?; + let duplicate = crate::service::get_equipment_by_code(&state.pool, code).await?; if duplicate .as_ref() .is_some_and(|item| item.id != equipment_id) @@ -247,7 +245,7 @@ pub async fn update_equipment( } crate::service::update_equipment( - &state.platform.pool, + &state.pool, equipment_id, payload.unit_id, payload.code.as_deref(), @@ -268,7 +266,7 @@ pub async fn update_equipment( if let Some(unit_id) = next_unit_id { unit_ids.push(unit_id); } - notify_units(&state, unit_ids).await; + notify_units(&state, unit_ids); Ok(Json(serde_json::json!({ "ok_msg": "Equipment updated successfully" @@ -276,7 +274,7 @@ pub async fn update_equipment( } pub async fn batch_set_equipment_unit( - State(state): State, + State(state): State, Json(payload): Json, ) -> Result { payload.validate()?; @@ -289,17 +287,17 @@ pub async fn batch_set_equipment_unit( } if let Some(unit_id) = payload.unit_id { - let unit_exists = crate::service::get_unit_by_id(&state.platform.pool, unit_id).await?; + let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; if unit_exists.is_none() { return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } } let before_unit_ids = - crate::service::get_unit_ids_by_equipment_ids(&state.platform.pool, &payload.equipment_ids).await?; + crate::service::get_unit_ids_by_equipment_ids(&state.pool, &payload.equipment_ids).await?; let updated_count = crate::service::batch_set_equipment_unit( - &state.platform.pool, + &state.pool, &payload.equipment_ids, payload.unit_id, ) @@ -309,7 +307,7 @@ pub async fn batch_set_equipment_unit( if let Some(unit_id) = payload.unit_id { unit_ids.push(unit_id); } - notify_units(&state, unit_ids).await; + notify_units(&state, unit_ids); Ok(Json(serde_json::json!({ "ok_msg": "Equipment unit updated successfully", @@ -318,16 +316,16 @@ pub async fn batch_set_equipment_unit( } pub async fn delete_equipment( - State(state): State, + State(state): State, Path(equipment_id): Path, ) -> Result { - let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.platform.pool, &[equipment_id]).await?; - let deleted = crate::service::delete_equipment(&state.platform.pool, equipment_id).await?; + let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.pool, &[equipment_id]).await?; + let deleted = crate::service::delete_equipment(&state.pool, equipment_id).await?; if !deleted { return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); } - notify_units(&state, unit_ids).await; + notify_units(&state, unit_ids); Ok(StatusCode::NO_CONTENT) } diff --git a/crates/app_feeder_distributor/src/handler/page.rs b/crates/plc_platform_core/src/handler/page.rs similarity index 85% rename from crates/app_feeder_distributor/src/handler/page.rs rename to crates/plc_platform_core/src/handler/page.rs index 8c7041e..b75ab0e 100644 --- a/crates/app_feeder_distributor/src/handler/page.rs +++ b/crates/plc_platform_core/src/handler/page.rs @@ -1,13 +1,13 @@ -use axum::{Json, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse}; +use axum::{Json, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse}; use serde::Deserialize; use std::collections::HashMap; use sqlx::types::Json as SqlxJson; use uuid::Uuid; use validator::Validate; -use plc_platform_core::model::Page; -use plc_platform_core::util::response::ApiErr; -use crate::AppState; +use crate::model::Page; +use crate::platform_context::PlatformContext; +use crate::util::response::ApiErr; #[derive(Deserialize, Validate)] pub struct GetPageListQuery { @@ -16,11 +16,11 @@ pub struct GetPageListQuery { } pub async fn get_page_list( - State(state): State, + State(state): State, Query(query): Query, ) -> Result { query.validate()?; - let pool = &state.platform.pool; + let pool = &state.pool; let pages: Vec = if let Some(name) = query.name { sqlx::query_as::<_, Page>( @@ -45,12 +45,12 @@ pub async fn get_page_list( } pub async fn get_page( - State(state): State, + State(state): State, Path(page_id): Path, ) -> Result { let page = sqlx::query_as::<_, Page>("SELECT * FROM page WHERE id = $1") .bind(page_id) - .fetch_optional(&state.platform.pool) + .fetch_optional(&state.pool) .await?; match page { @@ -74,7 +74,7 @@ pub struct UpdatePageReq { } pub async fn create_page( - State(state): State, + State(state): State, Json(payload): Json, ) -> Result { payload.validate()?; @@ -88,7 +88,7 @@ pub async fn create_page( ) .bind(&payload.name) .bind(SqlxJson(payload.data)) - .fetch_one(&state.platform.pool) + .fetch_one(&state.pool) .await?; Ok((StatusCode::CREATED, Json(serde_json::json!({ @@ -98,7 +98,7 @@ pub async fn create_page( } pub async fn update_page( - State(state): State, + State(state): State, Path(page_id): Path, Json(payload): Json, ) -> Result { @@ -106,7 +106,7 @@ pub async fn update_page( let exists = sqlx::query("SELECT 1 FROM page WHERE id = $1") .bind(page_id) - .fetch_optional(&state.platform.pool) + .fetch_optional(&state.pool) .await?; if exists.is_none() { return Err(ApiErr::NotFound("Page not found".to_string(), None)); @@ -145,7 +145,7 @@ pub async fn update_page( } query = query.bind(page_id); - query.execute(&state.platform.pool).await?; + query.execute(&state.pool).await?; Ok(Json(serde_json::json!({ "ok_msg": "Page updated successfully" @@ -153,12 +153,12 @@ pub async fn update_page( } pub async fn delete_page( - State(state): State, + State(state): State, Path(page_id): Path, ) -> Result { let result = sqlx::query("DELETE FROM page WHERE id = $1") .bind(page_id) - .execute(&state.platform.pool) + .execute(&state.pool) .await?; if result.rows_affected() == 0 { diff --git a/crates/plc_platform_core/src/handler/point.rs b/crates/plc_platform_core/src/handler/point.rs new file mode 100644 index 0000000..0d784b7 --- /dev/null +++ b/crates/plc_platform_core/src/handler/point.rs @@ -0,0 +1,639 @@ +use axum::{ + extract::{Path, Query, State}, + response::IntoResponse, + Json, +}; +use serde::{Deserialize, Serialize}; +use serde_with::rust::double_option; +use sqlx::{QueryBuilder, Row}; +use std::collections::{HashMap, HashSet}; +use uuid::Uuid; +use validator::Validate; + +use crate::util::{ + pagination::{PaginatedResponse, PaginationParams}, + response::ApiErr, +}; +use crate::platform_context::PlatformContext; +use crate::model::{Node, Point}; + +fn notify_units(state: &PlatformContext, unit_ids: impl IntoIterator) { + let ids: Vec = { + let mut seen = std::collections::HashSet::new(); + unit_ids.into_iter().filter(|id| seen.insert(*id)).collect() + }; + if !ids.is_empty() { + state.emit_event(crate::event::PlatformEvent::UnitsChanged { unit_ids: ids }); + } +} + +/// List all points. +#[derive(Deserialize, Validate)] +pub struct GetPointListQuery { + pub source_id: Option, + pub equipment_id: Option, + #[serde(flatten)] + pub pagination: PaginationParams, +} + +#[derive(Serialize)] +pub struct PointWithMonitor { + #[serde(flatten)] + pub point: Point, + pub point_monitor: Option, +} + +#[derive(Deserialize, Validate)] +pub struct GetPointHistoryQuery { + pub limit: Option, +} + +#[derive(Serialize)] +pub struct PointHistoryItem { + #[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")] + pub timestamp: Option>, + pub quality: crate::telemetry::PointQuality, + pub value: Option, + pub value_text: Option, + pub value_number: Option, +} + +pub async fn get_point_list( + State(state): State, + Query(query): Query, +) -> Result { + query.validate()?; + let pool = &state.pool; + + // Count total rows. + let total = crate::service::get_points_count(pool, query.source_id, query.equipment_id).await?; + + // Load current page rows. + let points = crate::service::get_points_paginated( + pool, + query.source_id, + query.equipment_id, + query.pagination.page_size, + query.pagination.offset(), + ) + .await?; + + let monitor_guard = state + .connection_manager + .get_point_monitor_data_read_guard() + .await; + + let data: Vec = points + .into_iter() + .map(|point| { + let point_monitor = monitor_guard.get(&point.id).cloned(); + PointWithMonitor { + point, + point_monitor, + } + }) + .collect(); + + let response = PaginatedResponse::new( + data, + total, + query.pagination.page, + query.pagination.page_size, + ); + + Ok(Json(response)) +} +/// Get a point by id. +pub async fn get_point( + State(state): State, + Path(point_id): Path, +) -> Result { + let pool = &state.pool; + let point = crate::service::get_point_by_id(pool, point_id).await?; + + Ok(Json(point)) +} + +pub async fn get_point_history( + State(state): State, + Path(point_id): Path, + Query(query): Query, +) -> Result { + let pool = &state.pool; + let point = crate::service::get_point_by_id(pool, point_id).await?; + if point.is_none() { + return Err(ApiErr::NotFound("Point not found".to_string(), None)); + } + + let limit = query.limit.unwrap_or(120).clamp(1, 1000); + let history = state + .connection_manager + .get_point_history(point_id, limit) + .await; + + let items: Vec = history + .into_iter() + .map(|item| { + let value_number = monitor_value_to_number(&item); + PointHistoryItem { + timestamp: item.timestamp, + quality: item.quality, + value_number, + value: item.value, + value_text: item.value_text, + } + }) + .collect(); + + Ok(Json(items)) +} + +/// Request payload for updating editable point fields. +#[derive(Deserialize, Validate)] +pub struct UpdatePointReq { + pub name: Option, + #[serde(default, with = "double_option")] + pub description: Option>, + #[serde(default, with = "double_option")] + pub unit: Option>, + #[serde(default, with = "double_option")] + pub tag_id: Option>, + #[serde(default, with = "double_option")] + pub equipment_id: Option>, + #[serde(default, with = "double_option")] + pub signal_role: Option>, +} + +/// Request payload for batch setting point tags. +#[derive(Deserialize, Validate)] +pub struct BatchSetPointTagsReq { + #[validate(length(min = 1, max = 500))] + pub point_ids: Vec, + pub tag_id: Option, +} + +#[derive(Deserialize, Validate)] +pub struct BatchSetPointEquipmentReq { + #[validate(length(min = 1, max = 500))] + pub point_ids: Vec, + pub equipment_id: Option, + pub signal_role: Option, +} + +/// Update point metadata (name/description/unit only). +pub async fn update_point( + State(state): State, + Path(point_id): Path, + Json(payload): Json, +) -> Result { + payload.validate()?; + + let pool = &state.pool; + + if payload.name.is_none() + && payload.description.is_none() + && payload.unit.is_none() + && payload.tag_id.is_none() + && payload.equipment_id.is_none() + && payload.signal_role.is_none() + { + return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); + } + + // If tag_id is provided, ensure tag exists. + if let Some(Some(tag_id)) = payload.tag_id { + let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#) + .bind(tag_id) + .fetch_optional(pool) + .await? + .is_some(); + + if !tag_exists { + return Err(ApiErr::NotFound("Tag not found".to_string(), None)); + } + } + + if let Some(Some(equipment_id)) = payload.equipment_id { + let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#) + .bind(equipment_id) + .fetch_optional(pool) + .await? + .is_some(); + + if !equipment_exists { + return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); + } + } + + // Ensure target point exists. + let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#) + .bind(point_id) + .fetch_optional(pool) + .await?; + if existing_point.is_none() { + return Err(ApiErr::NotFound("Point not found".to_string(), None)); + } + let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; + + let mut qb: QueryBuilder = QueryBuilder::new("UPDATE point SET "); + let mut wrote_field = false; + + if let Some(name) = &payload.name { + if wrote_field { + qb.push(", "); + } + qb.push("name = ").push_bind(name); + wrote_field = true; + } + if let Some(description) = &payload.description { + if wrote_field { + qb.push(", "); + } + qb.push("description = ").push_bind(description.as_deref()); + wrote_field = true; + } + if let Some(unit) = &payload.unit { + if wrote_field { + qb.push(", "); + } + qb.push("unit = ").push_bind(unit.as_deref()); + wrote_field = true; + } + if let Some(tag_id) = &payload.tag_id { + if wrote_field { + qb.push(", "); + } + qb.push("tag_id = ").push_bind(tag_id.as_ref()); + wrote_field = true; + } + if let Some(equipment_id) = &payload.equipment_id { + if wrote_field { + qb.push(", "); + } + qb.push("equipment_id = ").push_bind(equipment_id.as_ref()); + wrote_field = true; + } + if let Some(signal_role) = &payload.signal_role { + if wrote_field { + qb.push(", "); + } + qb.push("signal_role = ").push_bind(signal_role.as_deref()); + wrote_field = true; + } + + if wrote_field { + qb.push(", "); + } + qb.push("updated_at = NOW()"); + + qb.push(" WHERE id = ").push_bind(point_id); + qb.build().execute(pool).await?; + + let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; + notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)); + + Ok(Json( + serde_json::json!({"ok_msg": "Point updated successfully"}), + )) +} + +/// Batch set point tags. +pub async fn batch_set_point_tags( + State(state): State, + Json(payload): Json, +) -> Result { + payload.validate()?; + + if payload.point_ids.is_empty() { + return Err(ApiErr::BadRequest( + "point_ids cannot be empty".to_string(), + None, + )); + } + + let pool = &state.pool; + + // If tag_id is provided, ensure tag exists. + if let Some(tag_id) = payload.tag_id { + let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#) + .bind(tag_id) + .fetch_optional(pool) + .await? + .is_some(); + + if !tag_exists { + return Err(ApiErr::NotFound("Tag not found".to_string(), None)); + } + } + + // Check which points exist + let existing_points: Vec = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#) + .bind(&payload.point_ids) + .fetch_all(pool) + .await? + .into_iter() + .map(|row: sqlx::postgres::PgRow| row.get::("id")) + .collect(); + + if existing_points.is_empty() { + return Err(ApiErr::NotFound("No valid points found".to_string(), None)); + } + + // Update tag_id for all existing points + let result = + sqlx::query(r#"UPDATE point SET tag_id = $1, updated_at = NOW() WHERE id = ANY($2)"#) + .bind(payload.tag_id) + .bind(&existing_points) + .execute(pool) + .await?; + + Ok(Json(serde_json::json!({ + "ok_msg": "Point tags updated successfully", + "updated_count": result.rows_affected() + }))) +} + +pub async fn batch_set_point_equipment( + State(state): State, + Json(payload): Json, +) -> Result { + payload.validate()?; + + if payload.point_ids.is_empty() { + return Err(ApiErr::BadRequest( + "point_ids cannot be empty".to_string(), + None, + )); + } + + let pool = &state.pool; + + if let Some(equipment_id) = payload.equipment_id { + let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#) + .bind(equipment_id) + .fetch_optional(pool) + .await? + .is_some(); + + if !equipment_exists { + return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); + } + } + + let existing_points: Vec = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#) + .bind(&payload.point_ids) + .fetch_all(pool) + .await? + .into_iter() + .map(|row: sqlx::postgres::PgRow| row.get::("id")) + .collect(); + + if existing_points.is_empty() { + return Err(ApiErr::NotFound("No valid points found".to_string(), None)); + } + + let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?; + + let result = sqlx::query( + r#" + UPDATE point + SET equipment_id = $1, + signal_role = $2, + updated_at = NOW() + WHERE id = ANY($3) + "#, + ) + .bind(payload.equipment_id) + .bind(payload.signal_role.as_deref()) + .bind(&existing_points) + .execute(pool) + .await?; + + let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?; + notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)); + + Ok(Json(serde_json::json!({ + "ok_msg": "Point equipment updated successfully", + "updated_count": result.rows_affected() + }))) +} + +/// Delete one point by id. +pub async fn delete_point( + State(state): State, + Path(point_id): Path, +) -> Result { + let pool = &state.pool; + let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; + + let source_id = { + let grouped = crate::service::get_points_grouped_by_source(pool, &[point_id]).await?; + grouped.keys().next().copied() + }; + + // Ensure target point exists. + let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#) + .bind(point_id) + .fetch_optional(pool) + .await?; + if existing_point.is_none() { + return Err(ApiErr::NotFound("Point not found".to_string(), None)); + } + + // Delete point. + sqlx::query(r#"delete from point WHERE id = $1"#) + .bind(point_id) + .execute(pool) + .await?; + + if let Some(source_id) = source_id { + state.emit_event(crate::event::PlatformEvent::PointsDeleted { + source_id, + point_ids: vec![point_id], + }); + } + + notify_units(&state, affected_unit_ids); + + Ok(Json( + serde_json::json!({"ok_msg": "Point deleted successfully"}), + )) +} + +#[derive(Deserialize, Validate)] +/// Request payload for batch point creation from node ids. +pub struct BatchCreatePointsReq { + #[validate(length(min = 1, max = 500))] + pub node_ids: Vec, +} + +#[derive(Serialize)] +/// Response payload for batch point creation. +pub struct BatchCreatePointsRes { + pub success_count: usize, + pub failed_count: usize, + pub failed_node_ids: Vec, + pub created_point_ids: Vec, +} + +/// Batch create points by node ids. +pub async fn batch_create_points( + State(state): State, + Json(payload): Json, +) -> Result { + payload.validate()?; + + let pool = &state.pool; + + if payload.node_ids.is_empty() { + return Err(ApiErr::BadRequest( + "node_ids cannot be empty".to_string(), + None, + )); + } + + // Use one transaction for the full batch. + let mut tx = pool.begin().await?; + let node_ids = payload.node_ids; + + let nodes: Vec = sqlx::query_as::<_, Node>(r#"SELECT * FROM node WHERE id = ANY($1)"#) + .bind(&node_ids) + .fetch_all(&mut *tx) + .await?; + + let node_map: HashMap = nodes.into_iter().map(|node| (node.id, node)).collect(); + + let existing_node_ids: HashSet = node_map.keys().copied().collect(); + let mut failed_node_ids = Vec::new(); + for node_id in &node_ids { + if !existing_node_ids.contains(node_id) { + failed_node_ids.push(*node_id); + } + } + + let existing_point_node_ids: HashSet = + sqlx::query_scalar::<_, Uuid>(r#"SELECT node_id FROM point WHERE node_id = ANY($1)"#) + .bind(&node_ids) + .fetch_all(&mut *tx) + .await? + .into_iter() + .collect(); + + let mut to_create = Vec::new(); + let mut seen_creatable = HashSet::new(); + for node_id in node_ids { + if !existing_node_ids.contains(&node_id) || existing_point_node_ids.contains(&node_id) { + continue; + } + + if !seen_creatable.insert(node_id) { + continue; + } + + let name = node_map + .get(&node_id) + .map(|node| node.browse_name.clone()) + .unwrap_or_else(|| format!("Point_{}", node_id)); + to_create.push((Uuid::new_v4(), node_id, name)); + } + + let mut created_point_ids = Vec::with_capacity(to_create.len()); + if !to_create.is_empty() { + let mut qb = QueryBuilder::new("INSERT INTO point (id, node_id, name) "); + qb.push_values(to_create.iter(), |mut b, (id, node_id, name)| { + b.push_bind(*id).push_bind(*node_id).push_bind(name); + }); + qb.build().execute(&mut *tx).await?; + created_point_ids.extend(to_create.into_iter().map(|(id, _, _)| id)); + } + + // Commit the transaction. + tx.commit().await?; + + // Emit grouped create events by source. + if !created_point_ids.is_empty() { + let grouped = + crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?; + for (source_id, points) in grouped { + let point_ids: Vec = points.into_iter().map(|p| p.point_id).collect(); + state.emit_event(crate::event::PlatformEvent::PointsCreated { source_id, point_ids }); + } + } + + Ok(Json(BatchCreatePointsRes { + success_count: created_point_ids.len(), + failed_count: failed_node_ids.len(), + failed_node_ids, + created_point_ids, + })) +} + +#[derive(Deserialize, Validate)] +/// Request payload for batch point deletion. +pub struct BatchDeletePointsReq { + #[validate(length(min = 1, max = 500))] + pub point_ids: Vec, +} + +#[derive(Serialize)] +/// Response payload for batch point deletion. +pub struct BatchDeletePointsRes { + pub deleted_count: u64, +} + +/// Batch delete points and emit grouped delete events by source. +pub async fn batch_delete_points( + State(state): State, + Json(payload): Json, +) -> Result { + payload.validate()?; + + if payload.point_ids.is_empty() { + return Err(ApiErr::BadRequest( + "point_ids cannot be empty".to_string(), + None, + )); + } + + let pool = &state.pool; + let point_ids = payload.point_ids; + + let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?; + let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &point_ids).await?; + let existing_point_ids: Vec = grouped + .values() + .flat_map(|points| points.iter().map(|p| p.point_id)) + .collect(); + + if existing_point_ids.is_empty() { + return Ok(Json(BatchDeletePointsRes { deleted_count: 0 })); + } + + let result = sqlx::query(r#"DELETE FROM point WHERE id = ANY($1)"#) + .bind(&existing_point_ids) + .execute(pool) + .await?; + + for (source_id, points) in grouped { + let ids: Vec = points.into_iter().map(|p| p.point_id).collect(); + state.emit_event(crate::event::PlatformEvent::PointsDeleted { source_id, point_ids: ids }); + } + + notify_units(&state, affected_unit_ids); + + Ok(Json(BatchDeletePointsRes { + deleted_count: result.rows_affected(), + })) +} + +fn monitor_value_to_number(item: &crate::telemetry::PointMonitorInfo) -> Option { + match item.value.as_ref()? { + crate::telemetry::DataValue::Int(v) => Some(*v as f64), + crate::telemetry::DataValue::UInt(v) => Some(*v as f64), + crate::telemetry::DataValue::Float(v) => Some(*v), + crate::telemetry::DataValue::Bool(v) => Some(if *v { 1.0 } else { 0.0 }), + crate::telemetry::DataValue::Text(v) => v.parse::().ok(), + _ => None, + } +} + diff --git a/crates/plc_platform_core/src/handler/router.rs b/crates/plc_platform_core/src/handler/router.rs new file mode 100644 index 0000000..68fcdee --- /dev/null +++ b/crates/plc_platform_core/src/handler/router.rs @@ -0,0 +1,112 @@ +use axum::{ + extract::FromRef, + routing::{get, post, put}, + Router, +}; + +use crate::platform_context::PlatformContext; + +/// Returns all platform CRUD routes. +/// +/// The generic `S` is the app's top-level state type. It must implement +/// `FromRef for PlatformContext` so that axum can extract `State` +/// from handlers registered with the app's state. +pub fn platform_routes() -> Router +where + S: Clone + Send + Sync + 'static, + PlatformContext: FromRef, +{ + Router::new() + // Source + .route( + "/api/source", + get(super::source::get_source_list).post(super::source::create_source), + ) + .route( + "/api/source/{source_id}", + axum::routing::delete(super::source::delete_source) + .put(super::source::update_source), + ) + .route( + "/api/source/{source_id}/reconnect", + post(super::source::reconnect_source), + ) + .route( + "/api/source/{source_id}/browse", + post(super::source::browse_and_save_nodes), + ) + .route( + "/api/source/{source_id}/node-tree", + get(super::source::get_node_tree), + ) + // Point + .route("/api/point", get(super::point::get_point_list)) + .route( + "/api/point/batch", + post(super::point::batch_create_points) + .delete(super::point::batch_delete_points), + ) + .route( + "/api/point/{point_id}/history", + get(super::point::get_point_history), + ) + .route( + "/api/point/{point_id}", + get(super::point::get_point) + .put(super::point::update_point) + .delete(super::point::delete_point), + ) + .route( + "/api/point/batch/set-tags", + put(super::point::batch_set_point_tags), + ) + .route( + "/api/point/batch/set-equipment", + put(super::point::batch_set_point_equipment), + ) + // Equipment + .route( + "/api/equipment", + get(super::equipment::get_equipment_list) + .post(super::equipment::create_equipment), + ) + .route( + "/api/equipment/{equipment_id}", + get(super::equipment::get_equipment) + .put(super::equipment::update_equipment) + .delete(super::equipment::delete_equipment), + ) + .route( + "/api/equipment/batch/set-unit", + put(super::equipment::batch_set_equipment_unit), + ) + .route( + "/api/equipment/{equipment_id}/points", + get(super::equipment::get_equipment_points), + ) + // Tag + .route( + "/api/tag", + get(super::tag::get_tag_list).post(super::tag::create_tag), + ) + .route( + "/api/tag/{tag_id}", + get(super::tag::get_tag_points) + .put(super::tag::update_tag) + .delete(super::tag::delete_tag), + ) + // Page + .route( + "/api/page", + get(super::page::get_page_list).post(super::page::create_page), + ) + .route( + "/api/page/{page_id}", + get(super::page::get_page) + .put(super::page::update_page) + .delete(super::page::delete_page), + ) + // Logs + .route("/api/logs", get(super::log::get_logs)) + .route("/api/logs/stream", get(super::log::stream_logs)) +} diff --git a/crates/app_feeder_distributor/src/handler/source.rs b/crates/plc_platform_core/src/handler/source.rs similarity index 92% rename from crates/app_feeder_distributor/src/handler/source.rs rename to crates/plc_platform_core/src/handler/source.rs index 8cb5c51..a37da3b 100644 --- a/crates/app_feeder_distributor/src/handler/source.rs +++ b/crates/plc_platform_core/src/handler/source.rs @@ -11,11 +11,11 @@ use opcua::types::ReferenceTypeId; use opcua::client::Session; use std::collections::{HashMap, VecDeque}; -use plc_platform_core::util::response::ApiErr; +use crate::util::response::ApiErr; use anyhow::{Context}; -use plc_platform_core::model::{Node, Source}; -use crate::AppState; +use crate::model::{Node, Source}; +use crate::platform_context::PlatformContext; use sqlx::QueryBuilder; // 鏍戣妭鐐圭粨鏋勪綋 @@ -62,7 +62,7 @@ pub struct SourceWithStatus { pub source: SourcePublic, pub is_connected: bool, pub last_error: Option, - #[serde(serialize_with = "plc_platform_core::util::datetime::option_utc_to_local_str")] + #[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")] pub last_time: Option>, } @@ -75,9 +75,9 @@ pub struct SourcePublic { pub security_policy: Option, pub security_mode: Option, pub enabled: bool, - #[serde(serialize_with = "plc_platform_core::util::datetime::utc_to_local_str")] + #[serde(serialize_with = "crate::util::datetime::utc_to_local_str")] pub created_at: DateTime, - #[serde(serialize_with = "plc_platform_core::util::datetime::utc_to_local_str")] + #[serde(serialize_with = "crate::util::datetime::utc_to_local_str")] pub updated_at: DateTime, } @@ -97,13 +97,13 @@ impl From for SourcePublic { } } -pub async fn get_source_list(State(state): State) -> Result { - let pool = &state.platform.pool; +pub async fn get_source_list(State(state): State) -> Result { + let pool = &state.pool; let sources: Vec = crate::service::get_all_enabled_sources(pool).await?; // 鑾峰彇鎵€鏈夎繛鎺ョ姸鎬? let status_map: std::collections::HashMap, Option>)> = - state.platform.connection_manager.get_all_status().await + state.connection_manager.get_all_status().await .into_iter() .map(|(source_id, s)| (source_id, (s.is_connected, s.last_error, Some(s.last_time)))) .collect(); @@ -129,10 +129,10 @@ pub async fn get_source_list(State(state): State) -> Result, + State(state): State, Path(source_id): Path, ) -> Result { - let pool = &state.platform.pool; + let pool = &state.pool; // 鏌ヨ鎵€鏈夊睘浜庤source鐨勮妭鐐? let nodes: Vec = sqlx::query_as::<_, Node>( @@ -207,12 +207,12 @@ pub struct CreateSourceRes { } pub async fn create_source( - State(state): State, + State(state): State, Json(payload): Json, ) -> Result { payload.validate()?; - let pool = &state.platform.pool; + let pool = &state.pool; let new_id = Uuid::new_v4(); sqlx::query( @@ -226,8 +226,7 @@ pub async fn create_source( .execute(pool) .await?; - // 瑙﹀彂 SourceCreate 浜嬩欢 - let _ = state.event_manager.send(crate::event::AppEvent::SourceCreate { source_id: new_id }); + state.emit_event(crate::event::PlatformEvent::SourceCreated { source_id: new_id }); Ok((StatusCode::CREATED, Json(CreateSourceRes { id: new_id }))) } @@ -244,7 +243,7 @@ pub struct UpdateSourceReq { } pub async fn update_source( - State(state): State, + State(state): State, Path(source_id): Path, Json(payload): Json, ) -> Result { @@ -261,7 +260,7 @@ pub async fn update_source( return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); } - let pool = &state.platform.pool; + let pool = &state.pool; let exists = sqlx::query("SELECT 1 FROM source WHERE id = $1") .bind(source_id) @@ -302,16 +301,16 @@ pub async fn update_source( qb.push(" WHERE id = ").push_bind(source_id); qb.build().execute(pool).await?; - let _ = state.event_manager.send(crate::event::AppEvent::SourceUpdate { source_id }); + state.emit_event(crate::event::PlatformEvent::SourceUpdated { source_id }); Ok(Json(serde_json::json!({"ok_msg": "Source updated successfully"}))) } pub async fn delete_source( - State(state): State, + State(state): State, Path(source_id): Path, ) -> Result { - let pool = &state.platform.pool; + let pool = &state.pool; let source_name = sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1") .bind(source_id) @@ -324,17 +323,16 @@ pub async fn delete_source( .execute(pool) .await?; - // 瑙﹀彂 SourceDelete 浜嬩欢 - let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id, source_name }); + state.emit_event(crate::event::PlatformEvent::SourceDeleted { source_id, source_name }); Ok(StatusCode::NO_CONTENT) } pub async fn reconnect_source( - State(state): State, + State(state): State, Path(source_id): Path, ) -> Result { - let pool = &state.platform.pool; + let pool = &state.pool; let exists = sqlx::query("SELECT 1 FROM source WHERE id = $1") .bind(source_id) @@ -349,7 +347,7 @@ pub async fn reconnect_source( } state - .platform.connection_manager + .connection_manager .reconnect(pool, source_id) .await .map_err(|e| ApiErr::Internal(e, None))?; @@ -358,11 +356,11 @@ pub async fn reconnect_source( } pub async fn browse_and_save_nodes( - State(state): State, + State(state): State, Path(source_id): Path, ) -> Result { - let pool = &state.platform.pool; + let pool = &state.pool; // 纭 source 瀛樺湪 sqlx::query("SELECT 1 FROM source WHERE id = $1") @@ -370,7 +368,7 @@ pub async fn browse_and_save_nodes( .fetch_one(pool) .await?; - let session = state.platform.connection_manager + let session = state.connection_manager .get_session(source_id) .await .ok_or_else(|| anyhow::anyhow!("Source not connected"))?; diff --git a/crates/app_feeder_distributor/src/handler/tag.rs b/crates/plc_platform_core/src/handler/tag.rs similarity index 77% rename from crates/app_feeder_distributor/src/handler/tag.rs rename to crates/plc_platform_core/src/handler/tag.rs index f98965f..26d6a0f 100644 --- a/crates/app_feeder_distributor/src/handler/tag.rs +++ b/crates/plc_platform_core/src/handler/tag.rs @@ -3,13 +3,12 @@ use serde::Deserialize; use uuid::Uuid; use validator::Validate; -use plc_platform_core::util::{ +use crate::platform_context::PlatformContext; +use crate::util::{ pagination::{PaginatedResponse, PaginationParams}, response::ApiErr, }; -use crate::{AppState}; -/// List all tags. #[derive(Deserialize, Validate)] pub struct GetTagListQuery { #[serde(flatten)] @@ -17,16 +16,13 @@ pub struct GetTagListQuery { } pub async fn get_tag_list( - State(state): State, + State(state): State, Query(query): Query, ) -> Result { query.validate()?; - let pool = &state.platform.pool; + let pool = &state.pool; - // Count total rows. let total = crate::service::get_tags_count(pool).await?; - - // Load current page rows. let tags = crate::service::get_tags_paginated( pool, query.pagination.page_size, @@ -34,16 +30,14 @@ pub async fn get_tag_list( ).await?; let response = PaginatedResponse::new(tags, total, query.pagination.page, query.pagination.page_size); - Ok(Json(response)) } -/// List points under a tag. pub async fn get_tag_points( - State(state): State, + State(state): State, Path(tag_id): Path, ) -> Result { - let points = crate::service::get_tag_points(&state.platform.pool, tag_id).await?; + let points = crate::service::get_tag_points(&state.pool, tag_id).await?; Ok(Json(points)) } @@ -63,16 +57,15 @@ pub struct UpdateTagReq { pub point_ids: Option>, } -/// Create a tag. pub async fn create_tag( - State(state): State, + State(state): State, Json(payload): Json, ) -> Result { payload.validate()?; let point_ids = payload.point_ids.as_deref().unwrap_or(&[]); let tag_id = crate::service::create_tag( - &state.platform.pool, + &state.pool, &payload.name, payload.description.as_deref(), point_ids, @@ -84,22 +77,20 @@ pub async fn create_tag( })))) } -/// Update a tag. pub async fn update_tag( - State(state): State, + State(state): State, Path(tag_id): Path, Json(payload): Json, ) -> Result { payload.validate()?; - // Ensure the target tag exists. - let exists = crate::service::get_tag_by_id(&state.platform.pool, tag_id).await?; + let exists = crate::service::get_tag_by_id(&state.pool, tag_id).await?; if exists.is_none() { return Err(ApiErr::NotFound("Tag not found".to_string(), None)); } crate::service::update_tag( - &state.platform.pool, + &state.pool, tag_id, payload.name.as_deref(), payload.description.as_deref(), @@ -111,16 +102,13 @@ pub async fn update_tag( }))) } -/// Delete a tag. pub async fn delete_tag( - State(state): State, + State(state): State, Path(tag_id): Path, ) -> Result { - let deleted = crate::service::delete_tag(&state.platform.pool, tag_id).await?; - + let deleted = crate::service::delete_tag(&state.pool, tag_id).await?; if !deleted { return Err(ApiErr::NotFound("Tag not found".to_string(), None)); } - Ok(StatusCode::NO_CONTENT) } diff --git a/crates/plc_platform_core/src/platform_context.rs b/crates/plc_platform_core/src/platform_context.rs index b1007cb..31f10ad 100644 --- a/crates/plc_platform_core/src/platform_context.rs +++ b/crates/plc_platform_core/src/platform_context.rs @@ -1,13 +1,32 @@ use std::sync::Arc; +use uuid::Uuid; + use crate::connection::ConnectionManager; +use crate::event::PlatformEvent; use crate::websocket::WebSocketManager; +/// Callback interface for app-specific side effects on platform events. +/// +/// Platform-level concerns (event persistence, WebSocket broadcast) are handled +/// automatically by `PlatformContext::emit_event()`. Implementations of this trait +/// only need to handle app-specific behavior (e.g. connection management, +/// control runtime notifications). +pub trait PlatformEventSink: Send + Sync { + fn on_source_created(&self, source_id: Uuid); + fn on_source_updated(&self, source_id: Uuid); + fn on_source_deleted(&self, source_id: Uuid, source_name: String); + fn on_points_created(&self, source_id: Uuid, point_ids: Vec); + fn on_points_deleted(&self, source_id: Uuid, point_ids: Vec); + fn on_units_changed(&self, unit_ids: Vec); +} + #[derive(Clone)] pub struct PlatformContext { pub pool: sqlx::PgPool, pub connection_manager: Arc, pub ws_manager: Arc, + pub event_sink: Option>, } impl PlatformContext { @@ -20,6 +39,56 @@ impl PlatformContext { pool, connection_manager, ws_manager, + event_sink: None, } } + + pub fn with_event_sink(mut self, sink: Arc) -> Self { + self.event_sink = Some(sink); + self + } + + /// Emit a platform event: persists to DB, broadcasts via WebSocket, + /// then notifies the app-specific event sink for side effects. + pub fn emit_event(&self, event: PlatformEvent) { + // Notify app-specific sink synchronously (fire-and-forget via channels). + if let Some(sink) = &self.event_sink { + match &event { + PlatformEvent::SourceCreated { source_id } => { + sink.on_source_created(*source_id); + } + PlatformEvent::SourceUpdated { source_id } => { + sink.on_source_updated(*source_id); + } + PlatformEvent::SourceDeleted { + source_id, + source_name, + } => { + sink.on_source_deleted(*source_id, source_name.clone()); + } + PlatformEvent::PointsCreated { + source_id, + point_ids, + } => { + sink.on_points_created(*source_id, point_ids.clone()); + } + PlatformEvent::PointsDeleted { + source_id, + point_ids, + } => { + sink.on_points_deleted(*source_id, point_ids.clone()); + } + PlatformEvent::UnitsChanged { unit_ids } => { + sink.on_units_changed(unit_ids.clone()); + } + } + } + + // Spawn async persistence + WebSocket broadcast. + let pool = self.pool.clone(); + let ws_manager = self.ws_manager.clone(); + tokio::spawn(async move { + crate::event::persist_and_broadcast(&event, &pool, &ws_manager).await; + }); + } }