perf(connection): reduce subscription lock contention

This commit is contained in:
caoqianming 2026-03-17 08:28:55 +08:00
parent f33d989905
commit 8eb1d6671a
1 changed files with 22 additions and 11 deletions

View File

@ -1221,6 +1221,7 @@ impl ConnectionManager {
let mut items_to_create: Vec<MonitoredItemCreateRequest> = Vec::new(); let mut items_to_create: Vec<MonitoredItemCreateRequest> = Vec::new();
let mut item_points: Vec<PointSubscriptionInfo> = Vec::new(); let mut item_points: Vec<PointSubscriptionInfo> = Vec::new();
let mut pending_client_handles: Vec<(u32, Uuid)> = Vec::new();
for p in points.iter().cloned() { for p in points.iter().cloned() {
let node_id = NodeId::from_str(&p.external_id) let node_id = NodeId::from_str(&p.external_id)
@ -1228,10 +1229,7 @@ impl ConnectionManager {
let client_handle = self.allocate_client_handle(source_id).await let client_handle = self.allocate_client_handle(source_id).await
.ok_or_else(|| format!("Failed to allocate client handle for source {}", source_id))?; .ok_or_else(|| format!("Failed to allocate client handle for source {}", source_id))?;
pending_client_handles.push((client_handle, p.point_id));
if let Some(s) = self.status.write().await.get_mut(&source_id) {
s.client_handle_map.insert(client_handle, p.point_id);
}
let request = MonitoredItemCreateRequest { let request = MonitoredItemCreateRequest {
item_to_monitor: ReadValueId { item_to_monitor: ReadValueId {
@ -1254,6 +1252,12 @@ impl ConnectionManager {
item_points.push(p); item_points.push(p);
} }
if let Some(conn_status) = self.status.write().await.get_mut(&source_id) {
for (client_handle, point_id) in &pending_client_handles {
conn_status.client_handle_map.insert(*client_handle, *point_id);
}
}
let results = match session let results = match session
.create_monitored_items(subscription_id, TimestampsToReturn::Both, items_to_create) .create_monitored_items(subscription_id, TimestampsToReturn::Both, items_to_create)
.await .await
@ -1283,6 +1287,7 @@ impl ConnectionManager {
let mut successfully_subscribed_points = Vec::new(); let mut successfully_subscribed_points = Vec::new();
let mut successfully_subscribed_set: HashSet<Uuid> = HashSet::new(); let mut successfully_subscribed_set: HashSet<Uuid> = HashSet::new();
let mut failed_points: Vec<PointSubscriptionInfo> = Vec::new(); let mut failed_points: Vec<PointSubscriptionInfo> = Vec::new();
let mut successful_monitored_items: Vec<(Uuid, u32)> = Vec::new();
for (i, monitored_item_result) in results.iter().enumerate() { for (i, monitored_item_result) in results.iter().enumerate() {
if i >= item_points.len() { if i >= item_points.len() {
break; break;
@ -1292,13 +1297,10 @@ impl ConnectionManager {
if monitored_item_result.result.status_code.is_good() { if monitored_item_result.result.status_code.is_good() {
successfully_subscribed_points.push(point.point_id); successfully_subscribed_points.push(point.point_id);
successfully_subscribed_set.insert(point.point_id); successfully_subscribed_set.insert(point.point_id);
if let Some(conn_status) = self.status.write().await.get_mut(&source_id) { successful_monitored_items.push((
conn_status point.point_id,
.monitored_item_map monitored_item_result.result.monitored_item_id,
.insert(point.point_id, monitored_item_result.result.monitored_item_id); ));
// 从轮询列表中移除该点
Arc::make_mut(&mut conn_status.poll_points).retain(|p| p.point_id != point.point_id);
}
} else { } else {
tracing::error!( tracing::error!(
"Failed to create monitored item for point {}: {:?}", "Failed to create monitored item for point {}: {:?}",
@ -1322,6 +1324,15 @@ impl ConnectionManager {
failed_points.retain(|p| !successfully_subscribed_set.contains(&p.point_id)); failed_points.retain(|p| !successfully_subscribed_set.contains(&p.point_id));
if let Some(conn_status) = self.status.write().await.get_mut(&source_id) { if let Some(conn_status) = self.status.write().await.get_mut(&source_id) {
for (point_id, monitored_item_id) in &successful_monitored_items {
conn_status
.monitored_item_map
.insert(*point_id, *monitored_item_id);
}
if !successfully_subscribed_set.is_empty() {
Arc::make_mut(&mut conn_status.poll_points)
.retain(|p| !successfully_subscribed_set.contains(&p.point_id));
}
let failed_point_ids: HashSet<Uuid> = failed_points.iter().map(|p| p.point_id).collect(); let failed_point_ids: HashSet<Uuid> = failed_points.iter().map(|p| p.point_id).collect();
conn_status conn_status
.client_handle_map .client_handle_map