重构:将 poll_points_by_source 的内层 HashSet 移动到 ConnectionStatus 中,简化数据结构

This commit is contained in:
caoqianming 2026-03-04 11:16:08 +08:00
parent 4bb9bdd27d
commit b22225ad72
1 changed files with 14 additions and 25 deletions

View File

@ -69,6 +69,7 @@ pub struct ConnectionStatus {
pub next_client_handle: u32,
pub client_handle_map: HashMap<u32, Uuid>, // client_handle -> point_id
pub monitored_item_map: HashMap<Uuid, u32>, // point_id -> monitored_item_id
pub poll_points: HashSet<Uuid>, // 正在轮询的点集合
}
#[derive(Clone)]
@ -78,7 +79,6 @@ pub struct ConnectionManager {
point_history_data: Arc<RwLock<HashMap<Uuid, VecDeque<PointMonitorInfo>>>>,
point_write_target_cache: Arc<RwLock<HashMap<Uuid, PointWriteTarget>>>,
poll_task_handles: Arc<RwLock<HashMap<Uuid, JoinHandle<()>>>>,
poll_points_by_source: Arc<RwLock<HashMap<Uuid, HashSet<Uuid>>>>,
pool: Option<sqlx::PgPool>,
event_manager: Option<std::sync::Arc<crate::event::EventManager>>,
}
@ -137,7 +137,6 @@ impl ConnectionManager {
point_history_data: Arc::new(RwLock::new(HashMap::new())),
point_write_target_cache: Arc::new(RwLock::new(HashMap::new())),
poll_task_handles: Arc::new(RwLock::new(HashMap::new())),
poll_points_by_source: Arc::new(RwLock::new(HashMap::new())),
event_manager: None,
}
}
@ -300,11 +299,10 @@ impl ConnectionManager {
poll_tasks.insert(point_id, handle);
}
{
let mut points_by_source = self.poll_points_by_source.write().await;
points_by_source
.entry(source_id)
.or_insert_with(HashSet::new)
.insert(point_id);
let mut status = self.status.write().await;
if let Some(conn_status) = status.get_mut(&source_id) {
conn_status.poll_points.insert(point_id);
}
}
Ok(())
@ -315,29 +313,18 @@ impl ConnectionManager {
handle.abort();
}
let mut points_by_source = self.poll_points_by_source.write().await;
let source_ids: Vec<Uuid> = points_by_source
.iter()
.filter(|(_, point_set)| point_set.contains(&point_id))
.map(|(source_id, _)| *source_id)
.collect();
for source_id in source_ids {
if let Some(point_set) = points_by_source.get_mut(&source_id) {
point_set.remove(&point_id);
if point_set.is_empty() {
points_by_source.remove(&source_id);
}
}
let mut status = self.status.write().await;
for conn_status in status.values_mut() {
conn_status.poll_points.remove(&point_id);
}
}
async fn stop_polling_for_source(&self, source_id: Uuid) {
let point_ids = {
let mut points_by_source = self.poll_points_by_source.write().await;
points_by_source
.remove(&source_id)
.map(|set| set.into_iter().collect::<Vec<_>>())
let mut status = self.status.write().await;
status
.get_mut(&source_id)
.map(|conn_status| conn_status.poll_points.drain().collect::<Vec<_>>())
.unwrap_or_default()
};
@ -496,6 +483,7 @@ impl ConnectionManager {
next_client_handle: 1000,
client_handle_map: HashMap::new(),
monitored_item_map: HashMap::new(),
poll_points: HashSet::new(),
},
);
@ -516,6 +504,7 @@ impl ConnectionManager {
client_handle_map: HashMap::new(),
monitored_item_map: HashMap::new(),
next_client_handle: 1000,
poll_points: HashSet::new(),
},
);
}