//! Segment supervisor + per-segment task (design doc §5.1–§5.3). //! //! Supervisor scans enabled segments every 10 s and ensures each has a running //! task (mirrors the `app_feeder_distributor` supervisor). Each per-segment //! task drives the 9-state machine in §5.2 by re-reading config + interlocks //! every iteration and reacting to runtime change notifications. use std::collections::HashMap; use std::sync::Arc; use chrono::Utc; use plc_platform_core::telemetry::PointMonitorInfo; use plc_platform_core::websocket::{AppWsEvent, WsMessage}; use tokio::time::Duration; use uuid::Uuid; use crate::{ control::{ interlock::{self, InterlockContext}, runtime::{SegmentRuntime, SegmentRuntimeStore}, simulate, state::SegmentState, step_executor::{self, CommandPointIndex, DispatchInputs, DispatchOutcome}, }, event::AppEvent, model::{ProcessSegment, SegmentInterlock, SegmentResource, SegmentStep}, service::{segment as segment_service, station as station_service}, AppState, }; const APP_NAME: &str = "operation-system"; const SUPERVISOR_INTERVAL_SECS: u64 = 10; const FAULT_TICK_MS: u64 = 500; /// Start the engine supervisor. Mirrors the feeder entry point. pub fn start(state: AppState, store: Arc) { tokio::spawn(async move { supervise(state, store).await; }); } async fn supervise(state: AppState, store: Arc) { let mut tasks: HashMap> = HashMap::new(); let mut interval = tokio::time::interval(Duration::from_secs(SUPERVISOR_INTERVAL_SECS)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); tracing::info!("Operation-system engine supervisor started"); loop { interval.tick().await; match segment_service::list_segments(&state.platform.pool, None).await { Ok(segments) => { for segment in segments.into_iter().filter(|s| s.enabled) { let needs_spawn = tasks .get(&segment.id) .is_none_or(|handle| handle.is_finished()); if needs_spawn { let task_state = state.clone(); let task_store = store.clone(); let segment_id = segment.id; let handle = tokio::spawn(async move { segment_task(task_state, task_store, segment_id).await; }); tasks.insert(segment.id, handle); } } } Err(err) => tracing::error!("Engine supervisor: list_segments failed: {}", err), } } } async fn segment_task(state: AppState, store: Arc, segment_id: Uuid) { let notify = store.get_or_create_notify(segment_id).await; let mut fault_tick = tokio::time::interval(Duration::from_millis(FAULT_TICK_MS)); fault_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { // 1. Reload segment config; exit when disabled or removed. let segment = match segment_service::get_segment_by_id(&state.platform.pool, segment_id) .await { Ok(Some(s)) if s.enabled && s.mode != "disabled" => s, Ok(_) => { tracing::info!( "Engine: segment {} disabled or removed, task exiting", segment_id ); state.resource_registry.release_all_for(segment_id).await; return; } Err(err) => { tracing::error!("Engine: segment {} reload failed: {}", segment_id, err); tokio::time::sleep(Duration::from_secs(5)).await; continue; } }; // 2. Reload steps + interlocks + resource keys. let steps = match segment_service::list_steps(&state.platform.pool, segment_id).await { Ok(s) => s, Err(err) => { tracing::error!("Engine: segment {} steps reload failed: {}", segment_id, err); tokio::time::sleep(Duration::from_secs(5)).await; continue; } }; let interlocks = match segment_service::list_interlocks(&state.platform.pool, segment_id).await { Ok(v) => v, Err(err) => { tracing::error!( "Engine: segment {} interlocks reload failed: {}", segment_id, err ); tokio::time::sleep(Duration::from_secs(5)).await; continue; } }; let resources = match segment_service::list_resources(&state.platform.pool, segment_id).await { Ok(v) => v, Err(err) => { tracing::error!( "Engine: segment {} resources reload failed: {}", segment_id, err ); tokio::time::sleep(Duration::from_secs(5)).await; continue; } }; let cmd_index = match CommandPointIndex::for_steps(&state.platform.pool, &steps).await { Ok(idx) => idx, Err(err) => { tracing::error!( "Engine: segment {} command-point load failed: {}", segment_id, err ); tokio::time::sleep(Duration::from_secs(5)).await; continue; } }; let ctx = match InterlockContext::load_for_interlocks(&state.platform.pool, &interlocks) .await { Ok(c) => c, Err(err) => { tracing::error!( "Engine: segment {} interlock-context load failed: {}", segment_id, err ); tokio::time::sleep(Duration::from_secs(5)).await; continue; } }; // 3. Snapshot the monitor map for the rest of this tick. let monitor_guard = state .platform .connection_manager .get_point_monitor_data_read_guard() .await; let monitor: HashMap = monitor_guard.clone(); drop(monitor_guard); // 4. Apply one state-machine step. let runtime = store.get_or_init(segment_id).await; let next_runtime = tick( &state, &segment, &steps, &interlocks, &resources, &ctx, &cmd_index, &monitor, runtime, ) .await; let runtime_changed = match next_runtime { Some(updated) => { store.upsert(updated.clone()).await; push_runtime_change(&state, &updated).await; true } None => false, }; // 5. Decide how long to sleep based on next state. let snapshot = store.get_or_init(segment_id).await; if !runtime_changed && should_wait(&snapshot, segment.mode.as_str()) { tokio::select! { _ = fault_tick.tick() => {} _ = notify.notified() => {} } } } } #[allow(clippy::too_many_arguments)] async fn tick( state: &AppState, segment: &ProcessSegment, steps: &[SegmentStep], interlocks: &[SegmentInterlock], resources: &[SegmentResource], ctx: &InterlockContext, cmd_index: &CommandPointIndex, monitor: &HashMap, mut runtime: SegmentRuntime, ) -> Option { // Run-halt interlocks apply once we're past Checking. if matches!( runtime.state, SegmentState::Executing | SegmentState::Confirming | SegmentState::Resetting ) { let run_halt: Vec<&SegmentInterlock> = interlocks .iter() .filter(|i| i.applies_to == "run_halt") .collect(); if let Err(reason) = interlock::evaluate_all(&run_halt, ctx, monitor) { // Honor cancel_on_fault for the current step before locking out. if let Some(step_no) = runtime.current_step_no { if let Some(step) = steps.iter().find(|s| s.step_no == step_no) { if step.cancel_on_fault { if let Err(err) = step_executor::send_stop_command( step, &state.platform.connection_manager, cmd_index, monitor, ) .await { tracing::warn!( "Engine: segment {} run-halt stop for step {} failed: {}", segment.id, step_no, err ); } } } } let _ = state.event_manager.send(AppEvent::SegmentFaultLocked { segment_id: segment.id, message: reason.clone(), }); runtime.state = SegmentState::Faulted; runtime.fault_message = Some(reason); return Some(runtime); } } match runtime.state { SegmentState::Idle => { // Wait for auto activation or remote-manual notifications. if runtime.auto_enabled && segment.mode == "auto" { runtime.state = SegmentState::Checking; runtime.blocked_reason = None; return Some(runtime); } None } SegmentState::Checking => { // start_allow must all pass; start_deny rules being satisfied means we // should NOT start (per design doc §6.1, `start_deny` evaluates as a // "deny" condition — if its rule passes, start is denied). let start_allow: Vec<&SegmentInterlock> = interlocks .iter() .filter(|i| i.applies_to == "start_allow") .collect(); if let Err(reason) = interlock::evaluate_all(&start_allow, ctx, monitor) { let _ = state.event_manager.send(AppEvent::SegmentBlocked { segment_id: segment.id, reason: reason.clone(), }); runtime.state = SegmentState::Blocked; runtime.blocked_reason = Some(reason); return Some(runtime); } for rule in interlocks.iter().filter(|i| i.applies_to == "start_deny") { if interlock::evaluate(rule, ctx, monitor).is_ok() { let reason = format!( "start denied by rule {} ({})", rule.id, rule.rule_kind ); let _ = state.event_manager.send(AppEvent::SegmentBlocked { segment_id: segment.id, reason: reason.clone(), }); runtime.state = SegmentState::Blocked; runtime.blocked_reason = Some(reason); return Some(runtime); } } // Acquire declared resources. let mut acquired: Vec = Vec::new(); for res in resources { let ok = state .resource_registry .try_acquire(&res.resource_key, segment.id) .await; if !ok { for key in &acquired { state.resource_registry.release(key, segment.id).await; } let _ = state.event_manager.send(AppEvent::AlarmResourceBusy { segment_id: segment.id, resource_key: res.resource_key.clone(), }); runtime.state = SegmentState::Blocked; runtime.blocked_reason = Some(format!("resource_busy: {}", res.resource_key)); return Some(runtime); } acquired.push(res.resource_key.clone()); } let Some(first_step) = steps.iter().min_by_key(|s| s.step_no) else { runtime.state = SegmentState::Faulted; runtime.fault_message = Some("segment has no steps".to_string()); return Some(runtime); }; runtime.held_resources = acquired; runtime.current_step_no = Some(first_step.step_no); runtime.step_started_at = Some(Utc::now()); runtime.blocked_reason = None; runtime.state = SegmentState::Executing; Some(runtime) } SegmentState::Executing => { let Some(step_no) = runtime.current_step_no else { runtime.state = SegmentState::Faulted; runtime.fault_message = Some("Executing without current_step_no".to_string()); return Some(runtime); }; let Some(step) = steps.iter().find(|s| s.step_no == step_no) else { runtime.state = SegmentState::Faulted; runtime.fault_message = Some(format!("step {} not found", step_no)); return Some(runtime); }; // Resolve transfer_move_to inputs ahead of dispatch. let station_code = if step.action_kind == "transfer_move_to" { match step.target_station_id { Some(id) => match station_service::get_station_by_id(&state.platform.pool, id) .await { Ok(Some(s)) => Some(s.code), Ok(None) | Err(_) => None, }, None => None, } } else { None }; let inputs = DispatchInputs { target_station_code: station_code.as_deref(), }; let outcome = step_executor::dispatch( step, &state.platform.connection_manager, cmd_index, monitor, &inputs, ) .await; match outcome { DispatchOutcome::Issued => { runtime.state = SegmentState::Confirming; runtime.step_started_at = Some(Utc::now()); let _ = state.event_manager.send(AppEvent::SegmentStepAdvanced { segment_id: segment.id, step_no, }); // SIMULATE_PLC: schedule the confirm signal to arrive so the // engine can drive the segment end-to-end without a PLC. if simulate::enabled() { if let Some((pid, invert, expected)) = resolve_confirm_point(step, ctx) { let logical_value = expected ^ invert; simulate::schedule_confirm(state.clone(), pid, logical_value, 200); } } Some(runtime) } DispatchOutcome::Misconfigured(msg) | DispatchOutcome::WriteError(msg) => { if step.cancel_on_fault { if let Err(err) = step_executor::send_stop_command( step, &state.platform.connection_manager, cmd_index, monitor, ) .await { tracing::warn!( "Engine: segment {} stop on fault for step {} failed: {}", segment.id, step_no, err ); } } let _ = state.event_manager.send(AppEvent::SegmentFaultLocked { segment_id: segment.id, message: msg.clone(), }); runtime.state = SegmentState::Faulted; runtime.fault_message = Some(msg); Some(runtime) } } } SegmentState::Confirming => { let Some(step_no) = runtime.current_step_no else { runtime.state = SegmentState::Faulted; runtime.fault_message = Some("Confirming without current_step_no".to_string()); return Some(runtime); }; let Some(step) = steps.iter().find(|s| s.step_no == step_no) else { runtime.state = SegmentState::Faulted; runtime.fault_message = Some(format!("step {} not found", step_no)); return Some(runtime); }; let confirm = resolve_confirm_point(step, ctx); let confirmed = match confirm { Some((pid, invert, expected)) => check_confirm(monitor, pid, invert, expected), None => { // No confirm signal configured — treat the step as instantly done. Some(true) } }; if confirmed == Some(true) { if step.hold_until_confirm { if let Err(err) = step_executor::send_stop_command( step, &state.platform.connection_manager, cmd_index, monitor, ) .await { tracing::warn!( "Engine: segment {} stop command for step {} failed: {}", segment.id, step_no, err ); } } let next_step = step .next_step_no_on_success .or_else(|| next_sequential(steps, step_no)); match next_step { Some(next_no) => { runtime.state = SegmentState::Executing; runtime.current_step_no = Some(next_no); runtime.step_started_at = Some(Utc::now()); } None => { runtime.state = SegmentState::Completed; } } return Some(runtime); } // Not yet confirmed: check timeout. if let Some(started) = runtime.step_started_at { let elapsed_ms = Utc::now() .signed_duration_since(started) .num_milliseconds(); if elapsed_ms >= step.timeout_ms as i64 { let _ = state.event_manager.send(AppEvent::AlarmActionTimeout { segment_id: segment.id, step_no, }); match step.on_timeout.as_str() { "retry" => { runtime.state = SegmentState::Executing; runtime.step_started_at = Some(Utc::now()); } "block" => { runtime.state = SegmentState::Blocked; runtime.blocked_reason = Some(format!("step {} timeout", step_no)); } _ => { // "fault" or unknown if step.cancel_on_fault { if let Err(err) = step_executor::send_stop_command( step, &state.platform.connection_manager, cmd_index, monitor, ) .await { tracing::warn!( "Engine: segment {} timeout stop for step {} failed: {}", segment.id, step_no, err ); } } runtime.state = SegmentState::Faulted; runtime.fault_message = Some(format!("step {} timeout", step_no)); } } return Some(runtime); } } // Still waiting — no state change. None } SegmentState::Resetting => { // First-pass reset is a no-op; configurations that need a reset step // should encode it as a normal step. Drop back to Idle. runtime.state = SegmentState::Idle; Some(runtime) } SegmentState::Completed => { state.resource_registry.release_all_for(segment.id).await; runtime.held_resources.clear(); runtime.last_completed_at = Some(Utc::now()); runtime.current_step_no = None; let _ = state .event_manager .send(AppEvent::SegmentCompleted { segment_id: segment.id }); runtime.state = SegmentState::Idle; Some(runtime) } SegmentState::Blocked => { // Periodically re-check whether the block has cleared. if runtime.auto_enabled && segment.mode == "auto" { let start_allow: Vec<&SegmentInterlock> = interlocks .iter() .filter(|i| i.applies_to == "start_allow") .collect(); let any_deny = interlocks .iter() .filter(|i| i.applies_to == "start_deny") .any(|rule| interlock::evaluate(rule, ctx, monitor).is_ok()); if interlock::evaluate_all(&start_allow, ctx, monitor).is_ok() && !any_deny { runtime.state = SegmentState::Checking; runtime.blocked_reason = None; return Some(runtime); } } None } SegmentState::Faulted => { // Release any held resources on fault entry; first-pass keeps it simple. state.resource_registry.release_all_for(segment.id).await; runtime.held_resources.clear(); if segment.require_manual_ack_after_fault { runtime.manual_ack_required = true; runtime.state = SegmentState::ManualAckRequired; return Some(runtime); } // Otherwise we leave it Faulted; ack-fault API may flip it back to Idle. None } SegmentState::ManualAckRequired => { // ack-fault API will flip manual_ack_required=false + state=Idle and notify. None } } } fn next_sequential(steps: &[SegmentStep], current: i32) -> Option { steps .iter() .filter(|s| s.step_no > current) .map(|s| s.step_no) .min() } /// Returns `(point_id, invert, expected_value)` if a confirm signal is configured. fn resolve_confirm_point( step: &SegmentStep, ctx: &InterlockContext, ) -> Option<(Uuid, bool, bool)> { if let Some(point_id) = step.confirm_point_id { return Some((point_id, false, step.expected_value)); } let role = step.confirm_signal_role.as_deref()?; let station_id = step.target_station_id?; let (pid, invert) = ctx .station_role_points .get(&station_id) .and_then(|m| m.get(role)) .copied()?; Some((pid, invert, step.expected_value)) } fn check_confirm( monitor: &HashMap, point_id: Uuid, invert: bool, expected: bool, ) -> Option { let m = monitor.get(&point_id)?; if m.quality != plc_platform_core::telemetry::PointQuality::Good { return None; } let raw = super::monitor_value_as_bool(m); let logical = raw ^ invert; Some(logical == expected) } fn should_wait(runtime: &SegmentRuntime, mode: &str) -> bool { match runtime.state { SegmentState::Idle => !runtime.auto_enabled || mode != "auto", SegmentState::Confirming => true, SegmentState::Blocked | SegmentState::Faulted | SegmentState::ManualAckRequired => true, _ => false, } } async fn push_runtime_change(state: &AppState, runtime: &SegmentRuntime) { let payload = match serde_json::to_value(runtime) { Ok(v) => v, Err(err) => { tracing::warn!("Engine: serialize SegmentRuntime failed: {}", err); return; } }; let message = WsMessage::AppEvent(AppWsEvent { app: APP_NAME.to_string(), event_type: "segment_runtime_changed".to_string(), data: payload, }); if let Err(err) = state.platform.ws_manager.send_to_public(message).await { tracing::debug!("Engine: WS push skipped: {}", err); } }