From 429c2d0b178c1a64be5f6a5e0ec60fac624577e5 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Tue, 21 Apr 2026 09:11:03 +0800 Subject: [PATCH] refactor(core): fill PlatformContext with pool/connection/websocket - PlatformContext now holds pool, connection_manager, ws_manager - bootstrap_platform returns PlatformBuilder for pre-Arc setup - Feeder AppState embeds PlatformContext (state.platform.pool etc.) - Ops AppState embeds PlatformContext with real DB connection - Remove WebSocket type duplication: feeder re-exports from core - Add subscribe_room/send_to_room/remove_room_if_empty to WebSocketManager Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 1 + crates/app_feeder_distributor/src/app.rs | 49 +++--- .../src/control/engine.rs | 28 ++-- .../src/control/simulate.rs | 16 +- .../src/control/validator.rs | 8 +- .../src/handler/control.rs | 56 +++---- .../src/handler/equipment.rs | 38 ++--- .../src/handler/page.rs | 12 +- .../src/handler/point.rs | 24 +-- .../src/handler/source.rs | 20 +-- .../app_feeder_distributor/src/handler/tag.rs | 12 +- .../app_feeder_distributor/src/websocket.rs | 145 ++---------------- crates/app_operation_system/Cargo.toml | 1 + crates/app_operation_system/src/app.rs | 26 +++- crates/plc_platform_core/src/bootstrap.rs | 38 ++++- .../plc_platform_core/src/platform_context.rs | 19 ++- crates/plc_platform_core/src/websocket.rs | 13 ++ .../tests/bootstrap_smoke.rs | 4 - 18 files changed, 226 insertions(+), 284 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 427a67e..041d3e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -138,6 +138,7 @@ dependencies = [ "axum", "dotenv", "plc_platform_core", + "sqlx", "tokio", "tower", "tower-http", diff --git a/crates/app_feeder_distributor/src/app.rs b/crates/app_feeder_distributor/src/app.rs index be1af9a..1e3364a 100644 --- a/crates/app_feeder_distributor/src/app.rs +++ b/crates/app_feeder_distributor/src/app.rs @@ -4,27 +4,25 @@ use crate::{ config::AppConfig, connection::ConnectionManager, control, - db::init_database, event::EventManager, router::build_router, - websocket, + websocket::WebSocketManager, }; +use plc_platform_core::platform_context::PlatformContext; use tokio::sync::mpsc; #[derive(Clone)] pub struct AppState { pub config: AppConfig, - pub pool: sqlx::PgPool, - pub connection_manager: Arc, + pub platform: PlatformContext, pub event_manager: Arc, - pub ws_manager: Arc, pub control_runtime: Arc, } + pub async fn run() { dotenv::dotenv().ok(); plc_platform_core::util::log::init_logger(); - let _platform = plc_platform_core::bootstrap::bootstrap_platform(); let _single_instance = match plc_platform_core::util::single_instance::try_acquire("PLCControl.FeederDistributor") { Ok(guard) => guard, @@ -39,31 +37,31 @@ pub async fn run() { }; let config = AppConfig::from_env().expect("Failed to load configuration"); - let pool = init_database(&config.database_url) + let mut builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url) .await - .expect("Failed to initialize database"); + .expect("Failed to bootstrap platform"); - let mut connection_manager = ConnectionManager::new(); - let ws_manager = Arc::new(websocket::WebSocketManager::new()); let event_manager = Arc::new(EventManager::new( - pool.clone(), - Arc::new(connection_manager.clone()), - Some(ws_manager.clone()), + builder.pool.clone(), + Arc::new(builder.connection_manager.clone()), + Some(builder.ws_manager.clone()), )); - connection_manager.set_event_manager(event_manager.clone()); - connection_manager.set_pool_and_start_reconnect_task(Arc::new(pool.clone())); - let connection_manager = Arc::new(connection_manager); + builder.connection_manager.set_event_manager(event_manager.clone()); + builder.connection_manager.set_pool_and_start_reconnect_task(Arc::new(builder.pool.clone())); + + let platform = builder.build(); + let control_runtime = Arc::new(control::runtime::ControlRuntimeStore::new()); - let sources = crate::service::get_all_enabled_sources(&pool) + let sources = crate::service::get_all_enabled_sources(&platform.pool) .await .expect("Failed to fetch sources"); let mut tasks = Vec::new(); for source in sources { - let cm = connection_manager.clone(); - let p = pool.clone(); + let cm = platform.connection_manager.clone(); + let p = platform.pool.clone(); let source_name = source.name.clone(); let source_id = source.id; @@ -84,10 +82,8 @@ pub async fn run() { let state = AppState { config: config.clone(), - pool, - connection_manager: connection_manager.clone(), + platform, event_manager, - ws_manager, control_runtime: control_runtime.clone(), }; control::engine::start(state.clone(), control_runtime); @@ -106,7 +102,7 @@ pub async fn run() { let rt_handle = tokio::runtime::Handle::current(); init_tray(ui_url, shutdown_tx.clone(), rt_handle); - let connection_manager_for_shutdown = connection_manager.clone(); + let connection_manager_for_shutdown = state.platform.connection_manager.clone(); tokio::spawn(async move { tokio::signal::ctrl_c() .await @@ -133,12 +129,13 @@ pub fn test_state() -> AppState { .connect_lazy(&database_url) .expect("lazy pool should build"); let connection_manager = Arc::new(ConnectionManager::new()); - let ws_manager = Arc::new(websocket::WebSocketManager::new()); + let ws_manager = Arc::new(WebSocketManager::new()); let event_manager = Arc::new(EventManager::new( pool.clone(), connection_manager.clone(), Some(ws_manager.clone()), )); + let platform = PlatformContext::new(pool, connection_manager, ws_manager); AppState { config: AppConfig { @@ -148,10 +145,8 @@ pub fn test_state() -> AppState { write_api_key: Some("test-write-key".to_string()), simulate_plc: false, }, - pool, - connection_manager, + platform, event_manager, - ws_manager, control_runtime: Arc::new(control::runtime::ControlRuntimeStore::new()), } } diff --git a/crates/app_feeder_distributor/src/control/engine.rs b/crates/app_feeder_distributor/src/control/engine.rs index 589ff11..cbd86e8 100644 --- a/crates/app_feeder_distributor/src/control/engine.rs +++ b/crates/app_feeder_distributor/src/control/engine.rs @@ -33,7 +33,7 @@ async fn supervise(state: AppState, store: Arc) { loop { interval.tick().await; - match crate::service::get_all_enabled_units(&state.pool).await { + match crate::service::get_all_enabled_units(&state.platform.pool).await { Ok(units) => { for unit in units { let needs_spawn = tasks @@ -63,7 +63,7 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu loop { // Reload unit config on each iteration to detect disable/delete. - let unit = match crate::service::get_unit_by_id(&state.pool, unit_id).await { + let unit = match crate::service::get_unit_by_id(&state.platform.pool, unit_id).await { Ok(Some(u)) if u.enabled => u, Ok(_) => { tracing::info!("Engine: unit {} disabled or deleted, task exiting", unit_id); @@ -114,11 +114,11 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu continue; } // Send feeder start command. - let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; + let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await; let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "start_cmd", &monitor)); drop(monitor); if let Some((pid, vt)) = cmd { - if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { + if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await { tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e); continue; } @@ -146,11 +146,11 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu continue; } // Stop feeder. - let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; + let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await; let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "stop_cmd", &monitor)); drop(monitor); if let Some((pid, vt)) = cmd { - if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { + if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await { tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e); continue; } @@ -166,11 +166,11 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu if unit.acc_time_sec > 0 && runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000 { // Accumulated threshold reached; start distributor. - let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; + let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await; let dist_cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "start_cmd", &monitor)); drop(monitor); if let Some((pid, vt)) = dist_cmd { - if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { + if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await { tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e); } else if state.config.simulate_plc { if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { @@ -191,11 +191,11 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu if !wait_phase(&state, &store, &unit, &all_roles, ¬ify, &mut fault_tick).await { continue; } - let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; + let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await; let cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "stop_cmd", &monitor)); drop(monitor); if let Some((pid, vt)) = cmd { - if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { + if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await { tracing::warn!("Engine: stop distributor failed for unit {}: {}", unit_id, e); continue; } @@ -269,7 +269,7 @@ async fn wait_phase( async fn push_ws(state: &AppState, runtime: &UnitRuntime) { if let Err(e) = state - .ws_manager + .platform.ws_manager .send_to_public(WsMessage::UnitRuntimeChanged(runtime.clone())) .await { @@ -286,7 +286,7 @@ async fn check_fault_comm( all_roles: &[(Uuid, HashMap)], ) -> bool { let monitor = state - .connection_manager + .platform.connection_manager .get_point_monitor_data_read_guard() .await; @@ -410,10 +410,10 @@ type EquipMaps = ( ); async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result { - let equipment_list = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?; + let equipment_list = crate::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?; let equipment_ids: Vec = equipment_list.iter().map(|equip| equip.id).collect(); let role_point_rows = - crate::service::get_signal_role_points_batch(&state.pool, &equipment_ids).await?; + crate::service::get_signal_role_points_batch(&state.platform.pool, &equipment_ids).await?; let mut role_points_by_equipment: HashMap> = HashMap::new(); for row in role_point_rows { role_points_by_equipment diff --git a/crates/app_feeder_distributor/src/control/simulate.rs b/crates/app_feeder_distributor/src/control/simulate.rs index 11462a3..9524d28 100644 --- a/crates/app_feeder_distributor/src/control/simulate.rs +++ b/crates/app_feeder_distributor/src/control/simulate.rs @@ -25,7 +25,7 @@ async fn run(state: AppState) { tokio::time::sleep(Duration::from_secs(wait_secs)).await; // Pick a random enabled unit. - let units = match crate::service::get_all_enabled_units(&state.pool).await { + let units = match crate::service::get_all_enabled_units(&state.platform.pool).await { Ok(u) if !u.is_empty() => u, _ => continue, }; @@ -39,7 +39,7 @@ async fn run(state: AppState) { // Pick a random equipment in that unit. let equipments = - match crate::service::get_equipment_by_unit_id(&state.pool, unit.id).await { + match crate::service::get_equipment_by_unit_id(&state.platform.pool, unit.id).await { Ok(e) if !e.is_empty() => e, _ => continue, }; @@ -47,7 +47,7 @@ async fn run(state: AppState) { // Find which of rem / flt this equipment has. let role_points = - match crate::service::get_equipment_role_points(&state.pool, eq.id).await { + match crate::service::get_equipment_role_points(&state.platform.pool, eq.id).await { Ok(rp) if !rp.is_empty() => rp, _ => continue, }; @@ -105,7 +105,7 @@ async fn run(state: AppState) { /// Called by the engine and control handler when SIMULATE_PLC=true. pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: bool) { let role_points = - match crate::service::get_equipment_role_points(&state.pool, equipment_id).await { + match crate::service::get_equipment_role_points(&state.platform.pool, equipment_id).await { Ok(v) => v, Err(e) => { tracing::warn!("simulate_run_feedback: db error: {}", e); @@ -123,7 +123,7 @@ pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) { let write_json = serde_json::json!(if value_on { 1 } else { 0 }); let write_ok = match state - .connection_manager + .platform.connection_manager .write_point_values_batch(BatchSetPointValueReq { items: vec![SetPointValueReqItem { point_id, @@ -143,7 +143,7 @@ pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) { // Fallback: patch the monitor cache directly and broadcast over WS. let (value, value_type, value_text) = { let guard = state - .connection_manager + .platform.connection_manager .get_point_monitor_data_read_guard() .await; match guard.get(&point_id).and_then(|m| m.value_type.as_ref()) { @@ -182,7 +182,7 @@ pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) { }; if let Err(e) = state - .connection_manager + .platform.connection_manager .update_point_monitor_data(monitor.clone()) .await { @@ -191,7 +191,7 @@ pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) { } let _ = state - .ws_manager + .platform.ws_manager .send_to_public(WsMessage::PointNewValue(monitor)) .await; } diff --git a/crates/app_feeder_distributor/src/control/validator.rs b/crates/app_feeder_distributor/src/control/validator.rs index 0767bc4..7752425 100644 --- a/crates/app_feeder_distributor/src/control/validator.rs +++ b/crates/app_feeder_distributor/src/control/validator.rs @@ -43,11 +43,11 @@ pub async fn validate_manual_control( equipment_id: Uuid, action: ControlAction, ) -> Result { - let equipment = crate::service::get_equipment_by_id(&state.pool, equipment_id) + let equipment = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id) .await? .ok_or_else(|| ApiErr::NotFound("Equipment not found".to_string(), None))?; - let role_points = crate::service::get_equipment_role_points(&state.pool, equipment_id).await?; + let role_points = crate::service::get_equipment_role_points(&state.platform.pool, equipment_id).await?; if role_points.is_empty() { return Err(ApiErr::BadRequest( "Equipment has no bound role points".to_string(), @@ -75,7 +75,7 @@ pub async fn validate_manual_control( .clone(); let monitor_guard = state - .connection_manager + .platform.connection_manager .get_point_monitor_data_read_guard() .await; @@ -148,7 +148,7 @@ pub async fn validate_manual_control( } let command_value_type = state - .connection_manager + .platform.connection_manager .get_point_monitor_data_read_guard() .await .get(&command_point.point_id) diff --git a/crates/app_feeder_distributor/src/handler/control.rs b/crates/app_feeder_distributor/src/handler/control.rs index a9da44d..98cd139 100644 --- a/crates/app_feeder_distributor/src/handler/control.rs +++ b/crates/app_feeder_distributor/src/handler/control.rs @@ -68,9 +68,9 @@ pub async fn get_unit_list( ) -> Result { query.validate()?; - let total = crate::service::get_units_count(&state.pool, query.keyword.as_deref()).await?; + let total = crate::service::get_units_count(&state.platform.pool, query.keyword.as_deref()).await?; let units = crate::service::get_units_paginated( - &state.pool, + &state.platform.pool, query.keyword.as_deref(), query.pagination.page_size, query.pagination.offset(), @@ -81,14 +81,14 @@ pub async fn get_unit_list( let unit_ids: Vec = units.iter().map(|u| u.id).collect(); let all_equipments = - crate::service::get_equipment_by_unit_ids(&state.pool, &unit_ids).await?; + crate::service::get_equipment_by_unit_ids(&state.platform.pool, &unit_ids).await?; let eq_ids: Vec = all_equipments.iter().map(|e| e.id).collect(); let role_point_rows = - crate::service::get_signal_role_points_batch(&state.pool, &eq_ids).await?; + crate::service::get_signal_role_points_batch(&state.platform.pool, &eq_ids).await?; let monitor_guard = state - .connection_manager + .platform.connection_manager .get_point_monitor_data_read_guard() .await; @@ -161,7 +161,7 @@ async fn send_equipment_command( let pulse_ms = 300u64; crate::control::command::send_pulse_command( - &state.connection_manager, + &state.platform.connection_manager, context.command_point.point_id, context.command_value_type.as_ref(), pulse_ms, @@ -206,18 +206,18 @@ pub async fn get_unit( State(state): State, Path(unit_id): Path, ) -> Result { - let unit = crate::service::get_unit_by_id(&state.pool, unit_id) + let unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; let runtime = state.control_runtime.get(unit_id).await; let all_equipments = - crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?; + crate::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?; let eq_ids: Vec = all_equipments.iter().map(|e| e.id).collect(); let role_point_rows = - crate::service::get_signal_role_points_batch(&state.pool, &eq_ids).await?; + crate::service::get_signal_role_points_batch(&state.platform.pool, &eq_ids).await?; let monitor_guard = state - .connection_manager + .platform.connection_manager .get_point_monitor_data_read_guard() .await; let mut role_points_map: std::collections::HashMap< @@ -273,18 +273,18 @@ pub async fn get_unit_detail( State(state): State, Path(unit_id): Path, ) -> Result { - let unit = crate::service::get_unit_by_id(&state.pool, unit_id) + let unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; let runtime = state.control_runtime.get(unit_id).await; - let equipments = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?; + let equipments = crate::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?; let equipment_ids: Vec = equipments.iter().map(|e| e.id).collect(); - let all_points = crate::service::get_points_by_equipment_ids(&state.pool, &equipment_ids).await?; + let all_points = crate::service::get_points_by_equipment_ids(&state.platform.pool, &equipment_ids).await?; let monitor_guard = state - .connection_manager + .platform.connection_manager .get_point_monitor_data_read_guard() .await; @@ -358,7 +358,7 @@ pub async fn create_unit( validate_unit_timing_order(run_time_sec, acc_time_sec)?; - if crate::service::get_unit_by_code(&state.pool, &payload.code) + if crate::service::get_unit_by_code(&state.platform.pool, &payload.code) .await? .is_some() { @@ -369,7 +369,7 @@ pub async fn create_unit( } let unit_id = crate::service::create_unit( - &state.pool, + &state.platform.pool, crate::service::CreateUnitParams { code: &payload.code, name: &payload.name, @@ -421,7 +421,7 @@ pub async fn update_unit( ) -> Result { payload.validate()?; - let existing_unit = crate::service::get_unit_by_id(&state.pool, unit_id) + let existing_unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; @@ -431,7 +431,7 @@ pub async fn update_unit( )?; if let Some(code) = payload.code.as_deref() { - let duplicate = crate::service::get_unit_by_code(&state.pool, code).await?; + let duplicate = crate::service::get_unit_by_code(&state.platform.pool, code).await?; if duplicate.as_ref().is_some_and(|item| item.id != unit_id) { return Err(ApiErr::BadRequest( "Unit code already exists".to_string(), @@ -454,7 +454,7 @@ pub async fn update_unit( } crate::service::update_unit( - &state.pool, + &state.platform.pool, unit_id, crate::service::UpdateUnitParams { code: payload.code.as_deref(), @@ -479,7 +479,7 @@ pub async fn delete_unit( State(state): State, Path(unit_id): Path, ) -> Result { - let deleted = crate::service::delete_unit(&state.pool, unit_id).await?; + let deleted = crate::service::delete_unit(&state.platform.pool, unit_id).await?; if !deleted { return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } @@ -503,13 +503,13 @@ pub async fn get_event_list( query.validate()?; let total = crate::service::get_events_count( - &state.pool, + &state.platform.pool, query.unit_id, query.event_type.as_deref(), ) .await?; let data = crate::service::get_events_paginated( - &state.pool, + &state.platform.pool, query.unit_id, query.event_type.as_deref(), query.pagination.page_size, @@ -529,7 +529,7 @@ pub async fn start_auto_unit( State(state): State, Path(unit_id): Path, ) -> Result { - let unit = crate::service::get_unit_by_id(&state.pool, unit_id) + let unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; @@ -564,7 +564,7 @@ pub async fn stop_auto_unit( State(state): State, Path(unit_id): Path, ) -> Result { - crate::service::get_unit_by_id(&state.pool, unit_id) + crate::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; @@ -581,7 +581,7 @@ pub async fn stop_auto_unit( pub async fn batch_start_auto( State(state): State, ) -> Result { - let units = crate::service::get_all_enabled_units(&state.pool).await?; + let units = crate::service::get_all_enabled_units(&state.platform.pool).await?; let mut started = Vec::new(); let mut skipped = Vec::new(); @@ -611,7 +611,7 @@ pub async fn batch_start_auto( pub async fn batch_stop_auto( State(state): State, ) -> Result { - let units = crate::service::get_all_enabled_units(&state.pool).await?; + let units = crate::service::get_all_enabled_units(&state.platform.pool).await?; let mut stopped = Vec::new(); for unit in units { @@ -635,7 +635,7 @@ pub async fn ack_fault_unit( State(state): State, Path(unit_id): Path, ) -> Result { - crate::service::get_unit_by_id(&state.pool, unit_id) + crate::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; @@ -669,7 +669,7 @@ pub async fn get_unit_runtime( State(state): State, Path(unit_id): Path, ) -> Result { - crate::service::get_unit_by_id(&state.pool, unit_id) + crate::service::get_unit_by_id(&state.platform.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; diff --git a/crates/app_feeder_distributor/src/handler/equipment.rs b/crates/app_feeder_distributor/src/handler/equipment.rs index 538519a..3d7f628 100644 --- a/crates/app_feeder_distributor/src/handler/equipment.rs +++ b/crates/app_feeder_distributor/src/handler/equipment.rs @@ -55,9 +55,9 @@ pub async fn get_equipment_list( ) -> Result { query.validate()?; - let total = crate::service::get_equipment_count(&state.pool, query.keyword.as_deref()).await?; + let total = crate::service::get_equipment_count(&state.platform.pool, query.keyword.as_deref()).await?; let items = crate::service::get_equipment_paginated( - &state.pool, + &state.platform.pool, query.keyword.as_deref(), query.pagination.page_size, query.pagination.offset(), @@ -66,10 +66,10 @@ pub async fn get_equipment_list( let equipment_ids: Vec = items.iter().map(|item| item.equipment.id).collect(); let role_point_rows = - crate::service::get_signal_role_points_batch(&state.pool, &equipment_ids).await?; + crate::service::get_signal_role_points_batch(&state.platform.pool, &equipment_ids).await?; let monitor_guard = state - .connection_manager + .platform.connection_manager .get_point_monitor_data_read_guard() .await; @@ -110,7 +110,7 @@ pub async fn get_equipment( State(state): State, Path(equipment_id): Path, ) -> Result { - let equipment = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?; + let equipment = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id).await?; match equipment { Some(item) => Ok(Json(item)), @@ -122,12 +122,12 @@ pub async fn get_equipment_points( State(state): State, Path(equipment_id): Path, ) -> Result { - let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?; + let exists = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id).await?; if exists.is_none() { return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); } - let points = crate::service::get_points_by_equipment_id(&state.pool, equipment_id).await?; + let points = crate::service::get_points_by_equipment_id(&state.platform.pool, equipment_id).await?; Ok(Json(points)) } @@ -165,7 +165,7 @@ pub async fn create_equipment( ) -> Result { payload.validate()?; - let exists = crate::service::get_equipment_by_code(&state.pool, &payload.code).await?; + let exists = crate::service::get_equipment_by_code(&state.platform.pool, &payload.code).await?; if exists.is_some() { return Err(ApiErr::BadRequest( "Equipment code already exists".to_string(), @@ -174,14 +174,14 @@ pub async fn create_equipment( } if let Some(unit_id) = payload.unit_id { - let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; + let unit_exists = crate::service::get_unit_by_id(&state.platform.pool, unit_id).await?; if unit_exists.is_none() { return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } } let equipment_id = crate::service::create_equipment( - &state.pool, + &state.platform.pool, payload.unit_id, &payload.code, &payload.name, @@ -219,7 +219,7 @@ pub async fn update_equipment( return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); } - let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?; + let exists = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id).await?; let existing_equipment = if let Some(equipment) = exists { equipment } else { @@ -227,14 +227,14 @@ pub async fn update_equipment( }; if let Some(Some(unit_id)) = payload.unit_id { - let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; + let unit_exists = crate::service::get_unit_by_id(&state.platform.pool, unit_id).await?; if unit_exists.is_none() { return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } } if let Some(code) = payload.code.as_deref() { - let duplicate = crate::service::get_equipment_by_code(&state.pool, code).await?; + let duplicate = crate::service::get_equipment_by_code(&state.platform.pool, code).await?; if duplicate .as_ref() .is_some_and(|item| item.id != equipment_id) @@ -247,7 +247,7 @@ pub async fn update_equipment( } crate::service::update_equipment( - &state.pool, + &state.platform.pool, equipment_id, payload.unit_id, payload.code.as_deref(), @@ -289,17 +289,17 @@ pub async fn batch_set_equipment_unit( } if let Some(unit_id) = payload.unit_id { - let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; + let unit_exists = crate::service::get_unit_by_id(&state.platform.pool, unit_id).await?; if unit_exists.is_none() { return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } } let before_unit_ids = - crate::service::get_unit_ids_by_equipment_ids(&state.pool, &payload.equipment_ids).await?; + crate::service::get_unit_ids_by_equipment_ids(&state.platform.pool, &payload.equipment_ids).await?; let updated_count = crate::service::batch_set_equipment_unit( - &state.pool, + &state.platform.pool, &payload.equipment_ids, payload.unit_id, ) @@ -321,8 +321,8 @@ pub async fn delete_equipment( State(state): State, Path(equipment_id): Path, ) -> Result { - let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.pool, &[equipment_id]).await?; - let deleted = crate::service::delete_equipment(&state.pool, equipment_id).await?; + let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.platform.pool, &[equipment_id]).await?; + let deleted = crate::service::delete_equipment(&state.platform.pool, equipment_id).await?; if !deleted { return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); } diff --git a/crates/app_feeder_distributor/src/handler/page.rs b/crates/app_feeder_distributor/src/handler/page.rs index bf1675e..8c7041e 100644 --- a/crates/app_feeder_distributor/src/handler/page.rs +++ b/crates/app_feeder_distributor/src/handler/page.rs @@ -20,7 +20,7 @@ pub async fn get_page_list( Query(query): Query, ) -> Result { query.validate()?; - let pool = &state.pool; + let pool = &state.platform.pool; let pages: Vec = if let Some(name) = query.name { sqlx::query_as::<_, Page>( @@ -50,7 +50,7 @@ pub async fn get_page( ) -> Result { let page = sqlx::query_as::<_, Page>("SELECT * FROM page WHERE id = $1") .bind(page_id) - .fetch_optional(&state.pool) + .fetch_optional(&state.platform.pool) .await?; match page { @@ -88,7 +88,7 @@ pub async fn create_page( ) .bind(&payload.name) .bind(SqlxJson(payload.data)) - .fetch_one(&state.pool) + .fetch_one(&state.platform.pool) .await?; Ok((StatusCode::CREATED, Json(serde_json::json!({ @@ -106,7 +106,7 @@ pub async fn update_page( let exists = sqlx::query("SELECT 1 FROM page WHERE id = $1") .bind(page_id) - .fetch_optional(&state.pool) + .fetch_optional(&state.platform.pool) .await?; if exists.is_none() { return Err(ApiErr::NotFound("Page not found".to_string(), None)); @@ -145,7 +145,7 @@ pub async fn update_page( } query = query.bind(page_id); - query.execute(&state.pool).await?; + query.execute(&state.platform.pool).await?; Ok(Json(serde_json::json!({ "ok_msg": "Page updated successfully" @@ -158,7 +158,7 @@ pub async fn delete_page( ) -> Result { let result = sqlx::query("DELETE FROM page WHERE id = $1") .bind(page_id) - .execute(&state.pool) + .execute(&state.platform.pool) .await?; if result.rows_affected() == 0 { diff --git a/crates/app_feeder_distributor/src/handler/point.rs b/crates/app_feeder_distributor/src/handler/point.rs index 0f9b39e..9f500a9 100644 --- a/crates/app_feeder_distributor/src/handler/point.rs +++ b/crates/app_feeder_distributor/src/handler/point.rs @@ -69,7 +69,7 @@ pub async fn get_point_list( Query(query): Query, ) -> Result { query.validate()?; - let pool = &state.pool; + let pool = &state.platform.pool; // Count total rows. let total = crate::service::get_points_count(pool, query.source_id, query.equipment_id).await?; @@ -85,7 +85,7 @@ pub async fn get_point_list( .await?; let monitor_guard = state - .connection_manager + .platform.connection_manager .get_point_monitor_data_read_guard() .await; @@ -114,7 +114,7 @@ pub async fn get_point( State(state): State, Path(point_id): Path, ) -> Result { - let pool = &state.pool; + let pool = &state.platform.pool; let point = crate::service::get_point_by_id(pool, point_id).await?; Ok(Json(point)) @@ -125,7 +125,7 @@ pub async fn get_point_history( Path(point_id): Path, Query(query): Query, ) -> Result { - let pool = &state.pool; + let pool = &state.platform.pool; let point = crate::service::get_point_by_id(pool, point_id).await?; if point.is_none() { return Err(ApiErr::NotFound("Point not found".to_string(), None)); @@ -133,7 +133,7 @@ pub async fn get_point_history( let limit = query.limit.unwrap_or(120).clamp(1, 1000); let history = state - .connection_manager + .platform.connection_manager .get_point_history(point_id, limit) .await; @@ -194,7 +194,7 @@ pub async fn update_point( ) -> Result { payload.validate()?; - let pool = &state.pool; + let pool = &state.platform.pool; if payload.name.is_none() && payload.description.is_none() @@ -317,7 +317,7 @@ pub async fn batch_set_point_tags( )); } - let pool = &state.pool; + let pool = &state.platform.pool; // If tag_id is provided, ensure tag exists. if let Some(tag_id) = payload.tag_id { @@ -372,7 +372,7 @@ pub async fn batch_set_point_equipment( )); } - let pool = &state.pool; + let pool = &state.platform.pool; if let Some(equipment_id) = payload.equipment_id { let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#) @@ -429,7 +429,7 @@ pub async fn delete_point( State(state): State, Path(point_id): Path, ) -> Result { - let pool = &state.pool; + let pool = &state.platform.pool; let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; let source_id = { @@ -494,7 +494,7 @@ pub async fn batch_create_points( ) -> Result { payload.validate()?; - let pool = &state.pool; + let pool = &state.platform.pool; if payload.node_ids.is_empty() { return Err(ApiErr::BadRequest( @@ -614,7 +614,7 @@ pub async fn batch_delete_points( )); } - let pool = &state.pool; + let pool = &state.platform.pool; let point_ids = payload.point_ids; let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?; @@ -673,7 +673,7 @@ pub async fn batch_set_point_value( } let result = state - .connection_manager + .platform.connection_manager .write_point_values_batch(payload) .await .map_err(|e| ApiErr::Internal(e, None))?; diff --git a/crates/app_feeder_distributor/src/handler/source.rs b/crates/app_feeder_distributor/src/handler/source.rs index 6fd1ce4..8cb5c51 100644 --- a/crates/app_feeder_distributor/src/handler/source.rs +++ b/crates/app_feeder_distributor/src/handler/source.rs @@ -98,12 +98,12 @@ impl From for SourcePublic { } pub async fn get_source_list(State(state): State) -> Result { - let pool = &state.pool; + let pool = &state.platform.pool; let sources: Vec = crate::service::get_all_enabled_sources(pool).await?; // 鑾峰彇鎵€鏈夎繛鎺ョ姸鎬? let status_map: std::collections::HashMap, Option>)> = - state.connection_manager.get_all_status().await + state.platform.connection_manager.get_all_status().await .into_iter() .map(|(source_id, s)| (source_id, (s.is_connected, s.last_error, Some(s.last_time)))) .collect(); @@ -132,7 +132,7 @@ pub async fn get_node_tree( State(state): State, Path(source_id): Path, ) -> Result { - let pool = &state.pool; + let pool = &state.platform.pool; // 鏌ヨ鎵€鏈夊睘浜庤source鐨勮妭鐐? let nodes: Vec = sqlx::query_as::<_, Node>( @@ -212,7 +212,7 @@ pub async fn create_source( ) -> Result { payload.validate()?; - let pool = &state.pool; + let pool = &state.platform.pool; let new_id = Uuid::new_v4(); sqlx::query( @@ -261,7 +261,7 @@ pub async fn update_source( return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); } - let pool = &state.pool; + let pool = &state.platform.pool; let exists = sqlx::query("SELECT 1 FROM source WHERE id = $1") .bind(source_id) @@ -311,7 +311,7 @@ pub async fn delete_source( State(state): State, Path(source_id): Path, ) -> Result { - let pool = &state.pool; + let pool = &state.platform.pool; let source_name = sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1") .bind(source_id) @@ -334,7 +334,7 @@ pub async fn reconnect_source( State(state): State, Path(source_id): Path, ) -> Result { - let pool = &state.pool; + let pool = &state.platform.pool; let exists = sqlx::query("SELECT 1 FROM source WHERE id = $1") .bind(source_id) @@ -349,7 +349,7 @@ pub async fn reconnect_source( } state - .connection_manager + .platform.connection_manager .reconnect(pool, source_id) .await .map_err(|e| ApiErr::Internal(e, None))?; @@ -362,7 +362,7 @@ pub async fn browse_and_save_nodes( Path(source_id): Path, ) -> Result { - let pool = &state.pool; + let pool = &state.platform.pool; // 纭 source 瀛樺湪 sqlx::query("SELECT 1 FROM source WHERE id = $1") @@ -370,7 +370,7 @@ pub async fn browse_and_save_nodes( .fetch_one(pool) .await?; - let session = state.connection_manager + let session = state.platform.connection_manager .get_session(source_id) .await .ok_or_else(|| anyhow::anyhow!("Source not connected"))?; diff --git a/crates/app_feeder_distributor/src/handler/tag.rs b/crates/app_feeder_distributor/src/handler/tag.rs index dcc28fc..f98965f 100644 --- a/crates/app_feeder_distributor/src/handler/tag.rs +++ b/crates/app_feeder_distributor/src/handler/tag.rs @@ -21,7 +21,7 @@ pub async fn get_tag_list( Query(query): Query, ) -> Result { query.validate()?; - let pool = &state.pool; + let pool = &state.platform.pool; // Count total rows. let total = crate::service::get_tags_count(pool).await?; @@ -43,7 +43,7 @@ pub async fn get_tag_points( State(state): State, Path(tag_id): Path, ) -> Result { - let points = crate::service::get_tag_points(&state.pool, tag_id).await?; + let points = crate::service::get_tag_points(&state.platform.pool, tag_id).await?; Ok(Json(points)) } @@ -72,7 +72,7 @@ pub async fn create_tag( let point_ids = payload.point_ids.as_deref().unwrap_or(&[]); let tag_id = crate::service::create_tag( - &state.pool, + &state.platform.pool, &payload.name, payload.description.as_deref(), point_ids, @@ -93,13 +93,13 @@ pub async fn update_tag( payload.validate()?; // Ensure the target tag exists. - let exists = crate::service::get_tag_by_id(&state.pool, tag_id).await?; + let exists = crate::service::get_tag_by_id(&state.platform.pool, tag_id).await?; if exists.is_none() { return Err(ApiErr::NotFound("Tag not found".to_string(), None)); } crate::service::update_tag( - &state.pool, + &state.platform.pool, tag_id, payload.name.as_deref(), payload.description.as_deref(), @@ -116,7 +116,7 @@ pub async fn delete_tag( State(state): State, Path(tag_id): Path, ) -> Result { - let deleted = crate::service::delete_tag(&state.pool, tag_id).await?; + let deleted = crate::service::delete_tag(&state.platform.pool, tag_id).await?; if !deleted { return Err(ApiErr::NotFound("Tag not found".to_string(), None)); diff --git a/crates/app_feeder_distributor/src/websocket.rs b/crates/app_feeder_distributor/src/websocket.rs index c14cbc5..418dbc8 100644 --- a/crates/app_feeder_distributor/src/websocket.rs +++ b/crates/app_feeder_distributor/src/websocket.rs @@ -5,145 +5,20 @@ }, response::IntoResponse, }; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::{broadcast, RwLock}; +use tokio::sync::broadcast; use uuid::Uuid; -/// WebSocket message payload types. -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type", content = "data")] -pub enum WsMessage { - PointNewValue(crate::telemetry::PointMonitorInfo), - PointSetValueBatchResult(crate::connection::BatchSetPointValueRes), - EventCreated(plc_platform_core::model::EventRecord), - UnitRuntimeChanged(crate::control::runtime::UnitRuntime), -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type", content = "data", rename_all = "snake_case")] -pub enum WsClientMessage { - AuthWrite(WsAuthWriteReq), - PointSetValueBatch(crate::connection::BatchSetPointValueReq), -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WsAuthWriteReq { - pub key: String, -} - -/// Room manager: room_id -> broadcast sender. -#[derive(Clone)] -pub struct RoomManager { - rooms: Arc>>>, -} - -impl RoomManager { - pub fn new() -> Self { - Self { - rooms: Arc::new(RwLock::new(HashMap::new())), - } - } - - /// Get or create room sender. - pub async fn get_or_create_room(&self, room_id: &str) -> broadcast::Sender { - let mut rooms = self.rooms.write().await; - - if let Some(sender) = rooms.get(room_id) { - return sender.clone(); - } - - let (sender, _) = broadcast::channel(100); - rooms.insert(room_id.to_string(), sender.clone()); - tracing::info!("Created new room: {}", room_id); - sender - } - - /// Get room sender if room exists. - pub async fn get_room(&self, room_id: &str) -> Option> { - let rooms = self.rooms.read().await; - rooms.get(room_id).cloned() - } - - /// Remove room if there are no receivers left. - pub async fn remove_room_if_empty(&self, room_id: &str) { - let mut rooms = self.rooms.write().await; - let should_remove = rooms - .get(room_id) - .map(|sender| sender.receiver_count() == 0) - .unwrap_or(false); - - if should_remove { - rooms.remove(room_id); - tracing::info!("Removed empty room: {}", room_id); - } - } - - /// Send message to room. - /// - /// Returns: - /// - Ok(n): n subscribers received it - /// - Ok(0): room missing or no active subscribers - pub async fn send_to_room(&self, room_id: &str, message: WsMessage) -> Result { - if let Some(sender) = self.get_room(room_id).await { - match sender.send(message) { - Ok(count) => Ok(count), - // No receiver is not exceptional in push scenarios. - Err(broadcast::error::SendError(_)) => Ok(0), - } - } else { - Ok(0) - } - } - -} - -impl Default for RoomManager { - fn default() -> Self { - Self::new() - } -} - -/// WebSocket manager. -#[derive(Clone)] -pub struct WebSocketManager { - public_room: Arc, -} - -impl WebSocketManager { - pub fn new() -> Self { - Self { - public_room: Arc::new(RoomManager::new()), - } - } - - /// Send message to public room. - pub async fn send_to_public(&self, message: WsMessage) -> Result { - self.public_room.get_or_create_room("public").await; - self.public_room.send_to_room("public", message).await - } - - /// Send message to a dedicated client room. - pub async fn send_to_client(&self, client_id: Uuid, message: WsMessage) -> Result { - self.public_room - .send_to_room(&client_id.to_string(), message) - .await - } -} - -impl Default for WebSocketManager { - fn default() -> Self { - Self::new() - } -} +pub use plc_platform_core::websocket::{ + RoomManager, WebSocketManager, WsClientMessage, WsMessage, +}; /// Public websocket handler. pub async fn public_websocket_handler( ws: WebSocketUpgrade, State(state): State, ) -> impl IntoResponse { - let ws_manager = state.ws_manager.clone(); + let ws_manager = state.platform.ws_manager.clone(); let app_state = state.clone(); ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, "public".to_string(), app_state)) } @@ -154,7 +29,7 @@ pub async fn client_websocket_handler( Path(client_id): Path, State(state): State, ) -> impl IntoResponse { - let ws_manager = state.ws_manager.clone(); + let ws_manager = state.platform.ws_manager.clone(); let room_id = client_id.to_string(); let app_state = state.clone(); ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, room_id, app_state)) @@ -167,8 +42,7 @@ async fn handle_socket( room_id: String, state: crate::AppState, ) { - let room_sender = ws_manager.public_room.get_or_create_room(&room_id).await; - let mut rx = room_sender.subscribe(); + let mut rx = ws_manager.subscribe_room(&room_id).await; let mut can_write = false; loop { @@ -198,7 +72,7 @@ async fn handle_socket( results: vec![], } } else { - match state.connection_manager.write_point_values_batch(payload).await { + match state.platform.connection_manager.write_point_values_batch(payload).await { Ok(v) => v, Err(e) => crate::connection::BatchSetPointValueRes { success: false, @@ -214,7 +88,6 @@ async fn handle_socket( } }; if let Err(e) = ws_manager - .public_room .send_to_room(&room_id, WsMessage::PointSetValueBatchResult(response)) .await { @@ -267,5 +140,5 @@ async fn handle_socket( } } - ws_manager.public_room.remove_room_if_empty(&room_id).await; + ws_manager.remove_room_if_empty(&room_id).await; } diff --git a/crates/app_operation_system/Cargo.toml b/crates/app_operation_system/Cargo.toml index d7ec056..e6c68a5 100644 --- a/crates/app_operation_system/Cargo.toml +++ b/crates/app_operation_system/Cargo.toml @@ -10,6 +10,7 @@ axum = { version = "0.8", features = ["ws"] } tower-http = { version = "0.6", features = ["cors", "fs"] } tracing = "0.1" dotenv = "0.15" +sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid", "json"] } [dev-dependencies] tower = { version = "0.5", features = ["util"] } diff --git a/crates/app_operation_system/src/app.rs b/crates/app_operation_system/src/app.rs index ce1efc1..6a4b44a 100644 --- a/crates/app_operation_system/src/app.rs +++ b/crates/app_operation_system/src/app.rs @@ -1,7 +1,9 @@ use crate::router::build_router; +use plc_platform_core::platform_context::PlatformContext; #[derive(Clone, Debug)] pub struct AppConfig { + pub database_url: String, pub server_host: String, pub server_port: u16, } @@ -9,6 +11,8 @@ pub struct AppConfig { impl AppConfig { pub fn from_env() -> Self { Self { + database_url: std::env::var("DATABASE_URL") + .expect("DATABASE_URL must be set"), server_host: std::env::var("OPS_SERVER_HOST") .unwrap_or_else(|_| "127.0.0.1".to_string()), server_port: std::env::var("OPS_SERVER_PORT") @@ -19,16 +23,16 @@ impl AppConfig { } } -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct AppState { pub app_name: &'static str, pub config: AppConfig, + pub platform: PlatformContext, } pub async fn run() { dotenv::dotenv().ok(); plc_platform_core::util::log::init_logger(); - let _platform = plc_platform_core::bootstrap::bootstrap_platform(); let _single_instance = match plc_platform_core::util::single_instance::try_acquire("PLCControl.OperationSystem") { Ok(guard) => guard, @@ -42,9 +46,16 @@ pub async fn run() { } }; + let config = AppConfig::from_env(); + let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url) + .await + .expect("Failed to bootstrap platform"); + let platform = builder.build(); + let state = AppState { app_name: "operation-system", - config: AppConfig::from_env(), + config, + platform, }; let app = build_router(state.clone()); let addr = format!("{}:{}", state.config.server_host, state.config.server_port); @@ -59,11 +70,20 @@ pub async fn run() { } pub fn test_state() -> AppState { + let database_url = "postgres://plc:plc@localhost/plc_control_test".to_string(); + let pool = sqlx::postgres::PgPoolOptions::new() + .connect_lazy(&database_url) + .expect("lazy pool should build"); + let connection_manager = std::sync::Arc::new(plc_platform_core::connection::ConnectionManager::new()); + let ws_manager = std::sync::Arc::new(plc_platform_core::websocket::WebSocketManager::new()); + AppState { app_name: "operation-system", config: AppConfig { + database_url, server_host: "127.0.0.1".to_string(), server_port: 0, }, + platform: PlatformContext::new(pool, connection_manager, ws_manager), } } diff --git a/crates/plc_platform_core/src/bootstrap.rs b/crates/plc_platform_core/src/bootstrap.rs index 35da13d..66506b9 100644 --- a/crates/plc_platform_core/src/bootstrap.rs +++ b/crates/plc_platform_core/src/bootstrap.rs @@ -1,5 +1,37 @@ -use crate::platform_context::PlatformContext; +use std::sync::Arc; -pub fn bootstrap_platform() -> PlatformContext { - PlatformContext::new("bootstrap") +use crate::connection::ConnectionManager; +use crate::db::init_database; +use crate::platform_context::PlatformContext; +use crate::websocket::WebSocketManager; + +pub struct PlatformBuilder { + pub pool: sqlx::PgPool, + pub connection_manager: ConnectionManager, + pub ws_manager: Arc, +} + +impl PlatformBuilder { + pub fn build(self) -> PlatformContext { + PlatformContext::new( + self.pool, + Arc::new(self.connection_manager), + self.ws_manager, + ) + } +} + +pub async fn bootstrap_platform(database_url: &str) -> Result { + let pool = init_database(database_url) + .await + .map_err(|e| format!("Failed to initialize database: {}", e))?; + + let connection_manager = ConnectionManager::new(); + let ws_manager = Arc::new(WebSocketManager::new()); + + Ok(PlatformBuilder { + pool, + connection_manager, + ws_manager, + }) } diff --git a/crates/plc_platform_core/src/platform_context.rs b/crates/plc_platform_core/src/platform_context.rs index 4dee75f..b1007cb 100644 --- a/crates/plc_platform_core/src/platform_context.rs +++ b/crates/plc_platform_core/src/platform_context.rs @@ -1,14 +1,25 @@ use std::sync::Arc; +use crate::connection::ConnectionManager; +use crate::websocket::WebSocketManager; + #[derive(Clone)] pub struct PlatformContext { - pub config_name: Arc, + pub pool: sqlx::PgPool, + pub connection_manager: Arc, + pub ws_manager: Arc, } impl PlatformContext { - pub fn new(config_name: impl Into>) -> Self { + pub fn new( + pool: sqlx::PgPool, + connection_manager: Arc, + ws_manager: Arc, + ) -> Self { Self { - config_name: config_name.into(), + pool, + connection_manager, + ws_manager, } } -} \ No newline at end of file +} diff --git a/crates/plc_platform_core/src/websocket.rs b/crates/plc_platform_core/src/websocket.rs index 919cfdf..6960f0a 100644 --- a/crates/plc_platform_core/src/websocket.rs +++ b/crates/plc_platform_core/src/websocket.rs @@ -119,6 +119,19 @@ impl WebSocketManager { .send_to_room(&client_id.to_string(), message) .await } + + pub async fn subscribe_room(&self, room_id: &str) -> broadcast::Receiver { + let sender = self.public_room.get_or_create_room(room_id).await; + sender.subscribe() + } + + pub async fn send_to_room(&self, room_id: &str, message: WsMessage) -> Result { + self.public_room.send_to_room(room_id, message).await + } + + pub async fn remove_room_if_empty(&self, room_id: &str) { + self.public_room.remove_room_if_empty(room_id).await; + } } impl Default for WebSocketManager { diff --git a/crates/plc_platform_core/tests/bootstrap_smoke.rs b/crates/plc_platform_core/tests/bootstrap_smoke.rs index 8cadc6d..2c4fe33 100644 --- a/crates/plc_platform_core/tests/bootstrap_smoke.rs +++ b/crates/plc_platform_core/tests/bootstrap_smoke.rs @@ -1,11 +1,7 @@ -use plc_platform_core::bootstrap::bootstrap_platform; use plc_platform_core::platform_context::PlatformContext; #[test] fn platform_context_type_is_public() { - let context = bootstrap_platform(); - assert_eq!(context.config_name.as_ref(), "bootstrap"); - fn assert_send_sync_clone() {} assert_send_sync_clone::(); }