diff --git a/src/connection.rs b/src/connection.rs index 14d0f80..d09ac48 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1221,6 +1221,7 @@ impl ConnectionManager { let mut items_to_create: Vec = Vec::new(); let mut item_points: Vec = Vec::new(); + let mut pending_client_handles: Vec<(u32, Uuid)> = Vec::new(); for p in points.iter().cloned() { let node_id = NodeId::from_str(&p.external_id) @@ -1228,10 +1229,7 @@ impl ConnectionManager { let client_handle = self.allocate_client_handle(source_id).await .ok_or_else(|| format!("Failed to allocate client handle for source {}", source_id))?; - - if let Some(s) = self.status.write().await.get_mut(&source_id) { - s.client_handle_map.insert(client_handle, p.point_id); - } + pending_client_handles.push((client_handle, p.point_id)); let request = MonitoredItemCreateRequest { item_to_monitor: ReadValueId { @@ -1254,6 +1252,12 @@ impl ConnectionManager { item_points.push(p); } + if let Some(conn_status) = self.status.write().await.get_mut(&source_id) { + for (client_handle, point_id) in &pending_client_handles { + conn_status.client_handle_map.insert(*client_handle, *point_id); + } + } + let results = match session .create_monitored_items(subscription_id, TimestampsToReturn::Both, items_to_create) .await @@ -1283,6 +1287,7 @@ impl ConnectionManager { let mut successfully_subscribed_points = Vec::new(); let mut successfully_subscribed_set: HashSet = HashSet::new(); let mut failed_points: Vec = Vec::new(); + let mut successful_monitored_items: Vec<(Uuid, u32)> = Vec::new(); for (i, monitored_item_result) in results.iter().enumerate() { if i >= item_points.len() { break; @@ -1292,13 +1297,10 @@ impl ConnectionManager { if monitored_item_result.result.status_code.is_good() { successfully_subscribed_points.push(point.point_id); successfully_subscribed_set.insert(point.point_id); - if let Some(conn_status) = self.status.write().await.get_mut(&source_id) { - conn_status - .monitored_item_map - .insert(point.point_id, monitored_item_result.result.monitored_item_id); - // 从轮询列表中移除该点 - Arc::make_mut(&mut conn_status.poll_points).retain(|p| p.point_id != point.point_id); - } + successful_monitored_items.push(( + point.point_id, + monitored_item_result.result.monitored_item_id, + )); } else { tracing::error!( "Failed to create monitored item for point {}: {:?}", @@ -1322,6 +1324,15 @@ impl ConnectionManager { failed_points.retain(|p| !successfully_subscribed_set.contains(&p.point_id)); if let Some(conn_status) = self.status.write().await.get_mut(&source_id) { + for (point_id, monitored_item_id) in &successful_monitored_items { + conn_status + .monitored_item_map + .insert(*point_id, *monitored_item_id); + } + if !successfully_subscribed_set.is_empty() { + Arc::make_mut(&mut conn_status.poll_points) + .retain(|p| !successfully_subscribed_set.contains(&p.point_id)); + } let failed_point_ids: HashSet = failed_points.iter().map(|p| p.point_id).collect(); conn_status .client_handle_map