From 7e6c7a7e4c2d9667779043291e3a99a75c74c7e7 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Tue, 17 Mar 2026 08:15:54 +0800 Subject: [PATCH] feat(reconnect): add retry backoff and manual reconnect --- src/connection.rs | 45 +++++++++++++++++++++++++++++++++++++++++++ src/handler/source.rs | 27 ++++++++++++++++++++++++++ src/main.rs | 1 + web/app.js | 19 ++++++++++++++++++ 4 files changed, 92 insertions(+) diff --git a/src/connection.rs b/src/connection.rs index e2d84a7..14d0f80 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -113,6 +113,7 @@ pub struct ConnectionManager { reconnect_tx: Option>, reconnect_rx: Arc>>>, reconnecting: Arc>>, + reconnect_attempts: Arc>>, } @@ -173,6 +174,7 @@ impl ConnectionManager { reconnect_tx: Some(reconnect_tx), reconnect_rx: Arc::new(std::sync::Mutex::new(Some(reconnect_rx))), reconnecting: Arc::new(RwLock::new(HashSet::new())), + reconnect_attempts: Arc::new(RwLock::new(HashMap::new())), } } @@ -211,6 +213,7 @@ impl ConnectionManager { if let Some(ref pool) = pool { if let Err(e) = manager.reconnect(pool, source_id).await { tracing::error!("Failed to reconnect source {}: {}", source_id, e); + manager.schedule_reconnect_retry(source_id).await; } } else { tracing::warn!("Pool not available for reconnection of source {}", source_id); @@ -219,6 +222,44 @@ impl ConnectionManager { }); } + async fn schedule_reconnect_retry(&self, source_id: Uuid) { + let attempt = { + let mut attempts = self.reconnect_attempts.write().await; + let entry = attempts.entry(source_id).or_insert(0); + *entry = entry.saturating_add(1); + *entry + }; + + let delay_secs = match attempt { + 1 => 3, + 2 => 5, + 3 => 10, + 4 => 15, + _ => 30, + }; + + tracing::warn!( + "Reconnect attempt {} for source {} failed, retrying in {}s", + attempt, + source_id, + delay_secs + ); + + let tx = self.reconnect_tx.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(delay_secs)).await; + if let Some(tx) = tx.as_ref() { + if let Err(e) = tx.send(source_id) { + tracing::error!( + "Failed to queue delayed reconnect for source {}: {}", + source_id, + e + ); + } + } + }); + } + pub async fn remove_point_write_target_cache_by_point_ids( &self, point_ids: &[Uuid], @@ -701,6 +742,10 @@ impl ConnectionManager { // 再重新连接 let result = self.connect_from_source(pool, source_id).await; + if result.is_ok() { + let mut attempts = self.reconnect_attempts.write().await; + attempts.remove(&source_id); + } // 无论成功还是失败都清除重连标记,以便心跳检测到问题后可以再次触发重连 let mut reconnecting = self.reconnecting.write().await; diff --git a/src/handler/source.rs b/src/handler/source.rs index 069f044..1d8b361 100644 --- a/src/handler/source.rs +++ b/src/handler/source.rs @@ -328,6 +328,33 @@ pub async fn delete_source( Ok(StatusCode::NO_CONTENT) } +pub async fn reconnect_source( + State(state): State, + Path(source_id): Path, +) -> Result { + let pool = &state.pool; + + let exists = sqlx::query("SELECT 1 FROM source WHERE id = $1") + .bind(source_id) + .fetch_optional(pool) + .await? + .is_some(); + if !exists { + return Err(ApiErr::NotFound( + format!("Source with id {} not found", source_id), + None, + )); + } + + state + .connection_manager + .reconnect(pool, source_id) + .await + .map_err(|e| ApiErr::Internal(e, None))?; + + Ok(Json(serde_json::json!({"ok_msg": "Source reconnected successfully"}))) +} + pub async fn browse_and_save_nodes( State(state): State, Path(source_id): Path, diff --git a/src/main.rs b/src/main.rs index ebd145c..524922e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -126,6 +126,7 @@ fn build_router(state: AppState) -> Router { let all_route = Router::new() .route("/api/source", get(handler::source::get_source_list).post(handler::source::create_source)) .route("/api/source/{source_id}", axum::routing::delete(handler::source::delete_source).put(handler::source::update_source)) + .route("/api/source/{source_id}/reconnect", axum::routing::post(handler::source::reconnect_source)) .route("/api/source/{source_id}/browse", axum::routing::post(handler::source::browse_and_save_nodes)) .route("/api/source/{source_id}/node-tree", get(handler::source::get_node_tree)) .route("/api/point", get(handler::point::get_point_list)) diff --git a/web/app.js b/web/app.js index f3909e4..45658de 100644 --- a/web/app.js +++ b/web/app.js @@ -100,6 +100,14 @@ function renderSources() { fillSourceForm(source); }; + const reconnectBtn = document.createElement('button'); + reconnectBtn.textContent = '重连'; + reconnectBtn.className = 'secondary'; + reconnectBtn.onclick = async (e) => { + e.stopPropagation(); + await reconnectSource(source.id, source.name); + }; + const deleteBtn = document.createElement('button'); deleteBtn.textContent = '删除'; deleteBtn.className = 'danger'; @@ -110,6 +118,7 @@ function renderSources() { actionRow.appendChild(selectPointsBtn); actionRow.appendChild(editBtn); + actionRow.appendChild(reconnectBtn); actionRow.appendChild(deleteBtn); item.appendChild(titleRow); @@ -249,6 +258,16 @@ async function deleteSource(sourceId) { await loadSources(); } +async function reconnectSource(sourceId, sourceName) { + setStatus(`正在重连 ${sourceName || 'Source'}...`); + await apiFetch(`/api/source/${sourceId}/reconnect`, { method: 'POST' }); + await loadSources(); + if (state.selectedSourceId === sourceId) { + await loadPoints(); + } + setStatus(`${sourceName || 'Source'} 重连完成`); +} + async function deletePoint(pointId) { if (!confirm('确认删除该 Point?')) return; await apiFetch(`/api/point/${pointId}`, { method: 'DELETE' });