Compare commits
2 Commits
0f37c9435e
...
aaf887a6fc
| Author | SHA1 | Date |
|---|---|---|
|
|
aaf887a6fc | |
|
|
ee3ee273b2 |
|
|
@ -180,6 +180,13 @@ impl ConnectionManager {
|
|||
self.pool = Some(pool);
|
||||
}
|
||||
|
||||
pub fn set_pool_and_start_reconnect_task(&mut self, pool: Arc<sqlx::PgPool>) {
|
||||
self.pool = Some(pool.clone());
|
||||
// 将 self 转换为不可变引用以调用 start_reconnect_task
|
||||
let manager = self.clone();
|
||||
manager.start_reconnect_task();
|
||||
}
|
||||
|
||||
pub fn set_reconnect_tx(&mut self, tx: tokio::sync::mpsc::UnboundedSender<Uuid>) {
|
||||
self.reconnect_tx = Some(tx);
|
||||
}
|
||||
|
|
|
|||
37
src/event.rs
37
src/event.rs
|
|
@ -133,23 +133,32 @@ impl EventManager {
|
|||
value_changed,
|
||||
};
|
||||
|
||||
// 只克隆一次 monitor,减少内存分配
|
||||
let monitor_clone = monitor.clone();
|
||||
if let Err(e) = connection_manager.update_point_monitor_data(monitor_clone).await {
|
||||
tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e);
|
||||
}
|
||||
// 克隆 monitor,用于并行执行
|
||||
let monitor_for_ws = monitor.clone();
|
||||
let monitor_for_db = monitor.clone();
|
||||
|
||||
if let Some(ws_manager) = &ws_manager_clone {
|
||||
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor);
|
||||
if let Err(e) = ws_manager.send_to_public(ws_message).await {
|
||||
tracing::error!("Failed to send WebSocket message to public room: {}", e);
|
||||
// 并行执行 update_point_monitor_data 和 send_to_public,不等待完成
|
||||
tokio::spawn(async move {
|
||||
// 更新监控数据
|
||||
if let Err(e) = connection_manager.update_point_monitor_data(monitor_for_db).await {
|
||||
tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e);
|
||||
}
|
||||
});
|
||||
|
||||
// 暂时注释掉 send_to_client,因为现在信息只需发送到 public
|
||||
// if let Err(e) = ws_manager.send_to_client(point_id, ws_message).await {
|
||||
// tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e);
|
||||
// }
|
||||
}
|
||||
tokio::spawn(async move {
|
||||
// 发送WebSocket消息
|
||||
if let Some(ws_manager) = &ws_manager_clone {
|
||||
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor_for_ws);
|
||||
if let Err(e) = ws_manager.send_to_public(ws_message).await {
|
||||
tracing::error!("Failed to send WebSocket message to public room: {}", e);
|
||||
}
|
||||
|
||||
// 暂时注释掉 send_to_client,因为现在信息只需发送到 public
|
||||
// if let Err(e) = ws_manager.send_to_client(point_id, ws_message).await {
|
||||
// tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e);
|
||||
// }
|
||||
}
|
||||
});
|
||||
} else {
|
||||
tracing::warn!("Point not found for source {} client_handle {}", source_id, client_handle);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,12 +46,11 @@ async fn main() {
|
|||
Some(ws_manager.clone()),
|
||||
));
|
||||
connection_manager.set_event_manager(event_manager.clone());
|
||||
connection_manager.set_pool(Arc::new(pool.clone()));
|
||||
connection_manager.set_pool_and_start_reconnect_task(Arc::new(pool.clone()));
|
||||
|
||||
let connection_manager = Arc::new(connection_manager);
|
||||
|
||||
// 启动重连任务
|
||||
connection_manager.start_reconnect_task();
|
||||
|
||||
|
||||
// Connect to all enabled sources concurrently
|
||||
let sources = service::get_all_enabled_sources(&pool)
|
||||
|
|
|
|||
Loading…
Reference in New Issue