refactor(core): move platform data and connection modules
This commit is contained in:
parent
6b3c52e45e
commit
3d18a65c7d
|
|
@ -1,4 +1,4 @@
|
|||
use chrono::{DateTime, Utc};
|
||||
use chrono::{DateTime, Utc};
|
||||
use opcua::{
|
||||
client::{ClientBuilder, IdentityToken, Session},
|
||||
crypto::SecurityPolicy,
|
||||
|
|
@ -18,7 +18,7 @@ use std::{
|
|||
use tokio::task::JoinHandle;
|
||||
use tokio::sync::RwLock;
|
||||
use uuid::Uuid;
|
||||
use plc_platform_core::model::{PointSubscriptionInfo, ScanMode};
|
||||
use crate::model::{PointSubscriptionInfo, ScanMode};
|
||||
|
||||
use crate::{
|
||||
telemetry::PointMonitorInfo,
|
||||
|
|
@ -26,6 +26,10 @@ use crate::{
|
|||
|
||||
const DEFAULT_POINT_RING_BUFFER_LEN: usize = 1000;
|
||||
|
||||
pub trait PointEventSink: Send + Sync {
|
||||
fn send_point_new_value(&self, payload: crate::telemetry::PointNewValue) -> Result<(), String>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||
pub struct SetPointValueReqItem {
|
||||
pub point_id: Uuid,
|
||||
|
|
@ -95,11 +99,11 @@ pub struct ConnectionStatus {
|
|||
pub next_client_handle: u32,
|
||||
pub client_handle_map: HashMap<u32, Uuid>, // client_handle -> point_id
|
||||
pub monitored_item_map: HashMap<Uuid, u32>, // point_id -> monitored_item_id
|
||||
pub poll_points: Arc<Vec<PollPointInfo>>, // 正在轮询的点集合
|
||||
poll_handle: Option<JoinHandle<()>>, // 统一的轮询任务句柄
|
||||
heartbeat_handle: Option<JoinHandle<()>>, // 心跳任务句柄
|
||||
event_loop_handle: Option<JoinHandle<opcua::types::StatusCode>>, // event_loop 任务句柄
|
||||
event_loop_monitor_handle: Option<JoinHandle<()>>, // event_loop 监控任务句柄
|
||||
pub poll_points: Arc<Vec<PollPointInfo>>, // 姝e湪杞鐨勭偣闆嗗悎
|
||||
poll_handle: Option<JoinHandle<()>>, // 缁熶竴鐨勮疆璇换鍔″彞鏌?
|
||||
heartbeat_handle: Option<JoinHandle<()>>, // 蹇冭烦浠诲姟鍙ユ焺
|
||||
event_loop_handle: Option<JoinHandle<opcua::types::StatusCode>>, // event_loop 浠诲姟鍙ユ焺
|
||||
event_loop_monitor_handle: Option<JoinHandle<()>>, // event_loop 鐩戞帶浠诲姟鍙ユ焺
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
|
@ -108,7 +112,7 @@ pub struct ConnectionManager {
|
|||
point_monitor_data: Arc<RwLock<HashMap<Uuid, PointMonitorInfo>>>,
|
||||
point_history_data: Arc<RwLock<HashMap<Uuid, VecDeque<PointMonitorInfo>>>>,
|
||||
point_write_target_cache: Arc<RwLock<HashMap<Uuid, PointWriteTarget>>>,
|
||||
event_manager: Option<std::sync::Arc<crate::event::EventManager>>,
|
||||
point_event_sink: Option<Arc<dyn PointEventSink>>,
|
||||
pool: Option<Arc<sqlx::PgPool>>,
|
||||
reconnect_tx: Option<tokio::sync::mpsc::UnboundedSender<Uuid>>,
|
||||
reconnect_rx: Arc<std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<Uuid>>>>,
|
||||
|
|
@ -169,7 +173,7 @@ impl ConnectionManager {
|
|||
point_monitor_data: Arc::new(RwLock::new(HashMap::new())),
|
||||
point_history_data: Arc::new(RwLock::new(HashMap::new())),
|
||||
point_write_target_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
event_manager: None,
|
||||
point_event_sink: None,
|
||||
pool: None,
|
||||
reconnect_tx: Some(reconnect_tx),
|
||||
reconnect_rx: Arc::new(std::sync::Mutex::new(Some(reconnect_rx))),
|
||||
|
|
@ -178,8 +182,8 @@ impl ConnectionManager {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn set_event_manager(&mut self, event_manager: std::sync::Arc<crate::event::EventManager>) {
|
||||
self.event_manager = Some(event_manager);
|
||||
pub fn set_event_manager(&mut self, point_event_sink: Arc<dyn PointEventSink>) {
|
||||
self.point_event_sink = Some(point_event_sink);
|
||||
}
|
||||
|
||||
pub fn set_pool(&mut self, pool: Arc<sqlx::PgPool>) {
|
||||
|
|
@ -188,7 +192,7 @@ impl ConnectionManager {
|
|||
|
||||
pub fn set_pool_and_start_reconnect_task(&mut self, pool: Arc<sqlx::PgPool>) {
|
||||
self.pool = Some(pool.clone());
|
||||
// 将 self 转换为不可变引用以调用 start_reconnect_task
|
||||
// 灏?self 杞崲涓轰笉鍙彉寮曠敤浠ヨ皟鐢?start_reconnect_task
|
||||
let manager = self.clone();
|
||||
manager.start_reconnect_task();
|
||||
}
|
||||
|
|
@ -205,7 +209,7 @@ impl ConnectionManager {
|
|||
let manager = self.clone();
|
||||
let pool = manager.pool.clone();
|
||||
tokio::spawn(async move {
|
||||
// 获取重连通道的接收端
|
||||
// 鑾峰彇閲嶈繛閫氶亾鐨勬帴鏀剁
|
||||
let mut reconnect_rx = manager.get_reconnect_rx().expect("Failed to get reconnect receiver");
|
||||
|
||||
while let Some(source_id) = reconnect_rx.recv().await {
|
||||
|
|
@ -328,18 +332,18 @@ impl ConnectionManager {
|
|||
let manager = self.clone();
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let mut ticker = tokio::time::interval(Duration::from_secs(8)); // 每8秒检测一次心跳
|
||||
let mut ticker = tokio::time::interval(Duration::from_secs(8)); // 姣?绉掓娴嬩竴娆″績璺?
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
|
||||
// 检查session是否有效
|
||||
// 妫€鏌ession鏄惁鏈夋晥
|
||||
let session = manager.get_session(source_id).await;
|
||||
|
||||
let (session_valid, subscription_valid) = if let Some(session) = session {
|
||||
// 尝试读取当前时间来验证连接
|
||||
let node_id = NodeId::new(0, 2258); // ServerCurrentTime节点
|
||||
// 灏濊瘯璇诲彇褰撳墠鏃堕棿鏉ラ獙璇佽繛鎺?
|
||||
let node_id = NodeId::new(0, 2258); // ServerCurrentTime鑺傜偣
|
||||
let read_request = ReadValueId {
|
||||
node_id,
|
||||
attribute_id: AttributeId::Value as u32,
|
||||
|
|
@ -352,8 +356,8 @@ impl ConnectionManager {
|
|||
Err(_) => false,
|
||||
};
|
||||
|
||||
// 检查订阅状态 - 仅当有 subscription_id 时才检查
|
||||
// 这里使用 set_publishing_mode 直接让服务器校验订阅 ID,避免仅靠会话读请求误判为健康。
|
||||
// 妫€鏌ヨ闃呯姸鎬?- 浠呭綋鏈?subscription_id 鏃舵墠妫€鏌?
|
||||
// 杩欓噷浣跨敤 set_publishing_mode 鐩存帴璁╂湇鍔″櫒鏍¢獙璁㈤槄 ID锛岄伩鍏嶄粎闈犱細璇濊璇锋眰璇垽涓哄仴搴枫€?
|
||||
let subscription_id = {
|
||||
let status = manager.status.read().await;
|
||||
status.get(&source_id).and_then(|conn_status| conn_status.subscription_id)
|
||||
|
|
@ -364,7 +368,7 @@ impl ConnectionManager {
|
|||
Err(_) => false,
|
||||
}
|
||||
} else {
|
||||
// 没有 subscription_id 时,认为订阅状态有效(不需要检查)
|
||||
// 娌℃湁 subscription_id 鏃讹紝璁や负璁㈤槄鐘舵€佹湁鏁堬紙涓嶉渶瑕佹鏌ワ級
|
||||
true
|
||||
};
|
||||
|
||||
|
|
@ -374,7 +378,7 @@ impl ConnectionManager {
|
|||
};
|
||||
|
||||
if !session_valid || !subscription_valid {
|
||||
// 检查是否已经在重连中
|
||||
// 妫€鏌ユ槸鍚﹀凡缁忓湪閲嶈繛涓?
|
||||
let mut reconnecting = manager.reconnecting.write().await;
|
||||
if !reconnecting.contains(&source_id) {
|
||||
reconnecting.insert(source_id);
|
||||
|
|
@ -392,7 +396,7 @@ impl ConnectionManager {
|
|||
source_id
|
||||
);
|
||||
|
||||
// 通过通道发送重连请求
|
||||
// 閫氳繃閫氶亾鍙戦€侀噸杩炶姹?
|
||||
if let Some(tx) = manager.reconnect_tx.as_ref() {
|
||||
if let Err(e) = tx.send(source_id) {
|
||||
tracing::error!("Failed to send reconnect request for source {}: {}", source_id, e);
|
||||
|
|
@ -407,7 +411,7 @@ impl ConnectionManager {
|
|||
}
|
||||
});
|
||||
|
||||
// 保存心跳任务句柄
|
||||
// 淇濆瓨蹇冭烦浠诲姟鍙ユ焺
|
||||
let mut status = self.status.write().await;
|
||||
if let Some(conn_status) = status.get_mut(&source_id) {
|
||||
conn_status.heartbeat_handle = Some(handle);
|
||||
|
|
@ -415,15 +419,15 @@ impl ConnectionManager {
|
|||
}
|
||||
|
||||
async fn start_unified_poll_task(&self, source_id: Uuid, session: Arc<Session>) {
|
||||
let event_manager = match self.event_manager.clone() {
|
||||
Some(em) => em,
|
||||
let point_event_sink = match self.point_event_sink.clone() {
|
||||
Some(sink) => sink,
|
||||
None => {
|
||||
tracing::warn!("Event manager is not initialized, cannot start unified poll task");
|
||||
tracing::warn!("Point event sink is not initialized, cannot start unified poll task");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// 停止旧的轮询任务
|
||||
// 鍋滄鏃х殑杞浠诲姟
|
||||
{
|
||||
let mut status = self.status.write().await;
|
||||
if let Some(conn_status) = status.get_mut(&source_id) {
|
||||
|
|
@ -438,10 +442,10 @@ impl ConnectionManager {
|
|||
source_id
|
||||
);
|
||||
|
||||
// 克隆 status 引用,以便在异步任务中使用
|
||||
// 鍏嬮殕 status 寮曠敤锛屼互渚垮湪寮傛浠诲姟涓娇鐢?
|
||||
let status_ref = self.status.clone();
|
||||
|
||||
// 启动新的轮询任务
|
||||
// 鍚姩鏂扮殑杞浠诲姟
|
||||
let handle = tokio::spawn(async move {
|
||||
let mut ticker = tokio::time::interval(Duration::from_secs(1));
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
|
|
@ -449,7 +453,7 @@ impl ConnectionManager {
|
|||
loop {
|
||||
ticker.tick().await;
|
||||
|
||||
// 在任务内部获取轮询点列表
|
||||
// 鍦ㄤ换鍔″唴閮ㄨ幏鍙栬疆璇㈢偣鍒楄〃
|
||||
let poll_points = {
|
||||
let status = status_ref.read().await;
|
||||
status.get(&source_id)
|
||||
|
|
@ -461,7 +465,7 @@ impl ConnectionManager {
|
|||
continue;
|
||||
}
|
||||
|
||||
// 构建批量读取请求
|
||||
// 鏋勫缓鎵归噺璇诲彇璇锋眰
|
||||
let read_requests: Vec<ReadValueId> = poll_points
|
||||
.iter()
|
||||
.filter_map(|p| {
|
||||
|
|
@ -478,7 +482,7 @@ impl ConnectionManager {
|
|||
continue;
|
||||
}
|
||||
|
||||
// 执行批量读取
|
||||
// 鎵ц鎵归噺璇诲彇
|
||||
match session.read(&read_requests, TimestampsToReturn::Both, 0f64).await {
|
||||
Ok(results) => {
|
||||
for (poll_point, result) in poll_points.iter().zip(results.iter()) {
|
||||
|
|
@ -492,7 +496,7 @@ impl ConnectionManager {
|
|||
.map(crate::telemetry::PointQuality::from_status_code)
|
||||
.unwrap_or(crate::telemetry::PointQuality::Good);
|
||||
|
||||
let _ = event_manager.send(crate::event::AppEvent::PointNewValue(
|
||||
let _ = point_event_sink.send_point_new_value(
|
||||
crate::telemetry::PointNewValue {
|
||||
source_id,
|
||||
point_id: Some(poll_point.point_id),
|
||||
|
|
@ -505,7 +509,7 @@ impl ConnectionManager {
|
|||
timestamp: Some(Utc::now()),
|
||||
scan_mode: ScanMode::Poll,
|
||||
},
|
||||
));
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
|
|
@ -519,14 +523,14 @@ impl ConnectionManager {
|
|||
}
|
||||
});
|
||||
|
||||
// 保存轮询任务句柄
|
||||
// 淇濆瓨杞浠诲姟鍙ユ焺
|
||||
let mut status = self.status.write().await;
|
||||
if let Some(conn_status) = status.get_mut(&source_id) {
|
||||
conn_status.poll_handle = Some(handle);
|
||||
}
|
||||
}
|
||||
|
||||
// 将点添加到轮询列表
|
||||
// 灏嗙偣娣诲姞鍒拌疆璇㈠垪琛?
|
||||
async fn add_points_to_poll_list(
|
||||
&self,
|
||||
source_id: Uuid,
|
||||
|
|
@ -534,12 +538,12 @@ impl ConnectionManager {
|
|||
) -> usize {
|
||||
let mut started = 0usize;
|
||||
|
||||
// 添加新的轮询点
|
||||
// 娣诲姞鏂扮殑杞鐐?
|
||||
{
|
||||
let mut status = self.status.write().await;
|
||||
if let Some(conn_status) = status.get_mut(&source_id) {
|
||||
for point in points {
|
||||
// 检查点是否已经在轮询列表中
|
||||
// 妫€鏌ョ偣鏄惁宸茬粡鍦ㄨ疆璇㈠垪琛ㄤ腑
|
||||
if !conn_status.poll_points.iter().any(|p| p.point_id == point.point_id) {
|
||||
Arc::make_mut(&mut conn_status.poll_points).push(PollPointInfo {
|
||||
point_id: point.point_id,
|
||||
|
|
@ -662,7 +666,7 @@ impl ConnectionManager {
|
|||
|
||||
let event_loop_handle = event_loop.spawn();
|
||||
|
||||
// 添加监控任务来捕获 event_loop 结束事件
|
||||
// 娣诲姞鐩戞帶浠诲姟鏉ユ崟鑾?event_loop 缁撴潫浜嬩欢
|
||||
let manager = self.clone();
|
||||
let source_id_copy = source_id;
|
||||
let event_loop_monitor_handle = tokio::spawn(async move {
|
||||
|
|
@ -683,7 +687,7 @@ impl ConnectionManager {
|
|||
}
|
||||
}
|
||||
|
||||
// 统一触发重连
|
||||
// 缁熶竴瑙﹀彂閲嶈繛
|
||||
if let Some(tx) = manager.reconnect_tx.as_ref() {
|
||||
let _ = tx.send(source_id_copy);
|
||||
}
|
||||
|
|
@ -710,16 +714,16 @@ impl ConnectionManager {
|
|||
poll_points: Arc::new(Vec::new()),
|
||||
poll_handle: None,
|
||||
heartbeat_handle: None,
|
||||
event_loop_handle: None, // event_loop_handle 已被移动到监控任务中
|
||||
event_loop_handle: None, // event_loop_handle 宸茶绉诲姩鍒扮洃鎺т换鍔′腑
|
||||
event_loop_monitor_handle: Some(event_loop_monitor_handle),
|
||||
},
|
||||
);
|
||||
drop(status); // 显式释放锁,在调用 start_unified_poll_task 之前
|
||||
drop(status); // 鏄惧紡閲婃斁閿侊紝鍦ㄨ皟鐢?start_unified_poll_task 涔嬪墠
|
||||
|
||||
// 启动统一的轮询任务
|
||||
// 鍚姩缁熶竴鐨勮疆璇换鍔?
|
||||
self.start_unified_poll_task(source_id, session).await;
|
||||
|
||||
// 启动心跳任务
|
||||
// 鍚姩蹇冭烦浠诲姟
|
||||
self.start_heartbeat_task(source_id).await;
|
||||
|
||||
tracing::info!("Successfully connected to source {}", source_id);
|
||||
|
|
@ -750,19 +754,19 @@ impl ConnectionManager {
|
|||
pub async fn reconnect(&self, pool: &sqlx::PgPool, source_id: Uuid) -> Result<(), String> {
|
||||
tracing::info!("Reconnecting to source {}", source_id);
|
||||
|
||||
// 先断开连接
|
||||
// 鍏堟柇寮€杩炴帴
|
||||
if let Err(e) = self.disconnect(source_id).await {
|
||||
tracing::error!("Failed to disconnect source {}: {}", source_id, e);
|
||||
}
|
||||
|
||||
// 再重新连接
|
||||
// 鍐嶉噸鏂拌繛鎺?
|
||||
let result = self.connect_from_source(pool, source_id).await;
|
||||
if result.is_ok() {
|
||||
let mut attempts = self.reconnect_attempts.write().await;
|
||||
attempts.remove(&source_id);
|
||||
}
|
||||
|
||||
// 无论成功还是失败都清除重连标记,以便心跳检测到问题后可以再次触发重连
|
||||
// 鏃犺鎴愬姛杩樻槸澶辫触閮芥竻闄ら噸杩炴爣璁帮紝浠ヤ究蹇冭烦妫€娴嬪埌闂鍚庡彲浠ュ啀娆¤Е鍙戦噸杩?
|
||||
let mut reconnecting = self.reconnecting.write().await;
|
||||
reconnecting.remove(&source_id);
|
||||
|
||||
|
|
@ -770,7 +774,7 @@ impl ConnectionManager {
|
|||
}
|
||||
|
||||
pub async fn disconnect(&self, source_id: Uuid) -> Result<(), String> {
|
||||
// 停止轮询任务并清空轮询点列表
|
||||
// 鍋滄杞浠诲姟骞舵竻绌鸿疆璇㈢偣鍒楄〃
|
||||
{
|
||||
let mut status = self.status.write().await;
|
||||
if let Some(conn_status) = status.get_mut(&source_id) {
|
||||
|
|
@ -778,15 +782,15 @@ impl ConnectionManager {
|
|||
if let Some(handle) = conn_status.poll_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
// 停止心跳任务
|
||||
// 鍋滄蹇冭烦浠诲姟
|
||||
if let Some(handle) = conn_status.heartbeat_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
// 停止 event_loop 任务
|
||||
// 鍋滄 event_loop 浠诲姟
|
||||
if let Some(handle) = conn_status.event_loop_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
// 停止 event_loop 监控任务
|
||||
// 鍋滄 event_loop 鐩戞帶浠诲姟
|
||||
if let Some(handle) = conn_status.event_loop_monitor_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
|
|
@ -810,7 +814,7 @@ impl ConnectionManager {
|
|||
let source_ids: Vec<Uuid> = self.status.read().await.keys().copied().collect();
|
||||
|
||||
for source_id in source_ids {
|
||||
// 停止轮询任务并清空轮询点列表
|
||||
// 鍋滄杞浠诲姟骞舵竻绌鸿疆璇㈢偣鍒楄〃
|
||||
{
|
||||
let mut status = self.status.write().await;
|
||||
if let Some(conn_status) = status.get_mut(&source_id) {
|
||||
|
|
@ -818,15 +822,15 @@ impl ConnectionManager {
|
|||
if let Some(handle) = conn_status.poll_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
// 停止心跳任务
|
||||
// 鍋滄蹇冭烦浠诲姟
|
||||
if let Some(handle) = conn_status.heartbeat_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
// 停止 event_loop 任务
|
||||
// 鍋滄 event_loop 浠诲姟
|
||||
if let Some(handle) = conn_status.event_loop_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
// 停止 event_loop 监控任务
|
||||
// 鍋滄 event_loop 鐩戞帶浠诲姟
|
||||
if let Some(handle) = conn_status.event_loop_monitor_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
|
|
@ -1044,9 +1048,9 @@ impl ConnectionManager {
|
|||
}
|
||||
|
||||
// Emit local updates only when the full batch succeeds.
|
||||
if let Some(event_manager) = &self.event_manager {
|
||||
if let Some(point_event_sink) = &self.point_event_sink {
|
||||
for (source_id, point_id, variant) in success_events {
|
||||
if let Err(e) = event_manager.send(crate::event::AppEvent::PointNewValue(
|
||||
if let Err(e) = point_event_sink.send_point_new_value(
|
||||
crate::telemetry::PointNewValue {
|
||||
source_id,
|
||||
point_id: Some(point_id),
|
||||
|
|
@ -1059,7 +1063,7 @@ impl ConnectionManager {
|
|||
timestamp: Some(Utc::now()),
|
||||
scan_mode: ScanMode::Poll,
|
||||
},
|
||||
)) {
|
||||
) {
|
||||
tracing::warn!(
|
||||
"Batch write succeeded but failed to dispatch point update for point {}: {}",
|
||||
point_id,
|
||||
|
|
@ -1185,8 +1189,8 @@ impl ConnectionManager {
|
|||
.map(crate::telemetry::PointQuality::from_status_code)
|
||||
.unwrap_or(crate::telemetry::PointQuality::Good);
|
||||
|
||||
if let Some(event_manager) = &data_manager.event_manager {
|
||||
let _ = event_manager.send(crate::event::AppEvent::PointNewValue(
|
||||
if let Some(point_event_sink) = &data_manager.point_event_sink {
|
||||
let _ = point_event_sink.send_point_new_value(
|
||||
crate::telemetry::PointNewValue {
|
||||
source_id: current_source_id,
|
||||
point_id: None,
|
||||
|
|
@ -1198,7 +1202,7 @@ impl ConnectionManager {
|
|||
protocol: "opcua".to_string(),
|
||||
timestamp: timex,
|
||||
scan_mode: ScanMode::Subscribe,
|
||||
}));
|
||||
});
|
||||
}
|
||||
},
|
||||
|_event_fields, _item| {},
|
||||
|
|
@ -1465,7 +1469,7 @@ impl ConnectionManager {
|
|||
}
|
||||
}
|
||||
|
||||
// 从轮询列表中移除传入的点,并记录移除的轮询点数量
|
||||
// 浠庤疆璇㈠垪琛ㄤ腑绉婚櫎浼犲叆鐨勭偣锛屽苟璁板綍绉婚櫎鐨勮疆璇㈢偣鏁伴噺
|
||||
let polling_removed_count = {
|
||||
let mut status = self.status.write().await;
|
||||
if let Some(conn_status) = status.get_mut(&source_id) {
|
||||
|
|
@ -1478,7 +1482,7 @@ impl ConnectionManager {
|
|||
}
|
||||
};
|
||||
|
||||
// 计算从订阅点和轮询点移除的总数
|
||||
// 璁$畻浠庤闃呯偣鍜岃疆璇㈢偣绉婚櫎鐨勬€绘暟
|
||||
let total_removed = removed_point_ids.len() + polling_removed_count;
|
||||
tracing::info!(
|
||||
"Unsubscribed {} points (subscription: {}, polling: {}) from source {}",
|
||||
|
|
@ -1494,3 +1498,10 @@ impl ConnectionManager {
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -1,4 +1,9 @@
|
|||
pub mod bootstrap;
|
||||
pub mod bootstrap;
|
||||
pub mod connection;
|
||||
pub mod db;
|
||||
pub mod model;
|
||||
pub mod platform_context;
|
||||
pub mod service;
|
||||
pub mod telemetry;
|
||||
pub mod util;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use plc_platform_core::model::{ControlUnit, EventRecord};
|
||||
use crate::model::{ControlUnit, EventRecord};
|
||||
use sqlx::{PgPool, QueryBuilder, Row};
|
||||
use uuid::Uuid;
|
||||
|
||||
|
|
@ -337,7 +337,7 @@ pub async fn get_all_enabled_units(pool: &PgPool) -> Result<Vec<ControlUnit>, sq
|
|||
pub async fn get_equipment_by_unit_ids(
|
||||
pool: &PgPool,
|
||||
unit_ids: &[Uuid],
|
||||
) -> Result<Vec<plc_platform_core::model::Equipment>, sqlx::Error> {
|
||||
) -> Result<Vec<crate::model::Equipment>, sqlx::Error> {
|
||||
if unit_ids.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
|
@ -345,7 +345,7 @@ pub async fn get_equipment_by_unit_ids(
|
|||
"SELECT * FROM equipment WHERE unit_id = ANY($1) ORDER BY {}",
|
||||
equipment_order_clause_with_unit()
|
||||
);
|
||||
sqlx::query_as::<_, plc_platform_core::model::Equipment>(&sql)
|
||||
sqlx::query_as::<_, crate::model::Equipment>(&sql)
|
||||
.bind(unit_ids)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
|
|
@ -354,12 +354,12 @@ pub async fn get_equipment_by_unit_ids(
|
|||
pub async fn get_equipment_by_unit_id(
|
||||
pool: &PgPool,
|
||||
unit_id: Uuid,
|
||||
) -> Result<Vec<plc_platform_core::model::Equipment>, sqlx::Error> {
|
||||
) -> Result<Vec<crate::model::Equipment>, sqlx::Error> {
|
||||
let sql = format!(
|
||||
"SELECT * FROM equipment WHERE unit_id = $1 ORDER BY {}",
|
||||
unit_order_clause()
|
||||
);
|
||||
sqlx::query_as::<_, plc_platform_core::model::Equipment>(&sql)
|
||||
sqlx::query_as::<_, crate::model::Equipment>(&sql)
|
||||
.bind(unit_id)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
|
|
@ -368,11 +368,11 @@ pub async fn get_equipment_by_unit_id(
|
|||
pub async fn get_points_by_equipment_ids(
|
||||
pool: &PgPool,
|
||||
equipment_ids: &[Uuid],
|
||||
) -> Result<Vec<plc_platform_core::model::Point>, sqlx::Error> {
|
||||
) -> Result<Vec<crate::model::Point>, sqlx::Error> {
|
||||
if equipment_ids.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
sqlx::query_as::<_, plc_platform_core::model::Point>(
|
||||
sqlx::query_as::<_, crate::model::Point>(
|
||||
r#"SELECT * FROM point WHERE equipment_id = ANY($1) ORDER BY equipment_id, created_at"#,
|
||||
)
|
||||
.bind(equipment_ids)
|
||||
|
|
@ -507,3 +507,4 @@ pub async fn get_equipment_role_points(
|
|||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
|
|
@ -1,10 +1,13 @@
|
|||
use crate::{
|
||||
handler::equipment::EquipmentListItem,
|
||||
};
|
||||
use plc_platform_core::model::{Equipment, Point};
|
||||
use crate::model::{Equipment, Point};
|
||||
use sqlx::{query_as, PgPool, Row};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EquipmentListItem {
|
||||
pub equipment: Equipment,
|
||||
pub point_count: i64,
|
||||
}
|
||||
|
||||
fn equipment_order_clause() -> &'static str {
|
||||
"e.code"
|
||||
}
|
||||
|
|
@ -152,7 +155,6 @@ pub async fn get_equipment_paginated(
|
|||
updated_at: row.get("updated_at"),
|
||||
},
|
||||
point_count: row.get::<i64, _>("point_count"),
|
||||
role_points: vec![],
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
|
@ -308,3 +310,6 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
use plc_platform_core::model::{Point, PointSubscriptionInfo};
|
||||
use crate::model::{Point, PointSubscriptionInfo};
|
||||
use sqlx::{query_as, PgPool, Row};
|
||||
use std::collections::HashMap;
|
||||
|
||||
|
|
@ -283,3 +283,4 @@ pub async fn get_points_paginated(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
use plc_platform_core::model::Source;
|
||||
use crate::model::Source;
|
||||
use sqlx::{query_as, PgPool};
|
||||
|
||||
pub async fn get_enabled_source(
|
||||
|
|
@ -16,3 +16,4 @@ pub async fn get_all_enabled_sources(pool: &PgPool) -> Result<Vec<Source>, sqlx:
|
|||
.fetch_all(pool)
|
||||
.await
|
||||
}
|
||||
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
use plc_platform_core::model::{Point, Tag};
|
||||
use crate::model::{Point, Tag};
|
||||
use sqlx::{query_as, PgPool};
|
||||
|
||||
pub async fn get_tags_count(pool: &PgPool) -> Result<i64, sqlx::Error> {
|
||||
|
|
@ -182,3 +182,4 @@ pub async fn delete_tag(pool: &PgPool, tag_id: uuid::Uuid) -> Result<bool, sqlx:
|
|||
|
||||
Ok(result.rows_affected() > 0)
|
||||
}
|
||||
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
use chrono::{DateTime, Utc};
|
||||
use plc_platform_core::model::ScanMode;
|
||||
use chrono::{DateTime, Utc};
|
||||
use crate::model::ScanMode;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
|
|
@ -79,14 +79,14 @@ pub struct PointMonitorInfo {
|
|||
pub point_id: Uuid,
|
||||
pub client_handle: u32,
|
||||
pub scan_mode: ScanMode,
|
||||
#[serde(serialize_with = "plc_platform_core::util::datetime::option_utc_to_local_str")]
|
||||
#[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")]
|
||||
pub timestamp: Option<DateTime<Utc>>,
|
||||
pub quality: PointQuality,
|
||||
pub value: Option<DataValue>,
|
||||
pub value_type: Option<ValueType>,
|
||||
pub value_text: Option<String>,
|
||||
pub old_value: Option<DataValue>,
|
||||
#[serde(serialize_with = "plc_platform_core::util::datetime::option_utc_to_local_str")]
|
||||
#[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")]
|
||||
pub old_timestamp: Option<DateTime<Utc>>,
|
||||
pub value_changed: bool,
|
||||
}
|
||||
|
|
@ -151,3 +151,4 @@ pub fn opcua_variant_type(value: &opcua::types::Variant) -> ValueType {
|
|||
_ => ValueType::Text,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
use plc_platform_core::service::EquipmentRolePoint;
|
||||
|
||||
#[test]
|
||||
fn service_type_is_public() {
|
||||
let _role_point: Option<EquipmentRolePoint> = None;
|
||||
}
|
||||
Loading…
Reference in New Issue