diff --git a/Cargo.lock b/Cargo.lock index 09ca8df..9ab3040 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,10 +112,13 @@ dependencies = [ "axum", "dotenv", "plc_platform_core", + "serde_json", + "sqlx", "tokio", "tower-http", "tracing", "tray-icon", + "uuid", "webbrowser", "winit", ] diff --git a/crates/app_feeder_distributor/Cargo.toml b/crates/app_feeder_distributor/Cargo.toml index 9694adf..a2fc808 100644 --- a/crates/app_feeder_distributor/Cargo.toml +++ b/crates/app_feeder_distributor/Cargo.toml @@ -10,6 +10,9 @@ axum = { version = "0.8", features = ["ws"] } tower-http = { version = "0.6", features = ["cors", "fs"] } tracing = "0.1" dotenv = "0.15" +serde_json = "1.0" +sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid", "json"] } +uuid = { version = "1.21", features = ["serde", "v4"] } [target.'cfg(windows)'.dependencies] tray-icon = "0.15" diff --git a/crates/app_feeder_distributor/src/main.rs b/crates/app_feeder_distributor/src/main.rs index e71fdf5..e63b9d6 100644 --- a/crates/app_feeder_distributor/src/main.rs +++ b/crates/app_feeder_distributor/src/main.rs @@ -1 +1,94 @@ -fn main() {} \ No newline at end of file +use std::sync::Arc; + +#[allow(dead_code)] +#[path = "../../../src/config.rs"] +mod config; +#[allow(dead_code)] +#[path = "../../../src/event.rs"] +mod event; + +mod connection { + pub use plc_platform_core::connection::*; +} + +mod db { + pub use plc_platform_core::db::*; +} + +mod telemetry { + pub use plc_platform_core::telemetry::*; +} + +#[allow(dead_code)] +mod websocket { + #[derive(Debug, Clone)] + pub enum WsMessage { + PointNewValue(crate::telemetry::PointMonitorInfo), + EventCreated(plc_platform_core::model::EventRecord), + } + + #[derive(Clone, Default)] + pub struct WebSocketManager; + + impl WebSocketManager { + pub async fn send_to_public(&self, _message: WsMessage) -> Result { + Ok(0) + } + } +} + +use config::AppConfig; +use connection::ConnectionManager; +use event::EventManager; + +#[tokio::main] +async fn main() { + dotenv::dotenv().ok(); + plc_platform_core::util::log::init_logger(); + let _platform = plc_platform_core::bootstrap::bootstrap_platform(); + let _single_instance = match plc_platform_core::util::single_instance::try_acquire( + "PLCControl.FeederDistributor", + ) { + Ok(guard) => guard, + Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => { + tracing::warn!("Another feeder distributor instance is already running"); + return; + } + Err(err) => { + tracing::error!("Failed to initialize single instance guard: {}", err); + return; + } + }; + + let config = AppConfig::from_env().expect("Failed to load configuration"); + let pool = db::init_database(&config.database_url) + .await + .expect("Failed to initialize database"); + + let mut connection_manager = ConnectionManager::new(); + let event_manager = Arc::new(EventManager::new( + pool.clone(), + Arc::new(connection_manager.clone()), + None, + )); + connection_manager.set_event_manager(event_manager.clone()); + connection_manager.set_pool_and_start_reconnect_task(Arc::new(pool.clone())); + + let connection_manager = Arc::new(connection_manager); + let sources = plc_platform_core::service::get_all_enabled_sources(&pool) + .await + .expect("Failed to fetch sources"); + + for source in &sources { + if let Err(err) = connection_manager.connect_from_source(&pool, source.id).await { + tracing::error!("Failed to connect to source {}: {}", source.name, err); + } + } + + tracing::info!( + "Feeder distributor bootstrap initialized for {} enabled sources on {}:{}", + sources.len(), + config.server_host, + config.server_port + ); +} diff --git a/src/event.rs b/src/event.rs index ccd7cb0..45358c3 100644 --- a/src/event.rs +++ b/src/event.rs @@ -142,6 +142,15 @@ impl EventManager { } } +impl plc_platform_core::connection::PointEventSink for EventManager { + fn send_point_new_value( + &self, + payload: plc_platform_core::telemetry::PointNewValue, + ) -> Result<(), String> { + self.send(AppEvent::PointNewValue(payload)) + } +} + async fn handle_control_event( event: AppEvent, pool: &sqlx::PgPool,