Clean feeder core dependency boundaries

This commit is contained in:
caoqianming 2026-04-21 16:22:11 +08:00
parent 24b1d3546b
commit f8ba864a65
9 changed files with 413 additions and 233 deletions

View File

@ -1,11 +1,11 @@
use std::sync::Arc;
use crate::{
connection::ConnectionManager, control, event::EventManager, router::build_router,
};
use crate::{control, event::EventManager, router::build_router};
use axum::extract::FromRef;
use plc_platform_core::websocket::WebSocketManager;
use plc_platform_core::{config::ServerConfig, platform_context::PlatformContext};
use plc_platform_core::{bootstrap, websocket::WebSocketManager};
use plc_platform_core::{
config::ServerConfig, connection::ConnectionManager, platform_context::PlatformContext,
};
use tokio::sync::mpsc;
#[derive(Clone)]
@ -23,25 +23,16 @@ impl FromRef<AppState> for PlatformContext {
}
pub async fn run() {
dotenv::dotenv().ok();
plc_platform_core::util::log::init_logger();
let _single_instance =
match plc_platform_core::util::single_instance::try_acquire("PLCControl.FeederDistributor")
{
Ok(guard) => guard,
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
tracing::warn!("Another feeder distributor instance is already running");
let Some(_single_instance) = bootstrap::init_process(
"PLCControl.FeederDistributor",
"Another feeder distributor instance is already running",
) else {
return;
}
Err(err) => {
tracing::error!("Failed to initialize single instance guard: {}", err);
return;
}
};
let config = ServerConfig::from_env("HOST", "0.0.0.0", "PORT", 60309)
.expect("Failed to load server configuration");
let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url)
let builder = bootstrap::bootstrap_platform(&config.database_url)
.await
.expect("Failed to bootstrap platform");
@ -57,7 +48,7 @@ pub async fn run() {
Some(platform.ws_manager.clone()),
));
plc_platform_core::bootstrap::connect_all_enabled_sources(&platform)
bootstrap::connect_all_enabled_sources(&platform)
.await
.expect("Failed to connect enabled sources");
@ -70,33 +61,24 @@ pub async fn run() {
control::engine::start(state.clone(), control_runtime);
let app = build_router(state.clone());
let addr = format!("{}:{}", config.server_host, config.server_port);
tracing::info!("Starting feeder distributor server at http://{}", addr);
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
let ui_url = format!("http://{}:{}/ui", "localhost", config.server_port);
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
let shutdown_tx_ctrl = shutdown_tx.clone();
let ui_url = config.local_ui_url();
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
let rt_handle = tokio::runtime::Handle::current();
init_tray(ui_url, shutdown_tx.clone(), rt_handle);
let connection_manager_for_shutdown = state.platform.connection_manager.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("Failed to install Ctrl+C handler");
let _ = shutdown_tx_ctrl.send(()).await;
});
bootstrap::install_ctrl_c_shutdown(shutdown_tx);
let shutdown_signal = async move {
let _ = shutdown_rx.recv().await;
tracing::info!("Received shutdown signal, closing all feeder connections...");
connection_manager_for_shutdown.disconnect_all().await;
tracing::info!("All feeder connections closed");
};
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal)
bootstrap::serve_app_with_graceful_shutdown(
&config,
"feeder distributor",
app,
bootstrap::disconnect_all_on_shutdown(
shutdown_rx,
connection_manager_for_shutdown,
"feeder",
),
)
.await
.unwrap();
}

View File

@ -11,11 +11,13 @@ use crate::{
runtime::{ControlRuntimeStore, UnitRuntime, UnitRuntimeState},
},
event::AppEvent,
service::EquipmentRolePoint,
telemetry::{PointMonitorInfo, PointQuality},
AppState,
};
use plc_platform_core::websocket::WsMessage;
use plc_platform_core::{
service::EquipmentRolePoint,
telemetry::{PointMonitorInfo, PointQuality},
websocket::WsMessage,
};
/// Start the engine: a supervisor spawns one async task per enabled unit.
pub fn start(state: AppState, runtime_store: Arc<ControlRuntimeStore>) {
@ -33,16 +35,16 @@ async fn supervise(state: AppState, store: Arc<ControlRuntimeStore>) {
loop {
interval.tick().await;
match crate::service::get_all_enabled_units(&state.platform.pool).await {
match plc_platform_core::service::get_all_enabled_units(&state.platform.pool).await {
Ok(units) => {
for unit in units {
let needs_spawn = tasks
.get(&unit.id)
.map_or(true, |h| h.is_finished());
let needs_spawn = tasks.get(&unit.id).map_or(true, |h| h.is_finished());
if needs_spawn {
let s = state.clone();
let st = store.clone();
let handle = tokio::spawn(async move { unit_task(s, st, unit.id).await; });
let handle = tokio::spawn(async move {
unit_task(s, st, unit.id).await;
});
tasks.insert(unit.id, handle);
}
}
@ -63,7 +65,8 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
loop {
// Reload unit config on each iteration to detect disable/delete.
let unit = match crate::service::get_unit_by_id(&state.platform.pool, unit_id).await {
let unit =
match plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id).await {
Ok(Some(u)) if u.enabled => u,
Ok(_) => {
tracing::info!("Engine: unit {} disabled or deleted, task exiting", unit_id);
@ -93,7 +96,11 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
}
// Wait when not active.
if !runtime.auto_enabled || runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required {
if !runtime.auto_enabled
|| runtime.fault_locked
|| runtime.comm_locked
|| runtime.manual_ack_required
{
tokio::select! {
_ = fault_tick.tick() => {}
_ = notify.notified() => {
@ -114,11 +121,24 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
continue;
}
// Send feeder start command.
let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await;
let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "start_cmd", &monitor));
let monitor = state
.platform
.connection_manager
.get_point_monitor_data_read_guard()
.await;
let cmd = kind_roles
.get("coal_feeder")
.and_then(|r| find_cmd(r, "start_cmd", &monitor));
drop(monitor);
if let Some((pid, vt)) = cmd {
if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await {
if let Err(e) = send_pulse_command(
&state.platform.connection_manager,
pid,
vt.as_ref(),
300,
)
.await
{
tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e);
continue;
}
@ -132,20 +152,46 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
UnitRuntimeState::Running => {
// Wait run_time_sec. run_time_sec == 0 means run without a time limit
// (relies on acc_time_sec to eventually stop). Treat as a very long phase.
let secs = if unit.run_time_sec > 0 { unit.run_time_sec } else { i32::MAX };
let secs = if unit.run_time_sec > 0 {
unit.run_time_sec
} else {
i32::MAX
};
let unit_for_wait = plc_platform_core::model::ControlUnit {
run_time_sec: secs,
..unit.clone()
};
if !wait_phase(&state, &store, &unit_for_wait, &all_roles, &notify, &mut fault_tick).await {
if !wait_phase(
&state,
&store,
&unit_for_wait,
&all_roles,
&notify,
&mut fault_tick,
)
.await
{
continue;
}
// Stop feeder.
let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await;
let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "stop_cmd", &monitor));
let monitor = state
.platform
.connection_manager
.get_point_monitor_data_read_guard()
.await;
let cmd = kind_roles
.get("coal_feeder")
.and_then(|r| find_cmd(r, "stop_cmd", &monitor));
drop(monitor);
if let Some((pid, vt)) = cmd {
if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await {
if let Err(e) = send_pulse_command(
&state.platform.connection_manager,
pid,
vt.as_ref(),
300,
)
.await
{
tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e);
continue;
}
@ -154,14 +200,33 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
runtime.accumulated_run_sec += secs as i64 * 1000;
runtime.display_acc_sec = runtime.accumulated_run_sec;
if unit.acc_time_sec > 0 && runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000 {
if unit.acc_time_sec > 0
&& runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000
{
// Accumulated threshold reached; start distributor.
let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await;
let dist_cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "start_cmd", &monitor));
let monitor = state
.platform
.connection_manager
.get_point_monitor_data_read_guard()
.await;
let dist_cmd = kind_roles
.get("distributor")
.and_then(|r| find_cmd(r, "start_cmd", &monitor));
drop(monitor);
if let Some((pid, vt)) = dist_cmd {
if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await {
tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e);
if let Err(e) = send_pulse_command(
&state.platform.connection_manager,
pid,
vt.as_ref(),
300,
)
.await
{
tracing::warn!(
"Engine: start distributor failed for unit {}: {}",
unit_id,
e
);
}
}
runtime.state = UnitRuntimeState::DistributorRunning;
@ -177,12 +242,29 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
if !wait_phase(&state, &store, &unit, &all_roles, &notify, &mut fault_tick).await {
continue;
}
let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await;
let cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "stop_cmd", &monitor));
let monitor = state
.platform
.connection_manager
.get_point_monitor_data_read_guard()
.await;
let cmd = kind_roles
.get("distributor")
.and_then(|r| find_cmd(r, "stop_cmd", &monitor));
drop(monitor);
if let Some((pid, vt)) = cmd {
if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await {
tracing::warn!("Engine: stop distributor failed for unit {}: {}", unit_id, e);
if let Err(e) = send_pulse_command(
&state.platform.connection_manager,
pid,
vt.as_ref(),
300,
)
.await
{
tracing::warn!(
"Engine: stop distributor failed for unit {}: {}",
unit_id,
e
);
continue;
}
}
@ -242,7 +324,11 @@ async fn wait_phase(
store.upsert(runtime.clone()).await;
push_ws(state, &runtime).await;
}
if !runtime.auto_enabled || runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required {
if !runtime.auto_enabled
|| runtime.fault_locked
|| runtime.comm_locked
|| runtime.manual_ack_required
{
return false;
}
}
@ -250,7 +336,8 @@ async fn wait_phase(
async fn push_ws(state: &AppState, runtime: &UnitRuntime) {
if let Err(e) = state
.platform.ws_manager
.platform
.ws_manager
.send_to_public(WsMessage::UnitRuntimeChanged(runtime.clone()))
.await
{
@ -267,7 +354,8 @@ async fn check_fault_comm(
all_roles: &[(Uuid, HashMap<String, EquipmentRolePoint>)],
) -> bool {
let monitor = state
.platform.connection_manager
.platform
.connection_manager
.get_point_monitor_data_read_guard()
.await;
@ -340,17 +428,26 @@ async fn check_fault_comm(
runtime.rem_local = any_rem_local;
if !prev_comm && runtime.comm_locked {
let _ = state.event_manager.send(AppEvent::CommLocked { unit_id: unit.id });
let _ = state
.event_manager
.send(AppEvent::CommLocked { unit_id: unit.id });
} else if prev_comm && !runtime.comm_locked {
let _ = state.event_manager.send(AppEvent::CommRecovered { unit_id: unit.id });
let _ = state
.event_manager
.send(AppEvent::CommRecovered { unit_id: unit.id });
}
if let Some(eq_id) = flt_eq_id {
runtime.fault_locked = true;
let _ = state.event_manager.send(AppEvent::FaultLocked { unit_id: unit.id, equipment_id: eq_id });
let _ = state.event_manager.send(AppEvent::FaultLocked {
unit_id: unit.id,
equipment_id: eq_id,
});
if runtime.auto_enabled {
runtime.auto_enabled = false;
let _ = state.event_manager.send(AppEvent::AutoControlStopped { unit_id: unit.id });
let _ = state
.event_manager
.send(AppEvent::AutoControlStopped { unit_id: unit.id });
}
}
@ -364,16 +461,23 @@ async fn check_fault_comm(
// Fire RemLocal event when any equipment first switches to local mode.
if let Some(eq_id) = rem_local_eq_id {
let _ = state.event_manager.send(AppEvent::RemLocal { unit_id: unit.id, equipment_id: eq_id });
let _ = state.event_manager.send(AppEvent::RemLocal {
unit_id: unit.id,
equipment_id: eq_id,
});
if runtime.auto_enabled {
runtime.auto_enabled = false;
let _ = state.event_manager.send(AppEvent::AutoControlStopped { unit_id: unit.id });
let _ = state
.event_manager
.send(AppEvent::AutoControlStopped { unit_id: unit.id });
}
}
// Fire RemRecovered when all rem signals return to remote.
if prev_rem_local && !any_rem_local {
let _ = state.event_manager.send(AppEvent::RemRecovered { unit_id: unit.id });
let _ = state
.event_manager
.send(AppEvent::RemRecovered { unit_id: unit.id });
}
runtime.comm_locked != prev_comm
@ -390,10 +494,14 @@ type EquipMaps = (
);
async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result<EquipMaps, sqlx::Error> {
let equipment_list = crate::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?;
let equipment_list =
plc_platform_core::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?;
let equipment_ids: Vec<Uuid> = equipment_list.iter().map(|equip| equip.id).collect();
let role_point_rows =
crate::service::get_signal_role_points_batch(&state.platform.pool, &equipment_ids).await?;
let role_point_rows = plc_platform_core::service::get_signal_role_points_batch(
&state.platform.pool,
&equipment_ids,
)
.await?;
let mut role_points_by_equipment: HashMap<Uuid, Vec<EquipmentRolePoint>> = HashMap::new();
for row in role_point_rows {
role_points_by_equipment
@ -434,7 +542,8 @@ fn build_equipment_maps(
} else {
tracing::warn!(
"Engine: unit {} has multiple {} equipment; using first",
unit_id, kind
unit_id,
kind
);
}
}
@ -449,7 +558,7 @@ fn find_cmd(
roles: &HashMap<String, EquipmentRolePoint>,
role: &str,
monitor: &HashMap<Uuid, PointMonitorInfo>,
) -> Option<(Uuid, Option<crate::telemetry::ValueType>)> {
) -> Option<(Uuid, Option<plc_platform_core::telemetry::ValueType>)> {
let cmd_rp = roles.get(role)?;
let rem_ok = roles
@ -477,9 +586,9 @@ fn find_cmd(
#[cfg(test)]
mod tests {
use super::build_equipment_maps;
use plc_platform_core::model::Equipment;
use crate::service::EquipmentRolePoint;
use chrono::Utc;
use plc_platform_core::model::Equipment;
use plc_platform_core::service::EquipmentRolePoint;
use std::collections::HashMap;
use uuid::Uuid;
@ -512,7 +621,7 @@ mod tests {
signal_role: "start_cmd".to_string(),
}],
);
let (first_kind_roles, _, _) = build_equipment_maps(unit_id, &equipment_list, first_roles);
let (first_kind_roles, _) = build_equipment_maps(unit_id, &equipment_list, first_roles);
let mut second_roles = HashMap::new();
second_roles.insert(
@ -522,8 +631,7 @@ mod tests {
signal_role: "start_cmd".to_string(),
}],
);
let (second_kind_roles, _, _) =
build_equipment_maps(unit_id, &equipment_list, second_roles);
let (second_kind_roles, _) = build_equipment_maps(unit_id, &equipment_list, second_roles);
assert_eq!(
first_kind_roles["coal_feeder"]["start_cmd"].point_id,

View File

@ -3,7 +3,7 @@ pub use plc_platform_core::control::{command, runtime};
pub mod engine;
pub mod validator;
use crate::telemetry::{DataValue, PointMonitorInfo};
use plc_platform_core::telemetry::{DataValue, PointMonitorInfo};
pub(crate) fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool {
match monitor.value.as_ref() {
@ -12,7 +12,10 @@ pub(crate) fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool {
Some(DataValue::UInt(value)) => *value != 0,
Some(DataValue::Float(value)) => *value != 0.0,
Some(DataValue::Text(value)) => {
matches!(value.trim().to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes")
matches!(
value.trim().to_ascii_lowercase().as_str(),
"1" | "true" | "on" | "yes"
)
}
_ => false,
}

View File

@ -3,11 +3,11 @@ use std::collections::HashMap;
use serde_json::json;
use uuid::Uuid;
use crate::{
use crate::AppState;
use plc_platform_core::{
service::EquipmentRolePoint,
telemetry::{PointMonitorInfo, PointQuality, ValueType},
util::response::ApiErr,
AppState,
};
#[derive(Debug, Clone, Copy)]
@ -43,11 +43,14 @@ pub async fn validate_manual_control(
equipment_id: Uuid,
action: ControlAction,
) -> Result<ManualControlContext, ApiErr> {
let equipment = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id)
let equipment =
plc_platform_core::service::get_equipment_by_id(&state.platform.pool, equipment_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Equipment not found".to_string(), None))?;
let role_points = crate::service::get_equipment_role_points(&state.platform.pool, equipment_id).await?;
let role_points =
plc_platform_core::service::get_equipment_role_points(&state.platform.pool, equipment_id)
.await?;
if role_points.is_empty() {
return Err(ApiErr::BadRequest(
"Equipment has no bound role points".to_string(),
@ -75,7 +78,8 @@ pub async fn validate_manual_control(
.clone();
let monitor_guard = state
.platform.connection_manager
.platform
.connection_manager
.get_point_monitor_data_read_guard()
.await;
@ -135,7 +139,9 @@ pub async fn validate_manual_control(
if runtime.fault_locked {
return Err(ApiErr::Forbidden(
"Unit is fault locked".to_string(),
Some(json!({ "unit_id": unit_id, "manual_ack_required": runtime.manual_ack_required })),
Some(
json!({ "unit_id": unit_id, "manual_ack_required": runtime.manual_ack_required }),
),
));
}
if runtime.manual_ack_required {
@ -148,7 +154,8 @@ pub async fn validate_manual_control(
}
let command_value_type = state
.platform.connection_manager
.platform
.connection_manager
.get_point_monitor_data_read_guard()
.await
.get(&command_point.point_id)
@ -198,4 +205,3 @@ fn missing_monitor_err(role: &str, equipment_id: Uuid) -> ApiErr {
})),
)
}

View File

@ -1,4 +1,4 @@
use axum::{
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
@ -11,17 +11,14 @@ use validator::Validate;
use crate::{
control::validator::{validate_manual_control, ControlAction},
util::{
pagination::{PaginatedResponse, PaginationParams},
response::ApiErr,
},
AppState,
};
use plc_platform_core::util::{
pagination::{PaginatedResponse, PaginationParams},
response::ApiErr,
};
fn validate_unit_timing_order(
run_time_sec: i32,
acc_time_sec: i32,
) -> Result<(), ApiErr> {
fn validate_unit_timing_order(run_time_sec: i32, acc_time_sec: i32) -> Result<(), ApiErr> {
if acc_time_sec <= run_time_sec {
return Err(ApiErr::BadRequest(
"acc_time_sec must be greater than run_time_sec".to_string(),
@ -68,8 +65,10 @@ pub async fn get_unit_list(
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let total = crate::service::get_units_count(&state.platform.pool, query.keyword.as_deref()).await?;
let units = crate::service::get_units_paginated(
let total =
plc_platform_core::service::get_units_count(&state.platform.pool, query.keyword.as_deref())
.await?;
let units = plc_platform_core::service::get_units_paginated(
&state.platform.pool,
query.keyword.as_deref(),
query.pagination.page_size,
@ -81,14 +80,17 @@ pub async fn get_unit_list(
let unit_ids: Vec<Uuid> = units.iter().map(|u| u.id).collect();
let all_equipments =
crate::service::get_equipment_by_unit_ids(&state.platform.pool, &unit_ids).await?;
plc_platform_core::service::get_equipment_by_unit_ids(&state.platform.pool, &unit_ids)
.await?;
let eq_ids: Vec<Uuid> = all_equipments.iter().map(|e| e.id).collect();
let role_point_rows =
crate::service::get_signal_role_points_batch(&state.platform.pool, &eq_ids).await?;
plc_platform_core::service::get_signal_role_points_batch(&state.platform.pool, &eq_ids)
.await?;
let monitor_guard = state
.platform.connection_manager
.platform
.connection_manager
.get_point_monitor_data_read_guard()
.await;
@ -97,14 +99,13 @@ pub async fn get_unit_list(
Vec<crate::handler::equipment::SignalRolePoint>,
> = std::collections::HashMap::new();
for rp in role_point_rows {
role_points_map
.entry(rp.equipment_id)
.or_default()
.push(crate::handler::equipment::SignalRolePoint {
role_points_map.entry(rp.equipment_id).or_default().push(
crate::handler::equipment::SignalRolePoint {
point_id: rp.point_id,
signal_role: rp.signal_role,
point_monitor: monitor_guard.get(&rp.point_id).cloned(),
});
},
);
}
drop(monitor_guard);
@ -116,7 +117,10 @@ pub async fn get_unit_list(
equipments_by_unit
.entry(unit_id)
.or_default()
.push(UnitEquipmentItem { equipment: eq, role_points });
.push(UnitEquipmentItem {
equipment: eq,
role_points,
});
}
}
@ -125,7 +129,11 @@ pub async fn get_unit_list(
.map(|unit| {
let runtime = all_runtimes.get(&unit.id).cloned();
let equipments = equipments_by_unit.remove(&unit.id).unwrap_or_default();
UnitWithRuntime { unit, runtime, equipments }
UnitWithRuntime {
unit,
runtime,
equipments,
}
})
.collect::<Vec<_>>();
@ -151,7 +159,6 @@ pub async fn stop_equipment(
send_equipment_command(state, equipment_id, ControlAction::Stop).await
}
async fn send_equipment_command(
state: AppState,
equipment_id: Uuid,
@ -197,18 +204,20 @@ pub async fn get_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id)
let unit = plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
let runtime = state.control_runtime.get(unit_id).await;
let all_equipments =
crate::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?;
plc_platform_core::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?;
let eq_ids: Vec<Uuid> = all_equipments.iter().map(|e| e.id).collect();
let role_point_rows =
crate::service::get_signal_role_points_batch(&state.platform.pool, &eq_ids).await?;
plc_platform_core::service::get_signal_role_points_batch(&state.platform.pool, &eq_ids)
.await?;
let monitor_guard = state
.platform.connection_manager
.platform
.connection_manager
.get_point_monitor_data_read_guard()
.await;
let mut role_points_map: std::collections::HashMap<
@ -216,14 +225,13 @@ pub async fn get_unit(
Vec<crate::handler::equipment::SignalRolePoint>,
> = std::collections::HashMap::new();
for rp in role_point_rows {
role_points_map
.entry(rp.equipment_id)
.or_default()
.push(crate::handler::equipment::SignalRolePoint {
role_points_map.entry(rp.equipment_id).or_default().push(
crate::handler::equipment::SignalRolePoint {
point_id: rp.point_id,
signal_role: rp.signal_role,
point_monitor: monitor_guard.get(&rp.point_id).cloned(),
});
},
);
}
drop(monitor_guard);
@ -231,18 +239,25 @@ pub async fn get_unit(
.into_iter()
.map(|eq| {
let role_points = role_points_map.remove(&eq.id).unwrap_or_default();
UnitEquipmentItem { equipment: eq, role_points }
UnitEquipmentItem {
equipment: eq,
role_points,
}
})
.collect();
Ok(Json(UnitWithRuntime { unit, runtime, equipments }))
Ok(Json(UnitWithRuntime {
unit,
runtime,
equipments,
}))
}
#[derive(serde::Serialize)]
pub struct PointDetail {
#[serde(flatten)]
pub point: plc_platform_core::model::Point,
pub point_monitor: Option<crate::telemetry::PointMonitorInfo>,
pub point_monitor: Option<plc_platform_core::telemetry::PointMonitorInfo>,
}
#[derive(serde::Serialize)]
@ -264,18 +279,24 @@ pub async fn get_unit_detail(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id)
let unit = plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
let runtime = state.control_runtime.get(unit_id).await;
let equipments = crate::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?;
let equipments =
plc_platform_core::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?;
let equipment_ids: Vec<Uuid> = equipments.iter().map(|e| e.id).collect();
let all_points = crate::service::get_points_by_equipment_ids(&state.platform.pool, &equipment_ids).await?;
let all_points = plc_platform_core::service::get_points_by_equipment_ids(
&state.platform.pool,
&equipment_ids,
)
.await?;
let monitor_guard = state
.platform.connection_manager
.platform
.connection_manager
.get_point_monitor_data_read_guard()
.await;
@ -290,11 +311,18 @@ pub async fn get_unit_detail(
point: p.clone(),
})
.collect();
EquipmentDetail { equipment: eq, points }
EquipmentDetail {
equipment: eq,
points,
}
})
.collect();
Ok(Json(UnitDetail { unit, runtime, equipments }))
Ok(Json(UnitDetail {
unit,
runtime,
equipments,
}))
}
#[derive(Debug, Deserialize, Validate)]
@ -349,7 +377,7 @@ pub async fn create_unit(
validate_unit_timing_order(run_time_sec, acc_time_sec)?;
if crate::service::get_unit_by_code(&state.platform.pool, &payload.code)
if plc_platform_core::service::get_unit_by_code(&state.platform.pool, &payload.code)
.await?
.is_some()
{
@ -359,9 +387,9 @@ pub async fn create_unit(
));
}
let unit_id = crate::service::create_unit(
let unit_id = plc_platform_core::service::create_unit(
&state.platform.pool,
crate::service::CreateUnitParams {
plc_platform_core::service::CreateUnitParams {
code: &payload.code,
name: &payload.name,
description: payload.description.as_deref(),
@ -370,9 +398,7 @@ pub async fn create_unit(
stop_time_sec,
acc_time_sec,
bl_time_sec,
require_manual_ack_after_fault: payload
.require_manual_ack_after_fault
.unwrap_or(true),
require_manual_ack_after_fault: payload.require_manual_ack_after_fault.unwrap_or(true),
},
)
.await?;
@ -412,7 +438,7 @@ pub async fn update_unit(
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let existing_unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id)
let existing_unit = plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
@ -422,7 +448,8 @@ pub async fn update_unit(
)?;
if let Some(code) = payload.code.as_deref() {
let duplicate = crate::service::get_unit_by_code(&state.platform.pool, code).await?;
let duplicate =
plc_platform_core::service::get_unit_by_code(&state.platform.pool, code).await?;
if duplicate.as_ref().is_some_and(|item| item.id != unit_id) {
return Err(ApiErr::BadRequest(
"Unit code already exists".to_string(),
@ -444,10 +471,10 @@ pub async fn update_unit(
return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
}
crate::service::update_unit(
plc_platform_core::service::update_unit(
&state.platform.pool,
unit_id,
crate::service::UpdateUnitParams {
plc_platform_core::service::UpdateUnitParams {
code: payload.code.as_deref(),
name: payload.name.as_deref(),
description: payload.description.as_deref(),
@ -470,7 +497,7 @@ pub async fn delete_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let deleted = crate::service::delete_unit(&state.platform.pool, unit_id).await?;
let deleted = plc_platform_core::service::delete_unit(&state.platform.pool, unit_id).await?;
if !deleted {
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
@ -493,13 +520,13 @@ pub async fn get_event_list(
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let total = crate::service::get_events_count(
let total = plc_platform_core::service::get_events_count(
&state.platform.pool,
query.unit_id,
query.event_type.as_deref(),
)
.await?;
let data = crate::service::get_events_paginated(
let data = plc_platform_core::service::get_events_paginated(
&state.platform.pool,
query.unit_id,
query.event_type.as_deref(),
@ -520,7 +547,7 @@ pub async fn start_auto_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id)
let unit = plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
@ -546,16 +573,20 @@ pub async fn start_auto_unit(
state.control_runtime.upsert(runtime).await;
state.control_runtime.notify_unit(unit_id).await;
let _ = state.event_manager.send(crate::event::AppEvent::AutoControlStarted { unit_id });
let _ = state
.event_manager
.send(crate::event::AppEvent::AutoControlStarted { unit_id });
Ok(Json(json!({ "ok_msg": "Auto control started", "unit_id": unit_id })))
Ok(Json(
json!({ "ok_msg": "Auto control started", "unit_id": unit_id }),
))
}
pub async fn stop_auto_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
crate::service::get_unit_by_id(&state.platform.pool, unit_id)
plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
@ -564,15 +595,17 @@ pub async fn stop_auto_unit(
state.control_runtime.upsert(runtime).await;
state.control_runtime.notify_unit(unit_id).await;
let _ = state.event_manager.send(crate::event::AppEvent::AutoControlStopped { unit_id });
let _ = state
.event_manager
.send(crate::event::AppEvent::AutoControlStopped { unit_id });
Ok(Json(json!({ "ok_msg": "Auto control stopped", "unit_id": unit_id })))
Ok(Json(
json!({ "ok_msg": "Auto control stopped", "unit_id": unit_id }),
))
}
pub async fn batch_start_auto(
State(state): State<AppState>,
) -> Result<impl IntoResponse, ApiErr> {
let units = crate::service::get_all_enabled_units(&state.platform.pool).await?;
pub async fn batch_start_auto(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> {
let units = plc_platform_core::service::get_all_enabled_units(&state.platform.pool).await?;
let mut started = Vec::new();
let mut skipped = Vec::new();
@ -599,10 +632,8 @@ pub async fn batch_start_auto(
Ok(Json(json!({ "started": started, "skipped": skipped })))
}
pub async fn batch_stop_auto(
State(state): State<AppState>,
) -> Result<impl IntoResponse, ApiErr> {
let units = crate::service::get_all_enabled_units(&state.platform.pool).await?;
pub async fn batch_stop_auto(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> {
let units = plc_platform_core::service::get_all_enabled_units(&state.platform.pool).await?;
let mut stopped = Vec::new();
for unit in units {
@ -626,7 +657,7 @@ pub async fn ack_fault_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
crate::service::get_unit_by_id(&state.platform.pool, unit_id)
plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
@ -651,16 +682,20 @@ pub async fn ack_fault_unit(
state.control_runtime.upsert(runtime).await;
state.control_runtime.notify_unit(unit_id).await;
let _ = state.event_manager.send(crate::event::AppEvent::FaultAcked { unit_id });
let _ = state
.event_manager
.send(crate::event::AppEvent::FaultAcked { unit_id });
Ok(Json(json!({ "ok_msg": "Fault acknowledged", "unit_id": unit_id })))
Ok(Json(
json!({ "ok_msg": "Fault acknowledged", "unit_id": unit_id }),
))
}
pub async fn get_unit_runtime(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
crate::service::get_unit_by_id(&state.platform.pool, unit_id)
plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;

View File

@ -4,25 +4,5 @@ pub mod event;
pub mod handler;
pub mod router;
pub mod connection {
pub use plc_platform_core::connection::*;
}
pub mod db {
pub use plc_platform_core::db::*;
}
pub mod service {
pub use plc_platform_core::service::*;
}
pub mod telemetry {
pub use plc_platform_core::telemetry::*;
}
pub mod util {
pub use plc_platform_core::util::*;
}
pub use app::{run, AppState, test_state};
pub use app::{run, test_state, AppState};
pub use router::build_router;

View File

@ -1,6 +1,6 @@
use crate::router::build_router;
use axum::extract::FromRef;
use plc_platform_core::platform_context::PlatformContext;
use plc_platform_core::{bootstrap, platform_context::PlatformContext};
#[derive(Clone, Debug)]
pub struct AppConfig {
@ -35,23 +35,15 @@ impl FromRef<AppState> for PlatformContext {
}
pub async fn run() {
dotenv::dotenv().ok();
plc_platform_core::util::log::init_logger();
let _single_instance =
match plc_platform_core::util::single_instance::try_acquire("PLCControl.OperationSystem") {
Ok(guard) => guard,
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
tracing::warn!("Another operation-system instance is already running");
let Some(_single_instance) = bootstrap::init_process(
"PLCControl.OperationSystem",
"Another operation-system instance is already running",
) else {
return;
}
Err(err) => {
tracing::error!("Failed to initialize single instance guard: {}", err);
return;
}
};
let config = AppConfig::from_env();
let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.server.database_url)
let builder = bootstrap::bootstrap_platform(&config.server.database_url)
.await
.expect("Failed to bootstrap platform");
let platform = builder.build();
@ -62,16 +54,8 @@ pub async fn run() {
platform,
};
let app = build_router(state.clone());
let addr = format!(
"{}:{}",
state.config.server.server_host, state.config.server.server_port
);
tracing::info!("Starting operation-system server at http://{}", addr);
let listener = tokio::net::TcpListener::bind(&addr)
.await
.expect("operation-system listener should bind");
axum::serve(listener, app)
bootstrap::serve_app(&state.config.server, "operation-system", app)
.await
.expect("operation-system server should run");
}

View File

@ -1,10 +1,13 @@
use std::sync::Arc;
use crate::config::ServerConfig;
use crate::connection::ConnectionManager;
use crate::db::init_database;
use crate::platform_context::PlatformContext;
use crate::telemetry_processor::TelemetryProcessor;
use crate::util::single_instance::SingleInstanceGuard;
use crate::websocket::WebSocketManager;
use tokio::sync::mpsc;
pub struct PlatformBuilder {
pub pool: sqlx::PgPool,
@ -79,3 +82,74 @@ pub async fn connect_all_enabled_sources(platform: &PlatformContext) -> Result<(
Ok(())
}
pub fn init_process(
single_instance_name: &str,
duplicate_instance_message: &str,
) -> Option<SingleInstanceGuard> {
dotenv::dotenv().ok();
crate::util::log::init_logger();
match crate::util::single_instance::try_acquire(single_instance_name) {
Ok(guard) => Some(guard),
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
tracing::warn!("{}", duplicate_instance_message);
None
}
Err(err) => {
tracing::error!("Failed to initialize single instance guard: {}", err);
None
}
}
}
pub async fn serve_app(
config: &ServerConfig,
app_name: &str,
app: axum::Router,
) -> std::io::Result<()> {
let addr = config.addr();
tracing::info!("Starting {} server at http://{}", app_name, addr);
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app).await
}
pub async fn serve_app_with_graceful_shutdown<F>(
config: &ServerConfig,
app_name: &str,
app: axum::Router,
shutdown_signal: F,
) -> std::io::Result<()>
where
F: std::future::Future<Output = ()> + Send + 'static,
{
let addr = config.addr();
tracing::info!("Starting {} server at http://{}", app_name, addr);
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal)
.await
}
pub fn install_ctrl_c_shutdown(shutdown_tx: mpsc::Sender<()>) {
tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("Failed to install Ctrl+C handler");
let _ = shutdown_tx.send(()).await;
});
}
pub async fn disconnect_all_on_shutdown(
mut shutdown_rx: mpsc::Receiver<()>,
connection_manager: Arc<ConnectionManager>,
app_name: &'static str,
) {
let _ = shutdown_rx.recv().await;
tracing::info!(
"Received shutdown signal, closing all {} connections...",
app_name
);
connection_manager.disconnect_all().await;
tracing::info!("All {} connections closed", app_name);
}

View File

@ -20,6 +20,14 @@ impl ServerConfig {
server_port: env_u16(port_key, port_default)?,
})
}
pub fn addr(&self) -> String {
format!("{}:{}", self.server_host, self.server_port)
}
pub fn local_ui_url(&self) -> String {
format!("http://{}:{}/ui", "localhost", self.server_port)
}
}
pub fn required_env(key: &str) -> Result<String, String> {