From 63683a24c88559e2cf9752fd8710bd6a5be7e89f Mon Sep 17 00:00:00 2001 From: caoqianming Date: Tue, 19 May 2026 08:39:14 +0800 Subject: [PATCH] Implement operation-system engine MVP (P3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the segment supervisor + per-segment state machine driving Idle → Checking → Executing → Confirming → Completed (plus Blocked / Faulted / ManualAckRequired), interlock evaluator, action-kind step executor, control + runtime HTTP handlers, and WebSocket runtime push via AppEvent. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/control/engine.rs | 556 +++++++++++++++++- .../src/control/interlock.rs | 387 ++++++++++++ .../app_operation_system/src/control/mod.rs | 25 + .../src/control/step_executor.rs | 239 ++++++++ crates/app_operation_system/src/handler.rs | 2 + .../src/handler/control.rs | 210 +++++++ .../src/handler/runtime.rs | 94 +++ crates/app_operation_system/src/router.rs | 42 ++ .../tests/router_smoke.rs | 35 ++ docs/api-ops.md | 58 +- 10 files changed, 1637 insertions(+), 11 deletions(-) create mode 100644 crates/app_operation_system/src/control/interlock.rs create mode 100644 crates/app_operation_system/src/control/step_executor.rs create mode 100644 crates/app_operation_system/src/handler/control.rs create mode 100644 crates/app_operation_system/src/handler/runtime.rs diff --git a/crates/app_operation_system/src/control/engine.rs b/crates/app_operation_system/src/control/engine.rs index ef405e1..715792e 100644 --- a/crates/app_operation_system/src/control/engine.rs +++ b/crates/app_operation_system/src/control/engine.rs @@ -1,27 +1,563 @@ +//! Segment supervisor + per-segment task (design doc §5.1–§5.3). +//! +//! Supervisor scans enabled segments every 10 s and ensures each has a running +//! task (mirrors the `app_feeder_distributor` supervisor). Each per-segment +//! task drives the 9-state machine in §5.2 by re-reading config + interlocks +//! every iteration and reacting to runtime change notifications. + +use std::collections::HashMap; use std::sync::Arc; +use chrono::Utc; +use plc_platform_core::telemetry::PointMonitorInfo; +use plc_platform_core::websocket::{AppWsEvent, WsMessage}; use tokio::time::Duration; +use uuid::Uuid; -use crate::{control::runtime::SegmentRuntimeStore, AppState}; +use crate::{ + control::{ + interlock::{self, InterlockContext}, + runtime::{SegmentRuntime, SegmentRuntimeStore}, + state::SegmentState, + step_executor::{self, CommandPointIndex, DispatchOutcome}, + }, + event::AppEvent, + model::{ProcessSegment, SegmentInterlock, SegmentResource, SegmentStep}, + service::segment as segment_service, + AppState, +}; -/// Start the segment engine supervisor. -/// -/// Skeleton only: at P0 there are no segment tables yet (P1 lands the schema), -/// so the supervisor logs and idles. P3 will replace this with a per-segment -/// task spawner that mirrors feeder's per-unit task model. +const APP_NAME: &str = "operation-system"; +const SUPERVISOR_INTERVAL_SECS: u64 = 10; +const FAULT_TICK_MS: u64 = 500; + +/// Start the engine supervisor. Mirrors the feeder entry point. pub fn start(state: AppState, store: Arc) { tokio::spawn(async move { supervise(state, store).await; }); } -async fn supervise(_state: AppState, _store: Arc) { - let mut interval = tokio::time::interval(Duration::from_secs(10)); +async fn supervise(state: AppState, store: Arc) { + let mut tasks: HashMap> = HashMap::new(); + let mut interval = tokio::time::interval(Duration::from_secs(SUPERVISOR_INTERVAL_SECS)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - tracing::info!("Operation-system engine supervisor started (skeleton; awaiting P1 schema)"); + tracing::info!("Operation-system engine supervisor started"); loop { interval.tick().await; - // Segment supervision will live here once P1 lands process_segment. + match segment_service::list_segments(&state.platform.pool, None).await { + Ok(segments) => { + for segment in segments.into_iter().filter(|s| s.enabled) { + let needs_spawn = tasks + .get(&segment.id) + .is_none_or(|handle| handle.is_finished()); + if needs_spawn { + let task_state = state.clone(); + let task_store = store.clone(); + let segment_id = segment.id; + let handle = tokio::spawn(async move { + segment_task(task_state, task_store, segment_id).await; + }); + tasks.insert(segment.id, handle); + } + } + } + Err(err) => tracing::error!("Engine supervisor: list_segments failed: {}", err), + } } } + +async fn segment_task(state: AppState, store: Arc, segment_id: Uuid) { + let notify = store.get_or_create_notify(segment_id).await; + let mut fault_tick = tokio::time::interval(Duration::from_millis(FAULT_TICK_MS)); + fault_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + // 1. Reload segment config; exit when disabled or removed. + let segment = match segment_service::get_segment_by_id(&state.platform.pool, segment_id) + .await + { + Ok(Some(s)) if s.enabled && s.mode != "disabled" => s, + Ok(_) => { + tracing::info!( + "Engine: segment {} disabled or removed, task exiting", + segment_id + ); + state.resource_registry.release_all_for(segment_id).await; + return; + } + Err(err) => { + tracing::error!("Engine: segment {} reload failed: {}", segment_id, err); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + + // 2. Reload steps + interlocks + resource keys. + let steps = match segment_service::list_steps(&state.platform.pool, segment_id).await { + Ok(s) => s, + Err(err) => { + tracing::error!("Engine: segment {} steps reload failed: {}", segment_id, err); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + let interlocks = + match segment_service::list_interlocks(&state.platform.pool, segment_id).await { + Ok(v) => v, + Err(err) => { + tracing::error!( + "Engine: segment {} interlocks reload failed: {}", + segment_id, + err + ); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + let resources = + match segment_service::list_resources(&state.platform.pool, segment_id).await { + Ok(v) => v, + Err(err) => { + tracing::error!( + "Engine: segment {} resources reload failed: {}", + segment_id, + err + ); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + + let cmd_index = match CommandPointIndex::for_steps(&state.platform.pool, &steps).await { + Ok(idx) => idx, + Err(err) => { + tracing::error!( + "Engine: segment {} command-point load failed: {}", + segment_id, + err + ); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + let ctx = match InterlockContext::load_for_interlocks(&state.platform.pool, &interlocks) + .await + { + Ok(c) => c, + Err(err) => { + tracing::error!( + "Engine: segment {} interlock-context load failed: {}", + segment_id, + err + ); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + + // 3. Snapshot the monitor map for the rest of this tick. + let monitor_guard = state + .platform + .connection_manager + .get_point_monitor_data_read_guard() + .await; + let monitor: HashMap = monitor_guard.clone(); + drop(monitor_guard); + + // 4. Apply one state-machine step. + let runtime = store.get_or_init(segment_id).await; + let next_runtime = tick( + &state, + &segment, + &steps, + &interlocks, + &resources, + &ctx, + &cmd_index, + &monitor, + runtime, + ) + .await; + let runtime_changed = match next_runtime { + Some(updated) => { + store.upsert(updated.clone()).await; + push_runtime_change(&state, &updated).await; + true + } + None => false, + }; + + // 5. Decide how long to sleep based on next state. + let snapshot = store.get_or_init(segment_id).await; + if !runtime_changed && should_wait(&snapshot, segment.mode.as_str()) { + tokio::select! { + _ = fault_tick.tick() => {} + _ = notify.notified() => {} + } + } + } +} + +#[allow(clippy::too_many_arguments)] +async fn tick( + state: &AppState, + segment: &ProcessSegment, + steps: &[SegmentStep], + interlocks: &[SegmentInterlock], + resources: &[SegmentResource], + ctx: &InterlockContext, + cmd_index: &CommandPointIndex, + monitor: &HashMap, + mut runtime: SegmentRuntime, +) -> Option { + // Run-halt interlocks apply once we're past Checking. + if matches!( + runtime.state, + SegmentState::Executing | SegmentState::Confirming | SegmentState::Resetting + ) { + let run_halt: Vec<&SegmentInterlock> = interlocks + .iter() + .filter(|i| i.applies_to == "run_halt") + .collect(); + if let Err(reason) = interlock::evaluate_all(&run_halt, ctx, monitor) { + let _ = state.event_manager.send(AppEvent::SegmentFaultLocked { + segment_id: segment.id, + message: reason.clone(), + }); + runtime.state = SegmentState::Faulted; + runtime.fault_message = Some(reason); + return Some(runtime); + } + } + + match runtime.state { + SegmentState::Idle => { + // Wait for auto activation or remote-manual notifications. + if runtime.auto_enabled && segment.mode == "auto" { + runtime.state = SegmentState::Checking; + runtime.blocked_reason = None; + return Some(runtime); + } + None + } + SegmentState::Checking => { + // start_allow must all pass; start_deny rules being satisfied means we + // should NOT start (per design doc §6.1, `start_deny` evaluates as a + // "deny" condition — if its rule passes, start is denied). + let start_allow: Vec<&SegmentInterlock> = interlocks + .iter() + .filter(|i| i.applies_to == "start_allow") + .collect(); + if let Err(reason) = interlock::evaluate_all(&start_allow, ctx, monitor) { + let _ = state.event_manager.send(AppEvent::SegmentBlocked { + segment_id: segment.id, + reason: reason.clone(), + }); + runtime.state = SegmentState::Blocked; + runtime.blocked_reason = Some(reason); + return Some(runtime); + } + for rule in interlocks.iter().filter(|i| i.applies_to == "start_deny") { + if interlock::evaluate(rule, ctx, monitor).is_ok() { + let reason = format!( + "start denied by rule {} ({})", + rule.id, rule.rule_kind + ); + let _ = state.event_manager.send(AppEvent::SegmentBlocked { + segment_id: segment.id, + reason: reason.clone(), + }); + runtime.state = SegmentState::Blocked; + runtime.blocked_reason = Some(reason); + return Some(runtime); + } + } + // Acquire declared resources. + let mut acquired: Vec = Vec::new(); + for res in resources { + let ok = state + .resource_registry + .try_acquire(&res.resource_key, segment.id) + .await; + if !ok { + for key in &acquired { + state.resource_registry.release(key, segment.id).await; + } + let _ = state.event_manager.send(AppEvent::AlarmResourceBusy { + segment_id: segment.id, + resource_key: res.resource_key.clone(), + }); + runtime.state = SegmentState::Blocked; + runtime.blocked_reason = + Some(format!("resource_busy: {}", res.resource_key)); + return Some(runtime); + } + acquired.push(res.resource_key.clone()); + } + + let Some(first_step) = steps.iter().min_by_key(|s| s.step_no) else { + runtime.state = SegmentState::Faulted; + runtime.fault_message = Some("segment has no steps".to_string()); + return Some(runtime); + }; + + runtime.held_resources = acquired; + runtime.current_step_no = Some(first_step.step_no); + runtime.step_started_at = Some(Utc::now()); + runtime.blocked_reason = None; + runtime.state = SegmentState::Executing; + Some(runtime) + } + SegmentState::Executing => { + let Some(step_no) = runtime.current_step_no else { + runtime.state = SegmentState::Faulted; + runtime.fault_message = Some("Executing without current_step_no".to_string()); + return Some(runtime); + }; + let Some(step) = steps.iter().find(|s| s.step_no == step_no) else { + runtime.state = SegmentState::Faulted; + runtime.fault_message = Some(format!("step {} not found", step_no)); + return Some(runtime); + }; + let outcome = step_executor::dispatch( + step, + &state.platform.connection_manager, + cmd_index, + monitor, + ) + .await; + match outcome { + DispatchOutcome::Issued => { + runtime.state = SegmentState::Confirming; + runtime.step_started_at = Some(Utc::now()); + let _ = state.event_manager.send(AppEvent::SegmentStepAdvanced { + segment_id: segment.id, + step_no, + }); + Some(runtime) + } + DispatchOutcome::Misconfigured(msg) | DispatchOutcome::WriteError(msg) => { + let _ = state.event_manager.send(AppEvent::SegmentFaultLocked { + segment_id: segment.id, + message: msg.clone(), + }); + runtime.state = SegmentState::Faulted; + runtime.fault_message = Some(msg); + Some(runtime) + } + } + } + SegmentState::Confirming => { + let Some(step_no) = runtime.current_step_no else { + runtime.state = SegmentState::Faulted; + runtime.fault_message = Some("Confirming without current_step_no".to_string()); + return Some(runtime); + }; + let Some(step) = steps.iter().find(|s| s.step_no == step_no) else { + runtime.state = SegmentState::Faulted; + runtime.fault_message = Some(format!("step {} not found", step_no)); + return Some(runtime); + }; + + let confirm = resolve_confirm_point(step, ctx); + let confirmed = match confirm { + Some((pid, invert, expected)) => check_confirm(monitor, pid, invert, expected), + None => { + // No confirm signal configured — treat the step as instantly done. + Some(true) + } + }; + + if confirmed == Some(true) { + if step.hold_until_confirm { + if let Err(err) = step_executor::send_stop_command( + step, + &state.platform.connection_manager, + cmd_index, + monitor, + ) + .await + { + tracing::warn!( + "Engine: segment {} stop command for step {} failed: {}", + segment.id, + step_no, + err + ); + } + } + let next_step = step + .next_step_no_on_success + .or_else(|| next_sequential(steps, step_no)); + match next_step { + Some(next_no) => { + runtime.state = SegmentState::Executing; + runtime.current_step_no = Some(next_no); + runtime.step_started_at = Some(Utc::now()); + } + None => { + runtime.state = SegmentState::Completed; + } + } + return Some(runtime); + } + + // Not yet confirmed: check timeout. + if let Some(started) = runtime.step_started_at { + let elapsed_ms = Utc::now() + .signed_duration_since(started) + .num_milliseconds(); + if elapsed_ms >= step.timeout_ms as i64 { + let _ = state.event_manager.send(AppEvent::AlarmActionTimeout { + segment_id: segment.id, + step_no, + }); + match step.on_timeout.as_str() { + "retry" => { + runtime.state = SegmentState::Executing; + runtime.step_started_at = Some(Utc::now()); + } + "block" => { + runtime.state = SegmentState::Blocked; + runtime.blocked_reason = + Some(format!("step {} timeout", step_no)); + } + _ => { + // "fault" or unknown + runtime.state = SegmentState::Faulted; + runtime.fault_message = Some(format!("step {} timeout", step_no)); + } + } + return Some(runtime); + } + } + // Still waiting — no state change. + None + } + SegmentState::Resetting => { + // First-pass reset is a no-op; configurations that need a reset step + // should encode it as a normal step. Drop back to Idle. + runtime.state = SegmentState::Idle; + Some(runtime) + } + SegmentState::Completed => { + state.resource_registry.release_all_for(segment.id).await; + runtime.held_resources.clear(); + runtime.last_completed_at = Some(Utc::now()); + runtime.current_step_no = None; + let _ = state + .event_manager + .send(AppEvent::SegmentCompleted { segment_id: segment.id }); + runtime.state = SegmentState::Idle; + Some(runtime) + } + SegmentState::Blocked => { + // Periodically re-check whether the block has cleared. + if runtime.auto_enabled && segment.mode == "auto" { + let start_allow: Vec<&SegmentInterlock> = interlocks + .iter() + .filter(|i| i.applies_to == "start_allow") + .collect(); + let any_deny = interlocks + .iter() + .filter(|i| i.applies_to == "start_deny") + .any(|rule| interlock::evaluate(rule, ctx, monitor).is_ok()); + if interlock::evaluate_all(&start_allow, ctx, monitor).is_ok() && !any_deny { + runtime.state = SegmentState::Checking; + runtime.blocked_reason = None; + return Some(runtime); + } + } + None + } + SegmentState::Faulted => { + // Release any held resources on fault entry; first-pass keeps it simple. + state.resource_registry.release_all_for(segment.id).await; + runtime.held_resources.clear(); + if segment.require_manual_ack_after_fault { + runtime.manual_ack_required = true; + runtime.state = SegmentState::ManualAckRequired; + return Some(runtime); + } + // Otherwise we leave it Faulted; ack-fault API may flip it back to Idle. + None + } + SegmentState::ManualAckRequired => { + // ack-fault API will flip manual_ack_required=false + state=Idle and notify. + None + } + } +} + +fn next_sequential(steps: &[SegmentStep], current: i32) -> Option { + steps + .iter() + .filter(|s| s.step_no > current) + .map(|s| s.step_no) + .min() +} + +/// Returns `(point_id, invert, expected_value)` if a confirm signal is configured. +fn resolve_confirm_point( + step: &SegmentStep, + ctx: &InterlockContext, +) -> Option<(Uuid, bool, bool)> { + if let Some(point_id) = step.confirm_point_id { + return Some((point_id, false, step.expected_value)); + } + let role = step.confirm_signal_role.as_deref()?; + let station_id = step.target_station_id?; + let (pid, invert) = ctx + .station_role_points + .get(&station_id) + .and_then(|m| m.get(role)) + .copied()?; + Some((pid, invert, step.expected_value)) +} + +fn check_confirm( + monitor: &HashMap, + point_id: Uuid, + invert: bool, + expected: bool, +) -> Option { + let m = monitor.get(&point_id)?; + if m.quality != plc_platform_core::telemetry::PointQuality::Good { + return None; + } + let raw = super::monitor_value_as_bool(m); + let logical = raw ^ invert; + Some(logical == expected) +} + +fn should_wait(runtime: &SegmentRuntime, mode: &str) -> bool { + match runtime.state { + SegmentState::Idle => !runtime.auto_enabled || mode != "auto", + SegmentState::Confirming => true, + SegmentState::Blocked + | SegmentState::Faulted + | SegmentState::ManualAckRequired => true, + _ => false, + } +} + +async fn push_runtime_change(state: &AppState, runtime: &SegmentRuntime) { + let payload = match serde_json::to_value(runtime) { + Ok(v) => v, + Err(err) => { + tracing::warn!("Engine: serialize SegmentRuntime failed: {}", err); + return; + } + }; + let message = WsMessage::AppEvent(AppWsEvent { + app: APP_NAME.to_string(), + event_type: "segment_runtime_changed".to_string(), + data: payload, + }); + if let Err(err) = state.platform.ws_manager.send_to_public(message).await { + tracing::debug!("Engine: WS push skipped: {}", err); + } +} + diff --git a/crates/app_operation_system/src/control/interlock.rs b/crates/app_operation_system/src/control/interlock.rs new file mode 100644 index 0000000..bd1b095 --- /dev/null +++ b/crates/app_operation_system/src/control/interlock.rs @@ -0,0 +1,387 @@ +//! Interlock evaluator (design doc §6). +//! +//! Evaluates a single `segment_interlock` row against the current point monitor +//! snapshot. Returns `Ok(())` when the rule passes, `Err(reason)` when it fails. +//! +//! The first-pass rule set is fixed (no expression engine). New rule kinds are +//! added by extending the `rule_kind` match. + +use std::collections::HashMap; + +use plc_platform_core::telemetry::PointMonitorInfo; +use sqlx::PgPool; +use uuid::Uuid; + +use crate::model::{SegmentInterlock, StationSignal}; + +use super::{monitor_quality_good, monitor_value_as_bool}; + +/// Pre-loaded lookup maps so the engine evaluates interlocks without per-rule DB hits. +pub struct InterlockContext { + /// equipment_id → (role → point_id) + pub equipment_role_points: HashMap>, + /// station_id → (role → point_id, invert) + pub station_role_points: HashMap>, +} + +impl InterlockContext { + pub async fn load_for_interlocks( + pool: &PgPool, + interlocks: &[SegmentInterlock], + ) -> Result { + let equipment_ids: Vec = interlocks.iter().filter_map(|i| i.equipment_id).collect(); + let station_ids: Vec = interlocks.iter().filter_map(|i| i.station_id).collect(); + + Self::load(pool, &equipment_ids, &station_ids).await + } + + pub async fn load( + pool: &PgPool, + equipment_ids: &[Uuid], + station_ids: &[Uuid], + ) -> Result { + let mut equipment_role_points: HashMap> = HashMap::new(); + if !equipment_ids.is_empty() { + let rows = plc_platform_core::service::get_signal_role_points_batch(pool, equipment_ids).await?; + for row in rows { + equipment_role_points + .entry(row.equipment_id) + .or_default() + .insert(row.signal_role, row.point_id); + } + } + + let mut station_role_points: HashMap> = HashMap::new(); + if !station_ids.is_empty() { + let signals = sqlx::query_as::<_, StationSignal>( + r#"SELECT * FROM station_signal WHERE station_id = ANY($1)"#, + ) + .bind(station_ids) + .fetch_all(pool) + .await?; + for sig in signals { + if let Some(point_id) = sig.point_id { + station_role_points + .entry(sig.station_id) + .or_default() + .insert(sig.signal_role, (point_id, sig.invert_value)); + } + } + } + + Ok(Self { + equipment_role_points, + station_role_points, + }) + } +} + +/// Resolve the point id for a station's role, honoring `derived_from_role`. +/// +/// Returns the resolved `(point_id, invert_value)`. The caller XORs `invert_value` +/// with the raw bool to obtain the logical signal. +fn resolve_station_point( + ctx: &InterlockContext, + station_id: Uuid, + role: &str, +) -> Option<(Uuid, bool)> { + ctx.station_role_points + .get(&station_id) + .and_then(|m| m.get(role)) + .copied() +} + +/// Read a (point_id, invert) → logical bool, requiring Good quality. +/// Returns `None` if the point is missing from the monitor map, has bad quality, +/// or has no value yet. +fn read_logical_bool( + monitor: &HashMap, + point_id: Uuid, + invert: bool, +) -> Option { + let m = monitor.get(&point_id)?; + if !monitor_quality_good(m) { + return None; + } + let raw = monitor_value_as_bool(m); + Some(raw ^ invert) +} + +/// Evaluate one interlock rule. Returns Ok when the rule is satisfied, +/// Err with a human-readable reason when it is not. +pub fn evaluate( + rule: &SegmentInterlock, + ctx: &InterlockContext, + monitor: &HashMap, +) -> Result<(), String> { + match rule.rule_kind.as_str() { + "point_eq" => { + let point_id = rule + .point_id + .ok_or_else(|| format!("point_eq rule {} missing point_id", rule.id))?; + let expected = rule.expected_value.unwrap_or(true); + let actual = read_logical_bool(monitor, point_id, false) + .ok_or_else(|| format!("point {} unavailable or bad quality", point_id))?; + if actual == expected { + Ok(()) + } else { + Err(format!( + "point {} expected {} got {}", + point_id, expected, actual + )) + } + } + "station_vacant" => { + let station_id = rule + .station_id + .ok_or_else(|| format!("station_vacant rule {} missing station_id", rule.id))?; + // Prefer explicit vacancy signal; fall back to !presence. + if let Some((pid, invert)) = resolve_station_point(ctx, station_id, "vacancy") { + let v = read_logical_bool(monitor, pid, invert) + .ok_or_else(|| format!("vacancy signal for station {} unavailable", station_id))?; + if v { + Ok(()) + } else { + Err(format!("station {} occupied (vacancy=false)", station_id)) + } + } else if let Some((pid, invert)) = resolve_station_point(ctx, station_id, "presence") { + let v = read_logical_bool(monitor, pid, invert) + .ok_or_else(|| format!("presence signal for station {} unavailable", station_id))?; + if !v { + Ok(()) + } else { + Err(format!("station {} occupied (presence=true)", station_id)) + } + } else { + Err(format!("station {} has no presence/vacancy binding", station_id)) + } + } + "station_occupied" => { + let station_id = rule + .station_id + .ok_or_else(|| format!("station_occupied rule {} missing station_id", rule.id))?; + if let Some((pid, invert)) = resolve_station_point(ctx, station_id, "presence") { + let v = read_logical_bool(monitor, pid, invert) + .ok_or_else(|| format!("presence signal for station {} unavailable", station_id))?; + if v { + Ok(()) + } else { + Err(format!("station {} empty (presence=false)", station_id)) + } + } else { + Err(format!("station {} has no presence binding", station_id)) + } + } + "equipment_origin" => check_equipment_role(rule, ctx, monitor, "home", true, "not at origin"), + "equipment_no_fault" => { + check_equipment_role(rule, ctx, monitor, "flt", false, "fault active") + } + "equipment_remote" => { + check_equipment_role(rule, ctx, monitor, "rem", true, "not in remote mode") + } + "safety_chain_ok" => { + // First-pass: a safety_chain_ok rule must bind to a point that is true. + let point_id = rule + .point_id + .ok_or_else(|| format!("safety_chain_ok rule {} missing point_id", rule.id))?; + let actual = read_logical_bool(monitor, point_id, false) + .ok_or_else(|| format!("safety chain point {} unavailable", point_id))?; + if actual { + Ok(()) + } else { + Err(format!("safety chain point {} broken", point_id)) + } + } + other => Err(format!("unknown rule_kind {}", other)), + } +} + +fn check_equipment_role( + rule: &SegmentInterlock, + ctx: &InterlockContext, + monitor: &HashMap, + role: &str, + expected: bool, + fail_reason: &str, +) -> Result<(), String> { + let equipment_id = rule + .equipment_id + .ok_or_else(|| format!("{} rule {} missing equipment_id", rule.rule_kind, rule.id))?; + let point_id = ctx + .equipment_role_points + .get(&equipment_id) + .and_then(|m| m.get(role)) + .copied() + .ok_or_else(|| format!("equipment {} has no {} role binding", equipment_id, role))?; + let actual = read_logical_bool(monitor, point_id, false).ok_or_else(|| { + format!( + "equipment {} role {} point {} unavailable", + equipment_id, role, point_id + ) + })?; + if actual == expected { + Ok(()) + } else { + Err(format!("equipment {} {}", equipment_id, fail_reason)) + } +} + +/// Evaluate the supplied interlock set; returns the first failure. +pub fn evaluate_all( + rules: &[&SegmentInterlock], + ctx: &InterlockContext, + monitor: &HashMap, +) -> Result<(), String> { + for rule in rules { + evaluate(rule, ctx, monitor)?; + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + use plc_platform_core::model::ScanMode; + use plc_platform_core::telemetry::{DataValue, PointQuality, ValueType}; + + fn monitor_entry(point_id: Uuid, value: bool, good: bool) -> PointMonitorInfo { + PointMonitorInfo { + protocol: "test".to_string(), + source_id: Uuid::nil(), + point_id, + client_handle: 0, + scan_mode: ScanMode::Subscribe, + timestamp: Some(Utc::now()), + quality: if good { + PointQuality::Good + } else { + PointQuality::Bad + }, + value: Some(DataValue::Bool(value)), + value_type: Some(ValueType::Bool), + value_text: None, + old_value: None, + old_timestamp: None, + value_changed: false, + } + } + + fn dummy_interlock(rule_kind: &str) -> SegmentInterlock { + SegmentInterlock { + id: Uuid::new_v4(), + segment_id: Uuid::new_v4(), + applies_to: "start_allow".to_string(), + rule_kind: rule_kind.to_string(), + point_id: None, + station_id: None, + equipment_id: None, + expected_value: None, + description: None, + created_at: Utc::now(), + updated_at: Utc::now(), + } + } + + #[test] + fn point_eq_passes_when_value_matches_expected() { + let pid = Uuid::new_v4(); + let mut rule = dummy_interlock("point_eq"); + rule.point_id = Some(pid); + rule.expected_value = Some(true); + + let mut monitor = HashMap::new(); + monitor.insert(pid, monitor_entry(pid, true, true)); + let ctx = InterlockContext { + equipment_role_points: HashMap::new(), + station_role_points: HashMap::new(), + }; + + assert!(evaluate(&rule, &ctx, &monitor).is_ok()); + } + + #[test] + fn point_eq_fails_when_quality_bad() { + let pid = Uuid::new_v4(); + let mut rule = dummy_interlock("point_eq"); + rule.point_id = Some(pid); + rule.expected_value = Some(true); + + let mut monitor = HashMap::new(); + monitor.insert(pid, monitor_entry(pid, true, false)); + let ctx = InterlockContext { + equipment_role_points: HashMap::new(), + station_role_points: HashMap::new(), + }; + + assert!(evaluate(&rule, &ctx, &monitor).is_err()); + } + + #[test] + fn station_vacant_uses_presence_when_no_vacancy_signal() { + let station_id = Uuid::new_v4(); + let pid = Uuid::new_v4(); + let mut rule = dummy_interlock("station_vacant"); + rule.station_id = Some(station_id); + + let mut monitor = HashMap::new(); + monitor.insert(pid, monitor_entry(pid, false, true)); + let mut station_role_points = HashMap::new(); + let mut roles = HashMap::new(); + roles.insert("presence".to_string(), (pid, false)); + station_role_points.insert(station_id, roles); + let ctx = InterlockContext { + equipment_role_points: HashMap::new(), + station_role_points, + }; + + assert!(evaluate(&rule, &ctx, &monitor).is_ok()); + + // Flip presence to true ⇒ vacant should fail. + monitor.insert(pid, monitor_entry(pid, true, true)); + assert!(evaluate(&rule, &ctx, &monitor).is_err()); + } + + #[test] + fn equipment_no_fault_fails_when_flt_true() { + let eq_id = Uuid::new_v4(); + let pid = Uuid::new_v4(); + let mut rule = dummy_interlock("equipment_no_fault"); + rule.equipment_id = Some(eq_id); + + let mut monitor = HashMap::new(); + monitor.insert(pid, monitor_entry(pid, true, true)); + let mut equipment_role_points = HashMap::new(); + let mut roles = HashMap::new(); + roles.insert("flt".to_string(), pid); + equipment_role_points.insert(eq_id, roles); + let ctx = InterlockContext { + equipment_role_points, + station_role_points: HashMap::new(), + }; + + assert!(evaluate(&rule, &ctx, &monitor).is_err()); + } + + #[test] + fn evaluate_all_returns_first_failure() { + let pid_ok = Uuid::new_v4(); + let pid_bad = Uuid::new_v4(); + let mut rule_a = dummy_interlock("point_eq"); + rule_a.point_id = Some(pid_ok); + rule_a.expected_value = Some(true); + let mut rule_b = dummy_interlock("point_eq"); + rule_b.point_id = Some(pid_bad); + rule_b.expected_value = Some(true); + + let mut monitor = HashMap::new(); + monitor.insert(pid_ok, monitor_entry(pid_ok, true, true)); + monitor.insert(pid_bad, monitor_entry(pid_bad, false, true)); + let ctx = InterlockContext { + equipment_role_points: HashMap::new(), + station_role_points: HashMap::new(), + }; + + assert!(evaluate_all(&[&rule_a, &rule_b], &ctx, &monitor).is_err()); + } +} diff --git a/crates/app_operation_system/src/control/mod.rs b/crates/app_operation_system/src/control/mod.rs index 3af9eb4..8ba451a 100644 --- a/crates/app_operation_system/src/control/mod.rs +++ b/crates/app_operation_system/src/control/mod.rs @@ -1,6 +1,31 @@ pub use plc_platform_core::control::command; pub mod engine; +pub mod interlock; pub mod resource; pub mod runtime; pub mod state; +pub mod step_executor; + +use plc_platform_core::telemetry::{DataValue, PointMonitorInfo, PointQuality}; + +/// Interpret a monitored point value as a boolean signal. +/// Mirrors `app_feeder_distributor::control::monitor_value_as_bool`. +pub(crate) fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool { + match monitor.value.as_ref() { + Some(DataValue::Bool(value)) => *value, + Some(DataValue::Int(value)) => *value != 0, + Some(DataValue::UInt(value)) => *value != 0, + Some(DataValue::Float(value)) => *value != 0.0, + Some(DataValue::Text(value)) => matches!( + value.trim().to_ascii_lowercase().as_str(), + "1" | "true" | "on" | "yes" + ), + _ => false, + } +} + +/// Returns true iff the point is present in the monitor map and reports `Good` quality. +pub(crate) fn monitor_quality_good(monitor: &PointMonitorInfo) -> bool { + monitor.quality == PointQuality::Good +} diff --git a/crates/app_operation_system/src/control/step_executor.rs b/crates/app_operation_system/src/control/step_executor.rs new file mode 100644 index 0000000..0f7cc9d --- /dev/null +++ b/crates/app_operation_system/src/control/step_executor.rs @@ -0,0 +1,239 @@ +//! Step executor (design doc §5.4). +//! +//! Resolves a `segment_step.action_kind` to a concrete write on a command point +//! using `plc_platform_core::control::command::send_pulse_command`. Confirmation +//! is handled by the engine's `Confirming` state; this module only sends the +//! initial command. + +use std::collections::HashMap; + +use plc_platform_core::{ + connection::ConnectionManager, + control::command::send_pulse_command, + service::EquipmentSignalRole, + telemetry::PointMonitorInfo, +}; +use sqlx::PgPool; +use std::sync::Arc; +use uuid::Uuid; + +use crate::model::SegmentStep; + +/// Cached lookup of (equipment_id, signal_role) → point_id for all equipment a +/// segment touches. Loaded once per segment task tick. +#[derive(Default)] +pub struct CommandPointIndex { + map: HashMap<(Uuid, String), Uuid>, +} + +impl CommandPointIndex { + pub async fn for_steps(pool: &PgPool, steps: &[SegmentStep]) -> Result { + let equipment_ids: Vec = steps.iter().filter_map(|s| s.target_equipment_id).collect(); + if equipment_ids.is_empty() { + return Ok(Self::default()); + } + let rows: Vec = + plc_platform_core::service::get_signal_role_points_batch(pool, &equipment_ids).await?; + let mut map = HashMap::new(); + for row in rows { + map.insert((row.equipment_id, row.signal_role), row.point_id); + } + Ok(Self { map }) + } + + pub fn lookup(&self, equipment_id: Uuid, role: &str) -> Option { + self.map.get(&(equipment_id, role.to_string())).copied() + } +} + +/// Outcome of dispatching a step's command. +pub enum DispatchOutcome { + /// A command was issued (or skipped because the action is wait-only). + /// The engine moves to `Confirming`. + Issued, + /// The step is mis-configured (missing role/equipment). The engine should + /// transition to `Faulted` with this message. + Misconfigured(String), + /// The underlying write failed. Engine should transition to `Faulted`. + WriteError(String), +} + +/// Dispatch `step.action_kind`. For first-pass the executor supports both pulse +/// commands and "hold until confirm" commands — pulse is the default unless +/// `step.hold_until_confirm` is true, in which case we send a single high value +/// and let the engine emit the stop command after confirmation. +pub async fn dispatch( + step: &SegmentStep, + connection: &Arc, + command_points: &CommandPointIndex, + monitor: &HashMap, +) -> DispatchOutcome { + if step.action_kind == "wait_signal" { + return DispatchOutcome::Issued; + } + + let command_role = match step.command_role.as_deref() { + Some(role) => role, + None => match default_command_role(step.action_kind.as_str()) { + Some(role) => role, + None => { + return DispatchOutcome::Misconfigured(format!( + "step {} action {} has no command_role and no default", + step.step_no, step.action_kind + )) + } + }, + }; + + let equipment_id = match step.target_equipment_id { + Some(id) => id, + None => { + return DispatchOutcome::Misconfigured(format!( + "step {} action {} has no target_equipment_id", + step.step_no, step.action_kind + )) + } + }; + + let point_id = match command_points.lookup(equipment_id, command_role) { + Some(p) => p, + None => { + return DispatchOutcome::Misconfigured(format!( + "equipment {} has no '{}' role binding", + equipment_id, command_role + )) + } + }; + + let value_type = monitor.get(&point_id).and_then(|m| m.value_type.clone()); + let pulse_ms = step.pulse_ms.unwrap_or(default_pulse_ms(&step.action_kind)) as u64; + + if let Err(err) = send_pulse_command(connection, point_id, value_type.as_ref(), pulse_ms).await + { + return DispatchOutcome::WriteError(err); + } + + DispatchOutcome::Issued +} + +/// Send the configured stop command (used when `hold_until_confirm` is true or +/// on fault cleanup). No-op if no stop role is configured. +pub async fn send_stop_command( + step: &SegmentStep, + connection: &Arc, + command_points: &CommandPointIndex, + monitor: &HashMap, +) -> Result<(), String> { + let role = match step.stop_command_role.as_deref() { + Some(r) => r, + None => return Ok(()), + }; + let equipment_id = step + .target_equipment_id + .ok_or_else(|| format!("step {} stop command missing target_equipment_id", step.step_no))?; + let point_id = command_points.lookup(equipment_id, role).ok_or_else(|| { + format!( + "equipment {} has no '{}' stop-role binding", + equipment_id, role + ) + })?; + let value_type = monitor.get(&point_id).and_then(|m| m.value_type.clone()); + send_pulse_command(connection, point_id, value_type.as_ref(), 300).await +} + +/// Default command-role mapping per design doc §4.2.4 table. +fn default_command_role(action_kind: &str) -> Option<&'static str> { + match action_kind { + "open_door" => Some("open_cmd"), + "close_door" => Some("close_cmd"), + "push_forward" => Some("forward_cmd"), + "push_retract" => Some("retract_cmd"), + "pull_run" => Some("start_cmd"), + "pull_retract" => Some("retract_cmd"), + "transfer_move_to" => Some("move_cmd"), + "step_once" => Some("step_cmd"), + "robot_permit" => Some("permit_cmd"), + "robot_release" => Some("release_cmd"), + _ => None, + } +} + +/// Default pulse width for actions where the spec doesn't override. +fn default_pulse_ms(action_kind: &str) -> i32 { + match action_kind { + "open_door" | "close_door" | "robot_permit" | "robot_release" | "step_once" => 300, + _ => 500, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + + fn make_step(action_kind: &str, equipment: Option, role: Option<&str>) -> SegmentStep { + SegmentStep { + id: Uuid::new_v4(), + segment_id: Uuid::new_v4(), + step_no: 1, + step_code: "S1".to_string(), + action_kind: action_kind.to_string(), + target_equipment_id: equipment, + target_station_id: None, + confirm_signal_role: None, + confirm_point_id: None, + expected_value: true, + timeout_ms: 30_000, + command_role: role.map(|s| s.to_string()), + stop_command_role: None, + pulse_ms: None, + hold_until_confirm: false, + cancel_on_fault: true, + next_step_no_on_success: None, + next_step_no_on_failure: None, + on_timeout: "fault".to_string(), + description: None, + created_at: Utc::now(), + updated_at: Utc::now(), + } + } + + #[test] + fn default_pulse_short_for_short_actions() { + assert_eq!(default_pulse_ms("open_door"), 300); + assert_eq!(default_pulse_ms("transfer_move_to"), 500); + } + + #[test] + fn default_role_resolves_for_known_actions() { + assert_eq!(default_command_role("open_door"), Some("open_cmd")); + assert_eq!(default_command_role("transfer_move_to"), Some("move_cmd")); + assert_eq!(default_command_role("wait_signal"), None); + } + + #[test] + fn command_point_index_lookup_returns_registered_point() { + let eq_id = Uuid::new_v4(); + let pid = Uuid::new_v4(); + let mut idx = CommandPointIndex::default(); + idx.map.insert((eq_id, "open_cmd".to_string()), pid); + assert_eq!(idx.lookup(eq_id, "open_cmd"), Some(pid)); + assert_eq!(idx.lookup(eq_id, "close_cmd"), None); + } + + #[test] + fn wait_signal_step_is_dispatched_without_command_role() { + // wait_signal returns Issued even if no command_role / equipment configured. + let step = make_step("wait_signal", None, None); + // Build a sync runtime to drive the async call. + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let connection = Arc::new(ConnectionManager::new()); + rt.block_on(async { + let outcome = dispatch(&step, &connection, &CommandPointIndex::default(), &HashMap::new()).await; + assert!(matches!(outcome, DispatchOutcome::Issued)); + }); + } +} diff --git a/crates/app_operation_system/src/handler.rs b/crates/app_operation_system/src/handler.rs index 524b6cc..cd013ec 100644 --- a/crates/app_operation_system/src/handler.rs +++ b/crates/app_operation_system/src/handler.rs @@ -1,3 +1,5 @@ +pub mod control; pub mod doc; +pub mod runtime; pub mod segment; pub mod station; diff --git a/crates/app_operation_system/src/handler/control.rs b/crates/app_operation_system/src/handler/control.rs new file mode 100644 index 0000000..ad45fe8 --- /dev/null +++ b/crates/app_operation_system/src/handler/control.rs @@ -0,0 +1,210 @@ +//! Segment control endpoints (design doc §9.2). +//! +//! These endpoints flip flags on the in-memory `SegmentRuntime` and notify the +//! segment task. The engine task picks up the change on its next tick. + +use axum::{extract::{Path, State}, response::IntoResponse, Json}; +use serde_json::json; +use uuid::Uuid; + +use plc_platform_core::util::response::ApiErr; + +use crate::{ + control::state::SegmentState, + event::AppEvent, + service::segment as segment_service, + AppState, +}; + +async fn require_segment( + state: &AppState, + segment_id: Uuid, +) -> Result { + let segment = segment_service::get_segment_by_id(&state.platform.pool, segment_id) + .await? + .ok_or_else(|| ApiErr::NotFound("Segment not found".to_string(), None))?; + if !segment.enabled { + return Err(ApiErr::BadRequest( + "Segment is disabled".to_string(), + Some(json!({ "segment_id": segment_id })), + )); + } + Ok(segment) +} + +pub async fn start_auto_segment( + State(state): State, + Path(segment_id): Path, +) -> Result { + let segment = require_segment(&state, segment_id).await?; + if segment.mode != "auto" { + return Err(ApiErr::BadRequest( + format!("Segment mode {} does not allow auto start", segment.mode), + Some(json!({ "segment_id": segment_id, "mode": segment.mode })), + )); + } + + let mut runtime = state.segment_runtime.get_or_init(segment_id).await; + if matches!( + runtime.state, + SegmentState::Faulted | SegmentState::ManualAckRequired + ) { + return Err(ApiErr::BadRequest( + "Segment is fault-locked; acknowledge before starting".to_string(), + Some(json!({ + "segment_id": segment_id, + "state": serde_json::to_value(&runtime.state).unwrap_or(serde_json::Value::Null) + })), + )); + } + + runtime.auto_enabled = true; + if matches!(runtime.state, SegmentState::Idle) { + runtime.blocked_reason = None; + } + state.segment_runtime.upsert(runtime).await; + state.segment_runtime.notify_segment(segment_id).await; + + let _ = state + .event_manager + .send(AppEvent::SegmentAutoStarted { segment_id }); + + Ok(Json( + json!({ "ok_msg": "Auto control started", "segment_id": segment_id }), + )) +} + +pub async fn stop_auto_segment( + State(state): State, + Path(segment_id): Path, +) -> Result { + require_segment(&state, segment_id).await?; + let mut runtime = state.segment_runtime.get_or_init(segment_id).await; + runtime.auto_enabled = false; + state.segment_runtime.upsert(runtime).await; + state.segment_runtime.notify_segment(segment_id).await; + + let _ = state + .event_manager + .send(AppEvent::SegmentAutoStopped { segment_id }); + + Ok(Json( + json!({ "ok_msg": "Auto control stopped", "segment_id": segment_id }), + )) +} + +pub async fn ack_fault_segment( + State(state): State, + Path(segment_id): Path, +) -> Result { + require_segment(&state, segment_id).await?; + let mut runtime = state.segment_runtime.get_or_init(segment_id).await; + + if !matches!( + runtime.state, + SegmentState::Faulted | SegmentState::ManualAckRequired + ) { + return Err(ApiErr::BadRequest( + "Segment is not in a faulted state".to_string(), + Some(json!({ "segment_id": segment_id })), + )); + } + + runtime.fault_message = None; + runtime.manual_ack_required = false; + runtime.current_step_no = None; + runtime.blocked_reason = None; + runtime.state = SegmentState::Idle; + state.segment_runtime.upsert(runtime).await; + state.segment_runtime.notify_segment(segment_id).await; + + let _ = state + .event_manager + .send(AppEvent::SegmentFaultAcked { segment_id }); + + Ok(Json( + json!({ "ok_msg": "Fault acknowledged", "segment_id": segment_id }), + )) +} + +pub async fn reset_segment( + State(state): State, + Path(segment_id): Path, +) -> Result { + require_segment(&state, segment_id).await?; + let mut runtime = state.segment_runtime.get_or_init(segment_id).await; + if !matches!( + runtime.state, + SegmentState::Blocked | SegmentState::Faulted | SegmentState::ManualAckRequired + ) { + return Err(ApiErr::BadRequest( + "Reset only allowed from Blocked / Faulted / ManualAckRequired".to_string(), + Some(json!({ "segment_id": segment_id })), + )); + } + state.resource_registry.release_all_for(segment_id).await; + runtime.held_resources.clear(); + runtime.auto_enabled = false; + runtime.current_step_no = None; + runtime.step_started_at = None; + runtime.blocked_reason = None; + runtime.fault_message = None; + runtime.manual_ack_required = false; + runtime.state = SegmentState::Idle; + state.segment_runtime.upsert(runtime).await; + state.segment_runtime.notify_segment(segment_id).await; + + Ok(Json( + json!({ "ok_msg": "Segment reset to idle", "segment_id": segment_id }), + )) +} + +pub async fn batch_start_auto( + State(state): State, +) -> Result { + let segments = segment_service::list_segments(&state.platform.pool, None).await?; + let mut started = Vec::new(); + let mut skipped = Vec::new(); + for segment in segments { + if !segment.enabled || segment.mode != "auto" { + skipped.push(segment.id); + continue; + } + let mut runtime = state.segment_runtime.get_or_init(segment.id).await; + if matches!( + runtime.state, + SegmentState::Faulted | SegmentState::ManualAckRequired + ) || runtime.auto_enabled + { + skipped.push(segment.id); + continue; + } + runtime.auto_enabled = true; + state.segment_runtime.upsert(runtime).await; + state.segment_runtime.notify_segment(segment.id).await; + let _ = state + .event_manager + .send(AppEvent::SegmentAutoStarted { segment_id: segment.id }); + started.push(segment.id); + } + Ok(Json(json!({ "started": started, "skipped": skipped }))) +} + +pub async fn batch_stop_auto(State(state): State) -> Result { + let segments = segment_service::list_segments(&state.platform.pool, None).await?; + let mut stopped = Vec::new(); + for segment in segments { + let mut runtime = state.segment_runtime.get_or_init(segment.id).await; + if !runtime.auto_enabled { + continue; + } + runtime.auto_enabled = false; + state.segment_runtime.upsert(runtime).await; + state.segment_runtime.notify_segment(segment.id).await; + let _ = state + .event_manager + .send(AppEvent::SegmentAutoStopped { segment_id: segment.id }); + stopped.push(segment.id); + } + Ok(Json(json!({ "stopped": stopped }))) +} diff --git a/crates/app_operation_system/src/handler/runtime.rs b/crates/app_operation_system/src/handler/runtime.rs new file mode 100644 index 0000000..794b593 --- /dev/null +++ b/crates/app_operation_system/src/handler/runtime.rs @@ -0,0 +1,94 @@ +//! Runtime read endpoints (design doc §9.3). + +use axum::{extract::{Path, State}, response::IntoResponse, Json}; +use serde_json::json; +use uuid::Uuid; + +use plc_platform_core::util::response::ApiErr; + +use crate::{ + service::{segment as segment_service, station as station_service}, + AppState, +}; + +pub async fn get_overview(State(state): State) -> Result { + let segments = segment_service::list_segments(&state.platform.pool, None).await?; + let runtimes = state.segment_runtime.get_all().await; + + let segment_payload: Vec<_> = segments + .into_iter() + .map(|seg| { + let runtime = runtimes.get(&seg.id).cloned(); + json!({ + "segment": seg, + "runtime": runtime, + }) + }) + .collect(); + + let resource_snapshot = state.resource_registry.snapshot().await; + let resources: Vec<_> = resource_snapshot + .into_iter() + .map(|(key, lease)| { + json!({ + "resource_key": key, + "owner_segment_id": lease.owner_segment_id, + "acquired_at": lease.acquired_at, + "heartbeat_at": lease.heartbeat_at, + }) + }) + .collect(); + + Ok(Json(json!({ + "segments": segment_payload, + "resources": resources, + }))) +} + +pub async fn get_segment_runtime( + State(state): State, + Path(segment_id): Path, +) -> Result { + let segment = segment_service::get_segment_by_id(&state.platform.pool, segment_id) + .await? + .ok_or_else(|| ApiErr::NotFound("Segment not found".to_string(), None))?; + let runtime = state.segment_runtime.get_or_init(segment_id).await; + Ok(Json(json!({ + "segment": segment, + "runtime": runtime, + }))) +} + +pub async fn get_station_runtime( + State(state): State, + Path(station_id): Path, +) -> Result { + let station = station_service::get_station_by_id(&state.platform.pool, station_id) + .await? + .ok_or_else(|| ApiErr::NotFound("Station not found".to_string(), None))?; + let signals = station_service::list_station_signals(&state.platform.pool, station_id).await?; + + // Attach the latest monitor sample for each bound point. + let monitor_guard = state + .platform + .connection_manager + .get_point_monitor_data_read_guard() + .await; + + let signal_payload: Vec<_> = signals + .iter() + .map(|sig| { + let monitor = sig.point_id.and_then(|pid| monitor_guard.get(&pid).cloned()); + json!({ + "signal": sig, + "point_monitor": monitor, + }) + }) + .collect(); + drop(monitor_guard); + + Ok(Json(json!({ + "station": station, + "signals": signal_payload, + }))) +} diff --git a/crates/app_operation_system/src/router.rs b/crates/app_operation_system/src/router.rs index 28fd2d6..b055060 100644 --- a/crates/app_operation_system/src/router.rs +++ b/crates/app_operation_system/src/router.rs @@ -71,6 +71,46 @@ pub fn build_router(state: AppState) -> Router { .put(crate::handler::segment::replace_resources), ); + let control_routes = Router::new() + .route( + "/api/control/segment/{segment_id}/start-auto", + post(crate::handler::control::start_auto_segment), + ) + .route( + "/api/control/segment/{segment_id}/stop-auto", + post(crate::handler::control::stop_auto_segment), + ) + .route( + "/api/control/segment/{segment_id}/ack-fault", + post(crate::handler::control::ack_fault_segment), + ) + .route( + "/api/control/segment/{segment_id}/reset", + post(crate::handler::control::reset_segment), + ) + .route( + "/api/control/segment/batch-start-auto", + post(crate::handler::control::batch_start_auto), + ) + .route( + "/api/control/segment/batch-stop-auto", + post(crate::handler::control::batch_stop_auto), + ); + + let runtime_routes = Router::new() + .route( + "/api/runtime/overview", + get(crate::handler::runtime::get_overview), + ) + .route( + "/api/runtime/segment/{segment_id}", + get(crate::handler::runtime::get_segment_runtime), + ) + .route( + "/api/runtime/station/{station_id}", + get(crate::handler::runtime::get_station_runtime), + ); + let ops_routes = Router::new() .route("/api/health", get(health_check)) .route("/api/docs/api-md", get(crate::handler::doc::get_api_md)) @@ -82,6 +122,8 @@ pub fn build_router(state: AppState) -> Router { Router::new() .merge(platform) .merge(config_routes) + .merge(control_routes) + .merge(runtime_routes) .merge(ops_routes) .nest( "/ui", diff --git a/crates/app_operation_system/tests/router_smoke.rs b/crates/app_operation_system/tests/router_smoke.rs index fee76d3..fe51747 100644 --- a/crates/app_operation_system/tests/router_smoke.rs +++ b/crates/app_operation_system/tests/router_smoke.rs @@ -57,3 +57,38 @@ async fn operation_system_router_exposes_segment_collection() { assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED); } + +/// Runtime overview is GET-only; a POST should be METHOD_NOT_ALLOWED rather +/// than 404 — proving the route is registered. +#[tokio::test] +async fn operation_system_router_exposes_runtime_overview() { + let response = build_app() + .oneshot( + Request::builder() + .method(Method::POST) + .uri("/api/runtime/overview") + .body(Body::empty()) + .expect("request should build"), + ) + .await + .expect("router should answer request"); + + assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED); +} + +/// Control endpoints are POST-only; GETting one should be METHOD_NOT_ALLOWED. +#[tokio::test] +async fn operation_system_router_exposes_control_batch_routes() { + let response = build_app() + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/api/control/segment/batch-start-auto") + .body(Body::empty()) + .expect("request should build"), + ) + .await + .expect("router should answer request"); + + assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED); +} diff --git a/docs/api-ops.md b/docs/api-ops.md index 0f139c1..c9cb9f5 100644 --- a/docs/api-ops.md +++ b/docs/api-ops.md @@ -1,5 +1,7 @@ # 运转系统 API +> 参考来源:`docs/superpowers/specs/2026-05-18-operation-system-engine-design.md` + ## 健康检查 - `GET /api/health` — 返回应用名称和状态 @@ -11,5 +13,59 @@ ## 文档 -- `GET /api/docs/api-md` — 获取 API 文档 +- `GET /api/docs/api-md` — 获取本 API 文档 - `GET /api/docs/readme-md` — 获取 README + +## 平台基础接口 + +复用 `plc_platform_core::handler::platform_routes`:源 / 设备 / 点位 / 标签 / 页面。 + +## 工位配置(§9.1.1) + +- `GET /api/station` — 列出工位(可选 `?line_code=`) +- `POST /api/station` — 新建工位 +- `GET /api/station/{id}` — 工位详情含信号绑定 +- `PUT /api/station/{id}` — 更新工位 +- `DELETE /api/station/{id}` +- `POST /api/station/{id}/signal` — Upsert 工位信号绑定 +- `DELETE /api/station/{id}/signal/{role}` + +## 流程段配置(§9.1.1) + +- `GET /api/segment`(可选 `?line_code=`) +- `POST /api/segment` +- `GET /api/segment/{id}` +- `GET /api/segment/{id}/detail` — 包含 step / interlock / resource +- `PUT /api/segment/{id}` +- `DELETE /api/segment/{id}` +- `GET /api/segment/{id}/step` +- `POST /api/segment/{id}/step` +- `PUT /api/segment/{id}/step/{step_no}` +- `DELETE /api/segment/{id}/step/{step_no}` +- `GET /api/segment/{id}/interlock` +- `POST /api/segment/{id}/interlock` +- `DELETE /api/segment/{id}/interlock/{interlock_id}` +- `GET /api/segment/{id}/resource` +- `PUT /api/segment/{id}/resource` — 用新的 `resource_keys` 数组整体替换 + +## 段运行控制(§9.2) + +- `POST /api/control/segment/{id}/start-auto` +- `POST /api/control/segment/{id}/stop-auto` +- `POST /api/control/segment/{id}/ack-fault` +- `POST /api/control/segment/{id}/reset` — 仅在 Blocked / Faulted / ManualAckRequired 状态允许 +- `POST /api/control/segment/batch-start-auto` +- `POST /api/control/segment/batch-stop-auto` + +## 运行态查询(§9.3) + +- `GET /api/runtime/overview` — 所有段 + 资源占用快照 +- `GET /api/runtime/segment/{id}` — 单段配置 + runtime +- `GET /api/runtime/station/{id}` — 工位信号 + 最新点位监控值 + +## WebSocket(§8.2) + +- `GET /ws/public` — 推送 + - `point_new_value`(核心) + - `event_created`(核心) + - `app_event`:`{ app: "operation-system", event_type: "segment_runtime_changed", data: SegmentRuntime }`