Implement operation-system engine MVP (P3)
Adds the segment supervisor + per-segment state machine driving Idle → Checking → Executing → Confirming → Completed (plus Blocked / Faulted / ManualAckRequired), interlock evaluator, action-kind step executor, control + runtime HTTP handlers, and WebSocket runtime push via AppEvent. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
a33c013da5
commit
63683a24c8
|
|
@ -1,27 +1,563 @@
|
||||||
|
//! Segment supervisor + per-segment task (design doc §5.1–§5.3).
|
||||||
|
//!
|
||||||
|
//! Supervisor scans enabled segments every 10 s and ensures each has a running
|
||||||
|
//! task (mirrors the `app_feeder_distributor` supervisor). Each per-segment
|
||||||
|
//! task drives the 9-state machine in §5.2 by re-reading config + interlocks
|
||||||
|
//! every iteration and reacting to runtime change notifications.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use chrono::Utc;
|
||||||
|
use plc_platform_core::telemetry::PointMonitorInfo;
|
||||||
|
use plc_platform_core::websocket::{AppWsEvent, WsMessage};
|
||||||
use tokio::time::Duration;
|
use tokio::time::Duration;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{control::runtime::SegmentRuntimeStore, AppState};
|
use crate::{
|
||||||
|
control::{
|
||||||
|
interlock::{self, InterlockContext},
|
||||||
|
runtime::{SegmentRuntime, SegmentRuntimeStore},
|
||||||
|
state::SegmentState,
|
||||||
|
step_executor::{self, CommandPointIndex, DispatchOutcome},
|
||||||
|
},
|
||||||
|
event::AppEvent,
|
||||||
|
model::{ProcessSegment, SegmentInterlock, SegmentResource, SegmentStep},
|
||||||
|
service::segment as segment_service,
|
||||||
|
AppState,
|
||||||
|
};
|
||||||
|
|
||||||
/// Start the segment engine supervisor.
|
const APP_NAME: &str = "operation-system";
|
||||||
///
|
const SUPERVISOR_INTERVAL_SECS: u64 = 10;
|
||||||
/// Skeleton only: at P0 there are no segment tables yet (P1 lands the schema),
|
const FAULT_TICK_MS: u64 = 500;
|
||||||
/// so the supervisor logs and idles. P3 will replace this with a per-segment
|
|
||||||
/// task spawner that mirrors feeder's per-unit task model.
|
/// Start the engine supervisor. Mirrors the feeder entry point.
|
||||||
pub fn start(state: AppState, store: Arc<SegmentRuntimeStore>) {
|
pub fn start(state: AppState, store: Arc<SegmentRuntimeStore>) {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
supervise(state, store).await;
|
supervise(state, store).await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn supervise(_state: AppState, _store: Arc<SegmentRuntimeStore>) {
|
async fn supervise(state: AppState, store: Arc<SegmentRuntimeStore>) {
|
||||||
let mut interval = tokio::time::interval(Duration::from_secs(10));
|
let mut tasks: HashMap<Uuid, tokio::task::JoinHandle<()>> = HashMap::new();
|
||||||
|
let mut interval = tokio::time::interval(Duration::from_secs(SUPERVISOR_INTERVAL_SECS));
|
||||||
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||||
tracing::info!("Operation-system engine supervisor started (skeleton; awaiting P1 schema)");
|
tracing::info!("Operation-system engine supervisor started");
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
// Segment supervision will live here once P1 lands process_segment.
|
match segment_service::list_segments(&state.platform.pool, None).await {
|
||||||
|
Ok(segments) => {
|
||||||
|
for segment in segments.into_iter().filter(|s| s.enabled) {
|
||||||
|
let needs_spawn = tasks
|
||||||
|
.get(&segment.id)
|
||||||
|
.is_none_or(|handle| handle.is_finished());
|
||||||
|
if needs_spawn {
|
||||||
|
let task_state = state.clone();
|
||||||
|
let task_store = store.clone();
|
||||||
|
let segment_id = segment.id;
|
||||||
|
let handle = tokio::spawn(async move {
|
||||||
|
segment_task(task_state, task_store, segment_id).await;
|
||||||
|
});
|
||||||
|
tasks.insert(segment.id, handle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
Err(err) => tracing::error!("Engine supervisor: list_segments failed: {}", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn segment_task(state: AppState, store: Arc<SegmentRuntimeStore>, segment_id: Uuid) {
|
||||||
|
let notify = store.get_or_create_notify(segment_id).await;
|
||||||
|
let mut fault_tick = tokio::time::interval(Duration::from_millis(FAULT_TICK_MS));
|
||||||
|
fault_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// 1. Reload segment config; exit when disabled or removed.
|
||||||
|
let segment = match segment_service::get_segment_by_id(&state.platform.pool, segment_id)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(Some(s)) if s.enabled && s.mode != "disabled" => s,
|
||||||
|
Ok(_) => {
|
||||||
|
tracing::info!(
|
||||||
|
"Engine: segment {} disabled or removed, task exiting",
|
||||||
|
segment_id
|
||||||
|
);
|
||||||
|
state.resource_registry.release_all_for(segment_id).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!("Engine: segment {} reload failed: {}", segment_id, err);
|
||||||
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// 2. Reload steps + interlocks + resource keys.
|
||||||
|
let steps = match segment_service::list_steps(&state.platform.pool, segment_id).await {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!("Engine: segment {} steps reload failed: {}", segment_id, err);
|
||||||
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let interlocks =
|
||||||
|
match segment_service::list_interlocks(&state.platform.pool, segment_id).await {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!(
|
||||||
|
"Engine: segment {} interlocks reload failed: {}",
|
||||||
|
segment_id,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let resources =
|
||||||
|
match segment_service::list_resources(&state.platform.pool, segment_id).await {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!(
|
||||||
|
"Engine: segment {} resources reload failed: {}",
|
||||||
|
segment_id,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let cmd_index = match CommandPointIndex::for_steps(&state.platform.pool, &steps).await {
|
||||||
|
Ok(idx) => idx,
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!(
|
||||||
|
"Engine: segment {} command-point load failed: {}",
|
||||||
|
segment_id,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let ctx = match InterlockContext::load_for_interlocks(&state.platform.pool, &interlocks)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!(
|
||||||
|
"Engine: segment {} interlock-context load failed: {}",
|
||||||
|
segment_id,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// 3. Snapshot the monitor map for the rest of this tick.
|
||||||
|
let monitor_guard = state
|
||||||
|
.platform
|
||||||
|
.connection_manager
|
||||||
|
.get_point_monitor_data_read_guard()
|
||||||
|
.await;
|
||||||
|
let monitor: HashMap<Uuid, PointMonitorInfo> = monitor_guard.clone();
|
||||||
|
drop(monitor_guard);
|
||||||
|
|
||||||
|
// 4. Apply one state-machine step.
|
||||||
|
let runtime = store.get_or_init(segment_id).await;
|
||||||
|
let next_runtime = tick(
|
||||||
|
&state,
|
||||||
|
&segment,
|
||||||
|
&steps,
|
||||||
|
&interlocks,
|
||||||
|
&resources,
|
||||||
|
&ctx,
|
||||||
|
&cmd_index,
|
||||||
|
&monitor,
|
||||||
|
runtime,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let runtime_changed = match next_runtime {
|
||||||
|
Some(updated) => {
|
||||||
|
store.upsert(updated.clone()).await;
|
||||||
|
push_runtime_change(&state, &updated).await;
|
||||||
|
true
|
||||||
|
}
|
||||||
|
None => false,
|
||||||
|
};
|
||||||
|
|
||||||
|
// 5. Decide how long to sleep based on next state.
|
||||||
|
let snapshot = store.get_or_init(segment_id).await;
|
||||||
|
if !runtime_changed && should_wait(&snapshot, segment.mode.as_str()) {
|
||||||
|
tokio::select! {
|
||||||
|
_ = fault_tick.tick() => {}
|
||||||
|
_ = notify.notified() => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
async fn tick(
|
||||||
|
state: &AppState,
|
||||||
|
segment: &ProcessSegment,
|
||||||
|
steps: &[SegmentStep],
|
||||||
|
interlocks: &[SegmentInterlock],
|
||||||
|
resources: &[SegmentResource],
|
||||||
|
ctx: &InterlockContext,
|
||||||
|
cmd_index: &CommandPointIndex,
|
||||||
|
monitor: &HashMap<Uuid, PointMonitorInfo>,
|
||||||
|
mut runtime: SegmentRuntime,
|
||||||
|
) -> Option<SegmentRuntime> {
|
||||||
|
// Run-halt interlocks apply once we're past Checking.
|
||||||
|
if matches!(
|
||||||
|
runtime.state,
|
||||||
|
SegmentState::Executing | SegmentState::Confirming | SegmentState::Resetting
|
||||||
|
) {
|
||||||
|
let run_halt: Vec<&SegmentInterlock> = interlocks
|
||||||
|
.iter()
|
||||||
|
.filter(|i| i.applies_to == "run_halt")
|
||||||
|
.collect();
|
||||||
|
if let Err(reason) = interlock::evaluate_all(&run_halt, ctx, monitor) {
|
||||||
|
let _ = state.event_manager.send(AppEvent::SegmentFaultLocked {
|
||||||
|
segment_id: segment.id,
|
||||||
|
message: reason.clone(),
|
||||||
|
});
|
||||||
|
runtime.state = SegmentState::Faulted;
|
||||||
|
runtime.fault_message = Some(reason);
|
||||||
|
return Some(runtime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match runtime.state {
|
||||||
|
SegmentState::Idle => {
|
||||||
|
// Wait for auto activation or remote-manual notifications.
|
||||||
|
if runtime.auto_enabled && segment.mode == "auto" {
|
||||||
|
runtime.state = SegmentState::Checking;
|
||||||
|
runtime.blocked_reason = None;
|
||||||
|
return Some(runtime);
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
SegmentState::Checking => {
|
||||||
|
// start_allow must all pass; start_deny rules being satisfied means we
|
||||||
|
// should NOT start (per design doc §6.1, `start_deny` evaluates as a
|
||||||
|
// "deny" condition — if its rule passes, start is denied).
|
||||||
|
let start_allow: Vec<&SegmentInterlock> = interlocks
|
||||||
|
.iter()
|
||||||
|
.filter(|i| i.applies_to == "start_allow")
|
||||||
|
.collect();
|
||||||
|
if let Err(reason) = interlock::evaluate_all(&start_allow, ctx, monitor) {
|
||||||
|
let _ = state.event_manager.send(AppEvent::SegmentBlocked {
|
||||||
|
segment_id: segment.id,
|
||||||
|
reason: reason.clone(),
|
||||||
|
});
|
||||||
|
runtime.state = SegmentState::Blocked;
|
||||||
|
runtime.blocked_reason = Some(reason);
|
||||||
|
return Some(runtime);
|
||||||
|
}
|
||||||
|
for rule in interlocks.iter().filter(|i| i.applies_to == "start_deny") {
|
||||||
|
if interlock::evaluate(rule, ctx, monitor).is_ok() {
|
||||||
|
let reason = format!(
|
||||||
|
"start denied by rule {} ({})",
|
||||||
|
rule.id, rule.rule_kind
|
||||||
|
);
|
||||||
|
let _ = state.event_manager.send(AppEvent::SegmentBlocked {
|
||||||
|
segment_id: segment.id,
|
||||||
|
reason: reason.clone(),
|
||||||
|
});
|
||||||
|
runtime.state = SegmentState::Blocked;
|
||||||
|
runtime.blocked_reason = Some(reason);
|
||||||
|
return Some(runtime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Acquire declared resources.
|
||||||
|
let mut acquired: Vec<String> = Vec::new();
|
||||||
|
for res in resources {
|
||||||
|
let ok = state
|
||||||
|
.resource_registry
|
||||||
|
.try_acquire(&res.resource_key, segment.id)
|
||||||
|
.await;
|
||||||
|
if !ok {
|
||||||
|
for key in &acquired {
|
||||||
|
state.resource_registry.release(key, segment.id).await;
|
||||||
|
}
|
||||||
|
let _ = state.event_manager.send(AppEvent::AlarmResourceBusy {
|
||||||
|
segment_id: segment.id,
|
||||||
|
resource_key: res.resource_key.clone(),
|
||||||
|
});
|
||||||
|
runtime.state = SegmentState::Blocked;
|
||||||
|
runtime.blocked_reason =
|
||||||
|
Some(format!("resource_busy: {}", res.resource_key));
|
||||||
|
return Some(runtime);
|
||||||
|
}
|
||||||
|
acquired.push(res.resource_key.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
let Some(first_step) = steps.iter().min_by_key(|s| s.step_no) else {
|
||||||
|
runtime.state = SegmentState::Faulted;
|
||||||
|
runtime.fault_message = Some("segment has no steps".to_string());
|
||||||
|
return Some(runtime);
|
||||||
|
};
|
||||||
|
|
||||||
|
runtime.held_resources = acquired;
|
||||||
|
runtime.current_step_no = Some(first_step.step_no);
|
||||||
|
runtime.step_started_at = Some(Utc::now());
|
||||||
|
runtime.blocked_reason = None;
|
||||||
|
runtime.state = SegmentState::Executing;
|
||||||
|
Some(runtime)
|
||||||
|
}
|
||||||
|
SegmentState::Executing => {
|
||||||
|
let Some(step_no) = runtime.current_step_no else {
|
||||||
|
runtime.state = SegmentState::Faulted;
|
||||||
|
runtime.fault_message = Some("Executing without current_step_no".to_string());
|
||||||
|
return Some(runtime);
|
||||||
|
};
|
||||||
|
let Some(step) = steps.iter().find(|s| s.step_no == step_no) else {
|
||||||
|
runtime.state = SegmentState::Faulted;
|
||||||
|
runtime.fault_message = Some(format!("step {} not found", step_no));
|
||||||
|
return Some(runtime);
|
||||||
|
};
|
||||||
|
let outcome = step_executor::dispatch(
|
||||||
|
step,
|
||||||
|
&state.platform.connection_manager,
|
||||||
|
cmd_index,
|
||||||
|
monitor,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
match outcome {
|
||||||
|
DispatchOutcome::Issued => {
|
||||||
|
runtime.state = SegmentState::Confirming;
|
||||||
|
runtime.step_started_at = Some(Utc::now());
|
||||||
|
let _ = state.event_manager.send(AppEvent::SegmentStepAdvanced {
|
||||||
|
segment_id: segment.id,
|
||||||
|
step_no,
|
||||||
|
});
|
||||||
|
Some(runtime)
|
||||||
|
}
|
||||||
|
DispatchOutcome::Misconfigured(msg) | DispatchOutcome::WriteError(msg) => {
|
||||||
|
let _ = state.event_manager.send(AppEvent::SegmentFaultLocked {
|
||||||
|
segment_id: segment.id,
|
||||||
|
message: msg.clone(),
|
||||||
|
});
|
||||||
|
runtime.state = SegmentState::Faulted;
|
||||||
|
runtime.fault_message = Some(msg);
|
||||||
|
Some(runtime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SegmentState::Confirming => {
|
||||||
|
let Some(step_no) = runtime.current_step_no else {
|
||||||
|
runtime.state = SegmentState::Faulted;
|
||||||
|
runtime.fault_message = Some("Confirming without current_step_no".to_string());
|
||||||
|
return Some(runtime);
|
||||||
|
};
|
||||||
|
let Some(step) = steps.iter().find(|s| s.step_no == step_no) else {
|
||||||
|
runtime.state = SegmentState::Faulted;
|
||||||
|
runtime.fault_message = Some(format!("step {} not found", step_no));
|
||||||
|
return Some(runtime);
|
||||||
|
};
|
||||||
|
|
||||||
|
let confirm = resolve_confirm_point(step, ctx);
|
||||||
|
let confirmed = match confirm {
|
||||||
|
Some((pid, invert, expected)) => check_confirm(monitor, pid, invert, expected),
|
||||||
|
None => {
|
||||||
|
// No confirm signal configured — treat the step as instantly done.
|
||||||
|
Some(true)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if confirmed == Some(true) {
|
||||||
|
if step.hold_until_confirm {
|
||||||
|
if let Err(err) = step_executor::send_stop_command(
|
||||||
|
step,
|
||||||
|
&state.platform.connection_manager,
|
||||||
|
cmd_index,
|
||||||
|
monitor,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!(
|
||||||
|
"Engine: segment {} stop command for step {} failed: {}",
|
||||||
|
segment.id,
|
||||||
|
step_no,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let next_step = step
|
||||||
|
.next_step_no_on_success
|
||||||
|
.or_else(|| next_sequential(steps, step_no));
|
||||||
|
match next_step {
|
||||||
|
Some(next_no) => {
|
||||||
|
runtime.state = SegmentState::Executing;
|
||||||
|
runtime.current_step_no = Some(next_no);
|
||||||
|
runtime.step_started_at = Some(Utc::now());
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
runtime.state = SegmentState::Completed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Some(runtime);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Not yet confirmed: check timeout.
|
||||||
|
if let Some(started) = runtime.step_started_at {
|
||||||
|
let elapsed_ms = Utc::now()
|
||||||
|
.signed_duration_since(started)
|
||||||
|
.num_milliseconds();
|
||||||
|
if elapsed_ms >= step.timeout_ms as i64 {
|
||||||
|
let _ = state.event_manager.send(AppEvent::AlarmActionTimeout {
|
||||||
|
segment_id: segment.id,
|
||||||
|
step_no,
|
||||||
|
});
|
||||||
|
match step.on_timeout.as_str() {
|
||||||
|
"retry" => {
|
||||||
|
runtime.state = SegmentState::Executing;
|
||||||
|
runtime.step_started_at = Some(Utc::now());
|
||||||
|
}
|
||||||
|
"block" => {
|
||||||
|
runtime.state = SegmentState::Blocked;
|
||||||
|
runtime.blocked_reason =
|
||||||
|
Some(format!("step {} timeout", step_no));
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
// "fault" or unknown
|
||||||
|
runtime.state = SegmentState::Faulted;
|
||||||
|
runtime.fault_message = Some(format!("step {} timeout", step_no));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Some(runtime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Still waiting — no state change.
|
||||||
|
None
|
||||||
|
}
|
||||||
|
SegmentState::Resetting => {
|
||||||
|
// First-pass reset is a no-op; configurations that need a reset step
|
||||||
|
// should encode it as a normal step. Drop back to Idle.
|
||||||
|
runtime.state = SegmentState::Idle;
|
||||||
|
Some(runtime)
|
||||||
|
}
|
||||||
|
SegmentState::Completed => {
|
||||||
|
state.resource_registry.release_all_for(segment.id).await;
|
||||||
|
runtime.held_resources.clear();
|
||||||
|
runtime.last_completed_at = Some(Utc::now());
|
||||||
|
runtime.current_step_no = None;
|
||||||
|
let _ = state
|
||||||
|
.event_manager
|
||||||
|
.send(AppEvent::SegmentCompleted { segment_id: segment.id });
|
||||||
|
runtime.state = SegmentState::Idle;
|
||||||
|
Some(runtime)
|
||||||
|
}
|
||||||
|
SegmentState::Blocked => {
|
||||||
|
// Periodically re-check whether the block has cleared.
|
||||||
|
if runtime.auto_enabled && segment.mode == "auto" {
|
||||||
|
let start_allow: Vec<&SegmentInterlock> = interlocks
|
||||||
|
.iter()
|
||||||
|
.filter(|i| i.applies_to == "start_allow")
|
||||||
|
.collect();
|
||||||
|
let any_deny = interlocks
|
||||||
|
.iter()
|
||||||
|
.filter(|i| i.applies_to == "start_deny")
|
||||||
|
.any(|rule| interlock::evaluate(rule, ctx, monitor).is_ok());
|
||||||
|
if interlock::evaluate_all(&start_allow, ctx, monitor).is_ok() && !any_deny {
|
||||||
|
runtime.state = SegmentState::Checking;
|
||||||
|
runtime.blocked_reason = None;
|
||||||
|
return Some(runtime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
SegmentState::Faulted => {
|
||||||
|
// Release any held resources on fault entry; first-pass keeps it simple.
|
||||||
|
state.resource_registry.release_all_for(segment.id).await;
|
||||||
|
runtime.held_resources.clear();
|
||||||
|
if segment.require_manual_ack_after_fault {
|
||||||
|
runtime.manual_ack_required = true;
|
||||||
|
runtime.state = SegmentState::ManualAckRequired;
|
||||||
|
return Some(runtime);
|
||||||
|
}
|
||||||
|
// Otherwise we leave it Faulted; ack-fault API may flip it back to Idle.
|
||||||
|
None
|
||||||
|
}
|
||||||
|
SegmentState::ManualAckRequired => {
|
||||||
|
// ack-fault API will flip manual_ack_required=false + state=Idle and notify.
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn next_sequential(steps: &[SegmentStep], current: i32) -> Option<i32> {
|
||||||
|
steps
|
||||||
|
.iter()
|
||||||
|
.filter(|s| s.step_no > current)
|
||||||
|
.map(|s| s.step_no)
|
||||||
|
.min()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns `(point_id, invert, expected_value)` if a confirm signal is configured.
|
||||||
|
fn resolve_confirm_point(
|
||||||
|
step: &SegmentStep,
|
||||||
|
ctx: &InterlockContext,
|
||||||
|
) -> Option<(Uuid, bool, bool)> {
|
||||||
|
if let Some(point_id) = step.confirm_point_id {
|
||||||
|
return Some((point_id, false, step.expected_value));
|
||||||
|
}
|
||||||
|
let role = step.confirm_signal_role.as_deref()?;
|
||||||
|
let station_id = step.target_station_id?;
|
||||||
|
let (pid, invert) = ctx
|
||||||
|
.station_role_points
|
||||||
|
.get(&station_id)
|
||||||
|
.and_then(|m| m.get(role))
|
||||||
|
.copied()?;
|
||||||
|
Some((pid, invert, step.expected_value))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_confirm(
|
||||||
|
monitor: &HashMap<Uuid, PointMonitorInfo>,
|
||||||
|
point_id: Uuid,
|
||||||
|
invert: bool,
|
||||||
|
expected: bool,
|
||||||
|
) -> Option<bool> {
|
||||||
|
let m = monitor.get(&point_id)?;
|
||||||
|
if m.quality != plc_platform_core::telemetry::PointQuality::Good {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let raw = super::monitor_value_as_bool(m);
|
||||||
|
let logical = raw ^ invert;
|
||||||
|
Some(logical == expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn should_wait(runtime: &SegmentRuntime, mode: &str) -> bool {
|
||||||
|
match runtime.state {
|
||||||
|
SegmentState::Idle => !runtime.auto_enabled || mode != "auto",
|
||||||
|
SegmentState::Confirming => true,
|
||||||
|
SegmentState::Blocked
|
||||||
|
| SegmentState::Faulted
|
||||||
|
| SegmentState::ManualAckRequired => true,
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn push_runtime_change(state: &AppState, runtime: &SegmentRuntime) {
|
||||||
|
let payload = match serde_json::to_value(runtime) {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(err) => {
|
||||||
|
tracing::warn!("Engine: serialize SegmentRuntime failed: {}", err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let message = WsMessage::AppEvent(AppWsEvent {
|
||||||
|
app: APP_NAME.to_string(),
|
||||||
|
event_type: "segment_runtime_changed".to_string(),
|
||||||
|
data: payload,
|
||||||
|
});
|
||||||
|
if let Err(err) = state.platform.ws_manager.send_to_public(message).await {
|
||||||
|
tracing::debug!("Engine: WS push skipped: {}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,387 @@
|
||||||
|
//! Interlock evaluator (design doc §6).
|
||||||
|
//!
|
||||||
|
//! Evaluates a single `segment_interlock` row against the current point monitor
|
||||||
|
//! snapshot. Returns `Ok(())` when the rule passes, `Err(reason)` when it fails.
|
||||||
|
//!
|
||||||
|
//! The first-pass rule set is fixed (no expression engine). New rule kinds are
|
||||||
|
//! added by extending the `rule_kind` match.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use plc_platform_core::telemetry::PointMonitorInfo;
|
||||||
|
use sqlx::PgPool;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::model::{SegmentInterlock, StationSignal};
|
||||||
|
|
||||||
|
use super::{monitor_quality_good, monitor_value_as_bool};
|
||||||
|
|
||||||
|
/// Pre-loaded lookup maps so the engine evaluates interlocks without per-rule DB hits.
|
||||||
|
pub struct InterlockContext {
|
||||||
|
/// equipment_id → (role → point_id)
|
||||||
|
pub equipment_role_points: HashMap<Uuid, HashMap<String, Uuid>>,
|
||||||
|
/// station_id → (role → point_id, invert)
|
||||||
|
pub station_role_points: HashMap<Uuid, HashMap<String, (Uuid, bool)>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InterlockContext {
|
||||||
|
pub async fn load_for_interlocks(
|
||||||
|
pool: &PgPool,
|
||||||
|
interlocks: &[SegmentInterlock],
|
||||||
|
) -> Result<Self, sqlx::Error> {
|
||||||
|
let equipment_ids: Vec<Uuid> = interlocks.iter().filter_map(|i| i.equipment_id).collect();
|
||||||
|
let station_ids: Vec<Uuid> = interlocks.iter().filter_map(|i| i.station_id).collect();
|
||||||
|
|
||||||
|
Self::load(pool, &equipment_ids, &station_ids).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn load(
|
||||||
|
pool: &PgPool,
|
||||||
|
equipment_ids: &[Uuid],
|
||||||
|
station_ids: &[Uuid],
|
||||||
|
) -> Result<Self, sqlx::Error> {
|
||||||
|
let mut equipment_role_points: HashMap<Uuid, HashMap<String, Uuid>> = HashMap::new();
|
||||||
|
if !equipment_ids.is_empty() {
|
||||||
|
let rows = plc_platform_core::service::get_signal_role_points_batch(pool, equipment_ids).await?;
|
||||||
|
for row in rows {
|
||||||
|
equipment_role_points
|
||||||
|
.entry(row.equipment_id)
|
||||||
|
.or_default()
|
||||||
|
.insert(row.signal_role, row.point_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut station_role_points: HashMap<Uuid, HashMap<String, (Uuid, bool)>> = HashMap::new();
|
||||||
|
if !station_ids.is_empty() {
|
||||||
|
let signals = sqlx::query_as::<_, StationSignal>(
|
||||||
|
r#"SELECT * FROM station_signal WHERE station_id = ANY($1)"#,
|
||||||
|
)
|
||||||
|
.bind(station_ids)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await?;
|
||||||
|
for sig in signals {
|
||||||
|
if let Some(point_id) = sig.point_id {
|
||||||
|
station_role_points
|
||||||
|
.entry(sig.station_id)
|
||||||
|
.or_default()
|
||||||
|
.insert(sig.signal_role, (point_id, sig.invert_value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
equipment_role_points,
|
||||||
|
station_role_points,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Resolve the point id for a station's role, honoring `derived_from_role`.
|
||||||
|
///
|
||||||
|
/// Returns the resolved `(point_id, invert_value)`. The caller XORs `invert_value`
|
||||||
|
/// with the raw bool to obtain the logical signal.
|
||||||
|
fn resolve_station_point(
|
||||||
|
ctx: &InterlockContext,
|
||||||
|
station_id: Uuid,
|
||||||
|
role: &str,
|
||||||
|
) -> Option<(Uuid, bool)> {
|
||||||
|
ctx.station_role_points
|
||||||
|
.get(&station_id)
|
||||||
|
.and_then(|m| m.get(role))
|
||||||
|
.copied()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read a (point_id, invert) → logical bool, requiring Good quality.
|
||||||
|
/// Returns `None` if the point is missing from the monitor map, has bad quality,
|
||||||
|
/// or has no value yet.
|
||||||
|
fn read_logical_bool(
|
||||||
|
monitor: &HashMap<Uuid, PointMonitorInfo>,
|
||||||
|
point_id: Uuid,
|
||||||
|
invert: bool,
|
||||||
|
) -> Option<bool> {
|
||||||
|
let m = monitor.get(&point_id)?;
|
||||||
|
if !monitor_quality_good(m) {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let raw = monitor_value_as_bool(m);
|
||||||
|
Some(raw ^ invert)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Evaluate one interlock rule. Returns Ok when the rule is satisfied,
|
||||||
|
/// Err with a human-readable reason when it is not.
|
||||||
|
pub fn evaluate(
|
||||||
|
rule: &SegmentInterlock,
|
||||||
|
ctx: &InterlockContext,
|
||||||
|
monitor: &HashMap<Uuid, PointMonitorInfo>,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
match rule.rule_kind.as_str() {
|
||||||
|
"point_eq" => {
|
||||||
|
let point_id = rule
|
||||||
|
.point_id
|
||||||
|
.ok_or_else(|| format!("point_eq rule {} missing point_id", rule.id))?;
|
||||||
|
let expected = rule.expected_value.unwrap_or(true);
|
||||||
|
let actual = read_logical_bool(monitor, point_id, false)
|
||||||
|
.ok_or_else(|| format!("point {} unavailable or bad quality", point_id))?;
|
||||||
|
if actual == expected {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(format!(
|
||||||
|
"point {} expected {} got {}",
|
||||||
|
point_id, expected, actual
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"station_vacant" => {
|
||||||
|
let station_id = rule
|
||||||
|
.station_id
|
||||||
|
.ok_or_else(|| format!("station_vacant rule {} missing station_id", rule.id))?;
|
||||||
|
// Prefer explicit vacancy signal; fall back to !presence.
|
||||||
|
if let Some((pid, invert)) = resolve_station_point(ctx, station_id, "vacancy") {
|
||||||
|
let v = read_logical_bool(monitor, pid, invert)
|
||||||
|
.ok_or_else(|| format!("vacancy signal for station {} unavailable", station_id))?;
|
||||||
|
if v {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(format!("station {} occupied (vacancy=false)", station_id))
|
||||||
|
}
|
||||||
|
} else if let Some((pid, invert)) = resolve_station_point(ctx, station_id, "presence") {
|
||||||
|
let v = read_logical_bool(monitor, pid, invert)
|
||||||
|
.ok_or_else(|| format!("presence signal for station {} unavailable", station_id))?;
|
||||||
|
if !v {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(format!("station {} occupied (presence=true)", station_id))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Err(format!("station {} has no presence/vacancy binding", station_id))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"station_occupied" => {
|
||||||
|
let station_id = rule
|
||||||
|
.station_id
|
||||||
|
.ok_or_else(|| format!("station_occupied rule {} missing station_id", rule.id))?;
|
||||||
|
if let Some((pid, invert)) = resolve_station_point(ctx, station_id, "presence") {
|
||||||
|
let v = read_logical_bool(monitor, pid, invert)
|
||||||
|
.ok_or_else(|| format!("presence signal for station {} unavailable", station_id))?;
|
||||||
|
if v {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(format!("station {} empty (presence=false)", station_id))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Err(format!("station {} has no presence binding", station_id))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"equipment_origin" => check_equipment_role(rule, ctx, monitor, "home", true, "not at origin"),
|
||||||
|
"equipment_no_fault" => {
|
||||||
|
check_equipment_role(rule, ctx, monitor, "flt", false, "fault active")
|
||||||
|
}
|
||||||
|
"equipment_remote" => {
|
||||||
|
check_equipment_role(rule, ctx, monitor, "rem", true, "not in remote mode")
|
||||||
|
}
|
||||||
|
"safety_chain_ok" => {
|
||||||
|
// First-pass: a safety_chain_ok rule must bind to a point that is true.
|
||||||
|
let point_id = rule
|
||||||
|
.point_id
|
||||||
|
.ok_or_else(|| format!("safety_chain_ok rule {} missing point_id", rule.id))?;
|
||||||
|
let actual = read_logical_bool(monitor, point_id, false)
|
||||||
|
.ok_or_else(|| format!("safety chain point {} unavailable", point_id))?;
|
||||||
|
if actual {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(format!("safety chain point {} broken", point_id))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
other => Err(format!("unknown rule_kind {}", other)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_equipment_role(
|
||||||
|
rule: &SegmentInterlock,
|
||||||
|
ctx: &InterlockContext,
|
||||||
|
monitor: &HashMap<Uuid, PointMonitorInfo>,
|
||||||
|
role: &str,
|
||||||
|
expected: bool,
|
||||||
|
fail_reason: &str,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
let equipment_id = rule
|
||||||
|
.equipment_id
|
||||||
|
.ok_or_else(|| format!("{} rule {} missing equipment_id", rule.rule_kind, rule.id))?;
|
||||||
|
let point_id = ctx
|
||||||
|
.equipment_role_points
|
||||||
|
.get(&equipment_id)
|
||||||
|
.and_then(|m| m.get(role))
|
||||||
|
.copied()
|
||||||
|
.ok_or_else(|| format!("equipment {} has no {} role binding", equipment_id, role))?;
|
||||||
|
let actual = read_logical_bool(monitor, point_id, false).ok_or_else(|| {
|
||||||
|
format!(
|
||||||
|
"equipment {} role {} point {} unavailable",
|
||||||
|
equipment_id, role, point_id
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
if actual == expected {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(format!("equipment {} {}", equipment_id, fail_reason))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Evaluate the supplied interlock set; returns the first failure.
|
||||||
|
pub fn evaluate_all(
|
||||||
|
rules: &[&SegmentInterlock],
|
||||||
|
ctx: &InterlockContext,
|
||||||
|
monitor: &HashMap<Uuid, PointMonitorInfo>,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
for rule in rules {
|
||||||
|
evaluate(rule, ctx, monitor)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use chrono::Utc;
|
||||||
|
use plc_platform_core::model::ScanMode;
|
||||||
|
use plc_platform_core::telemetry::{DataValue, PointQuality, ValueType};
|
||||||
|
|
||||||
|
fn monitor_entry(point_id: Uuid, value: bool, good: bool) -> PointMonitorInfo {
|
||||||
|
PointMonitorInfo {
|
||||||
|
protocol: "test".to_string(),
|
||||||
|
source_id: Uuid::nil(),
|
||||||
|
point_id,
|
||||||
|
client_handle: 0,
|
||||||
|
scan_mode: ScanMode::Subscribe,
|
||||||
|
timestamp: Some(Utc::now()),
|
||||||
|
quality: if good {
|
||||||
|
PointQuality::Good
|
||||||
|
} else {
|
||||||
|
PointQuality::Bad
|
||||||
|
},
|
||||||
|
value: Some(DataValue::Bool(value)),
|
||||||
|
value_type: Some(ValueType::Bool),
|
||||||
|
value_text: None,
|
||||||
|
old_value: None,
|
||||||
|
old_timestamp: None,
|
||||||
|
value_changed: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn dummy_interlock(rule_kind: &str) -> SegmentInterlock {
|
||||||
|
SegmentInterlock {
|
||||||
|
id: Uuid::new_v4(),
|
||||||
|
segment_id: Uuid::new_v4(),
|
||||||
|
applies_to: "start_allow".to_string(),
|
||||||
|
rule_kind: rule_kind.to_string(),
|
||||||
|
point_id: None,
|
||||||
|
station_id: None,
|
||||||
|
equipment_id: None,
|
||||||
|
expected_value: None,
|
||||||
|
description: None,
|
||||||
|
created_at: Utc::now(),
|
||||||
|
updated_at: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn point_eq_passes_when_value_matches_expected() {
|
||||||
|
let pid = Uuid::new_v4();
|
||||||
|
let mut rule = dummy_interlock("point_eq");
|
||||||
|
rule.point_id = Some(pid);
|
||||||
|
rule.expected_value = Some(true);
|
||||||
|
|
||||||
|
let mut monitor = HashMap::new();
|
||||||
|
monitor.insert(pid, monitor_entry(pid, true, true));
|
||||||
|
let ctx = InterlockContext {
|
||||||
|
equipment_role_points: HashMap::new(),
|
||||||
|
station_role_points: HashMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(evaluate(&rule, &ctx, &monitor).is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn point_eq_fails_when_quality_bad() {
|
||||||
|
let pid = Uuid::new_v4();
|
||||||
|
let mut rule = dummy_interlock("point_eq");
|
||||||
|
rule.point_id = Some(pid);
|
||||||
|
rule.expected_value = Some(true);
|
||||||
|
|
||||||
|
let mut monitor = HashMap::new();
|
||||||
|
monitor.insert(pid, monitor_entry(pid, true, false));
|
||||||
|
let ctx = InterlockContext {
|
||||||
|
equipment_role_points: HashMap::new(),
|
||||||
|
station_role_points: HashMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(evaluate(&rule, &ctx, &monitor).is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn station_vacant_uses_presence_when_no_vacancy_signal() {
|
||||||
|
let station_id = Uuid::new_v4();
|
||||||
|
let pid = Uuid::new_v4();
|
||||||
|
let mut rule = dummy_interlock("station_vacant");
|
||||||
|
rule.station_id = Some(station_id);
|
||||||
|
|
||||||
|
let mut monitor = HashMap::new();
|
||||||
|
monitor.insert(pid, monitor_entry(pid, false, true));
|
||||||
|
let mut station_role_points = HashMap::new();
|
||||||
|
let mut roles = HashMap::new();
|
||||||
|
roles.insert("presence".to_string(), (pid, false));
|
||||||
|
station_role_points.insert(station_id, roles);
|
||||||
|
let ctx = InterlockContext {
|
||||||
|
equipment_role_points: HashMap::new(),
|
||||||
|
station_role_points,
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(evaluate(&rule, &ctx, &monitor).is_ok());
|
||||||
|
|
||||||
|
// Flip presence to true ⇒ vacant should fail.
|
||||||
|
monitor.insert(pid, monitor_entry(pid, true, true));
|
||||||
|
assert!(evaluate(&rule, &ctx, &monitor).is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn equipment_no_fault_fails_when_flt_true() {
|
||||||
|
let eq_id = Uuid::new_v4();
|
||||||
|
let pid = Uuid::new_v4();
|
||||||
|
let mut rule = dummy_interlock("equipment_no_fault");
|
||||||
|
rule.equipment_id = Some(eq_id);
|
||||||
|
|
||||||
|
let mut monitor = HashMap::new();
|
||||||
|
monitor.insert(pid, monitor_entry(pid, true, true));
|
||||||
|
let mut equipment_role_points = HashMap::new();
|
||||||
|
let mut roles = HashMap::new();
|
||||||
|
roles.insert("flt".to_string(), pid);
|
||||||
|
equipment_role_points.insert(eq_id, roles);
|
||||||
|
let ctx = InterlockContext {
|
||||||
|
equipment_role_points,
|
||||||
|
station_role_points: HashMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(evaluate(&rule, &ctx, &monitor).is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn evaluate_all_returns_first_failure() {
|
||||||
|
let pid_ok = Uuid::new_v4();
|
||||||
|
let pid_bad = Uuid::new_v4();
|
||||||
|
let mut rule_a = dummy_interlock("point_eq");
|
||||||
|
rule_a.point_id = Some(pid_ok);
|
||||||
|
rule_a.expected_value = Some(true);
|
||||||
|
let mut rule_b = dummy_interlock("point_eq");
|
||||||
|
rule_b.point_id = Some(pid_bad);
|
||||||
|
rule_b.expected_value = Some(true);
|
||||||
|
|
||||||
|
let mut monitor = HashMap::new();
|
||||||
|
monitor.insert(pid_ok, monitor_entry(pid_ok, true, true));
|
||||||
|
monitor.insert(pid_bad, monitor_entry(pid_bad, false, true));
|
||||||
|
let ctx = InterlockContext {
|
||||||
|
equipment_role_points: HashMap::new(),
|
||||||
|
station_role_points: HashMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(evaluate_all(&[&rule_a, &rule_b], &ctx, &monitor).is_err());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,6 +1,31 @@
|
||||||
pub use plc_platform_core::control::command;
|
pub use plc_platform_core::control::command;
|
||||||
|
|
||||||
pub mod engine;
|
pub mod engine;
|
||||||
|
pub mod interlock;
|
||||||
pub mod resource;
|
pub mod resource;
|
||||||
pub mod runtime;
|
pub mod runtime;
|
||||||
pub mod state;
|
pub mod state;
|
||||||
|
pub mod step_executor;
|
||||||
|
|
||||||
|
use plc_platform_core::telemetry::{DataValue, PointMonitorInfo, PointQuality};
|
||||||
|
|
||||||
|
/// Interpret a monitored point value as a boolean signal.
|
||||||
|
/// Mirrors `app_feeder_distributor::control::monitor_value_as_bool`.
|
||||||
|
pub(crate) fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool {
|
||||||
|
match monitor.value.as_ref() {
|
||||||
|
Some(DataValue::Bool(value)) => *value,
|
||||||
|
Some(DataValue::Int(value)) => *value != 0,
|
||||||
|
Some(DataValue::UInt(value)) => *value != 0,
|
||||||
|
Some(DataValue::Float(value)) => *value != 0.0,
|
||||||
|
Some(DataValue::Text(value)) => matches!(
|
||||||
|
value.trim().to_ascii_lowercase().as_str(),
|
||||||
|
"1" | "true" | "on" | "yes"
|
||||||
|
),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true iff the point is present in the monitor map and reports `Good` quality.
|
||||||
|
pub(crate) fn monitor_quality_good(monitor: &PointMonitorInfo) -> bool {
|
||||||
|
monitor.quality == PointQuality::Good
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,239 @@
|
||||||
|
//! Step executor (design doc §5.4).
|
||||||
|
//!
|
||||||
|
//! Resolves a `segment_step.action_kind` to a concrete write on a command point
|
||||||
|
//! using `plc_platform_core::control::command::send_pulse_command`. Confirmation
|
||||||
|
//! is handled by the engine's `Confirming` state; this module only sends the
|
||||||
|
//! initial command.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use plc_platform_core::{
|
||||||
|
connection::ConnectionManager,
|
||||||
|
control::command::send_pulse_command,
|
||||||
|
service::EquipmentSignalRole,
|
||||||
|
telemetry::PointMonitorInfo,
|
||||||
|
};
|
||||||
|
use sqlx::PgPool;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::model::SegmentStep;
|
||||||
|
|
||||||
|
/// Cached lookup of (equipment_id, signal_role) → point_id for all equipment a
|
||||||
|
/// segment touches. Loaded once per segment task tick.
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct CommandPointIndex {
|
||||||
|
map: HashMap<(Uuid, String), Uuid>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CommandPointIndex {
|
||||||
|
pub async fn for_steps(pool: &PgPool, steps: &[SegmentStep]) -> Result<Self, sqlx::Error> {
|
||||||
|
let equipment_ids: Vec<Uuid> = steps.iter().filter_map(|s| s.target_equipment_id).collect();
|
||||||
|
if equipment_ids.is_empty() {
|
||||||
|
return Ok(Self::default());
|
||||||
|
}
|
||||||
|
let rows: Vec<EquipmentSignalRole> =
|
||||||
|
plc_platform_core::service::get_signal_role_points_batch(pool, &equipment_ids).await?;
|
||||||
|
let mut map = HashMap::new();
|
||||||
|
for row in rows {
|
||||||
|
map.insert((row.equipment_id, row.signal_role), row.point_id);
|
||||||
|
}
|
||||||
|
Ok(Self { map })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn lookup(&self, equipment_id: Uuid, role: &str) -> Option<Uuid> {
|
||||||
|
self.map.get(&(equipment_id, role.to_string())).copied()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Outcome of dispatching a step's command.
|
||||||
|
pub enum DispatchOutcome {
|
||||||
|
/// A command was issued (or skipped because the action is wait-only).
|
||||||
|
/// The engine moves to `Confirming`.
|
||||||
|
Issued,
|
||||||
|
/// The step is mis-configured (missing role/equipment). The engine should
|
||||||
|
/// transition to `Faulted` with this message.
|
||||||
|
Misconfigured(String),
|
||||||
|
/// The underlying write failed. Engine should transition to `Faulted`.
|
||||||
|
WriteError(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dispatch `step.action_kind`. For first-pass the executor supports both pulse
|
||||||
|
/// commands and "hold until confirm" commands — pulse is the default unless
|
||||||
|
/// `step.hold_until_confirm` is true, in which case we send a single high value
|
||||||
|
/// and let the engine emit the stop command after confirmation.
|
||||||
|
pub async fn dispatch(
|
||||||
|
step: &SegmentStep,
|
||||||
|
connection: &Arc<ConnectionManager>,
|
||||||
|
command_points: &CommandPointIndex,
|
||||||
|
monitor: &HashMap<Uuid, PointMonitorInfo>,
|
||||||
|
) -> DispatchOutcome {
|
||||||
|
if step.action_kind == "wait_signal" {
|
||||||
|
return DispatchOutcome::Issued;
|
||||||
|
}
|
||||||
|
|
||||||
|
let command_role = match step.command_role.as_deref() {
|
||||||
|
Some(role) => role,
|
||||||
|
None => match default_command_role(step.action_kind.as_str()) {
|
||||||
|
Some(role) => role,
|
||||||
|
None => {
|
||||||
|
return DispatchOutcome::Misconfigured(format!(
|
||||||
|
"step {} action {} has no command_role and no default",
|
||||||
|
step.step_no, step.action_kind
|
||||||
|
))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let equipment_id = match step.target_equipment_id {
|
||||||
|
Some(id) => id,
|
||||||
|
None => {
|
||||||
|
return DispatchOutcome::Misconfigured(format!(
|
||||||
|
"step {} action {} has no target_equipment_id",
|
||||||
|
step.step_no, step.action_kind
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let point_id = match command_points.lookup(equipment_id, command_role) {
|
||||||
|
Some(p) => p,
|
||||||
|
None => {
|
||||||
|
return DispatchOutcome::Misconfigured(format!(
|
||||||
|
"equipment {} has no '{}' role binding",
|
||||||
|
equipment_id, command_role
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let value_type = monitor.get(&point_id).and_then(|m| m.value_type.clone());
|
||||||
|
let pulse_ms = step.pulse_ms.unwrap_or(default_pulse_ms(&step.action_kind)) as u64;
|
||||||
|
|
||||||
|
if let Err(err) = send_pulse_command(connection, point_id, value_type.as_ref(), pulse_ms).await
|
||||||
|
{
|
||||||
|
return DispatchOutcome::WriteError(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
DispatchOutcome::Issued
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send the configured stop command (used when `hold_until_confirm` is true or
|
||||||
|
/// on fault cleanup). No-op if no stop role is configured.
|
||||||
|
pub async fn send_stop_command(
|
||||||
|
step: &SegmentStep,
|
||||||
|
connection: &Arc<ConnectionManager>,
|
||||||
|
command_points: &CommandPointIndex,
|
||||||
|
monitor: &HashMap<Uuid, PointMonitorInfo>,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
let role = match step.stop_command_role.as_deref() {
|
||||||
|
Some(r) => r,
|
||||||
|
None => return Ok(()),
|
||||||
|
};
|
||||||
|
let equipment_id = step
|
||||||
|
.target_equipment_id
|
||||||
|
.ok_or_else(|| format!("step {} stop command missing target_equipment_id", step.step_no))?;
|
||||||
|
let point_id = command_points.lookup(equipment_id, role).ok_or_else(|| {
|
||||||
|
format!(
|
||||||
|
"equipment {} has no '{}' stop-role binding",
|
||||||
|
equipment_id, role
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
let value_type = monitor.get(&point_id).and_then(|m| m.value_type.clone());
|
||||||
|
send_pulse_command(connection, point_id, value_type.as_ref(), 300).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Default command-role mapping per design doc §4.2.4 table.
|
||||||
|
fn default_command_role(action_kind: &str) -> Option<&'static str> {
|
||||||
|
match action_kind {
|
||||||
|
"open_door" => Some("open_cmd"),
|
||||||
|
"close_door" => Some("close_cmd"),
|
||||||
|
"push_forward" => Some("forward_cmd"),
|
||||||
|
"push_retract" => Some("retract_cmd"),
|
||||||
|
"pull_run" => Some("start_cmd"),
|
||||||
|
"pull_retract" => Some("retract_cmd"),
|
||||||
|
"transfer_move_to" => Some("move_cmd"),
|
||||||
|
"step_once" => Some("step_cmd"),
|
||||||
|
"robot_permit" => Some("permit_cmd"),
|
||||||
|
"robot_release" => Some("release_cmd"),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Default pulse width for actions where the spec doesn't override.
|
||||||
|
fn default_pulse_ms(action_kind: &str) -> i32 {
|
||||||
|
match action_kind {
|
||||||
|
"open_door" | "close_door" | "robot_permit" | "robot_release" | "step_once" => 300,
|
||||||
|
_ => 500,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use chrono::Utc;
|
||||||
|
|
||||||
|
fn make_step(action_kind: &str, equipment: Option<Uuid>, role: Option<&str>) -> SegmentStep {
|
||||||
|
SegmentStep {
|
||||||
|
id: Uuid::new_v4(),
|
||||||
|
segment_id: Uuid::new_v4(),
|
||||||
|
step_no: 1,
|
||||||
|
step_code: "S1".to_string(),
|
||||||
|
action_kind: action_kind.to_string(),
|
||||||
|
target_equipment_id: equipment,
|
||||||
|
target_station_id: None,
|
||||||
|
confirm_signal_role: None,
|
||||||
|
confirm_point_id: None,
|
||||||
|
expected_value: true,
|
||||||
|
timeout_ms: 30_000,
|
||||||
|
command_role: role.map(|s| s.to_string()),
|
||||||
|
stop_command_role: None,
|
||||||
|
pulse_ms: None,
|
||||||
|
hold_until_confirm: false,
|
||||||
|
cancel_on_fault: true,
|
||||||
|
next_step_no_on_success: None,
|
||||||
|
next_step_no_on_failure: None,
|
||||||
|
on_timeout: "fault".to_string(),
|
||||||
|
description: None,
|
||||||
|
created_at: Utc::now(),
|
||||||
|
updated_at: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn default_pulse_short_for_short_actions() {
|
||||||
|
assert_eq!(default_pulse_ms("open_door"), 300);
|
||||||
|
assert_eq!(default_pulse_ms("transfer_move_to"), 500);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn default_role_resolves_for_known_actions() {
|
||||||
|
assert_eq!(default_command_role("open_door"), Some("open_cmd"));
|
||||||
|
assert_eq!(default_command_role("transfer_move_to"), Some("move_cmd"));
|
||||||
|
assert_eq!(default_command_role("wait_signal"), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn command_point_index_lookup_returns_registered_point() {
|
||||||
|
let eq_id = Uuid::new_v4();
|
||||||
|
let pid = Uuid::new_v4();
|
||||||
|
let mut idx = CommandPointIndex::default();
|
||||||
|
idx.map.insert((eq_id, "open_cmd".to_string()), pid);
|
||||||
|
assert_eq!(idx.lookup(eq_id, "open_cmd"), Some(pid));
|
||||||
|
assert_eq!(idx.lookup(eq_id, "close_cmd"), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn wait_signal_step_is_dispatched_without_command_role() {
|
||||||
|
// wait_signal returns Issued even if no command_role / equipment configured.
|
||||||
|
let step = make_step("wait_signal", None, None);
|
||||||
|
// Build a sync runtime to drive the async call.
|
||||||
|
let rt = tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
let connection = Arc::new(ConnectionManager::new());
|
||||||
|
rt.block_on(async {
|
||||||
|
let outcome = dispatch(&step, &connection, &CommandPointIndex::default(), &HashMap::new()).await;
|
||||||
|
assert!(matches!(outcome, DispatchOutcome::Issued));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,3 +1,5 @@
|
||||||
|
pub mod control;
|
||||||
pub mod doc;
|
pub mod doc;
|
||||||
|
pub mod runtime;
|
||||||
pub mod segment;
|
pub mod segment;
|
||||||
pub mod station;
|
pub mod station;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,210 @@
|
||||||
|
//! Segment control endpoints (design doc §9.2).
|
||||||
|
//!
|
||||||
|
//! These endpoints flip flags on the in-memory `SegmentRuntime` and notify the
|
||||||
|
//! segment task. The engine task picks up the change on its next tick.
|
||||||
|
|
||||||
|
use axum::{extract::{Path, State}, response::IntoResponse, Json};
|
||||||
|
use serde_json::json;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use plc_platform_core::util::response::ApiErr;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
control::state::SegmentState,
|
||||||
|
event::AppEvent,
|
||||||
|
service::segment as segment_service,
|
||||||
|
AppState,
|
||||||
|
};
|
||||||
|
|
||||||
|
async fn require_segment(
|
||||||
|
state: &AppState,
|
||||||
|
segment_id: Uuid,
|
||||||
|
) -> Result<crate::model::ProcessSegment, ApiErr> {
|
||||||
|
let segment = segment_service::get_segment_by_id(&state.platform.pool, segment_id)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| ApiErr::NotFound("Segment not found".to_string(), None))?;
|
||||||
|
if !segment.enabled {
|
||||||
|
return Err(ApiErr::BadRequest(
|
||||||
|
"Segment is disabled".to_string(),
|
||||||
|
Some(json!({ "segment_id": segment_id })),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Ok(segment)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start_auto_segment(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Path(segment_id): Path<Uuid>,
|
||||||
|
) -> Result<impl IntoResponse, ApiErr> {
|
||||||
|
let segment = require_segment(&state, segment_id).await?;
|
||||||
|
if segment.mode != "auto" {
|
||||||
|
return Err(ApiErr::BadRequest(
|
||||||
|
format!("Segment mode {} does not allow auto start", segment.mode),
|
||||||
|
Some(json!({ "segment_id": segment_id, "mode": segment.mode })),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut runtime = state.segment_runtime.get_or_init(segment_id).await;
|
||||||
|
if matches!(
|
||||||
|
runtime.state,
|
||||||
|
SegmentState::Faulted | SegmentState::ManualAckRequired
|
||||||
|
) {
|
||||||
|
return Err(ApiErr::BadRequest(
|
||||||
|
"Segment is fault-locked; acknowledge before starting".to_string(),
|
||||||
|
Some(json!({
|
||||||
|
"segment_id": segment_id,
|
||||||
|
"state": serde_json::to_value(&runtime.state).unwrap_or(serde_json::Value::Null)
|
||||||
|
})),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
runtime.auto_enabled = true;
|
||||||
|
if matches!(runtime.state, SegmentState::Idle) {
|
||||||
|
runtime.blocked_reason = None;
|
||||||
|
}
|
||||||
|
state.segment_runtime.upsert(runtime).await;
|
||||||
|
state.segment_runtime.notify_segment(segment_id).await;
|
||||||
|
|
||||||
|
let _ = state
|
||||||
|
.event_manager
|
||||||
|
.send(AppEvent::SegmentAutoStarted { segment_id });
|
||||||
|
|
||||||
|
Ok(Json(
|
||||||
|
json!({ "ok_msg": "Auto control started", "segment_id": segment_id }),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn stop_auto_segment(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Path(segment_id): Path<Uuid>,
|
||||||
|
) -> Result<impl IntoResponse, ApiErr> {
|
||||||
|
require_segment(&state, segment_id).await?;
|
||||||
|
let mut runtime = state.segment_runtime.get_or_init(segment_id).await;
|
||||||
|
runtime.auto_enabled = false;
|
||||||
|
state.segment_runtime.upsert(runtime).await;
|
||||||
|
state.segment_runtime.notify_segment(segment_id).await;
|
||||||
|
|
||||||
|
let _ = state
|
||||||
|
.event_manager
|
||||||
|
.send(AppEvent::SegmentAutoStopped { segment_id });
|
||||||
|
|
||||||
|
Ok(Json(
|
||||||
|
json!({ "ok_msg": "Auto control stopped", "segment_id": segment_id }),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn ack_fault_segment(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Path(segment_id): Path<Uuid>,
|
||||||
|
) -> Result<impl IntoResponse, ApiErr> {
|
||||||
|
require_segment(&state, segment_id).await?;
|
||||||
|
let mut runtime = state.segment_runtime.get_or_init(segment_id).await;
|
||||||
|
|
||||||
|
if !matches!(
|
||||||
|
runtime.state,
|
||||||
|
SegmentState::Faulted | SegmentState::ManualAckRequired
|
||||||
|
) {
|
||||||
|
return Err(ApiErr::BadRequest(
|
||||||
|
"Segment is not in a faulted state".to_string(),
|
||||||
|
Some(json!({ "segment_id": segment_id })),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
runtime.fault_message = None;
|
||||||
|
runtime.manual_ack_required = false;
|
||||||
|
runtime.current_step_no = None;
|
||||||
|
runtime.blocked_reason = None;
|
||||||
|
runtime.state = SegmentState::Idle;
|
||||||
|
state.segment_runtime.upsert(runtime).await;
|
||||||
|
state.segment_runtime.notify_segment(segment_id).await;
|
||||||
|
|
||||||
|
let _ = state
|
||||||
|
.event_manager
|
||||||
|
.send(AppEvent::SegmentFaultAcked { segment_id });
|
||||||
|
|
||||||
|
Ok(Json(
|
||||||
|
json!({ "ok_msg": "Fault acknowledged", "segment_id": segment_id }),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn reset_segment(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Path(segment_id): Path<Uuid>,
|
||||||
|
) -> Result<impl IntoResponse, ApiErr> {
|
||||||
|
require_segment(&state, segment_id).await?;
|
||||||
|
let mut runtime = state.segment_runtime.get_or_init(segment_id).await;
|
||||||
|
if !matches!(
|
||||||
|
runtime.state,
|
||||||
|
SegmentState::Blocked | SegmentState::Faulted | SegmentState::ManualAckRequired
|
||||||
|
) {
|
||||||
|
return Err(ApiErr::BadRequest(
|
||||||
|
"Reset only allowed from Blocked / Faulted / ManualAckRequired".to_string(),
|
||||||
|
Some(json!({ "segment_id": segment_id })),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
state.resource_registry.release_all_for(segment_id).await;
|
||||||
|
runtime.held_resources.clear();
|
||||||
|
runtime.auto_enabled = false;
|
||||||
|
runtime.current_step_no = None;
|
||||||
|
runtime.step_started_at = None;
|
||||||
|
runtime.blocked_reason = None;
|
||||||
|
runtime.fault_message = None;
|
||||||
|
runtime.manual_ack_required = false;
|
||||||
|
runtime.state = SegmentState::Idle;
|
||||||
|
state.segment_runtime.upsert(runtime).await;
|
||||||
|
state.segment_runtime.notify_segment(segment_id).await;
|
||||||
|
|
||||||
|
Ok(Json(
|
||||||
|
json!({ "ok_msg": "Segment reset to idle", "segment_id": segment_id }),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn batch_start_auto(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
) -> Result<impl IntoResponse, ApiErr> {
|
||||||
|
let segments = segment_service::list_segments(&state.platform.pool, None).await?;
|
||||||
|
let mut started = Vec::new();
|
||||||
|
let mut skipped = Vec::new();
|
||||||
|
for segment in segments {
|
||||||
|
if !segment.enabled || segment.mode != "auto" {
|
||||||
|
skipped.push(segment.id);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let mut runtime = state.segment_runtime.get_or_init(segment.id).await;
|
||||||
|
if matches!(
|
||||||
|
runtime.state,
|
||||||
|
SegmentState::Faulted | SegmentState::ManualAckRequired
|
||||||
|
) || runtime.auto_enabled
|
||||||
|
{
|
||||||
|
skipped.push(segment.id);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
runtime.auto_enabled = true;
|
||||||
|
state.segment_runtime.upsert(runtime).await;
|
||||||
|
state.segment_runtime.notify_segment(segment.id).await;
|
||||||
|
let _ = state
|
||||||
|
.event_manager
|
||||||
|
.send(AppEvent::SegmentAutoStarted { segment_id: segment.id });
|
||||||
|
started.push(segment.id);
|
||||||
|
}
|
||||||
|
Ok(Json(json!({ "started": started, "skipped": skipped })))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn batch_stop_auto(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> {
|
||||||
|
let segments = segment_service::list_segments(&state.platform.pool, None).await?;
|
||||||
|
let mut stopped = Vec::new();
|
||||||
|
for segment in segments {
|
||||||
|
let mut runtime = state.segment_runtime.get_or_init(segment.id).await;
|
||||||
|
if !runtime.auto_enabled {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
runtime.auto_enabled = false;
|
||||||
|
state.segment_runtime.upsert(runtime).await;
|
||||||
|
state.segment_runtime.notify_segment(segment.id).await;
|
||||||
|
let _ = state
|
||||||
|
.event_manager
|
||||||
|
.send(AppEvent::SegmentAutoStopped { segment_id: segment.id });
|
||||||
|
stopped.push(segment.id);
|
||||||
|
}
|
||||||
|
Ok(Json(json!({ "stopped": stopped })))
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,94 @@
|
||||||
|
//! Runtime read endpoints (design doc §9.3).
|
||||||
|
|
||||||
|
use axum::{extract::{Path, State}, response::IntoResponse, Json};
|
||||||
|
use serde_json::json;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use plc_platform_core::util::response::ApiErr;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
service::{segment as segment_service, station as station_service},
|
||||||
|
AppState,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub async fn get_overview(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> {
|
||||||
|
let segments = segment_service::list_segments(&state.platform.pool, None).await?;
|
||||||
|
let runtimes = state.segment_runtime.get_all().await;
|
||||||
|
|
||||||
|
let segment_payload: Vec<_> = segments
|
||||||
|
.into_iter()
|
||||||
|
.map(|seg| {
|
||||||
|
let runtime = runtimes.get(&seg.id).cloned();
|
||||||
|
json!({
|
||||||
|
"segment": seg,
|
||||||
|
"runtime": runtime,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let resource_snapshot = state.resource_registry.snapshot().await;
|
||||||
|
let resources: Vec<_> = resource_snapshot
|
||||||
|
.into_iter()
|
||||||
|
.map(|(key, lease)| {
|
||||||
|
json!({
|
||||||
|
"resource_key": key,
|
||||||
|
"owner_segment_id": lease.owner_segment_id,
|
||||||
|
"acquired_at": lease.acquired_at,
|
||||||
|
"heartbeat_at": lease.heartbeat_at,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
Ok(Json(json!({
|
||||||
|
"segments": segment_payload,
|
||||||
|
"resources": resources,
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_segment_runtime(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Path(segment_id): Path<Uuid>,
|
||||||
|
) -> Result<impl IntoResponse, ApiErr> {
|
||||||
|
let segment = segment_service::get_segment_by_id(&state.platform.pool, segment_id)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| ApiErr::NotFound("Segment not found".to_string(), None))?;
|
||||||
|
let runtime = state.segment_runtime.get_or_init(segment_id).await;
|
||||||
|
Ok(Json(json!({
|
||||||
|
"segment": segment,
|
||||||
|
"runtime": runtime,
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_station_runtime(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Path(station_id): Path<Uuid>,
|
||||||
|
) -> Result<impl IntoResponse, ApiErr> {
|
||||||
|
let station = station_service::get_station_by_id(&state.platform.pool, station_id)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| ApiErr::NotFound("Station not found".to_string(), None))?;
|
||||||
|
let signals = station_service::list_station_signals(&state.platform.pool, station_id).await?;
|
||||||
|
|
||||||
|
// Attach the latest monitor sample for each bound point.
|
||||||
|
let monitor_guard = state
|
||||||
|
.platform
|
||||||
|
.connection_manager
|
||||||
|
.get_point_monitor_data_read_guard()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let signal_payload: Vec<_> = signals
|
||||||
|
.iter()
|
||||||
|
.map(|sig| {
|
||||||
|
let monitor = sig.point_id.and_then(|pid| monitor_guard.get(&pid).cloned());
|
||||||
|
json!({
|
||||||
|
"signal": sig,
|
||||||
|
"point_monitor": monitor,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
drop(monitor_guard);
|
||||||
|
|
||||||
|
Ok(Json(json!({
|
||||||
|
"station": station,
|
||||||
|
"signals": signal_payload,
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
|
@ -71,6 +71,46 @@ pub fn build_router(state: AppState) -> Router {
|
||||||
.put(crate::handler::segment::replace_resources),
|
.put(crate::handler::segment::replace_resources),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let control_routes = Router::new()
|
||||||
|
.route(
|
||||||
|
"/api/control/segment/{segment_id}/start-auto",
|
||||||
|
post(crate::handler::control::start_auto_segment),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/api/control/segment/{segment_id}/stop-auto",
|
||||||
|
post(crate::handler::control::stop_auto_segment),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/api/control/segment/{segment_id}/ack-fault",
|
||||||
|
post(crate::handler::control::ack_fault_segment),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/api/control/segment/{segment_id}/reset",
|
||||||
|
post(crate::handler::control::reset_segment),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/api/control/segment/batch-start-auto",
|
||||||
|
post(crate::handler::control::batch_start_auto),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/api/control/segment/batch-stop-auto",
|
||||||
|
post(crate::handler::control::batch_stop_auto),
|
||||||
|
);
|
||||||
|
|
||||||
|
let runtime_routes = Router::new()
|
||||||
|
.route(
|
||||||
|
"/api/runtime/overview",
|
||||||
|
get(crate::handler::runtime::get_overview),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/api/runtime/segment/{segment_id}",
|
||||||
|
get(crate::handler::runtime::get_segment_runtime),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/api/runtime/station/{station_id}",
|
||||||
|
get(crate::handler::runtime::get_station_runtime),
|
||||||
|
);
|
||||||
|
|
||||||
let ops_routes = Router::new()
|
let ops_routes = Router::new()
|
||||||
.route("/api/health", get(health_check))
|
.route("/api/health", get(health_check))
|
||||||
.route("/api/docs/api-md", get(crate::handler::doc::get_api_md))
|
.route("/api/docs/api-md", get(crate::handler::doc::get_api_md))
|
||||||
|
|
@ -82,6 +122,8 @@ pub fn build_router(state: AppState) -> Router {
|
||||||
Router::new()
|
Router::new()
|
||||||
.merge(platform)
|
.merge(platform)
|
||||||
.merge(config_routes)
|
.merge(config_routes)
|
||||||
|
.merge(control_routes)
|
||||||
|
.merge(runtime_routes)
|
||||||
.merge(ops_routes)
|
.merge(ops_routes)
|
||||||
.nest(
|
.nest(
|
||||||
"/ui",
|
"/ui",
|
||||||
|
|
|
||||||
|
|
@ -57,3 +57,38 @@ async fn operation_system_router_exposes_segment_collection() {
|
||||||
|
|
||||||
assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED);
|
assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Runtime overview is GET-only; a POST should be METHOD_NOT_ALLOWED rather
|
||||||
|
/// than 404 — proving the route is registered.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn operation_system_router_exposes_runtime_overview() {
|
||||||
|
let response = build_app()
|
||||||
|
.oneshot(
|
||||||
|
Request::builder()
|
||||||
|
.method(Method::POST)
|
||||||
|
.uri("/api/runtime/overview")
|
||||||
|
.body(Body::empty())
|
||||||
|
.expect("request should build"),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("router should answer request");
|
||||||
|
|
||||||
|
assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Control endpoints are POST-only; GETting one should be METHOD_NOT_ALLOWED.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn operation_system_router_exposes_control_batch_routes() {
|
||||||
|
let response = build_app()
|
||||||
|
.oneshot(
|
||||||
|
Request::builder()
|
||||||
|
.method(Method::GET)
|
||||||
|
.uri("/api/control/segment/batch-start-auto")
|
||||||
|
.body(Body::empty())
|
||||||
|
.expect("request should build"),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("router should answer request");
|
||||||
|
|
||||||
|
assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED);
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,7 @@
|
||||||
# 运转系统 API
|
# 运转系统 API
|
||||||
|
|
||||||
|
> 参考来源:`docs/superpowers/specs/2026-05-18-operation-system-engine-design.md`
|
||||||
|
|
||||||
## 健康检查
|
## 健康检查
|
||||||
|
|
||||||
- `GET /api/health` — 返回应用名称和状态
|
- `GET /api/health` — 返回应用名称和状态
|
||||||
|
|
@ -11,5 +13,59 @@
|
||||||
|
|
||||||
## 文档
|
## 文档
|
||||||
|
|
||||||
- `GET /api/docs/api-md` — 获取 API 文档
|
- `GET /api/docs/api-md` — 获取本 API 文档
|
||||||
- `GET /api/docs/readme-md` — 获取 README
|
- `GET /api/docs/readme-md` — 获取 README
|
||||||
|
|
||||||
|
## 平台基础接口
|
||||||
|
|
||||||
|
复用 `plc_platform_core::handler::platform_routes`:源 / 设备 / 点位 / 标签 / 页面。
|
||||||
|
|
||||||
|
## 工位配置(§9.1.1)
|
||||||
|
|
||||||
|
- `GET /api/station` — 列出工位(可选 `?line_code=`)
|
||||||
|
- `POST /api/station` — 新建工位
|
||||||
|
- `GET /api/station/{id}` — 工位详情含信号绑定
|
||||||
|
- `PUT /api/station/{id}` — 更新工位
|
||||||
|
- `DELETE /api/station/{id}`
|
||||||
|
- `POST /api/station/{id}/signal` — Upsert 工位信号绑定
|
||||||
|
- `DELETE /api/station/{id}/signal/{role}`
|
||||||
|
|
||||||
|
## 流程段配置(§9.1.1)
|
||||||
|
|
||||||
|
- `GET /api/segment`(可选 `?line_code=`)
|
||||||
|
- `POST /api/segment`
|
||||||
|
- `GET /api/segment/{id}`
|
||||||
|
- `GET /api/segment/{id}/detail` — 包含 step / interlock / resource
|
||||||
|
- `PUT /api/segment/{id}`
|
||||||
|
- `DELETE /api/segment/{id}`
|
||||||
|
- `GET /api/segment/{id}/step`
|
||||||
|
- `POST /api/segment/{id}/step`
|
||||||
|
- `PUT /api/segment/{id}/step/{step_no}`
|
||||||
|
- `DELETE /api/segment/{id}/step/{step_no}`
|
||||||
|
- `GET /api/segment/{id}/interlock`
|
||||||
|
- `POST /api/segment/{id}/interlock`
|
||||||
|
- `DELETE /api/segment/{id}/interlock/{interlock_id}`
|
||||||
|
- `GET /api/segment/{id}/resource`
|
||||||
|
- `PUT /api/segment/{id}/resource` — 用新的 `resource_keys` 数组整体替换
|
||||||
|
|
||||||
|
## 段运行控制(§9.2)
|
||||||
|
|
||||||
|
- `POST /api/control/segment/{id}/start-auto`
|
||||||
|
- `POST /api/control/segment/{id}/stop-auto`
|
||||||
|
- `POST /api/control/segment/{id}/ack-fault`
|
||||||
|
- `POST /api/control/segment/{id}/reset` — 仅在 Blocked / Faulted / ManualAckRequired 状态允许
|
||||||
|
- `POST /api/control/segment/batch-start-auto`
|
||||||
|
- `POST /api/control/segment/batch-stop-auto`
|
||||||
|
|
||||||
|
## 运行态查询(§9.3)
|
||||||
|
|
||||||
|
- `GET /api/runtime/overview` — 所有段 + 资源占用快照
|
||||||
|
- `GET /api/runtime/segment/{id}` — 单段配置 + runtime
|
||||||
|
- `GET /api/runtime/station/{id}` — 工位信号 + 最新点位监控值
|
||||||
|
|
||||||
|
## WebSocket(§8.2)
|
||||||
|
|
||||||
|
- `GET /ws/public` — 推送
|
||||||
|
- `point_new_value`(核心)
|
||||||
|
- `event_created`(核心)
|
||||||
|
- `app_event`:`{ app: "operation-system", event_type: "segment_runtime_changed", data: SegmentRuntime }`
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue