Move shared feeder plumbing into core

This commit is contained in:
caoqianming 2026-04-21 16:04:03 +08:00
parent 1317271e16
commit 24b1d3546b
23 changed files with 361 additions and 675 deletions

View File

@ -159,8 +159,6 @@ deploy/
- `DATABASE_URL` - `DATABASE_URL`
- `HOST` - `HOST`
- `PORT` - `PORT`
- `WRITE_API_KEY`
- `SIMULATE_PLC`
## 文档索引 ## 文档索引

View File

@ -1,20 +1,16 @@
use std::sync::Arc; use std::sync::Arc;
use crate::{ use crate::{
config::AppConfig, connection::ConnectionManager, control, event::EventManager, router::build_router,
connection::ConnectionManager,
control,
event::EventManager,
router::build_router,
websocket::WebSocketManager,
}; };
use axum::extract::FromRef; use axum::extract::FromRef;
use plc_platform_core::platform_context::PlatformContext; use plc_platform_core::websocket::WebSocketManager;
use plc_platform_core::{config::ServerConfig, platform_context::PlatformContext};
use tokio::sync::mpsc; use tokio::sync::mpsc;
#[derive(Clone)] #[derive(Clone)]
pub struct AppState { pub struct AppState {
pub config: AppConfig, pub config: ServerConfig,
pub platform: PlatformContext, pub platform: PlatformContext,
pub event_manager: Arc<EventManager>, pub event_manager: Arc<EventManager>,
pub control_runtime: Arc<control::runtime::ControlRuntimeStore>, pub control_runtime: Arc<control::runtime::ControlRuntimeStore>,
@ -26,12 +22,12 @@ impl FromRef<AppState> for PlatformContext {
} }
} }
pub async fn run() { pub async fn run() {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
plc_platform_core::util::log::init_logger(); plc_platform_core::util::log::init_logger();
let _single_instance = let _single_instance =
match plc_platform_core::util::single_instance::try_acquire("PLCControl.FeederDistributor") { match plc_platform_core::util::single_instance::try_acquire("PLCControl.FeederDistributor")
{
Ok(guard) => guard, Ok(guard) => guard,
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => { Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
tracing::warn!("Another feeder distributor instance is already running"); tracing::warn!("Another feeder distributor instance is already running");
@ -43,7 +39,8 @@ pub async fn run() {
} }
}; };
let config = AppConfig::from_env().expect("Failed to load configuration"); let config = ServerConfig::from_env("HOST", "0.0.0.0", "PORT", 60309)
.expect("Failed to load server configuration");
let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url) let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url)
.await .await
.expect("Failed to bootstrap platform"); .expect("Failed to bootstrap platform");
@ -60,31 +57,9 @@ pub async fn run() {
Some(platform.ws_manager.clone()), Some(platform.ws_manager.clone()),
)); ));
let sources = crate::service::get_all_enabled_sources(&platform.pool) plc_platform_core::bootstrap::connect_all_enabled_sources(&platform)
.await .await
.expect("Failed to fetch sources"); .expect("Failed to connect enabled sources");
let mut tasks = Vec::new();
for source in sources {
let cm = platform.connection_manager.clone();
let p = platform.pool.clone();
let source_name = source.name.clone();
let source_id = source.id;
let task = tokio::spawn(async move {
if let Err(err) = cm.connect_from_source(&p, source_id).await {
tracing::error!("Failed to connect to source {}: {}", source_name, err);
}
});
tasks.push(task);
}
for task in tasks {
if let Err(err) = task.await {
tracing::error!("Source connection task failed: {:?}", err);
}
}
let state = AppState { let state = AppState {
config: config.clone(), config: config.clone(),
@ -93,9 +68,6 @@ pub async fn run() {
control_runtime: control_runtime.clone(), control_runtime: control_runtime.clone(),
}; };
control::engine::start(state.clone(), control_runtime); control::engine::start(state.clone(), control_runtime);
if config.simulate_plc {
control::simulate::start(state.clone());
}
let app = build_router(state.clone()); let app = build_router(state.clone());
let addr = format!("{}:{}", config.server_host, config.server_port); let addr = format!("{}:{}", config.server_host, config.server_port);
@ -136,19 +108,14 @@ pub fn test_state() -> AppState {
.expect("lazy pool should build"); .expect("lazy pool should build");
let connection_manager = Arc::new(ConnectionManager::new()); let connection_manager = Arc::new(ConnectionManager::new());
let ws_manager = Arc::new(WebSocketManager::new()); let ws_manager = Arc::new(WebSocketManager::new());
let event_manager = Arc::new(EventManager::new( let event_manager = Arc::new(EventManager::new(pool.clone(), Some(ws_manager.clone())));
pool.clone(),
Some(ws_manager.clone()),
));
let platform = PlatformContext::new(pool, connection_manager, ws_manager); let platform = PlatformContext::new(pool, connection_manager, ws_manager);
AppState { AppState {
config: AppConfig { config: ServerConfig {
database_url, database_url,
server_host: "127.0.0.1".to_string(), server_host: "127.0.0.1".to_string(),
server_port: 0, server_port: 0,
write_api_key: Some("test-write-key".to_string()),
simulate_plc: false,
}, },
platform, platform,
event_manager, event_manager,

View File

@ -1,51 +0,0 @@
use std::env;
#[derive(Clone)]
pub struct AppConfig {
pub database_url: String,
pub server_host: String,
pub server_port: u16,
pub write_api_key: Option<String>,
/// When true, simulate RUN signal feedback after start/stop commands.
/// Set SIMULATE_PLC=true in .env for use with OPC UA proxy simulators.
pub simulate_plc: bool,
}
impl AppConfig {
pub fn from_env() -> Result<Self, String> {
let database_url = get_env("DATABASE_URL")?;
let server_host = env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string());
let server_port = env::var("PORT")
.unwrap_or_else(|_| "60309".to_string())
.parse::<u16>()
.map_err(|_| "PORT must be a number")?;
// Prefer WRITE_API_KEY, keep WRITE_KEY as backward-compatible fallback.
let write_api_key = env::var("WRITE_API_KEY")
.ok()
.or_else(|| env::var("WRITE_KEY").ok());
let simulate_plc = env::var("SIMULATE_PLC")
.unwrap_or_default()
.to_lowercase() == "true";
Ok(Self {
database_url,
server_host,
server_port,
write_api_key,
simulate_plc,
})
}
pub fn verify_write_key(&self, key: &str) -> bool {
self.write_api_key
.as_ref()
.map(|expected| expected == key)
.unwrap_or(false)
}
}
fn get_env(key: &str) -> Result<String, String> {
env::var(key).map_err(|_| format!("Missing environment variable: {}", key))
}

View File

@ -13,9 +13,9 @@ use crate::{
event::AppEvent, event::AppEvent,
service::EquipmentRolePoint, service::EquipmentRolePoint,
telemetry::{PointMonitorInfo, PointQuality}, telemetry::{PointMonitorInfo, PointQuality},
websocket::WsMessage,
AppState, AppState,
}; };
use plc_platform_core::websocket::WsMessage;
/// Start the engine: a supervisor spawns one async task per enabled unit. /// Start the engine: a supervisor spawns one async task per enabled unit.
pub fn start(state: AppState, runtime_store: Arc<ControlRuntimeStore>) { pub fn start(state: AppState, runtime_store: Arc<ControlRuntimeStore>) {
@ -77,7 +77,7 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
}; };
// Fault / comm check. // Fault / comm check.
let (kind_roles, kind_eq_ids, all_roles) = match load_equipment_maps(&state, unit_id).await { let (kind_roles, all_roles) = match load_equipment_maps(&state, unit_id).await {
Ok(maps) => maps, Ok(maps) => maps,
Err(e) => { Err(e) => {
tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e); tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e);
@ -122,11 +122,6 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e); tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e);
continue; continue;
} }
if state.config.simulate_plc {
if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() {
crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await;
}
}
} }
let mut runtime = store.get_or_init(unit_id).await; let mut runtime = store.get_or_init(unit_id).await;
runtime.state = UnitRuntimeState::Running; runtime.state = UnitRuntimeState::Running;
@ -154,11 +149,6 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e); tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e);
continue; continue;
} }
if state.config.simulate_plc {
if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() {
crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await;
}
}
} }
let mut runtime = store.get_or_init(unit_id).await; let mut runtime = store.get_or_init(unit_id).await;
runtime.accumulated_run_sec += secs as i64 * 1000; runtime.accumulated_run_sec += secs as i64 * 1000;
@ -172,10 +162,6 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
if let Some((pid, vt)) = dist_cmd { if let Some((pid, vt)) = dist_cmd {
if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await { if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await {
tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e); tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e);
} else if state.config.simulate_plc {
if let Some(eq_id) = kind_eq_ids.get("distributor").copied() {
crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await;
}
} }
} }
runtime.state = UnitRuntimeState::DistributorRunning; runtime.state = UnitRuntimeState::DistributorRunning;
@ -199,11 +185,6 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
tracing::warn!("Engine: stop distributor failed for unit {}: {}", unit_id, e); tracing::warn!("Engine: stop distributor failed for unit {}: {}", unit_id, e);
continue; continue;
} }
if state.config.simulate_plc {
if let Some(eq_id) = kind_eq_ids.get("distributor").copied() {
crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await;
}
}
} }
let mut runtime = store.get_or_init(unit_id).await; let mut runtime = store.get_or_init(unit_id).await;
runtime.accumulated_run_sec = 0; runtime.accumulated_run_sec = 0;
@ -405,7 +386,6 @@ async fn check_fault_comm(
type EquipMaps = ( type EquipMaps = (
HashMap<String, HashMap<String, EquipmentRolePoint>>, HashMap<String, HashMap<String, EquipmentRolePoint>>,
HashMap<String, Uuid>,
Vec<(Uuid, HashMap<String, EquipmentRolePoint>)>, Vec<(Uuid, HashMap<String, EquipmentRolePoint>)>,
); );
@ -438,7 +418,6 @@ fn build_equipment_maps(
mut role_points_by_equipment: HashMap<Uuid, Vec<EquipmentRolePoint>>, mut role_points_by_equipment: HashMap<Uuid, Vec<EquipmentRolePoint>>,
) -> EquipMaps { ) -> EquipMaps {
let mut kind_roles: HashMap<String, HashMap<String, EquipmentRolePoint>> = HashMap::new(); let mut kind_roles: HashMap<String, HashMap<String, EquipmentRolePoint>> = HashMap::new();
let mut kind_eq_ids: HashMap<String, Uuid> = HashMap::new();
let mut all_roles: Vec<(Uuid, HashMap<String, EquipmentRolePoint>)> = Vec::new(); let mut all_roles: Vec<(Uuid, HashMap<String, EquipmentRolePoint>)> = Vec::new();
for equip in equipment_list { for equip in equipment_list {
@ -452,7 +431,6 @@ fn build_equipment_maps(
if let Some(kind) = &equip.kind { if let Some(kind) = &equip.kind {
if !kind_roles.contains_key(kind.as_str()) { if !kind_roles.contains_key(kind.as_str()) {
kind_roles.insert(kind.clone(), role_map.clone()); kind_roles.insert(kind.clone(), role_map.clone());
kind_eq_ids.insert(kind.clone(), equip.id);
} else { } else {
tracing::warn!( tracing::warn!(
"Engine: unit {} has multiple {} equipment; using first", "Engine: unit {} has multiple {} equipment; using first",
@ -463,7 +441,7 @@ fn build_equipment_maps(
all_roles.push((equip.id, role_map)); all_roles.push((equip.id, role_map));
} }
(kind_roles, kind_eq_ids, all_roles) (kind_roles, all_roles)
} }
/// Find a command point by role. Returns `None` if REM==0, FLT==1, or quality is bad. /// Find a command point by role. Returns `None` if REM==0, FLT==1, or quality is bad.

View File

@ -1,7 +1,6 @@
pub use plc_platform_core::control::{command, runtime}; pub use plc_platform_core::control::{command, runtime};
pub mod engine; pub mod engine;
pub mod simulate;
pub mod validator; pub mod validator;
use crate::telemetry::{DataValue, PointMonitorInfo}; use crate::telemetry::{DataValue, PointMonitorInfo};

View File

@ -1,213 +0,0 @@
use tokio::time::Duration;
use uuid::Uuid;
use crate::{
connection::{BatchSetPointValueReq, SetPointValueReqItem},
telemetry::{DataValue, PointMonitorInfo, PointQuality, ValueType},
websocket::WsMessage,
AppState,
};
/// Start the chaos simulation task (only when SIMULATE_PLC=true).
/// Randomly disrupts `rem` or `flt` signals on equipment to exercise the control engine.
pub fn start(state: AppState) {
tokio::spawn(async move {
run(state).await;
});
}
async fn run(state: AppState) {
let mut rng = seed_rng();
loop {
// Wait a random 15-60 s between events.
let wait_secs = 15 + xorshift(&mut rng) % 46;
tokio::time::sleep(Duration::from_secs(wait_secs)).await;
// Pick a random enabled unit.
let units = match crate::service::get_all_enabled_units(&state.platform.pool).await {
Ok(u) if !u.is_empty() => u,
_ => continue,
};
let unit = &units[xorshift(&mut rng) as usize % units.len()];
// Only target units with auto control running; otherwise the event is uninteresting.
let runtime = state.control_runtime.get(unit.id).await;
if runtime.map_or(true, |r| !r.auto_enabled) {
continue;
}
// Pick a random equipment in that unit.
let equipments =
match crate::service::get_equipment_by_unit_id(&state.platform.pool, unit.id).await {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
let eq = &equipments[xorshift(&mut rng) as usize % equipments.len()];
// Find which of rem / flt this equipment has.
let role_points =
match crate::service::get_equipment_role_points(&state.platform.pool, eq.id).await {
Ok(rp) if !rp.is_empty() => rp,
_ => continue,
};
let candidates: Vec<&str> = ["flt", "rem"]
.iter()
.filter(|&&r| role_points.iter().any(|p| p.signal_role == r))
.copied()
.collect();
if candidates.is_empty() {
continue;
}
let target_role = candidates[xorshift(&mut rng) as usize % candidates.len()];
let target_point = role_points
.iter()
.find(|p| p.signal_role == target_role)
.unwrap();
// rem=false means the equipment is not in remote mode.
// flt=true means the equipment reports an active fault.
let trigger_value = target_role == "flt";
// Hold duration: 5-15 s for rem, 3-10 s for flt.
let hold_secs = if target_role == "flt" {
3 + xorshift(&mut rng) % 8
} else {
5 + xorshift(&mut rng) % 11
};
tracing::info!(
"[chaos] unit={} eq={} role={} -> {} (hold {}s)",
unit.code,
eq.code,
target_role,
if trigger_value { "FAULT" } else { "REM OFF" },
hold_secs
);
patch_signal(&state, target_point.point_id, trigger_value).await;
patch_signal(&state, target_point.point_id, trigger_value).await;
tokio::time::sleep(Duration::from_secs(hold_secs)).await;
patch_signal(&state, target_point.point_id, !trigger_value).await;
tracing::info!(
"[chaos] unit={} eq={} role={} -> RESTORED",
unit.code,
eq.code,
target_role
);
}
}
/// Simulate RUN signal feedback for an equipment after a manual start/stop command.
/// Called by the engine and control handler when SIMULATE_PLC=true.
pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: bool) {
let role_points =
match crate::service::get_equipment_role_points(&state.platform.pool, equipment_id).await {
Ok(v) => v,
Err(e) => {
tracing::warn!("simulate_run_feedback: db error: {}", e);
return;
}
};
let run_point = match role_points.iter().find(|p| p.signal_role == "run") {
Some(p) => p.clone(),
None => return,
};
patch_signal(state, run_point.point_id, run_on).await;
}
/// Patch a signal point value: try OPC UA write first, fall back to cache patch + WS push.
pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) {
let write_json = serde_json::json!(if value_on { 1 } else { 0 });
let write_ok = match state
.platform.connection_manager
.write_point_values_batch(BatchSetPointValueReq {
items: vec![SetPointValueReqItem {
point_id,
value: write_json,
}],
})
.await
{
Ok(res) => res.success,
Err(_) => false,
};
if write_ok {
return;
}
// Fallback: patch the monitor cache directly and broadcast over WS.
let (value, value_type, value_text) = {
let guard = state
.platform.connection_manager
.get_point_monitor_data_read_guard()
.await;
match guard.get(&point_id).and_then(|m| m.value_type.as_ref()) {
Some(ValueType::Int) => (
DataValue::Int(if value_on { 1 } else { 0 }),
Some(ValueType::Int),
Some(if value_on { "1" } else { "0" }.to_string()),
),
Some(ValueType::UInt) => (
DataValue::UInt(if value_on { 1 } else { 0 }),
Some(ValueType::UInt),
Some(if value_on { "1" } else { "0" }.to_string()),
),
_ => (
DataValue::Bool(value_on),
Some(ValueType::Bool),
Some(value_on.to_string()),
),
}
};
let monitor = PointMonitorInfo {
protocol: "simulation".to_string(),
source_id: Uuid::nil(),
point_id,
client_handle: 0,
scan_mode: plc_platform_core::model::ScanMode::Poll,
timestamp: Some(chrono::Utc::now()),
quality: PointQuality::Good,
value: Some(value),
value_type,
value_text,
old_value: None,
old_timestamp: None,
value_changed: true,
};
if let Err(e) = state
.platform.connection_manager
.update_point_monitor_data(monitor.clone())
.await
{
tracing::warn!("[chaos] cache update failed for {}: {}", point_id, e);
return;
}
let _ = state
.platform.ws_manager
.send_to_public(WsMessage::PointNewValue(monitor))
.await;
}
// Minimal XorShift64 PRNG (no external crate needed).
fn seed_rng() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos() as u64 ^ d.as_secs().wrapping_mul(0x9e37_79b9_7f4a_7c15))
.unwrap_or(0xdeadbeef)
}
fn xorshift(s: &mut u64) -> u64 {
*s ^= *s << 13;
*s ^= *s >> 7;
*s ^= *s << 17;
*s
}

View File

@ -1,7 +1,10 @@
use plc_platform_core::event::EventEnvelope; use plc_platform_core::model::EventRecord;
use plc_platform_core::{
event::EventEnvelope,
websocket::{WebSocketManager, WsMessage},
};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use plc_platform_core::model::EventRecord;
const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024; const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024;
@ -37,7 +40,7 @@ pub struct EventManager {
impl EventManager { impl EventManager {
pub fn new( pub fn new(
pool: sqlx::PgPool, pool: sqlx::PgPool,
ws_manager: Option<std::sync::Arc<crate::websocket::WebSocketManager>>, ws_manager: Option<std::sync::Arc<WebSocketManager>>,
) -> Self { ) -> Self {
let (control_sender, mut control_receiver) = let (control_sender, mut control_receiver) =
mpsc::channel::<AppEvent>(CONTROL_EVENT_CHANNEL_CAPACITY); mpsc::channel::<AppEvent>(CONTROL_EVENT_CHANNEL_CAPACITY);
@ -101,7 +104,7 @@ impl plc_platform_core::platform_context::PlatformEventSink for FeederPlatformEv
async fn handle_control_event( async fn handle_control_event(
event: AppEvent, event: AppEvent,
pool: &sqlx::PgPool, pool: &sqlx::PgPool,
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>, ws_manager: Option<&std::sync::Arc<WebSocketManager>>,
) { ) {
persist_event_if_needed(&event, pool, ws_manager).await; persist_event_if_needed(&event, pool, ws_manager).await;
@ -175,7 +178,7 @@ async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String {
async fn persist_event_if_needed( async fn persist_event_if_needed(
event: &AppEvent, event: &AppEvent,
pool: &sqlx::PgPool, pool: &sqlx::PgPool,
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>, ws_manager: Option<&std::sync::Arc<WebSocketManager>>,
) { ) {
let record: Option<(&str, &str, Option<Uuid>, Option<Uuid>, Option<Uuid>, String, serde_json::Value)> = match event { let record: Option<(&str, &str, Option<Uuid>, Option<Uuid>, Option<Uuid>, String, serde_json::Value)> = match event {
AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => { AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => {
@ -307,7 +310,7 @@ async fn persist_event_if_needed(
match inserted { match inserted {
Ok(record) => { Ok(record) => {
if let Some(ws_manager) = ws_manager { if let Some(ws_manager) = ws_manager {
let ws_message = crate::websocket::WsMessage::EventCreated(record); let ws_message = WsMessage::EventCreated(record);
if let Err(err) = ws_manager.send_to_public(ws_message).await { if let Err(err) = ws_manager.send_to_public(ws_message).await {
tracing::warn!("Failed to broadcast event websocket message: {}", err); tracing::warn!("Failed to broadcast event websocket message: {}", err);
} }

View File

@ -169,15 +169,6 @@ async fn send_equipment_command(
.await .await
.map_err(|e| ApiErr::Internal(e, None))?; .map_err(|e| ApiErr::Internal(e, None))?;
if state.config.simulate_plc {
crate::control::simulate::simulate_run_feedback(
&state,
equipment_id,
matches!(action, ControlAction::Start),
)
.await;
}
let event = match action { let event = match action {
ControlAction::Start => crate::event::AppEvent::EquipmentStartCommandSent { ControlAction::Start => crate::event::AppEvent::EquipmentStartCommandSent {
equipment_id, equipment_id,

View File

@ -1,41 +1 @@
// Re-export all platform point handlers from core.
pub use plc_platform_core::handler::point::*; pub use plc_platform_core::handler::point::*;
use axum::{
extract::State,
http::HeaderMap,
response::IntoResponse,
Json,
};
use plc_platform_core::util::response::ApiErr;
use crate::AppState;
/// Feeder-specific: batch set point values (requires write key auth from app config).
pub async fn batch_set_point_value(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<plc_platform_core::connection::BatchSetPointValueReq>,
) -> Result<impl IntoResponse, ApiErr> {
let write_key = headers
.get("X-Write-Key")
.and_then(|v| v.to_str().ok())
.unwrap_or_default();
if !state.config.verify_write_key(write_key) {
return Err(ApiErr::Forbidden(
"write permission denied".to_string(),
Some(serde_json::json!({
"hint": "set WRITE_API_KEY (or legacy WRITE_KEY) and pass header X-Write-Key"
})),
));
}
let result = state
.platform
.connection_manager
.write_point_values_batch(payload)
.await
.map_err(|e| ApiErr::Internal(e, None))?;
Ok(Json(result))
}

View File

@ -1,11 +1,8 @@
pub mod app; pub mod app;
pub mod config;
pub mod control; pub mod control;
pub mod event; pub mod event;
pub mod handler; pub mod handler;
pub mod middleware;
pub mod router; pub mod router;
pub mod websocket;
pub mod connection { pub mod connection {
pub use plc_platform_core::connection::*; pub use plc_platform_core::connection::*;

View File

@ -1,37 +0,0 @@
use axum::{
body::Body,
http::Request,
middleware::Next,
response::Response,
};
use std::time::Instant;
pub async fn simple_logger(
req: Request<Body>,
next: Next,
) -> Response {
// Borrow the path string directly; no clone needed.
let method = req.method().to_string();
let uri = req.uri().to_string(); // `Uri::to_string()` allocates the owned string once.
let start = Instant::now();
let res = next.run(req).await;
let duration = start.elapsed();
let status = res.status();
match status.as_u16() {
100..=399 => {
tracing::info!("{} {} {} {:?}", method, uri, status, duration);
}
400..=499 => {
tracing::warn!("{} {} {} {:?}", method, uri, status, duration);
}
500..=599 => {
tracing::error!("{} {} {} {:?}", method, uri, status, duration);
}
_ => {
tracing::warn!("{} {} {} {:?}", method, uri, status, duration);
}
}
res
}

View File

@ -1,23 +1,9 @@
use axum::{ use axum::{
extract::Request,
middleware::Next,
response::Response,
routing::{get, post}, routing::{get, post},
Router, Router,
}; };
use tower_http::cors::{Any, CorsLayer};
use tower_http::services::ServeDir;
use crate::{handler, middleware::simple_logger, websocket, AppState}; use crate::{handler, AppState};
async fn no_cache(req: Request, next: Next) -> Response {
let mut response = next.run(req).await;
response.headers_mut().insert(
axum::http::header::CACHE_CONTROL,
axum::http::HeaderValue::from_static("no-store"),
);
response
}
pub fn build_router(state: AppState) -> Router { pub fn build_router(state: AppState) -> Router {
// Platform routes (source, point, equipment, tag, page, logs) from core. // Platform routes (source, point, equipment, tag, page, logs) from core.
@ -25,11 +11,6 @@ pub fn build_router(state: AppState) -> Router {
// Feeder-specific routes. // Feeder-specific routes.
let feeder_routes = Router::new() let feeder_routes = Router::new()
// Feeder-only: batch set point values (requires write key auth).
.route(
"/api/point/value/batch",
post(handler::point::batch_set_point_value),
)
// Unit / control routes (feeder-specific). // Unit / control routes (feeder-specific).
.route( .route(
"/api/unit", "/api/unit",
@ -87,25 +68,19 @@ pub fn build_router(state: AppState) -> Router {
.merge(feeder_routes) .merge(feeder_routes)
.nest( .nest(
"/ui", "/ui",
Router::new() plc_platform_core::http::static_ui_routes("web/feeder", "web/core"),
.fallback_service(
ServeDir::new("web/feeder")
.append_index_html_on_directories(true)
.fallback(ServeDir::new("web/core")),
) )
.layer(axum::middleware::from_fn(no_cache)), .route(
"/ws/public",
get(plc_platform_core::websocket::public_websocket_handler::<AppState>),
) )
.route("/ws/public", get(websocket::public_websocket_handler))
.route( .route(
"/ws/client/{client_id}", "/ws/client/{client_id}",
get(websocket::client_websocket_handler), get(plc_platform_core::websocket::client_websocket_handler::<AppState>),
)
.layer(axum::middleware::from_fn(simple_logger))
.layer(
CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any),
) )
.layer(axum::middleware::from_fn(
plc_platform_core::http::simple_logger,
))
.layer(plc_platform_core::http::permissive_cors())
.with_state(state) .with_state(state)
} }

View File

@ -1,144 +0,0 @@
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
Path, State,
},
response::IntoResponse,
};
use std::sync::Arc;
use tokio::sync::broadcast;
use uuid::Uuid;
pub use plc_platform_core::websocket::{
RoomManager, WebSocketManager, WsClientMessage, WsMessage,
};
/// Public websocket handler.
pub async fn public_websocket_handler(
ws: WebSocketUpgrade,
State(state): State<crate::AppState>,
) -> impl IntoResponse {
let ws_manager = state.platform.ws_manager.clone();
let app_state = state.clone();
ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, "public".to_string(), app_state))
}
/// Client websocket handler.
pub async fn client_websocket_handler(
ws: WebSocketUpgrade,
Path(client_id): Path<Uuid>,
State(state): State<crate::AppState>,
) -> impl IntoResponse {
let ws_manager = state.platform.ws_manager.clone();
let room_id = client_id.to_string();
let app_state = state.clone();
ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, room_id, app_state))
}
/// Handle websocket connection for one room.
async fn handle_socket(
mut socket: WebSocket,
ws_manager: Arc<WebSocketManager>,
room_id: String,
state: crate::AppState,
) {
let mut rx = ws_manager.subscribe_room(&room_id).await;
let mut can_write = false;
loop {
tokio::select! {
maybe_msg = socket.recv() => {
match maybe_msg {
Some(Ok(msg)) => {
if matches!(msg, Message::Close(_)) {
break;
}
match msg {
Message::Text(text) => {
match serde_json::from_str::<WsClientMessage>(&text) {
Ok(WsClientMessage::AuthWrite(payload)) => {
can_write = state.config.verify_write_key(&payload.key);
if !can_write {
tracing::warn!("WebSocket write auth failed in room {}", room_id);
}
}
Ok(WsClientMessage::PointSetValueBatch(payload)) => {
let response = if !can_write {
crate::connection::BatchSetPointValueRes {
success: false,
err_msg: Some("write permission denied".to_string()),
success_count: 0,
failed_count: 0,
results: vec![],
}
} else {
match state.platform.connection_manager.write_point_values_batch(payload).await {
Ok(v) => v,
Err(e) => crate::connection::BatchSetPointValueRes {
success: false,
err_msg: Some(e),
success_count: 0,
failed_count: 1,
results: vec![crate::connection::SetPointValueResItem {
point_id: Uuid::nil(),
success: false,
err_msg: Some("Internal write error".to_string()),
}],
},
}
};
if let Err(e) = ws_manager
.send_to_room(&room_id, WsMessage::PointSetValueBatchResult(response))
.await
{
tracing::error!(
"Failed to send PointSetValueBatchResult to room {}: {}",
room_id,
e
);
}
}
Err(e) => {
tracing::warn!(
"Invalid websocket message in room {}: {}",
room_id,
e
);
}
}
}
_ => {
tracing::debug!("Received WebSocket message from room {}: {:?}", room_id, msg);
}
}
}
Some(Err(e)) => {
tracing::error!("WebSocket error in room {}: {}", room_id, e);
break;
}
None => break,
}
}
room_message = rx.recv() => {
match room_message {
Ok(message) => match serde_json::to_string(&message) {
Ok(json_str) => {
if socket.send(Message::Text(json_str.into())).await.is_err() {
break;
}
}
Err(e) => {
tracing::error!("Failed to serialize websocket message: {}", e);
}
},
Err(broadcast::error::RecvError::Lagged(skipped)) => {
tracing::warn!("WebSocket room {} lagged, skipped {} messages", room_id, skipped);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
}
ws_manager.remove_room_if_empty(&room_id).await;
}

View File

@ -1,25 +1,22 @@
use axum::extract::FromRef;
use crate::router::build_router; use crate::router::build_router;
use axum::extract::FromRef;
use plc_platform_core::platform_context::PlatformContext; use plc_platform_core::platform_context::PlatformContext;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct AppConfig { pub struct AppConfig {
pub database_url: String, pub server: plc_platform_core::config::ServerConfig,
pub server_host: String,
pub server_port: u16,
} }
impl AppConfig { impl AppConfig {
pub fn from_env() -> Self { pub fn from_env() -> Self {
Self { Self {
database_url: std::env::var("DATABASE_URL") server: plc_platform_core::config::ServerConfig::from_env(
.expect("DATABASE_URL must be set"), "OPS_SERVER_HOST",
server_host: std::env::var("OPS_SERVER_HOST") "127.0.0.1",
.unwrap_or_else(|_| "127.0.0.1".to_string()), "OPS_SERVER_PORT",
server_port: std::env::var("OPS_SERVER_PORT") 3100,
.ok() )
.and_then(|value| value.parse().ok()) .expect("Failed to load operation-system server configuration"),
.unwrap_or(3100),
} }
} }
} }
@ -54,7 +51,7 @@ pub async fn run() {
}; };
let config = AppConfig::from_env(); let config = AppConfig::from_env();
let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url) let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.server.database_url)
.await .await
.expect("Failed to bootstrap platform"); .expect("Failed to bootstrap platform");
let platform = builder.build(); let platform = builder.build();
@ -65,7 +62,10 @@ pub async fn run() {
platform, platform,
}; };
let app = build_router(state.clone()); let app = build_router(state.clone());
let addr = format!("{}:{}", state.config.server_host, state.config.server_port); let addr = format!(
"{}:{}",
state.config.server.server_host, state.config.server.server_port
);
tracing::info!("Starting operation-system server at http://{}", addr); tracing::info!("Starting operation-system server at http://{}", addr);
let listener = tokio::net::TcpListener::bind(&addr) let listener = tokio::net::TcpListener::bind(&addr)
.await .await
@ -81,16 +81,19 @@ pub fn test_state() -> AppState {
let pool = sqlx::postgres::PgPoolOptions::new() let pool = sqlx::postgres::PgPoolOptions::new()
.connect_lazy(&database_url) .connect_lazy(&database_url)
.expect("lazy pool should build"); .expect("lazy pool should build");
let connection_manager = std::sync::Arc::new(plc_platform_core::connection::ConnectionManager::new()); let connection_manager =
std::sync::Arc::new(plc_platform_core::connection::ConnectionManager::new());
let ws_manager = std::sync::Arc::new(plc_platform_core::websocket::WebSocketManager::new()); let ws_manager = std::sync::Arc::new(plc_platform_core::websocket::WebSocketManager::new());
AppState { AppState {
app_name: "operation-system", app_name: "operation-system",
config: AppConfig { config: AppConfig {
server: plc_platform_core::config::ServerConfig {
database_url, database_url,
server_host: "127.0.0.1".to_string(), server_host: "127.0.0.1".to_string(),
server_port: 0, server_port: 0,
}, },
},
platform: PlatformContext::new(pool, connection_manager, ws_manager), platform: PlatformContext::new(pool, connection_manager, ws_manager),
} }
} }

View File

@ -1,20 +1,7 @@
use axum::{extract::State, routing::get, Router}; use axum::{extract::State, routing::get, Router};
use tower_http::services::ServeDir;
use crate::app::AppState; use crate::app::AppState;
async fn no_cache(
req: axum::extract::Request,
next: axum::middleware::Next,
) -> axum::response::Response {
let mut response = next.run(req).await;
response.headers_mut().insert(
axum::http::header::CACHE_CONTROL,
axum::http::HeaderValue::from_static("no-store"),
);
response
}
pub fn build_router(state: AppState) -> Router { pub fn build_router(state: AppState) -> Router {
// Platform routes (source, point, equipment, tag, page, logs) from core. // Platform routes (source, point, equipment, tag, page, logs) from core.
let platform = plc_platform_core::handler::platform_routes::<AppState>(); let platform = plc_platform_core::handler::platform_routes::<AppState>();
@ -23,20 +10,17 @@ pub fn build_router(state: AppState) -> Router {
let ops_routes = Router::new() let ops_routes = Router::new()
.route("/api/health", get(health_check)) .route("/api/health", get(health_check))
.route("/api/docs/api-md", get(crate::handler::doc::get_api_md)) .route("/api/docs/api-md", get(crate::handler::doc::get_api_md))
.route("/api/docs/readme-md", get(crate::handler::doc::get_readme_md)); .route(
"/api/docs/readme-md",
get(crate::handler::doc::get_readme_md),
);
Router::new() Router::new()
.merge(platform) .merge(platform)
.merge(ops_routes) .merge(ops_routes)
.nest( .nest(
"/ui", "/ui",
Router::new() plc_platform_core::http::static_ui_routes("web/ops", "web/core"),
.fallback_service(
ServeDir::new("web/ops")
.append_index_html_on_directories(true)
.fallback(ServeDir::new("web/core")),
)
.layer(axum::middleware::from_fn(no_cache)),
) )
.with_state(state) .with_state(state)
} }

View File

@ -22,7 +22,8 @@ impl PlatformBuilder {
cm_for_telemetry, cm_for_telemetry,
self.ws_manager.clone(), self.ws_manager.clone(),
)); ));
self.connection_manager.set_event_manager(telemetry_processor); self.connection_manager
.set_event_manager(telemetry_processor);
// Start reconnect task (auto-reconnects on connection loss). // Start reconnect task (auto-reconnects on connection loss).
self.connection_manager self.connection_manager
@ -50,3 +51,31 @@ pub async fn bootstrap_platform(database_url: &str) -> Result<PlatformBuilder, S
ws_manager, ws_manager,
}) })
} }
pub async fn connect_all_enabled_sources(platform: &PlatformContext) -> Result<(), String> {
let sources = crate::service::get_all_enabled_sources(&platform.pool)
.await
.map_err(|e| format!("Failed to fetch sources: {}", e))?;
let mut tasks = Vec::new();
for source in sources {
let cm = platform.connection_manager.clone();
let pool = platform.pool.clone();
let source_name = source.name.clone();
let source_id = source.id;
tasks.push(tokio::spawn(async move {
if let Err(err) = cm.connect_from_source(&pool, source_id).await {
tracing::error!("Failed to connect to source {}: {}", source_name, err);
}
}));
}
for task in tasks {
if let Err(err) = task.await {
tracing::error!("Source connection task failed: {:?}", err);
}
}
Ok(())
}

View File

@ -0,0 +1,51 @@
use std::env;
#[derive(Clone, Debug)]
pub struct ServerConfig {
pub database_url: String,
pub server_host: String,
pub server_port: u16,
}
impl ServerConfig {
pub fn from_env(
host_key: &str,
host_default: &str,
port_key: &str,
port_default: u16,
) -> Result<Self, String> {
Ok(Self {
database_url: required_env("DATABASE_URL")?,
server_host: env_string(host_key, host_default),
server_port: env_u16(port_key, port_default)?,
})
}
}
pub fn required_env(key: &str) -> Result<String, String> {
env::var(key).map_err(|_| format!("Missing environment variable: {}", key))
}
pub fn env_string(key: &str, default: &str) -> String {
env::var(key).unwrap_or_else(|_| default.to_string())
}
pub fn env_u16(key: &str, default: u16) -> Result<u16, String> {
match env::var(key) {
Ok(value) => value
.parse::<u16>()
.map_err(|_| format!("{} must be a number", key)),
Err(_) => Ok(default),
}
}
pub fn env_bool(key: &str, default: bool) -> bool {
env::var(key)
.map(|value| {
matches!(
value.to_ascii_lowercase().as_str(),
"1" | "true" | "yes" | "on"
)
})
.unwrap_or(default)
}

View File

@ -10,12 +10,12 @@ use std::collections::{HashMap, HashSet};
use uuid::Uuid; use uuid::Uuid;
use validator::Validate; use validator::Validate;
use crate::model::{Node, Point};
use crate::platform_context::PlatformContext;
use crate::util::{ use crate::util::{
pagination::{PaginatedResponse, PaginationParams}, pagination::{PaginatedResponse, PaginationParams},
response::ApiErr, response::ApiErr,
}; };
use crate::platform_context::PlatformContext;
use crate::model::{Node, Point};
fn notify_units(state: &PlatformContext, unit_ids: impl IntoIterator<Item = Uuid>) { fn notify_units(state: &PlatformContext, unit_ids: impl IntoIterator<Item = Uuid>) {
let ids: Vec<Uuid> = { let ids: Vec<Uuid> = {
@ -58,6 +58,18 @@ pub struct PointHistoryItem {
pub value_number: Option<f64>, pub value_number: Option<f64>,
} }
pub async fn batch_set_point_value(
State(state): State<PlatformContext>,
Json(payload): Json<crate::connection::BatchSetPointValueReq>,
) -> Result<impl IntoResponse, ApiErr> {
let result = state
.connection_manager
.write_point_values_batch(payload)
.await
.map_err(|e| ApiErr::Internal(e, None))?;
Ok(Json(result))
}
pub async fn get_point_list( pub async fn get_point_list(
State(state): State<PlatformContext>, State(state): State<PlatformContext>,
Query(query): Query<GetPointListQuery>, Query(query): Query<GetPointListQuery>,
@ -556,7 +568,10 @@ pub async fn batch_create_points(
crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?; crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?;
for (source_id, points) in grouped { for (source_id, points) in grouped {
let point_ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect(); let point_ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
state.emit_event(crate::event::PlatformEvent::PointsCreated { source_id, point_ids }); state.emit_event(crate::event::PlatformEvent::PointsCreated {
source_id,
point_ids,
});
} }
} }
@ -616,7 +631,10 @@ pub async fn batch_delete_points(
for (source_id, points) in grouped { for (source_id, points) in grouped {
let ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect(); let ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
state.emit_event(crate::event::PlatformEvent::PointsDeleted { source_id, point_ids: ids }); state.emit_event(crate::event::PlatformEvent::PointsDeleted {
source_id,
point_ids: ids,
});
} }
notify_units(&state, affected_unit_ids); notify_units(&state, affected_unit_ids);
@ -636,4 +654,3 @@ fn monitor_value_to_number(item: &crate::telemetry::PointMonitorInfo) -> Option<
_ => None, _ => None,
} }
} }

View File

@ -24,8 +24,7 @@ where
) )
.route( .route(
"/api/source/{source_id}", "/api/source/{source_id}",
axum::routing::delete(super::source::delete_source) axum::routing::delete(super::source::delete_source).put(super::source::update_source),
.put(super::source::update_source),
) )
.route( .route(
"/api/source/{source_id}/reconnect", "/api/source/{source_id}/reconnect",
@ -41,10 +40,13 @@ where
) )
// Point // Point
.route("/api/point", get(super::point::get_point_list)) .route("/api/point", get(super::point::get_point_list))
.route(
"/api/point/value/batch",
post(super::point::batch_set_point_value),
)
.route( .route(
"/api/point/batch", "/api/point/batch",
post(super::point::batch_create_points) post(super::point::batch_create_points).delete(super::point::batch_delete_points),
.delete(super::point::batch_delete_points),
) )
.route( .route(
"/api/point/{point_id}/history", "/api/point/{point_id}/history",
@ -67,8 +69,7 @@ where
// Equipment // Equipment
.route( .route(
"/api/equipment", "/api/equipment",
get(super::equipment::get_equipment_list) get(super::equipment::get_equipment_list).post(super::equipment::create_equipment),
.post(super::equipment::create_equipment),
) )
.route( .route(
"/api/equipment/{equipment_id}", "/api/equipment/{equipment_id}",

View File

@ -0,0 +1,61 @@
use std::time::Instant;
use axum::{
body::Body,
extract::Request,
http::{header, HeaderValue},
middleware::Next,
response::Response,
Router,
};
use tower_http::{
cors::{Any, CorsLayer},
services::ServeDir,
};
pub async fn no_cache(req: Request, next: Next) -> Response {
let mut response = next.run(req).await;
response
.headers_mut()
.insert(header::CACHE_CONTROL, HeaderValue::from_static("no-store"));
response
}
pub async fn simple_logger(req: axum::http::Request<Body>, next: Next) -> Response {
let method = req.method().to_string();
let uri = req.uri().to_string();
let start = Instant::now();
let res = next.run(req).await;
let duration = start.elapsed();
let status = res.status();
match status.as_u16() {
100..=399 => tracing::info!("{} {} {} {:?}", method, uri, status, duration),
400..=499 => tracing::warn!("{} {} {} {:?}", method, uri, status, duration),
500..=599 => tracing::error!("{} {} {} {:?}", method, uri, status, duration),
_ => tracing::warn!("{} {} {} {:?}", method, uri, status, duration),
}
res
}
pub fn permissive_cors() -> CorsLayer {
CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any)
}
pub fn static_ui_routes<S>(app_dir: &'static str, core_dir: &'static str) -> Router<S>
where
S: Clone + Send + Sync + 'static,
{
Router::new()
.fallback_service(
ServeDir::new(app_dir)
.append_index_html_on_directories(true)
.fallback(ServeDir::new(core_dir)),
)
.layer(axum::middleware::from_fn(no_cache))
}

View File

@ -1,9 +1,11 @@
pub mod bootstrap; pub mod bootstrap;
pub mod config;
pub mod connection; pub mod connection;
pub mod control; pub mod control;
pub mod db; pub mod db;
pub mod event; pub mod event;
pub mod handler; pub mod handler;
pub mod http;
pub mod model; pub mod model;
pub mod platform_context; pub mod platform_context;
pub mod service; pub mod service;
@ -13,4 +15,3 @@ pub mod util;
pub mod websocket; pub mod websocket;
pub use event::EventEnvelope; pub use event::EventEnvelope;

View File

@ -1,9 +1,17 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
FromRef, Path, State,
},
response::IntoResponse,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, RwLock}; use tokio::sync::{broadcast, RwLock};
use uuid::Uuid; use uuid::Uuid;
use crate::platform_context::PlatformContext;
use crate::{ use crate::{
connection::{BatchSetPointValueReq, BatchSetPointValueRes}, connection::{BatchSetPointValueReq, BatchSetPointValueRes},
control::runtime::UnitRuntime, control::runtime::UnitRuntime,
@ -23,15 +31,9 @@ pub enum WsMessage {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "snake_case")] #[serde(tag = "type", content = "data", rename_all = "snake_case")]
pub enum WsClientMessage { pub enum WsClientMessage {
AuthWrite(WsAuthWriteReq),
PointSetValueBatch(BatchSetPointValueReq), PointSetValueBatch(BatchSetPointValueReq),
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsAuthWriteReq {
pub key: String,
}
#[derive(Clone)] #[derive(Clone)]
pub struct RoomManager { pub struct RoomManager {
rooms: Arc<RwLock<HashMap<String, broadcast::Sender<WsMessage>>>>, rooms: Arc<RwLock<HashMap<String, broadcast::Sender<WsMessage>>>>,
@ -139,3 +141,121 @@ impl Default for WebSocketManager {
Self::new() Self::new()
} }
} }
pub async fn public_websocket_handler<S>(
ws: WebSocketUpgrade,
State(state): State<S>,
) -> impl IntoResponse
where
S: Clone + Send + Sync + 'static,
PlatformContext: FromRef<S>,
{
let platform = PlatformContext::from_ref(&state);
let ws_manager = platform.ws_manager.clone();
ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, "public".to_string(), platform))
}
pub async fn client_websocket_handler<S>(
ws: WebSocketUpgrade,
Path(client_id): Path<Uuid>,
State(state): State<S>,
) -> impl IntoResponse
where
S: Clone + Send + Sync + 'static,
PlatformContext: FromRef<S>,
{
let platform = PlatformContext::from_ref(&state);
let ws_manager = platform.ws_manager.clone();
let room_id = client_id.to_string();
ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, room_id, platform))
}
async fn handle_socket(
mut socket: WebSocket,
ws_manager: Arc<WebSocketManager>,
room_id: String,
platform: PlatformContext,
) {
let mut rx = ws_manager.subscribe_room(&room_id).await;
loop {
tokio::select! {
maybe_msg = socket.recv() => {
match maybe_msg {
Some(Ok(msg)) => {
if matches!(msg, Message::Close(_)) {
break;
}
match msg {
Message::Text(text) => {
match serde_json::from_str::<WsClientMessage>(&text) {
Ok(WsClientMessage::PointSetValueBatch(payload)) => {
let response = match platform.connection_manager.write_point_values_batch(payload).await {
Ok(v) => v,
Err(e) => BatchSetPointValueRes {
success: false,
err_msg: Some(e),
success_count: 0,
failed_count: 1,
results: vec![crate::connection::SetPointValueResItem {
point_id: Uuid::nil(),
success: false,
err_msg: Some("Internal write error".to_string()),
}],
},
};
if let Err(e) = ws_manager
.send_to_room(&room_id, WsMessage::PointSetValueBatchResult(response))
.await
{
tracing::error!(
"Failed to send PointSetValueBatchResult to room {}: {}",
room_id,
e
);
}
}
Err(e) => {
tracing::warn!(
"Invalid websocket message in room {}: {}",
room_id,
e
);
}
}
}
_ => {
tracing::debug!("Received WebSocket message from room {}: {:?}", room_id, msg);
}
}
}
Some(Err(e)) => {
tracing::error!("WebSocket error in room {}: {}", room_id, e);
break;
}
None => break,
}
}
room_message = rx.recv() => {
match room_message {
Ok(message) => match serde_json::to_string(&message) {
Ok(json_str) => {
if socket.send(Message::Text(json_str.into())).await.is_err() {
break;
}
}
Err(e) => {
tracing::error!("Failed to serialize websocket message: {}", e);
}
},
Err(broadcast::error::RecvError::Lagged(skipped)) => {
tracing::warn!("WebSocket room {} lagged, skipped {} messages", room_id, skipped);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
}
ws_manager.remove_room_if_empty(&room_id).await;
}

View File

@ -160,10 +160,6 @@
批量写点。 批量写点。
请求头:
- `X-Write-Key: <key>`
请求示例: 请求示例:
```json ```json