From 494cf1d656a45f528f0f3eb5d1decc6b4f138e5d Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 5 Mar 2026 11:18:30 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=BF=9E=E6=8E=A5=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=EF=BC=9Aconnect=5Ffrom=5Fsource=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E5=A4=84=E7=90=86=E8=AE=A2=E9=98=85=EF=BC=8C=E7=AE=80=E5=8C=96?= =?UTF-8?q?main.rs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/connection.rs | 8 +++++++- src/main.rs | 12 ++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 486e6ef..f1635b7 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -395,7 +395,13 @@ impl ConnectionManager { source.username.as_deref(), source.password.as_deref(), ) - .await + .await?; + + // Subscribe to points for this source + self.subscribe_points_from_source(source_id, None, pool) + .await?; + + Ok(()) } pub async fn connect( diff --git a/src/main.rs b/src/main.rs index 2271e63..57c789e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -62,16 +62,8 @@ async fn main() { let source_id = source.id; let task = tokio::spawn(async move { - match cm.connect_from_source(&p, source_id).await { - Ok(_) => { - // Subscribe to points for this source - if let Err(e) = cm.subscribe_points_from_source(source_id, None, &p).await { - tracing::error!("Failed to subscribe to points for source {}: {}", source_name, e); - } - } - Err(e) => { - tracing::error!("Failed to connect to source {}: {}", source_name, e); - } + if let Err(e) = cm.connect_from_source(&p, source_id).await { + tracing::error!("Failed to connect to source {}: {}", source_name, e); } });