diff --git a/src/connection.rs b/src/connection.rs index b9a0bac..96f564f 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -336,11 +336,11 @@ impl ConnectionManager { } } - async fn start_polling_for_points( + // 将点添加到轮询列表 + async fn add_points_to_poll_list( &self, source_id: Uuid, points: &[PointSubscriptionInfo], - session: Arc, ) -> usize { let mut started = 0usize; @@ -365,11 +365,6 @@ impl ConnectionManager { } } - // 如果有新的轮询点,启动或重启统一的轮询任务 - if started > 0 { - self.start_unified_poll_task(source_id, session).await; - } - started } @@ -476,7 +471,7 @@ impl ConnectionManager { status.insert( source_id, ConnectionStatus { - session: Some(session), + session: Some(session.clone()), is_connected: true, last_error: None, last_time: Utc::now(), @@ -488,6 +483,10 @@ impl ConnectionManager { poll_handle: None, }, ); + drop(status); // 显式释放锁,在调用 start_unified_poll_task 之前 + + // 启动统一的轮询任务 + self.start_unified_poll_task(source_id, session).await; tracing::info!("Successfully connected to source {}", source_id); Ok(()) @@ -933,7 +932,7 @@ impl ConnectionManager { if subscription_id.is_none() { let polled_count = self - .start_polling_for_points(source_id, &points, session.clone()) + .add_points_to_poll_list(source_id, &points) .await; return Ok(Self::subscription_result(0, polled_count)); } @@ -1001,7 +1000,7 @@ impl ConnectionManager { } let polled_count = self - .start_polling_for_points(source_id, &item_points, session.clone()) + .add_points_to_poll_list(source_id, &item_points) .await; return Ok(Self::subscription_result(0, polled_count)); } @@ -1057,7 +1056,7 @@ impl ConnectionManager { } let polled_count = self - .start_polling_for_points(source_id, &failed_points, session.clone()) + .add_points_to_poll_list(source_id, &failed_points) .await; Ok(Self::subscription_result(