From 3d18a65c7df88b8765c1c1a8e31f0aa93b980e8f Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 16 Apr 2026 08:23:49 +0800 Subject: [PATCH] refactor(core): move platform data and connection modules --- .../plc_platform_core/src}/connection.rs | 137 ++++++++++-------- {src => crates/plc_platform_core/src}/db.rs | 0 crates/plc_platform_core/src/lib.rs | 7 +- .../plc_platform_core/src}/service.rs | 0 .../plc_platform_core/src}/service/control.rs | 15 +- .../src}/service/equipment.rs | 15 +- .../plc_platform_core/src}/service/point.rs | 3 +- .../plc_platform_core/src}/service/source.rs | 3 +- .../plc_platform_core/src}/service/tag.rs | 3 +- .../plc_platform_core/src}/telemetry.rs | 9 +- .../tests/shared_core_smoke.rs | 6 + 11 files changed, 115 insertions(+), 83 deletions(-) rename {src => crates/plc_platform_core/src}/connection.rs (92%) rename {src => crates/plc_platform_core/src}/db.rs (100%) rename {src => crates/plc_platform_core/src}/service.rs (100%) rename {src => crates/plc_platform_core/src}/service/control.rs (96%) rename {src => crates/plc_platform_core/src}/service/equipment.rs (98%) rename {src => crates/plc_platform_core/src}/service/point.rs (99%) rename {src => crates/plc_platform_core/src}/service/source.rs (93%) rename {src => crates/plc_platform_core/src}/service/tag.rs (98%) rename {src => crates/plc_platform_core/src}/telemetry.rs (94%) create mode 100644 crates/plc_platform_core/tests/shared_core_smoke.rs diff --git a/src/connection.rs b/crates/plc_platform_core/src/connection.rs similarity index 92% rename from src/connection.rs rename to crates/plc_platform_core/src/connection.rs index ef8b34b..e694dbf 100644 --- a/src/connection.rs +++ b/crates/plc_platform_core/src/connection.rs @@ -1,4 +1,4 @@ -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Utc}; use opcua::{ client::{ClientBuilder, IdentityToken, Session}, crypto::SecurityPolicy, @@ -18,7 +18,7 @@ use std::{ use tokio::task::JoinHandle; use tokio::sync::RwLock; use uuid::Uuid; -use plc_platform_core::model::{PointSubscriptionInfo, ScanMode}; +use crate::model::{PointSubscriptionInfo, ScanMode}; use crate::{ telemetry::PointMonitorInfo, @@ -26,6 +26,10 @@ use crate::{ const DEFAULT_POINT_RING_BUFFER_LEN: usize = 1000; +pub trait PointEventSink: Send + Sync { + fn send_point_new_value(&self, payload: crate::telemetry::PointNewValue) -> Result<(), String>; +} + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub struct SetPointValueReqItem { pub point_id: Uuid, @@ -95,11 +99,11 @@ pub struct ConnectionStatus { pub next_client_handle: u32, pub client_handle_map: HashMap, // client_handle -> point_id pub monitored_item_map: HashMap, // point_id -> monitored_item_id - pub poll_points: Arc>, // 正在轮询的点集合 - poll_handle: Option>, // 统一的轮询任务句柄 - heartbeat_handle: Option>, // 心跳任务句柄 - event_loop_handle: Option>, // event_loop 任务句柄 - event_loop_monitor_handle: Option>, // event_loop 监控任务句柄 + pub poll_points: Arc>, // 姝e湪杞鐨勭偣闆嗗悎 + poll_handle: Option>, // 缁熶竴鐨勮疆璇换鍔″彞鏌? + heartbeat_handle: Option>, // 蹇冭烦浠诲姟鍙ユ焺 + event_loop_handle: Option>, // event_loop 浠诲姟鍙ユ焺 + event_loop_monitor_handle: Option>, // event_loop 鐩戞帶浠诲姟鍙ユ焺 } #[derive(Clone)] @@ -108,7 +112,7 @@ pub struct ConnectionManager { point_monitor_data: Arc>>, point_history_data: Arc>>>, point_write_target_cache: Arc>>, - event_manager: Option>, + point_event_sink: Option>, pool: Option>, reconnect_tx: Option>, reconnect_rx: Arc>>>, @@ -169,7 +173,7 @@ impl ConnectionManager { point_monitor_data: Arc::new(RwLock::new(HashMap::new())), point_history_data: Arc::new(RwLock::new(HashMap::new())), point_write_target_cache: Arc::new(RwLock::new(HashMap::new())), - event_manager: None, + point_event_sink: None, pool: None, reconnect_tx: Some(reconnect_tx), reconnect_rx: Arc::new(std::sync::Mutex::new(Some(reconnect_rx))), @@ -178,8 +182,8 @@ impl ConnectionManager { } } - pub fn set_event_manager(&mut self, event_manager: std::sync::Arc) { - self.event_manager = Some(event_manager); + pub fn set_event_manager(&mut self, point_event_sink: Arc) { + self.point_event_sink = Some(point_event_sink); } pub fn set_pool(&mut self, pool: Arc) { @@ -188,7 +192,7 @@ impl ConnectionManager { pub fn set_pool_and_start_reconnect_task(&mut self, pool: Arc) { self.pool = Some(pool.clone()); - // 将 self 转换为不可变引用以调用 start_reconnect_task + // 灏?self 杞崲涓轰笉鍙彉寮曠敤浠ヨ皟鐢?start_reconnect_task let manager = self.clone(); manager.start_reconnect_task(); } @@ -205,7 +209,7 @@ impl ConnectionManager { let manager = self.clone(); let pool = manager.pool.clone(); tokio::spawn(async move { - // 获取重连通道的接收端 + // 鑾峰彇閲嶈繛閫氶亾鐨勬帴鏀剁 let mut reconnect_rx = manager.get_reconnect_rx().expect("Failed to get reconnect receiver"); while let Some(source_id) = reconnect_rx.recv().await { @@ -328,18 +332,18 @@ impl ConnectionManager { let manager = self.clone(); let handle = tokio::spawn(async move { - let mut ticker = tokio::time::interval(Duration::from_secs(8)); // 每8秒检测一次心跳 + let mut ticker = tokio::time::interval(Duration::from_secs(8)); // 姣?绉掓娴嬩竴娆″績璺? ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { ticker.tick().await; - // 检查session是否有效 + // 妫€鏌ession鏄惁鏈夋晥 let session = manager.get_session(source_id).await; let (session_valid, subscription_valid) = if let Some(session) = session { - // 尝试读取当前时间来验证连接 - let node_id = NodeId::new(0, 2258); // ServerCurrentTime节点 + // 灏濊瘯璇诲彇褰撳墠鏃堕棿鏉ラ獙璇佽繛鎺? + let node_id = NodeId::new(0, 2258); // ServerCurrentTime鑺傜偣 let read_request = ReadValueId { node_id, attribute_id: AttributeId::Value as u32, @@ -352,8 +356,8 @@ impl ConnectionManager { Err(_) => false, }; - // 检查订阅状态 - 仅当有 subscription_id 时才检查 - // 这里使用 set_publishing_mode 直接让服务器校验订阅 ID,避免仅靠会话读请求误判为健康。 + // 妫€鏌ヨ闃呯姸鎬?- 浠呭綋鏈?subscription_id 鏃舵墠妫€鏌? + // 杩欓噷浣跨敤 set_publishing_mode 鐩存帴璁╂湇鍔″櫒鏍¢獙璁㈤槄 ID锛岄伩鍏嶄粎闈犱細璇濊璇锋眰璇垽涓哄仴搴枫€? let subscription_id = { let status = manager.status.read().await; status.get(&source_id).and_then(|conn_status| conn_status.subscription_id) @@ -364,7 +368,7 @@ impl ConnectionManager { Err(_) => false, } } else { - // 没有 subscription_id 时,认为订阅状态有效(不需要检查) + // 娌℃湁 subscription_id 鏃讹紝璁や负璁㈤槄鐘舵€佹湁鏁堬紙涓嶉渶瑕佹鏌ワ級 true }; @@ -374,7 +378,7 @@ impl ConnectionManager { }; if !session_valid || !subscription_valid { - // 检查是否已经在重连中 + // 妫€鏌ユ槸鍚﹀凡缁忓湪閲嶈繛涓? let mut reconnecting = manager.reconnecting.write().await; if !reconnecting.contains(&source_id) { reconnecting.insert(source_id); @@ -392,7 +396,7 @@ impl ConnectionManager { source_id ); - // 通过通道发送重连请求 + // 閫氳繃閫氶亾鍙戦€侀噸杩炶姹? if let Some(tx) = manager.reconnect_tx.as_ref() { if let Err(e) = tx.send(source_id) { tracing::error!("Failed to send reconnect request for source {}: {}", source_id, e); @@ -407,7 +411,7 @@ impl ConnectionManager { } }); - // 保存心跳任务句柄 + // 淇濆瓨蹇冭烦浠诲姟鍙ユ焺 let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { conn_status.heartbeat_handle = Some(handle); @@ -415,15 +419,15 @@ impl ConnectionManager { } async fn start_unified_poll_task(&self, source_id: Uuid, session: Arc) { - let event_manager = match self.event_manager.clone() { - Some(em) => em, + let point_event_sink = match self.point_event_sink.clone() { + Some(sink) => sink, None => { - tracing::warn!("Event manager is not initialized, cannot start unified poll task"); + tracing::warn!("Point event sink is not initialized, cannot start unified poll task"); return; } }; - // 停止旧的轮询任务 + // 鍋滄鏃х殑杞浠诲姟 { let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { @@ -438,10 +442,10 @@ impl ConnectionManager { source_id ); - // 克隆 status 引用,以便在异步任务中使用 + // 鍏嬮殕 status 寮曠敤锛屼互渚垮湪寮傛浠诲姟涓娇鐢? let status_ref = self.status.clone(); - // 启动新的轮询任务 + // 鍚姩鏂扮殑杞浠诲姟 let handle = tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_secs(1)); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -449,7 +453,7 @@ impl ConnectionManager { loop { ticker.tick().await; - // 在任务内部获取轮询点列表 + // 鍦ㄤ换鍔″唴閮ㄨ幏鍙栬疆璇㈢偣鍒楄〃 let poll_points = { let status = status_ref.read().await; status.get(&source_id) @@ -461,7 +465,7 @@ impl ConnectionManager { continue; } - // 构建批量读取请求 + // 鏋勫缓鎵归噺璇诲彇璇锋眰 let read_requests: Vec = poll_points .iter() .filter_map(|p| { @@ -478,7 +482,7 @@ impl ConnectionManager { continue; } - // 执行批量读取 + // 鎵ц鎵归噺璇诲彇 match session.read(&read_requests, TimestampsToReturn::Both, 0f64).await { Ok(results) => { for (poll_point, result) in poll_points.iter().zip(results.iter()) { @@ -492,7 +496,7 @@ impl ConnectionManager { .map(crate::telemetry::PointQuality::from_status_code) .unwrap_or(crate::telemetry::PointQuality::Good); - let _ = event_manager.send(crate::event::AppEvent::PointNewValue( + let _ = point_event_sink.send_point_new_value( crate::telemetry::PointNewValue { source_id, point_id: Some(poll_point.point_id), @@ -505,7 +509,7 @@ impl ConnectionManager { timestamp: Some(Utc::now()), scan_mode: ScanMode::Poll, }, - )); + ); } } Err(e) => { @@ -519,14 +523,14 @@ impl ConnectionManager { } }); - // 保存轮询任务句柄 + // 淇濆瓨杞浠诲姟鍙ユ焺 let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { conn_status.poll_handle = Some(handle); } } - // 将点添加到轮询列表 + // 灏嗙偣娣诲姞鍒拌疆璇㈠垪琛? async fn add_points_to_poll_list( &self, source_id: Uuid, @@ -534,12 +538,12 @@ impl ConnectionManager { ) -> usize { let mut started = 0usize; - // 添加新的轮询点 + // 娣诲姞鏂扮殑杞鐐? { let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { for point in points { - // 检查点是否已经在轮询列表中 + // 妫€鏌ョ偣鏄惁宸茬粡鍦ㄨ疆璇㈠垪琛ㄤ腑 if !conn_status.poll_points.iter().any(|p| p.point_id == point.point_id) { Arc::make_mut(&mut conn_status.poll_points).push(PollPointInfo { point_id: point.point_id, @@ -662,7 +666,7 @@ impl ConnectionManager { let event_loop_handle = event_loop.spawn(); - // 添加监控任务来捕获 event_loop 结束事件 + // 娣诲姞鐩戞帶浠诲姟鏉ユ崟鑾?event_loop 缁撴潫浜嬩欢 let manager = self.clone(); let source_id_copy = source_id; let event_loop_monitor_handle = tokio::spawn(async move { @@ -683,7 +687,7 @@ impl ConnectionManager { } } - // 统一触发重连 + // 缁熶竴瑙﹀彂閲嶈繛 if let Some(tx) = manager.reconnect_tx.as_ref() { let _ = tx.send(source_id_copy); } @@ -710,16 +714,16 @@ impl ConnectionManager { poll_points: Arc::new(Vec::new()), poll_handle: None, heartbeat_handle: None, - event_loop_handle: None, // event_loop_handle 已被移动到监控任务中 + event_loop_handle: None, // event_loop_handle 宸茶绉诲姩鍒扮洃鎺т换鍔′腑 event_loop_monitor_handle: Some(event_loop_monitor_handle), }, ); - drop(status); // 显式释放锁,在调用 start_unified_poll_task 之前 + drop(status); // 鏄惧紡閲婃斁閿侊紝鍦ㄨ皟鐢?start_unified_poll_task 涔嬪墠 - // 启动统一的轮询任务 + // 鍚姩缁熶竴鐨勮疆璇换鍔? self.start_unified_poll_task(source_id, session).await; - // 启动心跳任务 + // 鍚姩蹇冭烦浠诲姟 self.start_heartbeat_task(source_id).await; tracing::info!("Successfully connected to source {}", source_id); @@ -750,19 +754,19 @@ impl ConnectionManager { pub async fn reconnect(&self, pool: &sqlx::PgPool, source_id: Uuid) -> Result<(), String> { tracing::info!("Reconnecting to source {}", source_id); - // 先断开连接 + // 鍏堟柇寮€杩炴帴 if let Err(e) = self.disconnect(source_id).await { tracing::error!("Failed to disconnect source {}: {}", source_id, e); } - // 再重新连接 + // 鍐嶉噸鏂拌繛鎺? let result = self.connect_from_source(pool, source_id).await; if result.is_ok() { let mut attempts = self.reconnect_attempts.write().await; attempts.remove(&source_id); } - // 无论成功还是失败都清除重连标记,以便心跳检测到问题后可以再次触发重连 + // 鏃犺鎴愬姛杩樻槸澶辫触閮芥竻闄ら噸杩炴爣璁帮紝浠ヤ究蹇冭烦妫€娴嬪埌闂鍚庡彲浠ュ啀娆¤Е鍙戦噸杩? let mut reconnecting = self.reconnecting.write().await; reconnecting.remove(&source_id); @@ -770,7 +774,7 @@ impl ConnectionManager { } pub async fn disconnect(&self, source_id: Uuid) -> Result<(), String> { - // 停止轮询任务并清空轮询点列表 + // 鍋滄杞浠诲姟骞舵竻绌鸿疆璇㈢偣鍒楄〃 { let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { @@ -778,15 +782,15 @@ impl ConnectionManager { if let Some(handle) = conn_status.poll_handle.take() { handle.abort(); } - // 停止心跳任务 + // 鍋滄蹇冭烦浠诲姟 if let Some(handle) = conn_status.heartbeat_handle.take() { handle.abort(); } - // 停止 event_loop 任务 + // 鍋滄 event_loop 浠诲姟 if let Some(handle) = conn_status.event_loop_handle.take() { handle.abort(); } - // 停止 event_loop 监控任务 + // 鍋滄 event_loop 鐩戞帶浠诲姟 if let Some(handle) = conn_status.event_loop_monitor_handle.take() { handle.abort(); } @@ -810,7 +814,7 @@ impl ConnectionManager { let source_ids: Vec = self.status.read().await.keys().copied().collect(); for source_id in source_ids { - // 停止轮询任务并清空轮询点列表 + // 鍋滄杞浠诲姟骞舵竻绌鸿疆璇㈢偣鍒楄〃 { let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { @@ -818,15 +822,15 @@ impl ConnectionManager { if let Some(handle) = conn_status.poll_handle.take() { handle.abort(); } - // 停止心跳任务 + // 鍋滄蹇冭烦浠诲姟 if let Some(handle) = conn_status.heartbeat_handle.take() { handle.abort(); } - // 停止 event_loop 任务 + // 鍋滄 event_loop 浠诲姟 if let Some(handle) = conn_status.event_loop_handle.take() { handle.abort(); } - // 停止 event_loop 监控任务 + // 鍋滄 event_loop 鐩戞帶浠诲姟 if let Some(handle) = conn_status.event_loop_monitor_handle.take() { handle.abort(); } @@ -1044,9 +1048,9 @@ impl ConnectionManager { } // Emit local updates only when the full batch succeeds. - if let Some(event_manager) = &self.event_manager { + if let Some(point_event_sink) = &self.point_event_sink { for (source_id, point_id, variant) in success_events { - if let Err(e) = event_manager.send(crate::event::AppEvent::PointNewValue( + if let Err(e) = point_event_sink.send_point_new_value( crate::telemetry::PointNewValue { source_id, point_id: Some(point_id), @@ -1059,7 +1063,7 @@ impl ConnectionManager { timestamp: Some(Utc::now()), scan_mode: ScanMode::Poll, }, - )) { + ) { tracing::warn!( "Batch write succeeded but failed to dispatch point update for point {}: {}", point_id, @@ -1185,8 +1189,8 @@ impl ConnectionManager { .map(crate::telemetry::PointQuality::from_status_code) .unwrap_or(crate::telemetry::PointQuality::Good); - if let Some(event_manager) = &data_manager.event_manager { - let _ = event_manager.send(crate::event::AppEvent::PointNewValue( + if let Some(point_event_sink) = &data_manager.point_event_sink { + let _ = point_event_sink.send_point_new_value( crate::telemetry::PointNewValue { source_id: current_source_id, point_id: None, @@ -1198,7 +1202,7 @@ impl ConnectionManager { protocol: "opcua".to_string(), timestamp: timex, scan_mode: ScanMode::Subscribe, - })); + }); } }, |_event_fields, _item| {}, @@ -1465,7 +1469,7 @@ impl ConnectionManager { } } - // 从轮询列表中移除传入的点,并记录移除的轮询点数量 + // 浠庤疆璇㈠垪琛ㄤ腑绉婚櫎浼犲叆鐨勭偣锛屽苟璁板綍绉婚櫎鐨勮疆璇㈢偣鏁伴噺 let polling_removed_count = { let mut status = self.status.write().await; if let Some(conn_status) = status.get_mut(&source_id) { @@ -1478,7 +1482,7 @@ impl ConnectionManager { } }; - // 计算从订阅点和轮询点移除的总数 + // 璁$畻浠庤闃呯偣鍜岃疆璇㈢偣绉婚櫎鐨勬€绘暟 let total_removed = removed_point_ids.len() + polling_removed_count; tracing::info!( "Unsubscribed {} points (subscription: {}, polling: {}) from source {}", @@ -1494,3 +1498,10 @@ impl ConnectionManager { + + + + + + + diff --git a/src/db.rs b/crates/plc_platform_core/src/db.rs similarity index 100% rename from src/db.rs rename to crates/plc_platform_core/src/db.rs diff --git a/crates/plc_platform_core/src/lib.rs b/crates/plc_platform_core/src/lib.rs index f3a6d1e..b89faea 100644 --- a/crates/plc_platform_core/src/lib.rs +++ b/crates/plc_platform_core/src/lib.rs @@ -1,4 +1,9 @@ -pub mod bootstrap; +pub mod bootstrap; +pub mod connection; +pub mod db; pub mod model; pub mod platform_context; +pub mod service; +pub mod telemetry; pub mod util; + diff --git a/src/service.rs b/crates/plc_platform_core/src/service.rs similarity index 100% rename from src/service.rs rename to crates/plc_platform_core/src/service.rs diff --git a/src/service/control.rs b/crates/plc_platform_core/src/service/control.rs similarity index 96% rename from src/service/control.rs rename to crates/plc_platform_core/src/service/control.rs index 6666636..907de78 100644 --- a/src/service/control.rs +++ b/crates/plc_platform_core/src/service/control.rs @@ -1,4 +1,4 @@ -use plc_platform_core::model::{ControlUnit, EventRecord}; +use crate::model::{ControlUnit, EventRecord}; use sqlx::{PgPool, QueryBuilder, Row}; use uuid::Uuid; @@ -337,7 +337,7 @@ pub async fn get_all_enabled_units(pool: &PgPool) -> Result, sq pub async fn get_equipment_by_unit_ids( pool: &PgPool, unit_ids: &[Uuid], -) -> Result, sqlx::Error> { +) -> Result, sqlx::Error> { if unit_ids.is_empty() { return Ok(vec![]); } @@ -345,7 +345,7 @@ pub async fn get_equipment_by_unit_ids( "SELECT * FROM equipment WHERE unit_id = ANY($1) ORDER BY {}", equipment_order_clause_with_unit() ); - sqlx::query_as::<_, plc_platform_core::model::Equipment>(&sql) + sqlx::query_as::<_, crate::model::Equipment>(&sql) .bind(unit_ids) .fetch_all(pool) .await @@ -354,12 +354,12 @@ pub async fn get_equipment_by_unit_ids( pub async fn get_equipment_by_unit_id( pool: &PgPool, unit_id: Uuid, -) -> Result, sqlx::Error> { +) -> Result, sqlx::Error> { let sql = format!( "SELECT * FROM equipment WHERE unit_id = $1 ORDER BY {}", unit_order_clause() ); - sqlx::query_as::<_, plc_platform_core::model::Equipment>(&sql) + sqlx::query_as::<_, crate::model::Equipment>(&sql) .bind(unit_id) .fetch_all(pool) .await @@ -368,11 +368,11 @@ pub async fn get_equipment_by_unit_id( pub async fn get_points_by_equipment_ids( pool: &PgPool, equipment_ids: &[Uuid], -) -> Result, sqlx::Error> { +) -> Result, sqlx::Error> { if equipment_ids.is_empty() { return Ok(vec![]); } - sqlx::query_as::<_, plc_platform_core::model::Point>( + sqlx::query_as::<_, crate::model::Point>( r#"SELECT * FROM point WHERE equipment_id = ANY($1) ORDER BY equipment_id, created_at"#, ) .bind(equipment_ids) @@ -507,3 +507,4 @@ pub async fn get_equipment_role_points( }) .collect()) } + diff --git a/src/service/equipment.rs b/crates/plc_platform_core/src/service/equipment.rs similarity index 98% rename from src/service/equipment.rs rename to crates/plc_platform_core/src/service/equipment.rs index 7576683..a8373a1 100644 --- a/src/service/equipment.rs +++ b/crates/plc_platform_core/src/service/equipment.rs @@ -1,10 +1,13 @@ -use crate::{ - handler::equipment::EquipmentListItem, -}; -use plc_platform_core::model::{Equipment, Point}; +use crate::model::{Equipment, Point}; use sqlx::{query_as, PgPool, Row}; use uuid::Uuid; +#[derive(Debug, Clone)] +pub struct EquipmentListItem { + pub equipment: Equipment, + pub point_count: i64, +} + fn equipment_order_clause() -> &'static str { "e.code" } @@ -152,7 +155,6 @@ pub async fn get_equipment_paginated( updated_at: row.get("updated_at"), }, point_count: row.get::("point_count"), - role_points: vec![], }) .collect()) } @@ -308,3 +310,6 @@ mod tests { } } + + + diff --git a/src/service/point.rs b/crates/plc_platform_core/src/service/point.rs similarity index 99% rename from src/service/point.rs rename to crates/plc_platform_core/src/service/point.rs index 44d253b..f4a101c 100644 --- a/src/service/point.rs +++ b/crates/plc_platform_core/src/service/point.rs @@ -1,4 +1,4 @@ -use plc_platform_core::model::{Point, PointSubscriptionInfo}; +use crate::model::{Point, PointSubscriptionInfo}; use sqlx::{query_as, PgPool, Row}; use std::collections::HashMap; @@ -283,3 +283,4 @@ pub async fn get_points_paginated( } } } + diff --git a/src/service/source.rs b/crates/plc_platform_core/src/service/source.rs similarity index 93% rename from src/service/source.rs rename to crates/plc_platform_core/src/service/source.rs index 43b5972..4e42ce0 100644 --- a/src/service/source.rs +++ b/crates/plc_platform_core/src/service/source.rs @@ -1,4 +1,4 @@ -use plc_platform_core::model::Source; +use crate::model::Source; use sqlx::{query_as, PgPool}; pub async fn get_enabled_source( @@ -16,3 +16,4 @@ pub async fn get_all_enabled_sources(pool: &PgPool) -> Result, sqlx: .fetch_all(pool) .await } + diff --git a/src/service/tag.rs b/crates/plc_platform_core/src/service/tag.rs similarity index 98% rename from src/service/tag.rs rename to crates/plc_platform_core/src/service/tag.rs index 9094177..ebf1c14 100644 --- a/src/service/tag.rs +++ b/crates/plc_platform_core/src/service/tag.rs @@ -1,4 +1,4 @@ -use plc_platform_core::model::{Point, Tag}; +use crate::model::{Point, Tag}; use sqlx::{query_as, PgPool}; pub async fn get_tags_count(pool: &PgPool) -> Result { @@ -182,3 +182,4 @@ pub async fn delete_tag(pool: &PgPool, tag_id: uuid::Uuid) -> Result 0) } + diff --git a/src/telemetry.rs b/crates/plc_platform_core/src/telemetry.rs similarity index 94% rename from src/telemetry.rs rename to crates/plc_platform_core/src/telemetry.rs index ce5a2ab..b2e3005 100644 --- a/src/telemetry.rs +++ b/crates/plc_platform_core/src/telemetry.rs @@ -1,5 +1,5 @@ -use chrono::{DateTime, Utc}; -use plc_platform_core::model::ScanMode; +use chrono::{DateTime, Utc}; +use crate::model::ScanMode; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -79,14 +79,14 @@ pub struct PointMonitorInfo { pub point_id: Uuid, pub client_handle: u32, pub scan_mode: ScanMode, - #[serde(serialize_with = "plc_platform_core::util::datetime::option_utc_to_local_str")] + #[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")] pub timestamp: Option>, pub quality: PointQuality, pub value: Option, pub value_type: Option, pub value_text: Option, pub old_value: Option, - #[serde(serialize_with = "plc_platform_core::util::datetime::option_utc_to_local_str")] + #[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")] pub old_timestamp: Option>, pub value_changed: bool, } @@ -151,3 +151,4 @@ pub fn opcua_variant_type(value: &opcua::types::Variant) -> ValueType { _ => ValueType::Text, } } + diff --git a/crates/plc_platform_core/tests/shared_core_smoke.rs b/crates/plc_platform_core/tests/shared_core_smoke.rs new file mode 100644 index 0000000..95dfd2c --- /dev/null +++ b/crates/plc_platform_core/tests/shared_core_smoke.rs @@ -0,0 +1,6 @@ +use plc_platform_core::service::EquipmentRolePoint; + +#[test] +fn service_type_is_public() { + let _role_point: Option = None; +}