From b22225ad72cc9dc697e65fd7c8fd5175f2272633 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Wed, 4 Mar 2026 11:16:08 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=EF=BC=9A=E5=B0=86=20poll=5Fp?= =?UTF-8?q?oints=5Fby=5Fsource=20=E7=9A=84=E5=86=85=E5=B1=82=20HashSet=20?= =?UTF-8?q?=E7=A7=BB=E5=8A=A8=E5=88=B0=20ConnectionStatus=20=E4=B8=AD?= =?UTF-8?q?=EF=BC=8C=E7=AE=80=E5=8C=96=E6=95=B0=E6=8D=AE=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/connection.rs | 39 ++++++++++++++------------------------- 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index dd3183c..5fdf49f 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -69,6 +69,7 @@ pub struct ConnectionStatus { pub next_client_handle: u32, pub client_handle_map: HashMap, // client_handle -> point_id pub monitored_item_map: HashMap, // point_id -> monitored_item_id + pub poll_points: HashSet, // 正在轮询的点集合 } #[derive(Clone)] @@ -78,7 +79,6 @@ pub struct ConnectionManager { point_history_data: Arc>>>, point_write_target_cache: Arc>>, poll_task_handles: Arc>>>, - poll_points_by_source: Arc>>>, pool: Option, event_manager: Option>, } @@ -137,7 +137,6 @@ impl ConnectionManager { point_history_data: Arc::new(RwLock::new(HashMap::new())), point_write_target_cache: Arc::new(RwLock::new(HashMap::new())), poll_task_handles: Arc::new(RwLock::new(HashMap::new())), - poll_points_by_source: Arc::new(RwLock::new(HashMap::new())), event_manager: None, } } @@ -300,11 +299,10 @@ impl ConnectionManager { poll_tasks.insert(point_id, handle); } { - let mut points_by_source = self.poll_points_by_source.write().await; - points_by_source - .entry(source_id) - .or_insert_with(HashSet::new) - .insert(point_id); + let mut status = self.status.write().await; + if let Some(conn_status) = status.get_mut(&source_id) { + conn_status.poll_points.insert(point_id); + } } Ok(()) @@ -315,29 +313,18 @@ impl ConnectionManager { handle.abort(); } - let mut points_by_source = self.poll_points_by_source.write().await; - let source_ids: Vec = points_by_source - .iter() - .filter(|(_, point_set)| point_set.contains(&point_id)) - .map(|(source_id, _)| *source_id) - .collect(); - - for source_id in source_ids { - if let Some(point_set) = points_by_source.get_mut(&source_id) { - point_set.remove(&point_id); - if point_set.is_empty() { - points_by_source.remove(&source_id); - } - } + let mut status = self.status.write().await; + for conn_status in status.values_mut() { + conn_status.poll_points.remove(&point_id); } } async fn stop_polling_for_source(&self, source_id: Uuid) { let point_ids = { - let mut points_by_source = self.poll_points_by_source.write().await; - points_by_source - .remove(&source_id) - .map(|set| set.into_iter().collect::>()) + let mut status = self.status.write().await; + status + .get_mut(&source_id) + .map(|conn_status| conn_status.poll_points.drain().collect::>()) .unwrap_or_default() }; @@ -496,6 +483,7 @@ impl ConnectionManager { next_client_handle: 1000, client_handle_map: HashMap::new(), monitored_item_map: HashMap::new(), + poll_points: HashSet::new(), }, ); @@ -516,6 +504,7 @@ impl ConnectionManager { client_handle_map: HashMap::new(), monitored_item_map: HashMap::new(), next_client_handle: 1000, + poll_points: HashSet::new(), }, ); }