Compare commits

...

2 Commits

3 changed files with 32 additions and 17 deletions

View File

@ -180,6 +180,13 @@ impl ConnectionManager {
self.pool = Some(pool); self.pool = Some(pool);
} }
pub fn set_pool_and_start_reconnect_task(&mut self, pool: Arc<sqlx::PgPool>) {
self.pool = Some(pool.clone());
// 将 self 转换为不可变引用以调用 start_reconnect_task
let manager = self.clone();
manager.start_reconnect_task();
}
pub fn set_reconnect_tx(&mut self, tx: tokio::sync::mpsc::UnboundedSender<Uuid>) { pub fn set_reconnect_tx(&mut self, tx: tokio::sync::mpsc::UnboundedSender<Uuid>) {
self.reconnect_tx = Some(tx); self.reconnect_tx = Some(tx);
} }

View File

@ -133,14 +133,22 @@ impl EventManager {
value_changed, value_changed,
}; };
// 只克隆一次 monitor减少内存分配 // 克隆 monitor用于并行执行
let monitor_clone = monitor.clone(); let monitor_for_ws = monitor.clone();
if let Err(e) = connection_manager.update_point_monitor_data(monitor_clone).await { let monitor_for_db = monitor.clone();
// 并行执行 update_point_monitor_data 和 send_to_public不等待完成
tokio::spawn(async move {
// 更新监控数据
if let Err(e) = connection_manager.update_point_monitor_data(monitor_for_db).await {
tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e); tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e);
} }
});
tokio::spawn(async move {
// 发送WebSocket消息
if let Some(ws_manager) = &ws_manager_clone { if let Some(ws_manager) = &ws_manager_clone {
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor); let ws_message = crate::websocket::WsMessage::PointNewValue(monitor_for_ws);
if let Err(e) = ws_manager.send_to_public(ws_message).await { if let Err(e) = ws_manager.send_to_public(ws_message).await {
tracing::error!("Failed to send WebSocket message to public room: {}", e); tracing::error!("Failed to send WebSocket message to public room: {}", e);
} }
@ -150,6 +158,7 @@ impl EventManager {
// tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e); // tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e);
// } // }
} }
});
} else { } else {
tracing::warn!("Point not found for source {} client_handle {}", source_id, client_handle); tracing::warn!("Point not found for source {} client_handle {}", source_id, client_handle);
} }

View File

@ -46,12 +46,11 @@ async fn main() {
Some(ws_manager.clone()), Some(ws_manager.clone()),
)); ));
connection_manager.set_event_manager(event_manager.clone()); connection_manager.set_event_manager(event_manager.clone());
connection_manager.set_pool(Arc::new(pool.clone())); connection_manager.set_pool_and_start_reconnect_task(Arc::new(pool.clone()));
let connection_manager = Arc::new(connection_manager); let connection_manager = Arc::new(connection_manager);
// 启动重连任务
connection_manager.start_reconnect_task();
// Connect to all enabled sources concurrently // Connect to all enabled sources concurrently
let sources = service::get_all_enabled_sources(&pool) let sources = service::get_all_enabled_sources(&pool)