refactor: 添加 allocate_client_handle 函数封装句柄分配逻辑
This commit is contained in:
parent
a2208e8958
commit
b197607d5f
|
|
@ -717,6 +717,17 @@ impl ConnectionManager {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn allocate_client_handle(&self, source_id: Uuid) -> Option<u32> {
|
||||||
|
let mut status = self.status.write().await;
|
||||||
|
if let Some(conn) = status.get_mut(&source_id) {
|
||||||
|
let handle = conn.next_client_handle;
|
||||||
|
conn.next_client_handle += 1;
|
||||||
|
Some(handle)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn get_status(&self, source_id: Uuid) -> Option<ConnectionStatusView> {
|
pub async fn get_status(&self, source_id: Uuid) -> Option<ConnectionStatusView> {
|
||||||
let status = self.status.read().await;
|
let status = self.status.read().await;
|
||||||
status.get(&source_id).map(ConnectionStatusView::from)
|
status.get(&source_id).map(ConnectionStatusView::from)
|
||||||
|
|
@ -1069,12 +1080,6 @@ impl ConnectionManager {
|
||||||
}
|
}
|
||||||
let subscription_id = subscription_id.unwrap_or_default();
|
let subscription_id = subscription_id.unwrap_or_default();
|
||||||
|
|
||||||
let mut client_handle_seed: u32 = self
|
|
||||||
.status
|
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.get(&source_id)
|
|
||||||
.map_or(1000, |s| s.next_client_handle);
|
|
||||||
let mut items_to_create: Vec<MonitoredItemCreateRequest> = Vec::new();
|
let mut items_to_create: Vec<MonitoredItemCreateRequest> = Vec::new();
|
||||||
let mut item_points: Vec<PointSubscriptionInfo> = Vec::new();
|
let mut item_points: Vec<PointSubscriptionInfo> = Vec::new();
|
||||||
|
|
||||||
|
|
@ -1082,8 +1087,8 @@ impl ConnectionManager {
|
||||||
let node_id = NodeId::from_str(&p.external_id)
|
let node_id = NodeId::from_str(&p.external_id)
|
||||||
.map_err(|e| format!("Invalid node id {}: {}", p.external_id, e))?;
|
.map_err(|e| format!("Invalid node id {}: {}", p.external_id, e))?;
|
||||||
|
|
||||||
let client_handle = client_handle_seed;
|
let client_handle = self.allocate_client_handle(source_id).await
|
||||||
client_handle_seed += 1;
|
.ok_or_else(|| format!("Failed to allocate client handle for source {}", source_id))?;
|
||||||
|
|
||||||
if let Some(s) = self.status.write().await.get_mut(&source_id) {
|
if let Some(s) = self.status.write().await.get_mut(&source_id) {
|
||||||
s.client_handle_map.insert(client_handle, p.point_id);
|
s.client_handle_map.insert(client_handle, p.point_id);
|
||||||
|
|
@ -1127,7 +1132,6 @@ impl ConnectionManager {
|
||||||
conn_status
|
conn_status
|
||||||
.client_handle_map
|
.client_handle_map
|
||||||
.retain(|_, point_id| !item_point_ids.contains(point_id));
|
.retain(|_, point_id| !item_point_ids.contains(point_id));
|
||||||
conn_status.next_client_handle = client_handle_seed;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let polled_count = self
|
let polled_count = self
|
||||||
|
|
@ -1183,7 +1187,6 @@ impl ConnectionManager {
|
||||||
conn_status
|
conn_status
|
||||||
.client_handle_map
|
.client_handle_map
|
||||||
.retain(|_, point_id| !failed_point_ids.contains(point_id));
|
.retain(|_, point_id| !failed_point_ids.contains(point_id));
|
||||||
conn_status.next_client_handle = client_handle_seed;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let polled_count = self
|
let polled_count = self
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue