fix(opcua): stabilize reconnect loop and coalesce telemetry events
Always clear the reconnect-in-progress marker after every reconnect attempt, whether it succeeds or fails, so heartbeat-triggered retries are not blocked. Reduce high-frequency event overhead by coalescing consecutive point updates in the event worker and processing only the latest value per source/client handle.

Made-with: Cursor
This commit is contained in:
parent
5406568969
commit
5fa63ad6dd
|
|
@ -702,11 +702,9 @@ impl ConnectionManager {
|
||||||
// 再重新连接
|
// 再重新连接
|
||||||
let result = self.connect_from_source(pool, source_id).await;
|
let result = self.connect_from_source(pool, source_id).await;
|
||||||
|
|
||||||
// 清除重连标记
|
// 无论成功还是失败都清除重连标记,以便心跳检测到问题后可以再次触发重连
|
||||||
if result.is_ok() {
|
let mut reconnecting = self.reconnecting.write().await;
|
||||||
let mut reconnecting = self.reconnecting.write().await;
|
reconnecting.remove(&source_id);
|
||||||
reconnecting.remove(&source_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
|
||||||
290
src/event.rs
290
src/event.rs
|
|
@ -1,5 +1,6 @@
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
const EVENT_CHANNEL_CAPACITY: usize = 4096;
|
const EVENT_CHANNEL_CAPACITY: usize = 4096;
|
||||||
|
|
||||||
|
|
@ -43,127 +44,47 @@ impl EventManager {
|
||||||
let connection_manager_clone = connection_manager.clone();
|
let connection_manager_clone = connection_manager.clone();
|
||||||
while let Some(event) = receiver.recv().await {
|
while let Some(event) = receiver.recv().await {
|
||||||
match event {
|
match event {
|
||||||
ReloadEvent::SourceCreate { source_id } => {
|
|
||||||
tracing::info!("Processing SourceCreate event for {}", source_id);
|
|
||||||
if let Err(e) = connection_manager_clone.connect_from_source(&pool, source_id).await {
|
|
||||||
tracing::error!("Failed to connect to source {}: {}", source_id, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ReloadEvent::SourceUpdate { source_id } => {
|
|
||||||
tracing::info!("Processing SourceUpdate event for {}", source_id);
|
|
||||||
if let Err(e) = connection_manager_clone.reconnect(&pool, source_id).await {
|
|
||||||
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ReloadEvent::SourceDelete { source_id } => {
|
|
||||||
tracing::info!("Processing SourceDelete event for {}", source_id);
|
|
||||||
if let Err(e) = connection_manager_clone.disconnect(source_id).await {
|
|
||||||
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ReloadEvent::PointCreateBatch { source_id, point_ids } => {
|
|
||||||
let requested_count = point_ids.len();
|
|
||||||
match connection_manager_clone
|
|
||||||
.subscribe_points_from_source(source_id, Some(point_ids), &pool)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(stats) => {
|
|
||||||
let subscribed = *stats.get("subscribed").unwrap_or(&0);
|
|
||||||
let polled = *stats.get("polled").unwrap_or(&0);
|
|
||||||
let total = *stats.get("total").unwrap_or(&0);
|
|
||||||
tracing::info!(
|
|
||||||
"PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}",
|
|
||||||
source_id,
|
|
||||||
requested_count,
|
|
||||||
subscribed,
|
|
||||||
polled,
|
|
||||||
total
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("Failed to subscribe to points: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ReloadEvent::PointDeleteBatch { source_id, point_ids } => {
|
|
||||||
tracing::info!(
|
|
||||||
"Processing PointDeleteBatch event for source {} with {} points",
|
|
||||||
source_id,
|
|
||||||
point_ids.len()
|
|
||||||
);
|
|
||||||
if let Err(e) = connection_manager_clone
|
|
||||||
.unsubscribe_points_from_source(source_id, point_ids)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
tracing::error!("Failed to unsubscribe points: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ReloadEvent::PointNewValue(payload) => {
|
ReloadEvent::PointNewValue(payload) => {
|
||||||
let source_id = payload.source_id;
|
let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> =
|
||||||
let client_handle = payload.client_handle;
|
HashMap::new();
|
||||||
let point_id = if let Some(point_id) = payload.point_id {
|
latest_by_key.insert((payload.source_id, payload.client_handle), payload);
|
||||||
Some(point_id)
|
|
||||||
} else {
|
|
||||||
let status = connection_manager_clone.get_status_read_guard().await;
|
|
||||||
status
|
|
||||||
.get(&source_id)
|
|
||||||
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
|
|
||||||
};
|
|
||||||
if let Some(point_id) = point_id {
|
|
||||||
// 从缓存中读取旧值
|
|
||||||
let (old_value, old_timestamp, value_changed) = {
|
|
||||||
let monitor_data = connection_manager_clone.get_point_monitor_data_read_guard().await;
|
|
||||||
let old_monitor_info = monitor_data.get(&point_id);
|
|
||||||
|
|
||||||
if let Some(old_info) = old_monitor_info {
|
loop {
|
||||||
let changed = old_info.value != payload.value ||
|
match receiver.try_recv() {
|
||||||
old_info.timestamp != payload.timestamp;
|
Ok(ReloadEvent::PointNewValue(next_payload)) => {
|
||||||
(old_info.value.clone(), old_info.timestamp, changed)
|
latest_by_key.insert(
|
||||||
} else {
|
(next_payload.source_id, next_payload.client_handle),
|
||||||
(None, None, false)
|
next_payload,
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let monitor = crate::telemetry::PointMonitorInfo {
|
|
||||||
protocol: payload.protocol.clone(),
|
|
||||||
source_id,
|
|
||||||
point_id,
|
|
||||||
client_handle,
|
|
||||||
scan_mode: payload.scan_mode.clone(),
|
|
||||||
timestamp: payload.timestamp,
|
|
||||||
quality: payload.quality.clone(),
|
|
||||||
value: payload.value.clone(),
|
|
||||||
value_type: payload.value_type.clone(),
|
|
||||||
value_text: payload.value_text.clone(),
|
|
||||||
old_value,
|
|
||||||
old_timestamp,
|
|
||||||
value_changed,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Process in event worker directly to avoid per-point spawn overhead.
|
|
||||||
if let Err(e) = connection_manager_clone
|
|
||||||
.update_point_monitor_data(monitor.clone())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
tracing::error!(
|
|
||||||
"Failed to update point monitor data for point {}: {}",
|
|
||||||
point_id,
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(ws_manager) = &ws_manager_clone {
|
|
||||||
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor);
|
|
||||||
if let Err(e) = ws_manager.send_to_public(ws_message).await {
|
|
||||||
tracing::error!(
|
|
||||||
"Failed to send WebSocket message to public room: {}",
|
|
||||||
e
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
Ok(other_event) => {
|
||||||
|
handle_control_event(
|
||||||
|
other_event,
|
||||||
|
&pool,
|
||||||
|
&connection_manager_clone,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
tracing::warn!("Point not found for source {} client_handle {}", source_id, client_handle);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for point_payload in latest_by_key.into_values() {
|
||||||
|
process_point_new_value(
|
||||||
|
point_payload,
|
||||||
|
&connection_manager_clone,
|
||||||
|
ws_manager_clone.as_ref(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
other_event => {
|
||||||
|
handle_control_event(other_event, &pool, &connection_manager_clone).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -193,3 +114,142 @@ impl EventManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_control_event(
|
||||||
|
event: ReloadEvent,
|
||||||
|
pool: &sqlx::PgPool,
|
||||||
|
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
|
||||||
|
) {
|
||||||
|
match event {
|
||||||
|
ReloadEvent::SourceCreate { source_id } => {
|
||||||
|
tracing::info!("Processing SourceCreate event for {}", source_id);
|
||||||
|
if let Err(e) = connection_manager.connect_from_source(pool, source_id).await {
|
||||||
|
tracing::error!("Failed to connect to source {}: {}", source_id, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ReloadEvent::SourceUpdate { source_id } => {
|
||||||
|
tracing::info!("Processing SourceUpdate event for {}", source_id);
|
||||||
|
if let Err(e) = connection_manager.reconnect(pool, source_id).await {
|
||||||
|
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ReloadEvent::SourceDelete { source_id } => {
|
||||||
|
tracing::info!("Processing SourceDelete event for {}", source_id);
|
||||||
|
if let Err(e) = connection_manager.disconnect(source_id).await {
|
||||||
|
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ReloadEvent::PointCreateBatch { source_id, point_ids } => {
|
||||||
|
let requested_count = point_ids.len();
|
||||||
|
match connection_manager
|
||||||
|
.subscribe_points_from_source(source_id, Some(point_ids), pool)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(stats) => {
|
||||||
|
let subscribed = *stats.get("subscribed").unwrap_or(&0);
|
||||||
|
let polled = *stats.get("polled").unwrap_or(&0);
|
||||||
|
let total = *stats.get("total").unwrap_or(&0);
|
||||||
|
tracing::info!(
|
||||||
|
"PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}",
|
||||||
|
source_id,
|
||||||
|
requested_count,
|
||||||
|
subscribed,
|
||||||
|
polled,
|
||||||
|
total
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to subscribe to points: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ReloadEvent::PointDeleteBatch { source_id, point_ids } => {
|
||||||
|
tracing::info!(
|
||||||
|
"Processing PointDeleteBatch event for source {} with {} points",
|
||||||
|
source_id,
|
||||||
|
point_ids.len()
|
||||||
|
);
|
||||||
|
if let Err(e) = connection_manager
|
||||||
|
.unsubscribe_points_from_source(source_id, point_ids)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!("Failed to unsubscribe points: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ReloadEvent::PointNewValue(_) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process_point_new_value(
|
||||||
|
payload: crate::telemetry::PointNewValue,
|
||||||
|
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
|
||||||
|
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
|
||||||
|
) {
|
||||||
|
let source_id = payload.source_id;
|
||||||
|
let client_handle = payload.client_handle;
|
||||||
|
let point_id = if let Some(point_id) = payload.point_id {
|
||||||
|
Some(point_id)
|
||||||
|
} else {
|
||||||
|
let status = connection_manager.get_status_read_guard().await;
|
||||||
|
status
|
||||||
|
.get(&source_id)
|
||||||
|
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
|
||||||
|
};
|
||||||
|
if let Some(point_id) = point_id {
|
||||||
|
// 从缓存中读取旧值
|
||||||
|
let (old_value, old_timestamp, value_changed) = {
|
||||||
|
let monitor_data = connection_manager.get_point_monitor_data_read_guard().await;
|
||||||
|
let old_monitor_info = monitor_data.get(&point_id);
|
||||||
|
|
||||||
|
if let Some(old_info) = old_monitor_info {
|
||||||
|
let changed = old_info.value != payload.value || old_info.timestamp != payload.timestamp;
|
||||||
|
(old_info.value.clone(), old_info.timestamp, changed)
|
||||||
|
} else {
|
||||||
|
(None, None, false)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let monitor = crate::telemetry::PointMonitorInfo {
|
||||||
|
protocol: payload.protocol,
|
||||||
|
source_id,
|
||||||
|
point_id,
|
||||||
|
client_handle,
|
||||||
|
scan_mode: payload.scan_mode,
|
||||||
|
timestamp: payload.timestamp,
|
||||||
|
quality: payload.quality,
|
||||||
|
value: payload.value,
|
||||||
|
value_type: payload.value_type,
|
||||||
|
value_text: payload.value_text,
|
||||||
|
old_value,
|
||||||
|
old_timestamp,
|
||||||
|
value_changed,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = connection_manager
|
||||||
|
.update_point_monitor_data(monitor.clone())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!(
|
||||||
|
"Failed to update point monitor data for point {}: {}",
|
||||||
|
point_id,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(ws_manager) = ws_manager {
|
||||||
|
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor);
|
||||||
|
if let Err(e) = ws_manager.send_to_public(ws_message).await {
|
||||||
|
tracing::error!(
|
||||||
|
"Failed to send WebSocket message to public room: {}",
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::warn!(
|
||||||
|
"Point not found for source {} client_handle {}",
|
||||||
|
source_id,
|
||||||
|
client_handle
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue