diff --git a/src/connection.rs b/src/connection.rs index dd3183c..5fdf49f 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -69,6 +69,7 @@ pub struct ConnectionStatus { pub next_client_handle: u32, pub client_handle_map: HashMap, // client_handle -> point_id pub monitored_item_map: HashMap, // point_id -> monitored_item_id + pub poll_points: HashSet, // 正在轮询的点集合 } #[derive(Clone)] @@ -78,7 +79,6 @@ pub struct ConnectionManager { point_history_data: Arc>>>, point_write_target_cache: Arc>>, poll_task_handles: Arc>>>, - poll_points_by_source: Arc>>>, pool: Option, event_manager: Option>, } @@ -137,7 +137,6 @@ impl ConnectionManager { point_history_data: Arc::new(RwLock::new(HashMap::new())), point_write_target_cache: Arc::new(RwLock::new(HashMap::new())), poll_task_handles: Arc::new(RwLock::new(HashMap::new())), - poll_points_by_source: Arc::new(RwLock::new(HashMap::new())), event_manager: None, } } @@ -300,11 +299,10 @@ impl ConnectionManager { poll_tasks.insert(point_id, handle); } { - let mut points_by_source = self.poll_points_by_source.write().await; - points_by_source - .entry(source_id) - .or_insert_with(HashSet::new) - .insert(point_id); + let mut status = self.status.write().await; + if let Some(conn_status) = status.get_mut(&source_id) { + conn_status.poll_points.insert(point_id); + } } Ok(()) @@ -315,29 +313,18 @@ impl ConnectionManager { handle.abort(); } - let mut points_by_source = self.poll_points_by_source.write().await; - let source_ids: Vec = points_by_source - .iter() - .filter(|(_, point_set)| point_set.contains(&point_id)) - .map(|(source_id, _)| *source_id) - .collect(); - - for source_id in source_ids { - if let Some(point_set) = points_by_source.get_mut(&source_id) { - point_set.remove(&point_id); - if point_set.is_empty() { - points_by_source.remove(&source_id); - } - } + let mut status = self.status.write().await; + for conn_status in status.values_mut() { + conn_status.poll_points.remove(&point_id); } } async fn stop_polling_for_source(&self, source_id: Uuid) { let point_ids = { - let mut points_by_source = self.poll_points_by_source.write().await; - points_by_source - .remove(&source_id) - .map(|set| set.into_iter().collect::>()) + let mut status = self.status.write().await; + status + .get_mut(&source_id) + .map(|conn_status| conn_status.poll_points.drain().collect::>()) .unwrap_or_default() }; @@ -496,6 +483,7 @@ impl ConnectionManager { next_client_handle: 1000, client_handle_map: HashMap::new(), monitored_item_map: HashMap::new(), + poll_points: HashSet::new(), }, ); @@ -516,6 +504,7 @@ impl ConnectionManager { client_handle_map: HashMap::new(), monitored_item_map: HashMap::new(), next_client_handle: 1000, + poll_points: HashSet::new(), }, ); }