重构:将 poll_task_handles 和 poll_points 合并为一个双向映射,使用 PollPointInfo 结构

This commit is contained in:
caoqianming 2026-03-04 11:24:13 +08:00
parent b22225ad72
commit 475ac02322
1 changed files with 54 additions and 30 deletions

View File

@ -59,7 +59,33 @@ struct PointWriteTarget {
external_id: String, external_id: String,
} }
#[derive(Clone)] struct PollPointInfo {
handle: JoinHandle<()>,
external_id: String,
interval_s: i32,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ConnectionStatusView {
pub is_connected: bool,
pub last_error: Option<String>,
pub last_time: DateTime<Utc>,
pub subscription_id: Option<u32>,
pub next_client_handle: u32,
}
impl From<&ConnectionStatus> for ConnectionStatusView {
fn from(status: &ConnectionStatus) -> Self {
Self {
is_connected: status.is_connected,
last_error: status.last_error.clone(),
last_time: status.last_time,
subscription_id: status.subscription_id,
next_client_handle: status.next_client_handle,
}
}
}
pub struct ConnectionStatus { pub struct ConnectionStatus {
pub session: Option<Arc<Session>>, pub session: Option<Arc<Session>>,
pub is_connected: bool, pub is_connected: bool,
@ -69,7 +95,7 @@ pub struct ConnectionStatus {
pub next_client_handle: u32, pub next_client_handle: u32,
pub client_handle_map: HashMap<u32, Uuid>, // client_handle -> point_id pub client_handle_map: HashMap<u32, Uuid>, // client_handle -> point_id
pub monitored_item_map: HashMap<Uuid, u32>, // point_id -> monitored_item_id pub monitored_item_map: HashMap<Uuid, u32>, // point_id -> monitored_item_id
pub poll_points: HashSet<Uuid>, // 正在轮询的点集合 pub poll_points: HashMap<Uuid, PollPointInfo>, // 正在轮询的点集合
} }
#[derive(Clone)] #[derive(Clone)]
@ -78,7 +104,6 @@ pub struct ConnectionManager {
point_monitor_data: Arc<RwLock<HashMap<Uuid, PointMonitorInfo>>>, point_monitor_data: Arc<RwLock<HashMap<Uuid, PointMonitorInfo>>>,
point_history_data: Arc<RwLock<HashMap<Uuid, VecDeque<PointMonitorInfo>>>>, point_history_data: Arc<RwLock<HashMap<Uuid, VecDeque<PointMonitorInfo>>>>,
point_write_target_cache: Arc<RwLock<HashMap<Uuid, PointWriteTarget>>>, point_write_target_cache: Arc<RwLock<HashMap<Uuid, PointWriteTarget>>>,
poll_task_handles: Arc<RwLock<HashMap<Uuid, JoinHandle<()>>>>,
pool: Option<sqlx::PgPool>, pool: Option<sqlx::PgPool>,
event_manager: Option<std::sync::Arc<crate::event::EventManager>>, event_manager: Option<std::sync::Arc<crate::event::EventManager>>,
} }
@ -136,7 +161,6 @@ impl ConnectionManager {
point_monitor_data: Arc::new(RwLock::new(HashMap::new())), point_monitor_data: Arc::new(RwLock::new(HashMap::new())),
point_history_data: Arc::new(RwLock::new(HashMap::new())), point_history_data: Arc::new(RwLock::new(HashMap::new())),
point_write_target_cache: Arc::new(RwLock::new(HashMap::new())), point_write_target_cache: Arc::new(RwLock::new(HashMap::new())),
poll_task_handles: Arc::new(RwLock::new(HashMap::new())),
event_manager: None, event_manager: None,
} }
} }
@ -217,11 +241,13 @@ impl ConnectionManager {
.ok_or_else(|| "Event manager is not initialized".to_string())?; .ok_or_else(|| "Event manager is not initialized".to_string())?;
{ {
let poll_tasks = self.poll_task_handles.read().await; let status = self.status.read().await;
if poll_tasks.contains_key(&point.point_id) { if let Some(conn_status) = status.get(&source_id) {
if conn_status.poll_points.contains_key(&point.point_id) {
return Ok(()); return Ok(());
} }
} }
}
let point_id = point.point_id; let point_id = point.point_id;
let external_id = point.external_id.clone(); let external_id = point.external_id.clone();
@ -294,14 +320,17 @@ impl ConnectionManager {
} }
}); });
{
let mut poll_tasks = self.poll_task_handles.write().await;
poll_tasks.insert(point_id, handle);
}
{ {
let mut status = self.status.write().await; let mut status = self.status.write().await;
if let Some(conn_status) = status.get_mut(&source_id) { if let Some(conn_status) = status.get_mut(&source_id) {
conn_status.poll_points.insert(point_id); conn_status.poll_points.insert(
point_id,
PollPointInfo {
handle,
external_id: point.external_id.clone(),
interval_s: point.scan_interval_s,
},
);
} }
} }
@ -309,18 +338,16 @@ impl ConnectionManager {
} }
async fn stop_polling_for_point(&self, point_id: Uuid) { async fn stop_polling_for_point(&self, point_id: Uuid) {
if let Some(handle) = self.poll_task_handles.write().await.remove(&point_id) {
handle.abort();
}
let mut status = self.status.write().await; let mut status = self.status.write().await;
for conn_status in status.values_mut() { for conn_status in status.values_mut() {
conn_status.poll_points.remove(&point_id); if let Some(poll_info) = conn_status.poll_points.remove(&point_id) {
poll_info.handle.abort();
}
} }
} }
async fn stop_polling_for_source(&self, source_id: Uuid) { async fn stop_polling_for_source(&self, source_id: Uuid) {
let point_ids = { let poll_infos = {
let mut status = self.status.write().await; let mut status = self.status.write().await;
status status
.get_mut(&source_id) .get_mut(&source_id)
@ -328,15 +355,12 @@ impl ConnectionManager {
.unwrap_or_default() .unwrap_or_default()
}; };
if point_ids.is_empty() { if poll_infos.is_empty() {
return; return;
} }
let mut poll_tasks = self.poll_task_handles.write().await; for (_, poll_info) in poll_infos {
for point_id in point_ids { poll_info.handle.abort();
if let Some(handle) = poll_tasks.remove(&point_id) {
handle.abort();
}
} }
} }
@ -483,7 +507,7 @@ impl ConnectionManager {
next_client_handle: 1000, next_client_handle: 1000,
client_handle_map: HashMap::new(), client_handle_map: HashMap::new(),
monitored_item_map: HashMap::new(), monitored_item_map: HashMap::new(),
poll_points: HashSet::new(), poll_points: HashMap::new(),
}, },
); );
@ -504,7 +528,7 @@ impl ConnectionManager {
client_handle_map: HashMap::new(), client_handle_map: HashMap::new(),
monitored_item_map: HashMap::new(), monitored_item_map: HashMap::new(),
next_client_handle: 1000, next_client_handle: 1000,
poll_points: HashSet::new(), poll_points: HashMap::new(),
}, },
); );
} }
@ -566,14 +590,14 @@ impl ConnectionManager {
} }
} }
pub async fn get_status(&self, source_id: Uuid) -> Option<ConnectionStatus> { pub async fn get_status(&self, source_id: Uuid) -> Option<ConnectionStatusView> {
let status = self.status.read().await; let status = self.status.read().await;
status.get(&source_id).cloned() status.get(&source_id).map(ConnectionStatusView::from)
} }
pub async fn get_all_status(&self) -> Vec<(Uuid, ConnectionStatus)> { pub async fn get_all_status(&self) -> Vec<(Uuid, ConnectionStatusView)> {
let status = self.status.read().await; let status = self.status.read().await;
status.iter().map(|(source_id, conn_status)| (*source_id, conn_status.clone())).collect() status.iter().map(|(source_id, conn_status)| (*source_id, ConnectionStatusView::from(conn_status))).collect()
} }
pub async fn write_point_values_batch( pub async fn write_point_values_batch(