diff --git a/src/control/mod.rs b/src/control/mod.rs index 9c8da12..0b5c26d 100644 --- a/src/control/mod.rs +++ b/src/control/mod.rs @@ -1,6 +1,7 @@ pub mod command; pub mod engine; pub mod runtime; +pub mod simulate; pub mod validator; use crate::telemetry::{DataValue, PointMonitorInfo}; diff --git a/src/control/simulate.rs b/src/control/simulate.rs new file mode 100644 index 0000000..c75674a --- /dev/null +++ b/src/control/simulate.rs @@ -0,0 +1,195 @@ +use tokio::time::Duration; +use uuid::Uuid; + +use crate::{ + connection::{BatchSetPointValueReq, SetPointValueReqItem}, + telemetry::{DataValue, PointMonitorInfo, PointQuality, ValueType}, + websocket::WsMessage, + AppState, +}; + +/// Start the chaos simulation task (only when SIMULATE_PLC=true). +/// Randomly disrupts `rem` or `flt` signals on equipment to exercise the control engine. +pub fn start(state: AppState) { + tokio::spawn(async move { + run(state).await; + }); +} + +async fn run(state: AppState) { + let mut rng = seed_rng(); + + loop { + // Wait a random 15–60 s between events. + let wait_secs = 15 + xorshift(&mut rng) % 46; + tokio::time::sleep(Duration::from_secs(wait_secs)).await; + + // Pick a random enabled unit. + let units = match crate::service::get_all_enabled_units(&state.pool).await { + Ok(u) if !u.is_empty() => u, + _ => continue, + }; + let unit = &units[xorshift(&mut rng) as usize % units.len()]; + + // Only target units with auto control running — otherwise the event is uninteresting. + let runtime = state.control_runtime.get(unit.id).await; + if runtime.map_or(true, |r| !r.auto_enabled) { + continue; + } + + // Pick a random equipment in that unit. + let equipments = + match crate::service::get_equipment_by_unit_id(&state.pool, unit.id).await { + Ok(e) if !e.is_empty() => e, + _ => continue, + }; + let eq = &equipments[xorshift(&mut rng) as usize % equipments.len()]; + + // Find which of rem / flt this equipment has. + let role_points = + match crate::service::get_equipment_role_points(&state.pool, eq.id).await { + Ok(rp) if !rp.is_empty() => rp, + _ => continue, + }; + + let candidates: Vec<&str> = ["flt", "rem"] + .iter() + .filter(|&&r| role_points.iter().any(|p| p.signal_role == r)) + .copied() + .collect(); + + if candidates.is_empty() { + continue; + } + + let target_role = candidates[xorshift(&mut rng) as usize % candidates.len()]; + let target_point = role_points + .iter() + .find(|p| p.signal_role == target_role) + .unwrap(); + + // rem=false → not in remote mode (blocks commands) + // flt=true → fault signal active (triggers fault lock) + let trigger_value = target_role == "flt"; + + // Hold duration: 5–15 s for rem, 3–10 s for flt. + let hold_secs = if target_role == "flt" { + 3 + xorshift(&mut rng) % 8 + } else { + 5 + xorshift(&mut rng) % 11 + }; + + tracing::info!( + "[chaos] unit={} eq={} role={} → {} (hold {}s)", + unit.code, + eq.code, + target_role, + if trigger_value { "FAULT" } else { "REM OFF" }, + hold_secs + ); + + patch_signal(&state, target_point.point_id, trigger_value).await; + tokio::time::sleep(Duration::from_secs(hold_secs)).await; + patch_signal(&state, target_point.point_id, !trigger_value).await; + + tracing::info!( + "[chaos] unit={} eq={} role={} → RESTORED", + unit.code, + eq.code, + target_role + ); + } +} + +/// Patch a signal point value: try OPC UA write first, fall back to cache patch + WS push. +async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) { + let write_json = serde_json::json!(if value_on { 1 } else { 0 }); + let write_ok = match state + .connection_manager + .write_point_values_batch(BatchSetPointValueReq { + items: vec![SetPointValueReqItem { + point_id, + value: write_json, + }], + }) + .await + { + Ok(res) => res.success, + Err(_) => false, + }; + + if write_ok { + return; + } + + // Fallback: patch the monitor cache directly and broadcast over WS. + let (value, value_type, value_text) = { + let guard = state + .connection_manager + .get_point_monitor_data_read_guard() + .await; + match guard.get(&point_id).and_then(|m| m.value_type.as_ref()) { + Some(ValueType::Int) => ( + DataValue::Int(if value_on { 1 } else { 0 }), + Some(ValueType::Int), + Some(if value_on { "1" } else { "0" }.to_string()), + ), + Some(ValueType::UInt) => ( + DataValue::UInt(if value_on { 1 } else { 0 }), + Some(ValueType::UInt), + Some(if value_on { "1" } else { "0" }.to_string()), + ), + _ => ( + DataValue::Bool(value_on), + Some(ValueType::Bool), + Some(value_on.to_string()), + ), + } + }; + + let monitor = PointMonitorInfo { + protocol: "simulation".to_string(), + source_id: Uuid::nil(), + point_id, + client_handle: 0, + scan_mode: crate::model::ScanMode::Poll, + timestamp: Some(chrono::Utc::now()), + quality: PointQuality::Good, + value: Some(value), + value_type, + value_text, + old_value: None, + old_timestamp: None, + value_changed: true, + }; + + if let Err(e) = state + .connection_manager + .update_point_monitor_data(monitor.clone()) + .await + { + tracing::warn!("[chaos] cache update failed for {}: {}", point_id, e); + return; + } + + let _ = state + .ws_manager + .send_to_public(WsMessage::PointNewValue(monitor)) + .await; +} + +// ── Minimal XorShift64 PRNG (no external crate needed) ──────────────────────── + +fn seed_rng() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos() as u64 ^ d.as_secs().wrapping_mul(0x9e37_79b9_7f4a_7c15)) + .unwrap_or(0xdeadbeef) +} + +fn xorshift(s: &mut u64) -> u64 { + *s ^= *s << 13; + *s ^= *s >> 7; + *s ^= *s << 17; + *s +} diff --git a/src/main.rs b/src/main.rs index c9e96c7..c857203 100644 --- a/src/main.rs +++ b/src/main.rs @@ -104,6 +104,9 @@ async fn main() { control_runtime: control_runtime.clone(), }; control::engine::start(state.clone(), control_runtime); + if config.simulate_plc { + control::simulate::start(state.clone()); + } let app = build_router(state.clone()); let addr = format!("{}:{}", config.server_host, config.server_port); tracing::info!("Starting server at http://{}", addr);