优化main.rs日志打印:移除重复日志,使用并发处理source连接和订阅

This commit is contained in:
caoqianming 2026-03-05 11:08:01 +08:00
parent a63433e757
commit 5685c33687
1 changed files with 26 additions and 27 deletions

View File

@ -48,42 +48,41 @@ async fn main() {
connection_manager.set_event_manager(event_manager.clone()); connection_manager.set_event_manager(event_manager.clone());
let connection_manager = Arc::new(connection_manager); let connection_manager = Arc::new(connection_manager);
// Connect to all enabled sources // Connect to all enabled sources concurrently
let sources = service::get_all_enabled_sources(&pool) let sources = service::get_all_enabled_sources(&pool)
.await .await
.expect("Failed to fetch sources"); .expect("Failed to fetch sources");
// Spawn a task for each source to connect and subscribe concurrently
let mut tasks = Vec::new();
for source in sources { for source in sources {
tracing::info!("Connecting to source: {} ({})", source.name, source.endpoint); let cm = connection_manager.clone();
match connection_manager.connect_from_source(&pool, source.id).await { let p = pool.clone();
let source_name = source.name.clone();
let source_id = source.id;
let task = tokio::spawn(async move {
match cm.connect_from_source(&p, source_id).await {
Ok(_) => { Ok(_) => {
tracing::info!("Successfully connected to source: {}", source.name);
// Subscribe to points for this source // Subscribe to points for this source
match connection_manager if let Err(e) = cm.subscribe_points_from_source(source_id, None, &p).await {
.subscribe_points_from_source(source.id, None, &pool) tracing::error!("Failed to subscribe to points for source {}: {}", source_name, e);
.await
{
Ok(stats) => {
let subscribed = *stats.get("subscribed").unwrap_or(&0);
let polled = *stats.get("polled").unwrap_or(&0);
let total = *stats.get("total").unwrap_or(&0);
tracing::info!(
"Point subscribe setup for source {}: subscribed={}, polled={}, total={}",
source.name,
subscribed,
polled,
total
);
}
Err(e) => {
tracing::error!("Failed to subscribe to points for source {}: {}", source.name, e);
}
} }
} }
Err(e) => { Err(e) => {
tracing::error!("Failed to connect to source {}: {}", source.name, e); tracing::error!("Failed to connect to source {}: {}", source_name, e);
} }
} }
});
tasks.push(task);
}
// Wait for all connection tasks to complete
for task in tasks {
if let Err(e) = task.await {
tracing::error!("Source connection task failed: {:?}", e);
}
} }
let state = AppState { let state = AppState {