diff --git a/src/connection.rs b/src/connection.rs index 5fdf49f..2fa8eee 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -59,7 +59,33 @@ struct PointWriteTarget { external_id: String, } -#[derive(Clone)] +struct PollPointInfo { + handle: JoinHandle<()>, + external_id: String, + interval_s: i32, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ConnectionStatusView { + pub is_connected: bool, + pub last_error: Option, + pub last_time: DateTime, + pub subscription_id: Option, + pub next_client_handle: u32, +} + +impl From<&ConnectionStatus> for ConnectionStatusView { + fn from(status: &ConnectionStatus) -> Self { + Self { + is_connected: status.is_connected, + last_error: status.last_error.clone(), + last_time: status.last_time, + subscription_id: status.subscription_id, + next_client_handle: status.next_client_handle, + } + } +} + pub struct ConnectionStatus { pub session: Option>, pub is_connected: bool, @@ -69,7 +95,7 @@ pub struct ConnectionStatus { pub next_client_handle: u32, pub client_handle_map: HashMap, // client_handle -> point_id pub monitored_item_map: HashMap, // point_id -> monitored_item_id - pub poll_points: HashSet, // 正在轮询的点集合 + pub poll_points: HashMap, // 正在轮询的点集合 } #[derive(Clone)] @@ -78,7 +104,6 @@ pub struct ConnectionManager { point_monitor_data: Arc>>, point_history_data: Arc>>>, point_write_target_cache: Arc>>, - poll_task_handles: Arc>>>, pool: Option, event_manager: Option>, } @@ -136,7 +161,6 @@ impl ConnectionManager { point_monitor_data: Arc::new(RwLock::new(HashMap::new())), point_history_data: Arc::new(RwLock::new(HashMap::new())), point_write_target_cache: Arc::new(RwLock::new(HashMap::new())), - poll_task_handles: Arc::new(RwLock::new(HashMap::new())), event_manager: None, } } @@ -217,9 +241,11 @@ impl ConnectionManager { .ok_or_else(|| "Event manager is not initialized".to_string())?; { - let poll_tasks = self.poll_task_handles.read().await; - if poll_tasks.contains_key(&point.point_id) { - return Ok(()); + let status = self.status.read().await; + if let Some(conn_status) = status.get(&source_id) { + if conn_status.poll_points.contains_key(&point.point_id) { + return Ok(()); + } } } @@ -294,14 +320,17 @@ impl ConnectionManager { } }); - { - let mut poll_tasks = self.poll_task_handles.write().await; - poll_tasks.insert(point_id, handle); - } { let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { - conn_status.poll_points.insert(point_id); + conn_status.poll_points.insert( + point_id, + PollPointInfo { + handle, + external_id: point.external_id.clone(), + interval_s: point.scan_interval_s, + }, + ); } } @@ -309,18 +338,16 @@ impl ConnectionManager { } async fn stop_polling_for_point(&self, point_id: Uuid) { - if let Some(handle) = self.poll_task_handles.write().await.remove(&point_id) { - handle.abort(); - } - let mut status = self.status.write().await; for conn_status in status.values_mut() { - conn_status.poll_points.remove(&point_id); + if let Some(poll_info) = conn_status.poll_points.remove(&point_id) { + poll_info.handle.abort(); + } } } async fn stop_polling_for_source(&self, source_id: Uuid) { - let point_ids = { + let poll_infos = { let mut status = self.status.write().await; status .get_mut(&source_id) @@ -328,15 +355,12 @@ impl ConnectionManager { .unwrap_or_default() }; - if point_ids.is_empty() { + if poll_infos.is_empty() { return; } - let mut poll_tasks = self.poll_task_handles.write().await; - for point_id in point_ids { - if let Some(handle) = poll_tasks.remove(&point_id) { - handle.abort(); - } + for (_, poll_info) in poll_infos { + poll_info.handle.abort(); } } @@ -483,7 +507,7 @@ impl ConnectionManager { next_client_handle: 1000, client_handle_map: HashMap::new(), monitored_item_map: HashMap::new(), - poll_points: HashSet::new(), + poll_points: HashMap::new(), }, ); @@ -504,7 +528,7 @@ impl ConnectionManager { client_handle_map: HashMap::new(), monitored_item_map: HashMap::new(), next_client_handle: 1000, - poll_points: HashSet::new(), + poll_points: HashMap::new(), }, ); } @@ -566,14 +590,14 @@ impl ConnectionManager { } } - pub async fn get_status(&self, source_id: Uuid) -> Option { + pub async fn get_status(&self, source_id: Uuid) -> Option { let status = self.status.read().await; - status.get(&source_id).cloned() + status.get(&source_id).map(ConnectionStatusView::from) } - pub async fn get_all_status(&self) -> Vec<(Uuid, ConnectionStatus)> { + pub async fn get_all_status(&self) -> Vec<(Uuid, ConnectionStatusView)> { let status = self.status.read().await; - status.iter().map(|(source_id, conn_status)| (*source_id, conn_status.clone())).collect() + status.iter().map(|(source_id, conn_status)| (*source_id, ConnectionStatusView::from(conn_status))).collect() } pub async fn write_point_values_batch(