diff --git a/src/main.rs b/src/main.rs index 312a0cd..79c9f63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -48,41 +48,40 @@ async fn main() { connection_manager.set_event_manager(event_manager.clone()); let connection_manager = Arc::new(connection_manager); - // Connect to all enabled sources + // Connect to all enabled sources concurrently let sources = service::get_all_enabled_sources(&pool) .await .expect("Failed to fetch sources"); + // Spawn a task for each source to connect and subscribe concurrently + let mut tasks = Vec::new(); for source in sources { - tracing::info!("Connecting to source: {} ({})", source.name, source.endpoint); - match connection_manager.connect_from_source(&pool, source.id).await { - Ok(_) => { - tracing::info!("Successfully connected to source: {}", source.name); - // Subscribe to points for this source - match connection_manager - .subscribe_points_from_source(source.id, None, &pool) - .await - { - Ok(stats) => { - let subscribed = *stats.get("subscribed").unwrap_or(&0); - let polled = *stats.get("polled").unwrap_or(&0); - let total = *stats.get("total").unwrap_or(&0); - tracing::info!( - "Point subscribe setup for source {}: subscribed={}, polled={}, total={}", - source.name, - subscribed, - polled, - total - ); - } - Err(e) => { - tracing::error!("Failed to subscribe to points for source {}: {}", source.name, e); + let cm = connection_manager.clone(); + let p = pool.clone(); + let source_name = source.name.clone(); + let source_id = source.id; + + let task = tokio::spawn(async move { + match cm.connect_from_source(&p, source_id).await { + Ok(_) => { + // Subscribe to points for this source + if let Err(e) = cm.subscribe_points_from_source(source_id, None, &p).await { + tracing::error!("Failed to subscribe to points for source {}: {}", source_name, e); } } + Err(e) => { + tracing::error!("Failed to connect to source {}: {}", source_name, e); + } } - Err(e) => { - tracing::error!("Failed to connect to source {}: {}", source.name, e); - } + }); + + tasks.push(task); + } + + // Wait for all connection tasks to complete + for task in tasks { + if let Err(e) = task.await { + tracing::error!("Source connection task failed: {:?}", e); } }