From 9a3d1f5ebb5b755a482f6c2d61e68b4a493890ff Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 16 Apr 2026 11:16:56 +0800 Subject: [PATCH] refactor(core): move websocket runtime and command infrastructure --- crates/app_feeder_distributor/src/main.rs | 15 +- .../plc_platform_core/src/control/command.rs | 62 +++++++++ crates/plc_platform_core/src/control/mod.rs | 2 + .../plc_platform_core/src/control/runtime.rs | 94 +++++++++++++ crates/plc_platform_core/src/lib.rs | 2 + crates/plc_platform_core/src/websocket.rs | 128 ++++++++++++++++++ .../plc_platform_core/tests/runtime_smoke.rs | 37 +++++ 7 files changed, 326 insertions(+), 14 deletions(-) create mode 100644 crates/plc_platform_core/src/control/command.rs create mode 100644 crates/plc_platform_core/src/control/mod.rs create mode 100644 crates/plc_platform_core/src/control/runtime.rs create mode 100644 crates/plc_platform_core/src/websocket.rs create mode 100644 crates/plc_platform_core/tests/runtime_smoke.rs diff --git a/crates/app_feeder_distributor/src/main.rs b/crates/app_feeder_distributor/src/main.rs index e63b9d6..df959a2 100644 --- a/crates/app_feeder_distributor/src/main.rs +++ b/crates/app_feeder_distributor/src/main.rs @@ -21,20 +21,7 @@ mod telemetry { #[allow(dead_code)] mod websocket { - #[derive(Debug, Clone)] - pub enum WsMessage { - PointNewValue(crate::telemetry::PointMonitorInfo), - EventCreated(plc_platform_core::model::EventRecord), - } - - #[derive(Clone, Default)] - pub struct WebSocketManager; - - impl WebSocketManager { - pub async fn send_to_public(&self, _message: WsMessage) -> Result { - Ok(0) - } - } + pub use plc_platform_core::websocket::*; } use config::AppConfig; diff --git a/crates/plc_platform_core/src/control/command.rs b/crates/plc_platform_core/src/control/command.rs new file mode 100644 index 0000000..1aa5fb3 --- /dev/null +++ b/crates/plc_platform_core/src/control/command.rs @@ -0,0 +1,62 @@ +use std::sync::Arc; + +use serde_json::json; +use uuid::Uuid; + +use crate::{ + connection::{BatchSetPointValueReq, ConnectionManager, SetPointValueReqItem}, + telemetry::ValueType, +}; + +pub async fn send_pulse_command( + connection_manager: &Arc, + point_id: Uuid, + value_type: Option<&ValueType>, + pulse_ms: u64, +) -> Result<(), String> { + let high = pulse_value(true, value_type); + let low = pulse_value(false, value_type); + + let high_result = connection_manager + .write_point_values_batch(BatchSetPointValueReq { + items: vec![SetPointValueReqItem { + point_id, + value: high, + }], + }) + .await?; + + if !high_result.success { + return Err(format!("Pulse high write failed: {:?}", high_result.err_msg)); + } + + tokio::time::sleep(std::time::Duration::from_millis(pulse_ms)).await; + + let low_result = connection_manager + .write_point_values_batch(BatchSetPointValueReq { + items: vec![SetPointValueReqItem { + point_id, + value: low, + }], + }) + .await?; + + if !low_result.success { + return Err(format!("Pulse low write failed: {:?}", low_result.err_msg)); + } + + Ok(()) +} + +fn pulse_value(high: bool, value_type: Option<&ValueType>) -> serde_json::Value { + match value_type { + Some(ValueType::Bool) => serde_json::Value::Bool(high), + _ => { + if high { + json!(1) + } else { + json!(0) + } + } + } +} diff --git a/crates/plc_platform_core/src/control/mod.rs b/crates/plc_platform_core/src/control/mod.rs new file mode 100644 index 0000000..affecb3 --- /dev/null +++ b/crates/plc_platform_core/src/control/mod.rs @@ -0,0 +1,2 @@ +pub mod command; +pub mod runtime; diff --git a/crates/plc_platform_core/src/control/runtime.rs b/crates/plc_platform_core/src/control/runtime.rs new file mode 100644 index 0000000..269696a --- /dev/null +++ b/crates/plc_platform_core/src/control/runtime.rs @@ -0,0 +1,94 @@ +use std::{collections::HashMap, sync::Arc}; + +use tokio::sync::{Notify, RwLock}; +use uuid::Uuid; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum UnitRuntimeState { + Stopped, + Running, + DistributorRunning, + FaultLocked, + CommLocked, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct UnitRuntime { + pub unit_id: Uuid, + pub state: UnitRuntimeState, + pub auto_enabled: bool, + pub accumulated_run_sec: i64, + pub display_acc_sec: i64, + pub fault_locked: bool, + pub flt_active: bool, + pub comm_locked: bool, + pub manual_ack_required: bool, + pub rem_local: bool, +} + +impl UnitRuntime { + pub fn new(unit_id: Uuid) -> Self { + Self { + unit_id, + state: UnitRuntimeState::Stopped, + auto_enabled: false, + accumulated_run_sec: 0, + display_acc_sec: 0, + fault_locked: false, + flt_active: false, + comm_locked: false, + manual_ack_required: false, + rem_local: false, + } + } +} + +#[derive(Clone, Default)] +pub struct ControlRuntimeStore { + inner: Arc>>, + notifiers: Arc>>>, +} + +impl ControlRuntimeStore { + pub fn new() -> Self { + Self::default() + } + + pub async fn get(&self, unit_id: Uuid) -> Option { + self.inner.read().await.get(&unit_id).cloned() + } + + pub async fn get_or_init(&self, unit_id: Uuid) -> UnitRuntime { + if let Some(runtime) = self.get(unit_id).await { + return runtime; + } + + let runtime = UnitRuntime::new(unit_id); + self.inner.write().await.insert(unit_id, runtime.clone()); + runtime + } + + pub async fn upsert(&self, runtime: UnitRuntime) { + self.inner.write().await.insert(runtime.unit_id, runtime); + } + + pub async fn get_or_create_notify(&self, unit_id: Uuid) -> Arc { + self.notifiers + .write() + .await + .entry(unit_id) + .or_insert_with(|| Arc::new(Notify::new())) + .clone() + } + + pub async fn get_all(&self) -> HashMap { + self.inner.read().await.clone() + } + + pub async fn notify_unit(&self, unit_id: Uuid) { + if let Some(notify) = self.notifiers.read().await.get(&unit_id) { + notify.notify_one(); + } + } +} diff --git a/crates/plc_platform_core/src/lib.rs b/crates/plc_platform_core/src/lib.rs index 8a845fe..2064429 100644 --- a/crates/plc_platform_core/src/lib.rs +++ b/crates/plc_platform_core/src/lib.rs @@ -1,5 +1,6 @@ pub mod bootstrap; pub mod connection; +pub mod control; pub mod db; pub mod event; pub mod model; @@ -7,6 +8,7 @@ pub mod platform_context; pub mod service; pub mod telemetry; pub mod util; +pub mod websocket; pub use event::EventEnvelope; diff --git a/crates/plc_platform_core/src/websocket.rs b/crates/plc_platform_core/src/websocket.rs new file mode 100644 index 0000000..919cfdf --- /dev/null +++ b/crates/plc_platform_core/src/websocket.rs @@ -0,0 +1,128 @@ +use std::{collections::HashMap, sync::Arc}; + +use serde::{Deserialize, Serialize}; +use tokio::sync::{broadcast, RwLock}; +use uuid::Uuid; + +use crate::{ + connection::{BatchSetPointValueReq, BatchSetPointValueRes}, + control::runtime::UnitRuntime, + model::EventRecord, + telemetry::PointMonitorInfo, +}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", content = "data")] +pub enum WsMessage { + PointNewValue(PointMonitorInfo), + PointSetValueBatchResult(BatchSetPointValueRes), + EventCreated(EventRecord), + UnitRuntimeChanged(UnitRuntime), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", content = "data", rename_all = "snake_case")] +pub enum WsClientMessage { + AuthWrite(WsAuthWriteReq), + PointSetValueBatch(BatchSetPointValueReq), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WsAuthWriteReq { + pub key: String, +} + +#[derive(Clone)] +pub struct RoomManager { + rooms: Arc>>>, +} + +impl RoomManager { + pub fn new() -> Self { + Self { + rooms: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub async fn get_or_create_room(&self, room_id: &str) -> broadcast::Sender { + let mut rooms = self.rooms.write().await; + + if let Some(sender) = rooms.get(room_id) { + return sender.clone(); + } + + let (sender, _) = broadcast::channel(100); + rooms.insert(room_id.to_string(), sender.clone()); + tracing::info!("Created new room: {}", room_id); + sender + } + + pub async fn get_room(&self, room_id: &str) -> Option> { + let rooms = self.rooms.read().await; + rooms.get(room_id).cloned() + } + + pub async fn remove_room_if_empty(&self, room_id: &str) { + let mut rooms = self.rooms.write().await; + let should_remove = rooms + .get(room_id) + .map(|sender| sender.receiver_count() == 0) + .unwrap_or(false); + + if should_remove { + rooms.remove(room_id); + tracing::info!("Removed empty room: {}", room_id); + } + } + + pub async fn send_to_room(&self, room_id: &str, message: WsMessage) -> Result { + if let Some(sender) = self.get_room(room_id).await { + match sender.send(message) { + Ok(count) => Ok(count), + Err(broadcast::error::SendError(_)) => Ok(0), + } + } else { + Ok(0) + } + } +} + +impl Default for RoomManager { + fn default() -> Self { + Self::new() + } +} + +#[derive(Clone)] +pub struct WebSocketManager { + public_room: Arc, +} + +impl WebSocketManager { + pub fn new() -> Self { + Self { + public_room: Arc::new(RoomManager::new()), + } + } + + pub async fn send_to_public(&self, message: WsMessage) -> Result { + self.public_room.get_or_create_room("public").await; + self.public_room.send_to_room("public", message).await + } + + pub async fn send_to_client( + &self, + client_id: Uuid, + message: WsMessage, + ) -> Result { + self.public_room + .send_to_room(&client_id.to_string(), message) + .await + } +} + +impl Default for WebSocketManager { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/plc_platform_core/tests/runtime_smoke.rs b/crates/plc_platform_core/tests/runtime_smoke.rs new file mode 100644 index 0000000..e3ca949 --- /dev/null +++ b/crates/plc_platform_core/tests/runtime_smoke.rs @@ -0,0 +1,37 @@ +use std::time::Duration; + +use plc_platform_core::control::runtime::{ControlRuntimeStore, UnitRuntimeState}; +use uuid::Uuid; + +#[tokio::test] +async fn runtime_store_exposes_shared_runtime_surface() { + let unit_id = Uuid::new_v4(); + let store = ControlRuntimeStore::new(); + + let initial = store.get_or_init(unit_id).await; + assert_eq!(initial.unit_id, unit_id); + assert_eq!(initial.state, UnitRuntimeState::Stopped); + assert!(!initial.auto_enabled); + assert_eq!( + serde_json::to_string(&UnitRuntimeState::Stopped).unwrap(), + "\"stopped\"" + ); + + let notify = store.get_or_create_notify(unit_id).await; + let waiter = tokio::spawn(async move { + tokio::time::timeout(Duration::from_millis(50), notify.notified()).await + }); + + let mut updated = initial.clone(); + updated.auto_enabled = true; + updated.state = UnitRuntimeState::Running; + store.upsert(updated.clone()).await; + store.notify_unit(unit_id).await; + + let notified = waiter.await.unwrap(); + assert!(notified.is_ok(), "unit notifier should wake waiters"); + + let persisted = store.get(unit_id).await.expect("runtime should exist"); + assert_eq!(persisted.state, UnitRuntimeState::Running); + assert!(persisted.auto_enabled); +}