优化点取消订阅逻辑:从轮询列表中移除所有传入点,并记录订阅点和轮询点的移除数量

This commit is contained in:
caoqianming 2026-03-05 11:01:20 +08:00
parent 8230536c73
commit a63433e757
1 changed files with 19 additions and 8 deletions

View File

@ -1147,13 +1147,6 @@ impl ConnectionManager {
.retain(|_, point_id| !removed_set.contains(point_id));
}
// 从轮询列表中移除已取消订阅的点
{
let mut status = self.status.write().await;
if let Some(conn_status) = status.get_mut(&source_id) {
conn_status.poll_points.retain(|p| !removed_set.contains(&p.point_id));
}
}
let _ = self
.remove_point_write_target_cache_by_point_ids(&removed_point_ids)
.await;
@ -1170,9 +1163,27 @@ impl ConnectionManager {
history_data.remove(point_id);
}
}
// 从轮询列表中移除传入的点,并记录移除的轮询点数量
let polling_removed_count = {
let mut status = self.status.write().await;
if let Some(conn_status) = status.get_mut(&source_id) {
let before_count = conn_status.poll_points.len();
conn_status.poll_points.retain(|p| !target_ids.contains(&p.point_id));
let after_count = conn_status.poll_points.len();
before_count - after_count
} else {
0
}
};
// 计算从订阅点和轮询点移除的总数
let total_removed = removed_point_ids.len() + polling_removed_count;
tracing::info!(
"Unsubscribed {} points from source {}",
"Unsubscribed {} points (subscription: {}, polling: {}) from source {}",
total_removed,
removed_point_ids.len(),
polling_removed_count,
source_id
);
Ok(removed_point_ids.len())