diff --git a/src/connection.rs b/src/connection.rs index d277ddf..266b352 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -297,31 +297,19 @@ impl ConnectionManager { }; // 检查订阅状态 - 仅当有 subscription_id 时才检查 - let subscription_valid = { + // 这里使用 set_publishing_mode 直接让服务器校验订阅 ID,避免仅靠会话读请求误判为健康。 + let subscription_id = { let status = manager.status.read().await; - if let Some(conn_status) = status.get(&source_id) { - if let Some(_subscription_id) = conn_status.subscription_id { - // 尝试读取订阅的属性来验证订阅是否真正有效 - let node_id = NodeId::new(0, 2253); // ServerServiceLevel节点 - let read_request = ReadValueId { - node_id, - attribute_id: AttributeId::Value as u32, - index_range: NumericRange::None, - data_encoding: Default::default(), - }; - - match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await { - Ok(_) => true, - Err(_) => false, - } - } else { - // 没有 subscription_id 时,认为订阅状态有效(不需要检查) - true - } - } else { - // 没有连接状态时,认为订阅状态无效 - false + status.get(&source_id).and_then(|conn_status| conn_status.subscription_id) + }; + let subscription_valid = if let Some(subscription_id) = subscription_id { + match session.set_publishing_mode(&[subscription_id], true).await { + Ok(results) => results.first().map(|status| status.is_good()).unwrap_or(false), + Err(_) => false, } + } else { + // 没有 subscription_id 时,认为订阅状态有效(不需要检查) + true }; (session_valid, subscription_valid) @@ -1102,6 +1090,8 @@ impl ConnectionManager { let manager = self.clone(); let current_source_id = source_id; + let status_manager = manager.clone(); + let data_manager = manager.clone(); match session .create_subscription( Duration::from_secs(1), @@ -1110,36 +1100,51 @@ impl ConnectionManager { 0, 0, true, - opcua::client::DataChangeCallback::new(move |dv, item| { - let client_handle = item.client_handle(); - let val = dv.value; - let timex = Some(Utc::now()); - let unified_value = - val.as_ref().map(crate::telemetry::opcua_variant_to_data); - let unified_value_type = - val.as_ref().map(crate::telemetry::opcua_variant_type); - let unified_value_text = val.as_ref().map(|v| v.to_string()); - let quality = dv.status - .as_ref() - .map(crate::telemetry::PointQuality::from_status_code) - .unwrap_or(crate::telemetry::PointQuality::Unknown); + opcua::client::SubscriptionCallbacks::new( + move |notification| { + if notification.status.is_bad() { + tracing::warn!( + "Subscription status changed to {:?} for source {}, triggering reconnection", + notification.status, + current_source_id + ); + if let Some(tx) = status_manager.reconnect_tx.as_ref() { + let _ = tx.send(current_source_id); + } + } + }, + move |dv, item| { + let client_handle = item.client_handle(); + let val = dv.value; + let timex = Some(Utc::now()); + let unified_value = + val.as_ref().map(crate::telemetry::opcua_variant_to_data); + let unified_value_type = + val.as_ref().map(crate::telemetry::opcua_variant_type); + let unified_value_text = val.as_ref().map(|v| v.to_string()); + let quality = dv.status + .as_ref() + .map(crate::telemetry::PointQuality::from_status_code) + .unwrap_or(crate::telemetry::PointQuality::Unknown); - if let Some(event_manager) = &manager.event_manager { - let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue( - crate::telemetry::PointNewValue { - source_id: current_source_id, - point_id: None, - client_handle, - value: unified_value, - value_type: unified_value_type, - value_text: unified_value_text, - quality, - protocol: "opcua".to_string(), - timestamp: timex, - scan_mode: ScanMode::Subscribe, - })); - } - }), + if let Some(event_manager) = &data_manager.event_manager { + let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue( + crate::telemetry::PointNewValue { + source_id: current_source_id, + point_id: None, + client_handle, + value: unified_value, + value_type: unified_value_type, + value_text: unified_value_text, + quality, + protocol: "opcua".to_string(), + timestamp: timex, + scan_mode: ScanMode::Subscribe, + })); + } + }, + |_event_fields, _item| {}, + ), ) .await {