fix(opcua): reconnect when subscription becomes invalid

This commit is contained in:
caoqianming 2026-03-09 08:58:40 +08:00
parent 0893c9783c
commit 63bcf679c2
1 changed files with 57 additions and 52 deletions

View File

@ -297,31 +297,19 @@ impl ConnectionManager {
}; };
// 检查订阅状态 - 仅当有 subscription_id 时才检查 // 检查订阅状态 - 仅当有 subscription_id 时才检查
let subscription_valid = { // 这里使用 set_publishing_mode 直接让服务器校验订阅 ID避免仅靠会话读请求误判为健康。
let subscription_id = {
let status = manager.status.read().await; let status = manager.status.read().await;
if let Some(conn_status) = status.get(&source_id) { status.get(&source_id).and_then(|conn_status| conn_status.subscription_id)
if let Some(_subscription_id) = conn_status.subscription_id { };
// 尝试读取订阅的属性来验证订阅是否真正有效 let subscription_valid = if let Some(subscription_id) = subscription_id {
let node_id = NodeId::new(0, 2253); // ServerServiceLevel节点 match session.set_publishing_mode(&[subscription_id], true).await {
let read_request = ReadValueId { Ok(results) => results.first().map(|status| status.is_good()).unwrap_or(false),
node_id, Err(_) => false,
attribute_id: AttributeId::Value as u32,
index_range: NumericRange::None,
data_encoding: Default::default(),
};
match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await {
Ok(_) => true,
Err(_) => false,
}
} else {
// 没有 subscription_id 时,认为订阅状态有效(不需要检查)
true
}
} else {
// 没有连接状态时,认为订阅状态无效
false
} }
} else {
// 没有 subscription_id 时,认为订阅状态有效(不需要检查)
true
}; };
(session_valid, subscription_valid) (session_valid, subscription_valid)
@ -1102,6 +1090,8 @@ impl ConnectionManager {
let manager = self.clone(); let manager = self.clone();
let current_source_id = source_id; let current_source_id = source_id;
let status_manager = manager.clone();
let data_manager = manager.clone();
match session match session
.create_subscription( .create_subscription(
Duration::from_secs(1), Duration::from_secs(1),
@ -1110,36 +1100,51 @@ impl ConnectionManager {
0, 0,
0, 0,
true, true,
opcua::client::DataChangeCallback::new(move |dv, item| { opcua::client::SubscriptionCallbacks::new(
let client_handle = item.client_handle(); move |notification| {
let val = dv.value; if notification.status.is_bad() {
let timex = Some(Utc::now()); tracing::warn!(
let unified_value = "Subscription status changed to {:?} for source {}, triggering reconnection",
val.as_ref().map(crate::telemetry::opcua_variant_to_data); notification.status,
let unified_value_type = current_source_id
val.as_ref().map(crate::telemetry::opcua_variant_type); );
let unified_value_text = val.as_ref().map(|v| v.to_string()); if let Some(tx) = status_manager.reconnect_tx.as_ref() {
let quality = dv.status let _ = tx.send(current_source_id);
.as_ref() }
.map(crate::telemetry::PointQuality::from_status_code) }
.unwrap_or(crate::telemetry::PointQuality::Unknown); },
move |dv, item| {
let client_handle = item.client_handle();
let val = dv.value;
let timex = Some(Utc::now());
let unified_value =
val.as_ref().map(crate::telemetry::opcua_variant_to_data);
let unified_value_type =
val.as_ref().map(crate::telemetry::opcua_variant_type);
let unified_value_text = val.as_ref().map(|v| v.to_string());
let quality = dv.status
.as_ref()
.map(crate::telemetry::PointQuality::from_status_code)
.unwrap_or(crate::telemetry::PointQuality::Unknown);
if let Some(event_manager) = &manager.event_manager { if let Some(event_manager) = &data_manager.event_manager {
let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue( let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue(
crate::telemetry::PointNewValue { crate::telemetry::PointNewValue {
source_id: current_source_id, source_id: current_source_id,
point_id: None, point_id: None,
client_handle, client_handle,
value: unified_value, value: unified_value,
value_type: unified_value_type, value_type: unified_value_type,
value_text: unified_value_text, value_text: unified_value_text,
quality, quality,
protocol: "opcua".to_string(), protocol: "opcua".to_string(),
timestamp: timex, timestamp: timex,
scan_mode: ScanMode::Subscribe, scan_mode: ScanMode::Subscribe,
})); }));
} }
}), },
|_event_fields, _item| {},
),
) )
.await .await
{ {