refactor(core): fill PlatformContext with pool/connection/websocket

- PlatformContext now holds pool, connection_manager, ws_manager
- bootstrap_platform returns PlatformBuilder for pre-Arc setup
- Feeder AppState embeds PlatformContext (state.platform.pool etc.)
- Ops AppState embeds PlatformContext with real DB connection
- Remove WebSocket type duplication: feeder re-exports from core
- Add subscribe_room/send_to_room/remove_room_if_empty to WebSocketManager

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-04-21 09:11:03 +08:00
parent 368faf290a
commit 429c2d0b17
18 changed files with 226 additions and 284 deletions

1
Cargo.lock generated
View File

@ -138,6 +138,7 @@ dependencies = [
"axum",
"dotenv",
"plc_platform_core",
"sqlx",
"tokio",
"tower",
"tower-http",

View File

@ -4,27 +4,25 @@ use crate::{
config::AppConfig,
connection::ConnectionManager,
control,
db::init_database,
event::EventManager,
router::build_router,
websocket,
websocket::WebSocketManager,
};
use plc_platform_core::platform_context::PlatformContext;
use tokio::sync::mpsc;
#[derive(Clone)]
pub struct AppState {
pub config: AppConfig,
pub pool: sqlx::PgPool,
pub connection_manager: Arc<ConnectionManager>,
pub platform: PlatformContext,
pub event_manager: Arc<EventManager>,
pub ws_manager: Arc<websocket::WebSocketManager>,
pub control_runtime: Arc<control::runtime::ControlRuntimeStore>,
}
pub async fn run() {
dotenv::dotenv().ok();
plc_platform_core::util::log::init_logger();
let _platform = plc_platform_core::bootstrap::bootstrap_platform();
let _single_instance =
match plc_platform_core::util::single_instance::try_acquire("PLCControl.FeederDistributor") {
Ok(guard) => guard,
@ -39,31 +37,31 @@ pub async fn run() {
};
let config = AppConfig::from_env().expect("Failed to load configuration");
let pool = init_database(&config.database_url)
let mut builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url)
.await
.expect("Failed to initialize database");
.expect("Failed to bootstrap platform");
let mut connection_manager = ConnectionManager::new();
let ws_manager = Arc::new(websocket::WebSocketManager::new());
let event_manager = Arc::new(EventManager::new(
pool.clone(),
Arc::new(connection_manager.clone()),
Some(ws_manager.clone()),
builder.pool.clone(),
Arc::new(builder.connection_manager.clone()),
Some(builder.ws_manager.clone()),
));
connection_manager.set_event_manager(event_manager.clone());
connection_manager.set_pool_and_start_reconnect_task(Arc::new(pool.clone()));
let connection_manager = Arc::new(connection_manager);
builder.connection_manager.set_event_manager(event_manager.clone());
builder.connection_manager.set_pool_and_start_reconnect_task(Arc::new(builder.pool.clone()));
let platform = builder.build();
let control_runtime = Arc::new(control::runtime::ControlRuntimeStore::new());
let sources = crate::service::get_all_enabled_sources(&pool)
let sources = crate::service::get_all_enabled_sources(&platform.pool)
.await
.expect("Failed to fetch sources");
let mut tasks = Vec::new();
for source in sources {
let cm = connection_manager.clone();
let p = pool.clone();
let cm = platform.connection_manager.clone();
let p = platform.pool.clone();
let source_name = source.name.clone();
let source_id = source.id;
@ -84,10 +82,8 @@ pub async fn run() {
let state = AppState {
config: config.clone(),
pool,
connection_manager: connection_manager.clone(),
platform,
event_manager,
ws_manager,
control_runtime: control_runtime.clone(),
};
control::engine::start(state.clone(), control_runtime);
@ -106,7 +102,7 @@ pub async fn run() {
let rt_handle = tokio::runtime::Handle::current();
init_tray(ui_url, shutdown_tx.clone(), rt_handle);
let connection_manager_for_shutdown = connection_manager.clone();
let connection_manager_for_shutdown = state.platform.connection_manager.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
@ -133,12 +129,13 @@ pub fn test_state() -> AppState {
.connect_lazy(&database_url)
.expect("lazy pool should build");
let connection_manager = Arc::new(ConnectionManager::new());
let ws_manager = Arc::new(websocket::WebSocketManager::new());
let ws_manager = Arc::new(WebSocketManager::new());
let event_manager = Arc::new(EventManager::new(
pool.clone(),
connection_manager.clone(),
Some(ws_manager.clone()),
));
let platform = PlatformContext::new(pool, connection_manager, ws_manager);
AppState {
config: AppConfig {
@ -148,10 +145,8 @@ pub fn test_state() -> AppState {
write_api_key: Some("test-write-key".to_string()),
simulate_plc: false,
},
pool,
connection_manager,
platform,
event_manager,
ws_manager,
control_runtime: Arc::new(control::runtime::ControlRuntimeStore::new()),
}
}

View File

@ -33,7 +33,7 @@ async fn supervise(state: AppState, store: Arc<ControlRuntimeStore>) {
loop {
interval.tick().await;
match crate::service::get_all_enabled_units(&state.pool).await {
match crate::service::get_all_enabled_units(&state.platform.pool).await {
Ok(units) => {
for unit in units {
let needs_spawn = tasks
@ -63,7 +63,7 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
loop {
// Reload unit config on each iteration to detect disable/delete.
let unit = match crate::service::get_unit_by_id(&state.pool, unit_id).await {
let unit = match crate::service::get_unit_by_id(&state.platform.pool, unit_id).await {
Ok(Some(u)) if u.enabled => u,
Ok(_) => {
tracing::info!("Engine: unit {} disabled or deleted, task exiting", unit_id);
@ -114,11 +114,11 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
continue;
}
// Send feeder start command.
let monitor = state.connection_manager.get_point_monitor_data_read_guard().await;
let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await;
let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "start_cmd", &monitor));
drop(monitor);
if let Some((pid, vt)) = cmd {
if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await {
if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await {
tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e);
continue;
}
@ -146,11 +146,11 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
continue;
}
// Stop feeder.
let monitor = state.connection_manager.get_point_monitor_data_read_guard().await;
let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await;
let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "stop_cmd", &monitor));
drop(monitor);
if let Some((pid, vt)) = cmd {
if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await {
if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await {
tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e);
continue;
}
@ -166,11 +166,11 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
if unit.acc_time_sec > 0 && runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000 {
// Accumulated threshold reached; start distributor.
let monitor = state.connection_manager.get_point_monitor_data_read_guard().await;
let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await;
let dist_cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "start_cmd", &monitor));
drop(monitor);
if let Some((pid, vt)) = dist_cmd {
if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await {
if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await {
tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e);
} else if state.config.simulate_plc {
if let Some(eq_id) = kind_eq_ids.get("distributor").copied() {
@ -191,11 +191,11 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
if !wait_phase(&state, &store, &unit, &all_roles, &notify, &mut fault_tick).await {
continue;
}
let monitor = state.connection_manager.get_point_monitor_data_read_guard().await;
let monitor = state.platform.connection_manager.get_point_monitor_data_read_guard().await;
let cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "stop_cmd", &monitor));
drop(monitor);
if let Some((pid, vt)) = cmd {
if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await {
if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await {
tracing::warn!("Engine: stop distributor failed for unit {}: {}", unit_id, e);
continue;
}
@ -269,7 +269,7 @@ async fn wait_phase(
async fn push_ws(state: &AppState, runtime: &UnitRuntime) {
if let Err(e) = state
.ws_manager
.platform.ws_manager
.send_to_public(WsMessage::UnitRuntimeChanged(runtime.clone()))
.await
{
@ -286,7 +286,7 @@ async fn check_fault_comm(
all_roles: &[(Uuid, HashMap<String, EquipmentRolePoint>)],
) -> bool {
let monitor = state
.connection_manager
.platform.connection_manager
.get_point_monitor_data_read_guard()
.await;
@ -410,10 +410,10 @@ type EquipMaps = (
);
async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result<EquipMaps, sqlx::Error> {
let equipment_list = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?;
let equipment_list = crate::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?;
let equipment_ids: Vec<Uuid> = equipment_list.iter().map(|equip| equip.id).collect();
let role_point_rows =
crate::service::get_signal_role_points_batch(&state.pool, &equipment_ids).await?;
crate::service::get_signal_role_points_batch(&state.platform.pool, &equipment_ids).await?;
let mut role_points_by_equipment: HashMap<Uuid, Vec<EquipmentRolePoint>> = HashMap::new();
for row in role_point_rows {
role_points_by_equipment

View File

@ -25,7 +25,7 @@ async fn run(state: AppState) {
tokio::time::sleep(Duration::from_secs(wait_secs)).await;
// Pick a random enabled unit.
let units = match crate::service::get_all_enabled_units(&state.pool).await {
let units = match crate::service::get_all_enabled_units(&state.platform.pool).await {
Ok(u) if !u.is_empty() => u,
_ => continue,
};
@ -39,7 +39,7 @@ async fn run(state: AppState) {
// Pick a random equipment in that unit.
let equipments =
match crate::service::get_equipment_by_unit_id(&state.pool, unit.id).await {
match crate::service::get_equipment_by_unit_id(&state.platform.pool, unit.id).await {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
@ -47,7 +47,7 @@ async fn run(state: AppState) {
// Find which of rem / flt this equipment has.
let role_points =
match crate::service::get_equipment_role_points(&state.pool, eq.id).await {
match crate::service::get_equipment_role_points(&state.platform.pool, eq.id).await {
Ok(rp) if !rp.is_empty() => rp,
_ => continue,
};
@ -105,7 +105,7 @@ async fn run(state: AppState) {
/// Called by the engine and control handler when SIMULATE_PLC=true.
pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: bool) {
let role_points =
match crate::service::get_equipment_role_points(&state.pool, equipment_id).await {
match crate::service::get_equipment_role_points(&state.platform.pool, equipment_id).await {
Ok(v) => v,
Err(e) => {
tracing::warn!("simulate_run_feedback: db error: {}", e);
@ -123,7 +123,7 @@ pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on:
pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) {
let write_json = serde_json::json!(if value_on { 1 } else { 0 });
let write_ok = match state
.connection_manager
.platform.connection_manager
.write_point_values_batch(BatchSetPointValueReq {
items: vec![SetPointValueReqItem {
point_id,
@ -143,7 +143,7 @@ pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) {
// Fallback: patch the monitor cache directly and broadcast over WS.
let (value, value_type, value_text) = {
let guard = state
.connection_manager
.platform.connection_manager
.get_point_monitor_data_read_guard()
.await;
match guard.get(&point_id).and_then(|m| m.value_type.as_ref()) {
@ -182,7 +182,7 @@ pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) {
};
if let Err(e) = state
.connection_manager
.platform.connection_manager
.update_point_monitor_data(monitor.clone())
.await
{
@ -191,7 +191,7 @@ pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) {
}
let _ = state
.ws_manager
.platform.ws_manager
.send_to_public(WsMessage::PointNewValue(monitor))
.await;
}

View File

@ -43,11 +43,11 @@ pub async fn validate_manual_control(
equipment_id: Uuid,
action: ControlAction,
) -> Result<ManualControlContext, ApiErr> {
let equipment = crate::service::get_equipment_by_id(&state.pool, equipment_id)
let equipment = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Equipment not found".to_string(), None))?;
let role_points = crate::service::get_equipment_role_points(&state.pool, equipment_id).await?;
let role_points = crate::service::get_equipment_role_points(&state.platform.pool, equipment_id).await?;
if role_points.is_empty() {
return Err(ApiErr::BadRequest(
"Equipment has no bound role points".to_string(),
@ -75,7 +75,7 @@ pub async fn validate_manual_control(
.clone();
let monitor_guard = state
.connection_manager
.platform.connection_manager
.get_point_monitor_data_read_guard()
.await;
@ -148,7 +148,7 @@ pub async fn validate_manual_control(
}
let command_value_type = state
.connection_manager
.platform.connection_manager
.get_point_monitor_data_read_guard()
.await
.get(&command_point.point_id)

View File

@ -68,9 +68,9 @@ pub async fn get_unit_list(
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let total = crate::service::get_units_count(&state.pool, query.keyword.as_deref()).await?;
let total = crate::service::get_units_count(&state.platform.pool, query.keyword.as_deref()).await?;
let units = crate::service::get_units_paginated(
&state.pool,
&state.platform.pool,
query.keyword.as_deref(),
query.pagination.page_size,
query.pagination.offset(),
@ -81,14 +81,14 @@ pub async fn get_unit_list(
let unit_ids: Vec<Uuid> = units.iter().map(|u| u.id).collect();
let all_equipments =
crate::service::get_equipment_by_unit_ids(&state.pool, &unit_ids).await?;
crate::service::get_equipment_by_unit_ids(&state.platform.pool, &unit_ids).await?;
let eq_ids: Vec<Uuid> = all_equipments.iter().map(|e| e.id).collect();
let role_point_rows =
crate::service::get_signal_role_points_batch(&state.pool, &eq_ids).await?;
crate::service::get_signal_role_points_batch(&state.platform.pool, &eq_ids).await?;
let monitor_guard = state
.connection_manager
.platform.connection_manager
.get_point_monitor_data_read_guard()
.await;
@ -161,7 +161,7 @@ async fn send_equipment_command(
let pulse_ms = 300u64;
crate::control::command::send_pulse_command(
&state.connection_manager,
&state.platform.connection_manager,
context.command_point.point_id,
context.command_value_type.as_ref(),
pulse_ms,
@ -206,18 +206,18 @@ pub async fn get_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let unit = crate::service::get_unit_by_id(&state.pool, unit_id)
let unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
let runtime = state.control_runtime.get(unit_id).await;
let all_equipments =
crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?;
crate::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?;
let eq_ids: Vec<Uuid> = all_equipments.iter().map(|e| e.id).collect();
let role_point_rows =
crate::service::get_signal_role_points_batch(&state.pool, &eq_ids).await?;
crate::service::get_signal_role_points_batch(&state.platform.pool, &eq_ids).await?;
let monitor_guard = state
.connection_manager
.platform.connection_manager
.get_point_monitor_data_read_guard()
.await;
let mut role_points_map: std::collections::HashMap<
@ -273,18 +273,18 @@ pub async fn get_unit_detail(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let unit = crate::service::get_unit_by_id(&state.pool, unit_id)
let unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
let runtime = state.control_runtime.get(unit_id).await;
let equipments = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?;
let equipments = crate::service::get_equipment_by_unit_id(&state.platform.pool, unit_id).await?;
let equipment_ids: Vec<Uuid> = equipments.iter().map(|e| e.id).collect();
let all_points = crate::service::get_points_by_equipment_ids(&state.pool, &equipment_ids).await?;
let all_points = crate::service::get_points_by_equipment_ids(&state.platform.pool, &equipment_ids).await?;
let monitor_guard = state
.connection_manager
.platform.connection_manager
.get_point_monitor_data_read_guard()
.await;
@ -358,7 +358,7 @@ pub async fn create_unit(
validate_unit_timing_order(run_time_sec, acc_time_sec)?;
if crate::service::get_unit_by_code(&state.pool, &payload.code)
if crate::service::get_unit_by_code(&state.platform.pool, &payload.code)
.await?
.is_some()
{
@ -369,7 +369,7 @@ pub async fn create_unit(
}
let unit_id = crate::service::create_unit(
&state.pool,
&state.platform.pool,
crate::service::CreateUnitParams {
code: &payload.code,
name: &payload.name,
@ -421,7 +421,7 @@ pub async fn update_unit(
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let existing_unit = crate::service::get_unit_by_id(&state.pool, unit_id)
let existing_unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
@ -431,7 +431,7 @@ pub async fn update_unit(
)?;
if let Some(code) = payload.code.as_deref() {
let duplicate = crate::service::get_unit_by_code(&state.pool, code).await?;
let duplicate = crate::service::get_unit_by_code(&state.platform.pool, code).await?;
if duplicate.as_ref().is_some_and(|item| item.id != unit_id) {
return Err(ApiErr::BadRequest(
"Unit code already exists".to_string(),
@ -454,7 +454,7 @@ pub async fn update_unit(
}
crate::service::update_unit(
&state.pool,
&state.platform.pool,
unit_id,
crate::service::UpdateUnitParams {
code: payload.code.as_deref(),
@ -479,7 +479,7 @@ pub async fn delete_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let deleted = crate::service::delete_unit(&state.pool, unit_id).await?;
let deleted = crate::service::delete_unit(&state.platform.pool, unit_id).await?;
if !deleted {
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
@ -503,13 +503,13 @@ pub async fn get_event_list(
query.validate()?;
let total = crate::service::get_events_count(
&state.pool,
&state.platform.pool,
query.unit_id,
query.event_type.as_deref(),
)
.await?;
let data = crate::service::get_events_paginated(
&state.pool,
&state.platform.pool,
query.unit_id,
query.event_type.as_deref(),
query.pagination.page_size,
@ -529,7 +529,7 @@ pub async fn start_auto_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let unit = crate::service::get_unit_by_id(&state.pool, unit_id)
let unit = crate::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
@ -564,7 +564,7 @@ pub async fn stop_auto_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
crate::service::get_unit_by_id(&state.pool, unit_id)
crate::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
@ -581,7 +581,7 @@ pub async fn stop_auto_unit(
pub async fn batch_start_auto(
State(state): State<AppState>,
) -> Result<impl IntoResponse, ApiErr> {
let units = crate::service::get_all_enabled_units(&state.pool).await?;
let units = crate::service::get_all_enabled_units(&state.platform.pool).await?;
let mut started = Vec::new();
let mut skipped = Vec::new();
@ -611,7 +611,7 @@ pub async fn batch_start_auto(
pub async fn batch_stop_auto(
State(state): State<AppState>,
) -> Result<impl IntoResponse, ApiErr> {
let units = crate::service::get_all_enabled_units(&state.pool).await?;
let units = crate::service::get_all_enabled_units(&state.platform.pool).await?;
let mut stopped = Vec::new();
for unit in units {
@ -635,7 +635,7 @@ pub async fn ack_fault_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
crate::service::get_unit_by_id(&state.pool, unit_id)
crate::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
@ -669,7 +669,7 @@ pub async fn get_unit_runtime(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
crate::service::get_unit_by_id(&state.pool, unit_id)
crate::service::get_unit_by_id(&state.platform.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;

View File

@ -55,9 +55,9 @@ pub async fn get_equipment_list(
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let total = crate::service::get_equipment_count(&state.pool, query.keyword.as_deref()).await?;
let total = crate::service::get_equipment_count(&state.platform.pool, query.keyword.as_deref()).await?;
let items = crate::service::get_equipment_paginated(
&state.pool,
&state.platform.pool,
query.keyword.as_deref(),
query.pagination.page_size,
query.pagination.offset(),
@ -66,10 +66,10 @@ pub async fn get_equipment_list(
let equipment_ids: Vec<uuid::Uuid> = items.iter().map(|item| item.equipment.id).collect();
let role_point_rows =
crate::service::get_signal_role_points_batch(&state.pool, &equipment_ids).await?;
crate::service::get_signal_role_points_batch(&state.platform.pool, &equipment_ids).await?;
let monitor_guard = state
.connection_manager
.platform.connection_manager
.get_point_monitor_data_read_guard()
.await;
@ -110,7 +110,7 @@ pub async fn get_equipment(
State(state): State<AppState>,
Path(equipment_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let equipment = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?;
let equipment = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id).await?;
match equipment {
Some(item) => Ok(Json(item)),
@ -122,12 +122,12 @@ pub async fn get_equipment_points(
State(state): State<AppState>,
Path(equipment_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?;
let exists = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id).await?;
if exists.is_none() {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}
let points = crate::service::get_points_by_equipment_id(&state.pool, equipment_id).await?;
let points = crate::service::get_points_by_equipment_id(&state.platform.pool, equipment_id).await?;
Ok(Json(points))
}
@ -165,7 +165,7 @@ pub async fn create_equipment(
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let exists = crate::service::get_equipment_by_code(&state.pool, &payload.code).await?;
let exists = crate::service::get_equipment_by_code(&state.platform.pool, &payload.code).await?;
if exists.is_some() {
return Err(ApiErr::BadRequest(
"Equipment code already exists".to_string(),
@ -174,14 +174,14 @@ pub async fn create_equipment(
}
if let Some(unit_id) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?;
let unit_exists = crate::service::get_unit_by_id(&state.platform.pool, unit_id).await?;
if unit_exists.is_none() {
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
}
let equipment_id = crate::service::create_equipment(
&state.pool,
&state.platform.pool,
payload.unit_id,
&payload.code,
&payload.name,
@ -219,7 +219,7 @@ pub async fn update_equipment(
return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
}
let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?;
let exists = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id).await?;
let existing_equipment = if let Some(equipment) = exists {
equipment
} else {
@ -227,14 +227,14 @@ pub async fn update_equipment(
};
if let Some(Some(unit_id)) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?;
let unit_exists = crate::service::get_unit_by_id(&state.platform.pool, unit_id).await?;
if unit_exists.is_none() {
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
}
if let Some(code) = payload.code.as_deref() {
let duplicate = crate::service::get_equipment_by_code(&state.pool, code).await?;
let duplicate = crate::service::get_equipment_by_code(&state.platform.pool, code).await?;
if duplicate
.as_ref()
.is_some_and(|item| item.id != equipment_id)
@ -247,7 +247,7 @@ pub async fn update_equipment(
}
crate::service::update_equipment(
&state.pool,
&state.platform.pool,
equipment_id,
payload.unit_id,
payload.code.as_deref(),
@ -289,17 +289,17 @@ pub async fn batch_set_equipment_unit(
}
if let Some(unit_id) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?;
let unit_exists = crate::service::get_unit_by_id(&state.platform.pool, unit_id).await?;
if unit_exists.is_none() {
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
}
let before_unit_ids =
crate::service::get_unit_ids_by_equipment_ids(&state.pool, &payload.equipment_ids).await?;
crate::service::get_unit_ids_by_equipment_ids(&state.platform.pool, &payload.equipment_ids).await?;
let updated_count = crate::service::batch_set_equipment_unit(
&state.pool,
&state.platform.pool,
&payload.equipment_ids,
payload.unit_id,
)
@ -321,8 +321,8 @@ pub async fn delete_equipment(
State(state): State<AppState>,
Path(equipment_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.pool, &[equipment_id]).await?;
let deleted = crate::service::delete_equipment(&state.pool, equipment_id).await?;
let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.platform.pool, &[equipment_id]).await?;
let deleted = crate::service::delete_equipment(&state.platform.pool, equipment_id).await?;
if !deleted {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}

View File

@ -20,7 +20,7 @@ pub async fn get_page_list(
Query(query): Query<GetPageListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let pool = &state.pool;
let pool = &state.platform.pool;
let pages: Vec<Page> = if let Some(name) = query.name {
sqlx::query_as::<_, Page>(
@ -50,7 +50,7 @@ pub async fn get_page(
) -> Result<impl IntoResponse, ApiErr> {
let page = sqlx::query_as::<_, Page>("SELECT * FROM page WHERE id = $1")
.bind(page_id)
.fetch_optional(&state.pool)
.fetch_optional(&state.platform.pool)
.await?;
match page {
@ -88,7 +88,7 @@ pub async fn create_page(
)
.bind(&payload.name)
.bind(SqlxJson(payload.data))
.fetch_one(&state.pool)
.fetch_one(&state.platform.pool)
.await?;
Ok((StatusCode::CREATED, Json(serde_json::json!({
@ -106,7 +106,7 @@ pub async fn update_page(
let exists = sqlx::query("SELECT 1 FROM page WHERE id = $1")
.bind(page_id)
.fetch_optional(&state.pool)
.fetch_optional(&state.platform.pool)
.await?;
if exists.is_none() {
return Err(ApiErr::NotFound("Page not found".to_string(), None));
@ -145,7 +145,7 @@ pub async fn update_page(
}
query = query.bind(page_id);
query.execute(&state.pool).await?;
query.execute(&state.platform.pool).await?;
Ok(Json(serde_json::json!({
"ok_msg": "Page updated successfully"
@ -158,7 +158,7 @@ pub async fn delete_page(
) -> Result<impl IntoResponse, ApiErr> {
let result = sqlx::query("DELETE FROM page WHERE id = $1")
.bind(page_id)
.execute(&state.pool)
.execute(&state.platform.pool)
.await?;
if result.rows_affected() == 0 {

View File

@ -69,7 +69,7 @@ pub async fn get_point_list(
Query(query): Query<GetPointListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let pool = &state.pool;
let pool = &state.platform.pool;
// Count total rows.
let total = crate::service::get_points_count(pool, query.source_id, query.equipment_id).await?;
@ -85,7 +85,7 @@ pub async fn get_point_list(
.await?;
let monitor_guard = state
.connection_manager
.platform.connection_manager
.get_point_monitor_data_read_guard()
.await;
@ -114,7 +114,7 @@ pub async fn get_point(
State(state): State<AppState>,
Path(point_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let pool = &state.platform.pool;
let point = crate::service::get_point_by_id(pool, point_id).await?;
Ok(Json(point))
@ -125,7 +125,7 @@ pub async fn get_point_history(
Path(point_id): Path<Uuid>,
Query(query): Query<GetPointHistoryQuery>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let pool = &state.platform.pool;
let point = crate::service::get_point_by_id(pool, point_id).await?;
if point.is_none() {
return Err(ApiErr::NotFound("Point not found".to_string(), None));
@ -133,7 +133,7 @@ pub async fn get_point_history(
let limit = query.limit.unwrap_or(120).clamp(1, 1000);
let history = state
.connection_manager
.platform.connection_manager
.get_point_history(point_id, limit)
.await;
@ -194,7 +194,7 @@ pub async fn update_point(
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let pool = &state.pool;
let pool = &state.platform.pool;
if payload.name.is_none()
&& payload.description.is_none()
@ -317,7 +317,7 @@ pub async fn batch_set_point_tags(
));
}
let pool = &state.pool;
let pool = &state.platform.pool;
// If tag_id is provided, ensure tag exists.
if let Some(tag_id) = payload.tag_id {
@ -372,7 +372,7 @@ pub async fn batch_set_point_equipment(
));
}
let pool = &state.pool;
let pool = &state.platform.pool;
if let Some(equipment_id) = payload.equipment_id {
let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#)
@ -429,7 +429,7 @@ pub async fn delete_point(
State(state): State<AppState>,
Path(point_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let pool = &state.platform.pool;
let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
let source_id = {
@ -494,7 +494,7 @@ pub async fn batch_create_points(
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let pool = &state.pool;
let pool = &state.platform.pool;
if payload.node_ids.is_empty() {
return Err(ApiErr::BadRequest(
@ -614,7 +614,7 @@ pub async fn batch_delete_points(
));
}
let pool = &state.pool;
let pool = &state.platform.pool;
let point_ids = payload.point_ids;
let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?;
@ -673,7 +673,7 @@ pub async fn batch_set_point_value(
}
let result = state
.connection_manager
.platform.connection_manager
.write_point_values_batch(payload)
.await
.map_err(|e| ApiErr::Internal(e, None))?;

View File

@ -98,12 +98,12 @@ impl From<Source> for SourcePublic {
}
pub async fn get_source_list(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let pool = &state.platform.pool;
let sources: Vec<Source> = crate::service::get_all_enabled_sources(pool).await?;
// 鑾峰彇鎵€鏈夎繛鎺ョ姸鎬?
let status_map: std::collections::HashMap<Uuid, (bool, Option<String>, Option<DateTime<Utc>>)> =
state.connection_manager.get_all_status().await
state.platform.connection_manager.get_all_status().await
.into_iter()
.map(|(source_id, s)| (source_id, (s.is_connected, s.last_error, Some(s.last_time))))
.collect();
@ -132,7 +132,7 @@ pub async fn get_node_tree(
State(state): State<AppState>,
Path(source_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let pool = &state.platform.pool;
// 鏌ヨ鎵€鏈夊睘浜庤source鐨勮妭鐐?
let nodes: Vec<Node> = sqlx::query_as::<_, Node>(
@ -212,7 +212,7 @@ pub async fn create_source(
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let pool = &state.pool;
let pool = &state.platform.pool;
let new_id = Uuid::new_v4();
sqlx::query(
@ -261,7 +261,7 @@ pub async fn update_source(
return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
}
let pool = &state.pool;
let pool = &state.platform.pool;
let exists = sqlx::query("SELECT 1 FROM source WHERE id = $1")
.bind(source_id)
@ -311,7 +311,7 @@ pub async fn delete_source(
State(state): State<AppState>,
Path(source_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let pool = &state.platform.pool;
let source_name = sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1")
.bind(source_id)
@ -334,7 +334,7 @@ pub async fn reconnect_source(
State(state): State<AppState>,
Path(source_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let pool = &state.platform.pool;
let exists = sqlx::query("SELECT 1 FROM source WHERE id = $1")
.bind(source_id)
@ -349,7 +349,7 @@ pub async fn reconnect_source(
}
state
.connection_manager
.platform.connection_manager
.reconnect(pool, source_id)
.await
.map_err(|e| ApiErr::Internal(e, None))?;
@ -362,7 +362,7 @@ pub async fn browse_and_save_nodes(
Path(source_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let pool = &state.platform.pool;
// 纭 source 瀛樺湪
sqlx::query("SELECT 1 FROM source WHERE id = $1")
@ -370,7 +370,7 @@ pub async fn browse_and_save_nodes(
.fetch_one(pool)
.await?;
let session = state.connection_manager
let session = state.platform.connection_manager
.get_session(source_id)
.await
.ok_or_else(|| anyhow::anyhow!("Source not connected"))?;

View File

@ -21,7 +21,7 @@ pub async fn get_tag_list(
Query(query): Query<GetTagListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let pool = &state.pool;
let pool = &state.platform.pool;
// Count total rows.
let total = crate::service::get_tags_count(pool).await?;
@ -43,7 +43,7 @@ pub async fn get_tag_points(
State(state): State<AppState>,
Path(tag_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let points = crate::service::get_tag_points(&state.pool, tag_id).await?;
let points = crate::service::get_tag_points(&state.platform.pool, tag_id).await?;
Ok(Json(points))
}
@ -72,7 +72,7 @@ pub async fn create_tag(
let point_ids = payload.point_ids.as_deref().unwrap_or(&[]);
let tag_id = crate::service::create_tag(
&state.pool,
&state.platform.pool,
&payload.name,
payload.description.as_deref(),
point_ids,
@ -93,13 +93,13 @@ pub async fn update_tag(
payload.validate()?;
// Ensure the target tag exists.
let exists = crate::service::get_tag_by_id(&state.pool, tag_id).await?;
let exists = crate::service::get_tag_by_id(&state.platform.pool, tag_id).await?;
if exists.is_none() {
return Err(ApiErr::NotFound("Tag not found".to_string(), None));
}
crate::service::update_tag(
&state.pool,
&state.platform.pool,
tag_id,
payload.name.as_deref(),
payload.description.as_deref(),
@ -116,7 +116,7 @@ pub async fn delete_tag(
State(state): State<AppState>,
Path(tag_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let deleted = crate::service::delete_tag(&state.pool, tag_id).await?;
let deleted = crate::service::delete_tag(&state.platform.pool, tag_id).await?;
if !deleted {
return Err(ApiErr::NotFound("Tag not found".to_string(), None));

View File

@ -5,145 +5,20 @@
},
response::IntoResponse,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tokio::sync::broadcast;
use uuid::Uuid;
/// WebSocket message payload types.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum WsMessage {
PointNewValue(crate::telemetry::PointMonitorInfo),
PointSetValueBatchResult(crate::connection::BatchSetPointValueRes),
EventCreated(plc_platform_core::model::EventRecord),
UnitRuntimeChanged(crate::control::runtime::UnitRuntime),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "snake_case")]
pub enum WsClientMessage {
AuthWrite(WsAuthWriteReq),
PointSetValueBatch(crate::connection::BatchSetPointValueReq),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsAuthWriteReq {
pub key: String,
}
/// Room manager: room_id -> broadcast sender.
#[derive(Clone)]
pub struct RoomManager {
rooms: Arc<RwLock<HashMap<String, broadcast::Sender<WsMessage>>>>,
}
impl RoomManager {
pub fn new() -> Self {
Self {
rooms: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Get or create room sender.
pub async fn get_or_create_room(&self, room_id: &str) -> broadcast::Sender<WsMessage> {
let mut rooms = self.rooms.write().await;
if let Some(sender) = rooms.get(room_id) {
return sender.clone();
}
let (sender, _) = broadcast::channel(100);
rooms.insert(room_id.to_string(), sender.clone());
tracing::info!("Created new room: {}", room_id);
sender
}
/// Get room sender if room exists.
pub async fn get_room(&self, room_id: &str) -> Option<broadcast::Sender<WsMessage>> {
let rooms = self.rooms.read().await;
rooms.get(room_id).cloned()
}
/// Remove room if there are no receivers left.
pub async fn remove_room_if_empty(&self, room_id: &str) {
let mut rooms = self.rooms.write().await;
let should_remove = rooms
.get(room_id)
.map(|sender| sender.receiver_count() == 0)
.unwrap_or(false);
if should_remove {
rooms.remove(room_id);
tracing::info!("Removed empty room: {}", room_id);
}
}
/// Send message to room.
///
/// Returns:
/// - Ok(n): n subscribers received it
/// - Ok(0): room missing or no active subscribers
pub async fn send_to_room(&self, room_id: &str, message: WsMessage) -> Result<usize, String> {
if let Some(sender) = self.get_room(room_id).await {
match sender.send(message) {
Ok(count) => Ok(count),
// No receiver is not exceptional in push scenarios.
Err(broadcast::error::SendError(_)) => Ok(0),
}
} else {
Ok(0)
}
}
}
impl Default for RoomManager {
fn default() -> Self {
Self::new()
}
}
/// WebSocket manager.
#[derive(Clone)]
pub struct WebSocketManager {
public_room: Arc<RoomManager>,
}
impl WebSocketManager {
pub fn new() -> Self {
Self {
public_room: Arc::new(RoomManager::new()),
}
}
/// Send message to public room.
pub async fn send_to_public(&self, message: WsMessage) -> Result<usize, String> {
self.public_room.get_or_create_room("public").await;
self.public_room.send_to_room("public", message).await
}
/// Send message to a dedicated client room.
pub async fn send_to_client(&self, client_id: Uuid, message: WsMessage) -> Result<usize, String> {
self.public_room
.send_to_room(&client_id.to_string(), message)
.await
}
}
impl Default for WebSocketManager {
fn default() -> Self {
Self::new()
}
}
pub use plc_platform_core::websocket::{
RoomManager, WebSocketManager, WsClientMessage, WsMessage,
};
/// Public websocket handler.
pub async fn public_websocket_handler(
ws: WebSocketUpgrade,
State(state): State<crate::AppState>,
) -> impl IntoResponse {
let ws_manager = state.ws_manager.clone();
let ws_manager = state.platform.ws_manager.clone();
let app_state = state.clone();
ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, "public".to_string(), app_state))
}
@ -154,7 +29,7 @@ pub async fn client_websocket_handler(
Path(client_id): Path<Uuid>,
State(state): State<crate::AppState>,
) -> impl IntoResponse {
let ws_manager = state.ws_manager.clone();
let ws_manager = state.platform.ws_manager.clone();
let room_id = client_id.to_string();
let app_state = state.clone();
ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, room_id, app_state))
@ -167,8 +42,7 @@ async fn handle_socket(
room_id: String,
state: crate::AppState,
) {
let room_sender = ws_manager.public_room.get_or_create_room(&room_id).await;
let mut rx = room_sender.subscribe();
let mut rx = ws_manager.subscribe_room(&room_id).await;
let mut can_write = false;
loop {
@ -198,7 +72,7 @@ async fn handle_socket(
results: vec![],
}
} else {
match state.connection_manager.write_point_values_batch(payload).await {
match state.platform.connection_manager.write_point_values_batch(payload).await {
Ok(v) => v,
Err(e) => crate::connection::BatchSetPointValueRes {
success: false,
@ -214,7 +88,6 @@ async fn handle_socket(
}
};
if let Err(e) = ws_manager
.public_room
.send_to_room(&room_id, WsMessage::PointSetValueBatchResult(response))
.await
{
@ -267,5 +140,5 @@ async fn handle_socket(
}
}
ws_manager.public_room.remove_room_if_empty(&room_id).await;
ws_manager.remove_room_if_empty(&room_id).await;
}

View File

@ -10,6 +10,7 @@ axum = { version = "0.8", features = ["ws"] }
tower-http = { version = "0.6", features = ["cors", "fs"] }
tracing = "0.1"
dotenv = "0.15"
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid", "json"] }
[dev-dependencies]
tower = { version = "0.5", features = ["util"] }

View File

@ -1,7 +1,9 @@
use crate::router::build_router;
use plc_platform_core::platform_context::PlatformContext;
#[derive(Clone, Debug)]
pub struct AppConfig {
pub database_url: String,
pub server_host: String,
pub server_port: u16,
}
@ -9,6 +11,8 @@ pub struct AppConfig {
impl AppConfig {
pub fn from_env() -> Self {
Self {
database_url: std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set"),
server_host: std::env::var("OPS_SERVER_HOST")
.unwrap_or_else(|_| "127.0.0.1".to_string()),
server_port: std::env::var("OPS_SERVER_PORT")
@ -19,16 +23,16 @@ impl AppConfig {
}
}
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct AppState {
pub app_name: &'static str,
pub config: AppConfig,
pub platform: PlatformContext,
}
pub async fn run() {
dotenv::dotenv().ok();
plc_platform_core::util::log::init_logger();
let _platform = plc_platform_core::bootstrap::bootstrap_platform();
let _single_instance =
match plc_platform_core::util::single_instance::try_acquire("PLCControl.OperationSystem") {
Ok(guard) => guard,
@ -42,9 +46,16 @@ pub async fn run() {
}
};
let config = AppConfig::from_env();
let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url)
.await
.expect("Failed to bootstrap platform");
let platform = builder.build();
let state = AppState {
app_name: "operation-system",
config: AppConfig::from_env(),
config,
platform,
};
let app = build_router(state.clone());
let addr = format!("{}:{}", state.config.server_host, state.config.server_port);
@ -59,11 +70,20 @@ pub async fn run() {
}
pub fn test_state() -> AppState {
let database_url = "postgres://plc:plc@localhost/plc_control_test".to_string();
let pool = sqlx::postgres::PgPoolOptions::new()
.connect_lazy(&database_url)
.expect("lazy pool should build");
let connection_manager = std::sync::Arc::new(plc_platform_core::connection::ConnectionManager::new());
let ws_manager = std::sync::Arc::new(plc_platform_core::websocket::WebSocketManager::new());
AppState {
app_name: "operation-system",
config: AppConfig {
database_url,
server_host: "127.0.0.1".to_string(),
server_port: 0,
},
platform: PlatformContext::new(pool, connection_manager, ws_manager),
}
}

View File

@ -1,5 +1,37 @@
use crate::platform_context::PlatformContext;
use std::sync::Arc;
pub fn bootstrap_platform() -> PlatformContext {
PlatformContext::new("bootstrap")
use crate::connection::ConnectionManager;
use crate::db::init_database;
use crate::platform_context::PlatformContext;
use crate::websocket::WebSocketManager;
pub struct PlatformBuilder {
pub pool: sqlx::PgPool,
pub connection_manager: ConnectionManager,
pub ws_manager: Arc<WebSocketManager>,
}
impl PlatformBuilder {
pub fn build(self) -> PlatformContext {
PlatformContext::new(
self.pool,
Arc::new(self.connection_manager),
self.ws_manager,
)
}
}
pub async fn bootstrap_platform(database_url: &str) -> Result<PlatformBuilder, String> {
let pool = init_database(database_url)
.await
.map_err(|e| format!("Failed to initialize database: {}", e))?;
let connection_manager = ConnectionManager::new();
let ws_manager = Arc::new(WebSocketManager::new());
Ok(PlatformBuilder {
pool,
connection_manager,
ws_manager,
})
}

View File

@ -1,14 +1,25 @@
use std::sync::Arc;
use crate::connection::ConnectionManager;
use crate::websocket::WebSocketManager;
#[derive(Clone)]
pub struct PlatformContext {
pub config_name: Arc<str>,
pub pool: sqlx::PgPool,
pub connection_manager: Arc<ConnectionManager>,
pub ws_manager: Arc<WebSocketManager>,
}
impl PlatformContext {
pub fn new(config_name: impl Into<Arc<str>>) -> Self {
pub fn new(
pool: sqlx::PgPool,
connection_manager: Arc<ConnectionManager>,
ws_manager: Arc<WebSocketManager>,
) -> Self {
Self {
config_name: config_name.into(),
pool,
connection_manager,
ws_manager,
}
}
}
}

View File

@ -119,6 +119,19 @@ impl WebSocketManager {
.send_to_room(&client_id.to_string(), message)
.await
}
pub async fn subscribe_room(&self, room_id: &str) -> broadcast::Receiver<WsMessage> {
let sender = self.public_room.get_or_create_room(room_id).await;
sender.subscribe()
}
pub async fn send_to_room(&self, room_id: &str, message: WsMessage) -> Result<usize, String> {
self.public_room.send_to_room(room_id, message).await
}
pub async fn remove_room_if_empty(&self, room_id: &str) {
self.public_room.remove_room_if_empty(room_id).await;
}
}
impl Default for WebSocketManager {

View File

@ -1,11 +1,7 @@
use plc_platform_core::bootstrap::bootstrap_platform;
use plc_platform_core::platform_context::PlatformContext;
#[test]
fn platform_context_type_is_public() {
let context = bootstrap_platform();
assert_eq!(context.config_name.as_ref(), "bootstrap");
fn assert_send_sync_clone<T: Send + Sync + Clone>() {}
assert_send_sync_clone::<PlatformContext>();
}