修复潜在的死锁问题:将读锁的作用域限制在块内,避免在持有读锁时尝试获取写锁

This commit is contained in:
caoqianming 2026-03-04 12:20:13 +08:00
parent 6f62d753a5
commit 173814416f
1 changed files with 11 additions and 11 deletions

View File

@ -129,18 +129,19 @@ impl EventManager {
.get(&source_id) .get(&source_id)
.and_then(|s| s.client_handle_map.get(&client_handle).copied()) .and_then(|s| s.client_handle_map.get(&client_handle).copied())
}; };
if let Some(point_id) = point_id { if let Some(point_id) = point_id {
// 从缓存中读取旧值 // 从缓存中读取旧值
let monitor_data = connection_manager.get_point_monitor_data_read_guard().await; let (old_value, old_timestamp, value_changed) = {
let old_monitor_info = monitor_data.get(&point_id); let monitor_data = connection_manager.get_point_monitor_data_read_guard().await;
let old_monitor_info = monitor_data.get(&point_id);
let (old_value, old_timestamp, value_changed) = if let Some(old_info) = old_monitor_info { if let Some(old_info) = old_monitor_info {
let changed = old_info.value != payload.value || let changed = old_info.value != payload.value ||
old_info.timestamp != payload.timestamp; old_info.timestamp != payload.timestamp;
(old_info.value.clone(), old_info.timestamp, changed) (old_info.value.clone(), old_info.timestamp, changed)
} else { } else {
(None, None, false) (None, None, false)
}
}; };
let monitor = crate::telemetry::PointMonitorInfo { let monitor = crate::telemetry::PointMonitorInfo {
@ -167,7 +168,6 @@ impl EventManager {
if let Some(ws_manager) = &ws_manager_clone { if let Some(ws_manager) = &ws_manager_clone {
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor); let ws_message = crate::websocket::WsMessage::PointNewValue(monitor);
if let Err(e) = ws_manager.send_to_public(ws_message.clone()).await { if let Err(e) = ws_manager.send_to_public(ws_message.clone()).await {
tracing::error!("Failed to send WebSocket message to public room: {}", e); tracing::error!("Failed to send WebSocket message to public room: {}", e);
} }
@ -175,7 +175,7 @@ impl EventManager {
if let Err(e) = ws_manager.send_to_client(point_id, ws_message).await { if let Err(e) = ws_manager.send_to_client(point_id, ws_message).await {
tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e); tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e);
} }
} }
} else { } else {
tracing::warn!("Point not found for source {} client_handle {}", source_id, client_handle); tracing::warn!("Point not found for source {} client_handle {}", source_id, client_handle);
} }