feat: 添加重连保护机制修复重复重连问题
This commit is contained in:
parent
aaf887a6fc
commit
8e4abd0af9
|
|
@ -110,6 +110,7 @@ pub struct ConnectionManager {
|
||||||
pool: Option<Arc<sqlx::PgPool>>,
|
pool: Option<Arc<sqlx::PgPool>>,
|
||||||
reconnect_tx: Option<tokio::sync::mpsc::UnboundedSender<Uuid>>,
|
reconnect_tx: Option<tokio::sync::mpsc::UnboundedSender<Uuid>>,
|
||||||
reconnect_rx: Arc<std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<Uuid>>>>,
|
reconnect_rx: Arc<std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<Uuid>>>>,
|
||||||
|
reconnecting: Arc<RwLock<HashSet<Uuid>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -169,6 +170,7 @@ impl ConnectionManager {
|
||||||
pool: None,
|
pool: None,
|
||||||
reconnect_tx: Some(reconnect_tx),
|
reconnect_tx: Some(reconnect_tx),
|
||||||
reconnect_rx: Arc::new(std::sync::Mutex::new(Some(reconnect_rx))),
|
reconnect_rx: Arc::new(std::sync::Mutex::new(Some(reconnect_rx))),
|
||||||
|
reconnecting: Arc::new(RwLock::new(HashSet::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -301,18 +303,27 @@ impl ConnectionManager {
|
||||||
};
|
};
|
||||||
|
|
||||||
if !session_valid {
|
if !session_valid {
|
||||||
tracing::warn!(
|
// 检查是否已经在重连中
|
||||||
"Heartbeat detected invalid session for source {}, triggering reconnection",
|
let mut reconnecting = manager.reconnecting.write().await;
|
||||||
source_id
|
if !reconnecting.contains(&source_id) {
|
||||||
);
|
reconnecting.insert(source_id);
|
||||||
|
drop(reconnecting);
|
||||||
|
|
||||||
// 通过通道发送重连请求
|
tracing::warn!(
|
||||||
if let Some(tx) = manager.reconnect_tx.as_ref() {
|
"Heartbeat detected invalid session for source {}, triggering reconnection",
|
||||||
if let Err(e) = tx.send(source_id) {
|
source_id
|
||||||
tracing::error!("Failed to send reconnect request for source {}: {}", source_id, e);
|
);
|
||||||
|
|
||||||
|
// 通过通道发送重连请求
|
||||||
|
if let Some(tx) = manager.reconnect_tx.as_ref() {
|
||||||
|
if let Err(e) = tx.send(source_id) {
|
||||||
|
tracing::error!("Failed to send reconnect request for source {}: {}", source_id, e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::warn!("Reconnect channel not available for source {}", source_id);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tracing::warn!("Reconnect channel not available for source {}", source_id);
|
drop(reconnecting);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -636,7 +647,15 @@ impl ConnectionManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 再重新连接
|
// 再重新连接
|
||||||
self.connect_from_source(pool, source_id).await
|
let result = self.connect_from_source(pool, source_id).await;
|
||||||
|
|
||||||
|
// 清除重连标记
|
||||||
|
if result.is_ok() {
|
||||||
|
let mut reconnecting = self.reconnecting.write().await;
|
||||||
|
reconnecting.remove(&source_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn disconnect(&self, source_id: Uuid) -> Result<(), String> {
|
pub async fn disconnect(&self, source_id: Uuid) -> Result<(), String> {
|
||||||
|
|
|
||||||
20
src/event.rs
20
src/event.rs
|
|
@ -37,11 +37,13 @@ impl EventManager {
|
||||||
let ws_manager_clone = ws_manager.clone();
|
let ws_manager_clone = ws_manager.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
// 在循环外克隆,避免在循环中移动
|
||||||
|
let connection_manager_clone = connection_manager.clone();
|
||||||
while let Some(event) = receiver.recv().await {
|
while let Some(event) = receiver.recv().await {
|
||||||
match event {
|
match event {
|
||||||
ReloadEvent::SourceCreate { source_id } => {
|
ReloadEvent::SourceCreate { source_id } => {
|
||||||
tracing::info!("Processing SourceCreate event for {}", source_id);
|
tracing::info!("Processing SourceCreate event for {}", source_id);
|
||||||
if let Err(e) = connection_manager.connect_from_source(&pool, source_id).await {
|
if let Err(e) = connection_manager_clone.connect_from_source(&pool, source_id).await {
|
||||||
tracing::error!("Failed to connect to source {}: {}", source_id, e);
|
tracing::error!("Failed to connect to source {}: {}", source_id, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -50,13 +52,13 @@ impl EventManager {
|
||||||
}
|
}
|
||||||
ReloadEvent::SourceDelete { source_id } => {
|
ReloadEvent::SourceDelete { source_id } => {
|
||||||
tracing::info!("Processing SourceDelete event for {}", source_id);
|
tracing::info!("Processing SourceDelete event for {}", source_id);
|
||||||
if let Err(e) = connection_manager.disconnect(source_id).await {
|
if let Err(e) = connection_manager_clone.disconnect(source_id).await {
|
||||||
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
|
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ReloadEvent::PointCreateBatch { source_id, point_ids } => {
|
ReloadEvent::PointCreateBatch { source_id, point_ids } => {
|
||||||
let requested_count = point_ids.len();
|
let requested_count = point_ids.len();
|
||||||
match connection_manager
|
match connection_manager_clone
|
||||||
.subscribe_points_from_source(source_id, Some(point_ids), &pool)
|
.subscribe_points_from_source(source_id, Some(point_ids), &pool)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
|
@ -84,7 +86,7 @@ impl EventManager {
|
||||||
source_id,
|
source_id,
|
||||||
point_ids.len()
|
point_ids.len()
|
||||||
);
|
);
|
||||||
if let Err(e) = connection_manager
|
if let Err(e) = connection_manager_clone
|
||||||
.unsubscribe_points_from_source(source_id, point_ids)
|
.unsubscribe_points_from_source(source_id, point_ids)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
|
@ -97,7 +99,7 @@ impl EventManager {
|
||||||
let point_id = if let Some(point_id) = payload.point_id {
|
let point_id = if let Some(point_id) = payload.point_id {
|
||||||
Some(point_id)
|
Some(point_id)
|
||||||
} else {
|
} else {
|
||||||
let status = connection_manager.get_status_read_guard().await;
|
let status = connection_manager_clone.get_status_read_guard().await;
|
||||||
status
|
status
|
||||||
.get(&source_id)
|
.get(&source_id)
|
||||||
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
|
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
|
||||||
|
|
@ -105,7 +107,7 @@ impl EventManager {
|
||||||
if let Some(point_id) = point_id {
|
if let Some(point_id) = point_id {
|
||||||
// 从缓存中读取旧值
|
// 从缓存中读取旧值
|
||||||
let (old_value, old_timestamp, value_changed) = {
|
let (old_value, old_timestamp, value_changed) = {
|
||||||
let monitor_data = connection_manager.get_point_monitor_data_read_guard().await;
|
let monitor_data = connection_manager_clone.get_point_monitor_data_read_guard().await;
|
||||||
let old_monitor_info = monitor_data.get(&point_id);
|
let old_monitor_info = monitor_data.get(&point_id);
|
||||||
|
|
||||||
if let Some(old_info) = old_monitor_info {
|
if let Some(old_info) = old_monitor_info {
|
||||||
|
|
@ -138,16 +140,18 @@ impl EventManager {
|
||||||
let monitor_for_db = monitor.clone();
|
let monitor_for_db = monitor.clone();
|
||||||
|
|
||||||
// 并行执行 update_point_monitor_data 和 send_to_public,不等待完成
|
// 并行执行 update_point_monitor_data 和 send_to_public,不等待完成
|
||||||
|
let cm_clone = connection_manager_clone.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// 更新监控数据
|
// 更新监控数据
|
||||||
if let Err(e) = connection_manager.update_point_monitor_data(monitor_for_db).await {
|
if let Err(e) = cm_clone.update_point_monitor_data(monitor_for_db).await {
|
||||||
tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e);
|
tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let ws_clone = ws_manager_clone.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// 发送WebSocket消息
|
// 发送WebSocket消息
|
||||||
if let Some(ws_manager) = &ws_manager_clone {
|
if let Some(ws_manager) = ws_clone {
|
||||||
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor_for_ws);
|
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor_for_ws);
|
||||||
if let Err(e) = ws_manager.send_to_public(ws_message).await {
|
if let Err(e) = ws_manager.send_to_public(ws_message).await {
|
||||||
tracing::error!("Failed to send WebSocket message to public room: {}", e);
|
tracing::error!("Failed to send WebSocket message to public room: {}", e);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue