From 5dc1081c904ced28cbcb8bc89078d866be49d4c0 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Fri, 6 Mar 2026 12:57:22 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E8=BF=9B=E5=BF=83=E8=B7=B3=E6=A3=80?= =?UTF-8?q?=E6=B5=8B=E4=B8=AD=E7=9A=84=E8=AE=A2=E9=98=85=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E6=A3=80=E6=9F=A5=EF=BC=8C=E9=80=9A=E8=BF=87=E8=AF=BB=E5=8F=96?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=99=A8=E8=8A=82=E7=82=B9=E5=B1=9E=E6=80=A7?= =?UTF-8?q?=E6=9D=A5=E9=AA=8C=E8=AF=81=E8=AE=A2=E9=98=85=E6=98=AF=E5=90=A6?= =?UTF-8?q?=E7=9C=9F=E6=AD=A3=E6=9C=89=E6=95=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/connection.rs | 62 +++++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 21 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 8083d05..d602ae4 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -281,7 +281,7 @@ impl ConnectionManager { // 检查session是否有效 let session = manager.get_session(source_id).await; - let session_valid = if let Some(session) = session { + let (session_valid, subscription_valid) = if let Some(session) = session { // 尝试读取当前时间来验证连接 let node_id = NodeId::new(0, 2258); // ServerCurrentTime节点 let read_request = ReadValueId { @@ -291,23 +291,58 @@ impl ConnectionManager { data_encoding: Default::default(), }; - match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await { + let session_valid = match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await { Ok(_) => true, Err(_) => false, - } + }; + + // 检查订阅状态 + let subscription_valid = { + let status = manager.status.read().await; + if let Some(conn_status) = status.get(&source_id) { + if let Some(_subscription_id) = conn_status.subscription_id { + // 尝试读取订阅的属性来验证订阅是否真正有效 + let node_id = NodeId::new(0, 2253); // ServerServiceLevel节点 + let read_request = ReadValueId { + node_id, + attribute_id: AttributeId::Value as u32, + index_range: NumericRange::None, + data_encoding: Default::default(), + }; + + match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await { + Ok(_) => true, + Err(_) => false, + } + } else { + false + } + } else { + false + } + }; + + (session_valid, subscription_valid) } else { - false + (false, false) }; - if !session_valid { + if !session_valid || !subscription_valid { // 检查是否已经在重连中 let mut reconnecting = manager.reconnecting.write().await; if !reconnecting.contains(&source_id) { reconnecting.insert(source_id); drop(reconnecting); + let reason = if !session_valid { + "invalid session" + } else { + "invalid subscription" + }; + tracing::warn!( - "Heartbeat detected invalid session for source {}, triggering reconnection", + "Heartbeat detected {} for source {}, triggering reconnection", + reason, source_id ); @@ -1074,21 +1109,6 @@ impl ConnectionManager { 0, true, opcua::client::DataChangeCallback::new(move |dv, item| { - // 检查是否有 BadNoSubscription 错误 - if let Some(status) = &dv.status { - if *status == opcua::types::StatusCode::BadNoSubscription { - tracing::warn!( - "Detected BadNoSubscription in data change callback for source {}, triggering reconnection", - current_source_id - ); - // 触发重连 - if let Some(tx) = manager.reconnect_tx.as_ref() { - let _ = tx.send(current_source_id); - } - return; - } - } - let client_handle = item.client_handle(); let val = dv.value; let timex = Some(Utc::now());