优化 OPC UA event_loop 监控和重连机制

This commit is contained in:
caoqianming 2026-03-06 10:01:18 +08:00
parent 76b6e17927
commit d4d5749ccc
1 changed files with 39 additions and 1 deletions

View File

@ -99,6 +99,7 @@ pub struct ConnectionStatus {
poll_handle: Option<JoinHandle<()>>, // 统一的轮询任务句柄
heartbeat_handle: Option<JoinHandle<()>>, // 心跳任务句柄
event_loop_handle: Option<JoinHandle<opcua::types::StatusCode>>, // event_loop 任务句柄
event_loop_monitor_handle: Option<JoinHandle<()>>, // event_loop 监控任务句柄
}
#[derive(Clone)]
@ -580,6 +581,33 @@ impl ConnectionManager {
let event_loop_handle = event_loop.spawn();
// 添加监控任务来捕获 event_loop 结束事件
let manager = self.clone();
let source_id_copy = source_id;
let event_loop_monitor_handle = tokio::spawn(async move {
match event_loop_handle.await {
Ok(status) => {
tracing::warn!(
"OPCUA event loop ended for source {}: {:?}",
source_id_copy,
status
);
}
Err(e) => {
tracing::error!(
"OPCUA event loop panic for source {}: {}",
source_id_copy,
e
);
}
}
// 统一触发重连
if let Some(tx) = manager.reconnect_tx.as_ref() {
let _ = tx.send(source_id_copy);
}
});
if !session.wait_for_connection().await {
let error = "Session connection failed".to_string();
self.fail_connect(source_id, &error).await;
@ -601,7 +629,8 @@ impl ConnectionManager {
poll_points: Arc::new(Vec::new()),
poll_handle: None,
heartbeat_handle: None,
event_loop_handle: Some(event_loop_handle),
event_loop_handle: None, // event_loop_handle 已被移动到监控任务中
event_loop_monitor_handle: Some(event_loop_monitor_handle),
},
);
drop(status); // 显式释放锁,在调用 start_unified_poll_task 之前
@ -633,6 +662,7 @@ impl ConnectionManager {
poll_handle: None,
heartbeat_handle: None,
event_loop_handle: None,
event_loop_monitor_handle: None,
},
);
}
@ -673,6 +703,10 @@ impl ConnectionManager {
if let Some(handle) = conn_status.event_loop_handle.take() {
handle.abort();
}
// 停止 event_loop 监控任务
if let Some(handle) = conn_status.event_loop_monitor_handle.take() {
handle.abort();
}
}
}
@ -709,6 +743,10 @@ impl ConnectionManager {
if let Some(handle) = conn_status.event_loop_handle.take() {
handle.abort();
}
// 停止 event_loop 监控任务
if let Some(handle) = conn_status.event_loop_monitor_handle.take() {
handle.abort();
}
}
}