diff --git a/crates/app_operation_system/src/control/engine.rs b/crates/app_operation_system/src/control/engine.rs index 715792e..846c6b9 100644 --- a/crates/app_operation_system/src/control/engine.rs +++ b/crates/app_operation_system/src/control/engine.rs @@ -18,12 +18,13 @@ use crate::{ control::{ interlock::{self, InterlockContext}, runtime::{SegmentRuntime, SegmentRuntimeStore}, + simulate, state::SegmentState, - step_executor::{self, CommandPointIndex, DispatchOutcome}, + step_executor::{self, CommandPointIndex, DispatchInputs, DispatchOutcome}, }, event::AppEvent, model::{ProcessSegment, SegmentInterlock, SegmentResource, SegmentStep}, - service::segment as segment_service, + service::{segment as segment_service, station as station_service}, AppState, }; @@ -222,6 +223,28 @@ async fn tick( .filter(|i| i.applies_to == "run_halt") .collect(); if let Err(reason) = interlock::evaluate_all(&run_halt, ctx, monitor) { + // Honor cancel_on_fault for the current step before locking out. + if let Some(step_no) = runtime.current_step_no { + if let Some(step) = steps.iter().find(|s| s.step_no == step_no) { + if step.cancel_on_fault { + if let Err(err) = step_executor::send_stop_command( + step, + &state.platform.connection_manager, + cmd_index, + monitor, + ) + .await + { + tracing::warn!( + "Engine: segment {} run-halt stop for step {} failed: {}", + segment.id, + step_no, + err + ); + } + } + } + } let _ = state.event_manager.send(AppEvent::SegmentFaultLocked { segment_id: segment.id, message: reason.clone(), @@ -321,11 +344,31 @@ async fn tick( runtime.fault_message = Some(format!("step {} not found", step_no)); return Some(runtime); }; + + // Resolve transfer_move_to inputs ahead of dispatch. + let station_code = if step.action_kind == "transfer_move_to" { + match step.target_station_id { + Some(id) => match station_service::get_station_by_id(&state.platform.pool, id) + .await + { + Ok(Some(s)) => Some(s.code), + Ok(None) | Err(_) => None, + }, + None => None, + } + } else { + None + }; + let inputs = DispatchInputs { + target_station_code: station_code.as_deref(), + }; + let outcome = step_executor::dispatch( step, &state.platform.connection_manager, cmd_index, monitor, + &inputs, ) .await; match outcome { @@ -336,9 +379,34 @@ async fn tick( segment_id: segment.id, step_no, }); + // SIMULATE_PLC: schedule the confirm signal to arrive so the + // engine can drive the segment end-to-end without a PLC. + if simulate::enabled() { + if let Some((pid, invert, expected)) = resolve_confirm_point(step, ctx) { + let logical_value = expected ^ invert; + simulate::schedule_confirm(state.clone(), pid, logical_value, 200); + } + } Some(runtime) } DispatchOutcome::Misconfigured(msg) | DispatchOutcome::WriteError(msg) => { + if step.cancel_on_fault { + if let Err(err) = step_executor::send_stop_command( + step, + &state.platform.connection_manager, + cmd_index, + monitor, + ) + .await + { + tracing::warn!( + "Engine: segment {} stop on fault for step {} failed: {}", + segment.id, + step_no, + err + ); + } + } let _ = state.event_manager.send(AppEvent::SegmentFaultLocked { segment_id: segment.id, message: msg.clone(), @@ -426,6 +494,23 @@ async fn tick( } _ => { // "fault" or unknown + if step.cancel_on_fault { + if let Err(err) = step_executor::send_stop_command( + step, + &state.platform.connection_manager, + cmd_index, + monitor, + ) + .await + { + tracing::warn!( + "Engine: segment {} timeout stop for step {} failed: {}", + segment.id, + step_no, + err + ); + } + } runtime.state = SegmentState::Faulted; runtime.fault_message = Some(format!("step {} timeout", step_no)); } diff --git a/crates/app_operation_system/src/control/mod.rs b/crates/app_operation_system/src/control/mod.rs index 8ba451a..ac9dc99 100644 --- a/crates/app_operation_system/src/control/mod.rs +++ b/crates/app_operation_system/src/control/mod.rs @@ -4,6 +4,7 @@ pub mod engine; pub mod interlock; pub mod resource; pub mod runtime; +pub mod simulate; pub mod state; pub mod step_executor; diff --git a/crates/app_operation_system/src/control/simulate.rs b/crates/app_operation_system/src/control/simulate.rs new file mode 100644 index 0000000..1f2f7cc --- /dev/null +++ b/crates/app_operation_system/src/control/simulate.rs @@ -0,0 +1,153 @@ +//! Dev-time signal injection so segments can be driven end-to-end without a real PLC. +//! +//! Activated via `SIMULATE_PLC=true|1` (matches the feeder convention). When +//! enabled, the engine schedules a `patch_signal` after dispatching each step's +//! command so the confirm signal arrives at `expected_value` after a short +//! delay, advancing the state machine. +//! +//! When OPC UA writes succeed they propagate normally. The fallback updates the +//! monitor cache directly and broadcasts `WsMessage::PointNewValue`, so the +//! engine + frontend see the same change. + +use std::time::Duration; + +use chrono::Utc; +use plc_platform_core::{ + connection::{BatchSetPointValueReq, SetPointValueReqItem}, + telemetry::{DataValue, PointMonitorInfo, PointQuality, ValueType}, + websocket::WsMessage, +}; +use uuid::Uuid; + +use crate::AppState; + +pub fn enabled() -> bool { + matches!( + std::env::var("SIMULATE_PLC").ok().as_deref(), + Some("true") | Some("1") + ) +} + +/// Spawn a background task that, after `delay_ms`, patches `confirm_point_id` +/// to `expected_value`. No-op if simulate is disabled. +pub fn schedule_confirm( + state: AppState, + confirm_point_id: Uuid, + expected_value: bool, + delay_ms: u64, +) { + if !enabled() { + return; + } + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + patch_signal(&state, confirm_point_id, expected_value).await; + }); +} + +/// Patch a point: prefer OPC UA write, fall back to direct cache update + WS push. +pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) { + let write_json = serde_json::json!(if value_on { 1 } else { 0 }); + let write_ok = match state + .platform + .connection_manager + .write_point_values_batch(BatchSetPointValueReq { + items: vec![SetPointValueReqItem { + point_id, + value: write_json, + }], + }) + .await + { + Ok(res) => res.success, + Err(_) => false, + }; + if write_ok { + return; + } + + let (value, value_type, value_text) = { + let guard = state + .platform + .connection_manager + .get_point_monitor_data_read_guard() + .await; + match guard.get(&point_id).and_then(|m| m.value_type.as_ref()) { + Some(ValueType::Int) => ( + DataValue::Int(if value_on { 1 } else { 0 }), + Some(ValueType::Int), + Some(if value_on { "1" } else { "0" }.to_string()), + ), + Some(ValueType::UInt) => ( + DataValue::UInt(if value_on { 1 } else { 0 }), + Some(ValueType::UInt), + Some(if value_on { "1" } else { "0" }.to_string()), + ), + _ => ( + DataValue::Bool(value_on), + Some(ValueType::Bool), + Some(value_on.to_string()), + ), + } + }; + + let monitor = PointMonitorInfo { + protocol: "simulation".to_string(), + source_id: Uuid::nil(), + point_id, + client_handle: 0, + scan_mode: plc_platform_core::model::ScanMode::Poll, + timestamp: Some(Utc::now()), + quality: PointQuality::Good, + value: Some(value), + value_type, + value_text, + old_value: None, + old_timestamp: None, + value_changed: true, + }; + + if let Err(err) = state + .platform + .connection_manager + .update_point_monitor_data(monitor.clone()) + .await + { + tracing::warn!("[ops-sim] cache update failed for {}: {}", point_id, err); + return; + } + let _ = state + .platform + .ws_manager + .send_to_public(WsMessage::PointNewValue(monitor)) + .await; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn enabled_responds_to_env_flag() { + // Snapshot whatever the parent process set, restore at the end so the + // env touch doesn't leak between tests. + let prev = std::env::var("SIMULATE_PLC").ok(); + + std::env::remove_var("SIMULATE_PLC"); + assert!(!enabled()); + + std::env::set_var("SIMULATE_PLC", "1"); + assert!(enabled()); + + std::env::set_var("SIMULATE_PLC", "true"); + assert!(enabled()); + + std::env::set_var("SIMULATE_PLC", "no"); + assert!(!enabled()); + + match prev { + Some(v) => std::env::set_var("SIMULATE_PLC", v), + None => std::env::remove_var("SIMULATE_PLC"), + } + } +} diff --git a/crates/app_operation_system/src/control/step_executor.rs b/crates/app_operation_system/src/control/step_executor.rs index 0f7cc9d..57ec6ce 100644 --- a/crates/app_operation_system/src/control/step_executor.rs +++ b/crates/app_operation_system/src/control/step_executor.rs @@ -1,20 +1,29 @@ //! Step executor (design doc §5.4). //! -//! Resolves a `segment_step.action_kind` to a concrete write on a command point -//! using `plc_platform_core::control::command::send_pulse_command`. Confirmation -//! is handled by the engine's `Confirming` state; this module only sends the -//! initial command. +//! Resolves a `segment_step.action_kind` to a concrete write on a command point. +//! Three dispatch modes: +//! +//! - Pulse (default): write high → wait `pulse_ms` → write low. Matches short +//! commands such as `open_door` / `robot_permit`. +//! - Hold (`step.hold_until_confirm = true`): write high once and leave it +//! asserted; engine emits the configured `stop_command_role` once the confirm +//! signal arrives or the step transitions to fault. +//! - Value (action `transfer_move_to`): write the target station's `code` to the +//! move-command point so the field translates the target position itself. +//! +//! Confirmation reads still live in the engine's `Confirming` state. use std::collections::HashMap; +use std::sync::Arc; use plc_platform_core::{ - connection::ConnectionManager, + connection::{BatchSetPointValueReq, ConnectionManager, SetPointValueReqItem}, control::command::send_pulse_command, service::EquipmentSignalRole, - telemetry::PointMonitorInfo, + telemetry::{PointMonitorInfo, ValueType}, }; +use serde_json::json; use sqlx::PgPool; -use std::sync::Arc; use uuid::Uuid; use crate::model::SegmentStep; @@ -46,6 +55,13 @@ impl CommandPointIndex { } } +/// Optional inputs the engine resolves ahead of dispatch. +#[derive(Default)] +pub struct DispatchInputs<'a> { + /// Target station's `code`, used by `transfer_move_to` as the value to write. + pub target_station_code: Option<&'a str>, +} + /// Outcome of dispatching a step's command. pub enum DispatchOutcome { /// A command was issued (or skipped because the action is wait-only). @@ -58,15 +74,13 @@ pub enum DispatchOutcome { WriteError(String), } -/// Dispatch `step.action_kind`. For first-pass the executor supports both pulse -/// commands and "hold until confirm" commands — pulse is the default unless -/// `step.hold_until_confirm` is true, in which case we send a single high value -/// and let the engine emit the stop command after confirmation. +/// Dispatch `step.action_kind`. See module docs for the three dispatch modes. pub async fn dispatch( step: &SegmentStep, connection: &Arc, command_points: &CommandPointIndex, monitor: &HashMap, + inputs: &DispatchInputs<'_>, ) -> DispatchOutcome { if step.action_kind == "wait_signal" { return DispatchOutcome::Issued; @@ -105,9 +119,30 @@ pub async fn dispatch( } }; - let value_type = monitor.get(&point_id).and_then(|m| m.value_type.clone()); - let pulse_ms = step.pulse_ms.unwrap_or(default_pulse_ms(&step.action_kind)) as u64; + if step.action_kind == "transfer_move_to" { + let Some(code) = inputs.target_station_code else { + return DispatchOutcome::Misconfigured(format!( + "step {} transfer_move_to missing target_station_id", + step.step_no + )); + }; + let value_type = monitor.get(&point_id).and_then(|m| m.value_type.clone()); + return match write_station_target(connection, point_id, value_type.as_ref(), code).await { + Ok(()) => DispatchOutcome::Issued, + Err(err) => DispatchOutcome::WriteError(err), + }; + } + let value_type = monitor.get(&point_id).and_then(|m| m.value_type.clone()); + + if step.hold_until_confirm { + return match write_high(connection, point_id, value_type.as_ref()).await { + Ok(()) => DispatchOutcome::Issued, + Err(err) => DispatchOutcome::WriteError(err), + }; + } + + let pulse_ms = step.pulse_ms.unwrap_or(default_pulse_ms(&step.action_kind)) as u64; if let Err(err) = send_pulse_command(connection, point_id, value_type.as_ref(), pulse_ms).await { return DispatchOutcome::WriteError(err); @@ -116,8 +151,8 @@ pub async fn dispatch( DispatchOutcome::Issued } -/// Send the configured stop command (used when `hold_until_confirm` is true or -/// on fault cleanup). No-op if no stop role is configured. +/// Send the configured stop command. Used after `hold_until_confirm` steps and +/// on `cancel_on_fault` cleanup. No-op when no stop role is configured. pub async fn send_stop_command( step: &SegmentStep, connection: &Arc, @@ -141,6 +176,60 @@ pub async fn send_stop_command( send_pulse_command(connection, point_id, value_type.as_ref(), 300).await } +/// Write `1` (or `true`) to a command point exactly once. +async fn write_high( + connection: &Arc, + point_id: Uuid, + value_type: Option<&ValueType>, +) -> Result<(), String> { + let value = match value_type { + Some(ValueType::Bool) => json!(true), + _ => json!(1), + }; + let res = connection + .write_point_values_batch(BatchSetPointValueReq { + items: vec![SetPointValueReqItem { point_id, value }], + }) + .await?; + if res.success { + Ok(()) + } else { + Err(format!("hold write failed: {:?}", res.err_msg)) + } +} + +/// Write the target station code as the command value. +async fn write_station_target( + connection: &Arc, + point_id: Uuid, + value_type: Option<&ValueType>, + code: &str, +) -> Result<(), String> { + // Treat numeric station codes as integer writes when the command point is + // an int/uint; otherwise fall through to a text write. + let value = match value_type { + Some(ValueType::Int) | Some(ValueType::UInt) => code + .parse::() + .map(|n| json!(n)) + .unwrap_or_else(|_| json!(code)), + Some(ValueType::Float) => code + .parse::() + .map(|n| json!(n)) + .unwrap_or_else(|_| json!(code)), + _ => json!(code), + }; + let res = connection + .write_point_values_batch(BatchSetPointValueReq { + items: vec![SetPointValueReqItem { point_id, value }], + }) + .await?; + if res.success { + Ok(()) + } else { + Err(format!("transfer_move_to write failed: {:?}", res.err_msg)) + } +} + /// Default command-role mapping per design doc §4.2.4 table. fn default_command_role(action_kind: &str) -> Option<&'static str> { match action_kind { @@ -223,17 +312,64 @@ mod tests { #[test] fn wait_signal_step_is_dispatched_without_command_role() { - // wait_signal returns Issued even if no command_role / equipment configured. let step = make_step("wait_signal", None, None); - // Build a sync runtime to drive the async call. let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); let connection = Arc::new(ConnectionManager::new()); rt.block_on(async { - let outcome = dispatch(&step, &connection, &CommandPointIndex::default(), &HashMap::new()).await; + let outcome = dispatch( + &step, + &connection, + &CommandPointIndex::default(), + &HashMap::new(), + &DispatchInputs::default(), + ) + .await; assert!(matches!(outcome, DispatchOutcome::Issued)); }); } + + #[test] + fn transfer_move_to_without_station_code_is_misconfigured() { + let step = make_step("transfer_move_to", Some(Uuid::new_v4()), Some("move_cmd")); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let connection = Arc::new(ConnectionManager::new()); + rt.block_on(async { + let outcome = dispatch( + &step, + &connection, + &CommandPointIndex::default(), + &HashMap::new(), + &DispatchInputs::default(), + ) + .await; + assert!(matches!(outcome, DispatchOutcome::Misconfigured(_))); + }); + } + + #[test] + fn misconfigured_when_command_role_missing_default() { + let step = make_step("pulse_cmd", Some(Uuid::new_v4()), None); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let connection = Arc::new(ConnectionManager::new()); + rt.block_on(async { + let outcome = dispatch( + &step, + &connection, + &CommandPointIndex::default(), + &HashMap::new(), + &DispatchInputs::default(), + ) + .await; + assert!(matches!(outcome, DispatchOutcome::Misconfigured(_))); + }); + } }