plc_control/src/connection.rs

use chrono::{DateTime, Utc};
use opcua::{
client::{ClientBuilder, IdentityToken, Session},
crypto::SecurityPolicy,
types::{
AttributeId, DataValue as OpcuaDataValue, MessageSecurityMode,
MonitoredItemCreateRequest, MonitoringMode, MonitoringParameters, NodeId,
NumericRange, ReadValueId, TimestampsToReturn, UserTokenPolicy, Variant,
WriteValue,
},
};
use std::{
collections::{HashMap, HashSet, VecDeque},
str::FromStr,
sync::Arc,
time::Duration,
};
use tokio::{sync::RwLock, task::JoinHandle};
use uuid::Uuid;
use crate::{
model::{PointSubscriptionInfo, ScanMode},
telemetry::PointMonitorInfo,
};
/// Capacity of each point's in-memory history ring buffer.
const DEFAULT_POINT_RING_BUFFER_LEN: usize = 1000;
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct SetPointValueReqItem {
pub point_id: Uuid,
pub value: serde_json::Value,
}
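/// Request body for a batch point write. A minimal construction sketch
/// (the UUID is a placeholder supplied by the caller):
///
/// ```ignore
/// let req = BatchSetPointValueReq {
///     items: vec![SetPointValueReqItem {
///         point_id: some_point_id, // hypothetical Uuid of a cached point
///         value: serde_json::json!(42),
///     }],
/// };
/// ```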
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct BatchSetPointValueReq {
pub items: Vec<SetPointValueReqItem>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SetPointValueResItem {
pub point_id: Uuid,
pub success: bool,
pub err_msg: Option<String>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BatchSetPointValueRes {
pub success: bool,
pub err_msg: Option<String>,
pub success_count: usize,
pub failed_count: usize,
pub results: Vec<SetPointValueResItem>,
}
#[derive(Debug, Clone)]
struct PointWriteTarget {
source_id: Uuid,
external_id: String,
}
#[derive(Debug, Clone)]
pub struct PollPointInfo {
pub point_id: Uuid,
pub external_id: String,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ConnectionStatusView {
pub is_connected: bool,
pub last_error: Option<String>,
pub last_time: DateTime<Utc>,
pub subscription_id: Option<u32>,
pub next_client_handle: u32,
}
impl From<&ConnectionStatus> for ConnectionStatusView {
fn from(status: &ConnectionStatus) -> Self {
Self {
is_connected: status.is_connected,
last_error: status.last_error.clone(),
last_time: status.last_time,
subscription_id: status.subscription_id,
next_client_handle: status.next_client_handle,
}
}
}
pub struct ConnectionStatus {
pub session: Option<Arc<Session>>,
pub is_connected: bool,
pub last_error: Option<String>,
pub last_time: DateTime<Utc>,
pub subscription_id: Option<u32>,
pub next_client_handle: u32,
pub client_handle_map: HashMap<u32, Uuid>, // client_handle -> point_id
pub monitored_item_map: HashMap<Uuid, u32>, // point_id -> monitored_item_id
pub poll_points: Arc<Vec<PollPointInfo>>, // points currently served by polling
poll_handle: Option<JoinHandle<()>>, // unified poll task handle
heartbeat_handle: Option<JoinHandle<()>>, // heartbeat task handle
event_loop_handle: Option<JoinHandle<opcua::types::StatusCode>>, // event_loop task handle
event_loop_monitor_handle: Option<JoinHandle<()>>, // event_loop monitor task handle
}
#[derive(Clone)]
pub struct ConnectionManager {
status: Arc<RwLock<HashMap<Uuid, ConnectionStatus>>>,
point_monitor_data: Arc<RwLock<HashMap<Uuid, PointMonitorInfo>>>,
point_history_data: Arc<RwLock<HashMap<Uuid, VecDeque<PointMonitorInfo>>>>,
point_write_target_cache: Arc<RwLock<HashMap<Uuid, PointWriteTarget>>>,
event_manager: Option<std::sync::Arc<crate::event::EventManager>>,
pool: Option<Arc<sqlx::PgPool>>,
reconnect_tx: Option<tokio::sync::mpsc::UnboundedSender<Uuid>>,
reconnect_rx: Arc<std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<Uuid>>>>,
reconnecting: Arc<RwLock<HashSet<Uuid>>>,
reconnect_attempts: Arc<RwLock<HashMap<Uuid, u32>>>,
}
impl ConnectionManager {
fn subscription_result(subscribed: usize, polled: usize) -> HashMap<String, usize> {
let mut result = HashMap::new();
result.insert("subscribed".to_string(), subscribed);
result.insert("polled".to_string(), polled);
result.insert("total".to_string(), subscribed + polled);
result
}
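/// Maps a JSON value onto an OPC UA `Variant` for writes: null becomes
/// `Variant::Empty`, booleans/integers/floats/strings map via `Variant::from`
/// (signed integers are tried before unsigned, then floats), and arrays or
/// objects are rejected. A sketch of the expected behaviour:
///
/// ```ignore
/// assert!(Self::json_value_to_opcua_variant(&serde_json::json!(1.5)).is_ok());
/// assert!(Self::json_value_to_opcua_variant(&serde_json::json!([1, 2])).is_err());
/// ```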
fn json_value_to_opcua_variant(value: &serde_json::Value) -> Result<Variant, String> {
match value {
serde_json::Value::Null => Ok(Variant::Empty),
serde_json::Value::Bool(v) => Ok(Variant::from(*v)),
serde_json::Value::Number(n) => {
if let Some(v) = n.as_i64() {
Ok(Variant::from(v))
} else if let Some(v) = n.as_u64() {
Ok(Variant::from(v))
} else if let Some(v) = n.as_f64() {
Ok(Variant::from(v))
} else {
Err("Unsupported numeric value".to_string())
}
}
serde_json::Value::String(s) => Ok(Variant::from(s.clone())),
_ => Err("Only null/bool/number/string are supported for point write".to_string()),
}
}
fn write_value_batch_result(
success: bool,
err_msg: Option<String>,
results: Vec<SetPointValueResItem>,
) -> BatchSetPointValueRes {
let success_count = results.iter().filter(|r| r.success).count();
let failed_count = results.len().saturating_sub(success_count);
BatchSetPointValueRes {
success,
err_msg,
success_count,
failed_count,
results,
}
}
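/// Creates an empty manager with its reconnect channel pre-wired. Typical
/// startup wiring, as a sketch (`event_manager` and `pool` come from the
/// application's own setup):
///
/// ```ignore
/// let mut manager = ConnectionManager::new();
/// manager.set_event_manager(event_manager);
/// manager.set_pool_and_start_reconnect_task(pool.clone());
/// manager.connect_from_source(&pool, source_id).await?;
/// ```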
pub fn new() -> Self {
let (reconnect_tx, reconnect_rx) = tokio::sync::mpsc::unbounded_channel::<Uuid>();
Self {
status: Arc::new(RwLock::new(HashMap::new())),
point_monitor_data: Arc::new(RwLock::new(HashMap::new())),
point_history_data: Arc::new(RwLock::new(HashMap::new())),
point_write_target_cache: Arc::new(RwLock::new(HashMap::new())),
event_manager: None,
pool: None,
reconnect_tx: Some(reconnect_tx),
reconnect_rx: Arc::new(std::sync::Mutex::new(Some(reconnect_rx))),
reconnecting: Arc::new(RwLock::new(HashSet::new())),
reconnect_attempts: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn set_event_manager(&mut self, event_manager: std::sync::Arc<crate::event::EventManager>) {
self.event_manager = Some(event_manager);
}
pub fn set_pool(&mut self, pool: Arc<sqlx::PgPool>) {
self.pool = Some(pool);
}
pub fn set_pool_and_start_reconnect_task(&mut self, pool: Arc<sqlx::PgPool>) {
self.pool = Some(pool.clone());
// Clone the manager so the spawned reconnect task owns its own handle.
let manager = self.clone();
manager.start_reconnect_task();
}
pub fn set_reconnect_tx(&mut self, tx: tokio::sync::mpsc::UnboundedSender<Uuid>) {
self.reconnect_tx = Some(tx);
}
pub fn get_reconnect_rx(&self) -> Option<tokio::sync::mpsc::UnboundedReceiver<Uuid>> {
self.reconnect_rx.lock().unwrap().take()
}
pub fn start_reconnect_task(&self) {
let manager = self.clone();
let pool = manager.pool.clone();
tokio::spawn(async move {
// Take the receiving end of the reconnect channel.
let mut reconnect_rx = manager.get_reconnect_rx().expect("Failed to get reconnect receiver");
while let Some(source_id) = reconnect_rx.recv().await {
tracing::info!("Received reconnect request for source {}", source_id);
if let Some(ref pool) = pool {
if let Err(e) = manager.reconnect(pool, source_id).await {
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
manager.schedule_reconnect_retry(source_id).await;
}
} else {
tracing::warn!("Pool not available for reconnection of source {}", source_id);
}
}
});
}
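/// Schedules a delayed retry after a failed reconnect. Delays back off at
/// 3s, 5s, 10s and 15s for the first four attempts and settle at 30s from
/// the fifth attempt onwards; `reconnect` clears the counter on success.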
async fn schedule_reconnect_retry(&self, source_id: Uuid) {
let attempt = {
let mut attempts = self.reconnect_attempts.write().await;
let entry = attempts.entry(source_id).or_insert(0);
*entry = entry.saturating_add(1);
*entry
};
let delay_secs = match attempt {
1 => 3,
2 => 5,
3 => 10,
4 => 15,
_ => 30,
};
tracing::warn!(
"Reconnect attempt {} for source {} failed, retrying in {}s",
attempt,
source_id,
delay_secs
);
let tx = self.reconnect_tx.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(delay_secs)).await;
if let Some(tx) = tx.as_ref() {
if let Err(e) = tx.send(source_id) {
tracing::error!(
"Failed to queue delayed reconnect for source {}: {}",
source_id,
e
);
}
}
});
}
pub async fn remove_point_write_target_cache_by_point_ids(
&self,
point_ids: &[Uuid],
) -> usize {
if point_ids.is_empty() {
return 0;
}
let mut removed = 0usize;
let mut cache = self.point_write_target_cache.write().await;
for point_id in point_ids {
if cache.remove(point_id).is_some() {
removed += 1;
}
}
removed
}
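/// Records the latest sample for a point and appends it to that point's
/// bounded history ring (`DEFAULT_POINT_RING_BUFFER_LEN` = 1000 entries);
/// once full, the oldest sample is evicted.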
pub async fn update_point_monitor_data(
&self,
sample: PointMonitorInfo,
) -> Result<(), String> {
let point_id = sample.point_id;
let mut data = self.point_monitor_data.write().await;
data.insert(point_id, sample.clone());
let mut history_data = self.point_history_data.write().await;
let ring = history_data
.entry(point_id)
.or_insert_with(|| VecDeque::with_capacity(DEFAULT_POINT_RING_BUFFER_LEN));
ring.push_back(sample);
if ring.len() > DEFAULT_POINT_RING_BUFFER_LEN {
let _ = ring.pop_front();
}
Ok(())
}
pub async fn get_status_read_guard(&self) -> tokio::sync::RwLockReadGuard<'_, HashMap<Uuid, ConnectionStatus>> {
self.status.read().await
}
pub async fn get_point_monitor_data_read_guard(
&self,
) -> tokio::sync::RwLockReadGuard<'_, HashMap<Uuid, PointMonitorInfo>> {
self.point_monitor_data.read().await
}
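/// Returns up to `limit` of the most recent samples for a point, oldest
/// first; `limit == 0` or an unknown point yields an empty vector.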
pub async fn get_point_history(&self, point_id: Uuid, limit: usize) -> Vec<PointMonitorInfo> {
if limit == 0 {
return Vec::new();
}
let history_data = self.point_history_data.read().await;
history_data
.get(&point_id)
.map(|ring| {
let skip = ring.len().saturating_sub(limit);
ring.iter().skip(skip).cloned().collect()
})
.unwrap_or_default()
}
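/// Spawns the per-source heartbeat. Every 8 seconds it reads `ns=0;i=2258`
/// (Server_ServerStatus_CurrentTime) to validate the session and, when a
/// subscription exists, calls `set_publishing_mode` so the server itself
/// confirms the subscription ID. Any failure queues a reconnect request,
/// deduplicated through the `reconnecting` set.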
async fn start_heartbeat_task(&self, source_id: Uuid) {
let manager = self.clone();
let handle = tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_secs(8)); // heartbeat check every 8 seconds
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
ticker.tick().await;
// Check whether the session is still valid.
let session = manager.get_session(source_id).await;
let (session_valid, subscription_valid) = if let Some(session) = session {
// Validate the connection by reading the server's current time.
let node_id = NodeId::new(0, 2258); // Server_ServerStatus_CurrentTime
let read_request = ReadValueId {
node_id,
attribute_id: AttributeId::Value as u32,
index_range: NumericRange::None,
data_encoding: Default::default(),
};
let session_valid = session
.read(&[read_request], TimestampsToReturn::Neither, 0f64)
.await
.is_ok();
// Check subscription health only when a subscription_id exists.
// set_publishing_mode makes the server validate the subscription ID directly,
// so a healthy session read alone cannot mask a dead subscription.
let subscription_id = {
let status = manager.status.read().await;
status.get(&source_id).and_then(|conn_status| conn_status.subscription_id)
};
let subscription_valid = if let Some(subscription_id) = subscription_id {
match session.set_publishing_mode(&[subscription_id], true).await {
Ok(results) => results.first().map(|status| status.is_good()).unwrap_or(false),
Err(_) => false,
}
} else {
// Without a subscription_id there is nothing to validate; treat the subscription as healthy.
true
};
(session_valid, subscription_valid)
} else {
(false, false)
};
if !session_valid || !subscription_valid {
// Skip if a reconnect is already in flight.
let mut reconnecting = manager.reconnecting.write().await;
if !reconnecting.contains(&source_id) {
reconnecting.insert(source_id);
drop(reconnecting);
let reason = if !session_valid {
"invalid session"
} else {
"invalid subscription"
};
tracing::warn!(
"Heartbeat detected {} for source {}, triggering reconnection",
reason,
source_id
);
// Queue a reconnect request over the channel.
if let Some(tx) = manager.reconnect_tx.as_ref() {
if let Err(e) = tx.send(source_id) {
tracing::error!("Failed to send reconnect request for source {}: {}", source_id, e);
}
} else {
tracing::warn!("Reconnect channel not available for source {}", source_id);
}
} else {
drop(reconnecting);
}
}
}
});
// Keep the heartbeat task handle so disconnect can abort it.
let mut status = self.status.write().await;
if let Some(conn_status) = status.get_mut(&source_id) {
conn_status.heartbeat_handle = Some(handle);
}
}
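/// Spawns the single poll loop for a source: every second it snapshots
/// `poll_points`, issues one batched `read`, and forwards each result as a
/// `PointNewValue` event tagged `ScanMode::Poll`.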
async fn start_unified_poll_task(&self, source_id: Uuid, session: Arc<Session>) {
let event_manager = match self.event_manager.clone() {
Some(em) => em,
None => {
tracing::warn!("Event manager is not initialized, cannot start unified poll task");
return;
}
};
// Stop any previous poll task.
{
let mut status = self.status.write().await;
if let Some(conn_status) = status.get_mut(&source_id) {
if let Some(handle) = conn_status.poll_handle.take() {
handle.abort();
}
}
}
tracing::info!(
"Starting unified poll task for source {}",
source_id
);
// Clone the status map handle for use inside the async task.
let status_ref = self.status.clone();
// Spawn the new poll task.
let handle = tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_secs(1));
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
ticker.tick().await;
// Snapshot the current poll point list inside the task.
let poll_points = {
let status = status_ref.read().await;
status.get(&source_id)
.map(|conn_status| Arc::clone(&conn_status.poll_points))
.unwrap_or_default()
};
if poll_points.is_empty() {
continue;
}
// Build the batched read request.
let read_requests: Vec<ReadValueId> = poll_points
.iter()
.filter_map(|p| {
NodeId::from_str(&p.external_id).ok().map(|node_id| ReadValueId {
node_id,
attribute_id: AttributeId::Value as u32,
index_range: NumericRange::None,
data_encoding: Default::default(),
})
})
.collect();
if read_requests.is_empty() {
continue;
}
// Execute the batched read.
match session.read(&read_requests, TimestampsToReturn::Both, 0f64).await {
Ok(results) => {
for (poll_point, dv) in poll_points.iter().zip(results.iter()) {
let val = dv.value.clone();
let unified_value = val.as_ref().map(crate::telemetry::opcua_variant_to_data);
let unified_value_type = val.as_ref().map(crate::telemetry::opcua_variant_type);
let unified_value_text = val.as_ref().map(|v| v.to_string());
let quality = dv.status
.as_ref()
.map(crate::telemetry::PointQuality::from_status_code)
.unwrap_or(crate::telemetry::PointQuality::Good);
let _ = event_manager.send(crate::event::AppEvent::PointNewValue(
crate::telemetry::PointNewValue {
source_id,
point_id: Some(poll_point.point_id),
client_handle: 0,
value: unified_value,
value_type: unified_value_type,
value_text: unified_value_text,
quality,
protocol: "opcua".to_string(),
timestamp: Some(Utc::now()),
scan_mode: ScanMode::Poll,
},
));
}
}
Err(e) => {
tracing::warn!(
"Unified poll read failed for source {}: {:?}",
source_id,
e
);
}
}
}
});
// Keep the poll task handle so it can be aborted later.
let mut status = self.status.write().await;
if let Some(conn_status) = status.get_mut(&source_id) {
conn_status.poll_handle = Some(handle);
}
}
// Add points to the unified poll list.
async fn add_points_to_poll_list(
&self,
source_id: Uuid,
points: &[PointSubscriptionInfo],
) -> usize {
let mut started = 0usize;
// Append new poll points.
{
let mut status = self.status.write().await;
if let Some(conn_status) = status.get_mut(&source_id) {
for point in points {
// Skip points already in the poll list.
if !conn_status.poll_points.iter().any(|p| p.point_id == point.point_id) {
Arc::make_mut(&mut conn_status.poll_points).push(PollPointInfo {
point_id: point.point_id,
external_id: point.external_id.clone(),
});
started += 1;
tracing::info!(
"Point {} switched to poll mode",
point.point_id
);
}
}
}
}
started
}
pub async fn connect_from_source(
&self,
pool: &sqlx::PgPool,
source_id: Uuid,
) -> Result<(), String> {
let source = match crate::service::get_enabled_source(pool, source_id).await {
Ok(Some(s)) => s,
Ok(None) => {
tracing::info!(
"Source {} is not enabled or does not exist, skipping connection",
source_id
);
return Ok(());
}
Err(e) => {
let error = format!("Failed to fetch source: {}", e);
tracing::error!("{}", error);
return Err(error);
}
};
self.connect(
source.id,
&source.endpoint,
source.security_policy.as_deref(),
source.security_mode.as_deref(),
source.username.as_deref(),
source.password.as_deref(),
)
.await?;
// Subscribe to points for this source
self.subscribe_points_from_source(source_id, None, pool)
.await?;
Ok(())
}
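/// Establishes an OPC UA session for a source. `security_policy` accepts
/// any name understood by `SecurityPolicy::from_str` (unrecognized or
/// absent values fall back to `SecurityPolicy::None`); `security_mode`
/// accepts "Sign" or "SignAndEncrypt", defaulting to `None`; a
/// username/password pair selects a user-name identity token, otherwise
/// the session is anonymous.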
pub async fn connect(
&self,
source_id: Uuid,
endpoint: &str,
security_policy: Option<&str>,
security_mode: Option<&str>,
username: Option<&str>,
password: Option<&str>,
) -> Result<(), String> {
let mut client = match ClientBuilder::new()
.application_name("plc_gateway")
.application_uri("urn:plc_gateway:opcua-client")
.trust_server_certs(true)
.create_sample_keypair(true)
.session_retry_limit(3)
.client()
{
Ok(client) => client,
Err(e) => {
let error = format!("Failed to create client: {:?}", e);
self.fail_connect(source_id, &error).await;
return Err(error);
}
};
let security_policy = security_policy
.and_then(|s| SecurityPolicy::from_str(s).ok())
.unwrap_or(SecurityPolicy::None);
let security_mode = match security_mode {
Some("Sign") => MessageSecurityMode::Sign,
Some("SignAndEncrypt") => MessageSecurityMode::SignAndEncrypt,
_ => MessageSecurityMode::None,
};
let identity_token = match (username, password) {
(Some(user), Some(pass)) => IdentityToken::new_user_name(user, pass),
_ => IdentityToken::Anonymous,
};
let (session, event_loop) = match client
.connect_to_matching_endpoint(
(
endpoint,
security_policy.to_str(),
security_mode,
UserTokenPolicy::anonymous(),
),
identity_token,
)
.await
{
Ok(res) => res,
Err(e) => {
let error = format!("Failed to connect: {:?}", e);
self.fail_connect(source_id, &error).await;
return Err(error);
}
};
let event_loop_handle = event_loop.spawn();
// Monitor task that observes event_loop termination and triggers reconnection.
let manager = self.clone();
let source_id_copy = source_id;
let event_loop_monitor_handle = tokio::spawn(async move {
match event_loop_handle.await {
Ok(status) => {
tracing::warn!(
"OPCUA event loop ended for source {}: {:?}",
source_id_copy,
status
);
}
Err(e) => {
tracing::error!(
"OPCUA event loop panic for source {}: {}",
source_id_copy,
e
);
}
}
// Funnel all reconnects through the shared channel.
if let Some(tx) = manager.reconnect_tx.as_ref() {
let _ = tx.send(source_id_copy);
}
});
if !session.wait_for_connection().await {
let error = "Session connection failed".to_string();
self.fail_connect(source_id, &error).await;
return Err(error);
}
let mut status = self.status.write().await;
status.insert(
source_id,
ConnectionStatus {
session: Some(session.clone()),
is_connected: true,
last_error: None,
last_time: Utc::now(),
subscription_id: None,
next_client_handle: 1000,
client_handle_map: HashMap::new(),
monitored_item_map: HashMap::new(),
poll_points: Arc::new(Vec::new()),
poll_handle: None,
heartbeat_handle: None,
event_loop_handle: None, // the handle was moved into the monitor task above
event_loop_monitor_handle: Some(event_loop_monitor_handle),
},
);
drop(status); // release the write lock before the tasks below re-acquire it
// Start the unified poll task.
self.start_unified_poll_task(source_id, session).await;
// Start the heartbeat task.
self.start_heartbeat_task(source_id).await;
tracing::info!("Successfully connected to source {}", source_id);
Ok(())
}
pub async fn fail_connect(&self, source_id: Uuid, error: &str) {
let mut status = self.status.write().await;
status.insert(
source_id,
ConnectionStatus {
session: None,
is_connected: false,
last_error: Some(error.to_string()),
last_time: Utc::now(),
subscription_id: None,
client_handle_map: HashMap::new(),
monitored_item_map: HashMap::new(),
next_client_handle: 1000,
poll_points: Arc::new(Vec::new()),
poll_handle: None,
heartbeat_handle: None,
event_loop_handle: None,
event_loop_monitor_handle: None,
},
);
}
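/// Tears the source connection down and rebuilds it from the database
/// definition. The retry counter is cleared on success; the `reconnecting`
/// flag is cleared in every case so the heartbeat can trigger another
/// attempt later.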
pub async fn reconnect(&self, pool: &sqlx::PgPool, source_id: Uuid) -> Result<(), String> {
tracing::info!("Reconnecting to source {}", source_id);
// Disconnect first.
if let Err(e) = self.disconnect(source_id).await {
tracing::error!("Failed to disconnect source {}: {}", source_id, e);
}
// Then re-establish the connection.
let result = self.connect_from_source(pool, source_id).await;
if result.is_ok() {
let mut attempts = self.reconnect_attempts.write().await;
attempts.remove(&source_id);
}
// Clear the reconnecting flag on success and failure alike, so the heartbeat
// can trigger another reconnect when it next detects a problem.
let mut reconnecting = self.reconnecting.write().await;
reconnecting.remove(&source_id);
result
}
pub async fn disconnect(&self, source_id: Uuid) -> Result<(), String> {
// Stop the poll task and clear the poll point list.
{
let mut status = self.status.write().await;
if let Some(conn_status) = status.get_mut(&source_id) {
Arc::make_mut(&mut conn_status.poll_points).clear();
if let Some(handle) = conn_status.poll_handle.take() {
handle.abort();
}
// Stop the heartbeat task.
if let Some(handle) = conn_status.heartbeat_handle.take() {
handle.abort();
}
// Stop the event_loop task.
if let Some(handle) = conn_status.event_loop_handle.take() {
handle.abort();
}
// Stop the event_loop monitor task.
if let Some(handle) = conn_status.event_loop_monitor_handle.take() {
handle.abort();
}
}
}
let conn_status = self.status.write().await.remove(&source_id);
if let Some(conn_status) = conn_status {
if let Some(session) = conn_status.session {
session.disconnect().await.map_err(|e| {
let error = format!("Failed to disconnect: {}", e);
tracing::error!("{}", error);
error
})?;
}
}
Ok(())
}
pub async fn disconnect_all(&self) {
let source_ids: Vec<Uuid> = self.status.read().await.keys().copied().collect();
for source_id in source_ids {
// `disconnect` aborts the poll, heartbeat and event_loop tasks, clears
// the poll list and closes the session; log failures and keep going.
if let Err(e) = self.disconnect(source_id).await {
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
}
}
}
pub async fn get_session(&self, source_id: Uuid) -> Option<Arc<Session>> {
let status = self.status.read().await;
if let Some(conn_status) = status.get(&source_id) {
if conn_status.is_connected {
return conn_status.session.clone();
}
}
None
}
pub async fn allocate_client_handle(&self, source_id: Uuid) -> Option<u32> {
let mut status = self.status.write().await;
if let Some(conn) = status.get_mut(&source_id) {
let handle = conn.next_client_handle;
conn.next_client_handle += 1;
Some(handle)
} else {
None
}
}
pub async fn get_status(&self, source_id: Uuid) -> Option<ConnectionStatusView> {
let status = self.status.read().await;
status.get(&source_id).map(ConnectionStatusView::from)
}
pub async fn get_all_status(&self) -> Vec<(Uuid, ConnectionStatusView)> {
let status = self.status.read().await;
status.iter().map(|(source_id, conn_status)| (*source_id, ConnectionStatusView::from(conn_status))).collect()
}
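/// Writes a batch of point values with all-or-nothing semantics: targets
/// are resolved from the write-target cache, grouped by source, prechecked
/// for connectivity, then written with one `write` call per source. The
/// first rejected status code aborts the batch, and local `PointNewValue`
/// events are emitted only after every write succeeded. A caller sketch
/// (the JSON mirrors `BatchSetPointValueReq`; the UUID is a placeholder):
///
/// ```ignore
/// let req: BatchSetPointValueReq = serde_json::from_value(serde_json::json!({
///     "items": [{ "point_id": "00000000-0000-0000-0000-000000000000", "value": 1.5 }]
/// }))?;
/// let res = manager.write_point_values_batch(req).await?;
/// println!("success={} written={}", res.success, res.success_count);
/// ```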
pub async fn write_point_values_batch(
&self,
payload: BatchSetPointValueReq,
) -> Result<BatchSetPointValueRes, String> {
if payload.items.is_empty() {
return Ok(Self::write_value_batch_result(
false,
Some("items cannot be empty".to_string()),
vec![],
));
}
let target_map = self.point_write_target_cache.read().await;
#[derive(Clone)]
struct PendingWrite {
point_id: Uuid,
source_id: Uuid,
external_id: String,
variant: Variant,
}
let mut results = Vec::new();
let mut pending_by_source: HashMap<Uuid, Vec<PendingWrite>> = HashMap::new();
for item in payload.items {
let Some(target) = target_map.get(&item.point_id) else {
results.push(SetPointValueResItem {
point_id: item.point_id,
success: false,
err_msg: Some("Point write target not found in cache".to_string()),
});
continue;
};
let variant = match Self::json_value_to_opcua_variant(&item.value) {
Ok(v) => v,
Err(e) => {
results.push(SetPointValueResItem {
point_id: item.point_id,
success: false,
err_msg: Some(e),
});
continue;
}
};
pending_by_source
.entry(target.source_id)
.or_default()
.push(PendingWrite {
point_id: item.point_id,
source_id: target.source_id,
external_id: target.external_id.clone(),
variant,
});
}
if !results.is_empty() {
return Ok(Self::write_value_batch_result(
false,
Some("Batch write precheck failed, no write executed".to_string()),
results,
));
}
// Validate all target sources are already connected before executing writes.
for source_id in pending_by_source.keys() {
let source_connected = self
.get_status(*source_id)
.await
.map(|s| s.is_connected)
.unwrap_or(false);
if !source_connected {
return Ok(Self::write_value_batch_result(
false,
Some(format!("Source {} is not connected", source_id)),
vec![],
));
}
}
let mut success_events: Vec<(Uuid, Uuid, Variant)> = Vec::new();
for (source_id, writes) in pending_by_source {
let Some(session) = self.get_session(source_id).await else {
return Ok(Self::write_value_batch_result(
false,
Some(format!("Source {} is not connected", source_id)),
vec![],
));
};
let mut write_values = Vec::new();
let mut write_items = Vec::new();
for write in writes {
let node_id = match NodeId::from_str(&write.external_id) {
Ok(v) => v,
Err(e) => {
results.push(SetPointValueResItem {
point_id: write.point_id,
success: false,
err_msg: Some(format!("Invalid node id {}: {}", write.external_id, e)),
});
continue;
}
};
write_values.push(WriteValue {
node_id,
attribute_id: AttributeId::Value as u32,
index_range: NumericRange::None,
value: OpcuaDataValue::value_only(write.variant.clone()),
});
write_items.push(write);
}
if write_values.is_empty() {
continue;
}
let status_codes = match session.write(&write_values).await {
Ok(v) => v,
Err(e) => {
let write_results: Vec<SetPointValueResItem> = write_items
.into_iter()
.map(|write| SetPointValueResItem {
point_id: write.point_id,
success: false,
err_msg: Some(format!("Write failed: {:?}", e)),
})
.collect();
return Ok(Self::write_value_batch_result(
false,
Some("Batch write failed, no local updates emitted".to_string()),
write_results,
));
}
};
for (idx, write) in write_items.iter().enumerate() {
let status_opt = status_codes.get(idx);
let status_ok = status_opt.map(|s| s.is_good()).unwrap_or(false);
results.push(SetPointValueResItem {
point_id: write.point_id,
success: status_ok,
err_msg: if status_ok {
None
} else {
Some(match status_opt {
Some(s) => format!("Write rejected: {:?}", s),
None => "Write result missing from server response".to_string(),
})
},
});
if !status_ok {
return Ok(Self::write_value_batch_result(
false,
Some("Batch write failed, no local updates emitted".to_string()),
results,
));
}
success_events.push((write.source_id, write.point_id, write.variant.clone()));
}
}
// Emit local updates only when the full batch succeeds.
if let Some(event_manager) = &self.event_manager {
for (source_id, point_id, variant) in success_events {
if let Err(e) = event_manager.send(crate::event::AppEvent::PointNewValue(
crate::telemetry::PointNewValue {
source_id,
point_id: Some(point_id),
client_handle: 0,
value: Some(crate::telemetry::opcua_variant_to_data(&variant)),
value_type: Some(crate::telemetry::opcua_variant_type(&variant)),
value_text: Some(variant.to_string()),
quality: crate::telemetry::PointQuality::Good,
protocol: "opcua".to_string(),
timestamp: Some(Utc::now()),
scan_mode: crate::model::ScanMode::Poll,
},
)) {
tracing::warn!(
"Batch write succeeded but failed to dispatch point update for point {}: {}",
point_id,
e
);
}
}
}
Ok(Self::write_value_batch_result(true, None, results))
}
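/// Subscribes points on a source, preferring OPC UA monitored items and
/// falling back to the unified poll list when the subscription or any
/// individual monitored item cannot be created. Points that are already
/// subscribed are filtered out. Returns counts keyed "subscribed",
/// "polled" and "total", e.g. `{"subscribed": 8, "polled": 2, "total": 10}`.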
pub async fn subscribe_points_from_source(
&self,
source_id: Uuid,
point_ids: Option<Vec<Uuid>>,
pool: &sqlx::PgPool,
) -> Result<HashMap<String, usize>, String> {
let point_ids = point_ids.unwrap_or_default();
let mut points = crate::service::get_points_with_ids(pool, source_id, &point_ids)
.await
.map_err(|e| format!("Failed to get points for source {}: {}", source_id, e))?;
{
let mut cache = self.point_write_target_cache.write().await;
for p in &points {
cache.insert(
p.point_id,
PointWriteTarget {
source_id,
external_id: p.external_id.clone(),
},
);
}
}
if points.is_empty() {
tracing::info!("No valid points found to subscribe for source {}", source_id);
return Ok(Self::subscription_result(0, 0));
}
tracing::info!(
"Processing subscription for source {} with {} points",
source_id,
points.len()
);
let session = self
.get_session(source_id)
.await
.ok_or_else(|| format!("Failed to get session for source {}", source_id))?;
let subscription_id = {
let status = self.status.read().await;
if let Some(conn_status) = status.get(&source_id) {
let subscribed_point_ids: std::collections::HashSet<Uuid> =
conn_status.client_handle_map.values().copied().collect();
let original_count = points.len();
points.retain(|p| !subscribed_point_ids.contains(&p.point_id));
let filtered_count = points.len();
if filtered_count < original_count {
tracing::info!(
"Filtered out {} already subscribed points for source {}",
original_count - filtered_count,
source_id
);
}
if points.is_empty() {
tracing::info!("All points are already subscribed for source {}", source_id);
return Ok(Self::subscription_result(0, 0));
}
conn_status.subscription_id
} else {
None
}
};
let subscription_id = match subscription_id {
Some(id) => Some(id),
None => {
let manager = self.clone();
let current_source_id = source_id;
let status_manager = manager.clone();
let data_manager = manager.clone();
match session
.create_subscription(
Duration::from_secs(1), // requested publishing interval
15, // requested lifetime count
5, // requested max keep-alive count
0, // max notifications per publish (0 = no limit)
0, // priority
true, // publishing enabled
opcua::client::SubscriptionCallbacks::new(
move |notification| {
if notification.status.is_bad() {
tracing::warn!(
"Subscription status changed to {:?} for source {}, triggering reconnection",
notification.status,
current_source_id
);
if let Some(tx) = status_manager.reconnect_tx.as_ref() {
let _ = tx.send(current_source_id);
}
}
},
move |dv, item| {
let client_handle = item.client_handle();
let val = dv.value;
let timestamp = Some(Utc::now());
let unified_value =
val.as_ref().map(crate::telemetry::opcua_variant_to_data);
let unified_value_type =
val.as_ref().map(crate::telemetry::opcua_variant_type);
let unified_value_text = val.as_ref().map(|v| v.to_string());
let quality = dv.status
.as_ref()
.map(crate::telemetry::PointQuality::from_status_code)
.unwrap_or(crate::telemetry::PointQuality::Good);
if let Some(event_manager) = &data_manager.event_manager {
let _ = event_manager.send(crate::event::AppEvent::PointNewValue(
crate::telemetry::PointNewValue {
source_id: current_source_id,
point_id: None,
client_handle,
value: unified_value,
value_type: unified_value_type,
value_text: unified_value_text,
quality,
protocol: "opcua".to_string(),
timestamp,
scan_mode: ScanMode::Subscribe,
}));
}
},
|_event_fields, _item| {},
),
)
.await
{
Ok(id) => {
if let Some(conn_status) = manager.status.write().await.get_mut(&source_id) {
conn_status.subscription_id = Some(id);
}
tracing::info!("Created subscription {} for source {}", id, source_id);
Some(id)
}
Err(e) => {
tracing::warn!(
"Failed to create subscription for source {}, falling back to poll mode: {:?}",
source_id,
e
);
None
}
}
}
};
let Some(subscription_id) = subscription_id else {
let polled_count = self
.add_points_to_poll_list(source_id, &points)
.await;
return Ok(Self::subscription_result(0, polled_count));
};
let mut items_to_create: Vec<MonitoredItemCreateRequest> = Vec::new();
let mut item_points: Vec<PointSubscriptionInfo> = Vec::new();
let mut pending_client_handles: Vec<(u32, Uuid)> = Vec::new();
for p in points.iter().cloned() {
let node_id = NodeId::from_str(&p.external_id)
.map_err(|e| format!("Invalid node id {}: {}", p.external_id, e))?;
let client_handle = self.allocate_client_handle(source_id).await
.ok_or_else(|| format!("Failed to allocate client handle for source {}", source_id))?;
pending_client_handles.push((client_handle, p.point_id));
let request = MonitoredItemCreateRequest {
item_to_monitor: ReadValueId {
node_id,
attribute_id: AttributeId::Value as u32,
index_range: Default::default(),
data_encoding: Default::default(),
},
monitoring_mode: MonitoringMode::Reporting,
requested_parameters: MonitoringParameters {
client_handle,
sampling_interval: 0.0, // 0.0 requests the fastest rate the server supports
filter: Default::default(),
queue_size: 10,
discard_oldest: true,
},
};
items_to_create.push(request);
item_points.push(p);
}
if let Some(conn_status) = self.status.write().await.get_mut(&source_id) {
for (client_handle, point_id) in &pending_client_handles {
conn_status.client_handle_map.insert(*client_handle, *point_id);
}
}
let results = match session
.create_monitored_items(subscription_id, TimestampsToReturn::Both, items_to_create)
.await
{
Ok(results) => results,
Err(e) => {
tracing::warn!(
"Failed to create monitored items for source {}, falling back to poll mode: {:?}",
source_id,
e
);
let item_point_ids: HashSet<Uuid> = item_points.iter().map(|p| p.point_id).collect();
if let Some(conn_status) = self.status.write().await.get_mut(&source_id) {
conn_status
.client_handle_map
.retain(|_, point_id| !item_point_ids.contains(point_id));
}
let polled_count = self
.add_points_to_poll_list(source_id, &item_points)
.await;
return Ok(Self::subscription_result(0, polled_count));
}
};
let mut successfully_subscribed_points = Vec::new();
let mut successfully_subscribed_set: HashSet<Uuid> = HashSet::new();
let mut failed_points: Vec<PointSubscriptionInfo> = Vec::new();
let mut successful_monitored_items: Vec<(Uuid, u32)> = Vec::new();
for (i, monitored_item_result) in results.iter().enumerate() {
if i >= item_points.len() {
break;
}
let point = &item_points[i];
if monitored_item_result.result.status_code.is_good() {
successfully_subscribed_points.push(point.point_id);
successfully_subscribed_set.insert(point.point_id);
successful_monitored_items.push((
point.point_id,
monitored_item_result.result.monitored_item_id,
));
} else {
tracing::error!(
"Failed to create monitored item for point {}: {:?}",
point.point_id,
monitored_item_result.result.status_code
);
failed_points.push(point.clone());
}
}
if results.len() < item_points.len() {
for point in item_points.iter().skip(results.len()) {
tracing::warn!(
"No monitored item result returned for point {}, fallback to poll",
point.point_id
);
failed_points.push(point.clone());
}
}
failed_points.retain(|p| !successfully_subscribed_set.contains(&p.point_id));
if let Some(conn_status) = self.status.write().await.get_mut(&source_id) {
for (point_id, monitored_item_id) in &successful_monitored_items {
conn_status
.monitored_item_map
.insert(*point_id, *monitored_item_id);
}
if !successfully_subscribed_set.is_empty() {
Arc::make_mut(&mut conn_status.poll_points)
.retain(|p| !successfully_subscribed_set.contains(&p.point_id));
}
let failed_point_ids: HashSet<Uuid> = failed_points.iter().map(|p| p.point_id).collect();
conn_status
.client_handle_map
.retain(|_, point_id| !failed_point_ids.contains(point_id));
}
let polled_count = self
.add_points_to_poll_list(source_id, &failed_points)
.await;
Ok(Self::subscription_result(
successfully_subscribed_points.len(),
polled_count,
))
}
/// Removes the given points from the source's monitored items and poll
/// list, dropping their cached write targets, monitor data and history.
/// Returns the number of monitored items actually deleted on the server.
pub async fn unsubscribe_points_from_source(
&self,
source_id: Uuid,
point_ids: Vec<Uuid>,
) -> Result<usize, String> {
if point_ids.is_empty() {
return Ok(0);
}
let target_ids: std::collections::HashSet<Uuid> = point_ids.into_iter().collect();
let session = self.get_session(source_id).await;
let Some(session) = session else {
return Ok(0);
};
let (subscription_id, point_item_pairs) = {
let status = self.status.read().await;
let Some(conn_status) = status.get(&source_id) else {
return Ok(0);
};
let Some(subscription_id) = conn_status.subscription_id else {
return Ok(0);
};
let items: Vec<(Uuid, u32)> = conn_status
.monitored_item_map
.iter()
.filter(|(point_id, _)| target_ids.contains(point_id))
.map(|(point_id, monitored_item_id)| (*point_id, *monitored_item_id))
.collect();
(subscription_id, items)
};
if point_item_pairs.is_empty() {
return Ok(0);
}
let monitored_item_ids: Vec<u32> =
point_item_pairs.iter().map(|(_, item_id)| *item_id).collect();
let status_codes = session
.delete_monitored_items(subscription_id, &monitored_item_ids)
.await
.map_err(|e| {
format!(
"Failed to unsubscribe monitored items for source {}: {:?}",
source_id, e
)
})?;
let mut removed_point_ids: Vec<Uuid> = Vec::new();
for (idx, status_code) in status_codes.iter().enumerate() {
if idx >= point_item_pairs.len() {
break;
}
if status_code.is_good() {
removed_point_ids.push(point_item_pairs[idx].0);
} else {
tracing::warn!(
"Failed to unsubscribe monitored item {} for source {}: {:?}",
point_item_pairs[idx].1,
source_id,
status_code
);
}
}
if removed_point_ids.is_empty() {
return Ok(0);
}
let removed_set: std::collections::HashSet<Uuid> =
removed_point_ids.iter().copied().collect();
if let Some(conn_status) = self.status.write().await.get_mut(&source_id) {
for point_id in &removed_point_ids {
conn_status.monitored_item_map.remove(point_id);
}
conn_status
.client_handle_map
.retain(|_, point_id| !removed_set.contains(point_id));
}
let _ = self
.remove_point_write_target_cache_by_point_ids(&removed_point_ids)
.await;
{
let mut monitor_data = self.point_monitor_data.write().await;
for point_id in &removed_point_ids {
monitor_data.remove(point_id);
}
}
{
let mut history_data = self.point_history_data.write().await;
for point_id in &removed_point_ids {
history_data.remove(point_id);
}
}
// Remove the given points from the poll list and count how many were removed.
let polling_removed_count = {
let mut status = self.status.write().await;
if let Some(conn_status) = status.get_mut(&source_id) {
let before_count = conn_status.poll_points.len();
Arc::make_mut(&mut conn_status.poll_points).retain(|p| !target_ids.contains(&p.point_id));
let after_count = conn_status.poll_points.len();
before_count - after_count
} else {
0
}
};
// Total removed across subscribed and polled points.
let total_removed = removed_point_ids.len() + polling_removed_count;
tracing::info!(
"Unsubscribed {} points (subscription: {}, polling: {}) from source {}",
total_removed,
removed_point_ids.len(),
polling_removed_count,
source_id
);
Ok(removed_point_ids.len())
}
}