From 5685c33687a7e343fe49b4959c17270568de99fc Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 5 Mar 2026 11:08:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96main.rs=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=89=93=E5=8D=B0=EF=BC=9A=E7=A7=BB=E9=99=A4=E9=87=8D=E5=A4=8D?= =?UTF-8?q?=E6=97=A5=E5=BF=97=EF=BC=8C=E4=BD=BF=E7=94=A8=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E5=A4=84=E7=90=86source=E8=BF=9E=E6=8E=A5=E5=92=8C=E8=AE=A2?= =?UTF-8?q?=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main.rs | 53 ++++++++++++++++++++++++++--------------------------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/src/main.rs b/src/main.rs index 312a0cd..79c9f63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -48,41 +48,40 @@ async fn main() { connection_manager.set_event_manager(event_manager.clone()); let connection_manager = Arc::new(connection_manager); - // Connect to all enabled sources + // Connect to all enabled sources concurrently let sources = service::get_all_enabled_sources(&pool) .await .expect("Failed to fetch sources"); + // Spawn a task for each source to connect and subscribe concurrently + let mut tasks = Vec::new(); for source in sources { - tracing::info!("Connecting to source: {} ({})", source.name, source.endpoint); - match connection_manager.connect_from_source(&pool, source.id).await { - Ok(_) => { - tracing::info!("Successfully connected to source: {}", source.name); - // Subscribe to points for this source - match connection_manager - .subscribe_points_from_source(source.id, None, &pool) - .await - { - Ok(stats) => { - let subscribed = *stats.get("subscribed").unwrap_or(&0); - let polled = *stats.get("polled").unwrap_or(&0); - let total = *stats.get("total").unwrap_or(&0); - tracing::info!( - "Point subscribe setup for source {}: subscribed={}, polled={}, total={}", - source.name, - subscribed, - polled, - total - ); - } - Err(e) => { - tracing::error!("Failed to subscribe to points for source {}: {}", source.name, e); + let cm = connection_manager.clone(); + let p = pool.clone(); + let source_name = source.name.clone(); + let source_id = source.id; + + let task = tokio::spawn(async move { + match cm.connect_from_source(&p, source_id).await { + Ok(_) => { + // Subscribe to points for this source + if let Err(e) = cm.subscribe_points_from_source(source_id, None, &p).await { + tracing::error!("Failed to subscribe to points for source {}: {}", source_name, e); } } + Err(e) => { + tracing::error!("Failed to connect to source {}: {}", source_name, e); + } } - Err(e) => { - tracing::error!("Failed to connect to source {}: {}", source.name, e); - } + }); + + tasks.push(task); + } + + // Wait for all connection tasks to complete + for task in tasks { + if let Err(e) = task.await { + tracing::error!("Source connection task failed: {:?}", e); } }