refactor(core): move websocket runtime and command infrastructure

This commit is contained in:
caoqianming 2026-04-16 11:16:56 +08:00
parent de1879bbf2
commit 9a3d1f5ebb
7 changed files with 326 additions and 14 deletions

View File

@ -21,20 +21,7 @@ mod telemetry {
// NOTE(review): this diff hunk renders removed and added lines without
// markers. Per the hunk header (-20/+7 lines) and the commit message, the
// local stub below (enum/struct/impl) is being DELETED and replaced by the
// `pub use` re-export of the shared implementation in plc_platform_core —
// the stub and the glob re-export cannot coexist (duplicate `WsMessage` /
// `WebSocketManager` definitions). Confirm against the raw diff.
#[allow(dead_code)]
mod websocket {
// Old in-crate stub: a no-op manager that reported 0 receivers.
#[derive(Debug, Clone)]
pub enum WsMessage {
PointNewValue(crate::telemetry::PointMonitorInfo),
EventCreated(plc_platform_core::model::EventRecord),
}
#[derive(Clone, Default)]
pub struct WebSocketManager;
impl WebSocketManager {
pub async fn send_to_public(&self, _message: WsMessage) -> Result<usize, String> {
Ok(0)
}
}
// New: forward the real websocket types from the core crate.
pub use plc_platform_core::websocket::*;
}
use config::AppConfig;

View File

@ -0,0 +1,62 @@
use std::sync::Arc;
use serde_json::json;
use uuid::Uuid;
use crate::{
connection::{BatchSetPointValueReq, ConnectionManager, SetPointValueReqItem},
telemetry::ValueType,
};
pub async fn send_pulse_command(
connection_manager: &Arc<ConnectionManager>,
point_id: Uuid,
value_type: Option<&ValueType>,
pulse_ms: u64,
) -> Result<(), String> {
let high = pulse_value(true, value_type);
let low = pulse_value(false, value_type);
let high_result = connection_manager
.write_point_values_batch(BatchSetPointValueReq {
items: vec![SetPointValueReqItem {
point_id,
value: high,
}],
})
.await?;
if !high_result.success {
return Err(format!("Pulse high write failed: {:?}", high_result.err_msg));
}
tokio::time::sleep(std::time::Duration::from_millis(pulse_ms)).await;
let low_result = connection_manager
.write_point_values_batch(BatchSetPointValueReq {
items: vec![SetPointValueReqItem {
point_id,
value: low,
}],
})
.await?;
if !low_result.success {
return Err(format!("Pulse low write failed: {:?}", low_result.err_msg));
}
Ok(())
}
/// Build the JSON payload for one edge of a pulse.
///
/// Boolean points get a JSON bool; any other (or unknown) value type is
/// driven numerically — 1 for the high edge, 0 for the low edge.
fn pulse_value(high: bool, value_type: Option<&ValueType>) -> serde_json::Value {
    if matches!(value_type, Some(ValueType::Bool)) {
        serde_json::Value::Bool(high)
    } else {
        json!(if high { 1 } else { 0 })
    }
}

View File

@ -0,0 +1,2 @@
pub mod command;
pub mod runtime;

View File

@ -0,0 +1,94 @@
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{Notify, RwLock};
use uuid::Uuid;
/// Lifecycle state of a control unit.
///
/// Serialized in snake_case (e.g. `"distributor_running"`), so the wire
/// format is stable even if variants are reordered.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum UnitRuntimeState {
Stopped,
Running,
DistributorRunning,
FaultLocked,
CommLocked,
}
/// Snapshot of one unit's runtime status, stored in `ControlRuntimeStore`
/// and broadcast over websocket as `WsMessage::UnitRuntimeChanged`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UnitRuntime {
pub unit_id: Uuid,
pub state: UnitRuntimeState,
pub auto_enabled: bool,
// NOTE(review): the *_sec counters are presumably accumulated run time in
// seconds (actual vs. displayed) — confirm against the code that updates them.
pub accumulated_run_sec: i64,
pub display_acc_sec: i64,
pub fault_locked: bool,
pub flt_active: bool,
pub comm_locked: bool,
pub manual_ack_required: bool,
// NOTE(review): presumably remote/local switch position — TODO confirm.
pub rem_local: bool,
}
impl UnitRuntime {
pub fn new(unit_id: Uuid) -> Self {
Self {
unit_id,
state: UnitRuntimeState::Stopped,
auto_enabled: false,
accumulated_run_sec: 0,
display_acc_sec: 0,
fault_locked: false,
flt_active: false,
comm_locked: false,
manual_ack_required: false,
rem_local: false,
}
}
}
/// Shared, clonable store of per-unit runtime state plus per-unit wakeup
/// notifiers. Cloning is cheap: both maps live behind `Arc<RwLock<…>>`,
/// so clones observe the same data.
#[derive(Clone, Default)]
pub struct ControlRuntimeStore {
// unit_id -> latest runtime snapshot
inner: Arc<RwLock<HashMap<Uuid, UnitRuntime>>>,
// unit_id -> Notify used to wake tasks waiting on that unit
notifiers: Arc<RwLock<HashMap<Uuid, Arc<Notify>>>>,
}
impl ControlRuntimeStore {
    /// Create an empty store (same as `Default`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Cloned snapshot of the runtime for `unit_id`, if one exists.
    pub async fn get(&self, unit_id: Uuid) -> Option<UnitRuntime> {
        self.inner.read().await.get(&unit_id).cloned()
    }

    /// Return the runtime for `unit_id`, creating a default one on first use.
    ///
    /// The fast path takes only a read lock. On a miss, insertion goes
    /// through the entry API under a single write lock — the previous
    /// check-then-insert sequence let two concurrent first-time callers both
    /// insert, with the later insert silently clobbering any updates applied
    /// to the earlier entry in between.
    pub async fn get_or_init(&self, unit_id: Uuid) -> UnitRuntime {
        if let Some(runtime) = self.get(unit_id).await {
            return runtime;
        }
        self.inner
            .write()
            .await
            .entry(unit_id)
            .or_insert_with(|| UnitRuntime::new(unit_id))
            .clone()
    }

    /// Insert or replace the stored runtime, keyed by its `unit_id`.
    pub async fn upsert(&self, runtime: UnitRuntime) {
        self.inner.write().await.insert(runtime.unit_id, runtime);
    }

    /// Return the `Notify` handle for `unit_id`, creating it on first use.
    pub async fn get_or_create_notify(&self, unit_id: Uuid) -> Arc<Notify> {
        self.notifiers
            .write()
            .await
            .entry(unit_id)
            .or_insert_with(|| Arc::new(Notify::new()))
            .clone()
    }

    /// Cloned snapshot of every stored runtime.
    pub async fn get_all(&self) -> HashMap<Uuid, UnitRuntime> {
        self.inner.read().await.clone()
    }

    /// Wake one waiter on `unit_id`'s notifier, if the notifier exists.
    /// (`notify_one` stores a permit, so a waiter that arrives just after
    /// this call still wakes.)
    pub async fn notify_unit(&self, unit_id: Uuid) {
        if let Some(notify) = self.notifiers.read().await.get(&unit_id) {
            notify.notify_one();
        }
    }
}

View File

@ -1,5 +1,6 @@
pub mod bootstrap;
pub mod connection;
pub mod control;
pub mod db;
pub mod event;
pub mod model;
@ -7,6 +8,7 @@ pub mod platform_context;
pub mod service;
pub mod telemetry;
pub mod util;
pub mod websocket;
pub use event::EventEnvelope;

View File

@ -0,0 +1,128 @@
use std::{collections::HashMap, sync::Arc};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, RwLock};
use uuid::Uuid;
use crate::{
connection::{BatchSetPointValueReq, BatchSetPointValueRes},
control::runtime::UnitRuntime,
model::EventRecord,
telemetry::PointMonitorInfo,
};
/// Server -> client websocket push messages, adjacently tagged as
/// `{"type": "...", "data": ...}`.
///
/// NOTE(review): unlike `WsClientMessage` below, this enum has no
/// `rename_all = "snake_case"`, so tags serialize as the raw variant names
/// (e.g. `"PointNewValue"`). Confirm clients expect CamelCase tags here but
/// snake_case tags for client->server messages.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum WsMessage {
PointNewValue(PointMonitorInfo),
PointSetValueBatchResult(BatchSetPointValueRes),
EventCreated(EventRecord),
UnitRuntimeChanged(UnitRuntime),
}
/// Client -> server websocket messages, adjacently tagged with snake_case
/// tags (e.g. `{"type": "auth_write", "data": {...}}`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "snake_case")]
pub enum WsClientMessage {
AuthWrite(WsAuthWriteReq),
PointSetValueBatch(BatchSetPointValueReq),
}
/// Payload of `WsClientMessage::AuthWrite`: the key a client presents to
/// authorize write operations over the socket.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsAuthWriteReq {
pub key: String,
}
/// Registry of named broadcast rooms. Clones share the same underlying map
/// (`Arc<RwLock<…>>`), so any clone can create, send to, or remove rooms.
#[derive(Clone)]
pub struct RoomManager {
// room id -> broadcast sender; receivers are created per subscriber
rooms: Arc<RwLock<HashMap<String, broadcast::Sender<WsMessage>>>>,
}
impl RoomManager {
pub fn new() -> Self {
Self {
rooms: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn get_or_create_room(&self, room_id: &str) -> broadcast::Sender<WsMessage> {
let mut rooms = self.rooms.write().await;
if let Some(sender) = rooms.get(room_id) {
return sender.clone();
}
let (sender, _) = broadcast::channel(100);
rooms.insert(room_id.to_string(), sender.clone());
tracing::info!("Created new room: {}", room_id);
sender
}
pub async fn get_room(&self, room_id: &str) -> Option<broadcast::Sender<WsMessage>> {
let rooms = self.rooms.read().await;
rooms.get(room_id).cloned()
}
pub async fn remove_room_if_empty(&self, room_id: &str) {
let mut rooms = self.rooms.write().await;
let should_remove = rooms
.get(room_id)
.map(|sender| sender.receiver_count() == 0)
.unwrap_or(false);
if should_remove {
rooms.remove(room_id);
tracing::info!("Removed empty room: {}", room_id);
}
}
pub async fn send_to_room(&self, room_id: &str, message: WsMessage) -> Result<usize, String> {
if let Some(sender) = self.get_room(room_id).await {
match sender.send(message) {
Ok(count) => Ok(count),
Err(broadcast::error::SendError(_)) => Ok(0),
}
} else {
Ok(0)
}
}
}
impl Default for RoomManager {
fn default() -> Self {
Self::new()
}
}
/// Facade over a single `RoomManager` used for both the shared "public"
/// room and per-client rooms keyed by client UUID.
#[derive(Clone)]
pub struct WebSocketManager {
public_room: Arc<RoomManager>,
}
impl WebSocketManager {
pub fn new() -> Self {
Self {
public_room: Arc::new(RoomManager::new()),
}
}
pub async fn send_to_public(&self, message: WsMessage) -> Result<usize, String> {
self.public_room.get_or_create_room("public").await;
self.public_room.send_to_room("public", message).await
}
pub async fn send_to_client(
&self,
client_id: Uuid,
message: WsMessage,
) -> Result<usize, String> {
self.public_room
.send_to_room(&client_id.to_string(), message)
.await
}
}
impl Default for WebSocketManager {
fn default() -> Self {
Self::new()
}
}

View File

@ -0,0 +1,37 @@
use std::time::Duration;
use plc_platform_core::control::runtime::{ControlRuntimeStore, UnitRuntimeState};
use uuid::Uuid;
#[tokio::test]
async fn runtime_store_exposes_shared_runtime_surface() {
let unit_id = Uuid::new_v4();
let store = ControlRuntimeStore::new();
// First access lazily creates a default (stopped, automation-off) runtime.
let initial = store.get_or_init(unit_id).await;
assert_eq!(initial.unit_id, unit_id);
assert_eq!(initial.state, UnitRuntimeState::Stopped);
assert!(!initial.auto_enabled);
// States serialize in snake_case — pins the wire format.
assert_eq!(
serde_json::to_string(&UnitRuntimeState::Stopped).unwrap(),
"\"stopped\""
);
// Grab the notifier BEFORE spawning the waiter. Even if notify_unit fires
// before the spawned task polls `notified()`, notify_one stores a permit,
// so the 50ms timeout is a bound, not a race window.
let notify = store.get_or_create_notify(unit_id).await;
let waiter = tokio::spawn(async move {
tokio::time::timeout(Duration::from_millis(50), notify.notified()).await
});
let mut updated = initial.clone();
updated.auto_enabled = true;
updated.state = UnitRuntimeState::Running;
store.upsert(updated.clone()).await;
store.notify_unit(unit_id).await;
let notified = waiter.await.unwrap();
assert!(notified.is_ok(), "unit notifier should wake waiters");
// The upserted snapshot must be what later readers observe.
let persisted = store.get(unit_id).await.expect("runtime should exist");
assert_eq!(persisted.state, UnitRuntimeState::Running);
assert!(persisted.auto_enabled);
}