feat(simulate): chaos task for rem/flt signal testing
When SIMULATE_PLC=true, a background task randomly disrupts rem or flt signals on equipment (rem=false for 5-15s, flt=true for 3-10s) to exercise fault detection, comm lock, and recovery logic in the engine. Uses XorShift64 PRNG with no extra dependencies. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
9d787e452b
commit
532eeaba42
|
|
@ -1,6 +1,7 @@
|
|||
pub mod command;
|
||||
pub mod engine;
|
||||
pub mod runtime;
|
||||
pub mod simulate;
|
||||
pub mod validator;
|
||||
|
||||
use crate::telemetry::{DataValue, PointMonitorInfo};
|
||||
|
|
|
|||
|
|
@ -0,0 +1,195 @@
|
|||
use tokio::time::Duration;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
connection::{BatchSetPointValueReq, SetPointValueReqItem},
|
||||
telemetry::{DataValue, PointMonitorInfo, PointQuality, ValueType},
|
||||
websocket::WsMessage,
|
||||
AppState,
|
||||
};
|
||||
|
||||
/// Start the chaos simulation task (only when SIMULATE_PLC=true).
|
||||
/// Randomly disrupts `rem` or `flt` signals on equipment to exercise the control engine.
|
||||
pub fn start(state: AppState) {
|
||||
tokio::spawn(async move {
|
||||
run(state).await;
|
||||
});
|
||||
}
|
||||
|
||||
async fn run(state: AppState) {
|
||||
let mut rng = seed_rng();
|
||||
|
||||
loop {
|
||||
// Wait a random 15–60 s between events.
|
||||
let wait_secs = 15 + xorshift(&mut rng) % 46;
|
||||
tokio::time::sleep(Duration::from_secs(wait_secs)).await;
|
||||
|
||||
// Pick a random enabled unit.
|
||||
let units = match crate::service::get_all_enabled_units(&state.pool).await {
|
||||
Ok(u) if !u.is_empty() => u,
|
||||
_ => continue,
|
||||
};
|
||||
let unit = &units[xorshift(&mut rng) as usize % units.len()];
|
||||
|
||||
// Only target units with auto control running — otherwise the event is uninteresting.
|
||||
let runtime = state.control_runtime.get(unit.id).await;
|
||||
if runtime.map_or(true, |r| !r.auto_enabled) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Pick a random equipment in that unit.
|
||||
let equipments =
|
||||
match crate::service::get_equipment_by_unit_id(&state.pool, unit.id).await {
|
||||
Ok(e) if !e.is_empty() => e,
|
||||
_ => continue,
|
||||
};
|
||||
let eq = &equipments[xorshift(&mut rng) as usize % equipments.len()];
|
||||
|
||||
// Find which of rem / flt this equipment has.
|
||||
let role_points =
|
||||
match crate::service::get_equipment_role_points(&state.pool, eq.id).await {
|
||||
Ok(rp) if !rp.is_empty() => rp,
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
let candidates: Vec<&str> = ["flt", "rem"]
|
||||
.iter()
|
||||
.filter(|&&r| role_points.iter().any(|p| p.signal_role == r))
|
||||
.copied()
|
||||
.collect();
|
||||
|
||||
if candidates.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let target_role = candidates[xorshift(&mut rng) as usize % candidates.len()];
|
||||
let target_point = role_points
|
||||
.iter()
|
||||
.find(|p| p.signal_role == target_role)
|
||||
.unwrap();
|
||||
|
||||
// rem=false → not in remote mode (blocks commands)
|
||||
// flt=true → fault signal active (triggers fault lock)
|
||||
let trigger_value = target_role == "flt";
|
||||
|
||||
// Hold duration: 5–15 s for rem, 3–10 s for flt.
|
||||
let hold_secs = if target_role == "flt" {
|
||||
3 + xorshift(&mut rng) % 8
|
||||
} else {
|
||||
5 + xorshift(&mut rng) % 11
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
"[chaos] unit={} eq={} role={} → {} (hold {}s)",
|
||||
unit.code,
|
||||
eq.code,
|
||||
target_role,
|
||||
if trigger_value { "FAULT" } else { "REM OFF" },
|
||||
hold_secs
|
||||
);
|
||||
|
||||
patch_signal(&state, target_point.point_id, trigger_value).await;
|
||||
tokio::time::sleep(Duration::from_secs(hold_secs)).await;
|
||||
patch_signal(&state, target_point.point_id, !trigger_value).await;
|
||||
|
||||
tracing::info!(
|
||||
"[chaos] unit={} eq={} role={} → RESTORED",
|
||||
unit.code,
|
||||
eq.code,
|
||||
target_role
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Patch a signal point value: try OPC UA write first, fall back to cache patch + WS push.
|
||||
async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) {
|
||||
let write_json = serde_json::json!(if value_on { 1 } else { 0 });
|
||||
let write_ok = match state
|
||||
.connection_manager
|
||||
.write_point_values_batch(BatchSetPointValueReq {
|
||||
items: vec![SetPointValueReqItem {
|
||||
point_id,
|
||||
value: write_json,
|
||||
}],
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(res) => res.success,
|
||||
Err(_) => false,
|
||||
};
|
||||
|
||||
if write_ok {
|
||||
return;
|
||||
}
|
||||
|
||||
// Fallback: patch the monitor cache directly and broadcast over WS.
|
||||
let (value, value_type, value_text) = {
|
||||
let guard = state
|
||||
.connection_manager
|
||||
.get_point_monitor_data_read_guard()
|
||||
.await;
|
||||
match guard.get(&point_id).and_then(|m| m.value_type.as_ref()) {
|
||||
Some(ValueType::Int) => (
|
||||
DataValue::Int(if value_on { 1 } else { 0 }),
|
||||
Some(ValueType::Int),
|
||||
Some(if value_on { "1" } else { "0" }.to_string()),
|
||||
),
|
||||
Some(ValueType::UInt) => (
|
||||
DataValue::UInt(if value_on { 1 } else { 0 }),
|
||||
Some(ValueType::UInt),
|
||||
Some(if value_on { "1" } else { "0" }.to_string()),
|
||||
),
|
||||
_ => (
|
||||
DataValue::Bool(value_on),
|
||||
Some(ValueType::Bool),
|
||||
Some(value_on.to_string()),
|
||||
),
|
||||
}
|
||||
};
|
||||
|
||||
let monitor = PointMonitorInfo {
|
||||
protocol: "simulation".to_string(),
|
||||
source_id: Uuid::nil(),
|
||||
point_id,
|
||||
client_handle: 0,
|
||||
scan_mode: crate::model::ScanMode::Poll,
|
||||
timestamp: Some(chrono::Utc::now()),
|
||||
quality: PointQuality::Good,
|
||||
value: Some(value),
|
||||
value_type,
|
||||
value_text,
|
||||
old_value: None,
|
||||
old_timestamp: None,
|
||||
value_changed: true,
|
||||
};
|
||||
|
||||
if let Err(e) = state
|
||||
.connection_manager
|
||||
.update_point_monitor_data(monitor.clone())
|
||||
.await
|
||||
{
|
||||
tracing::warn!("[chaos] cache update failed for {}: {}", point_id, e);
|
||||
return;
|
||||
}
|
||||
|
||||
let _ = state
|
||||
.ws_manager
|
||||
.send_to_public(WsMessage::PointNewValue(monitor))
|
||||
.await;
|
||||
}
|
||||
|
||||
// ── Minimal XorShift64 PRNG (no external crate needed) ────────────────────────
|
||||
|
||||
fn seed_rng() -> u64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.map(|d| d.as_nanos() as u64 ^ d.as_secs().wrapping_mul(0x9e37_79b9_7f4a_7c15))
|
||||
.unwrap_or(0xdeadbeef)
|
||||
}
|
||||
|
||||
fn xorshift(s: &mut u64) -> u64 {
|
||||
*s ^= *s << 13;
|
||||
*s ^= *s >> 7;
|
||||
*s ^= *s << 17;
|
||||
*s
|
||||
}
|
||||
|
|
@ -104,6 +104,9 @@ async fn main() {
|
|||
control_runtime: control_runtime.clone(),
|
||||
};
|
||||
control::engine::start(state.clone(), control_runtime);
|
||||
if config.simulate_plc {
|
||||
control::simulate::start(state.clone());
|
||||
}
|
||||
let app = build_router(state.clone());
|
||||
let addr = format!("{}:{}", config.server_host, config.server_port);
|
||||
tracing::info!("Starting server at http://{}", addr);
|
||||
|
|
|
|||
Loading…
Reference in New Issue