refactor: 在 connect 方法中启动统一轮询任务,移除 start_polling_for_points 方法

This commit is contained in:
caoqianming 2026-03-05 10:52:27 +08:00
parent 114d350e5b
commit 8230536c73
1 changed files with 10 additions and 11 deletions

View File

@ -336,11 +336,11 @@ impl ConnectionManager {
} }
} }
async fn start_polling_for_points( // 将点添加到轮询列表
async fn add_points_to_poll_list(
&self, &self,
source_id: Uuid, source_id: Uuid,
points: &[PointSubscriptionInfo], points: &[PointSubscriptionInfo],
session: Arc<Session>,
) -> usize { ) -> usize {
let mut started = 0usize; let mut started = 0usize;
@ -365,11 +365,6 @@ impl ConnectionManager {
} }
} }
// 如果有新的轮询点,启动或重启统一的轮询任务
if started > 0 {
self.start_unified_poll_task(source_id, session).await;
}
started started
} }
@ -476,7 +471,7 @@ impl ConnectionManager {
status.insert( status.insert(
source_id, source_id,
ConnectionStatus { ConnectionStatus {
session: Some(session), session: Some(session.clone()),
is_connected: true, is_connected: true,
last_error: None, last_error: None,
last_time: Utc::now(), last_time: Utc::now(),
@ -488,6 +483,10 @@ impl ConnectionManager {
poll_handle: None, poll_handle: None,
}, },
); );
drop(status); // 显式释放锁,在调用 start_unified_poll_task 之前
// 启动统一的轮询任务
self.start_unified_poll_task(source_id, session).await;
tracing::info!("Successfully connected to source {}", source_id); tracing::info!("Successfully connected to source {}", source_id);
Ok(()) Ok(())
@ -933,7 +932,7 @@ impl ConnectionManager {
if subscription_id.is_none() { if subscription_id.is_none() {
let polled_count = self let polled_count = self
.start_polling_for_points(source_id, &points, session.clone()) .add_points_to_poll_list(source_id, &points)
.await; .await;
return Ok(Self::subscription_result(0, polled_count)); return Ok(Self::subscription_result(0, polled_count));
} }
@ -1001,7 +1000,7 @@ impl ConnectionManager {
} }
let polled_count = self let polled_count = self
.start_polling_for_points(source_id, &item_points, session.clone()) .add_points_to_poll_list(source_id, &item_points)
.await; .await;
return Ok(Self::subscription_result(0, polled_count)); return Ok(Self::subscription_result(0, polled_count));
} }
@ -1057,7 +1056,7 @@ impl ConnectionManager {
} }
let polled_count = self let polled_count = self
.start_polling_for_points(source_id, &failed_points, session.clone()) .add_points_to_poll_list(source_id, &failed_points)
.await; .await;
Ok(Self::subscription_result( Ok(Self::subscription_result(