diff --git a/README.md b/README.md index f8f1fa8..580ca65 100644 --- a/README.md +++ b/README.md @@ -159,8 +159,6 @@ deploy/ - `DATABASE_URL` - `HOST` - `PORT` -- `WRITE_API_KEY` -- `SIMULATE_PLC` ## 文档索引 diff --git a/crates/app_feeder_distributor/src/app.rs b/crates/app_feeder_distributor/src/app.rs index 9e7c1ca..1ab7d8f 100644 --- a/crates/app_feeder_distributor/src/app.rs +++ b/crates/app_feeder_distributor/src/app.rs @@ -1,20 +1,16 @@ use std::sync::Arc; use crate::{ - config::AppConfig, - connection::ConnectionManager, - control, - event::EventManager, - router::build_router, - websocket::WebSocketManager, + connection::ConnectionManager, control, event::EventManager, router::build_router, }; use axum::extract::FromRef; -use plc_platform_core::platform_context::PlatformContext; +use plc_platform_core::websocket::WebSocketManager; +use plc_platform_core::{config::ServerConfig, platform_context::PlatformContext}; use tokio::sync::mpsc; #[derive(Clone)] pub struct AppState { - pub config: AppConfig, + pub config: ServerConfig, pub platform: PlatformContext, pub event_manager: Arc, pub control_runtime: Arc, @@ -26,12 +22,12 @@ impl FromRef for PlatformContext { } } - pub async fn run() { dotenv::dotenv().ok(); plc_platform_core::util::log::init_logger(); let _single_instance = - match plc_platform_core::util::single_instance::try_acquire("PLCControl.FeederDistributor") { + match plc_platform_core::util::single_instance::try_acquire("PLCControl.FeederDistributor") + { Ok(guard) => guard, Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => { tracing::warn!("Another feeder distributor instance is already running"); @@ -43,7 +39,8 @@ pub async fn run() { } }; - let config = AppConfig::from_env().expect("Failed to load configuration"); + let config = ServerConfig::from_env("HOST", "0.0.0.0", "PORT", 60309) + .expect("Failed to load server configuration"); let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url) .await .expect("Failed to bootstrap platform"); @@ -60,31 +57,9 @@ pub async fn run() { Some(platform.ws_manager.clone()), )); - let sources = crate::service::get_all_enabled_sources(&platform.pool) + plc_platform_core::bootstrap::connect_all_enabled_sources(&platform) .await - .expect("Failed to fetch sources"); - - let mut tasks = Vec::new(); - for source in sources { - let cm = platform.connection_manager.clone(); - let p = platform.pool.clone(); - let source_name = source.name.clone(); - let source_id = source.id; - - let task = tokio::spawn(async move { - if let Err(err) = cm.connect_from_source(&p, source_id).await { - tracing::error!("Failed to connect to source {}: {}", source_name, err); - } - }); - - tasks.push(task); - } - - for task in tasks { - if let Err(err) = task.await { - tracing::error!("Source connection task failed: {:?}", err); - } - } + .expect("Failed to connect enabled sources"); let state = AppState { config: config.clone(), @@ -93,9 +68,6 @@ pub async fn run() { control_runtime: control_runtime.clone(), }; control::engine::start(state.clone(), control_runtime); - if config.simulate_plc { - control::simulate::start(state.clone()); - } let app = build_router(state.clone()); let addr = format!("{}:{}", config.server_host, config.server_port); @@ -136,19 +108,14 @@ pub fn test_state() -> AppState { .expect("lazy pool should build"); let connection_manager = Arc::new(ConnectionManager::new()); let ws_manager = Arc::new(WebSocketManager::new()); - let event_manager = Arc::new(EventManager::new( - pool.clone(), - Some(ws_manager.clone()), - )); + let event_manager = Arc::new(EventManager::new(pool.clone(), Some(ws_manager.clone()))); let platform = PlatformContext::new(pool, connection_manager, ws_manager); AppState { - config: AppConfig { + config: ServerConfig { database_url, server_host: "127.0.0.1".to_string(), server_port: 0, - write_api_key: Some("test-write-key".to_string()), - simulate_plc: false, }, platform, event_manager, diff --git a/crates/app_feeder_distributor/src/config.rs b/crates/app_feeder_distributor/src/config.rs deleted file mode 100644 index 9f96a8a..0000000 --- a/crates/app_feeder_distributor/src/config.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::env; - -#[derive(Clone)] -pub struct AppConfig { - pub database_url: String, - pub server_host: String, - pub server_port: u16, - pub write_api_key: Option, - /// When true, simulate RUN signal feedback after start/stop commands. - /// Set SIMULATE_PLC=true in .env for use with OPC UA proxy simulators. - pub simulate_plc: bool, -} - - -impl AppConfig { - pub fn from_env() -> Result { - let database_url = get_env("DATABASE_URL")?; - let server_host = env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string()); - let server_port = env::var("PORT") - .unwrap_or_else(|_| "60309".to_string()) - .parse::() - .map_err(|_| "PORT must be a number")?; - // Prefer WRITE_API_KEY, keep WRITE_KEY as backward-compatible fallback. - let write_api_key = env::var("WRITE_API_KEY") - .ok() - .or_else(|| env::var("WRITE_KEY").ok()); - - let simulate_plc = env::var("SIMULATE_PLC") - .unwrap_or_default() - .to_lowercase() == "true"; - - Ok(Self { - database_url, - server_host, - server_port, - write_api_key, - simulate_plc, - }) - } - - pub fn verify_write_key(&self, key: &str) -> bool { - self.write_api_key - .as_ref() - .map(|expected| expected == key) - .unwrap_or(false) - } -} - -fn get_env(key: &str) -> Result { - env::var(key).map_err(|_| format!("Missing environment variable: {}", key)) -} diff --git a/crates/app_feeder_distributor/src/control/engine.rs b/crates/app_feeder_distributor/src/control/engine.rs index cbd86e8..3e8a4fc 100644 --- a/crates/app_feeder_distributor/src/control/engine.rs +++ b/crates/app_feeder_distributor/src/control/engine.rs @@ -13,9 +13,9 @@ use crate::{ event::AppEvent, service::EquipmentRolePoint, telemetry::{PointMonitorInfo, PointQuality}, - websocket::WsMessage, AppState, }; +use plc_platform_core::websocket::WsMessage; /// Start the engine: a supervisor spawns one async task per enabled unit. pub fn start(state: AppState, runtime_store: Arc) { @@ -77,7 +77,7 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu }; // Fault / comm check. - let (kind_roles, kind_eq_ids, all_roles) = match load_equipment_maps(&state, unit_id).await { + let (kind_roles, all_roles) = match load_equipment_maps(&state, unit_id).await { Ok(maps) => maps, Err(e) => { tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e); @@ -122,11 +122,6 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e); continue; } - if state.config.simulate_plc { - if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { - crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await; - } - } } let mut runtime = store.get_or_init(unit_id).await; runtime.state = UnitRuntimeState::Running; @@ -154,11 +149,6 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e); continue; } - if state.config.simulate_plc { - if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { - crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await; - } - } } let mut runtime = store.get_or_init(unit_id).await; runtime.accumulated_run_sec += secs as i64 * 1000; @@ -172,10 +162,6 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu if let Some((pid, vt)) = dist_cmd { if let Err(e) = send_pulse_command(&state.platform.connection_manager, pid, vt.as_ref(), 300).await { tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e); - } else if state.config.simulate_plc { - if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { - crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await; - } } } runtime.state = UnitRuntimeState::DistributorRunning; @@ -199,11 +185,6 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu tracing::warn!("Engine: stop distributor failed for unit {}: {}", unit_id, e); continue; } - if state.config.simulate_plc { - if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { - crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await; - } - } } let mut runtime = store.get_or_init(unit_id).await; runtime.accumulated_run_sec = 0; @@ -405,7 +386,6 @@ async fn check_fault_comm( type EquipMaps = ( HashMap>, - HashMap, Vec<(Uuid, HashMap)>, ); @@ -438,7 +418,6 @@ fn build_equipment_maps( mut role_points_by_equipment: HashMap>, ) -> EquipMaps { let mut kind_roles: HashMap> = HashMap::new(); - let mut kind_eq_ids: HashMap = HashMap::new(); let mut all_roles: Vec<(Uuid, HashMap)> = Vec::new(); for equip in equipment_list { @@ -452,7 +431,6 @@ fn build_equipment_maps( if let Some(kind) = &equip.kind { if !kind_roles.contains_key(kind.as_str()) { kind_roles.insert(kind.clone(), role_map.clone()); - kind_eq_ids.insert(kind.clone(), equip.id); } else { tracing::warn!( "Engine: unit {} has multiple {} equipment; using first", @@ -463,7 +441,7 @@ fn build_equipment_maps( all_roles.push((equip.id, role_map)); } - (kind_roles, kind_eq_ids, all_roles) + (kind_roles, all_roles) } /// Find a command point by role. Returns `None` if REM==0, FLT==1, or quality is bad. diff --git a/crates/app_feeder_distributor/src/control/mod.rs b/crates/app_feeder_distributor/src/control/mod.rs index 2b8707f..775412f 100644 --- a/crates/app_feeder_distributor/src/control/mod.rs +++ b/crates/app_feeder_distributor/src/control/mod.rs @@ -1,7 +1,6 @@ pub use plc_platform_core::control::{command, runtime}; pub mod engine; -pub mod simulate; pub mod validator; use crate::telemetry::{DataValue, PointMonitorInfo}; diff --git a/crates/app_feeder_distributor/src/control/simulate.rs b/crates/app_feeder_distributor/src/control/simulate.rs deleted file mode 100644 index 9524d28..0000000 --- a/crates/app_feeder_distributor/src/control/simulate.rs +++ /dev/null @@ -1,213 +0,0 @@ -use tokio::time::Duration; -use uuid::Uuid; - -use crate::{ - connection::{BatchSetPointValueReq, SetPointValueReqItem}, - telemetry::{DataValue, PointMonitorInfo, PointQuality, ValueType}, - websocket::WsMessage, - AppState, -}; - -/// Start the chaos simulation task (only when SIMULATE_PLC=true). -/// Randomly disrupts `rem` or `flt` signals on equipment to exercise the control engine. -pub fn start(state: AppState) { - tokio::spawn(async move { - run(state).await; - }); -} - -async fn run(state: AppState) { - let mut rng = seed_rng(); - - loop { - // Wait a random 15-60 s between events. - let wait_secs = 15 + xorshift(&mut rng) % 46; - tokio::time::sleep(Duration::from_secs(wait_secs)).await; - - // Pick a random enabled unit. - let units = match crate::service::get_all_enabled_units(&state.platform.pool).await { - Ok(u) if !u.is_empty() => u, - _ => continue, - }; - let unit = &units[xorshift(&mut rng) as usize % units.len()]; - - // Only target units with auto control running; otherwise the event is uninteresting. - let runtime = state.control_runtime.get(unit.id).await; - if runtime.map_or(true, |r| !r.auto_enabled) { - continue; - } - - // Pick a random equipment in that unit. - let equipments = - match crate::service::get_equipment_by_unit_id(&state.platform.pool, unit.id).await { - Ok(e) if !e.is_empty() => e, - _ => continue, - }; - let eq = &equipments[xorshift(&mut rng) as usize % equipments.len()]; - - // Find which of rem / flt this equipment has. - let role_points = - match crate::service::get_equipment_role_points(&state.platform.pool, eq.id).await { - Ok(rp) if !rp.is_empty() => rp, - _ => continue, - }; - - let candidates: Vec<&str> = ["flt", "rem"] - .iter() - .filter(|&&r| role_points.iter().any(|p| p.signal_role == r)) - .copied() - .collect(); - - if candidates.is_empty() { - continue; - } - - let target_role = candidates[xorshift(&mut rng) as usize % candidates.len()]; - let target_point = role_points - .iter() - .find(|p| p.signal_role == target_role) - .unwrap(); - - // rem=false means the equipment is not in remote mode. - // flt=true means the equipment reports an active fault. - let trigger_value = target_role == "flt"; - - // Hold duration: 5-15 s for rem, 3-10 s for flt. - let hold_secs = if target_role == "flt" { - 3 + xorshift(&mut rng) % 8 - } else { - 5 + xorshift(&mut rng) % 11 - }; - - tracing::info!( - "[chaos] unit={} eq={} role={} -> {} (hold {}s)", - unit.code, - eq.code, - target_role, - if trigger_value { "FAULT" } else { "REM OFF" }, - hold_secs - ); - patch_signal(&state, target_point.point_id, trigger_value).await; - patch_signal(&state, target_point.point_id, trigger_value).await; - tokio::time::sleep(Duration::from_secs(hold_secs)).await; - patch_signal(&state, target_point.point_id, !trigger_value).await; - - tracing::info!( - "[chaos] unit={} eq={} role={} -> RESTORED", - unit.code, - eq.code, - target_role - ); - } -} - -/// Simulate RUN signal feedback for an equipment after a manual start/stop command. -/// Called by the engine and control handler when SIMULATE_PLC=true. -pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: bool) { - let role_points = - match crate::service::get_equipment_role_points(&state.platform.pool, equipment_id).await { - Ok(v) => v, - Err(e) => { - tracing::warn!("simulate_run_feedback: db error: {}", e); - return; - } - }; - let run_point = match role_points.iter().find(|p| p.signal_role == "run") { - Some(p) => p.clone(), - None => return, - }; - patch_signal(state, run_point.point_id, run_on).await; -} - -/// Patch a signal point value: try OPC UA write first, fall back to cache patch + WS push. -pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) { - let write_json = serde_json::json!(if value_on { 1 } else { 0 }); - let write_ok = match state - .platform.connection_manager - .write_point_values_batch(BatchSetPointValueReq { - items: vec![SetPointValueReqItem { - point_id, - value: write_json, - }], - }) - .await - { - Ok(res) => res.success, - Err(_) => false, - }; - - if write_ok { - return; - } - - // Fallback: patch the monitor cache directly and broadcast over WS. - let (value, value_type, value_text) = { - let guard = state - .platform.connection_manager - .get_point_monitor_data_read_guard() - .await; - match guard.get(&point_id).and_then(|m| m.value_type.as_ref()) { - Some(ValueType::Int) => ( - DataValue::Int(if value_on { 1 } else { 0 }), - Some(ValueType::Int), - Some(if value_on { "1" } else { "0" }.to_string()), - ), - Some(ValueType::UInt) => ( - DataValue::UInt(if value_on { 1 } else { 0 }), - Some(ValueType::UInt), - Some(if value_on { "1" } else { "0" }.to_string()), - ), - _ => ( - DataValue::Bool(value_on), - Some(ValueType::Bool), - Some(value_on.to_string()), - ), - } - }; - - let monitor = PointMonitorInfo { - protocol: "simulation".to_string(), - source_id: Uuid::nil(), - point_id, - client_handle: 0, - scan_mode: plc_platform_core::model::ScanMode::Poll, - timestamp: Some(chrono::Utc::now()), - quality: PointQuality::Good, - value: Some(value), - value_type, - value_text, - old_value: None, - old_timestamp: None, - value_changed: true, - }; - - if let Err(e) = state - .platform.connection_manager - .update_point_monitor_data(monitor.clone()) - .await - { - tracing::warn!("[chaos] cache update failed for {}: {}", point_id, e); - return; - } - - let _ = state - .platform.ws_manager - .send_to_public(WsMessage::PointNewValue(monitor)) - .await; -} - -// Minimal XorShift64 PRNG (no external crate needed). - -fn seed_rng() -> u64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.as_nanos() as u64 ^ d.as_secs().wrapping_mul(0x9e37_79b9_7f4a_7c15)) - .unwrap_or(0xdeadbeef) -} - -fn xorshift(s: &mut u64) -> u64 { - *s ^= *s << 13; - *s ^= *s >> 7; - *s ^= *s << 17; - *s -} diff --git a/crates/app_feeder_distributor/src/event.rs b/crates/app_feeder_distributor/src/event.rs index 64eddb2..6d53c42 100644 --- a/crates/app_feeder_distributor/src/event.rs +++ b/crates/app_feeder_distributor/src/event.rs @@ -1,7 +1,10 @@ -use plc_platform_core::event::EventEnvelope; +use plc_platform_core::model::EventRecord; +use plc_platform_core::{ + event::EventEnvelope, + websocket::{WebSocketManager, WsMessage}, +}; use tokio::sync::mpsc; use uuid::Uuid; -use plc_platform_core::model::EventRecord; const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024; @@ -37,7 +40,7 @@ pub struct EventManager { impl EventManager { pub fn new( pool: sqlx::PgPool, - ws_manager: Option>, + ws_manager: Option>, ) -> Self { let (control_sender, mut control_receiver) = mpsc::channel::(CONTROL_EVENT_CHANNEL_CAPACITY); @@ -101,7 +104,7 @@ impl plc_platform_core::platform_context::PlatformEventSink for FeederPlatformEv async fn handle_control_event( event: AppEvent, pool: &sqlx::PgPool, - ws_manager: Option<&std::sync::Arc>, + ws_manager: Option<&std::sync::Arc>, ) { persist_event_if_needed(&event, pool, ws_manager).await; @@ -175,7 +178,7 @@ async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String { async fn persist_event_if_needed( event: &AppEvent, pool: &sqlx::PgPool, - ws_manager: Option<&std::sync::Arc>, + ws_manager: Option<&std::sync::Arc>, ) { let record: Option<(&str, &str, Option, Option, Option, String, serde_json::Value)> = match event { AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => { @@ -307,7 +310,7 @@ async fn persist_event_if_needed( match inserted { Ok(record) => { if let Some(ws_manager) = ws_manager { - let ws_message = crate::websocket::WsMessage::EventCreated(record); + let ws_message = WsMessage::EventCreated(record); if let Err(err) = ws_manager.send_to_public(ws_message).await { tracing::warn!("Failed to broadcast event websocket message: {}", err); } diff --git a/crates/app_feeder_distributor/src/handler/control.rs b/crates/app_feeder_distributor/src/handler/control.rs index 98cd139..b54a0c9 100644 --- a/crates/app_feeder_distributor/src/handler/control.rs +++ b/crates/app_feeder_distributor/src/handler/control.rs @@ -169,15 +169,6 @@ async fn send_equipment_command( .await .map_err(|e| ApiErr::Internal(e, None))?; - if state.config.simulate_plc { - crate::control::simulate::simulate_run_feedback( - &state, - equipment_id, - matches!(action, ControlAction::Start), - ) - .await; - } - let event = match action { ControlAction::Start => crate::event::AppEvent::EquipmentStartCommandSent { equipment_id, diff --git a/crates/app_feeder_distributor/src/handler/point.rs b/crates/app_feeder_distributor/src/handler/point.rs index 1b72b43..872e325 100644 --- a/crates/app_feeder_distributor/src/handler/point.rs +++ b/crates/app_feeder_distributor/src/handler/point.rs @@ -1,41 +1 @@ -// Re-export all platform point handlers from core. pub use plc_platform_core::handler::point::*; - -use axum::{ - extract::State, - http::HeaderMap, - response::IntoResponse, - Json, -}; -use plc_platform_core::util::response::ApiErr; - -use crate::AppState; - -/// Feeder-specific: batch set point values (requires write key auth from app config). -pub async fn batch_set_point_value( - State(state): State, - headers: HeaderMap, - Json(payload): Json, -) -> Result { - let write_key = headers - .get("X-Write-Key") - .and_then(|v| v.to_str().ok()) - .unwrap_or_default(); - - if !state.config.verify_write_key(write_key) { - return Err(ApiErr::Forbidden( - "write permission denied".to_string(), - Some(serde_json::json!({ - "hint": "set WRITE_API_KEY (or legacy WRITE_KEY) and pass header X-Write-Key" - })), - )); - } - - let result = state - .platform - .connection_manager - .write_point_values_batch(payload) - .await - .map_err(|e| ApiErr::Internal(e, None))?; - Ok(Json(result)) -} diff --git a/crates/app_feeder_distributor/src/lib.rs b/crates/app_feeder_distributor/src/lib.rs index 0c23660..ca64b08 100644 --- a/crates/app_feeder_distributor/src/lib.rs +++ b/crates/app_feeder_distributor/src/lib.rs @@ -1,11 +1,8 @@ pub mod app; -pub mod config; pub mod control; pub mod event; pub mod handler; -pub mod middleware; pub mod router; -pub mod websocket; pub mod connection { pub use plc_platform_core::connection::*; diff --git a/crates/app_feeder_distributor/src/middleware.rs b/crates/app_feeder_distributor/src/middleware.rs deleted file mode 100644 index a06052d..0000000 --- a/crates/app_feeder_distributor/src/middleware.rs +++ /dev/null @@ -1,37 +0,0 @@ -use axum::{ - body::Body, - http::Request, - middleware::Next, - response::Response, -}; -use std::time::Instant; - -pub async fn simple_logger( - req: Request, - next: Next, -) -> Response { - // Borrow the path string directly; no clone needed. - let method = req.method().to_string(); - let uri = req.uri().to_string(); // `Uri::to_string()` allocates the owned string once. - - let start = Instant::now(); - let res = next.run(req).await; - let duration = start.elapsed(); - let status = res.status(); - match status.as_u16() { - 100..=399 => { - tracing::info!("{} {} {} {:?}", method, uri, status, duration); - } - 400..=499 => { - tracing::warn!("{} {} {} {:?}", method, uri, status, duration); - } - 500..=599 => { - tracing::error!("{} {} {} {:?}", method, uri, status, duration); - } - _ => { - tracing::warn!("{} {} {} {:?}", method, uri, status, duration); - } - } - - res -} diff --git a/crates/app_feeder_distributor/src/router.rs b/crates/app_feeder_distributor/src/router.rs index 3017ef3..5ef2b0a 100644 --- a/crates/app_feeder_distributor/src/router.rs +++ b/crates/app_feeder_distributor/src/router.rs @@ -1,23 +1,9 @@ use axum::{ - extract::Request, - middleware::Next, - response::Response, routing::{get, post}, Router, }; -use tower_http::cors::{Any, CorsLayer}; -use tower_http::services::ServeDir; -use crate::{handler, middleware::simple_logger, websocket, AppState}; - -async fn no_cache(req: Request, next: Next) -> Response { - let mut response = next.run(req).await; - response.headers_mut().insert( - axum::http::header::CACHE_CONTROL, - axum::http::HeaderValue::from_static("no-store"), - ); - response -} +use crate::{handler, AppState}; pub fn build_router(state: AppState) -> Router { // Platform routes (source, point, equipment, tag, page, logs) from core. @@ -25,11 +11,6 @@ pub fn build_router(state: AppState) -> Router { // Feeder-specific routes. let feeder_routes = Router::new() - // Feeder-only: batch set point values (requires write key auth). - .route( - "/api/point/value/batch", - post(handler::point::batch_set_point_value), - ) // Unit / control routes (feeder-specific). .route( "/api/unit", @@ -87,25 +68,19 @@ pub fn build_router(state: AppState) -> Router { .merge(feeder_routes) .nest( "/ui", - Router::new() - .fallback_service( - ServeDir::new("web/feeder") - .append_index_html_on_directories(true) - .fallback(ServeDir::new("web/core")), - ) - .layer(axum::middleware::from_fn(no_cache)), + plc_platform_core::http::static_ui_routes("web/feeder", "web/core"), + ) + .route( + "/ws/public", + get(plc_platform_core::websocket::public_websocket_handler::), ) - .route("/ws/public", get(websocket::public_websocket_handler)) .route( "/ws/client/{client_id}", - get(websocket::client_websocket_handler), - ) - .layer(axum::middleware::from_fn(simple_logger)) - .layer( - CorsLayer::new() - .allow_origin(Any) - .allow_methods(Any) - .allow_headers(Any), + get(plc_platform_core::websocket::client_websocket_handler::), ) + .layer(axum::middleware::from_fn( + plc_platform_core::http::simple_logger, + )) + .layer(plc_platform_core::http::permissive_cors()) .with_state(state) } diff --git a/crates/app_feeder_distributor/src/websocket.rs b/crates/app_feeder_distributor/src/websocket.rs deleted file mode 100644 index 418dbc8..0000000 --- a/crates/app_feeder_distributor/src/websocket.rs +++ /dev/null @@ -1,144 +0,0 @@ -use axum::{ - extract::{ - ws::{Message, WebSocket, WebSocketUpgrade}, - Path, State, - }, - response::IntoResponse, -}; -use std::sync::Arc; -use tokio::sync::broadcast; -use uuid::Uuid; - -pub use plc_platform_core::websocket::{ - RoomManager, WebSocketManager, WsClientMessage, WsMessage, -}; - -/// Public websocket handler. -pub async fn public_websocket_handler( - ws: WebSocketUpgrade, - State(state): State, -) -> impl IntoResponse { - let ws_manager = state.platform.ws_manager.clone(); - let app_state = state.clone(); - ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, "public".to_string(), app_state)) -} - -/// Client websocket handler. -pub async fn client_websocket_handler( - ws: WebSocketUpgrade, - Path(client_id): Path, - State(state): State, -) -> impl IntoResponse { - let ws_manager = state.platform.ws_manager.clone(); - let room_id = client_id.to_string(); - let app_state = state.clone(); - ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, room_id, app_state)) -} - -/// Handle websocket connection for one room. -async fn handle_socket( - mut socket: WebSocket, - ws_manager: Arc, - room_id: String, - state: crate::AppState, -) { - let mut rx = ws_manager.subscribe_room(&room_id).await; - let mut can_write = false; - - loop { - tokio::select! { - maybe_msg = socket.recv() => { - match maybe_msg { - Some(Ok(msg)) => { - if matches!(msg, Message::Close(_)) { - break; - } - match msg { - Message::Text(text) => { - match serde_json::from_str::(&text) { - Ok(WsClientMessage::AuthWrite(payload)) => { - can_write = state.config.verify_write_key(&payload.key); - if !can_write { - tracing::warn!("WebSocket write auth failed in room {}", room_id); - } - } - Ok(WsClientMessage::PointSetValueBatch(payload)) => { - let response = if !can_write { - crate::connection::BatchSetPointValueRes { - success: false, - err_msg: Some("write permission denied".to_string()), - success_count: 0, - failed_count: 0, - results: vec![], - } - } else { - match state.platform.connection_manager.write_point_values_batch(payload).await { - Ok(v) => v, - Err(e) => crate::connection::BatchSetPointValueRes { - success: false, - err_msg: Some(e), - success_count: 0, - failed_count: 1, - results: vec![crate::connection::SetPointValueResItem { - point_id: Uuid::nil(), - success: false, - err_msg: Some("Internal write error".to_string()), - }], - }, - } - }; - if let Err(e) = ws_manager - .send_to_room(&room_id, WsMessage::PointSetValueBatchResult(response)) - .await - { - tracing::error!( - "Failed to send PointSetValueBatchResult to room {}: {}", - room_id, - e - ); - } - } - Err(e) => { - tracing::warn!( - "Invalid websocket message in room {}: {}", - room_id, - e - ); - } - } - } - _ => { - tracing::debug!("Received WebSocket message from room {}: {:?}", room_id, msg); - } - } - } - Some(Err(e)) => { - tracing::error!("WebSocket error in room {}: {}", room_id, e); - break; - } - None => break, - } - } - room_message = rx.recv() => { - match room_message { - Ok(message) => match serde_json::to_string(&message) { - Ok(json_str) => { - if socket.send(Message::Text(json_str.into())).await.is_err() { - break; - } - } - Err(e) => { - tracing::error!("Failed to serialize websocket message: {}", e); - } - }, - Err(broadcast::error::RecvError::Lagged(skipped)) => { - tracing::warn!("WebSocket room {} lagged, skipped {} messages", room_id, skipped); - } - Err(broadcast::error::RecvError::Closed) => break, - } - } - } - } - - ws_manager.remove_room_if_empty(&room_id).await; -} diff --git a/crates/app_operation_system/src/app.rs b/crates/app_operation_system/src/app.rs index efd1591..21ea93b 100644 --- a/crates/app_operation_system/src/app.rs +++ b/crates/app_operation_system/src/app.rs @@ -1,25 +1,22 @@ -use axum::extract::FromRef; use crate::router::build_router; +use axum::extract::FromRef; use plc_platform_core::platform_context::PlatformContext; #[derive(Clone, Debug)] pub struct AppConfig { - pub database_url: String, - pub server_host: String, - pub server_port: u16, + pub server: plc_platform_core::config::ServerConfig, } impl AppConfig { pub fn from_env() -> Self { Self { - database_url: std::env::var("DATABASE_URL") - .expect("DATABASE_URL must be set"), - server_host: std::env::var("OPS_SERVER_HOST") - .unwrap_or_else(|_| "127.0.0.1".to_string()), - server_port: std::env::var("OPS_SERVER_PORT") - .ok() - .and_then(|value| value.parse().ok()) - .unwrap_or(3100), + server: plc_platform_core::config::ServerConfig::from_env( + "OPS_SERVER_HOST", + "127.0.0.1", + "OPS_SERVER_PORT", + 3100, + ) + .expect("Failed to load operation-system server configuration"), } } } @@ -54,7 +51,7 @@ pub async fn run() { }; let config = AppConfig::from_env(); - let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url) + let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.server.database_url) .await .expect("Failed to bootstrap platform"); let platform = builder.build(); @@ -65,7 +62,10 @@ pub async fn run() { platform, }; let app = build_router(state.clone()); - let addr = format!("{}:{}", state.config.server_host, state.config.server_port); + let addr = format!( + "{}:{}", + state.config.server.server_host, state.config.server.server_port + ); tracing::info!("Starting operation-system server at http://{}", addr); let listener = tokio::net::TcpListener::bind(&addr) .await @@ -81,15 +81,18 @@ pub fn test_state() -> AppState { let pool = sqlx::postgres::PgPoolOptions::new() .connect_lazy(&database_url) .expect("lazy pool should build"); - let connection_manager = std::sync::Arc::new(plc_platform_core::connection::ConnectionManager::new()); + let connection_manager = + std::sync::Arc::new(plc_platform_core::connection::ConnectionManager::new()); let ws_manager = std::sync::Arc::new(plc_platform_core::websocket::WebSocketManager::new()); AppState { app_name: "operation-system", config: AppConfig { - database_url, - server_host: "127.0.0.1".to_string(), - server_port: 0, + server: plc_platform_core::config::ServerConfig { + database_url, + server_host: "127.0.0.1".to_string(), + server_port: 0, + }, }, platform: PlatformContext::new(pool, connection_manager, ws_manager), } diff --git a/crates/app_operation_system/src/router.rs b/crates/app_operation_system/src/router.rs index 368e49c..61703cf 100644 --- a/crates/app_operation_system/src/router.rs +++ b/crates/app_operation_system/src/router.rs @@ -1,20 +1,7 @@ use axum::{extract::State, routing::get, Router}; -use tower_http::services::ServeDir; use crate::app::AppState; -async fn no_cache( - req: axum::extract::Request, - next: axum::middleware::Next, -) -> axum::response::Response { - let mut response = next.run(req).await; - response.headers_mut().insert( - axum::http::header::CACHE_CONTROL, - axum::http::HeaderValue::from_static("no-store"), - ); - response -} - pub fn build_router(state: AppState) -> Router { // Platform routes (source, point, equipment, tag, page, logs) from core. let platform = plc_platform_core::handler::platform_routes::(); @@ -23,20 +10,17 @@ pub fn build_router(state: AppState) -> Router { let ops_routes = Router::new() .route("/api/health", get(health_check)) .route("/api/docs/api-md", get(crate::handler::doc::get_api_md)) - .route("/api/docs/readme-md", get(crate::handler::doc::get_readme_md)); + .route( + "/api/docs/readme-md", + get(crate::handler::doc::get_readme_md), + ); Router::new() .merge(platform) .merge(ops_routes) .nest( "/ui", - Router::new() - .fallback_service( - ServeDir::new("web/ops") - .append_index_html_on_directories(true) - .fallback(ServeDir::new("web/core")), - ) - .layer(axum::middleware::from_fn(no_cache)), + plc_platform_core::http::static_ui_routes("web/ops", "web/core"), ) .with_state(state) } diff --git a/crates/plc_platform_core/src/bootstrap.rs b/crates/plc_platform_core/src/bootstrap.rs index 9cba1a8..0e4e272 100644 --- a/crates/plc_platform_core/src/bootstrap.rs +++ b/crates/plc_platform_core/src/bootstrap.rs @@ -22,7 +22,8 @@ impl PlatformBuilder { cm_for_telemetry, self.ws_manager.clone(), )); - self.connection_manager.set_event_manager(telemetry_processor); + self.connection_manager + .set_event_manager(telemetry_processor); // Start reconnect task (auto-reconnects on connection loss). self.connection_manager @@ -50,3 +51,31 @@ pub async fn bootstrap_platform(database_url: &str) -> Result Result<(), String> { + let sources = crate::service::get_all_enabled_sources(&platform.pool) + .await + .map_err(|e| format!("Failed to fetch sources: {}", e))?; + + let mut tasks = Vec::new(); + for source in sources { + let cm = platform.connection_manager.clone(); + let pool = platform.pool.clone(); + let source_name = source.name.clone(); + let source_id = source.id; + + tasks.push(tokio::spawn(async move { + if let Err(err) = cm.connect_from_source(&pool, source_id).await { + tracing::error!("Failed to connect to source {}: {}", source_name, err); + } + })); + } + + for task in tasks { + if let Err(err) = task.await { + tracing::error!("Source connection task failed: {:?}", err); + } + } + + Ok(()) +} diff --git a/crates/plc_platform_core/src/config.rs b/crates/plc_platform_core/src/config.rs new file mode 100644 index 0000000..7950171 --- /dev/null +++ b/crates/plc_platform_core/src/config.rs @@ -0,0 +1,51 @@ +use std::env; + +#[derive(Clone, Debug)] +pub struct ServerConfig { + pub database_url: String, + pub server_host: String, + pub server_port: u16, +} + +impl ServerConfig { + pub fn from_env( + host_key: &str, + host_default: &str, + port_key: &str, + port_default: u16, + ) -> Result { + Ok(Self { + database_url: required_env("DATABASE_URL")?, + server_host: env_string(host_key, host_default), + server_port: env_u16(port_key, port_default)?, + }) + } +} + +pub fn required_env(key: &str) -> Result { + env::var(key).map_err(|_| format!("Missing environment variable: {}", key)) +} + +pub fn env_string(key: &str, default: &str) -> String { + env::var(key).unwrap_or_else(|_| default.to_string()) +} + +pub fn env_u16(key: &str, default: u16) -> Result { + match env::var(key) { + Ok(value) => value + .parse::() + .map_err(|_| format!("{} must be a number", key)), + Err(_) => Ok(default), + } +} + +pub fn env_bool(key: &str, default: bool) -> bool { + env::var(key) + .map(|value| { + matches!( + value.to_ascii_lowercase().as_str(), + "1" | "true" | "yes" | "on" + ) + }) + .unwrap_or(default) +} diff --git a/crates/plc_platform_core/src/handler/point.rs b/crates/plc_platform_core/src/handler/point.rs index 0d784b7..01c86f1 100644 --- a/crates/plc_platform_core/src/handler/point.rs +++ b/crates/plc_platform_core/src/handler/point.rs @@ -10,12 +10,12 @@ use std::collections::{HashMap, HashSet}; use uuid::Uuid; use validator::Validate; +use crate::model::{Node, Point}; +use crate::platform_context::PlatformContext; use crate::util::{ pagination::{PaginatedResponse, PaginationParams}, response::ApiErr, }; -use crate::platform_context::PlatformContext; -use crate::model::{Node, Point}; fn notify_units(state: &PlatformContext, unit_ids: impl IntoIterator) { let ids: Vec = { @@ -58,6 +58,18 @@ pub struct PointHistoryItem { pub value_number: Option, } +pub async fn batch_set_point_value( + State(state): State, + Json(payload): Json, +) -> Result { + let result = state + .connection_manager + .write_point_values_batch(payload) + .await + .map_err(|e| ApiErr::Internal(e, None))?; + Ok(Json(result)) +} + pub async fn get_point_list( State(state): State, Query(query): Query, @@ -556,7 +568,10 @@ pub async fn batch_create_points( crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?; for (source_id, points) in grouped { let point_ids: Vec = points.into_iter().map(|p| p.point_id).collect(); - state.emit_event(crate::event::PlatformEvent::PointsCreated { source_id, point_ids }); + state.emit_event(crate::event::PlatformEvent::PointsCreated { + source_id, + point_ids, + }); } } @@ -616,7 +631,10 @@ pub async fn batch_delete_points( for (source_id, points) in grouped { let ids: Vec = points.into_iter().map(|p| p.point_id).collect(); - state.emit_event(crate::event::PlatformEvent::PointsDeleted { source_id, point_ids: ids }); + state.emit_event(crate::event::PlatformEvent::PointsDeleted { + source_id, + point_ids: ids, + }); } notify_units(&state, affected_unit_ids); @@ -636,4 +654,3 @@ fn monitor_value_to_number(item: &crate::telemetry::PointMonitorInfo) -> Option< _ => None, } } - diff --git a/crates/plc_platform_core/src/handler/router.rs b/crates/plc_platform_core/src/handler/router.rs index 68fcdee..bb8ce8a 100644 --- a/crates/plc_platform_core/src/handler/router.rs +++ b/crates/plc_platform_core/src/handler/router.rs @@ -24,8 +24,7 @@ where ) .route( "/api/source/{source_id}", - axum::routing::delete(super::source::delete_source) - .put(super::source::update_source), + axum::routing::delete(super::source::delete_source).put(super::source::update_source), ) .route( "/api/source/{source_id}/reconnect", @@ -41,10 +40,13 @@ where ) // Point .route("/api/point", get(super::point::get_point_list)) + .route( + "/api/point/value/batch", + post(super::point::batch_set_point_value), + ) .route( "/api/point/batch", - post(super::point::batch_create_points) - .delete(super::point::batch_delete_points), + post(super::point::batch_create_points).delete(super::point::batch_delete_points), ) .route( "/api/point/{point_id}/history", @@ -67,8 +69,7 @@ where // Equipment .route( "/api/equipment", - get(super::equipment::get_equipment_list) - .post(super::equipment::create_equipment), + get(super::equipment::get_equipment_list).post(super::equipment::create_equipment), ) .route( "/api/equipment/{equipment_id}", diff --git a/crates/plc_platform_core/src/http.rs b/crates/plc_platform_core/src/http.rs new file mode 100644 index 0000000..88bc23f --- /dev/null +++ b/crates/plc_platform_core/src/http.rs @@ -0,0 +1,61 @@ +use std::time::Instant; + +use axum::{ + body::Body, + extract::Request, + http::{header, HeaderValue}, + middleware::Next, + response::Response, + Router, +}; +use tower_http::{ + cors::{Any, CorsLayer}, + services::ServeDir, +}; + +pub async fn no_cache(req: Request, next: Next) -> Response { + let mut response = next.run(req).await; + response + .headers_mut() + .insert(header::CACHE_CONTROL, HeaderValue::from_static("no-store")); + response +} + +pub async fn simple_logger(req: axum::http::Request, next: Next) -> Response { + let method = req.method().to_string(); + let uri = req.uri().to_string(); + + let start = Instant::now(); + let res = next.run(req).await; + let duration = start.elapsed(); + let status = res.status(); + + match status.as_u16() { + 100..=399 => tracing::info!("{} {} {} {:?}", method, uri, status, duration), + 400..=499 => tracing::warn!("{} {} {} {:?}", method, uri, status, duration), + 500..=599 => tracing::error!("{} {} {} {:?}", method, uri, status, duration), + _ => tracing::warn!("{} {} {} {:?}", method, uri, status, duration), + } + + res +} + +pub fn permissive_cors() -> CorsLayer { + CorsLayer::new() + .allow_origin(Any) + .allow_methods(Any) + .allow_headers(Any) +} + +pub fn static_ui_routes(app_dir: &'static str, core_dir: &'static str) -> Router +where + S: Clone + Send + Sync + 'static, +{ + Router::new() + .fallback_service( + ServeDir::new(app_dir) + .append_index_html_on_directories(true) + .fallback(ServeDir::new(core_dir)), + ) + .layer(axum::middleware::from_fn(no_cache)) +} diff --git a/crates/plc_platform_core/src/lib.rs b/crates/plc_platform_core/src/lib.rs index b748bc1..058ff23 100644 --- a/crates/plc_platform_core/src/lib.rs +++ b/crates/plc_platform_core/src/lib.rs @@ -1,9 +1,11 @@ -pub mod bootstrap; +pub mod bootstrap; +pub mod config; pub mod connection; pub mod control; pub mod db; pub mod event; pub mod handler; +pub mod http; pub mod model; pub mod platform_context; pub mod service; @@ -13,4 +15,3 @@ pub mod util; pub mod websocket; pub use event::EventEnvelope; - diff --git a/crates/plc_platform_core/src/websocket.rs b/crates/plc_platform_core/src/websocket.rs index 6960f0a..c0e73c7 100644 --- a/crates/plc_platform_core/src/websocket.rs +++ b/crates/plc_platform_core/src/websocket.rs @@ -1,9 +1,17 @@ use std::{collections::HashMap, sync::Arc}; +use axum::{ + extract::{ + ws::{Message, WebSocket, WebSocketUpgrade}, + FromRef, Path, State, + }, + response::IntoResponse, +}; use serde::{Deserialize, Serialize}; use tokio::sync::{broadcast, RwLock}; use uuid::Uuid; +use crate::platform_context::PlatformContext; use crate::{ connection::{BatchSetPointValueReq, BatchSetPointValueRes}, control::runtime::UnitRuntime, @@ -23,15 +31,9 @@ pub enum WsMessage { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type", content = "data", rename_all = "snake_case")] pub enum WsClientMessage { - AuthWrite(WsAuthWriteReq), PointSetValueBatch(BatchSetPointValueReq), } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WsAuthWriteReq { - pub key: String, -} - #[derive(Clone)] pub struct RoomManager { rooms: Arc>>>, @@ -139,3 +141,121 @@ impl Default for WebSocketManager { Self::new() } } + +pub async fn public_websocket_handler( + ws: WebSocketUpgrade, + State(state): State, +) -> impl IntoResponse +where + S: Clone + Send + Sync + 'static, + PlatformContext: FromRef, +{ + let platform = PlatformContext::from_ref(&state); + let ws_manager = platform.ws_manager.clone(); + ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, "public".to_string(), platform)) +} + +pub async fn client_websocket_handler( + ws: WebSocketUpgrade, + Path(client_id): Path, + State(state): State, +) -> impl IntoResponse +where + S: Clone + Send + Sync + 'static, + PlatformContext: FromRef, +{ + let platform = PlatformContext::from_ref(&state); + let ws_manager = platform.ws_manager.clone(); + let room_id = client_id.to_string(); + ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, room_id, platform)) +} + +async fn handle_socket( + mut socket: WebSocket, + ws_manager: Arc, + room_id: String, + platform: PlatformContext, +) { + let mut rx = ws_manager.subscribe_room(&room_id).await; + + loop { + tokio::select! { + maybe_msg = socket.recv() => { + match maybe_msg { + Some(Ok(msg)) => { + if matches!(msg, Message::Close(_)) { + break; + } + match msg { + Message::Text(text) => { + match serde_json::from_str::(&text) { + Ok(WsClientMessage::PointSetValueBatch(payload)) => { + let response = match platform.connection_manager.write_point_values_batch(payload).await { + Ok(v) => v, + Err(e) => BatchSetPointValueRes { + success: false, + err_msg: Some(e), + success_count: 0, + failed_count: 1, + results: vec![crate::connection::SetPointValueResItem { + point_id: Uuid::nil(), + success: false, + err_msg: Some("Internal write error".to_string()), + }], + }, + }; + if let Err(e) = ws_manager + .send_to_room(&room_id, WsMessage::PointSetValueBatchResult(response)) + .await + { + tracing::error!( + "Failed to send PointSetValueBatchResult to room {}: {}", + room_id, + e + ); + } + } + Err(e) => { + tracing::warn!( + "Invalid websocket message in room {}: {}", + room_id, + e + ); + } + } + } + _ => { + tracing::debug!("Received WebSocket message from room {}: {:?}", room_id, msg); + } + } + } + Some(Err(e)) => { + tracing::error!("WebSocket error in room {}: {}", room_id, e); + break; + } + None => break, + } + } + room_message = rx.recv() => { + match room_message { + Ok(message) => match serde_json::to_string(&message) { + Ok(json_str) => { + if socket.send(Message::Text(json_str.into())).await.is_err() { + break; + } + } + Err(e) => { + tracing::error!("Failed to serialize websocket message: {}", e); + } + }, + Err(broadcast::error::RecvError::Lagged(skipped)) => { + tracing::warn!("WebSocket room {} lagged, skipped {} messages", room_id, skipped); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } + } + } + + ws_manager.remove_room_if_empty(&room_id).await; +} diff --git a/docs/api-feeder.md b/docs/api-feeder.md index 93ccfc7..0bf6284 100644 --- a/docs/api-feeder.md +++ b/docs/api-feeder.md @@ -160,10 +160,6 @@ 批量写点。 -请求头: - -- `X-Write-Key: ` - 请求示例: ```json