diff --git a/crates/app_feeder_distributor/src/app.rs b/crates/app_feeder_distributor/src/app.rs index 1ab7d8f..0dc52e5 100644 --- a/crates/app_feeder_distributor/src/app.rs +++ b/crates/app_feeder_distributor/src/app.rs @@ -1,11 +1,11 @@ use std::sync::Arc; -use crate::{ - connection::ConnectionManager, control, event::EventManager, router::build_router, -}; +use crate::{control, event::EventManager, router::build_router}; use axum::extract::FromRef; -use plc_platform_core::websocket::WebSocketManager; -use plc_platform_core::{config::ServerConfig, platform_context::PlatformContext}; +use plc_platform_core::{bootstrap, websocket::WebSocketManager}; +use plc_platform_core::{ + config::ServerConfig, connection::ConnectionManager, platform_context::PlatformContext, +}; use tokio::sync::mpsc; #[derive(Clone)] @@ -23,25 +23,16 @@ impl FromRef for PlatformContext { } pub async fn run() { - dotenv::dotenv().ok(); - plc_platform_core::util::log::init_logger(); - let _single_instance = - match plc_platform_core::util::single_instance::try_acquire("PLCControl.FeederDistributor") - { - Ok(guard) => guard, - Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => { - tracing::warn!("Another feeder distributor instance is already running"); - return; - } - Err(err) => { - tracing::error!("Failed to initialize single instance guard: {}", err); - return; - } - }; + let Some(_single_instance) = bootstrap::init_process( + "PLCControl.FeederDistributor", + "Another feeder distributor instance is already running", + ) else { + return; + }; let config = ServerConfig::from_env("HOST", "0.0.0.0", "PORT", 60309) .expect("Failed to load server configuration"); - let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url) + let builder = bootstrap::bootstrap_platform(&config.database_url) .await .expect("Failed to bootstrap platform"); @@ -57,7 +48,7 @@ pub async fn run() { Some(platform.ws_manager.clone()), )); - plc_platform_core::bootstrap::connect_all_enabled_sources(&platform) + bootstrap::connect_all_enabled_sources(&platform) .await .expect("Failed to connect enabled sources"); @@ -70,35 +61,26 @@ pub async fn run() { control::engine::start(state.clone(), control_runtime); let app = build_router(state.clone()); - let addr = format!("{}:{}", config.server_host, config.server_port); - tracing::info!("Starting feeder distributor server at http://{}", addr); - let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); - - let ui_url = format!("http://{}:{}/ui", "localhost", config.server_port); - let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); - let shutdown_tx_ctrl = shutdown_tx.clone(); + let ui_url = config.local_ui_url(); + let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); let rt_handle = tokio::runtime::Handle::current(); init_tray(ui_url, shutdown_tx.clone(), rt_handle); let connection_manager_for_shutdown = state.platform.connection_manager.clone(); - tokio::spawn(async move { - tokio::signal::ctrl_c() - .await - .expect("Failed to install Ctrl+C handler"); - let _ = shutdown_tx_ctrl.send(()).await; - }); + bootstrap::install_ctrl_c_shutdown(shutdown_tx); - let shutdown_signal = async move { - let _ = shutdown_rx.recv().await; - tracing::info!("Received shutdown signal, closing all feeder connections..."); - connection_manager_for_shutdown.disconnect_all().await; - tracing::info!("All feeder connections closed"); - }; - - axum::serve(listener, app) - .with_graceful_shutdown(shutdown_signal) - .await - .unwrap(); + bootstrap::serve_app_with_graceful_shutdown( + &config, + "feeder distributor", + app, + bootstrap::disconnect_all_on_shutdown( + shutdown_rx, + connection_manager_for_shutdown, + "feeder", + ), + ) + .await + .unwrap(); } pub fn test_state() -> AppState { diff --git a/crates/app_feeder_distributor/src/control/engine.rs b/crates/app_feeder_distributor/src/control/engine.rs index 3e8a4fc..9476524 100644 --- a/crates/app_feeder_distributor/src/control/engine.rs +++ b/crates/app_feeder_distributor/src/control/engine.rs @@ -11,11 +11,13 @@ use crate::{ runtime::{ControlRuntimeStore, UnitRuntime, UnitRuntimeState}, }, event::AppEvent, - service::EquipmentRolePoint, - telemetry::{PointMonitorInfo, PointQuality}, AppState, }; -use plc_platform_core::websocket::WsMessage; +use plc_platform_core::{ + service::EquipmentRolePoint, + telemetry::{PointMonitorInfo, PointQuality}, + websocket::WsMessage, +}; /// Start the engine: a supervisor spawns one async task per enabled unit. pub fn start(state: AppState, runtime_store: Arc) { @@ -33,16 +35,16 @@ async fn supervise(state: AppState, store: Arc) { loop { interval.tick().await; - match crate::service::get_all_enabled_units(&state.platform.pool).await { + match plc_platform_core::service::get_all_enabled_units(&state.platform.pool).await { Ok(units) => { for unit in units { - let needs_spawn = tasks - .get(&unit.id) - .map_or(true, |h| h.is_finished()); + let needs_spawn = tasks.get(&unit.id).map_or(true, |h| h.is_finished()); if needs_spawn { let s = state.clone(); let st = store.clone(); - let handle = tokio::spawn(async move { unit_task(s, st, unit.id).await; }); + let handle = tokio::spawn(async move { + unit_task(s, st, unit.id).await; + }); tasks.insert(unit.id, handle); } } @@ -63,18 +65,19 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu loop { // Reload unit config on each iteration to detect disable/delete. - let unit = match crate::service::get_unit_by_id(&state.platform.pool, unit_id).await { - Ok(Some(u)) if u.enabled => u, - Ok(_) => { - tracing::info!("Engine: unit {} disabled or deleted, task exiting", unit_id); - return; - } - Err(e) => { - tracing::error!("Engine: unit {} config reload failed: {}", unit_id, e); - tokio::time::sleep(Duration::from_secs(5)).await; - continue; - } - }; + let unit = + match plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id).await { + Ok(Some(u)) if u.enabled => u, + Ok(_) => { + tracing::info!("Engine: unit {} disabled or deleted, task exiting", unit_id); + return; + } + Err(e) => { + tracing::error!("Engine: unit {} config reload failed: {}", unit_id, e); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; // Fault / comm check. let (kind_roles, all_roles) = match load_equipment_maps(&state, unit_id).await { @@ -93,7 +96,11 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu } // Wait when not active. - if !runtime.auto_enabled || runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required { + if !runtime.auto_enabled + || runtime.fault_locked + || runtime.comm_locked + || runtime.manual_ack_required + { tokio::select! { _ = fault_tick.tick() => {} _ = notify.notified() => { @@ -114,11 +121,24 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu continue; } // Send feeder start command. - let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await; - let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "start_cmd", &monitor)); + let monitor = state + .platform + .connection_manager + .get_point_monitor_data_read_guard() + .await; + let cmd = kind_roles + .get("coal_feeder") + .and_then(|r| find_cmd(r, "start_cmd", &monitor)); drop(monitor); if let Some((pid, vt)) = cmd { - if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await { + if let Err(e) = send_pulse_command( + &state.platform.connection_manager, + pid, + vt.as_ref(), + 300, + ) + .await + { tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e); continue; } @@ -132,20 +152,46 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu UnitRuntimeState::Running => { // Wait run_time_sec. run_time_sec == 0 means run without a time limit // (relies on acc_time_sec to eventually stop). Treat as a very long phase. - let secs = if unit.run_time_sec > 0 { unit.run_time_sec } else { i32::MAX }; + let secs = if unit.run_time_sec > 0 { + unit.run_time_sec + } else { + i32::MAX + }; let unit_for_wait = plc_platform_core::model::ControlUnit { run_time_sec: secs, ..unit.clone() }; - if !wait_phase(&state, &store, &unit_for_wait, &all_roles, ¬ify, &mut fault_tick).await { + if !wait_phase( + &state, + &store, + &unit_for_wait, + &all_roles, + ¬ify, + &mut fault_tick, + ) + .await + { continue; } // Stop feeder. - let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await; - let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "stop_cmd", &monitor)); + let monitor = state + .platform + .connection_manager + .get_point_monitor_data_read_guard() + .await; + let cmd = kind_roles + .get("coal_feeder") + .and_then(|r| find_cmd(r, "stop_cmd", &monitor)); drop(monitor); if let Some((pid, vt)) = cmd { - if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await { + if let Err(e) = send_pulse_command( + &state.platform.connection_manager, + pid, + vt.as_ref(), + 300, + ) + .await + { tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e); continue; } @@ -154,14 +200,33 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu runtime.accumulated_run_sec += secs as i64 * 1000; runtime.display_acc_sec = runtime.accumulated_run_sec; - if unit.acc_time_sec > 0 && runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000 { + if unit.acc_time_sec > 0 + && runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000 + { // Accumulated threshold reached; start distributor. - let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await; - let dist_cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "start_cmd", &monitor)); + let monitor = state + .platform + .connection_manager + .get_point_monitor_data_read_guard() + .await; + let dist_cmd = kind_roles + .get("distributor") + .and_then(|r| find_cmd(r, "start_cmd", &monitor)); drop(monitor); if let Some((pid, vt)) = dist_cmd { - if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await { - tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e); + if let Err(e) = send_pulse_command( + &state.platform.connection_manager, + pid, + vt.as_ref(), + 300, + ) + .await + { + tracing::warn!( + "Engine: start distributor failed for unit {}: {}", + unit_id, + e + ); } } runtime.state = UnitRuntimeState::DistributorRunning; @@ -177,12 +242,29 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu if !wait_phase(&state, &store, &unit, &all_roles, ¬ify, &mut fault_tick).await { continue; } - let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await; - let cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "stop_cmd", &monitor)); + let monitor = state + .platform + .connection_manager + .get_point_monitor_data_read_guard() + .await; + let cmd = kind_roles + .get("distributor") + .and_then(|r| find_cmd(r, "stop_cmd", &monitor)); drop(monitor); if let Some((pid, vt)) = cmd { - if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await { - tracing::warn!("Engine: stop distributor failed for unit {}: {}", unit_id, e); + if let Err(e) = send_pulse_command( + &state.platform.connection_manager, + pid, + vt.as_ref(), + 300, + ) + .await + { + tracing::warn!( + "Engine: stop distributor failed for unit {}: {}", + unit_id, + e + ); continue; } } @@ -242,7 +324,11 @@ async fn wait_phase( store.upsert(runtime.clone()).await; push_ws(state, &runtime).await; } - if !runtime.auto_enabled || runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required { + if !runtime.auto_enabled + || runtime.fault_locked + || runtime.comm_locked + || runtime.manual_ack_required + { return false; } } @@ -250,7 +336,8 @@ async fn wait_phase( async fn push_ws(state: &AppState, runtime: &UnitRuntime) { if let Err(e) = state - .platform.ws_manager + .platform + .ws_manager .send_to_public(WsMessage::UnitRuntimeChanged(runtime.clone())) .await { @@ -267,7 +354,8 @@ async fn check_fault_comm( all_roles: &[(Uuid, HashMap)], ) -> bool { let monitor = state - .platform.connection_manager + .platform + .connection_manager .get_point_monitor_data_read_guard() .await; @@ -340,17 +428,26 @@ async fn check_fault_comm( runtime.rem_local = any_rem_local; if !prev_comm && runtime.comm_locked { - let _ = state.event_manager.send(AppEvent::CommLocked { unit_id: unit.id }); + let _ = state + .event_manager + .send(AppEvent::CommLocked { unit_id: unit.id }); } else if prev_comm && !runtime.comm_locked { - let _ = state.event_manager.send(AppEvent::CommRecovered { unit_id: unit.id }); + let _ = state + .event_manager + .send(AppEvent::CommRecovered { unit_id: unit.id }); } if let Some(eq_id) = flt_eq_id { runtime.fault_locked = true; - let _ = state.event_manager.send(AppEvent::FaultLocked { unit_id: unit.id, equipment_id: eq_id }); + let _ = state.event_manager.send(AppEvent::FaultLocked { + unit_id: unit.id, + equipment_id: eq_id, + }); if runtime.auto_enabled { runtime.auto_enabled = false; - let _ = state.event_manager.send(AppEvent::AutoControlStopped { unit_id: unit.id }); + let _ = state + .event_manager + .send(AppEvent::AutoControlStopped { unit_id: unit.id }); } } @@ -364,16 +461,23 @@ async fn check_fault_comm( // Fire RemLocal event when any equipment first switches to local mode. if let Some(eq_id) = rem_local_eq_id { - let _ = state.event_manager.send(AppEvent::RemLocal { unit_id: unit.id, equipment_id: eq_id }); + let _ = state.event_manager.send(AppEvent::RemLocal { + unit_id: unit.id, + equipment_id: eq_id, + }); if runtime.auto_enabled { runtime.auto_enabled = false; - let _ = state.event_manager.send(AppEvent::AutoControlStopped { unit_id: unit.id }); + let _ = state + .event_manager + .send(AppEvent::AutoControlStopped { unit_id: unit.id }); } } // Fire RemRecovered when all rem signals return to remote. if prev_rem_local && !any_rem_local { - let _ = state.event_manager.send(AppEvent::RemRecovered { unit_id: unit.id }); + let _ = state + .event_manager + .send(AppEvent::RemRecovered { unit_id: unit.id }); } runtime.comm_locked != prev_comm @@ -390,10 +494,14 @@ type EquipMaps = ( ); async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result { - let equipment_list = crate::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?; + let equipment_list = + plc_platform_core::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?; let equipment_ids: Vec = equipment_list.iter().map(|equip| equip.id).collect(); - let role_point_rows = - crate::service::get_signal_role_points_batch(&state.platform.pool, &equipment_ids).await?; + let role_point_rows = plc_platform_core::service::get_signal_role_points_batch( + &state.platform.pool, + &equipment_ids, + ) + .await?; let mut role_points_by_equipment: HashMap> = HashMap::new(); for row in role_point_rows { role_points_by_equipment @@ -434,7 +542,8 @@ fn build_equipment_maps( } else { tracing::warn!( "Engine: unit {} has multiple {} equipment; using first", - unit_id, kind + unit_id, + kind ); } } @@ -449,7 +558,7 @@ fn find_cmd( roles: &HashMap, role: &str, monitor: &HashMap, -) -> Option<(Uuid, Option)> { +) -> Option<(Uuid, Option)> { let cmd_rp = roles.get(role)?; let rem_ok = roles @@ -477,9 +586,9 @@ fn find_cmd( #[cfg(test)] mod tests { use super::build_equipment_maps; - use plc_platform_core::model::Equipment; - use crate::service::EquipmentRolePoint; use chrono::Utc; + use plc_platform_core::model::Equipment; + use plc_platform_core::service::EquipmentRolePoint; use std::collections::HashMap; use uuid::Uuid; @@ -512,7 +621,7 @@ mod tests { signal_role: "start_cmd".to_string(), }], ); - let (first_kind_roles, _, _) = build_equipment_maps(unit_id, &equipment_list, first_roles); + let (first_kind_roles, _) = build_equipment_maps(unit_id, &equipment_list, first_roles); let mut second_roles = HashMap::new(); second_roles.insert( @@ -522,8 +631,7 @@ mod tests { signal_role: "start_cmd".to_string(), }], ); - let (second_kind_roles, _, _) = - build_equipment_maps(unit_id, &equipment_list, second_roles); + let (second_kind_roles, _) = build_equipment_maps(unit_id, &equipment_list, second_roles); assert_eq!( first_kind_roles["coal_feeder"]["start_cmd"].point_id, diff --git a/crates/app_feeder_distributor/src/control/mod.rs b/crates/app_feeder_distributor/src/control/mod.rs index 775412f..b833f51 100644 --- a/crates/app_feeder_distributor/src/control/mod.rs +++ b/crates/app_feeder_distributor/src/control/mod.rs @@ -3,7 +3,7 @@ pub use plc_platform_core::control::{command, runtime}; pub mod engine; pub mod validator; -use crate::telemetry::{DataValue, PointMonitorInfo}; +use plc_platform_core::telemetry::{DataValue, PointMonitorInfo}; pub(crate) fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool { match monitor.value.as_ref() { @@ -12,7 +12,10 @@ pub(crate) fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool { Some(DataValue::UInt(value)) => *value != 0, Some(DataValue::Float(value)) => *value != 0.0, Some(DataValue::Text(value)) => { - matches!(value.trim().to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes") + matches!( + value.trim().to_ascii_lowercase().as_str(), + "1" | "true" | "on" | "yes" + ) } _ => false, } diff --git a/crates/app_feeder_distributor/src/control/validator.rs b/crates/app_feeder_distributor/src/control/validator.rs index 7752425..4d7ef92 100644 --- a/crates/app_feeder_distributor/src/control/validator.rs +++ b/crates/app_feeder_distributor/src/control/validator.rs @@ -3,11 +3,11 @@ use std::collections::HashMap; use serde_json::json; use uuid::Uuid; -use crate::{ +use crate::AppState; +use plc_platform_core::{ service::EquipmentRolePoint, telemetry::{PointMonitorInfo, PointQuality, ValueType}, util::response::ApiErr, - AppState, }; #[derive(Debug, Clone, Copy)] @@ -43,11 +43,14 @@ pub async fn validate_manual_control( equipment_id: Uuid, action: ControlAction, ) -> Result { - let equipment = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id) - .await? - .ok_or_else(|| ApiErr::NotFound("Equipment not found".to_string(), None))?; + let equipment = + plc_platform_core::service::get_equipment_by_id(&state.platform.pool, equipment_id) + .await? + .ok_or_else(|| ApiErr::NotFound("Equipment not found".to_string(), None))?; - let role_points = crate::service::get_equipment_role_points(&state.platform.pool, equipment_id).await?; + let role_points = + plc_platform_core::service::get_equipment_role_points(&state.platform.pool, equipment_id) + .await?; if role_points.is_empty() { return Err(ApiErr::BadRequest( "Equipment has no bound role points".to_string(), @@ -75,7 +78,8 @@ pub async fn validate_manual_control( .clone(); let monitor_guard = state - .platform.connection_manager + .platform + .connection_manager .get_point_monitor_data_read_guard() .await; @@ -135,7 +139,9 @@ pub async fn validate_manual_control( if runtime.fault_locked { return Err(ApiErr::Forbidden( "Unit is fault locked".to_string(), - Some(json!({ "unit_id": unit_id, "manual_ack_required": runtime.manual_ack_required })), + Some( + json!({ "unit_id": unit_id, "manual_ack_required": runtime.manual_ack_required }), + ), )); } if runtime.manual_ack_required { @@ -148,7 +154,8 @@ pub async fn validate_manual_control( } let command_value_type = state - .platform.connection_manager + .platform + .connection_manager .get_point_monitor_data_read_guard() .await .get(&command_point.point_id) @@ -198,4 +205,3 @@ fn missing_monitor_err(role: &str, equipment_id: Uuid) -> ApiErr { })), ) } - diff --git a/crates/app_feeder_distributor/src/handler/control.rs b/crates/app_feeder_distributor/src/handler/control.rs index b54a0c9..93fc2ff 100644 --- a/crates/app_feeder_distributor/src/handler/control.rs +++ b/crates/app_feeder_distributor/src/handler/control.rs @@ -1,4 +1,4 @@ -use axum::{ +use axum::{ extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, @@ -11,17 +11,14 @@ use validator::Validate; use crate::{ control::validator::{validate_manual_control, ControlAction}, - util::{ - pagination::{PaginatedResponse, PaginationParams}, - response::ApiErr, - }, AppState, }; +use plc_platform_core::util::{ + pagination::{PaginatedResponse, PaginationParams}, + response::ApiErr, +}; -fn validate_unit_timing_order( - run_time_sec: i32, - acc_time_sec: i32, -) -> Result<(), ApiErr> { +fn validate_unit_timing_order(run_time_sec: i32, acc_time_sec: i32) -> Result<(), ApiErr> { if acc_time_sec <= run_time_sec { return Err(ApiErr::BadRequest( "acc_time_sec must be greater than run_time_sec".to_string(), @@ -68,8 +65,10 @@ pub async fn get_unit_list( ) -> Result { query.validate()?; - let total = crate::service::get_units_count(&state.platform.pool, query.keyword.as_deref()).await?; - let units = crate::service::get_units_paginated( + let total = + plc_platform_core::service::get_units_count(&state.platform.pool, query.keyword.as_deref()) + .await?; + let units = plc_platform_core::service::get_units_paginated( &state.platform.pool, query.keyword.as_deref(), query.pagination.page_size, @@ -81,14 +80,17 @@ pub async fn get_unit_list( let unit_ids: Vec = units.iter().map(|u| u.id).collect(); let all_equipments = - crate::service::get_equipment_by_unit_ids(&state.platform.pool, &unit_ids).await?; + plc_platform_core::service::get_equipment_by_unit_ids(&state.platform.pool, &unit_ids) + .await?; let eq_ids: Vec = all_equipments.iter().map(|e| e.id).collect(); let role_point_rows = - crate::service::get_signal_role_points_batch(&state.platform.pool, &eq_ids).await?; + plc_platform_core::service::get_signal_role_points_batch(&state.platform.pool, &eq_ids) + .await?; let monitor_guard = state - .platform.connection_manager + .platform + .connection_manager .get_point_monitor_data_read_guard() .await; @@ -97,14 +99,13 @@ pub async fn get_unit_list( Vec, > = std::collections::HashMap::new(); for rp in role_point_rows { - role_points_map - .entry(rp.equipment_id) - .or_default() - .push(crate::handler::equipment::SignalRolePoint { + role_points_map.entry(rp.equipment_id).or_default().push( + crate::handler::equipment::SignalRolePoint { point_id: rp.point_id, signal_role: rp.signal_role, point_monitor: monitor_guard.get(&rp.point_id).cloned(), - }); + }, + ); } drop(monitor_guard); @@ -116,7 +117,10 @@ pub async fn get_unit_list( equipments_by_unit .entry(unit_id) .or_default() - .push(UnitEquipmentItem { equipment: eq, role_points }); + .push(UnitEquipmentItem { + equipment: eq, + role_points, + }); } } @@ -125,7 +129,11 @@ pub async fn get_unit_list( .map(|unit| { let runtime = all_runtimes.get(&unit.id).cloned(); let equipments = equipments_by_unit.remove(&unit.id).unwrap_or_default(); - UnitWithRuntime { unit, runtime, equipments } + UnitWithRuntime { + unit, + runtime, + equipments, + } }) .collect::>(); @@ -151,7 +159,6 @@ pub async fn stop_equipment( send_equipment_command(state, equipment_id, ControlAction::Stop).await } - async fn send_equipment_command( state: AppState, equipment_id: Uuid, @@ -197,18 +204,20 @@ pub async fn get_unit( State(state): State, Path(unit_id): Path, ) -> Result { - let unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id) + let unit = plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; let runtime = state.control_runtime.get(unit_id).await; let all_equipments = - crate::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?; + plc_platform_core::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?; let eq_ids: Vec = all_equipments.iter().map(|e| e.id).collect(); let role_point_rows = - crate::service::get_signal_role_points_batch(&state.platform.pool, &eq_ids).await?; + plc_platform_core::service::get_signal_role_points_batch(&state.platform.pool, &eq_ids) + .await?; let monitor_guard = state - .platform.connection_manager + .platform + .connection_manager .get_point_monitor_data_read_guard() .await; let mut role_points_map: std::collections::HashMap< @@ -216,14 +225,13 @@ pub async fn get_unit( Vec, > = std::collections::HashMap::new(); for rp in role_point_rows { - role_points_map - .entry(rp.equipment_id) - .or_default() - .push(crate::handler::equipment::SignalRolePoint { + role_points_map.entry(rp.equipment_id).or_default().push( + crate::handler::equipment::SignalRolePoint { point_id: rp.point_id, signal_role: rp.signal_role, point_monitor: monitor_guard.get(&rp.point_id).cloned(), - }); + }, + ); } drop(monitor_guard); @@ -231,18 +239,25 @@ pub async fn get_unit( .into_iter() .map(|eq| { let role_points = role_points_map.remove(&eq.id).unwrap_or_default(); - UnitEquipmentItem { equipment: eq, role_points } + UnitEquipmentItem { + equipment: eq, + role_points, + } }) .collect(); - Ok(Json(UnitWithRuntime { unit, runtime, equipments })) + Ok(Json(UnitWithRuntime { + unit, + runtime, + equipments, + })) } #[derive(serde::Serialize)] pub struct PointDetail { #[serde(flatten)] pub point: plc_platform_core::model::Point, - pub point_monitor: Option, + pub point_monitor: Option, } #[derive(serde::Serialize)] @@ -264,18 +279,24 @@ pub async fn get_unit_detail( State(state): State, Path(unit_id): Path, ) -> Result { - let unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id) + let unit = plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; let runtime = state.control_runtime.get(unit_id).await; - let equipments = crate::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?; + let equipments = + plc_platform_core::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?; let equipment_ids: Vec = equipments.iter().map(|e| e.id).collect(); - let all_points = crate::service::get_points_by_equipment_ids(&state.platform.pool, &equipment_ids).await?; + let all_points = plc_platform_core::service::get_points_by_equipment_ids( + &state.platform.pool, + &equipment_ids, + ) + .await?; let monitor_guard = state - .platform.connection_manager + .platform + .connection_manager .get_point_monitor_data_read_guard() .await; @@ -290,11 +311,18 @@ pub async fn get_unit_detail( point: p.clone(), }) .collect(); - EquipmentDetail { equipment: eq, points } + EquipmentDetail { + equipment: eq, + points, + } }) .collect(); - Ok(Json(UnitDetail { unit, runtime, equipments })) + Ok(Json(UnitDetail { + unit, + runtime, + equipments, + })) } #[derive(Debug, Deserialize, Validate)] @@ -349,7 +377,7 @@ pub async fn create_unit( validate_unit_timing_order(run_time_sec, acc_time_sec)?; - if crate::service::get_unit_by_code(&state.platform.pool, &payload.code) + if plc_platform_core::service::get_unit_by_code(&state.platform.pool, &payload.code) .await? .is_some() { @@ -359,9 +387,9 @@ pub async fn create_unit( )); } - let unit_id = crate::service::create_unit( + let unit_id = plc_platform_core::service::create_unit( &state.platform.pool, - crate::service::CreateUnitParams { + plc_platform_core::service::CreateUnitParams { code: &payload.code, name: &payload.name, description: payload.description.as_deref(), @@ -370,9 +398,7 @@ pub async fn create_unit( stop_time_sec, acc_time_sec, bl_time_sec, - require_manual_ack_after_fault: payload - .require_manual_ack_after_fault - .unwrap_or(true), + require_manual_ack_after_fault: payload.require_manual_ack_after_fault.unwrap_or(true), }, ) .await?; @@ -412,7 +438,7 @@ pub async fn update_unit( ) -> Result { payload.validate()?; - let existing_unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id) + let existing_unit = plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; @@ -422,7 +448,8 @@ pub async fn update_unit( )?; if let Some(code) = payload.code.as_deref() { - let duplicate = crate::service::get_unit_by_code(&state.platform.pool, code).await?; + let duplicate = + plc_platform_core::service::get_unit_by_code(&state.platform.pool, code).await?; if duplicate.as_ref().is_some_and(|item| item.id != unit_id) { return Err(ApiErr::BadRequest( "Unit code already exists".to_string(), @@ -444,10 +471,10 @@ pub async fn update_unit( return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); } - crate::service::update_unit( + plc_platform_core::service::update_unit( &state.platform.pool, unit_id, - crate::service::UpdateUnitParams { + plc_platform_core::service::UpdateUnitParams { code: payload.code.as_deref(), name: payload.name.as_deref(), description: payload.description.as_deref(), @@ -470,7 +497,7 @@ pub async fn delete_unit( State(state): State, Path(unit_id): Path, ) -> Result { - let deleted = crate::service::delete_unit(&state.platform.pool, unit_id).await?; + let deleted = plc_platform_core::service::delete_unit(&state.platform.pool, unit_id).await?; if !deleted { return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } @@ -493,13 +520,13 @@ pub async fn get_event_list( ) -> Result { query.validate()?; - let total = crate::service::get_events_count( + let total = plc_platform_core::service::get_events_count( &state.platform.pool, query.unit_id, query.event_type.as_deref(), ) .await?; - let data = crate::service::get_events_paginated( + let data = plc_platform_core::service::get_events_paginated( &state.platform.pool, query.unit_id, query.event_type.as_deref(), @@ -520,7 +547,7 @@ pub async fn start_auto_unit( State(state): State, Path(unit_id): Path, ) -> Result { - let unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id) + let unit = plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; @@ -546,16 +573,20 @@ pub async fn start_auto_unit( state.control_runtime.upsert(runtime).await; state.control_runtime.notify_unit(unit_id).await; - let _ = state.event_manager.send(crate::event::AppEvent::AutoControlStarted { unit_id }); + let _ = state + .event_manager + .send(crate::event::AppEvent::AutoControlStarted { unit_id }); - Ok(Json(json!({ "ok_msg": "Auto control started", "unit_id": unit_id }))) + Ok(Json( + json!({ "ok_msg": "Auto control started", "unit_id": unit_id }), + )) } pub async fn stop_auto_unit( State(state): State, Path(unit_id): Path, ) -> Result { - crate::service::get_unit_by_id(&state.platform.pool, unit_id) + plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; @@ -564,15 +595,17 @@ pub async fn stop_auto_unit( state.control_runtime.upsert(runtime).await; state.control_runtime.notify_unit(unit_id).await; - let _ = state.event_manager.send(crate::event::AppEvent::AutoControlStopped { unit_id }); + let _ = state + .event_manager + .send(crate::event::AppEvent::AutoControlStopped { unit_id }); - Ok(Json(json!({ "ok_msg": "Auto control stopped", "unit_id": unit_id }))) + Ok(Json( + json!({ "ok_msg": "Auto control stopped", "unit_id": unit_id }), + )) } -pub async fn batch_start_auto( - State(state): State, -) -> Result { - let units = crate::service::get_all_enabled_units(&state.platform.pool).await?; +pub async fn batch_start_auto(State(state): State) -> Result { + let units = plc_platform_core::service::get_all_enabled_units(&state.platform.pool).await?; let mut started = Vec::new(); let mut skipped = Vec::new(); @@ -599,10 +632,8 @@ pub async fn batch_start_auto( Ok(Json(json!({ "started": started, "skipped": skipped }))) } -pub async fn batch_stop_auto( - State(state): State, -) -> Result { - let units = crate::service::get_all_enabled_units(&state.platform.pool).await?; +pub async fn batch_stop_auto(State(state): State) -> Result { + let units = plc_platform_core::service::get_all_enabled_units(&state.platform.pool).await?; let mut stopped = Vec::new(); for unit in units { @@ -626,7 +657,7 @@ pub async fn ack_fault_unit( State(state): State, Path(unit_id): Path, ) -> Result { - crate::service::get_unit_by_id(&state.platform.pool, unit_id) + plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; @@ -651,16 +682,20 @@ pub async fn ack_fault_unit( state.control_runtime.upsert(runtime).await; state.control_runtime.notify_unit(unit_id).await; - let _ = state.event_manager.send(crate::event::AppEvent::FaultAcked { unit_id }); + let _ = state + .event_manager + .send(crate::event::AppEvent::FaultAcked { unit_id }); - Ok(Json(json!({ "ok_msg": "Fault acknowledged", "unit_id": unit_id }))) + Ok(Json( + json!({ "ok_msg": "Fault acknowledged", "unit_id": unit_id }), + )) } pub async fn get_unit_runtime( State(state): State, Path(unit_id): Path, ) -> Result { - crate::service::get_unit_by_id(&state.platform.pool, unit_id) + plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; diff --git a/crates/app_feeder_distributor/src/lib.rs b/crates/app_feeder_distributor/src/lib.rs index ca64b08..f54e2f1 100644 --- a/crates/app_feeder_distributor/src/lib.rs +++ b/crates/app_feeder_distributor/src/lib.rs @@ -4,25 +4,5 @@ pub mod event; pub mod handler; pub mod router; -pub mod connection { - pub use plc_platform_core::connection::*; -} - -pub mod db { - pub use plc_platform_core::db::*; -} - -pub mod service { - pub use plc_platform_core::service::*; -} - -pub mod telemetry { - pub use plc_platform_core::telemetry::*; -} - -pub mod util { - pub use plc_platform_core::util::*; -} - -pub use app::{run, AppState, test_state}; +pub use app::{run, test_state, AppState}; pub use router::build_router; diff --git a/crates/app_operation_system/src/app.rs b/crates/app_operation_system/src/app.rs index 21ea93b..79d6f91 100644 --- a/crates/app_operation_system/src/app.rs +++ b/crates/app_operation_system/src/app.rs @@ -1,6 +1,6 @@ use crate::router::build_router; use axum::extract::FromRef; -use plc_platform_core::platform_context::PlatformContext; +use plc_platform_core::{bootstrap, platform_context::PlatformContext}; #[derive(Clone, Debug)] pub struct AppConfig { @@ -35,23 +35,15 @@ impl FromRef for PlatformContext { } pub async fn run() { - dotenv::dotenv().ok(); - plc_platform_core::util::log::init_logger(); - let _single_instance = - match plc_platform_core::util::single_instance::try_acquire("PLCControl.OperationSystem") { - Ok(guard) => guard, - Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => { - tracing::warn!("Another operation-system instance is already running"); - return; - } - Err(err) => { - tracing::error!("Failed to initialize single instance guard: {}", err); - return; - } - }; + let Some(_single_instance) = bootstrap::init_process( + "PLCControl.OperationSystem", + "Another operation-system instance is already running", + ) else { + return; + }; let config = AppConfig::from_env(); - let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.server.database_url) + let builder = bootstrap::bootstrap_platform(&config.server.database_url) .await .expect("Failed to bootstrap platform"); let platform = builder.build(); @@ -62,16 +54,8 @@ pub async fn run() { platform, }; let app = build_router(state.clone()); - let addr = format!( - "{}:{}", - state.config.server.server_host, state.config.server.server_port - ); - tracing::info!("Starting operation-system server at http://{}", addr); - let listener = tokio::net::TcpListener::bind(&addr) - .await - .expect("operation-system listener should bind"); - axum::serve(listener, app) + bootstrap::serve_app(&state.config.server, "operation-system", app) .await .expect("operation-system server should run"); } diff --git a/crates/plc_platform_core/src/bootstrap.rs b/crates/plc_platform_core/src/bootstrap.rs index 0e4e272..f0c9c63 100644 --- a/crates/plc_platform_core/src/bootstrap.rs +++ b/crates/plc_platform_core/src/bootstrap.rs @@ -1,10 +1,13 @@ use std::sync::Arc; +use crate::config::ServerConfig; use crate::connection::ConnectionManager; use crate::db::init_database; use crate::platform_context::PlatformContext; use crate::telemetry_processor::TelemetryProcessor; +use crate::util::single_instance::SingleInstanceGuard; use crate::websocket::WebSocketManager; +use tokio::sync::mpsc; pub struct PlatformBuilder { pub pool: sqlx::PgPool, @@ -79,3 +82,74 @@ pub async fn connect_all_enabled_sources(platform: &PlatformContext) -> Result<( Ok(()) } + +pub fn init_process( + single_instance_name: &str, + duplicate_instance_message: &str, +) -> Option { + dotenv::dotenv().ok(); + crate::util::log::init_logger(); + + match crate::util::single_instance::try_acquire(single_instance_name) { + Ok(guard) => Some(guard), + Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => { + tracing::warn!("{}", duplicate_instance_message); + None + } + Err(err) => { + tracing::error!("Failed to initialize single instance guard: {}", err); + None + } + } +} + +pub async fn serve_app( + config: &ServerConfig, + app_name: &str, + app: axum::Router, +) -> std::io::Result<()> { + let addr = config.addr(); + tracing::info!("Starting {} server at http://{}", app_name, addr); + let listener = tokio::net::TcpListener::bind(&addr).await?; + axum::serve(listener, app).await +} + +pub async fn serve_app_with_graceful_shutdown( + config: &ServerConfig, + app_name: &str, + app: axum::Router, + shutdown_signal: F, +) -> std::io::Result<()> +where + F: std::future::Future + Send + 'static, +{ + let addr = config.addr(); + tracing::info!("Starting {} server at http://{}", app_name, addr); + let listener = tokio::net::TcpListener::bind(&addr).await?; + axum::serve(listener, app) + .with_graceful_shutdown(shutdown_signal) + .await +} + +pub fn install_ctrl_c_shutdown(shutdown_tx: mpsc::Sender<()>) { + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("Failed to install Ctrl+C handler"); + let _ = shutdown_tx.send(()).await; + }); +} + +pub async fn disconnect_all_on_shutdown( + mut shutdown_rx: mpsc::Receiver<()>, + connection_manager: Arc, + app_name: &'static str, +) { + let _ = shutdown_rx.recv().await; + tracing::info!( + "Received shutdown signal, closing all {} connections...", + app_name + ); + connection_manager.disconnect_all().await; + tracing::info!("All {} connections closed", app_name); +} diff --git a/crates/plc_platform_core/src/config.rs b/crates/plc_platform_core/src/config.rs index 7950171..ea15808 100644 --- a/crates/plc_platform_core/src/config.rs +++ b/crates/plc_platform_core/src/config.rs @@ -20,6 +20,14 @@ impl ServerConfig { server_port: env_u16(port_key, port_default)?, }) } + + pub fn addr(&self) -> String { + format!("{}:{}", self.server_host, self.server_port) + } + + pub fn local_ui_url(&self) -> String { + format!("http://{}:{}/ui", "localhost", self.server_port) + } } pub fn required_env(key: &str) -> Result {