feat(reconnect): add retry backoff and manual reconnect

This commit is contained in:
caoqianming 2026-03-17 08:15:54 +08:00
parent dd110919dd
commit 7e6c7a7e4c
4 changed files with 92 additions and 0 deletions

View File

@ -113,6 +113,7 @@ pub struct ConnectionManager {
reconnect_tx: Option<tokio::sync::mpsc::UnboundedSender<Uuid>>,
reconnect_rx: Arc<std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<Uuid>>>>,
reconnecting: Arc<RwLock<HashSet<Uuid>>>,
reconnect_attempts: Arc<RwLock<HashMap<Uuid, u32>>>,
}
@ -173,6 +174,7 @@ impl ConnectionManager {
reconnect_tx: Some(reconnect_tx),
reconnect_rx: Arc::new(std::sync::Mutex::new(Some(reconnect_rx))),
reconnecting: Arc::new(RwLock::new(HashSet::new())),
reconnect_attempts: Arc::new(RwLock::new(HashMap::new())),
}
}
@ -211,6 +213,7 @@ impl ConnectionManager {
if let Some(ref pool) = pool {
if let Err(e) = manager.reconnect(pool, source_id).await {
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
manager.schedule_reconnect_retry(source_id).await;
}
} else {
tracing::warn!("Pool not available for reconnection of source {}", source_id);
@ -219,6 +222,44 @@ impl ConnectionManager {
});
}
async fn schedule_reconnect_retry(&self, source_id: Uuid) {
let attempt = {
let mut attempts = self.reconnect_attempts.write().await;
let entry = attempts.entry(source_id).or_insert(0);
*entry = entry.saturating_add(1);
*entry
};
let delay_secs = match attempt {
1 => 3,
2 => 5,
3 => 10,
4 => 15,
_ => 30,
};
tracing::warn!(
"Reconnect attempt {} for source {} failed, retrying in {}s",
attempt,
source_id,
delay_secs
);
let tx = self.reconnect_tx.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(delay_secs)).await;
if let Some(tx) = tx.as_ref() {
if let Err(e) = tx.send(source_id) {
tracing::error!(
"Failed to queue delayed reconnect for source {}: {}",
source_id,
e
);
}
}
});
}
pub async fn remove_point_write_target_cache_by_point_ids(
&self,
point_ids: &[Uuid],
@ -701,6 +742,10 @@ impl ConnectionManager {
// 再重新连接
let result = self.connect_from_source(pool, source_id).await;
if result.is_ok() {
let mut attempts = self.reconnect_attempts.write().await;
attempts.remove(&source_id);
}
// 无论成功还是失败都清除重连标记,以便心跳检测到问题后可以再次触发重连
let mut reconnecting = self.reconnecting.write().await;

View File

@ -328,6 +328,33 @@ pub async fn delete_source(
Ok(StatusCode::NO_CONTENT)
}
pub async fn reconnect_source(
State(state): State<AppState>,
Path(source_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let exists = sqlx::query("SELECT 1 FROM source WHERE id = $1")
.bind(source_id)
.fetch_optional(pool)
.await?
.is_some();
if !exists {
return Err(ApiErr::NotFound(
format!("Source with id {} not found", source_id),
None,
));
}
state
.connection_manager
.reconnect(pool, source_id)
.await
.map_err(|e| ApiErr::Internal(e, None))?;
Ok(Json(serde_json::json!({"ok_msg": "Source reconnected successfully"})))
}
pub async fn browse_and_save_nodes(
State(state): State<AppState>,
Path(source_id): Path<Uuid>,

View File

@ -126,6 +126,7 @@ fn build_router(state: AppState) -> Router {
let all_route = Router::new()
.route("/api/source", get(handler::source::get_source_list).post(handler::source::create_source))
.route("/api/source/{source_id}", axum::routing::delete(handler::source::delete_source).put(handler::source::update_source))
.route("/api/source/{source_id}/reconnect", axum::routing::post(handler::source::reconnect_source))
.route("/api/source/{source_id}/browse", axum::routing::post(handler::source::browse_and_save_nodes))
.route("/api/source/{source_id}/node-tree", get(handler::source::get_node_tree))
.route("/api/point", get(handler::point::get_point_list))

View File

@ -100,6 +100,14 @@ function renderSources() {
fillSourceForm(source);
};
const reconnectBtn = document.createElement('button');
reconnectBtn.textContent = '重连';
reconnectBtn.className = 'secondary';
reconnectBtn.onclick = async (e) => {
e.stopPropagation();
await reconnectSource(source.id, source.name);
};
const deleteBtn = document.createElement('button');
deleteBtn.textContent = '删除';
deleteBtn.className = 'danger';
@ -110,6 +118,7 @@ function renderSources() {
actionRow.appendChild(selectPointsBtn);
actionRow.appendChild(editBtn);
actionRow.appendChild(reconnectBtn);
actionRow.appendChild(deleteBtn);
item.appendChild(titleRow);
@ -249,6 +258,16 @@ async function deleteSource(sourceId) {
await loadSources();
}
async function reconnectSource(sourceId, sourceName) {
setStatus(`正在重连 ${sourceName || 'Source'}...`);
await apiFetch(`/api/source/${sourceId}/reconnect`, { method: 'POST' });
await loadSources();
if (state.selectedSourceId === sourceId) {
await loadPoints();
}
setStatus(`${sourceName || 'Source'} 重连完成`);
}
async function deletePoint(pointId) {
if (!confirm('确认删除该 Point?')) return;
await apiFetch(`/api/point/${pointId}`, { method: 'DELETE' });