From a6be0827d02fc092280951b69c72fde3cabba844 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 5 Mar 2026 09:30:30 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E7=BB=9F=E4=B8=80=E8=BD=AE?= =?UTF-8?q?=E8=AF=A2=E4=BB=BB=E5=8A=A1=E5=AE=9E=E7=8E=B0=EF=BC=8C=E7=A7=BB?= =?UTF-8?q?=E9=99=A4=20scan=5Finterval=5Fs=20=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../20260224070000_remove_scan_interval_s.sql | 2 + src/connection.rs | 288 +++++++++--------- src/model.rs | 2 - src/service.rs | 11 +- 4 files changed, 153 insertions(+), 150 deletions(-) create mode 100644 migrations/20260224070000_remove_scan_interval_s.sql diff --git a/migrations/20260224070000_remove_scan_interval_s.sql b/migrations/20260224070000_remove_scan_interval_s.sql new file mode 100644 index 0000000..83b045b --- /dev/null +++ b/migrations/20260224070000_remove_scan_interval_s.sql @@ -0,0 +1,2 @@ +-- 移除 scan_interval_s 字段,因为现在使用统一的轮询任务 +ALTER TABLE point DROP COLUMN scan_interval_s; diff --git a/src/connection.rs b/src/connection.rs index e2e3e0f..b9a0bac 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -59,8 +59,10 @@ struct PointWriteTarget { external_id: String, } +#[derive(Debug, Clone)] pub struct PollPointInfo { - handle: JoinHandle<()>, + pub point_id: Uuid, + pub external_id: String, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -93,7 +95,8 @@ pub struct ConnectionStatus { pub next_client_handle: u32, pub client_handle_map: HashMap, // client_handle -> point_id pub monitored_item_map: HashMap, // point_id -> monitored_item_id - pub poll_points: HashMap, // 正在轮询的点集合 + pub poll_points: Vec, // 正在轮询的点集合 + poll_handle: Option>, // 统一的轮询任务句柄 } #[derive(Clone)] @@ -216,101 +219,109 @@ impl ConnectionManager { self.point_monitor_data.read().await } - async fn start_polling_for_point( - &self, - source_id: Uuid, - point: PointSubscriptionInfo, - session: Arc, - ) -> Result<(), String> { - let interval_s = point.scan_interval_s; - if interval_s <= 0 { - return Err(format!( - "Point {} has invalid scan_interval_s {}", - point.point_id, point.scan_interval_s - )); - } - - let node_id = NodeId::from_str(&point.external_id) - .map_err(|e| format!("Invalid node id {}: {}", point.external_id, e))?; - - let event_manager = self - .event_manager - .clone() - .ok_or_else(|| "Event manager is not initialized".to_string())?; + async fn start_unified_poll_task(&self, source_id: Uuid, session: Arc) { + let event_manager = match self.event_manager.clone() { + Some(em) => em, + None => { + tracing::warn!("Event manager is not initialized, cannot start unified poll task"); + return; + } + }; + // 停止旧的轮询任务 { - let status = self.status.read().await; - if let Some(conn_status) = status.get(&source_id) { - if conn_status.poll_points.contains_key(&point.point_id) { - return Ok(()); + let mut status = self.status.write().await; + if let Some(conn_status) = status.get_mut(&source_id) { + if let Some(handle) = conn_status.poll_handle.take() { + handle.abort(); } } } - let point_id = point.point_id; - let external_id = point.external_id.clone(); - let interval_sec_u64 = u64::try_from(interval_s) - .map_err(|_| format!("Invalid scan_interval_s {} for point {}", interval_s, point_id))?; + tracing::info!( + "Starting unified poll task for source {}", + source_id + ); + // 克隆 status 引用,以便在异步任务中使用 + let status_ref = self.status.clone(); + + // 启动新的轮询任务 let handle = tokio::spawn(async move { - let mut ticker = tokio::time::interval(Duration::from_secs(interval_sec_u64)); + let mut ticker = tokio::time::interval(Duration::from_secs(1)); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { ticker.tick().await; - let read_request = ReadValueId { - node_id: node_id.clone(), - attribute_id: AttributeId::Value as u32, - index_range: NumericRange::None, - data_encoding: Default::default(), + // 在任务内部获取轮询点列表 + let poll_points = { + let status = status_ref.read().await; + status.get(&source_id) + .map(|conn_status| conn_status.poll_points.clone()) + .unwrap_or_default() }; - match session - .read(&[read_request], TimestampsToReturn::Both, 0f64) - .await - { - Ok(result) if !result.is_empty() => { - let dv = &result[0]; - let val = dv.value.clone(); - let unified_value = - val.as_ref().map(crate::telemetry::opcua_variant_to_data); - let unified_value_type = - val.as_ref().map(crate::telemetry::opcua_variant_type); - let unified_value_text = val.as_ref().map(|v| v.to_string()); - let quality = dv - .status - .as_ref() - .map(crate::telemetry::PointQuality::from_status_code) - .unwrap_or(crate::telemetry::PointQuality::Unknown); + if poll_points.is_empty() { + continue; + } - let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue( - crate::telemetry::PointNewValue { - source_id, - point_id: Some(point_id), - client_handle: 0, - value: unified_value, - value_type: unified_value_type, - value_text: unified_value_text, - quality, - protocol: "opcua".to_string(), - timestamp: Some(Utc::now()), - scan_mode: ScanMode::Poll, - }, - )); - } - Ok(_) => { - tracing::warn!( - "Poll read returned empty result for point {} node {}", - point_id, - external_id - ); + // 构建批量读取请求 + let read_requests: Vec = poll_points + .iter() + .filter_map(|p| { + NodeId::from_str(&p.external_id).ok().map(|node_id| ReadValueId { + node_id, + attribute_id: AttributeId::Value as u32, + index_range: NumericRange::None, + data_encoding: Default::default(), + }) + }) + .collect(); + + if read_requests.is_empty() { + continue; + } + + // 执行批量读取 + match session.read(&read_requests, TimestampsToReturn::Both, 0f64).await { + Ok(results) => { + for (i, result) in results.iter().enumerate() { + if i >= poll_points.len() { + break; + } + + let poll_point = &poll_points[i]; + let dv = result; + let val = dv.value.clone(); + let unified_value = val.as_ref().map(crate::telemetry::opcua_variant_to_data); + let unified_value_type = val.as_ref().map(crate::telemetry::opcua_variant_type); + let unified_value_text = val.as_ref().map(|v| v.to_string()); + let quality = dv.status + .as_ref() + .map(crate::telemetry::PointQuality::from_status_code) + .unwrap_or(crate::telemetry::PointQuality::Unknown); + + let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue( + crate::telemetry::PointNewValue { + source_id, + point_id: Some(poll_point.point_id), + client_handle: 0, + value: unified_value, + value_type: unified_value_type, + value_text: unified_value_text, + quality, + protocol: "opcua".to_string(), + timestamp: Some(Utc::now()), + scan_mode: ScanMode::Poll, + }, + )); + } } Err(e) => { tracing::warn!( - "Poll read failed for point {} node {}: {:?}", - point_id, - external_id, + "Unified poll read failed for source {}: {:?}", + source_id, e ); } @@ -318,43 +329,10 @@ impl ConnectionManager { } }); - { - let mut status = self.status.write().await; - if let Some(conn_status) = status.get_mut(&source_id) { - conn_status.poll_points.insert( - point_id, - PollPointInfo { handle }, - ); - } - } - - Ok(()) - } - - async fn stop_polling_for_point(&self, point_id: Uuid) { + // 保存轮询任务句柄 let mut status = self.status.write().await; - for conn_status in status.values_mut() { - if let Some(poll_info) = conn_status.poll_points.remove(&point_id) { - poll_info.handle.abort(); - } - } - } - - async fn stop_polling_for_source(&self, source_id: Uuid) { - let poll_infos = { - let mut status = self.status.write().await; - status - .get_mut(&source_id) - .map(|conn_status| conn_status.poll_points.drain().collect::>()) - .unwrap_or_default() - }; - - if poll_infos.is_empty() { - return; - } - - for (_, poll_info) in poll_infos { - poll_info.handle.abort(); + if let Some(conn_status) = status.get_mut(&source_id) { + conn_status.poll_handle = Some(handle); } } @@ -365,28 +343,33 @@ impl ConnectionManager { session: Arc, ) -> usize { let mut started = 0usize; - for point in points.iter().cloned() { - match self - .start_polling_for_point(source_id, point.clone(), session.clone()) - .await - { - Ok(()) => { - started += 1; - tracing::info!( - "Point {} switched to poll mode with scan_interval_s {}", - point.point_id, - point.scan_interval_s - ); - } - Err(e) => { - tracing::warn!( - "Point {} cannot switch to poll mode: {}", - point.point_id, - e - ); + + // 添加新的轮询点 + { + let mut status = self.status.write().await; + if let Some(conn_status) = status.get_mut(&source_id) { + for point in points { + // 检查点是否已经在轮询列表中 + if !conn_status.poll_points.iter().any(|p| p.point_id == point.point_id) { + conn_status.poll_points.push(PollPointInfo { + point_id: point.point_id, + external_id: point.external_id.clone(), + }); + started += 1; + tracing::info!( + "Point {} switched to poll mode", + point.point_id + ); + } } } } + + // 如果有新的轮询点,启动或重启统一的轮询任务 + if started > 0 { + self.start_unified_poll_task(source_id, session).await; + } + started } @@ -501,7 +484,8 @@ impl ConnectionManager { next_client_handle: 1000, client_handle_map: HashMap::new(), monitored_item_map: HashMap::new(), - poll_points: HashMap::new(), + poll_points: Vec::new(), + poll_handle: None, }, ); @@ -522,12 +506,22 @@ impl ConnectionManager { client_handle_map: HashMap::new(), monitored_item_map: HashMap::new(), next_client_handle: 1000, - poll_points: HashMap::new(), + poll_points: Vec::new(), + poll_handle: None, }, ); } pub async fn disconnect(&self, source_id: Uuid) -> Result<(), String> { - self.stop_polling_for_source(source_id).await; + // 停止轮询任务并清空轮询点列表 + { + let mut status = self.status.write().await; + if let Some(conn_status) = status.get_mut(&source_id) { + conn_status.poll_points.clear(); + if let Some(handle) = conn_status.poll_handle.take() { + handle.abort(); + } + } + } let conn_status = self.status.write().await.remove(&source_id); if let Some(conn_status) = conn_status { @@ -546,7 +540,16 @@ impl ConnectionManager { let source_ids: Vec = self.status.read().await.keys().copied().collect(); for source_id in source_ids { - self.stop_polling_for_source(source_id).await; + // 停止轮询任务并清空轮询点列表 + { + let mut status = self.status.write().await; + if let Some(conn_status) = status.get_mut(&source_id) { + conn_status.poll_points.clear(); + if let Some(handle) = conn_status.poll_handle.take() { + handle.abort(); + } + } + } let conn_status = self.status.write().await.remove(&source_id); if let Some(conn_status) = conn_status { @@ -1020,8 +1023,9 @@ impl ConnectionManager { conn_status .monitored_item_map .insert(point.point_id, monitored_item_result.result.monitored_item_id); + // 从轮询列表中移除该点 + conn_status.poll_points.retain(|p| p.point_id != point.point_id); } - self.stop_polling_for_point(point.point_id).await; } else { tracing::error!( "Failed to create monitored item for point {}: {:?}", @@ -1144,8 +1148,12 @@ impl ConnectionManager { .retain(|_, point_id| !removed_set.contains(point_id)); } - for point_id in &removed_point_ids { - self.stop_polling_for_point(*point_id).await; + // 从轮询列表中移除已取消订阅的点 + { + let mut status = self.status.write().await; + if let Some(conn_status) = status.get_mut(&source_id) { + conn_status.poll_points.retain(|p| !removed_set.contains(&p.point_id)); + } } let _ = self .remove_point_write_target_cache_by_point_ids(&removed_point_ids) diff --git a/src/model.rs b/src/model.rs index a13e1e9..d7b95f8 100644 --- a/src/model.rs +++ b/src/model.rs @@ -92,7 +92,6 @@ pub struct Point { pub name: String, pub description: Option, pub unit: Option, - pub scan_interval_s: i32, // s pub tag_id: Option, #[serde(serialize_with = "utc_to_local_str")] pub created_at: DateTime, @@ -104,7 +103,6 @@ pub struct Point { pub struct PointSubscriptionInfo { pub point_id: Uuid, pub external_id: String, - pub scan_interval_s: i32, } #[derive(Debug, Serialize, Deserialize, FromRow, Clone)] diff --git a/src/service.rs b/src/service.rs index d076cfe..f5bcdc9 100644 --- a/src/service.rs +++ b/src/service.rs @@ -30,8 +30,7 @@ pub async fn get_points_grouped_by_source( SELECT p.id as point_id, n.source_id, - n.external_id, - p.scan_interval_s + n.external_id FROM point p INNER JOIN node n ON p.node_id = n.id WHERE p.id = ANY($1) @@ -54,7 +53,6 @@ pub async fn get_points_grouped_by_source( let info = PointSubscriptionInfo { point_id, external_id: row.get("external_id"), - scan_interval_s: row.get("scan_interval_s"), }; result.entry(source_id).or_default().push(info); @@ -73,8 +71,7 @@ pub async fn get_points_with_ids( r#" SELECT p.id as point_id, - n.external_id, - p.scan_interval_s + n.external_id FROM point p INNER JOIN node n ON p.node_id = n.id WHERE n.source_id = $1 @@ -89,8 +86,7 @@ pub async fn get_points_with_ids( r#" SELECT p.id as point_id, - n.external_id, - p.scan_interval_s + n.external_id FROM point p INNER JOIN node n ON p.node_id = n.id WHERE n.source_id = $1 @@ -110,7 +106,6 @@ pub async fn get_points_with_ids( .map(|row| PointSubscriptionInfo { point_id: row.get("point_id"), external_id: row.get("external_id"), - scan_interval_s: row.get("scan_interval_s"), }) .collect()) }