重构ConnectionManager:简化get_session方法,移除pool依赖,重命名new_with_pool为new

This commit is contained in:
caoqianming 2026-03-05 11:13:10 +08:00
parent 5685c33687
commit a2217a991c
2 changed files with 7 additions and 24 deletions

View File

@ -105,7 +105,6 @@ pub struct ConnectionManager {
point_monitor_data: Arc<RwLock<HashMap<Uuid, PointMonitorInfo>>>, point_monitor_data: Arc<RwLock<HashMap<Uuid, PointMonitorInfo>>>,
point_history_data: Arc<RwLock<HashMap<Uuid, VecDeque<PointMonitorInfo>>>>, point_history_data: Arc<RwLock<HashMap<Uuid, VecDeque<PointMonitorInfo>>>>,
point_write_target_cache: Arc<RwLock<HashMap<Uuid, PointWriteTarget>>>, point_write_target_cache: Arc<RwLock<HashMap<Uuid, PointWriteTarget>>>,
pool: Option<sqlx::PgPool>,
event_manager: Option<std::sync::Arc<crate::event::EventManager>>, event_manager: Option<std::sync::Arc<crate::event::EventManager>>,
} }
@ -155,10 +154,9 @@ impl ConnectionManager {
} }
} }
pub fn new_with_pool(pool: sqlx::PgPool) -> Self { pub fn new() -> Self {
Self { Self {
status: Arc::new(RwLock::new(HashMap::new())), status: Arc::new(RwLock::new(HashMap::new())),
pool: Some(pool),
point_monitor_data: Arc::new(RwLock::new(HashMap::new())), point_monitor_data: Arc::new(RwLock::new(HashMap::new())),
point_history_data: Arc::new(RwLock::new(HashMap::new())), point_history_data: Arc::new(RwLock::new(HashMap::new())),
point_write_target_cache: Arc::new(RwLock::new(HashMap::new())), point_write_target_cache: Arc::new(RwLock::new(HashMap::new())),
@ -562,29 +560,14 @@ impl ConnectionManager {
} }
pub async fn get_session(&self, source_id: Uuid) -> Option<Arc<Session>> { pub async fn get_session(&self, source_id: Uuid) -> Option<Arc<Session>> {
// comment fixed
{
let status = self.status.read().await; let status = self.status.read().await;
if let Some(conn_status) = status.get(&source_id) { if let Some(conn_status) = status.get(&source_id) {
if conn_status.is_connected { if conn_status.is_connected {
return conn_status.session.clone(); return conn_status.session.clone();
} }
} }
}
// comment fixed
if let Some(pool) = &self.pool {
if let Ok(()) = self.connect_from_source(pool, source_id).await {
// comment fixed
let status = self.status.read().await;
status.get(&source_id).and_then(|s| s.session.clone())
} else {
None None
} }
} else {
None
}
}
pub async fn get_status(&self, source_id: Uuid) -> Option<ConnectionStatusView> { pub async fn get_status(&self, source_id: Uuid) -> Option<ConnectionStatusView> {
let status = self.status.read().await; let status = self.status.read().await;

View File

@ -38,7 +38,7 @@ async fn main() {
let config = AppConfig::from_env().expect("Failed to load configuration"); let config = AppConfig::from_env().expect("Failed to load configuration");
let pool = init_database(&config.database_url).await.expect("Failed to initialize database"); let pool = init_database(&config.database_url).await.expect("Failed to initialize database");
let mut connection_manager = ConnectionManager::new_with_pool(pool.clone()); let mut connection_manager = ConnectionManager::new();
let ws_manager = Arc::new(websocket::WebSocketManager::new()); let ws_manager = Arc::new(websocket::WebSocketManager::new());
let event_manager = Arc::new(EventManager::new( let event_manager = Arc::new(EventManager::new(
pool.clone(), pool.clone(),