From 8230536c73c452db12f3558b38ad38f0fdf999e8 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 5 Mar 2026 10:52:27 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E5=9C=A8=20connect=20=E6=96=B9?= =?UTF-8?q?=E6=B3=95=E4=B8=AD=E5=90=AF=E5=8A=A8=E7=BB=9F=E4=B8=80=E8=BD=AE?= =?UTF-8?q?=E8=AF=A2=E4=BB=BB=E5=8A=A1=EF=BC=8C=E7=A7=BB=E9=99=A4=20start?= =?UTF-8?q?=5Fpolling=5Ffor=5Fpoints=20=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/connection.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index b9a0bac..96f564f 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -336,11 +336,11 @@ impl ConnectionManager { } } - async fn start_polling_for_points( + // 将点添加到轮询列表 + async fn add_points_to_poll_list( &self, source_id: Uuid, points: &[PointSubscriptionInfo], - session: Arc, ) -> usize { let mut started = 0usize; @@ -365,11 +365,6 @@ impl ConnectionManager { } } - // 如果有新的轮询点,启动或重启统一的轮询任务 - if started > 0 { - self.start_unified_poll_task(source_id, session).await; - } - started } @@ -476,7 +471,7 @@ impl ConnectionManager { status.insert( source_id, ConnectionStatus { - session: Some(session), + session: Some(session.clone()), is_connected: true, last_error: None, last_time: Utc::now(), @@ -488,6 +483,10 @@ impl ConnectionManager { poll_handle: None, }, ); + drop(status); // 显式释放锁,在调用 start_unified_poll_task 之前 + + // 启动统一的轮询任务 + self.start_unified_poll_task(source_id, session).await; tracing::info!("Successfully connected to source {}", source_id); Ok(()) @@ -933,7 +932,7 @@ impl ConnectionManager { if subscription_id.is_none() { let polled_count = self - .start_polling_for_points(source_id, &points, session.clone()) + .add_points_to_poll_list(source_id, &points) .await; return Ok(Self::subscription_result(0, polled_count)); } @@ -1001,7 +1000,7 @@ impl ConnectionManager { } let polled_count = self - .start_polling_for_points(source_id, &item_points, session.clone()) + .add_points_to_poll_list(source_id, &item_points) .await; return Ok(Self::subscription_result(0, polled_count)); } @@ -1057,7 +1056,7 @@ impl ConnectionManager { } let polled_count = self - .start_polling_for_points(source_id, &failed_points, session.clone()) + .add_points_to_poll_list(source_id, &failed_points) .await; Ok(Self::subscription_result(