Compare commits
11 Commits
aaf887a6fc
...
0893c9783c
| Author | SHA1 | Date |
|---|---|---|
|
|
0893c9783c | |
|
|
5dc1081c90 | |
|
|
afab910780 | |
|
|
d4d5749ccc | |
|
|
76b6e17927 | |
|
|
b197607d5f | |
|
|
a2208e8958 | |
|
|
487d3cdf26 | |
|
|
4d88bcbce3 | |
|
|
afac9f1eb9 | |
|
|
8e4abd0af9 |
|
|
@ -95,9 +95,11 @@ pub struct ConnectionStatus {
|
||||||
pub next_client_handle: u32,
|
pub next_client_handle: u32,
|
||||||
pub client_handle_map: HashMap<u32, Uuid>, // client_handle -> point_id
|
pub client_handle_map: HashMap<u32, Uuid>, // client_handle -> point_id
|
||||||
pub monitored_item_map: HashMap<Uuid, u32>, // point_id -> monitored_item_id
|
pub monitored_item_map: HashMap<Uuid, u32>, // point_id -> monitored_item_id
|
||||||
pub poll_points: Vec<PollPointInfo>, // 正在轮询的点集合
|
pub poll_points: Arc<Vec<PollPointInfo>>, // 正在轮询的点集合
|
||||||
poll_handle: Option<JoinHandle<()>>, // 统一的轮询任务句柄
|
poll_handle: Option<JoinHandle<()>>, // 统一的轮询任务句柄
|
||||||
heartbeat_handle: Option<JoinHandle<()>>, // 心跳任务句柄
|
heartbeat_handle: Option<JoinHandle<()>>, // 心跳任务句柄
|
||||||
|
event_loop_handle: Option<JoinHandle<opcua::types::StatusCode>>, // event_loop 任务句柄
|
||||||
|
event_loop_monitor_handle: Option<JoinHandle<()>>, // event_loop 监控任务句柄
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
@ -110,6 +112,7 @@ pub struct ConnectionManager {
|
||||||
pool: Option<Arc<sqlx::PgPool>>,
|
pool: Option<Arc<sqlx::PgPool>>,
|
||||||
reconnect_tx: Option<tokio::sync::mpsc::UnboundedSender<Uuid>>,
|
reconnect_tx: Option<tokio::sync::mpsc::UnboundedSender<Uuid>>,
|
||||||
reconnect_rx: Arc<std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<Uuid>>>>,
|
reconnect_rx: Arc<std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<Uuid>>>>,
|
||||||
|
reconnecting: Arc<RwLock<HashSet<Uuid>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -169,6 +172,7 @@ impl ConnectionManager {
|
||||||
pool: None,
|
pool: None,
|
||||||
reconnect_tx: Some(reconnect_tx),
|
reconnect_tx: Some(reconnect_tx),
|
||||||
reconnect_rx: Arc::new(std::sync::Mutex::new(Some(reconnect_rx))),
|
reconnect_rx: Arc::new(std::sync::Mutex::new(Some(reconnect_rx))),
|
||||||
|
reconnecting: Arc::new(RwLock::new(HashSet::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -275,10 +279,9 @@ impl ConnectionManager {
|
||||||
ticker.tick().await;
|
ticker.tick().await;
|
||||||
|
|
||||||
// 检查session是否有效
|
// 检查session是否有效
|
||||||
let session_valid = {
|
let session = manager.get_session(source_id).await;
|
||||||
let status = manager.status.read().await;
|
|
||||||
if let Some(conn_status) = status.get(&source_id) {
|
let (session_valid, subscription_valid) = if let Some(session) = session {
|
||||||
if let Some(session) = conn_status.session.as_ref() {
|
|
||||||
// 尝试读取当前时间来验证连接
|
// 尝试读取当前时间来验证连接
|
||||||
let node_id = NodeId::new(0, 2258); // ServerCurrentTime节点
|
let node_id = NodeId::new(0, 2258); // ServerCurrentTime节点
|
||||||
let read_request = ReadValueId {
|
let read_request = ReadValueId {
|
||||||
|
|
@ -288,21 +291,60 @@ impl ConnectionManager {
|
||||||
data_encoding: Default::default(),
|
data_encoding: Default::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let session_valid = match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await {
|
||||||
|
Ok(_) => true,
|
||||||
|
Err(_) => false,
|
||||||
|
};
|
||||||
|
|
||||||
|
// 检查订阅状态 - 仅当有 subscription_id 时才检查
|
||||||
|
let subscription_valid = {
|
||||||
|
let status = manager.status.read().await;
|
||||||
|
if let Some(conn_status) = status.get(&source_id) {
|
||||||
|
if let Some(_subscription_id) = conn_status.subscription_id {
|
||||||
|
// 尝试读取订阅的属性来验证订阅是否真正有效
|
||||||
|
let node_id = NodeId::new(0, 2253); // ServerServiceLevel节点
|
||||||
|
let read_request = ReadValueId {
|
||||||
|
node_id,
|
||||||
|
attribute_id: AttributeId::Value as u32,
|
||||||
|
index_range: NumericRange::None,
|
||||||
|
data_encoding: Default::default(),
|
||||||
|
};
|
||||||
|
|
||||||
match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await {
|
match session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await {
|
||||||
Ok(_) => true,
|
Ok(_) => true,
|
||||||
Err(_) => false,
|
Err(_) => false,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
false
|
// 没有 subscription_id 时,认为订阅状态有效(不需要检查)
|
||||||
|
true
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// 没有连接状态时,认为订阅状态无效
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if !session_valid {
|
(session_valid, subscription_valid)
|
||||||
|
} else {
|
||||||
|
(false, false)
|
||||||
|
};
|
||||||
|
|
||||||
|
if !session_valid || !subscription_valid {
|
||||||
|
// 检查是否已经在重连中
|
||||||
|
let mut reconnecting = manager.reconnecting.write().await;
|
||||||
|
if !reconnecting.contains(&source_id) {
|
||||||
|
reconnecting.insert(source_id);
|
||||||
|
drop(reconnecting);
|
||||||
|
|
||||||
|
let reason = if !session_valid {
|
||||||
|
"invalid session"
|
||||||
|
} else {
|
||||||
|
"invalid subscription"
|
||||||
|
};
|
||||||
|
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
"Heartbeat detected invalid session for source {}, triggering reconnection",
|
"Heartbeat detected {} for source {}, triggering reconnection",
|
||||||
|
reason,
|
||||||
source_id
|
source_id
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
@ -314,6 +356,9 @@ impl ConnectionManager {
|
||||||
} else {
|
} else {
|
||||||
tracing::warn!("Reconnect channel not available for source {}", source_id);
|
tracing::warn!("Reconnect channel not available for source {}", source_id);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
drop(reconnecting);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
@ -364,7 +409,7 @@ impl ConnectionManager {
|
||||||
let poll_points = {
|
let poll_points = {
|
||||||
let status = status_ref.read().await;
|
let status = status_ref.read().await;
|
||||||
status.get(&source_id)
|
status.get(&source_id)
|
||||||
.map(|conn_status| conn_status.poll_points.clone())
|
.map(|conn_status| Arc::clone(&conn_status.poll_points))
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -392,12 +437,7 @@ impl ConnectionManager {
|
||||||
// 执行批量读取
|
// 执行批量读取
|
||||||
match session.read(&read_requests, TimestampsToReturn::Both, 0f64).await {
|
match session.read(&read_requests, TimestampsToReturn::Both, 0f64).await {
|
||||||
Ok(results) => {
|
Ok(results) => {
|
||||||
for (i, result) in results.iter().enumerate() {
|
for (poll_point, result) in poll_points.iter().zip(results.iter()) {
|
||||||
if i >= poll_points.len() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
let poll_point = &poll_points[i];
|
|
||||||
let dv = result;
|
let dv = result;
|
||||||
let val = dv.value.clone();
|
let val = dv.value.clone();
|
||||||
let unified_value = val.as_ref().map(crate::telemetry::opcua_variant_to_data);
|
let unified_value = val.as_ref().map(crate::telemetry::opcua_variant_to_data);
|
||||||
|
|
@ -457,7 +497,7 @@ impl ConnectionManager {
|
||||||
for point in points {
|
for point in points {
|
||||||
// 检查点是否已经在轮询列表中
|
// 检查点是否已经在轮询列表中
|
||||||
if !conn_status.poll_points.iter().any(|p| p.point_id == point.point_id) {
|
if !conn_status.poll_points.iter().any(|p| p.point_id == point.point_id) {
|
||||||
conn_status.poll_points.push(PollPointInfo {
|
Arc::make_mut(&mut conn_status.poll_points).push(PollPointInfo {
|
||||||
point_id: point.point_id,
|
point_id: point.point_id,
|
||||||
external_id: point.external_id.clone(),
|
external_id: point.external_id.clone(),
|
||||||
});
|
});
|
||||||
|
|
@ -576,8 +616,40 @@ impl ConnectionManager {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = event_loop.spawn();
|
let event_loop_handle = event_loop.spawn();
|
||||||
session.wait_for_connection().await;
|
|
||||||
|
// 添加监控任务来捕获 event_loop 结束事件
|
||||||
|
let manager = self.clone();
|
||||||
|
let source_id_copy = source_id;
|
||||||
|
let event_loop_monitor_handle = tokio::spawn(async move {
|
||||||
|
match event_loop_handle.await {
|
||||||
|
Ok(status) => {
|
||||||
|
tracing::warn!(
|
||||||
|
"OPCUA event loop ended for source {}: {:?}",
|
||||||
|
source_id_copy,
|
||||||
|
status
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(
|
||||||
|
"OPCUA event loop panic for source {}: {}",
|
||||||
|
source_id_copy,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 统一触发重连
|
||||||
|
if let Some(tx) = manager.reconnect_tx.as_ref() {
|
||||||
|
let _ = tx.send(source_id_copy);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if !session.wait_for_connection().await {
|
||||||
|
let error = "Session connection failed".to_string();
|
||||||
|
self.fail_connect(source_id, &error).await;
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
|
||||||
let mut status = self.status.write().await;
|
let mut status = self.status.write().await;
|
||||||
status.insert(
|
status.insert(
|
||||||
|
|
@ -591,9 +663,11 @@ impl ConnectionManager {
|
||||||
next_client_handle: 1000,
|
next_client_handle: 1000,
|
||||||
client_handle_map: HashMap::new(),
|
client_handle_map: HashMap::new(),
|
||||||
monitored_item_map: HashMap::new(),
|
monitored_item_map: HashMap::new(),
|
||||||
poll_points: Vec::new(),
|
poll_points: Arc::new(Vec::new()),
|
||||||
poll_handle: None,
|
poll_handle: None,
|
||||||
heartbeat_handle: None,
|
heartbeat_handle: None,
|
||||||
|
event_loop_handle: None, // event_loop_handle 已被移动到监控任务中
|
||||||
|
event_loop_monitor_handle: Some(event_loop_monitor_handle),
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
drop(status); // 显式释放锁,在调用 start_unified_poll_task 之前
|
drop(status); // 显式释放锁,在调用 start_unified_poll_task 之前
|
||||||
|
|
@ -621,9 +695,11 @@ impl ConnectionManager {
|
||||||
client_handle_map: HashMap::new(),
|
client_handle_map: HashMap::new(),
|
||||||
monitored_item_map: HashMap::new(),
|
monitored_item_map: HashMap::new(),
|
||||||
next_client_handle: 1000,
|
next_client_handle: 1000,
|
||||||
poll_points: Vec::new(),
|
poll_points: Arc::new(Vec::new()),
|
||||||
poll_handle: None,
|
poll_handle: None,
|
||||||
heartbeat_handle: None,
|
heartbeat_handle: None,
|
||||||
|
event_loop_handle: None,
|
||||||
|
event_loop_monitor_handle: None,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -636,7 +712,15 @@ impl ConnectionManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 再重新连接
|
// 再重新连接
|
||||||
self.connect_from_source(pool, source_id).await
|
let result = self.connect_from_source(pool, source_id).await;
|
||||||
|
|
||||||
|
// 清除重连标记
|
||||||
|
if result.is_ok() {
|
||||||
|
let mut reconnecting = self.reconnecting.write().await;
|
||||||
|
reconnecting.remove(&source_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn disconnect(&self, source_id: Uuid) -> Result<(), String> {
|
pub async fn disconnect(&self, source_id: Uuid) -> Result<(), String> {
|
||||||
|
|
@ -644,7 +728,7 @@ impl ConnectionManager {
|
||||||
{
|
{
|
||||||
let mut status = self.status.write().await;
|
let mut status = self.status.write().await;
|
||||||
if let Some(conn_status) = status.get_mut(&source_id) {
|
if let Some(conn_status) = status.get_mut(&source_id) {
|
||||||
conn_status.poll_points.clear();
|
Arc::make_mut(&mut conn_status.poll_points).clear();
|
||||||
if let Some(handle) = conn_status.poll_handle.take() {
|
if let Some(handle) = conn_status.poll_handle.take() {
|
||||||
handle.abort();
|
handle.abort();
|
||||||
}
|
}
|
||||||
|
|
@ -652,6 +736,14 @@ impl ConnectionManager {
|
||||||
if let Some(handle) = conn_status.heartbeat_handle.take() {
|
if let Some(handle) = conn_status.heartbeat_handle.take() {
|
||||||
handle.abort();
|
handle.abort();
|
||||||
}
|
}
|
||||||
|
// 停止 event_loop 任务
|
||||||
|
if let Some(handle) = conn_status.event_loop_handle.take() {
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
|
// 停止 event_loop 监控任务
|
||||||
|
if let Some(handle) = conn_status.event_loop_monitor_handle.take() {
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -676,7 +768,7 @@ impl ConnectionManager {
|
||||||
{
|
{
|
||||||
let mut status = self.status.write().await;
|
let mut status = self.status.write().await;
|
||||||
if let Some(conn_status) = status.get_mut(&source_id) {
|
if let Some(conn_status) = status.get_mut(&source_id) {
|
||||||
conn_status.poll_points.clear();
|
Arc::make_mut(&mut conn_status.poll_points).clear();
|
||||||
if let Some(handle) = conn_status.poll_handle.take() {
|
if let Some(handle) = conn_status.poll_handle.take() {
|
||||||
handle.abort();
|
handle.abort();
|
||||||
}
|
}
|
||||||
|
|
@ -684,6 +776,14 @@ impl ConnectionManager {
|
||||||
if let Some(handle) = conn_status.heartbeat_handle.take() {
|
if let Some(handle) = conn_status.heartbeat_handle.take() {
|
||||||
handle.abort();
|
handle.abort();
|
||||||
}
|
}
|
||||||
|
// 停止 event_loop 任务
|
||||||
|
if let Some(handle) = conn_status.event_loop_handle.take() {
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
|
// 停止 event_loop 监控任务
|
||||||
|
if let Some(handle) = conn_status.event_loop_monitor_handle.take() {
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -708,6 +808,17 @@ impl ConnectionManager {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn allocate_client_handle(&self, source_id: Uuid) -> Option<u32> {
|
||||||
|
let mut status = self.status.write().await;
|
||||||
|
if let Some(conn) = status.get_mut(&source_id) {
|
||||||
|
let handle = conn.next_client_handle;
|
||||||
|
conn.next_client_handle += 1;
|
||||||
|
Some(handle)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn get_status(&self, source_id: Uuid) -> Option<ConnectionStatusView> {
|
pub async fn get_status(&self, source_id: Uuid) -> Option<ConnectionStatusView> {
|
||||||
let status = self.status.read().await;
|
let status = self.status.read().await;
|
||||||
status.get(&source_id).map(ConnectionStatusView::from)
|
status.get(&source_id).map(ConnectionStatusView::from)
|
||||||
|
|
@ -1060,12 +1171,6 @@ impl ConnectionManager {
|
||||||
}
|
}
|
||||||
let subscription_id = subscription_id.unwrap_or_default();
|
let subscription_id = subscription_id.unwrap_or_default();
|
||||||
|
|
||||||
let mut client_handle_seed: u32 = self
|
|
||||||
.status
|
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.get(&source_id)
|
|
||||||
.map_or(1000, |s| s.next_client_handle);
|
|
||||||
let mut items_to_create: Vec<MonitoredItemCreateRequest> = Vec::new();
|
let mut items_to_create: Vec<MonitoredItemCreateRequest> = Vec::new();
|
||||||
let mut item_points: Vec<PointSubscriptionInfo> = Vec::new();
|
let mut item_points: Vec<PointSubscriptionInfo> = Vec::new();
|
||||||
|
|
||||||
|
|
@ -1073,8 +1178,8 @@ impl ConnectionManager {
|
||||||
let node_id = NodeId::from_str(&p.external_id)
|
let node_id = NodeId::from_str(&p.external_id)
|
||||||
.map_err(|e| format!("Invalid node id {}: {}", p.external_id, e))?;
|
.map_err(|e| format!("Invalid node id {}: {}", p.external_id, e))?;
|
||||||
|
|
||||||
let client_handle = client_handle_seed;
|
let client_handle = self.allocate_client_handle(source_id).await
|
||||||
client_handle_seed += 1;
|
.ok_or_else(|| format!("Failed to allocate client handle for source {}", source_id))?;
|
||||||
|
|
||||||
if let Some(s) = self.status.write().await.get_mut(&source_id) {
|
if let Some(s) = self.status.write().await.get_mut(&source_id) {
|
||||||
s.client_handle_map.insert(client_handle, p.point_id);
|
s.client_handle_map.insert(client_handle, p.point_id);
|
||||||
|
|
@ -1118,7 +1223,6 @@ impl ConnectionManager {
|
||||||
conn_status
|
conn_status
|
||||||
.client_handle_map
|
.client_handle_map
|
||||||
.retain(|_, point_id| !item_point_ids.contains(point_id));
|
.retain(|_, point_id| !item_point_ids.contains(point_id));
|
||||||
conn_status.next_client_handle = client_handle_seed;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let polled_count = self
|
let polled_count = self
|
||||||
|
|
@ -1145,7 +1249,7 @@ impl ConnectionManager {
|
||||||
.monitored_item_map
|
.monitored_item_map
|
||||||
.insert(point.point_id, monitored_item_result.result.monitored_item_id);
|
.insert(point.point_id, monitored_item_result.result.monitored_item_id);
|
||||||
// 从轮询列表中移除该点
|
// 从轮询列表中移除该点
|
||||||
conn_status.poll_points.retain(|p| p.point_id != point.point_id);
|
Arc::make_mut(&mut conn_status.poll_points).retain(|p| p.point_id != point.point_id);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tracing::error!(
|
tracing::error!(
|
||||||
|
|
@ -1174,7 +1278,6 @@ impl ConnectionManager {
|
||||||
conn_status
|
conn_status
|
||||||
.client_handle_map
|
.client_handle_map
|
||||||
.retain(|_, point_id| !failed_point_ids.contains(point_id));
|
.retain(|_, point_id| !failed_point_ids.contains(point_id));
|
||||||
conn_status.next_client_handle = client_handle_seed;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let polled_count = self
|
let polled_count = self
|
||||||
|
|
@ -1198,12 +1301,14 @@ impl ConnectionManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
let target_ids: std::collections::HashSet<Uuid> = point_ids.into_iter().collect();
|
let target_ids: std::collections::HashSet<Uuid> = point_ids.into_iter().collect();
|
||||||
let (session, subscription_id, point_item_pairs) = {
|
let session = self.get_session(source_id).await;
|
||||||
let status = self.status.read().await;
|
let Some(session) = session else {
|
||||||
let Some(conn_status) = status.get(&source_id) else {
|
|
||||||
return Ok(0);
|
return Ok(0);
|
||||||
};
|
};
|
||||||
let Some(session) = conn_status.session.clone() else {
|
|
||||||
|
let (subscription_id, point_item_pairs) = {
|
||||||
|
let status = self.status.read().await;
|
||||||
|
let Some(conn_status) = status.get(&source_id) else {
|
||||||
return Ok(0);
|
return Ok(0);
|
||||||
};
|
};
|
||||||
let Some(subscription_id) = conn_status.subscription_id else {
|
let Some(subscription_id) = conn_status.subscription_id else {
|
||||||
|
|
@ -1217,7 +1322,7 @@ impl ConnectionManager {
|
||||||
.map(|(point_id, monitored_item_id)| (*point_id, *monitored_item_id))
|
.map(|(point_id, monitored_item_id)| (*point_id, *monitored_item_id))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
(session, subscription_id, items)
|
(subscription_id, items)
|
||||||
};
|
};
|
||||||
|
|
||||||
if point_item_pairs.is_empty() {
|
if point_item_pairs.is_empty() {
|
||||||
|
|
@ -1291,7 +1396,7 @@ impl ConnectionManager {
|
||||||
let mut status = self.status.write().await;
|
let mut status = self.status.write().await;
|
||||||
if let Some(conn_status) = status.get_mut(&source_id) {
|
if let Some(conn_status) = status.get_mut(&source_id) {
|
||||||
let before_count = conn_status.poll_points.len();
|
let before_count = conn_status.poll_points.len();
|
||||||
conn_status.poll_points.retain(|p| !target_ids.contains(&p.point_id));
|
Arc::make_mut(&mut conn_status.poll_points).retain(|p| !target_ids.contains(&p.point_id));
|
||||||
let after_count = conn_status.poll_points.len();
|
let after_count = conn_status.poll_points.len();
|
||||||
before_count - after_count
|
before_count - after_count
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
20
src/event.rs
20
src/event.rs
|
|
@ -37,11 +37,13 @@ impl EventManager {
|
||||||
let ws_manager_clone = ws_manager.clone();
|
let ws_manager_clone = ws_manager.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
// 在循环外克隆,避免在循环中移动
|
||||||
|
let connection_manager_clone = connection_manager.clone();
|
||||||
while let Some(event) = receiver.recv().await {
|
while let Some(event) = receiver.recv().await {
|
||||||
match event {
|
match event {
|
||||||
ReloadEvent::SourceCreate { source_id } => {
|
ReloadEvent::SourceCreate { source_id } => {
|
||||||
tracing::info!("Processing SourceCreate event for {}", source_id);
|
tracing::info!("Processing SourceCreate event for {}", source_id);
|
||||||
if let Err(e) = connection_manager.connect_from_source(&pool, source_id).await {
|
if let Err(e) = connection_manager_clone.connect_from_source(&pool, source_id).await {
|
||||||
tracing::error!("Failed to connect to source {}: {}", source_id, e);
|
tracing::error!("Failed to connect to source {}: {}", source_id, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -50,13 +52,13 @@ impl EventManager {
|
||||||
}
|
}
|
||||||
ReloadEvent::SourceDelete { source_id } => {
|
ReloadEvent::SourceDelete { source_id } => {
|
||||||
tracing::info!("Processing SourceDelete event for {}", source_id);
|
tracing::info!("Processing SourceDelete event for {}", source_id);
|
||||||
if let Err(e) = connection_manager.disconnect(source_id).await {
|
if let Err(e) = connection_manager_clone.disconnect(source_id).await {
|
||||||
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
|
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ReloadEvent::PointCreateBatch { source_id, point_ids } => {
|
ReloadEvent::PointCreateBatch { source_id, point_ids } => {
|
||||||
let requested_count = point_ids.len();
|
let requested_count = point_ids.len();
|
||||||
match connection_manager
|
match connection_manager_clone
|
||||||
.subscribe_points_from_source(source_id, Some(point_ids), &pool)
|
.subscribe_points_from_source(source_id, Some(point_ids), &pool)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
|
@ -84,7 +86,7 @@ impl EventManager {
|
||||||
source_id,
|
source_id,
|
||||||
point_ids.len()
|
point_ids.len()
|
||||||
);
|
);
|
||||||
if let Err(e) = connection_manager
|
if let Err(e) = connection_manager_clone
|
||||||
.unsubscribe_points_from_source(source_id, point_ids)
|
.unsubscribe_points_from_source(source_id, point_ids)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
|
@ -97,7 +99,7 @@ impl EventManager {
|
||||||
let point_id = if let Some(point_id) = payload.point_id {
|
let point_id = if let Some(point_id) = payload.point_id {
|
||||||
Some(point_id)
|
Some(point_id)
|
||||||
} else {
|
} else {
|
||||||
let status = connection_manager.get_status_read_guard().await;
|
let status = connection_manager_clone.get_status_read_guard().await;
|
||||||
status
|
status
|
||||||
.get(&source_id)
|
.get(&source_id)
|
||||||
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
|
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
|
||||||
|
|
@ -105,7 +107,7 @@ impl EventManager {
|
||||||
if let Some(point_id) = point_id {
|
if let Some(point_id) = point_id {
|
||||||
// 从缓存中读取旧值
|
// 从缓存中读取旧值
|
||||||
let (old_value, old_timestamp, value_changed) = {
|
let (old_value, old_timestamp, value_changed) = {
|
||||||
let monitor_data = connection_manager.get_point_monitor_data_read_guard().await;
|
let monitor_data = connection_manager_clone.get_point_monitor_data_read_guard().await;
|
||||||
let old_monitor_info = monitor_data.get(&point_id);
|
let old_monitor_info = monitor_data.get(&point_id);
|
||||||
|
|
||||||
if let Some(old_info) = old_monitor_info {
|
if let Some(old_info) = old_monitor_info {
|
||||||
|
|
@ -138,16 +140,18 @@ impl EventManager {
|
||||||
let monitor_for_db = monitor.clone();
|
let monitor_for_db = monitor.clone();
|
||||||
|
|
||||||
// 并行执行 update_point_monitor_data 和 send_to_public,不等待完成
|
// 并行执行 update_point_monitor_data 和 send_to_public,不等待完成
|
||||||
|
let cm_clone = connection_manager_clone.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// 更新监控数据
|
// 更新监控数据
|
||||||
if let Err(e) = connection_manager.update_point_monitor_data(monitor_for_db).await {
|
if let Err(e) = cm_clone.update_point_monitor_data(monitor_for_db).await {
|
||||||
tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e);
|
tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let ws_clone = ws_manager_clone.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// 发送WebSocket消息
|
// 发送WebSocket消息
|
||||||
if let Some(ws_manager) = &ws_manager_clone {
|
if let Some(ws_manager) = ws_clone {
|
||||||
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor_for_ws);
|
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor_for_ws);
|
||||||
if let Err(e) = ws_manager.send_to_public(ws_message).await {
|
if let Err(e) = ws_manager.send_to_public(ws_message).await {
|
||||||
tracing::error!("Failed to send WebSocket message to public room: {}", e);
|
tracing::error!("Failed to send WebSocket message to public room: {}", e);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue