改进心跳检测中的订阅状态检查,通过读取服务器节点属性来验证订阅是否真正有效

This commit is contained in:
caoqianming 2026-03-06 12:57:22 +08:00
parent afab910780
commit 5dc1081c90
1 changed files with 41 additions and 21 deletions

View File

@ -281,7 +281,7 @@ impl ConnectionManager {
// 检查session是否有效
let session = manager.get_session(source_id).await;
let session_valid = if let Some(session) = session {
let (session_valid, subscription_valid) = if let Some(session) = session {
// 尝试读取当前时间来验证连接
let node_id = NodeId::new(0, 2258); // ServerCurrentTime节点
let read_request = ReadValueId {
@ -291,23 +291,58 @@ impl ConnectionManager {
data_encoding: Default::default(),
};
match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await {
let session_valid = match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await {
Ok(_) => true,
Err(_) => false,
}
};
// 检查订阅状态
let subscription_valid = {
let status = manager.status.read().await;
if let Some(conn_status) = status.get(&source_id) {
if let Some(_subscription_id) = conn_status.subscription_id {
// 尝试读取订阅的属性来验证订阅是否真正有效
let node_id = NodeId::new(0, 2253); // ServerServiceLevel节点
let read_request = ReadValueId {
node_id,
attribute_id: AttributeId::Value as u32,
index_range: NumericRange::None,
data_encoding: Default::default(),
};
match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await {
Ok(_) => true,
Err(_) => false,
}
} else {
false
}
} else {
false
}
};
(session_valid, subscription_valid)
} else {
false
(false, false)
};
if !session_valid {
if !session_valid || !subscription_valid {
// 检查是否已经在重连中
let mut reconnecting = manager.reconnecting.write().await;
if !reconnecting.contains(&source_id) {
reconnecting.insert(source_id);
drop(reconnecting);
let reason = if !session_valid {
"invalid session"
} else {
"invalid subscription"
};
tracing::warn!(
"Heartbeat detected invalid session for source {}, triggering reconnection",
"Heartbeat detected {} for source {}, triggering reconnection",
reason,
source_id
);
@ -1074,21 +1109,6 @@ impl ConnectionManager {
0,
true,
opcua::client::DataChangeCallback::new(move |dv, item| {
// 检查是否有 BadNoSubscription 错误
if let Some(status) = &dv.status {
if *status == opcua::types::StatusCode::BadNoSubscription {
tracing::warn!(
"Detected BadNoSubscription in data change callback for source {}, triggering reconnection",
current_source_id
);
// 触发重连
if let Some(tx) = manager.reconnect_tx.as_ref() {
let _ = tx.send(current_source_id);
}
return;
}
}
let client_handle = item.client_handle();
let val = dv.value;
let timex = Some(Utc::now());