diff --git a/crates/app_feeder_distributor/src/control/engine.rs b/crates/app_feeder_distributor/src/control/engine.rs index 92dd915..9dd64ec 100644 --- a/crates/app_feeder_distributor/src/control/engine.rs +++ b/crates/app_feeder_distributor/src/control/engine.rs @@ -11,12 +11,14 @@ use crate::{ runtime::{ControlRuntimeStore, UnitRuntime, UnitRuntimeState}, }, event::AppEvent, + model::ControlUnit, + service, AppState, }; use plc_platform_core::{ service::EquipmentRolePoint, telemetry::{PointMonitorInfo, PointQuality}, - websocket::WsMessage, + websocket::{AppWsEvent, WsMessage}, }; /// Start the engine: a supervisor spawns one async task per enabled unit. @@ -35,7 +37,7 @@ async fn supervise(state: AppState, store: Arc) { loop { interval.tick().await; - match plc_platform_core::service::get_all_enabled_units(&state.platform.pool).await { + match service::get_all_enabled_units(&state.platform.pool).await { Ok(units) => { for unit in units { let needs_spawn = tasks.get(&unit.id).is_none_or(|h| h.is_finished()); @@ -66,7 +68,7 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu loop { // Reload unit config on each iteration to detect disable/delete. let unit = - match plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id).await { + match service::get_unit_by_id(&state.platform.pool, unit_id).await { Ok(Some(u)) if u.enabled => u, Ok(_) => { tracing::info!("Engine: unit {} disabled or deleted, task exiting", unit_id); @@ -164,7 +166,7 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu } else { i32::MAX }; - let unit_for_wait = plc_platform_core::model::ControlUnit { + let unit_for_wait = ControlUnit { run_time_sec: secs, ..unit.clone() }; @@ -320,7 +322,7 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu async fn wait_phase( state: &AppState, store: &ControlRuntimeStore, - unit: &plc_platform_core::model::ControlUnit, + unit: &ControlUnit, all_roles: &[(Uuid, HashMap)], notify: &Arc, fault_tick: &mut tokio::time::Interval, @@ -361,12 +363,19 @@ async fn wait_phase( } async fn push_ws(state: &AppState, runtime: &UnitRuntime) { - if let Err(e) = state - .platform - .ws_manager - .send_to_public(WsMessage::UnitRuntimeChanged(runtime.clone())) - .await - { + let payload = match serde_json::to_value(runtime) { + Ok(value) => value, + Err(err) => { + tracing::warn!("Engine: failed to serialize runtime for WS push: {}", err); + return; + } + }; + let message = WsMessage::AppEvent(AppWsEvent { + app: "feeder".to_string(), + event_type: "unit_runtime_changed".to_string(), + data: payload, + }); + if let Err(e) = state.platform.ws_manager.send_to_public(message).await { tracing::debug!("Engine: WS push skipped (no subscribers): {}", e); } } @@ -376,7 +385,7 @@ async fn push_ws(state: &AppState, runtime: &UnitRuntime) { async fn check_fault_comm( state: &AppState, runtime: &mut UnitRuntime, - unit: &plc_platform_core::model::ControlUnit, + unit: &ControlUnit, all_roles: &[(Uuid, HashMap)], ) -> bool { let monitor = state diff --git a/crates/app_feeder_distributor/src/control/mod.rs b/crates/app_feeder_distributor/src/control/mod.rs index d1af408..f062e6a 100644 --- a/crates/app_feeder_distributor/src/control/mod.rs +++ b/crates/app_feeder_distributor/src/control/mod.rs @@ -1,6 +1,7 @@ -pub use plc_platform_core::control::{command, runtime}; +pub use plc_platform_core::control::command; pub mod engine; +pub mod runtime; pub mod simulate; pub mod validator; diff --git a/crates/plc_platform_core/src/control/runtime.rs b/crates/app_feeder_distributor/src/control/runtime.rs similarity index 100% rename from crates/plc_platform_core/src/control/runtime.rs rename to crates/app_feeder_distributor/src/control/runtime.rs diff --git a/crates/app_feeder_distributor/src/control/simulate.rs b/crates/app_feeder_distributor/src/control/simulate.rs index 66e9ba6..638c9e1 100644 --- a/crates/app_feeder_distributor/src/control/simulate.rs +++ b/crates/app_feeder_distributor/src/control/simulate.rs @@ -8,7 +8,7 @@ use plc_platform_core::{ websocket::WsMessage, }; -use crate::AppState; +use crate::{service as feeder_service, AppState}; /// Whether SIMULATE_PLC mode is enabled via environment variable. pub fn enabled() -> bool { @@ -35,7 +35,7 @@ async fn run(state: AppState) { tokio::time::sleep(Duration::from_secs(wait_secs)).await; // Pick a random enabled unit. - let units = match service::get_all_enabled_units(&state.platform.pool).await { + let units = match feeder_service::get_all_enabled_units(&state.platform.pool).await { Ok(u) if !u.is_empty() => u, _ => continue, }; diff --git a/crates/app_feeder_distributor/src/handler/control.rs b/crates/app_feeder_distributor/src/handler/control.rs index 35f68a4..a94f6ed 100644 --- a/crates/app_feeder_distributor/src/handler/control.rs +++ b/crates/app_feeder_distributor/src/handler/control.rs @@ -15,11 +15,13 @@ use crate::{ control::runtime::{UnitRuntime, UnitRuntimeState}, control::validator::{validate_manual_control, ControlAction}, event::AppEvent, + model::ControlUnit, + service as feeder_service, AppState, }; use plc_platform_core::{ handler::equipment::SignalRolePoint, - model::{ControlUnit, Equipment, Point}, + model::{Equipment, Point}, service, telemetry::PointMonitorInfo, util::{ @@ -76,9 +78,9 @@ pub async fn get_unit_list( query.validate()?; let total = - service::get_units_count(&state.platform.pool, query.keyword.as_deref()) + feeder_service::get_units_count(&state.platform.pool, query.keyword.as_deref()) .await?; - let units = service::get_units_paginated( + let units = feeder_service::get_units_paginated( &state.platform.pool, query.keyword.as_deref(), query.pagination.page_size, @@ -223,7 +225,7 @@ pub async fn get_unit( State(state): State, Path(unit_id): Path, ) -> Result { - let unit = service::get_unit_by_id(&state.platform.pool, unit_id) + let unit = feeder_service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; let runtime = state.control_runtime.get(unit_id).await; @@ -298,7 +300,7 @@ pub async fn get_unit_detail( State(state): State, Path(unit_id): Path, ) -> Result { - let unit = service::get_unit_by_id(&state.platform.pool, unit_id) + let unit = feeder_service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; @@ -396,7 +398,7 @@ pub async fn create_unit( validate_unit_timing_order(run_time_sec, acc_time_sec)?; - if service::get_unit_by_code(&state.platform.pool, &payload.code) + if feeder_service::get_unit_by_code(&state.platform.pool, &payload.code) .await? .is_some() { @@ -406,9 +408,9 @@ pub async fn create_unit( )); } - let unit_id = service::create_unit( + let unit_id = feeder_service::create_unit( &state.platform.pool, - service::CreateUnitParams { + feeder_service::CreateUnitParams { code: &payload.code, name: &payload.name, description: payload.description.as_deref(), @@ -457,7 +459,7 @@ pub async fn update_unit( ) -> Result { payload.validate()?; - let existing_unit = service::get_unit_by_id(&state.platform.pool, unit_id) + let existing_unit = feeder_service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; @@ -468,7 +470,7 @@ pub async fn update_unit( if let Some(code) = payload.code.as_deref() { let duplicate = - service::get_unit_by_code(&state.platform.pool, code).await?; + feeder_service::get_unit_by_code(&state.platform.pool, code).await?; if duplicate.as_ref().is_some_and(|item| item.id != unit_id) { return Err(ApiErr::BadRequest( "Unit code already exists".to_string(), @@ -490,10 +492,10 @@ pub async fn update_unit( return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); } - service::update_unit( + feeder_service::update_unit( &state.platform.pool, unit_id, - service::UpdateUnitParams { + feeder_service::UpdateUnitParams { code: payload.code.as_deref(), name: payload.name.as_deref(), description: payload.description.as_deref(), @@ -518,7 +520,7 @@ pub async fn delete_unit( State(state): State, Path(unit_id): Path, ) -> Result { - let deleted = service::delete_unit(&state.platform.pool, unit_id).await?; + let deleted = feeder_service::delete_unit(&state.platform.pool, unit_id).await?; if !deleted { return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } @@ -570,7 +572,7 @@ pub async fn start_auto_unit( State(state): State, Path(unit_id): Path, ) -> Result { - let unit = service::get_unit_by_id(&state.platform.pool, unit_id) + let unit = feeder_service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; @@ -609,7 +611,7 @@ pub async fn stop_auto_unit( State(state): State, Path(unit_id): Path, ) -> Result { - service::get_unit_by_id(&state.platform.pool, unit_id) + feeder_service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; @@ -628,7 +630,7 @@ pub async fn stop_auto_unit( } pub async fn batch_start_auto(State(state): State) -> Result { - let units = service::get_all_enabled_units(&state.platform.pool).await?; + let units = feeder_service::get_all_enabled_units(&state.platform.pool).await?; let mut started = Vec::new(); let mut skipped = Vec::new(); @@ -656,7 +658,7 @@ pub async fn batch_start_auto(State(state): State) -> Result) -> Result { - let units = service::get_all_enabled_units(&state.platform.pool).await?; + let units = feeder_service::get_all_enabled_units(&state.platform.pool).await?; let mut stopped = Vec::new(); for unit in units { @@ -680,7 +682,7 @@ pub async fn ack_fault_unit( State(state): State, Path(unit_id): Path, ) -> Result { - service::get_unit_by_id(&state.platform.pool, unit_id) + feeder_service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; @@ -718,7 +720,7 @@ pub async fn get_unit_runtime( State(state): State, Path(unit_id): Path, ) -> Result { - service::get_unit_by_id(&state.platform.pool, unit_id) + feeder_service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; diff --git a/crates/app_feeder_distributor/src/lib.rs b/crates/app_feeder_distributor/src/lib.rs index f54e2f1..f5d717a 100644 --- a/crates/app_feeder_distributor/src/lib.rs +++ b/crates/app_feeder_distributor/src/lib.rs @@ -2,7 +2,9 @@ pub mod app; pub mod control; pub mod event; pub mod handler; +pub mod model; pub mod router; +pub mod service; pub use app::{run, test_state, AppState}; pub use router::build_router; diff --git a/crates/app_feeder_distributor/src/model.rs b/crates/app_feeder_distributor/src/model.rs new file mode 100644 index 0000000..af6f1fa --- /dev/null +++ b/crates/app_feeder_distributor/src/model.rs @@ -0,0 +1,23 @@ +use chrono::{DateTime, Utc}; +use plc_platform_core::util::datetime::utc_to_local_str; +use serde::{Deserialize, Serialize}; +use sqlx::FromRow; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, FromRow, Clone)] +pub struct ControlUnit { + pub id: Uuid, + pub code: String, + pub name: String, + pub description: Option, + pub enabled: bool, + pub run_time_sec: i32, + pub stop_time_sec: i32, + pub acc_time_sec: i32, + pub bl_time_sec: i32, + pub require_manual_ack_after_fault: bool, + #[serde(serialize_with = "utc_to_local_str")] + pub created_at: DateTime, + #[serde(serialize_with = "utc_to_local_str")] + pub updated_at: DateTime, +} diff --git a/crates/app_feeder_distributor/src/service.rs b/crates/app_feeder_distributor/src/service.rs new file mode 100644 index 0000000..cc6db92 --- /dev/null +++ b/crates/app_feeder_distributor/src/service.rs @@ -0,0 +1,3 @@ +pub mod unit; + +pub use unit::*; diff --git a/crates/app_feeder_distributor/src/service/unit.rs b/crates/app_feeder_distributor/src/service/unit.rs new file mode 100644 index 0000000..8cee071 --- /dev/null +++ b/crates/app_feeder_distributor/src/service/unit.rs @@ -0,0 +1,284 @@ +use sqlx::PgPool; +use uuid::Uuid; + +use crate::model::ControlUnit; + +fn unit_order_clause() -> &'static str { + "code" +} + +pub async fn get_units_count(pool: &PgPool, keyword: Option<&str>) -> Result { + match keyword { + Some(keyword) => { + let like = format!("%{}%", keyword); + sqlx::query_scalar::<_, i64>( + r#" + SELECT COUNT(*) + FROM unit + WHERE code ILIKE $1 OR name ILIKE $1 + "#, + ) + .bind(like) + .fetch_one(pool) + .await + } + None => { + sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM unit"#) + .fetch_one(pool) + .await + } + } +} + +pub async fn get_units_paginated( + pool: &PgPool, + keyword: Option<&str>, + page_size: i32, + offset: u32, +) -> Result, sqlx::Error> { + let unit_order = unit_order_clause(); + match keyword { + Some(keyword) => { + let like = format!("%{}%", keyword); + if page_size == -1 { + let sql = format!( + r#" + SELECT * + FROM unit + WHERE code ILIKE $1 OR name ILIKE $1 + ORDER BY {} + "#, + unit_order + ); + sqlx::query_as::<_, ControlUnit>(&sql) + .bind(like) + .fetch_all(pool) + .await + } else { + let sql = format!( + r#" + SELECT * + FROM unit + WHERE code ILIKE $1 OR name ILIKE $1 + ORDER BY {} + LIMIT $2 OFFSET $3 + "#, + unit_order + ); + sqlx::query_as::<_, ControlUnit>(&sql) + .bind(like) + .bind(page_size as i64) + .bind(offset as i64) + .fetch_all(pool) + .await + } + } + None => { + if page_size == -1 { + let sql = format!("SELECT * FROM unit ORDER BY {}", unit_order); + sqlx::query_as::<_, ControlUnit>(&sql).fetch_all(pool).await + } else { + let sql = format!( + r#" + SELECT * + FROM unit + ORDER BY {} + LIMIT $1 OFFSET $2 + "#, + unit_order + ); + sqlx::query_as::<_, ControlUnit>(&sql) + .bind(page_size as i64) + .bind(offset as i64) + .fetch_all(pool) + .await + } + } + } +} + +pub async fn get_unit_by_id( + pool: &PgPool, + unit_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit WHERE id = $1"#) + .bind(unit_id) + .fetch_optional(pool) + .await +} + +pub async fn get_unit_by_code( + pool: &PgPool, + code: &str, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit WHERE code = $1"#) + .bind(code) + .fetch_optional(pool) + .await +} + +pub struct CreateUnitParams<'a> { + pub code: &'a str, + pub name: &'a str, + pub description: Option<&'a str>, + pub enabled: bool, + pub run_time_sec: i32, + pub stop_time_sec: i32, + pub acc_time_sec: i32, + pub bl_time_sec: i32, + pub require_manual_ack_after_fault: bool, +} + +pub async fn create_unit(pool: &PgPool, params: CreateUnitParams<'_>) -> Result { + let unit_id = Uuid::new_v4(); + sqlx::query( + r#" + INSERT INTO unit ( + id, code, name, description, enabled, + run_time_sec, stop_time_sec, acc_time_sec, bl_time_sec, + require_manual_ack_after_fault + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + "#, + ) + .bind(unit_id) + .bind(params.code) + .bind(params.name) + .bind(params.description) + .bind(params.enabled) + .bind(params.run_time_sec) + .bind(params.stop_time_sec) + .bind(params.acc_time_sec) + .bind(params.bl_time_sec) + .bind(params.require_manual_ack_after_fault) + .execute(pool) + .await?; + + Ok(unit_id) +} + +pub struct UpdateUnitParams<'a> { + pub code: Option<&'a str>, + pub name: Option<&'a str>, + pub description: Option<&'a str>, + pub enabled: Option, + pub run_time_sec: Option, + pub stop_time_sec: Option, + pub acc_time_sec: Option, + pub bl_time_sec: Option, + pub require_manual_ack_after_fault: Option, +} + +pub async fn update_unit( + pool: &PgPool, + unit_id: Uuid, + params: UpdateUnitParams<'_>, +) -> Result<(), sqlx::Error> { + let mut updates = Vec::new(); + let mut param_count = 1; + + if params.code.is_some() { + updates.push(format!("code = ${}", param_count)); + param_count += 1; + } + if params.name.is_some() { + updates.push(format!("name = ${}", param_count)); + param_count += 1; + } + if params.description.is_some() { + updates.push(format!("description = ${}", param_count)); + param_count += 1; + } + if params.enabled.is_some() { + updates.push(format!("enabled = ${}", param_count)); + param_count += 1; + } + if params.run_time_sec.is_some() { + updates.push(format!("run_time_sec = ${}", param_count)); + param_count += 1; + } + if params.stop_time_sec.is_some() { + updates.push(format!("stop_time_sec = ${}", param_count)); + param_count += 1; + } + if params.acc_time_sec.is_some() { + updates.push(format!("acc_time_sec = ${}", param_count)); + param_count += 1; + } + if params.bl_time_sec.is_some() { + updates.push(format!("bl_time_sec = ${}", param_count)); + param_count += 1; + } + if params.require_manual_ack_after_fault.is_some() { + updates.push(format!("require_manual_ack_after_fault = ${}", param_count)); + param_count += 1; + } + + updates.push("updated_at = NOW()".to_string()); + + let sql = format!( + r#"UPDATE unit SET {} WHERE id = ${}"#, + updates.join(", "), + param_count + ); + + let mut query = sqlx::query(&sql); + + if let Some(code) = params.code { + query = query.bind(code); + } + if let Some(name) = params.name { + query = query.bind(name); + } + if let Some(description) = params.description { + query = query.bind(description); + } + if let Some(enabled) = params.enabled { + query = query.bind(enabled); + } + if let Some(run_time_sec) = params.run_time_sec { + query = query.bind(run_time_sec); + } + if let Some(stop_time_sec) = params.stop_time_sec { + query = query.bind(stop_time_sec); + } + if let Some(acc_time_sec) = params.acc_time_sec { + query = query.bind(acc_time_sec); + } + if let Some(bl_time_sec) = params.bl_time_sec { + query = query.bind(bl_time_sec); + } + if let Some(require_manual_ack_after_fault) = params.require_manual_ack_after_fault { + query = query.bind(require_manual_ack_after_fault); + } + + query.bind(unit_id).execute(pool).await?; + Ok(()) +} + +pub async fn delete_unit(pool: &PgPool, unit_id: Uuid) -> Result { + let result = sqlx::query(r#"DELETE FROM unit WHERE id = $1"#) + .bind(unit_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +pub async fn get_all_enabled_units(pool: &PgPool) -> Result, sqlx::Error> { + let sql = format!( + "SELECT * FROM unit WHERE enabled = TRUE ORDER BY {}", + unit_order_clause() + ); + sqlx::query_as::<_, ControlUnit>(&sql).fetch_all(pool).await +} + +#[cfg(test)] +mod tests { + use super::unit_order_clause; + + #[test] + fn unit_ordering_defaults_to_code() { + assert_eq!(unit_order_clause(), "code"); + } +} diff --git a/crates/plc_platform_core/tests/runtime_smoke.rs b/crates/app_feeder_distributor/tests/runtime_smoke.rs similarity index 93% rename from crates/plc_platform_core/tests/runtime_smoke.rs rename to crates/app_feeder_distributor/tests/runtime_smoke.rs index e3ca949..28cd229 100644 --- a/crates/plc_platform_core/tests/runtime_smoke.rs +++ b/crates/app_feeder_distributor/tests/runtime_smoke.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use plc_platform_core::control::runtime::{ControlRuntimeStore, UnitRuntimeState}; +use app_feeder_distributor::control::runtime::{ControlRuntimeStore, UnitRuntimeState}; use uuid::Uuid; #[tokio::test] diff --git a/crates/plc_platform_core/src/control/mod.rs b/crates/plc_platform_core/src/control/mod.rs index affecb3..9fe7961 100644 --- a/crates/plc_platform_core/src/control/mod.rs +++ b/crates/plc_platform_core/src/control/mod.rs @@ -1,2 +1 @@ pub mod command; -pub mod runtime; diff --git a/crates/plc_platform_core/src/handler/equipment.rs b/crates/plc_platform_core/src/handler/equipment.rs index f4cb5fb..c3c9eca 100644 --- a/crates/plc_platform_core/src/handler/equipment.rs +++ b/crates/plc_platform_core/src/handler/equipment.rs @@ -14,6 +14,13 @@ use crate::util::{ response::ApiErr, }; +async fn unit_row_exists(pool: &sqlx::PgPool, unit_id: Uuid) -> Result { + sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM unit WHERE id = $1)") + .bind(unit_id) + .fetch_one(pool) + .await +} + #[derive(Deserialize, Validate)] pub struct GetEquipmentListQuery { #[validate(length(min = 1, max = 100))] @@ -162,8 +169,8 @@ pub async fn create_equipment( } if let Some(unit_id) = payload.unit_id { - let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; - if unit_exists.is_none() { + let unit_exists = unit_row_exists(&state.pool, unit_id).await?; + if !unit_exists { return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } } @@ -211,8 +218,8 @@ pub async fn update_equipment( } if let Some(Some(unit_id)) = payload.unit_id { - let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; - if unit_exists.is_none() { + let unit_exists = unit_row_exists(&state.pool, unit_id).await?; + if !unit_exists { return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } } @@ -262,8 +269,8 @@ pub async fn batch_set_equipment_unit( } if let Some(unit_id) = payload.unit_id { - let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; - if unit_exists.is_none() { + let unit_exists = unit_row_exists(&state.pool, unit_id).await?; + if !unit_exists { return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } } diff --git a/crates/plc_platform_core/src/model.rs b/crates/plc_platform_core/src/model.rs index cc7c97f..82efe78 100644 --- a/crates/plc_platform_core/src/model.rs +++ b/crates/plc_platform_core/src/model.rs @@ -134,24 +134,6 @@ pub struct Equipment { pub updated_at: DateTime, } -#[derive(Debug, Serialize, Deserialize, FromRow, Clone)] -pub struct ControlUnit { - pub id: Uuid, - pub code: String, - pub name: String, - pub description: Option, - pub enabled: bool, - pub run_time_sec: i32, - pub stop_time_sec: i32, - pub acc_time_sec: i32, - pub bl_time_sec: i32, - pub require_manual_ack_after_fault: bool, - #[serde(serialize_with = "utc_to_local_str")] - pub created_at: DateTime, - #[serde(serialize_with = "utc_to_local_str")] - pub updated_at: DateTime, -} - #[derive(Debug, Serialize, Deserialize, FromRow, Clone)] pub struct EventRecord { pub id: Uuid, diff --git a/crates/plc_platform_core/src/service/control.rs b/crates/plc_platform_core/src/service/control.rs index 08d0fac..18dc713 100644 --- a/crates/plc_platform_core/src/service/control.rs +++ b/crates/plc_platform_core/src/service/control.rs @@ -1,11 +1,7 @@ -use crate::model::{ControlUnit, EventRecord}; +use crate::model::EventRecord; use sqlx::{PgPool, QueryBuilder, Row}; use uuid::Uuid; -fn unit_order_clause() -> &'static str { - "code" -} - fn equipment_order_clause_with_unit() -> &'static str { "unit_id, code" } @@ -16,264 +12,6 @@ pub struct EquipmentRolePoint { pub signal_role: String, } -pub async fn get_units_count(pool: &PgPool, keyword: Option<&str>) -> Result { - match keyword { - Some(keyword) => { - let like = format!("%{}%", keyword); - sqlx::query_scalar::<_, i64>( - r#" - SELECT COUNT(*) - FROM unit - WHERE code ILIKE $1 OR name ILIKE $1 - "#, - ) - .bind(like) - .fetch_one(pool) - .await - } - None => { - sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM unit"#) - .fetch_one(pool) - .await - } - } -} - -pub async fn get_units_paginated( - pool: &PgPool, - keyword: Option<&str>, - page_size: i32, - offset: u32, -) -> Result, sqlx::Error> { - let unit_order = unit_order_clause(); - match keyword { - Some(keyword) => { - let like = format!("%{}%", keyword); - if page_size == -1 { - let sql = format!( - r#" - SELECT * - FROM unit - WHERE code ILIKE $1 OR name ILIKE $1 - ORDER BY {} - "#, - unit_order - ); - sqlx::query_as::<_, ControlUnit>(&sql) - .bind(like) - .fetch_all(pool) - .await - } else { - let sql = format!( - r#" - SELECT * - FROM unit - WHERE code ILIKE $1 OR name ILIKE $1 - ORDER BY {} - LIMIT $2 OFFSET $3 - "#, - unit_order - ); - sqlx::query_as::<_, ControlUnit>(&sql) - .bind(like) - .bind(page_size as i64) - .bind(offset as i64) - .fetch_all(pool) - .await - } - } - None => { - if page_size == -1 { - let sql = format!("SELECT * FROM unit ORDER BY {}", unit_order); - sqlx::query_as::<_, ControlUnit>(&sql).fetch_all(pool).await - } else { - let sql = format!( - r#" - SELECT * - FROM unit - ORDER BY {} - LIMIT $1 OFFSET $2 - "#, - unit_order - ); - sqlx::query_as::<_, ControlUnit>(&sql) - .bind(page_size as i64) - .bind(offset as i64) - .fetch_all(pool) - .await - } - } - } -} - -pub async fn get_unit_by_id( - pool: &PgPool, - unit_id: Uuid, -) -> Result, sqlx::Error> { - sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit WHERE id = $1"#) - .bind(unit_id) - .fetch_optional(pool) - .await -} - -pub async fn get_unit_by_code( - pool: &PgPool, - code: &str, -) -> Result, sqlx::Error> { - sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit WHERE code = $1"#) - .bind(code) - .fetch_optional(pool) - .await -} - -pub struct CreateUnitParams<'a> { - pub code: &'a str, - pub name: &'a str, - pub description: Option<&'a str>, - pub enabled: bool, - pub run_time_sec: i32, - pub stop_time_sec: i32, - pub acc_time_sec: i32, - pub bl_time_sec: i32, - pub require_manual_ack_after_fault: bool, -} - -pub async fn create_unit(pool: &PgPool, params: CreateUnitParams<'_>) -> Result { - let unit_id = Uuid::new_v4(); - sqlx::query( - r#" - INSERT INTO unit ( - id, code, name, description, enabled, - run_time_sec, stop_time_sec, acc_time_sec, bl_time_sec, - require_manual_ack_after_fault - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - "#, - ) - .bind(unit_id) - .bind(params.code) - .bind(params.name) - .bind(params.description) - .bind(params.enabled) - .bind(params.run_time_sec) - .bind(params.stop_time_sec) - .bind(params.acc_time_sec) - .bind(params.bl_time_sec) - .bind(params.require_manual_ack_after_fault) - .execute(pool) - .await?; - - Ok(unit_id) -} - -pub struct UpdateUnitParams<'a> { - pub code: Option<&'a str>, - pub name: Option<&'a str>, - pub description: Option<&'a str>, - pub enabled: Option, - pub run_time_sec: Option, - pub stop_time_sec: Option, - pub acc_time_sec: Option, - pub bl_time_sec: Option, - pub require_manual_ack_after_fault: Option, -} - -pub async fn update_unit( - pool: &PgPool, - unit_id: Uuid, - params: UpdateUnitParams<'_>, -) -> Result<(), sqlx::Error> { - let mut updates = Vec::new(); - let mut param_count = 1; - - if params.code.is_some() { - updates.push(format!("code = ${}", param_count)); - param_count += 1; - } - if params.name.is_some() { - updates.push(format!("name = ${}", param_count)); - param_count += 1; - } - if params.description.is_some() { - updates.push(format!("description = ${}", param_count)); - param_count += 1; - } - if params.enabled.is_some() { - updates.push(format!("enabled = ${}", param_count)); - param_count += 1; - } - if params.run_time_sec.is_some() { - updates.push(format!("run_time_sec = ${}", param_count)); - param_count += 1; - } - if params.stop_time_sec.is_some() { - updates.push(format!("stop_time_sec = ${}", param_count)); - param_count += 1; - } - if params.acc_time_sec.is_some() { - updates.push(format!("acc_time_sec = ${}", param_count)); - param_count += 1; - } - if params.bl_time_sec.is_some() { - updates.push(format!("bl_time_sec = ${}", param_count)); - param_count += 1; - } - if params.require_manual_ack_after_fault.is_some() { - updates.push(format!("require_manual_ack_after_fault = ${}", param_count)); - param_count += 1; - } - - updates.push("updated_at = NOW()".to_string()); - - let sql = format!( - r#"UPDATE unit SET {} WHERE id = ${}"#, - updates.join(", "), - param_count - ); - - let mut query = sqlx::query(&sql); - - if let Some(code) = params.code { - query = query.bind(code); - } - if let Some(name) = params.name { - query = query.bind(name); - } - if let Some(description) = params.description { - query = query.bind(description); - } - if let Some(enabled) = params.enabled { - query = query.bind(enabled); - } - if let Some(run_time_sec) = params.run_time_sec { - query = query.bind(run_time_sec); - } - if let Some(stop_time_sec) = params.stop_time_sec { - query = query.bind(stop_time_sec); - } - if let Some(acc_time_sec) = params.acc_time_sec { - query = query.bind(acc_time_sec); - } - if let Some(bl_time_sec) = params.bl_time_sec { - query = query.bind(bl_time_sec); - } - if let Some(require_manual_ack_after_fault) = params.require_manual_ack_after_fault { - query = query.bind(require_manual_ack_after_fault); - } - - query.bind(unit_id).execute(pool).await?; - Ok(()) -} - -pub async fn delete_unit(pool: &PgPool, unit_id: Uuid) -> Result { - let result = sqlx::query(r#"DELETE FROM unit WHERE id = $1"#) - .bind(unit_id) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - pub async fn get_events_count( pool: &PgPool, unit_id: Option, @@ -317,14 +55,6 @@ pub async fn get_events_paginated( qb.build_query_as::().fetch_all(pool).await } -pub async fn get_all_enabled_units(pool: &PgPool) -> Result, sqlx::Error> { - let sql = format!( - "SELECT * FROM unit WHERE enabled = TRUE ORDER BY {}", - unit_order_clause() - ); - sqlx::query_as::<_, ControlUnit>(&sql).fetch_all(pool).await -} - pub async fn get_equipment_by_unit_ids( pool: &PgPool, unit_ids: &[Uuid], @@ -346,14 +76,12 @@ pub async fn get_equipment_by_unit_id( pool: &PgPool, unit_id: Uuid, ) -> Result, sqlx::Error> { - let sql = format!( - "SELECT * FROM equipment WHERE unit_id = $1 ORDER BY {}", - unit_order_clause() - ); - sqlx::query_as::<_, crate::model::Equipment>(&sql) - .bind(unit_id) - .fetch_all(pool) - .await + sqlx::query_as::<_, crate::model::Equipment>( + "SELECT * FROM equipment WHERE unit_id = $1 ORDER BY code", + ) + .bind(unit_id) + .fetch_all(pool) + .await } pub async fn get_points_by_equipment_ids( @@ -462,12 +190,7 @@ pub async fn get_equipment_role_points( #[cfg(test)] mod tests { - use super::{equipment_order_clause_with_unit, unit_order_clause}; - - #[test] - fn unit_ordering_defaults_to_code() { - assert_eq!(unit_order_clause(), "code"); - } + use super::equipment_order_clause_with_unit; #[test] fn unit_equipment_ordering_uses_code_within_unit() { diff --git a/crates/plc_platform_core/src/websocket.rs b/crates/plc_platform_core/src/websocket.rs index c0e73c7..1f82ede 100644 --- a/crates/plc_platform_core/src/websocket.rs +++ b/crates/plc_platform_core/src/websocket.rs @@ -14,7 +14,6 @@ use uuid::Uuid; use crate::platform_context::PlatformContext; use crate::{ connection::{BatchSetPointValueReq, BatchSetPointValueRes}, - control::runtime::UnitRuntime, model::EventRecord, telemetry::PointMonitorInfo, }; @@ -25,7 +24,19 @@ pub enum WsMessage { PointNewValue(PointMonitorInfo), PointSetValueBatchResult(BatchSetPointValueRes), EventCreated(EventRecord), - UnitRuntimeChanged(UnitRuntime), + AppEvent(AppWsEvent), +} + +/// Business-event payload carried by `WsMessage::AppEvent`. +/// +/// Apps construct this so core stays free of business types. Frontend dispatches +/// by `app` first, then `event_type`. `data` is opaque to core; each app +/// documents its schema. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AppWsEvent { + pub app: String, + pub event_type: String, + pub data: serde_json::Value, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/web/feeder/js/logs.js b/web/feeder/js/logs.js index 2afd3cd..6651c14 100644 --- a/web/feeder/js/logs.js +++ b/web/feeder/js/logs.js @@ -150,15 +150,19 @@ export function startPointSocket() { prependEvent(payload.data); } - if (payload.type === "UnitRuntimeChanged") { - const runtime = payload.data; - state.runtimes.set(runtime.unit_id, runtime); - renderUnits(); - // lazy import to avoid circular dep (ops.js -> logs.js -> ops.js) - import("./ops.js").then(({ renderOpsUnits, syncEquipmentButtonsForUnit }) => { - renderOpsUnits(); - syncEquipmentButtonsForUnit(runtime.unit_id); - }); + if (payload.type === "AppEvent" || payload.type === "app_event") { + const envelope = payload.data || {}; + if (envelope.app !== "feeder") return; + if (envelope.event_type === "unit_runtime_changed") { + const runtime = envelope.data; + state.runtimes.set(runtime.unit_id, runtime); + renderUnits(); + // lazy import to avoid circular dep (ops.js -> logs.js -> ops.js) + import("./ops.js").then(({ renderOpsUnits, syncEquipmentButtonsForUnit }) => { + renderOpsUnits(); + syncEquipmentButtonsForUnit(runtime.unit_id); + }); + } return; } } catch {