Restore SIMULATE_PLC chaos task and run feedback
Bring back crates/app_feeder_distributor/src/control/simulate.rs which
was dropped when feeder's local connection/telemetry/websocket modules
moved into core; the restored module reuses core's equivalents via
plc_platform_core::{connection, service, telemetry, websocket}.
Also restore the call sites that were lost alongside it:
- app.rs starts the chaos task when SIMULATE_PLC=true
- engine.rs fires simulate_run_feedback after each pulse command so
the auto-control state machine sees the RUN bit transition it
would get from a real PLC
- handler/control.rs does the same after manual start/stop commands
The SIMULATE_PLC flag is now read via simulate::enabled() from the
environment rather than state.config.simulate_plc (the old config
struct was removed with the module migration). To expose equipment
ids by kind (used for run feedback), build_equipment_maps now also
returns HashMap<kind, Uuid>.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
3b92c0028a
commit
2a6dde9e0e
|
|
@ -57,6 +57,11 @@ pub async fn run() {
|
||||||
};
|
};
|
||||||
control::engine::start(state.clone(), control_runtime);
|
control::engine::start(state.clone(), control_runtime);
|
||||||
|
|
||||||
|
if control::simulate::enabled() {
|
||||||
|
tracing::info!("SIMULATE_PLC enabled: starting chaos simulation");
|
||||||
|
control::simulate::start(state.clone());
|
||||||
|
}
|
||||||
|
|
||||||
let app = build_router(state.clone());
|
let app = build_router(state.clone());
|
||||||
let ui_url = config.local_ui_url();
|
let ui_url = config.local_ui_url();
|
||||||
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
|
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
|
||||||
|
|
|
||||||
|
|
@ -80,7 +80,8 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
|
||||||
};
|
};
|
||||||
|
|
||||||
// Fault / comm check.
|
// Fault / comm check.
|
||||||
let (kind_roles, all_roles) = match load_equipment_maps(&state, unit_id).await {
|
let (kind_roles, all_roles, kind_eq_ids) = match load_equipment_maps(&state, unit_id).await
|
||||||
|
{
|
||||||
Ok(maps) => maps,
|
Ok(maps) => maps,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e);
|
tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e);
|
||||||
|
|
@ -142,6 +143,12 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
|
||||||
tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e);
|
tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if crate::control::simulate::enabled() {
|
||||||
|
if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() {
|
||||||
|
crate::control::simulate::simulate_run_feedback(&state, eq_id, true)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
let mut runtime = store.get_or_init(unit_id).await;
|
let mut runtime = store.get_or_init(unit_id).await;
|
||||||
runtime.state = UnitRuntimeState::Running;
|
runtime.state = UnitRuntimeState::Running;
|
||||||
|
|
@ -195,6 +202,12 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
|
||||||
tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e);
|
tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if crate::control::simulate::enabled() {
|
||||||
|
if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() {
|
||||||
|
crate::control::simulate::simulate_run_feedback(&state, eq_id, false)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
let mut runtime = store.get_or_init(unit_id).await;
|
let mut runtime = store.get_or_init(unit_id).await;
|
||||||
runtime.accumulated_run_sec += secs as i64 * 1000;
|
runtime.accumulated_run_sec += secs as i64 * 1000;
|
||||||
|
|
@ -227,6 +240,13 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
|
||||||
unit_id,
|
unit_id,
|
||||||
e
|
e
|
||||||
);
|
);
|
||||||
|
} else if crate::control::simulate::enabled() {
|
||||||
|
if let Some(eq_id) = kind_eq_ids.get("distributor").copied() {
|
||||||
|
crate::control::simulate::simulate_run_feedback(
|
||||||
|
&state, eq_id, true,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
runtime.state = UnitRuntimeState::DistributorRunning;
|
runtime.state = UnitRuntimeState::DistributorRunning;
|
||||||
|
|
@ -267,6 +287,12 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
|
||||||
);
|
);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if crate::control::simulate::enabled() {
|
||||||
|
if let Some(eq_id) = kind_eq_ids.get("distributor").copied() {
|
||||||
|
crate::control::simulate::simulate_run_feedback(&state, eq_id, false)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
let mut runtime = store.get_or_init(unit_id).await;
|
let mut runtime = store.get_or_init(unit_id).await;
|
||||||
runtime.accumulated_run_sec = 0;
|
runtime.accumulated_run_sec = 0;
|
||||||
|
|
@ -491,6 +517,7 @@ async fn check_fault_comm(
|
||||||
type EquipMaps = (
|
type EquipMaps = (
|
||||||
HashMap<String, HashMap<String, EquipmentRolePoint>>,
|
HashMap<String, HashMap<String, EquipmentRolePoint>>,
|
||||||
Vec<(Uuid, HashMap<String, EquipmentRolePoint>)>,
|
Vec<(Uuid, HashMap<String, EquipmentRolePoint>)>,
|
||||||
|
HashMap<String, Uuid>,
|
||||||
);
|
);
|
||||||
|
|
||||||
async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result<EquipMaps, sqlx::Error> {
|
async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result<EquipMaps, sqlx::Error> {
|
||||||
|
|
@ -527,6 +554,7 @@ fn build_equipment_maps(
|
||||||
) -> EquipMaps {
|
) -> EquipMaps {
|
||||||
let mut kind_roles: HashMap<String, HashMap<String, EquipmentRolePoint>> = HashMap::new();
|
let mut kind_roles: HashMap<String, HashMap<String, EquipmentRolePoint>> = HashMap::new();
|
||||||
let mut all_roles: Vec<(Uuid, HashMap<String, EquipmentRolePoint>)> = Vec::new();
|
let mut all_roles: Vec<(Uuid, HashMap<String, EquipmentRolePoint>)> = Vec::new();
|
||||||
|
let mut kind_eq_ids: HashMap<String, Uuid> = HashMap::new();
|
||||||
|
|
||||||
for equip in equipment_list {
|
for equip in equipment_list {
|
||||||
let role_map: HashMap<String, EquipmentRolePoint> = role_points_by_equipment
|
let role_map: HashMap<String, EquipmentRolePoint> = role_points_by_equipment
|
||||||
|
|
@ -539,6 +567,7 @@ fn build_equipment_maps(
|
||||||
if let Some(kind) = &equip.kind {
|
if let Some(kind) = &equip.kind {
|
||||||
if !kind_roles.contains_key(kind.as_str()) {
|
if !kind_roles.contains_key(kind.as_str()) {
|
||||||
kind_roles.insert(kind.clone(), role_map.clone());
|
kind_roles.insert(kind.clone(), role_map.clone());
|
||||||
|
kind_eq_ids.insert(kind.clone(), equip.id);
|
||||||
} else {
|
} else {
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
"Engine: unit {} has multiple {} equipment; using first",
|
"Engine: unit {} has multiple {} equipment; using first",
|
||||||
|
|
@ -550,7 +579,7 @@ fn build_equipment_maps(
|
||||||
all_roles.push((equip.id, role_map));
|
all_roles.push((equip.id, role_map));
|
||||||
}
|
}
|
||||||
|
|
||||||
(kind_roles, all_roles)
|
(kind_roles, all_roles, kind_eq_ids)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Find a command point by role. Returns `None` if REM==0, FLT==1, or quality is bad.
|
/// Find a command point by role. Returns `None` if REM==0, FLT==1, or quality is bad.
|
||||||
|
|
@ -621,7 +650,7 @@ mod tests {
|
||||||
signal_role: "start_cmd".to_string(),
|
signal_role: "start_cmd".to_string(),
|
||||||
}],
|
}],
|
||||||
);
|
);
|
||||||
let (first_kind_roles, _) = build_equipment_maps(unit_id, &equipment_list, first_roles);
|
let (first_kind_roles, _, _) = build_equipment_maps(unit_id, &equipment_list, first_roles);
|
||||||
|
|
||||||
let mut second_roles = HashMap::new();
|
let mut second_roles = HashMap::new();
|
||||||
second_roles.insert(
|
second_roles.insert(
|
||||||
|
|
@ -631,7 +660,8 @@ mod tests {
|
||||||
signal_role: "start_cmd".to_string(),
|
signal_role: "start_cmd".to_string(),
|
||||||
}],
|
}],
|
||||||
);
|
);
|
||||||
let (second_kind_roles, _) = build_equipment_maps(unit_id, &equipment_list, second_roles);
|
let (second_kind_roles, _, _) =
|
||||||
|
build_equipment_maps(unit_id, &equipment_list, second_roles);
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
first_kind_roles["coal_feeder"]["start_cmd"].point_id,
|
first_kind_roles["coal_feeder"]["start_cmd"].point_id,
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
pub use plc_platform_core::control::{command, runtime};
|
pub use plc_platform_core::control::{command, runtime};
|
||||||
|
|
||||||
pub mod engine;
|
pub mod engine;
|
||||||
|
pub mod simulate;
|
||||||
pub mod validator;
|
pub mod validator;
|
||||||
|
|
||||||
use plc_platform_core::telemetry::{DataValue, PointMonitorInfo};
|
use plc_platform_core::telemetry::{DataValue, PointMonitorInfo};
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,223 @@
|
||||||
|
use tokio::time::Duration;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use plc_platform_core::{
|
||||||
|
connection::{BatchSetPointValueReq, SetPointValueReqItem},
|
||||||
|
service,
|
||||||
|
telemetry::{DataValue, PointMonitorInfo, PointQuality, ValueType},
|
||||||
|
websocket::WsMessage,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::AppState;
|
||||||
|
|
||||||
|
/// Whether SIMULATE_PLC mode is enabled via environment variable.
|
||||||
|
pub fn enabled() -> bool {
|
||||||
|
matches!(
|
||||||
|
std::env::var("SIMULATE_PLC").ok().as_deref(),
|
||||||
|
Some("true") | Some("1")
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start the chaos simulation task (only when SIMULATE_PLC=true).
|
||||||
|
/// Randomly disrupts `rem` or `flt` signals on equipment to exercise the control engine.
|
||||||
|
pub fn start(state: AppState) {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
run(state).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(state: AppState) {
|
||||||
|
let mut rng = seed_rng();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// Wait a random 15-60 s between events.
|
||||||
|
let wait_secs = 15 + xorshift(&mut rng) % 46;
|
||||||
|
tokio::time::sleep(Duration::from_secs(wait_secs)).await;
|
||||||
|
|
||||||
|
// Pick a random enabled unit.
|
||||||
|
let units = match service::get_all_enabled_units(&state.platform.pool).await {
|
||||||
|
Ok(u) if !u.is_empty() => u,
|
||||||
|
_ => continue,
|
||||||
|
};
|
||||||
|
let unit = &units[xorshift(&mut rng) as usize % units.len()];
|
||||||
|
|
||||||
|
// Only target units with auto control running; otherwise the event is uninteresting.
|
||||||
|
let runtime = state.control_runtime.get(unit.id).await;
|
||||||
|
if runtime.map_or(true, |r| !r.auto_enabled) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pick a random equipment in that unit.
|
||||||
|
let equipments =
|
||||||
|
match service::get_equipment_by_unit_id(&state.platform.pool, unit.id).await {
|
||||||
|
Ok(e) if !e.is_empty() => e,
|
||||||
|
_ => continue,
|
||||||
|
};
|
||||||
|
let eq = &equipments[xorshift(&mut rng) as usize % equipments.len()];
|
||||||
|
|
||||||
|
// Find which of rem / flt this equipment has.
|
||||||
|
let role_points =
|
||||||
|
match service::get_equipment_role_points(&state.platform.pool, eq.id).await {
|
||||||
|
Ok(rp) if !rp.is_empty() => rp,
|
||||||
|
_ => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let candidates: Vec<&str> = ["flt", "rem"]
|
||||||
|
.iter()
|
||||||
|
.filter(|&&r| role_points.iter().any(|p| p.signal_role == r))
|
||||||
|
.copied()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if candidates.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let target_role = candidates[xorshift(&mut rng) as usize % candidates.len()];
|
||||||
|
let target_point = role_points
|
||||||
|
.iter()
|
||||||
|
.find(|p| p.signal_role == target_role)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// rem=false means the equipment is not in remote mode.
|
||||||
|
// flt=true means the equipment reports an active fault.
|
||||||
|
let trigger_value = target_role == "flt";
|
||||||
|
|
||||||
|
// Hold duration: 5-15 s for rem, 3-10 s for flt.
|
||||||
|
let hold_secs = if target_role == "flt" {
|
||||||
|
3 + xorshift(&mut rng) % 8
|
||||||
|
} else {
|
||||||
|
5 + xorshift(&mut rng) % 11
|
||||||
|
};
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
"[chaos] unit={} eq={} role={} -> {} (hold {}s)",
|
||||||
|
unit.code,
|
||||||
|
eq.code,
|
||||||
|
target_role,
|
||||||
|
if trigger_value { "FAULT" } else { "REM OFF" },
|
||||||
|
hold_secs
|
||||||
|
);
|
||||||
|
patch_signal(&state, target_point.point_id, trigger_value).await;
|
||||||
|
patch_signal(&state, target_point.point_id, trigger_value).await;
|
||||||
|
tokio::time::sleep(Duration::from_secs(hold_secs)).await;
|
||||||
|
patch_signal(&state, target_point.point_id, !trigger_value).await;
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
"[chaos] unit={} eq={} role={} -> RESTORED",
|
||||||
|
unit.code,
|
||||||
|
eq.code,
|
||||||
|
target_role
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Simulate RUN signal feedback for an equipment after a manual start/stop command.
|
||||||
|
/// Called by the engine and control handler when SIMULATE_PLC=true.
|
||||||
|
pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: bool) {
|
||||||
|
let role_points =
|
||||||
|
match service::get_equipment_role_points(&state.platform.pool, equipment_id).await {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("simulate_run_feedback: db error: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let run_point = match role_points.iter().find(|p| p.signal_role == "run") {
|
||||||
|
Some(p) => p.clone(),
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
patch_signal(state, run_point.point_id, run_on).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Patch a signal point value: try OPC UA write first, fall back to cache patch + WS push.
|
||||||
|
pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) {
|
||||||
|
let write_json = serde_json::json!(if value_on { 1 } else { 0 });
|
||||||
|
let write_ok = match state
|
||||||
|
.platform.connection_manager
|
||||||
|
.write_point_values_batch(BatchSetPointValueReq {
|
||||||
|
items: vec![SetPointValueReqItem {
|
||||||
|
point_id,
|
||||||
|
value: write_json,
|
||||||
|
}],
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(res) => res.success,
|
||||||
|
Err(_) => false,
|
||||||
|
};
|
||||||
|
|
||||||
|
if write_ok {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fallback: patch the monitor cache directly and broadcast over WS.
|
||||||
|
let (value, value_type, value_text) = {
|
||||||
|
let guard = state
|
||||||
|
.platform.connection_manager
|
||||||
|
.get_point_monitor_data_read_guard()
|
||||||
|
.await;
|
||||||
|
match guard.get(&point_id).and_then(|m| m.value_type.as_ref()) {
|
||||||
|
Some(ValueType::Int) => (
|
||||||
|
DataValue::Int(if value_on { 1 } else { 0 }),
|
||||||
|
Some(ValueType::Int),
|
||||||
|
Some(if value_on { "1" } else { "0" }.to_string()),
|
||||||
|
),
|
||||||
|
Some(ValueType::UInt) => (
|
||||||
|
DataValue::UInt(if value_on { 1 } else { 0 }),
|
||||||
|
Some(ValueType::UInt),
|
||||||
|
Some(if value_on { "1" } else { "0" }.to_string()),
|
||||||
|
),
|
||||||
|
_ => (
|
||||||
|
DataValue::Bool(value_on),
|
||||||
|
Some(ValueType::Bool),
|
||||||
|
Some(value_on.to_string()),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let monitor = PointMonitorInfo {
|
||||||
|
protocol: "simulation".to_string(),
|
||||||
|
source_id: Uuid::nil(),
|
||||||
|
point_id,
|
||||||
|
client_handle: 0,
|
||||||
|
scan_mode: plc_platform_core::model::ScanMode::Poll,
|
||||||
|
timestamp: Some(chrono::Utc::now()),
|
||||||
|
quality: PointQuality::Good,
|
||||||
|
value: Some(value),
|
||||||
|
value_type,
|
||||||
|
value_text,
|
||||||
|
old_value: None,
|
||||||
|
old_timestamp: None,
|
||||||
|
value_changed: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = state
|
||||||
|
.platform.connection_manager
|
||||||
|
.update_point_monitor_data(monitor.clone())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!("[chaos] cache update failed for {}: {}", point_id, e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = state
|
||||||
|
.platform.ws_manager
|
||||||
|
.send_to_public(WsMessage::PointNewValue(monitor))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Minimal XorShift64 PRNG (no external crate needed).
|
||||||
|
|
||||||
|
fn seed_rng() -> u64 {
|
||||||
|
std::time::SystemTime::now()
|
||||||
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
.map(|d| d.as_nanos() as u64 ^ d.as_secs().wrapping_mul(0x9e37_79b9_7f4a_7c15))
|
||||||
|
.unwrap_or(0xdeadbeef)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn xorshift(s: &mut u64) -> u64 {
|
||||||
|
*s ^= *s << 13;
|
||||||
|
*s ^= *s >> 7;
|
||||||
|
*s ^= *s << 17;
|
||||||
|
*s
|
||||||
|
}
|
||||||
|
|
@ -186,6 +186,15 @@ async fn send_equipment_command(
|
||||||
.await
|
.await
|
||||||
.map_err(|e| ApiErr::Internal(e, None))?;
|
.map_err(|e| ApiErr::Internal(e, None))?;
|
||||||
|
|
||||||
|
if crate::control::simulate::enabled() {
|
||||||
|
crate::control::simulate::simulate_run_feedback(
|
||||||
|
&state,
|
||||||
|
equipment_id,
|
||||||
|
matches!(action, ControlAction::Start),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
let event = match action {
|
let event = match action {
|
||||||
ControlAction::Start => AppEvent::EquipmentStartCommandSent {
|
ControlAction::Start => AppEvent::EquipmentStartCommandSent {
|
||||||
equipment_id,
|
equipment_id,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue