use chrono::{DateTime, Utc}; use opcua::{ client::{ClientBuilder, IdentityToken, Session}, crypto::SecurityPolicy, types::{ AttributeId, MessageSecurityMode, MonitoredItemCreateRequest, MonitoringMode, MonitoringParameters, NodeId, NumericRange, ReadValueId, TimestampsToReturn, Variant, WriteValue, DataValue as OpcuaDataValue, UserTokenPolicy, }, }; use std::{ collections::{HashMap, HashSet, VecDeque}, str::FromStr, sync::Arc, time::Duration, }; use tokio::task::JoinHandle; use tokio::sync::RwLock; use uuid::Uuid; use crate::{ model::{PointSubscriptionInfo, ScanMode}, telemetry::PointMonitorInfo, }; const DEFAULT_POINT_RING_BUFFER_LEN: usize = 1000; #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub struct SetPointValueReqItem { pub point_id: Uuid, pub value: serde_json::Value, } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub struct BatchSetPointValueReq { pub items: Vec, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct SetPointValueResItem { pub point_id: Uuid, pub success: bool, pub err_msg: Option, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct BatchSetPointValueRes { pub success: bool, pub err_msg: Option, pub success_count: usize, pub failed_count: usize, pub results: Vec, } #[derive(Debug, Clone)] struct PointWriteTarget { source_id: Uuid, external_id: String, } #[derive(Debug, Clone)] pub struct PollPointInfo { pub point_id: Uuid, pub external_id: String, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ConnectionStatusView { pub is_connected: bool, pub last_error: Option, pub last_time: DateTime, pub subscription_id: Option, pub next_client_handle: u32, } impl From<&ConnectionStatus> for ConnectionStatusView { fn from(status: &ConnectionStatus) -> Self { Self { is_connected: status.is_connected, last_error: status.last_error.clone(), last_time: status.last_time, subscription_id: status.subscription_id, next_client_handle: status.next_client_handle, } } } pub struct ConnectionStatus { pub session: Option>, pub is_connected: bool, pub last_error: Option, pub last_time: DateTime, pub subscription_id: Option, pub next_client_handle: u32, pub client_handle_map: HashMap, // client_handle -> point_id pub monitored_item_map: HashMap, // point_id -> monitored_item_id pub poll_points: Vec, // 正在轮询的点集合 poll_handle: Option>, // 统一的轮询任务句柄 heartbeat_handle: Option>, // 心跳任务句柄 } #[derive(Clone)] pub struct ConnectionManager { status: Arc>>, point_monitor_data: Arc>>, point_history_data: Arc>>>, point_write_target_cache: Arc>>, event_manager: Option>, pool: Option>, reconnect_tx: Option>, reconnect_rx: Arc>>>, reconnecting: Arc>>, } impl ConnectionManager { fn subscription_result(subscribed: usize, polled: usize) -> HashMap { let mut result = HashMap::new(); result.insert("subscribed".to_string(), subscribed); result.insert("polled".to_string(), polled); result.insert("total".to_string(), subscribed + polled); result } fn json_value_to_opcua_variant(value: &serde_json::Value) -> Result { match value { serde_json::Value::Null => Ok(Variant::Empty), serde_json::Value::Bool(v) => Ok(Variant::from(*v)), serde_json::Value::Number(n) => { if let Some(v) = n.as_i64() { Ok(Variant::from(v)) } else if let Some(v) = n.as_u64() { Ok(Variant::from(v)) } else if let Some(v) = n.as_f64() { Ok(Variant::from(v)) } else { Err("Unsupported numeric value".to_string()) } } serde_json::Value::String(s) => Ok(Variant::from(s.clone())), _ => Err("Only null/bool/number/string are supported for point write".to_string()), } } fn write_value_batch_result( success: bool, err_msg: Option, results: Vec, ) -> BatchSetPointValueRes { let success_count = results.iter().filter(|r| r.success).count(); let failed_count = results.len().saturating_sub(success_count); BatchSetPointValueRes { success, err_msg, success_count, failed_count, results, } } pub fn new() -> Self { let (reconnect_tx, reconnect_rx) = tokio::sync::mpsc::unbounded_channel::(); Self { status: Arc::new(RwLock::new(HashMap::new())), point_monitor_data: Arc::new(RwLock::new(HashMap::new())), point_history_data: Arc::new(RwLock::new(HashMap::new())), point_write_target_cache: Arc::new(RwLock::new(HashMap::new())), event_manager: None, pool: None, reconnect_tx: Some(reconnect_tx), reconnect_rx: Arc::new(std::sync::Mutex::new(Some(reconnect_rx))), reconnecting: Arc::new(RwLock::new(HashSet::new())), } } pub fn set_event_manager(&mut self, event_manager: std::sync::Arc) { self.event_manager = Some(event_manager); } pub fn set_pool(&mut self, pool: Arc) { self.pool = Some(pool); } pub fn set_pool_and_start_reconnect_task(&mut self, pool: Arc) { self.pool = Some(pool.clone()); // 将 self 转换为不可变引用以调用 start_reconnect_task let manager = self.clone(); manager.start_reconnect_task(); } pub fn set_reconnect_tx(&mut self, tx: tokio::sync::mpsc::UnboundedSender) { self.reconnect_tx = Some(tx); } pub fn get_reconnect_rx(&self) -> Option> { self.reconnect_rx.lock().unwrap().take() } pub fn start_reconnect_task(&self) { let manager = self.clone(); let pool = manager.pool.clone(); tokio::spawn(async move { // 获取重连通道的接收端 let mut reconnect_rx = manager.get_reconnect_rx().expect("Failed to get reconnect receiver"); while let Some(source_id) = reconnect_rx.recv().await { tracing::info!("Received reconnect request for source {}", source_id); if let Some(ref pool) = pool { if let Err(e) = manager.reconnect(pool, source_id).await { tracing::error!("Failed to reconnect source {}: {}", source_id, e); } } else { tracing::warn!("Pool not available for reconnection of source {}", source_id); } } }); } pub async fn remove_point_write_target_cache_by_point_ids( &self, point_ids: &[Uuid], ) -> usize { if point_ids.is_empty() { return 0; } let mut removed = 0usize; let mut cache = self.point_write_target_cache.write().await; for point_id in point_ids { if cache.remove(point_id).is_some() { removed += 1; } } removed } pub async fn update_point_monitor_data( &self, sample: PointMonitorInfo, ) -> Result<(), String> { let point_id = sample.point_id; let mut data = self.point_monitor_data.write().await; data.insert(point_id, sample.clone()); let mut history_data = self.point_history_data.write().await; let ring = history_data .entry(point_id) .or_insert_with(|| VecDeque::with_capacity(DEFAULT_POINT_RING_BUFFER_LEN)); ring.push_back(sample); if ring.len() > DEFAULT_POINT_RING_BUFFER_LEN { let _ = ring.pop_front(); } Ok(()) } pub async fn get_status_read_guard(&self) -> tokio::sync::RwLockReadGuard<'_, HashMap> { self.status.read().await } pub async fn get_point_monitor_data_read_guard( &self, ) -> tokio::sync::RwLockReadGuard<'_, HashMap> { self.point_monitor_data.read().await } async fn start_heartbeat_task(&self, source_id: Uuid) { let manager = self.clone(); let handle = tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_secs(30)); // 每30秒检测一次心跳 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { ticker.tick().await; // 检查session是否有效 let session = manager.get_session(source_id).await; let session_valid = if let Some(session) = session { // 尝试读取当前时间来验证连接 let node_id = NodeId::new(0, 2258); // ServerCurrentTime节点 let read_request = ReadValueId { node_id, attribute_id: AttributeId::Value as u32, index_range: NumericRange::None, data_encoding: Default::default(), }; match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await { Ok(_) => true, Err(_) => false, } } else { false }; if !session_valid { // 检查是否已经在重连中 let mut reconnecting = manager.reconnecting.write().await; if !reconnecting.contains(&source_id) { reconnecting.insert(source_id); drop(reconnecting); tracing::warn!( "Heartbeat detected invalid session for source {}, triggering reconnection", source_id ); // 通过通道发送重连请求 if let Some(tx) = manager.reconnect_tx.as_ref() { if let Err(e) = tx.send(source_id) { tracing::error!("Failed to send reconnect request for source {}: {}", source_id, e); } } else { tracing::warn!("Reconnect channel not available for source {}", source_id); } } else { drop(reconnecting); } } } }); // 保存心跳任务句柄 let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { conn_status.heartbeat_handle = Some(handle); } } async fn start_unified_poll_task(&self, source_id: Uuid, session: Arc) { let event_manager = match self.event_manager.clone() { Some(em) => em, None => { tracing::warn!("Event manager is not initialized, cannot start unified poll task"); return; } }; // 停止旧的轮询任务 { let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { if let Some(handle) = conn_status.poll_handle.take() { handle.abort(); } } } tracing::info!( "Starting unified poll task for source {}", source_id ); // 克隆 status 引用,以便在异步任务中使用 let status_ref = self.status.clone(); // 启动新的轮询任务 let handle = tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_secs(1)); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { ticker.tick().await; // 在任务内部获取轮询点列表 let poll_points = { let status = status_ref.read().await; status.get(&source_id) .map(|conn_status| conn_status.poll_points.clone()) .unwrap_or_default() }; if poll_points.is_empty() { continue; } // 构建批量读取请求 let read_requests: Vec = poll_points .iter() .filter_map(|p| { NodeId::from_str(&p.external_id).ok().map(|node_id| ReadValueId { node_id, attribute_id: AttributeId::Value as u32, index_range: NumericRange::None, data_encoding: Default::default(), }) }) .collect(); if read_requests.is_empty() { continue; } // 执行批量读取 match session.read(&read_requests, TimestampsToReturn::Both, 0f64).await { Ok(results) => { for (poll_point, result) in poll_points.iter().zip(results.iter()) { let dv = result; let val = dv.value.clone(); let unified_value = val.as_ref().map(crate::telemetry::opcua_variant_to_data); let unified_value_type = val.as_ref().map(crate::telemetry::opcua_variant_type); let unified_value_text = val.as_ref().map(|v| v.to_string()); let quality = dv.status .as_ref() .map(crate::telemetry::PointQuality::from_status_code) .unwrap_or(crate::telemetry::PointQuality::Unknown); let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue( crate::telemetry::PointNewValue { source_id, point_id: Some(poll_point.point_id), client_handle: 0, value: unified_value, value_type: unified_value_type, value_text: unified_value_text, quality, protocol: "opcua".to_string(), timestamp: Some(Utc::now()), scan_mode: ScanMode::Poll, }, )); } } Err(e) => { tracing::warn!( "Unified poll read failed for source {}: {:?}", source_id, e ); } } } }); // 保存轮询任务句柄 let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { conn_status.poll_handle = Some(handle); } } // 将点添加到轮询列表 async fn add_points_to_poll_list( &self, source_id: Uuid, points: &[PointSubscriptionInfo], ) -> usize { let mut started = 0usize; // 添加新的轮询点 { let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { for point in points { // 检查点是否已经在轮询列表中 if !conn_status.poll_points.iter().any(|p| p.point_id == point.point_id) { conn_status.poll_points.push(PollPointInfo { point_id: point.point_id, external_id: point.external_id.clone(), }); started += 1; tracing::info!( "Point {} switched to poll mode", point.point_id ); } } } } started } pub async fn connect_from_source( &self, pool: &sqlx::PgPool, source_id: Uuid, ) -> Result<(), String> { let source = match crate::service::get_enabled_source(pool, source_id).await { Ok(Some(s)) => s, Ok(None) => { tracing::info!( "Source {} is not enabled or does not exist, skipping connection", source_id ); return Ok(()); } Err(e) => { let error = format!("Failed to fetch source: {}", e); tracing::error!("{}", error); return Err(error); } }; self.connect( source.id, &source.endpoint, source.security_policy.as_deref(), source.security_mode.as_deref(), source.username.as_deref(), source.password.as_deref(), ) .await?; // Subscribe to points for this source self.subscribe_points_from_source(source_id, None, pool) .await?; Ok(()) } pub async fn connect( &self, source_id: Uuid, endpoint: &str, security_policy: Option<&str>, security_mode: Option<&str>, username: Option<&str>, password: Option<&str>, ) -> Result<(), String> { let mut client = match ClientBuilder::new() .application_name("plc_gateway") .application_uri("urn:plc_gateway:opcua-client") .trust_server_certs(true) .create_sample_keypair(true) .session_retry_limit(3) .client() { Ok(client) => client, Err(e) => { let error = format!("Failed to create client: {:?}", e); self.fail_connect(source_id, &error).await; return Err(error); } }; let security_policy = if security_policy.is_none() { SecurityPolicy::None } else { security_policy .and_then(|s| SecurityPolicy::from_str(s).ok()) .unwrap_or(SecurityPolicy::None) }; let security_mode = match security_mode { Some("Sign") => MessageSecurityMode::Sign, Some("SignAndEncrypt") => MessageSecurityMode::SignAndEncrypt, _ => MessageSecurityMode::None, }; let identity_token = match (username, password) { (Some(user), Some(pass)) => IdentityToken::new_user_name(user, pass), _ => IdentityToken::Anonymous, }; let (session, event_loop) = match client .connect_to_matching_endpoint( ( endpoint, security_policy.to_str(), security_mode, UserTokenPolicy::anonymous(), ), identity_token, ) .await { Ok(res) => res, Err(e) => { let error = format!("Failed to connect: {:?}", e); self.fail_connect(source_id, &error).await; return Err(error); } }; let _ = event_loop.spawn(); session.wait_for_connection().await; let mut status = self.status.write().await; status.insert( source_id, ConnectionStatus { session: Some(session.clone()), is_connected: true, last_error: None, last_time: Utc::now(), subscription_id: None, next_client_handle: 1000, client_handle_map: HashMap::new(), monitored_item_map: HashMap::new(), poll_points: Vec::new(), poll_handle: None, heartbeat_handle: None, }, ); drop(status); // 显式释放锁,在调用 start_unified_poll_task 之前 // 启动统一的轮询任务 self.start_unified_poll_task(source_id, session).await; // 启动心跳任务 self.start_heartbeat_task(source_id).await; tracing::info!("Successfully connected to source {}", source_id); Ok(()) } pub async fn fail_connect(&self, source_id: Uuid, error: &str) { let mut status = self.status.write().await; status.insert( source_id, ConnectionStatus { session: None, is_connected: false, last_error: Some(error.to_string()), last_time: Utc::now(), subscription_id: None, client_handle_map: HashMap::new(), monitored_item_map: HashMap::new(), next_client_handle: 1000, poll_points: Vec::new(), poll_handle: None, heartbeat_handle: None, }, ); } pub async fn reconnect(&self, pool: &sqlx::PgPool, source_id: Uuid) -> Result<(), String> { tracing::info!("Reconnecting to source {}", source_id); // 先断开连接 if let Err(e) = self.disconnect(source_id).await { tracing::error!("Failed to disconnect source {}: {}", source_id, e); } // 再重新连接 let result = self.connect_from_source(pool, source_id).await; // 清除重连标记 if result.is_ok() { let mut reconnecting = self.reconnecting.write().await; reconnecting.remove(&source_id); } result } pub async fn disconnect(&self, source_id: Uuid) -> Result<(), String> { // 停止轮询任务并清空轮询点列表 { let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { conn_status.poll_points.clear(); if let Some(handle) = conn_status.poll_handle.take() { handle.abort(); } // 停止心跳任务 if let Some(handle) = conn_status.heartbeat_handle.take() { handle.abort(); } } } let conn_status = self.status.write().await.remove(&source_id); if let Some(conn_status) = conn_status { if let Some(session) = conn_status.session { session.disconnect().await.map_err(|e| { let error = format!("Failed to disconnect: {}", e); tracing::error!("{}", error); error })?; } } Ok(()) } pub async fn disconnect_all(&self) { let source_ids: Vec = self.status.read().await.keys().copied().collect(); for source_id in source_ids { // 停止轮询任务并清空轮询点列表 { let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { conn_status.poll_points.clear(); if let Some(handle) = conn_status.poll_handle.take() { handle.abort(); } // 停止心跳任务 if let Some(handle) = conn_status.heartbeat_handle.take() { handle.abort(); } } } let conn_status = self.status.write().await.remove(&source_id); if let Some(conn_status) = conn_status { if let Some(session) = conn_status.session { if let Err(e) = session.disconnect().await { tracing::error!("Failed to disconnect from source {}: {}", source_id, e); } } } } } pub async fn get_session(&self, source_id: Uuid) -> Option> { let status = self.status.read().await; if let Some(conn_status) = status.get(&source_id) { if conn_status.is_connected { return conn_status.session.clone(); } } None } pub async fn get_status(&self, source_id: Uuid) -> Option { let status = self.status.read().await; status.get(&source_id).map(ConnectionStatusView::from) } pub async fn get_all_status(&self) -> Vec<(Uuid, ConnectionStatusView)> { let status = self.status.read().await; status.iter().map(|(source_id, conn_status)| (*source_id, ConnectionStatusView::from(conn_status))).collect() } pub async fn write_point_values_batch( &self, payload: BatchSetPointValueReq, ) -> Result { if payload.items.is_empty() { return Ok(Self::write_value_batch_result( false, Some("items cannot be empty".to_string()), vec![], )); } let target_map = self.point_write_target_cache.read().await; #[derive(Clone)] struct PendingWrite { point_id: Uuid, source_id: Uuid, external_id: String, variant: Variant, } let mut results = Vec::new(); let mut pending_by_source: HashMap> = HashMap::new(); for item in payload.items { let Some(target) = target_map.get(&item.point_id) else { results.push(SetPointValueResItem { point_id: item.point_id, success: false, err_msg: Some("Point write target not found in cache".to_string()), }); continue; }; let variant = match Self::json_value_to_opcua_variant(&item.value) { Ok(v) => v, Err(e) => { results.push(SetPointValueResItem { point_id: item.point_id, success: false, err_msg: Some(e), }); continue; } }; pending_by_source .entry(target.source_id) .or_default() .push(PendingWrite { point_id: item.point_id, source_id: target.source_id, external_id: target.external_id.clone(), variant, }); } if !results.is_empty() { return Ok(Self::write_value_batch_result( false, Some("Batch write precheck failed, no write executed".to_string()), results, )); } // Validate all target sources are already connected before executing writes. for source_id in pending_by_source.keys() { let source_connected = self .get_status(*source_id) .await .map(|s| s.is_connected) .unwrap_or(false); if !source_connected { return Ok(Self::write_value_batch_result( false, Some(format!("Source {} is not connected", source_id)), vec![], )); } } let mut success_events: Vec<(Uuid, Uuid, Variant)> = Vec::new(); for (source_id, writes) in pending_by_source { let Some(session) = self.get_session(source_id).await else { return Ok(Self::write_value_batch_result( false, Some(format!("Source {} is not connected", source_id)), vec![], )); }; let mut write_values = Vec::new(); let mut write_items = Vec::new(); for write in writes { let node_id = match NodeId::from_str(&write.external_id) { Ok(v) => v, Err(e) => { results.push(SetPointValueResItem { point_id: write.point_id, success: false, err_msg: Some(format!("Invalid node id {}: {}", write.external_id, e)), }); continue; } }; write_values.push(WriteValue { node_id, attribute_id: AttributeId::Value as u32, index_range: NumericRange::None, value: OpcuaDataValue::value_only(write.variant.clone()), }); write_items.push(write); } if write_values.is_empty() { continue; } let status_codes = match session.write(&write_values).await { Ok(v) => v, Err(e) => { let write_results: Vec = write_items .into_iter() .map(|write| SetPointValueResItem { point_id: write.point_id, success: false, err_msg: Some(format!("Write failed: {:?}", e)), }) .collect(); return Ok(Self::write_value_batch_result( false, Some("Batch write failed, no local updates emitted".to_string()), write_results, )); } }; for (idx, write) in write_items.iter().enumerate() { let status_opt = status_codes.get(idx); let status_ok = status_opt.map(|s| s.is_good()).unwrap_or(false); results.push(SetPointValueResItem { point_id: write.point_id, success: status_ok, err_msg: if status_ok { None } else { Some(match status_opt { Some(s) => format!("Write rejected: {:?}", s), None => "Write result missing from server response".to_string(), }) }, }); if !status_ok { return Ok(Self::write_value_batch_result( false, Some("Batch write failed, no local updates emitted".to_string()), results, )); } success_events.push((write.source_id, write.point_id, write.variant.clone())); } } // Emit local updates only when the full batch succeeds. if let Some(event_manager) = &self.event_manager { for (source_id, point_id, variant) in success_events { if let Err(e) = event_manager.send(crate::event::ReloadEvent::PointNewValue( crate::telemetry::PointNewValue { source_id, point_id: Some(point_id), client_handle: 0, value: Some(crate::telemetry::opcua_variant_to_data(&variant)), value_type: Some(crate::telemetry::opcua_variant_type(&variant)), value_text: Some(variant.to_string()), quality: crate::telemetry::PointQuality::Good, protocol: "opcua".to_string(), timestamp: Some(Utc::now()), scan_mode: crate::model::ScanMode::Poll, }, )) { tracing::warn!( "Batch write succeeded but failed to dispatch point update for point {}: {}", point_id, e ); } } } Ok(Self::write_value_batch_result(true, None, results)) } pub async fn subscribe_points_from_source( &self, source_id: Uuid, point_ids: Option>, pool: &sqlx::PgPool, ) -> Result, String> { let point_ids = point_ids.unwrap_or_default(); let mut points = crate::service::get_points_with_ids(pool, source_id, &point_ids) .await .map_err(|e| format!("Failed to get points for source {}: {}", source_id, e))?; { let mut cache = self.point_write_target_cache.write().await; for p in &points { cache.insert( p.point_id, PointWriteTarget { source_id, external_id: p.external_id.clone(), }, ); } } if points.is_empty() { tracing::info!("No valid points found to subscribe for source {}", source_id); return Ok(Self::subscription_result(0, 0)); } tracing::info!( "Processing subscription for source {} with {} points", source_id, points.len() ); let session = self .get_session(source_id) .await .ok_or_else(|| format!("Failed to get session for source {}", source_id))?; let subscription_id = { let status = self.status.read().await; if let Some(conn_status) = status.get(&source_id) { let subscribed_point_ids: std::collections::HashSet = conn_status.client_handle_map.values().copied().collect(); let original_count = points.len(); points.retain(|p| !subscribed_point_ids.contains(&p.point_id)); let filtered_count = points.len(); if filtered_count < original_count { tracing::info!( "Filtered out {} already subscribed points for source {}", original_count - filtered_count, source_id ); } if points.is_empty() { tracing::info!("All points are already subscribed for source {}", source_id); return Ok(Self::subscription_result(0, 0)); } conn_status.subscription_id } else { None } }; let subscription_id = match subscription_id { Some(id) => Some(id), None => { let manager = self.clone(); let current_source_id = source_id; match session .create_subscription( Duration::from_secs(1), 10, 30, 0, 0, true, opcua::client::DataChangeCallback::new(move |dv, item| { let client_handle = item.client_handle(); let val = dv.value; let timex = Some(Utc::now()); let unified_value = val.as_ref().map(crate::telemetry::opcua_variant_to_data); let unified_value_type = val.as_ref().map(crate::telemetry::opcua_variant_type); let unified_value_text = val.as_ref().map(|v| v.to_string()); let quality = dv.status .as_ref() .map(crate::telemetry::PointQuality::from_status_code) .unwrap_or(crate::telemetry::PointQuality::Unknown); if let Some(event_manager) = &manager.event_manager { let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue( crate::telemetry::PointNewValue { source_id: current_source_id, point_id: None, client_handle, value: unified_value, value_type: unified_value_type, value_text: unified_value_text, quality, protocol: "opcua".to_string(), timestamp: timex, scan_mode: ScanMode::Subscribe, })); } }), ) .await { Ok(id) => { if let Some(conn_status) = manager.status.write().await.get_mut(&source_id) { conn_status.subscription_id = Some(id); } tracing::info!("Created subscription {} for source {}", id, source_id); Some(id) } Err(e) => { tracing::warn!( "Failed to create subscription for source {}, falling back to poll mode: {:?}", source_id, e ); None } } } }; if subscription_id.is_none() { let polled_count = self .add_points_to_poll_list(source_id, &points) .await; return Ok(Self::subscription_result(0, polled_count)); } let subscription_id = subscription_id.unwrap_or_default(); let mut client_handle_seed: u32 = self .status .read() .await .get(&source_id) .map_or(1000, |s| s.next_client_handle); let mut items_to_create: Vec = Vec::new(); let mut item_points: Vec = Vec::new(); for p in points.iter().cloned() { let node_id = NodeId::from_str(&p.external_id) .map_err(|e| format!("Invalid node id {}: {}", p.external_id, e))?; let client_handle = client_handle_seed; client_handle_seed += 1; if let Some(s) = self.status.write().await.get_mut(&source_id) { s.client_handle_map.insert(client_handle, p.point_id); } let request = MonitoredItemCreateRequest { item_to_monitor: ReadValueId { node_id, attribute_id: AttributeId::Value as u32, index_range: Default::default(), data_encoding: Default::default(), }, monitoring_mode: MonitoringMode::Reporting, requested_parameters: MonitoringParameters { client_handle, sampling_interval: 0.0, filter: Default::default(), queue_size: 10, discard_oldest: true, }, }; items_to_create.push(request); item_points.push(p); } let results = match session .create_monitored_items(subscription_id, TimestampsToReturn::Both, items_to_create) .await { Ok(results) => results, Err(e) => { tracing::warn!( "Failed to create monitored items for source {}, falling back to poll mode: {:?}", source_id, e ); let item_point_ids: HashSet = item_points.iter().map(|p| p.point_id).collect(); if let Some(conn_status) = self.status.write().await.get_mut(&source_id) { conn_status .client_handle_map .retain(|_, point_id| !item_point_ids.contains(point_id)); conn_status.next_client_handle = client_handle_seed; } let polled_count = self .add_points_to_poll_list(source_id, &item_points) .await; return Ok(Self::subscription_result(0, polled_count)); } }; let mut successfully_subscribed_points = Vec::new(); let mut successfully_subscribed_set: HashSet = HashSet::new(); let mut failed_points: Vec = Vec::new(); for (i, monitored_item_result) in results.iter().enumerate() { if i >= item_points.len() { break; } let point = &item_points[i]; if monitored_item_result.result.status_code.is_good() { successfully_subscribed_points.push(point.point_id); successfully_subscribed_set.insert(point.point_id); if let Some(conn_status) = self.status.write().await.get_mut(&source_id) { conn_status .monitored_item_map .insert(point.point_id, monitored_item_result.result.monitored_item_id); // 从轮询列表中移除该点 conn_status.poll_points.retain(|p| p.point_id != point.point_id); } } else { tracing::error!( "Failed to create monitored item for point {}: {:?}", point.point_id, monitored_item_result.result.status_code ); failed_points.push(point.clone()); } } if results.len() < item_points.len() { for point in item_points.iter().skip(results.len()) { tracing::warn!( "No monitored item result returned for point {}, fallback to poll", point.point_id ); failed_points.push(point.clone()); } } failed_points.retain(|p| !successfully_subscribed_set.contains(&p.point_id)); if let Some(conn_status) = self.status.write().await.get_mut(&source_id) { let failed_point_ids: HashSet = failed_points.iter().map(|p| p.point_id).collect(); conn_status .client_handle_map .retain(|_, point_id| !failed_point_ids.contains(point_id)); conn_status.next_client_handle = client_handle_seed; } let polled_count = self .add_points_to_poll_list(source_id, &failed_points) .await; Ok(Self::subscription_result( successfully_subscribed_points.len(), polled_count, )) } /// pub async fn unsubscribe_points_from_source( &self, source_id: Uuid, point_ids: Vec, ) -> Result { if point_ids.is_empty() { return Ok(0); } let target_ids: std::collections::HashSet = point_ids.into_iter().collect(); let session = self.get_session(source_id).await; let Some(session) = session else { return Ok(0); }; let (subscription_id, point_item_pairs) = { let status = self.status.read().await; let Some(conn_status) = status.get(&source_id) else { return Ok(0); }; let Some(subscription_id) = conn_status.subscription_id else { return Ok(0); }; let items: Vec<(Uuid, u32)> = conn_status .monitored_item_map .iter() .filter(|(point_id, _)| target_ids.contains(point_id)) .map(|(point_id, monitored_item_id)| (*point_id, *monitored_item_id)) .collect(); (subscription_id, items) }; if point_item_pairs.is_empty() { return Ok(0); } let monitored_item_ids: Vec = point_item_pairs.iter().map(|(_, item_id)| *item_id).collect(); let status_codes = session .delete_monitored_items(subscription_id, &monitored_item_ids) .await .map_err(|e| { format!( "Failed to unsubscribe monitored items for source {}: {:?}", source_id, e ) })?; let mut removed_point_ids: Vec = Vec::new(); for (idx, status_code) in status_codes.iter().enumerate() { if idx >= point_item_pairs.len() { break; } if status_code.is_good() { removed_point_ids.push(point_item_pairs[idx].0); } else { tracing::warn!( "Failed to unsubscribe monitored item {} for source {}: {:?}", point_item_pairs[idx].1, source_id, status_code ); } } if removed_point_ids.is_empty() { return Ok(0); } let removed_set: std::collections::HashSet = removed_point_ids.iter().copied().collect(); if let Some(conn_status) = self.status.write().await.get_mut(&source_id) { for point_id in &removed_point_ids { conn_status.monitored_item_map.remove(point_id); } conn_status .client_handle_map .retain(|_, point_id| !removed_set.contains(point_id)); } let _ = self .remove_point_write_target_cache_by_point_ids(&removed_point_ids) .await; { let mut monitor_data = self.point_monitor_data.write().await; for point_id in &removed_point_ids { monitor_data.remove(point_id); } } { let mut history_data = self.point_history_data.write().await; for point_id in &removed_point_ids { history_data.remove(point_id); } } // 从轮询列表中移除传入的点,并记录移除的轮询点数量 let polling_removed_count = { let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { let before_count = conn_status.poll_points.len(); conn_status.poll_points.retain(|p| !target_ids.contains(&p.point_id)); let after_count = conn_status.poll_points.len(); before_count - after_count } else { 0 } }; // 计算从订阅点和轮询点移除的总数 let total_removed = removed_point_ids.len() + polling_removed_count; tracing::info!( "Unsubscribed {} points (subscription: {}, polling: {}) from source {}", total_removed, removed_point_ids.len(), polling_removed_count, source_id ); Ok(removed_point_ids.len()) } }