diff --git a/crates/app_feeder_distributor/src/app.rs b/crates/app_feeder_distributor/src/app.rs index 49646f7..0b4405d 100644 --- a/crates/app_feeder_distributor/src/app.rs +++ b/crates/app_feeder_distributor/src/app.rs @@ -57,6 +57,11 @@ pub async fn run() { }; control::engine::start(state.clone(), control_runtime); + if control::simulate::enabled() { + tracing::info!("SIMULATE_PLC enabled: starting chaos simulation"); + control::simulate::start(state.clone()); + } + let app = build_router(state.clone()); let ui_url = config.local_ui_url(); let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); diff --git a/crates/app_feeder_distributor/src/control/engine.rs b/crates/app_feeder_distributor/src/control/engine.rs index 08eb8c4..92dd915 100644 --- a/crates/app_feeder_distributor/src/control/engine.rs +++ b/crates/app_feeder_distributor/src/control/engine.rs @@ -80,7 +80,8 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu }; // Fault / comm check. - let (kind_roles, all_roles) = match load_equipment_maps(&state, unit_id).await { + let (kind_roles, all_roles, kind_eq_ids) = match load_equipment_maps(&state, unit_id).await + { Ok(maps) => maps, Err(e) => { tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e); @@ -142,6 +143,12 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e); continue; } + if crate::control::simulate::enabled() { + if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { + crate::control::simulate::simulate_run_feedback(&state, eq_id, true) + .await; + } + } } let mut runtime = store.get_or_init(unit_id).await; runtime.state = UnitRuntimeState::Running; @@ -195,6 +202,12 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e); continue; } + if crate::control::simulate::enabled() { + if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { + crate::control::simulate::simulate_run_feedback(&state, eq_id, false) + .await; + } + } } let mut runtime = store.get_or_init(unit_id).await; runtime.accumulated_run_sec += secs as i64 * 1000; @@ -227,6 +240,13 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu unit_id, e ); + } else if crate::control::simulate::enabled() { + if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { + crate::control::simulate::simulate_run_feedback( + &state, eq_id, true, + ) + .await; + } } } runtime.state = UnitRuntimeState::DistributorRunning; @@ -267,6 +287,12 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu ); continue; } + if crate::control::simulate::enabled() { + if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { + crate::control::simulate::simulate_run_feedback(&state, eq_id, false) + .await; + } + } } let mut runtime = store.get_or_init(unit_id).await; runtime.accumulated_run_sec = 0; @@ -491,6 +517,7 @@ async fn check_fault_comm( type EquipMaps = ( HashMap>, Vec<(Uuid, HashMap)>, + HashMap, ); async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result { @@ -527,6 +554,7 @@ fn build_equipment_maps( ) -> EquipMaps { let mut kind_roles: HashMap> = HashMap::new(); let mut all_roles: Vec<(Uuid, HashMap)> = Vec::new(); + let mut kind_eq_ids: HashMap = HashMap::new(); for equip in equipment_list { let role_map: HashMap = role_points_by_equipment @@ -539,6 +567,7 @@ fn build_equipment_maps( if let Some(kind) = &equip.kind { if !kind_roles.contains_key(kind.as_str()) { kind_roles.insert(kind.clone(), role_map.clone()); + kind_eq_ids.insert(kind.clone(), equip.id); } else { tracing::warn!( "Engine: unit {} has multiple {} equipment; using first", @@ -550,7 +579,7 @@ fn build_equipment_maps( all_roles.push((equip.id, role_map)); } - (kind_roles, all_roles) + (kind_roles, all_roles, kind_eq_ids) } /// Find a command point by role. Returns `None` if REM==0, FLT==1, or quality is bad. @@ -621,7 +650,7 @@ mod tests { signal_role: "start_cmd".to_string(), }], ); - let (first_kind_roles, _) = build_equipment_maps(unit_id, &equipment_list, first_roles); + let (first_kind_roles, _, _) = build_equipment_maps(unit_id, &equipment_list, first_roles); let mut second_roles = HashMap::new(); second_roles.insert( @@ -631,7 +660,8 @@ mod tests { signal_role: "start_cmd".to_string(), }], ); - let (second_kind_roles, _) = build_equipment_maps(unit_id, &equipment_list, second_roles); + let (second_kind_roles, _, _) = + build_equipment_maps(unit_id, &equipment_list, second_roles); assert_eq!( first_kind_roles["coal_feeder"]["start_cmd"].point_id, diff --git a/crates/app_feeder_distributor/src/control/mod.rs b/crates/app_feeder_distributor/src/control/mod.rs index b833f51..d1af408 100644 --- a/crates/app_feeder_distributor/src/control/mod.rs +++ b/crates/app_feeder_distributor/src/control/mod.rs @@ -1,6 +1,7 @@ pub use plc_platform_core::control::{command, runtime}; pub mod engine; +pub mod simulate; pub mod validator; use plc_platform_core::telemetry::{DataValue, PointMonitorInfo}; diff --git a/crates/app_feeder_distributor/src/control/simulate.rs b/crates/app_feeder_distributor/src/control/simulate.rs new file mode 100644 index 0000000..66e9ba6 --- /dev/null +++ b/crates/app_feeder_distributor/src/control/simulate.rs @@ -0,0 +1,223 @@ +use tokio::time::Duration; +use uuid::Uuid; + +use plc_platform_core::{ + connection::{BatchSetPointValueReq, SetPointValueReqItem}, + service, + telemetry::{DataValue, PointMonitorInfo, PointQuality, ValueType}, + websocket::WsMessage, +}; + +use crate::AppState; + +/// Whether SIMULATE_PLC mode is enabled via environment variable. +pub fn enabled() -> bool { + matches!( + std::env::var("SIMULATE_PLC").ok().as_deref(), + Some("true") | Some("1") + ) +} + +/// Start the chaos simulation task (only when SIMULATE_PLC=true). +/// Randomly disrupts `rem` or `flt` signals on equipment to exercise the control engine. +pub fn start(state: AppState) { + tokio::spawn(async move { + run(state).await; + }); +} + +async fn run(state: AppState) { + let mut rng = seed_rng(); + + loop { + // Wait a random 15-60 s between events. + let wait_secs = 15 + xorshift(&mut rng) % 46; + tokio::time::sleep(Duration::from_secs(wait_secs)).await; + + // Pick a random enabled unit. + let units = match service::get_all_enabled_units(&state.platform.pool).await { + Ok(u) if !u.is_empty() => u, + _ => continue, + }; + let unit = &units[xorshift(&mut rng) as usize % units.len()]; + + // Only target units with auto control running; otherwise the event is uninteresting. + let runtime = state.control_runtime.get(unit.id).await; + if runtime.map_or(true, |r| !r.auto_enabled) { + continue; + } + + // Pick a random equipment in that unit. + let equipments = + match service::get_equipment_by_unit_id(&state.platform.pool, unit.id).await { + Ok(e) if !e.is_empty() => e, + _ => continue, + }; + let eq = &equipments[xorshift(&mut rng) as usize % equipments.len()]; + + // Find which of rem / flt this equipment has. + let role_points = + match service::get_equipment_role_points(&state.platform.pool, eq.id).await { + Ok(rp) if !rp.is_empty() => rp, + _ => continue, + }; + + let candidates: Vec<&str> = ["flt", "rem"] + .iter() + .filter(|&&r| role_points.iter().any(|p| p.signal_role == r)) + .copied() + .collect(); + + if candidates.is_empty() { + continue; + } + + let target_role = candidates[xorshift(&mut rng) as usize % candidates.len()]; + let target_point = role_points + .iter() + .find(|p| p.signal_role == target_role) + .unwrap(); + + // rem=false means the equipment is not in remote mode. + // flt=true means the equipment reports an active fault. + let trigger_value = target_role == "flt"; + + // Hold duration: 5-15 s for rem, 3-10 s for flt. + let hold_secs = if target_role == "flt" { + 3 + xorshift(&mut rng) % 8 + } else { + 5 + xorshift(&mut rng) % 11 + }; + + tracing::info!( + "[chaos] unit={} eq={} role={} -> {} (hold {}s)", + unit.code, + eq.code, + target_role, + if trigger_value { "FAULT" } else { "REM OFF" }, + hold_secs + ); + patch_signal(&state, target_point.point_id, trigger_value).await; + patch_signal(&state, target_point.point_id, trigger_value).await; + tokio::time::sleep(Duration::from_secs(hold_secs)).await; + patch_signal(&state, target_point.point_id, !trigger_value).await; + + tracing::info!( + "[chaos] unit={} eq={} role={} -> RESTORED", + unit.code, + eq.code, + target_role + ); + } +} + +/// Simulate RUN signal feedback for an equipment after a manual start/stop command. +/// Called by the engine and control handler when SIMULATE_PLC=true. +pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: bool) { + let role_points = + match service::get_equipment_role_points(&state.platform.pool, equipment_id).await { + Ok(v) => v, + Err(e) => { + tracing::warn!("simulate_run_feedback: db error: {}", e); + return; + } + }; + let run_point = match role_points.iter().find(|p| p.signal_role == "run") { + Some(p) => p.clone(), + None => return, + }; + patch_signal(state, run_point.point_id, run_on).await; +} + +/// Patch a signal point value: try OPC UA write first, fall back to cache patch + WS push. +pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) { + let write_json = serde_json::json!(if value_on { 1 } else { 0 }); + let write_ok = match state + .platform.connection_manager + .write_point_values_batch(BatchSetPointValueReq { + items: vec![SetPointValueReqItem { + point_id, + value: write_json, + }], + }) + .await + { + Ok(res) => res.success, + Err(_) => false, + }; + + if write_ok { + return; + } + + // Fallback: patch the monitor cache directly and broadcast over WS. + let (value, value_type, value_text) = { + let guard = state + .platform.connection_manager + .get_point_monitor_data_read_guard() + .await; + match guard.get(&point_id).and_then(|m| m.value_type.as_ref()) { + Some(ValueType::Int) => ( + DataValue::Int(if value_on { 1 } else { 0 }), + Some(ValueType::Int), + Some(if value_on { "1" } else { "0" }.to_string()), + ), + Some(ValueType::UInt) => ( + DataValue::UInt(if value_on { 1 } else { 0 }), + Some(ValueType::UInt), + Some(if value_on { "1" } else { "0" }.to_string()), + ), + _ => ( + DataValue::Bool(value_on), + Some(ValueType::Bool), + Some(value_on.to_string()), + ), + } + }; + + let monitor = PointMonitorInfo { + protocol: "simulation".to_string(), + source_id: Uuid::nil(), + point_id, + client_handle: 0, + scan_mode: plc_platform_core::model::ScanMode::Poll, + timestamp: Some(chrono::Utc::now()), + quality: PointQuality::Good, + value: Some(value), + value_type, + value_text, + old_value: None, + old_timestamp: None, + value_changed: true, + }; + + if let Err(e) = state + .platform.connection_manager + .update_point_monitor_data(monitor.clone()) + .await + { + tracing::warn!("[chaos] cache update failed for {}: {}", point_id, e); + return; + } + + let _ = state + .platform.ws_manager + .send_to_public(WsMessage::PointNewValue(monitor)) + .await; +} + +// Minimal XorShift64 PRNG (no external crate needed). + +fn seed_rng() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos() as u64 ^ d.as_secs().wrapping_mul(0x9e37_79b9_7f4a_7c15)) + .unwrap_or(0xdeadbeef) +} + +fn xorshift(s: &mut u64) -> u64 { + *s ^= *s << 13; + *s ^= *s >> 7; + *s ^= *s << 17; + *s +} diff --git a/crates/app_feeder_distributor/src/handler/control.rs b/crates/app_feeder_distributor/src/handler/control.rs index b328ee5..35f68a4 100644 --- a/crates/app_feeder_distributor/src/handler/control.rs +++ b/crates/app_feeder_distributor/src/handler/control.rs @@ -186,6 +186,15 @@ async fn send_equipment_command( .await .map_err(|e| ApiErr::Internal(e, None))?; + if crate::control::simulate::enabled() { + crate::control::simulate::simulate_run_feedback( + &state, + equipment_id, + matches!(action, ControlAction::Start), + ) + .await; + } + let event = match action { ControlAction::Start => AppEvent::EquipmentStartCommandSent { equipment_id,