Harden operation segment control

This commit is contained in:
caoqianming 2026-05-22 14:09:12 +08:00
parent ed638eadb2
commit 5613c9f0d5
14 changed files with 472 additions and 192 deletions

View File

@ -96,30 +96,33 @@ async fn segment_task(state: AppState, store: Arc<SegmentRuntimeStore>, segment_
loop { loop {
// 1. Reload segment config; exit when disabled or removed. // 1. Reload segment config; exit when disabled or removed.
let segment = match segment_service::get_segment_by_id(&state.platform.pool, segment_id) let segment =
.await match segment_service::get_segment_by_id(&state.platform.pool, segment_id).await {
{ Ok(Some(s)) if s.enabled && s.mode != "disabled" => s,
Ok(Some(s)) if s.enabled && s.mode != "disabled" => s, Ok(_) => {
Ok(_) => { tracing::info!(
tracing::info!( "Engine: segment {} disabled or removed, task exiting",
"Engine: segment {} disabled or removed, task exiting", segment_id
segment_id );
); state.resource_registry.release_all_for(segment_id).await;
state.resource_registry.release_all_for(segment_id).await; return;
return; }
} Err(err) => {
Err(err) => { tracing::error!("Engine: segment {} reload failed: {}", segment_id, err);
tracing::error!("Engine: segment {} reload failed: {}", segment_id, err); tokio::time::sleep(Duration::from_secs(5)).await;
tokio::time::sleep(Duration::from_secs(5)).await; continue;
continue; }
} };
};
// 2. Reload steps + interlocks + resource keys. // 2. Reload steps + interlocks + resource keys.
let steps = match segment_service::list_steps(&state.platform.pool, segment_id).await { let steps = match segment_service::list_steps(&state.platform.pool, segment_id).await {
Ok(s) => s, Ok(s) => s,
Err(err) => { Err(err) => {
tracing::error!("Engine: segment {} steps reload failed: {}", segment_id, err); tracing::error!(
"Engine: segment {} steps reload failed: {}",
segment_id,
err
);
tokio::time::sleep(Duration::from_secs(5)).await; tokio::time::sleep(Duration::from_secs(5)).await;
continue; continue;
} }
@ -163,20 +166,21 @@ async fn segment_task(state: AppState, store: Arc<SegmentRuntimeStore>, segment_
continue; continue;
} }
}; };
let ctx = match InterlockContext::load_for_interlocks(&state.platform.pool, &interlocks) let ctx =
.await match InterlockContext::load_for_segment(&state.platform.pool, &steps, &interlocks)
{ .await
Ok(c) => c, {
Err(err) => { Ok(c) => c,
tracing::error!( Err(err) => {
"Engine: segment {} interlock-context load failed: {}", tracing::error!(
segment_id, "Engine: segment {} interlock-context load failed: {}",
err segment_id,
); err
tokio::time::sleep(Duration::from_secs(5)).await; );
continue; tokio::time::sleep(Duration::from_secs(5)).await;
} continue;
}; }
};
// 3. Snapshot the monitor map for the rest of this tick. // 3. Snapshot the monitor map for the rest of this tick.
let monitor_guard = state let monitor_guard = state
@ -240,6 +244,38 @@ async fn tick(
monitor: &HashMap<Uuid, PointMonitorInfo>, monitor: &HashMap<Uuid, PointMonitorInfo>,
mut runtime: SegmentRuntime, mut runtime: SegmentRuntime,
) -> Option<SegmentRuntime> { ) -> Option<SegmentRuntime> {
if matches!(
runtime.state,
SegmentState::Executing | SegmentState::Confirming | SegmentState::Resetting
) && !runtime.auto_enabled
{
if let Some(step_no) = runtime.current_step_no {
if let Some(step) = steps.iter().find(|s| s.step_no == step_no) {
if step.cancel_on_fault {
if let Err(err) = step_executor::send_stop_command(
step,
&state.platform.connection_manager,
cmd_index,
monitor,
)
.await
{
tracing::warn!(
"Engine: segment {} auto-stop command for step {} failed: {}",
segment.id,
step_no,
err
);
}
}
}
}
runtime.manual_ack_required = true;
runtime.blocked_reason = Some("auto stopped during active step".to_string());
runtime.state = SegmentState::ManualAckRequired;
return Some(runtime);
}
// Run-halt interlocks apply once we're past Checking. // Run-halt interlocks apply once we're past Checking.
if matches!( if matches!(
runtime.state, runtime.state,
@ -337,10 +373,7 @@ async fn tick(
} }
for rule in interlocks.iter().filter(|i| i.applies_to == "start_deny") { for rule in interlocks.iter().filter(|i| i.applies_to == "start_deny") {
if interlock::evaluate(rule, ctx, monitor).is_ok() { if interlock::evaluate(rule, ctx, monitor).is_ok() {
let reason = format!( let reason = format!("start denied by rule {} ({})", rule.id, rule.rule_kind);
"start denied by rule {} ({})",
rule.id, rule.rule_kind
);
let _ = state.event_manager.send(AppEvent::SegmentBlocked { let _ = state.event_manager.send(AppEvent::SegmentBlocked {
segment_id: segment.id, segment_id: segment.id,
reason: reason.clone(), reason: reason.clone(),
@ -366,8 +399,7 @@ async fn tick(
resource_key: res.resource_key.clone(), resource_key: res.resource_key.clone(),
}); });
runtime.state = SegmentState::Blocked; runtime.state = SegmentState::Blocked;
runtime.blocked_reason = runtime.blocked_reason = Some(format!("resource_busy: {}", res.resource_key));
Some(format!("resource_busy: {}", res.resource_key));
return Some(runtime); return Some(runtime);
} }
acquired.push(res.resource_key.clone()); acquired.push(res.resource_key.clone());
@ -401,12 +433,12 @@ async fn tick(
// Resolve transfer_move_to inputs ahead of dispatch. // Resolve transfer_move_to inputs ahead of dispatch.
let station_code = if step.action_kind == "transfer_move_to" { let station_code = if step.action_kind == "transfer_move_to" {
match step.target_station_id { match step.target_station_id {
Some(id) => match station_service::get_station_by_id(&state.platform.pool, id) Some(id) => {
.await match station_service::get_station_by_id(&state.platform.pool, id).await {
{ Ok(Some(s)) => Some(s.code),
Ok(Some(s)) => Some(s.code), Ok(None) | Err(_) => None,
Ok(None) | Err(_) => None, }
}, }
None => None, None => None,
} }
} else { } else {
@ -435,7 +467,8 @@ async fn tick(
// SIMULATE_PLC: schedule the confirm signal to arrive so the // SIMULATE_PLC: schedule the confirm signal to arrive so the
// engine can drive the segment end-to-end without a PLC. // engine can drive the segment end-to-end without a PLC.
if simulate::enabled() { if simulate::enabled() {
if let Some((pid, invert, expected)) = resolve_confirm_point(step, ctx) { if let Ok(Some((pid, invert, expected))) = resolve_confirm_point(step, ctx)
{
let logical_value = expected ^ invert; let logical_value = expected ^ invert;
simulate::schedule_confirm(state.clone(), pid, logical_value, 200); simulate::schedule_confirm(state.clone(), pid, logical_value, 200);
} }
@ -482,7 +515,18 @@ async fn tick(
return Some(runtime); return Some(runtime);
}; };
let confirm = resolve_confirm_point(step, ctx); let confirm = match resolve_confirm_point(step, ctx) {
Ok(confirm) => confirm,
Err(message) => {
let _ = state.event_manager.send(AppEvent::SegmentFaultLocked {
segment_id: segment.id,
message: message.clone(),
});
runtime.state = SegmentState::Faulted;
runtime.fault_message = Some(message);
return Some(runtime);
}
};
let confirmed = match confirm { let confirmed = match confirm {
Some((pid, invert, expected)) => check_confirm(monitor, pid, invert, expected), Some((pid, invert, expected)) => check_confirm(monitor, pid, invert, expected),
None => { None => {
@ -527,9 +571,7 @@ async fn tick(
// Not yet confirmed: check timeout. // Not yet confirmed: check timeout.
if let Some(started) = runtime.step_started_at { if let Some(started) = runtime.step_started_at {
let elapsed_ms = Utc::now() let elapsed_ms = Utc::now().signed_duration_since(started).num_milliseconds();
.signed_duration_since(started)
.num_milliseconds();
if elapsed_ms >= step.timeout_ms as i64 { if elapsed_ms >= step.timeout_ms as i64 {
let _ = state.event_manager.send(AppEvent::AlarmActionTimeout { let _ = state.event_manager.send(AppEvent::AlarmActionTimeout {
segment_id: segment.id, segment_id: segment.id,
@ -542,8 +584,7 @@ async fn tick(
} }
"block" => { "block" => {
runtime.state = SegmentState::Blocked; runtime.state = SegmentState::Blocked;
runtime.blocked_reason = runtime.blocked_reason = Some(format!("step {} timeout", step_no));
Some(format!("step {} timeout", step_no));
} }
_ => { _ => {
// "fault" or unknown // "fault" or unknown
@ -585,9 +626,9 @@ async fn tick(
runtime.held_resources.clear(); runtime.held_resources.clear();
runtime.last_completed_at = Some(Utc::now()); runtime.last_completed_at = Some(Utc::now());
runtime.current_step_no = None; runtime.current_step_no = None;
let _ = state let _ = state.event_manager.send(AppEvent::SegmentCompleted {
.event_manager segment_id: segment.id,
.send(AppEvent::SegmentCompleted { segment_id: segment.id }); });
runtime.state = SegmentState::Idle; runtime.state = SegmentState::Idle;
Some(runtime) Some(runtime)
} }
@ -651,21 +692,36 @@ fn next_sequential(steps: &[SegmentStep], current: i32) -> Option<i32> {
} }
/// Returns `(point_id, invert, expected_value)` if a confirm signal is configured. /// Returns `(point_id, invert, expected_value)` if a confirm signal is configured.
/// Missing bindings for an explicitly configured role are configuration faults,
/// not optional confirms.
fn resolve_confirm_point( fn resolve_confirm_point(
step: &SegmentStep, step: &SegmentStep,
ctx: &InterlockContext, ctx: &InterlockContext,
) -> Option<(Uuid, bool, bool)> { ) -> Result<Option<(Uuid, bool, bool)>, String> {
if let Some(point_id) = step.confirm_point_id { if let Some(point_id) = step.confirm_point_id {
return Some((point_id, false, step.expected_value)); return Ok(Some((point_id, false, step.expected_value)));
} }
let role = step.confirm_signal_role.as_deref()?; let Some(role) = step.confirm_signal_role.as_deref() else {
let station_id = step.target_station_id?; return Ok(None);
};
let station_id = step.target_station_id.ok_or_else(|| {
format!(
"step {} confirm signal role '{}' requires target_station_id",
step.step_no, role
)
})?;
let (pid, invert) = ctx let (pid, invert) = ctx
.station_role_points .station_role_points
.get(&station_id) .get(&station_id)
.and_then(|m| m.get(role)) .and_then(|m| m.get(role))
.copied()?; .copied()
Some((pid, invert, step.expected_value)) .ok_or_else(|| {
format!(
"step {} confirm signal role '{}' could not be resolved",
step.step_no, role
)
})?;
Ok(Some((pid, invert, step.expected_value)))
} }
fn check_confirm( fn check_confirm(
@ -687,9 +743,7 @@ fn should_wait(runtime: &SegmentRuntime, mode: &str) -> bool {
match runtime.state { match runtime.state {
SegmentState::Idle => !runtime.auto_enabled || mode != "auto", SegmentState::Idle => !runtime.auto_enabled || mode != "auto",
SegmentState::Confirming => true, SegmentState::Confirming => true,
SegmentState::Blocked SegmentState::Blocked | SegmentState::Faulted | SegmentState::ManualAckRequired => true,
| SegmentState::Faulted
| SegmentState::ManualAckRequired => true,
_ => false, _ => false,
} }
} }
@ -712,3 +766,153 @@ async fn push_runtime_change(state: &AppState, runtime: &SegmentRuntime) {
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
fn test_segment() -> ProcessSegment {
ProcessSegment {
id: Uuid::new_v4(),
code: "SEG-TEST".to_string(),
name: "Test Segment".to_string(),
segment_type: "test".to_string(),
line_code: None,
priority: 0,
enabled: true,
mode: "auto".to_string(),
require_manual_ack_after_fault: true,
description: None,
created_at: Utc::now(),
updated_at: Utc::now(),
}
}
fn test_step(station_id: Uuid) -> SegmentStep {
SegmentStep {
id: Uuid::new_v4(),
segment_id: Uuid::new_v4(),
step_no: 1,
step_code: "WAIT_ARRIVED".to_string(),
action_kind: "wait_signal".to_string(),
target_equipment_id: None,
target_station_id: Some(station_id),
confirm_signal_role: Some("arrived".to_string()),
confirm_point_id: None,
expected_value: true,
timeout_ms: 30_000,
command_role: None,
stop_command_role: None,
pulse_ms: None,
hold_until_confirm: false,
cancel_on_fault: true,
next_step_no_on_success: None,
next_step_no_on_failure: None,
on_timeout: "fault".to_string(),
description: None,
created_at: Utc::now(),
updated_at: Utc::now(),
}
}
fn test_step_with_confirm_point(point_id: Uuid) -> SegmentStep {
let mut step = test_step(Uuid::new_v4());
step.confirm_signal_role = None;
step.confirm_point_id = Some(point_id);
step.hold_until_confirm = true;
step
}
#[tokio::test]
async fn confirming_faults_when_configured_confirm_role_cannot_resolve() {
let state = crate::app::test_state();
let segment = test_segment();
let station_id = Uuid::new_v4();
let steps = vec![test_step(station_id)];
let ctx = InterlockContext {
equipment_role_points: HashMap::new(),
station_role_points: HashMap::new(),
};
let runtime = SegmentRuntime {
segment_id: segment.id,
state: SegmentState::Confirming,
auto_enabled: true,
current_step_no: Some(1),
step_started_at: Some(Utc::now()),
last_completed_at: None,
blocked_reason: None,
fault_message: None,
manual_ack_required: false,
comm_locked: false,
rem_local: false,
held_resources: Vec::new(),
};
let updated = tick(
&state,
&segment,
&steps,
&[],
&[],
&ctx,
&CommandPointIndex::default(),
&HashMap::new(),
runtime,
)
.await
.expect("missing configured confirm point should change runtime");
assert_eq!(updated.state, SegmentState::Faulted);
assert_eq!(
updated.fault_message.as_deref(),
Some("step 1 confirm signal role 'arrived' could not be resolved")
);
}
#[tokio::test]
async fn active_segment_moves_to_manual_ack_when_auto_is_stopped() {
let state = crate::app::test_state();
let segment = test_segment();
let steps = vec![test_step_with_confirm_point(Uuid::new_v4())];
let ctx = InterlockContext {
equipment_role_points: HashMap::new(),
station_role_points: HashMap::new(),
};
let runtime = SegmentRuntime {
segment_id: segment.id,
state: SegmentState::Confirming,
auto_enabled: false,
current_step_no: Some(1),
step_started_at: Some(Utc::now()),
last_completed_at: None,
blocked_reason: None,
fault_message: None,
manual_ack_required: false,
comm_locked: false,
rem_local: false,
held_resources: Vec::new(),
};
let updated = tick(
&state,
&segment,
&steps,
&[],
&[],
&ctx,
&CommandPointIndex::default(),
&HashMap::new(),
runtime,
)
.await
.expect("active segment should react to auto stop");
assert_eq!(updated.state, SegmentState::ManualAckRequired);
assert_eq!(updated.current_step_no, Some(1));
assert!(updated.manual_ack_required);
assert_eq!(
updated.blocked_reason.as_deref(),
Some("auto stopped during active step")
);
}
}

View File

@ -12,6 +12,7 @@ use plc_platform_core::telemetry::PointMonitorInfo;
use sqlx::PgPool; use sqlx::PgPool;
use uuid::Uuid; use uuid::Uuid;
use crate::model::SegmentStep;
use crate::model::{SegmentInterlock, StationSignal}; use crate::model::{SegmentInterlock, StationSignal};
use super::{monitor_quality_good, monitor_value_as_bool}; use super::{monitor_quality_good, monitor_value_as_bool};
@ -35,6 +36,25 @@ impl InterlockContext {
Self::load(pool, &equipment_ids, &station_ids).await Self::load(pool, &equipment_ids, &station_ids).await
} }
pub async fn load_for_segment(
pool: &PgPool,
steps: &[SegmentStep],
interlocks: &[SegmentInterlock],
) -> Result<Self, sqlx::Error> {
let mut equipment_ids: Vec<Uuid> =
interlocks.iter().filter_map(|i| i.equipment_id).collect();
equipment_ids.extend(steps.iter().filter_map(|s| s.target_equipment_id));
equipment_ids.sort();
equipment_ids.dedup();
let mut station_ids: Vec<Uuid> = interlocks.iter().filter_map(|i| i.station_id).collect();
station_ids.extend(steps.iter().filter_map(|s| s.target_station_id));
station_ids.sort();
station_ids.dedup();
Self::load(pool, &equipment_ids, &station_ids).await
}
pub async fn load( pub async fn load(
pool: &PgPool, pool: &PgPool,
equipment_ids: &[Uuid], equipment_ids: &[Uuid],
@ -42,7 +62,9 @@ impl InterlockContext {
) -> Result<Self, sqlx::Error> { ) -> Result<Self, sqlx::Error> {
let mut equipment_role_points: HashMap<Uuid, HashMap<String, Uuid>> = HashMap::new(); let mut equipment_role_points: HashMap<Uuid, HashMap<String, Uuid>> = HashMap::new();
if !equipment_ids.is_empty() { if !equipment_ids.is_empty() {
let rows = plc_platform_core::service::get_signal_role_points_batch(pool, equipment_ids).await?; let rows =
plc_platform_core::service::get_signal_role_points_batch(pool, equipment_ids)
.await?;
for row in rows { for row in rows {
equipment_role_points equipment_role_points
.entry(row.equipment_id) .entry(row.equipment_id)
@ -137,23 +159,28 @@ pub fn evaluate(
.ok_or_else(|| format!("station_vacant rule {} missing station_id", rule.id))?; .ok_or_else(|| format!("station_vacant rule {} missing station_id", rule.id))?;
// Prefer explicit vacancy signal; fall back to !presence. // Prefer explicit vacancy signal; fall back to !presence.
if let Some((pid, invert)) = resolve_station_point(ctx, station_id, "vacancy") { if let Some((pid, invert)) = resolve_station_point(ctx, station_id, "vacancy") {
let v = read_logical_bool(monitor, pid, invert) let v = read_logical_bool(monitor, pid, invert).ok_or_else(|| {
.ok_or_else(|| format!("vacancy signal for station {} unavailable", station_id))?; format!("vacancy signal for station {} unavailable", station_id)
})?;
if v { if v {
Ok(()) Ok(())
} else { } else {
Err(format!("station {} occupied (vacancy=false)", station_id)) Err(format!("station {} occupied (vacancy=false)", station_id))
} }
} else if let Some((pid, invert)) = resolve_station_point(ctx, station_id, "presence") { } else if let Some((pid, invert)) = resolve_station_point(ctx, station_id, "presence") {
let v = read_logical_bool(monitor, pid, invert) let v = read_logical_bool(monitor, pid, invert).ok_or_else(|| {
.ok_or_else(|| format!("presence signal for station {} unavailable", station_id))?; format!("presence signal for station {} unavailable", station_id)
})?;
if !v { if !v {
Ok(()) Ok(())
} else { } else {
Err(format!("station {} occupied (presence=true)", station_id)) Err(format!("station {} occupied (presence=true)", station_id))
} }
} else { } else {
Err(format!("station {} has no presence/vacancy binding", station_id)) Err(format!(
"station {} has no presence/vacancy binding",
station_id
))
} }
} }
"station_occupied" => { "station_occupied" => {
@ -161,8 +188,9 @@ pub fn evaluate(
.station_id .station_id
.ok_or_else(|| format!("station_occupied rule {} missing station_id", rule.id))?; .ok_or_else(|| format!("station_occupied rule {} missing station_id", rule.id))?;
if let Some((pid, invert)) = resolve_station_point(ctx, station_id, "presence") { if let Some((pid, invert)) = resolve_station_point(ctx, station_id, "presence") {
let v = read_logical_bool(monitor, pid, invert) let v = read_logical_bool(monitor, pid, invert).ok_or_else(|| {
.ok_or_else(|| format!("presence signal for station {} unavailable", station_id))?; format!("presence signal for station {} unavailable", station_id)
})?;
if v { if v {
Ok(()) Ok(())
} else { } else {
@ -172,7 +200,9 @@ pub fn evaluate(
Err(format!("station {} has no presence binding", station_id)) Err(format!("station {} has no presence binding", station_id))
} }
} }
"equipment_origin" => check_equipment_role(rule, ctx, monitor, "home", true, "not at origin"), "equipment_origin" => {
check_equipment_role(rule, ctx, monitor, "home", true, "not at origin")
}
"equipment_no_fault" => { "equipment_no_fault" => {
check_equipment_role(rule, ctx, monitor, "flt", false, "fault active") check_equipment_role(rule, ctx, monitor, "flt", false, "fault active")
} }

View File

@ -93,10 +93,7 @@ impl ResourceRegistry {
/// ///
/// Recovery path from design doc §7 — a panicked or stuck segment task can /// Recovery path from design doc §7 — a panicked or stuck segment task can
/// otherwise keep a public resource locked indefinitely. /// otherwise keep a public resource locked indefinitely.
pub async fn sweep_stale( pub async fn sweep_stale(&self, max_age: chrono::Duration) -> Vec<(String, Uuid)> {
&self,
max_age: chrono::Duration,
) -> Vec<(String, Uuid)> {
let cutoff = Utc::now() - max_age; let cutoff = Utc::now() - max_age;
let mut reclaimed = Vec::new(); let mut reclaimed = Vec::new();
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;

View File

@ -66,18 +66,12 @@ impl SegmentRuntimeStore {
} }
let runtime = SegmentRuntime::new(segment_id); let runtime = SegmentRuntime::new(segment_id);
self.inner self.inner.write().await.insert(segment_id, runtime.clone());
.write()
.await
.insert(segment_id, runtime.clone());
runtime runtime
} }
pub async fn upsert(&self, runtime: SegmentRuntime) { pub async fn upsert(&self, runtime: SegmentRuntime) {
self.inner self.inner.write().await.insert(runtime.segment_id, runtime);
.write()
.await
.insert(runtime.segment_id, runtime);
} }
pub async fn get_or_create_notify(&self, segment_id: Uuid) -> Arc<Notify> { pub async fn get_or_create_notify(&self, segment_id: Uuid) -> Arc<Notify> {

View File

@ -163,9 +163,12 @@ pub async fn send_stop_command(
Some(r) => r, Some(r) => r,
None => return Ok(()), None => return Ok(()),
}; };
let equipment_id = step let equipment_id = step.target_equipment_id.ok_or_else(|| {
.target_equipment_id format!(
.ok_or_else(|| format!("step {} stop command missing target_equipment_id", step.step_no))?; "step {} stop command missing target_equipment_id",
step.step_no
)
})?;
let point_id = command_points.lookup(equipment_id, role).ok_or_else(|| { let point_id = command_points.lookup(equipment_id, role).ok_or_else(|| {
format!( format!(
"equipment {} has no '{}' stop-role binding", "equipment {} has no '{}' stop-role binding",

View File

@ -3,17 +3,18 @@
//! These endpoints flip flags on the in-memory `SegmentRuntime` and notify the //! These endpoints flip flags on the in-memory `SegmentRuntime` and notify the
//! segment task. The engine task picks up the change on its next tick. //! segment task. The engine task picks up the change on its next tick.
use axum::{extract::{Path, State}, response::IntoResponse, Json}; use axum::{
extract::{Path, State},
response::IntoResponse,
Json,
};
use serde_json::json; use serde_json::json;
use uuid::Uuid; use uuid::Uuid;
use plc_platform_core::util::response::ApiErr; use plc_platform_core::util::response::ApiErr;
use crate::{ use crate::{
control::state::SegmentState, control::state::SegmentState, event::AppEvent, service::segment as segment_service, AppState,
event::AppEvent,
service::segment as segment_service,
AppState,
}; };
async fn require_segment( async fn require_segment(
@ -159,9 +160,7 @@ pub async fn reset_segment(
)) ))
} }
pub async fn batch_start_auto( pub async fn batch_start_auto(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> {
State(state): State<AppState>,
) -> Result<impl IntoResponse, ApiErr> {
let segments = segment_service::list_segments(&state.platform.pool, None).await?; let segments = segment_service::list_segments(&state.platform.pool, None).await?;
let mut started = Vec::new(); let mut started = Vec::new();
let mut skipped = Vec::new(); let mut skipped = Vec::new();
@ -182,9 +181,9 @@ pub async fn batch_start_auto(
runtime.auto_enabled = true; runtime.auto_enabled = true;
state.segment_runtime.upsert(runtime).await; state.segment_runtime.upsert(runtime).await;
state.segment_runtime.notify_segment(segment.id).await; state.segment_runtime.notify_segment(segment.id).await;
let _ = state let _ = state.event_manager.send(AppEvent::SegmentAutoStarted {
.event_manager segment_id: segment.id,
.send(AppEvent::SegmentAutoStarted { segment_id: segment.id }); });
started.push(segment.id); started.push(segment.id);
} }
Ok(Json(json!({ "started": started, "skipped": skipped }))) Ok(Json(json!({ "started": started, "skipped": skipped })))
@ -201,9 +200,9 @@ pub async fn batch_stop_auto(State(state): State<AppState>) -> Result<impl IntoR
runtime.auto_enabled = false; runtime.auto_enabled = false;
state.segment_runtime.upsert(runtime).await; state.segment_runtime.upsert(runtime).await;
state.segment_runtime.notify_segment(segment.id).await; state.segment_runtime.notify_segment(segment.id).await;
let _ = state let _ = state.event_manager.send(AppEvent::SegmentAutoStopped {
.event_manager segment_id: segment.id,
.send(AppEvent::SegmentAutoStopped { segment_id: segment.id }); });
stopped.push(segment.id); stopped.push(segment.id);
} }
Ok(Json(json!({ "stopped": stopped }))) Ok(Json(json!({ "stopped": stopped })))

View File

@ -5,7 +5,11 @@
//! migration so the front-end can show a per-segment / per-station timeline //! migration so the front-end can show a per-segment / per-station timeline
//! without parsing event_type strings. //! without parsing event_type strings.
use axum::{extract::{Query, State}, response::IntoResponse, Json}; use axum::{
extract::{Query, State},
response::IntoResponse,
Json,
};
use serde::Deserialize; use serde::Deserialize;
use uuid::Uuid; use uuid::Uuid;
use validator::Validate; use validator::Validate;

View File

@ -1,6 +1,10 @@
//! Runtime read endpoints (design doc §9.3). //! Runtime read endpoints (design doc §9.3).
use axum::{extract::{Path, State}, response::IntoResponse, Json}; use axum::{
extract::{Path, State},
response::IntoResponse,
Json,
};
use serde_json::json; use serde_json::json;
use uuid::Uuid; use uuid::Uuid;
@ -78,7 +82,9 @@ pub async fn get_station_runtime(
let signal_payload: Vec<_> = signals let signal_payload: Vec<_> = signals
.iter() .iter()
.map(|sig| { .map(|sig| {
let monitor = sig.point_id.and_then(|pid| monitor_guard.get(&pid).cloned()); let monitor = sig
.point_id
.and_then(|pid| monitor_guard.get(&pid).cloned());
json!({ json!({
"signal": sig, "signal": sig,
"point_monitor": monitor, "point_monitor": monitor,

View File

@ -11,10 +11,7 @@ use validator::Validate;
use plc_platform_core::util::response::ApiErr; use plc_platform_core::util::response::ApiErr;
use crate::{ use crate::{service::segment as segment_service, AppState};
service::segment as segment_service,
AppState,
};
const SEGMENT_TYPES: &[&str] = &[ const SEGMENT_TYPES: &[&str] = &[
"front_load", "front_load",
@ -106,10 +103,8 @@ pub async fn get_segment_detail(
.await? .await?
.ok_or_else(|| ApiErr::NotFound("Segment not found".to_string(), None))?; .ok_or_else(|| ApiErr::NotFound("Segment not found".to_string(), None))?;
let steps = segment_service::list_steps(&state.platform.pool, segment_id).await?; let steps = segment_service::list_steps(&state.platform.pool, segment_id).await?;
let interlocks = let interlocks = segment_service::list_interlocks(&state.platform.pool, segment_id).await?;
segment_service::list_interlocks(&state.platform.pool, segment_id).await?; let resources = segment_service::list_resources(&state.platform.pool, segment_id).await?;
let resources =
segment_service::list_resources(&state.platform.pool, segment_id).await?;
Ok(Json(json!({ Ok(Json(json!({
"segment": segment, "segment": segment,
@ -436,8 +431,7 @@ pub async fn list_interlocks(
State(state): State<AppState>, State(state): State<AppState>,
Path(segment_id): Path<Uuid>, Path(segment_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> { ) -> Result<impl IntoResponse, ApiErr> {
let interlocks = let interlocks = segment_service::list_interlocks(&state.platform.pool, segment_id).await?;
segment_service::list_interlocks(&state.platform.pool, segment_id).await?;
Ok(Json(interlocks)) Ok(Json(interlocks))
} }
@ -506,8 +500,7 @@ pub async fn list_resources(
State(state): State<AppState>, State(state): State<AppState>,
Path(segment_id): Path<Uuid>, Path(segment_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> { ) -> Result<impl IntoResponse, ApiErr> {
let resources = let resources = segment_service::list_resources(&state.platform.pool, segment_id).await?;
segment_service::list_resources(&state.platform.pool, segment_id).await?;
Ok(Json(resources)) Ok(Json(resources))
} }

View File

@ -27,12 +27,7 @@ const STATION_TYPES: &[&str] = &[
]; ];
const SIGNAL_ROLES: &[&str] = &[ const SIGNAL_ROLES: &[&str] = &[
"presence", "presence", "vacancy", "arrived", "allow_in", "done", "fault",
"vacancy",
"arrived",
"allow_in",
"done",
"fault",
]; ];
fn validate_station_type(value: &str) -> Result<(), ApiErr> { fn validate_station_type(value: &str) -> Result<(), ApiErr> {
@ -80,8 +75,7 @@ pub async fn get_station(
let station = station_service::get_station_by_id(&state.platform.pool, station_id) let station = station_service::get_station_by_id(&state.platform.pool, station_id)
.await? .await?
.ok_or_else(|| ApiErr::NotFound("Station not found".to_string(), None))?; .ok_or_else(|| ApiErr::NotFound("Station not found".to_string(), None))?;
let signals = let signals = station_service::list_station_signals(&state.platform.pool, station_id).await?;
station_service::list_station_signals(&state.platform.pool, station_id).await?;
Ok(Json(json!({ Ok(Json(json!({
"station": station, "station": station,
"signals": signals, "signals": signals,

View File

@ -48,13 +48,11 @@ pub fn build_router(state: AppState) -> Router {
) )
.route( .route(
"/api/segment/{segment_id}/step", "/api/segment/{segment_id}/step",
get(crate::handler::segment::list_steps) get(crate::handler::segment::list_steps).post(crate::handler::segment::create_step),
.post(crate::handler::segment::create_step),
) )
.route( .route(
"/api/segment/{segment_id}/step/{step_no}", "/api/segment/{segment_id}/step/{step_no}",
put(crate::handler::segment::update_step) put(crate::handler::segment::update_step).delete(crate::handler::segment::delete_step),
.delete(crate::handler::segment::delete_step),
) )
.route( .route(
"/api/segment/{segment_id}/interlock", "/api/segment/{segment_id}/interlock",
@ -110,10 +108,7 @@ pub fn build_router(state: AppState) -> Router {
"/api/runtime/station/{station_id}", "/api/runtime/station/{station_id}",
get(crate::handler::runtime::get_station_runtime), get(crate::handler::runtime::get_station_runtime),
) )
.route( .route("/api/event", get(crate::handler::event::get_event_list));
"/api/event",
get(crate::handler::event::get_event_list),
);
let ops_routes = Router::new() let ops_routes = Router::new()
.route("/api/health", get(health_check)) .route("/api/health", get(health_check))

View File

@ -69,7 +69,9 @@ pub async fn ensure_default_templates(pool: &PgPool) -> Result<TemplateReport, s
.bind(segment.code) .bind(segment.code)
.fetch_optional(&mut *tx) .fetch_optional(&mut *tx)
.await?; .await?;
let Some(segment_id) = segment_id else { continue }; let Some(segment_id) = segment_id else {
continue;
};
// Resource declarations are idempotent at the row level (UNIQUE // Resource declarations are idempotent at the row level (UNIQUE
// constraint). Insert each declared key. // constraint). Insert each declared key.
@ -182,8 +184,18 @@ fn public_template_stations() -> Vec<StationTemplate> {
let stations: &[(&'static str, &'static str, &'static str, &'static str)] = &[ let stations: &[(&'static str, &'static str, &'static str, &'static str)] = &[
// (code, name, segment_code, station_type) // (code, name, segment_code, station_type)
("ST-FRONT-LOAD", "前端码车位", "FRONT_LOAD", "load"), ("ST-FRONT-LOAD", "前端码车位", "FRONT_LOAD", "load"),
("ST-FRONT-TRANSFER", "前端摆渡接车位", "FRONT_TRANSFER", "transfer"), (
("ST-TAIL-TRANSFER", "窑尾摆渡接车位", "TAIL_TRANSFER", "transfer"), "ST-FRONT-TRANSFER",
"前端摆渡接车位",
"FRONT_TRANSFER",
"transfer",
),
(
"ST-TAIL-TRANSFER",
"窑尾摆渡接车位",
"TAIL_TRANSFER",
"transfer",
),
("ST-UNLOAD", "卸砖机位", "UNLOAD", "unload"), ("ST-UNLOAD", "卸砖机位", "UNLOAD", "unload"),
("ST-RETURN-IN", "回车线入口位", "RETURN", "return"), ("ST-RETURN-IN", "回车线入口位", "RETURN", "return"),
]; ];
@ -220,24 +232,62 @@ fn default_template_segments() -> Vec<SegmentTemplate> {
fn kiln_template_segments() -> Vec<SegmentTemplate> { fn kiln_template_segments() -> Vec<SegmentTemplate> {
let entries: &[(&'static str, &'static str, &'static str, &'static str, i32)] = &[ let entries: &[(&'static str, &'static str, &'static str, &'static str, i32)] = &[
// (line, code, name, segment_type, priority) // (line, code, name, segment_type, priority)
("KILN_1", "SEG-DRY1-INFEED", "1 号干燥窑进口段", "kiln_infeed", 10), (
("KILN_1", "SEG-DRY1-STEP", "1 号干燥窑内前移段", "kiln_step", 5), "KILN_1",
("KILN_1", "SEG-DRY1-OUTFEED", "1 号干燥窑出口段", "kiln_outfeed", 10), "SEG-DRY1-INFEED",
("KILN_2", "SEG-DRY2-INFEED", "2 号干燥窑进口段", "kiln_infeed", 10), "1 号干燥窑进口段",
("KILN_2", "SEG-DRY2-STEP", "2 号干燥窑内前移段", "kiln_step", 5), "kiln_infeed",
("KILN_2", "SEG-DRY2-OUTFEED", "2 号干燥窑出口段", "kiln_outfeed", 10), 10,
),
(
"KILN_1",
"SEG-DRY1-STEP",
"1 号干燥窑内前移段",
"kiln_step",
5,
),
(
"KILN_1",
"SEG-DRY1-OUTFEED",
"1 号干燥窑出口段",
"kiln_outfeed",
10,
),
(
"KILN_2",
"SEG-DRY2-INFEED",
"2 号干燥窑进口段",
"kiln_infeed",
10,
),
(
"KILN_2",
"SEG-DRY2-STEP",
"2 号干燥窑内前移段",
"kiln_step",
5,
),
(
"KILN_2",
"SEG-DRY2-OUTFEED",
"2 号干燥窑出口段",
"kiln_outfeed",
10,
),
]; ];
entries entries
.iter() .iter()
.map(|(line, code, name, segment_type, priority)| SegmentTemplate { .map(
code, |(line, code, name, segment_type, priority)| SegmentTemplate {
name, code,
segment_type, name,
line_code: line, segment_type,
priority: *priority, line_code: line,
mode: "disabled", priority: *priority,
description: "Seeded skeleton; bind equipment + station signals to enable.", mode: "disabled",
}) description: "Seeded skeleton; bind equipment + station signals to enable.",
},
)
.collect() .collect()
} }
@ -503,11 +553,23 @@ mod tests {
#[test] #[test]
fn resource_keys_match_design_doc_section_7() { fn resource_keys_match_design_doc_section_7() {
assert_eq!(default_segment_resources("SEG-FRONT-TRANSFER"), vec!["transfer_front"]); assert_eq!(
assert_eq!(default_segment_resources("SEG-TAIL-TRANSFER"), vec!["transfer_tail"]); default_segment_resources("SEG-FRONT-TRANSFER"),
assert_eq!(default_segment_resources("SEG-UNLOAD"), vec!["unload_position"]); vec!["transfer_front"]
);
assert_eq!(
default_segment_resources("SEG-TAIL-TRANSFER"),
vec!["transfer_tail"]
);
assert_eq!(
default_segment_resources("SEG-UNLOAD"),
vec!["unload_position"]
);
assert_eq!(default_segment_resources("SEG-RETURN"), vec!["return_line"]); assert_eq!(default_segment_resources("SEG-RETURN"), vec!["return_line"]);
assert_eq!(default_segment_resources("SEG-FRONT-LOAD"), vec!["robot_arm"]); assert_eq!(
default_segment_resources("SEG-FRONT-LOAD"),
vec!["robot_arm"]
);
assert!(default_segment_resources("SEG-DRY1-INFEED").is_empty()); assert!(default_segment_resources("SEG-DRY1-INFEED").is_empty());
} }

View File

@ -16,11 +16,13 @@ pub async fn list_segments(
.bind(line) .bind(line)
.fetch_all(pool) .fetch_all(pool)
.await, .await,
None => sqlx::query_as::<_, ProcessSegment>( None => {
r#"SELECT * FROM process_segment ORDER BY priority DESC, code"#, sqlx::query_as::<_, ProcessSegment>(
) r#"SELECT * FROM process_segment ORDER BY priority DESC, code"#,
.fetch_all(pool) )
.await, .fetch_all(pool)
.await
}
} }
} }
@ -143,10 +145,7 @@ pub async fn delete_segment(pool: &PgPool, segment_id: Uuid) -> Result<bool, sql
// segment_step // segment_step
pub async fn list_steps( pub async fn list_steps(pool: &PgPool, segment_id: Uuid) -> Result<Vec<SegmentStep>, sqlx::Error> {
pool: &PgPool,
segment_id: Uuid,
) -> Result<Vec<SegmentStep>, sqlx::Error> {
sqlx::query_as::<_, SegmentStep>( sqlx::query_as::<_, SegmentStep>(
r#"SELECT * FROM segment_step WHERE segment_id = $1 ORDER BY step_no"#, r#"SELECT * FROM segment_step WHERE segment_id = $1 ORDER BY step_no"#,
) )
@ -319,12 +318,11 @@ pub async fn delete_step(
segment_id: Uuid, segment_id: Uuid,
step_no: i32, step_no: i32,
) -> Result<bool, sqlx::Error> { ) -> Result<bool, sqlx::Error> {
let result = let result = sqlx::query(r#"DELETE FROM segment_step WHERE segment_id = $1 AND step_no = $2"#)
sqlx::query(r#"DELETE FROM segment_step WHERE segment_id = $1 AND step_no = $2"#) .bind(segment_id)
.bind(segment_id) .bind(step_no)
.bind(step_no) .execute(pool)
.execute(pool) .await?;
.await?;
Ok(result.rows_affected() > 0) Ok(result.rows_affected() > 0)
} }
@ -387,13 +385,11 @@ pub async fn delete_interlock(
segment_id: Uuid, segment_id: Uuid,
interlock_id: Uuid, interlock_id: Uuid,
) -> Result<bool, sqlx::Error> { ) -> Result<bool, sqlx::Error> {
let result = sqlx::query( let result = sqlx::query(r#"DELETE FROM segment_interlock WHERE segment_id = $1 AND id = $2"#)
r#"DELETE FROM segment_interlock WHERE segment_id = $1 AND id = $2"#, .bind(segment_id)
) .bind(interlock_id)
.bind(segment_id) .execute(pool)
.bind(interlock_id) .await?;
.execute(pool)
.await?;
Ok(result.rows_affected() > 0) Ok(result.rows_affected() > 0)
} }

View File

@ -8,15 +8,19 @@ pub async fn list_stations(
line_code: Option<&str>, line_code: Option<&str>,
) -> Result<Vec<Station>, sqlx::Error> { ) -> Result<Vec<Station>, sqlx::Error> {
match line_code { match line_code {
Some(line) => sqlx::query_as::<_, Station>( Some(line) => {
r#"SELECT * FROM station WHERE line_code = $1 ORDER BY code"#, sqlx::query_as::<_, Station>(
) r#"SELECT * FROM station WHERE line_code = $1 ORDER BY code"#,
.bind(line) )
.fetch_all(pool) .bind(line)
.await,
None => sqlx::query_as::<_, Station>(r#"SELECT * FROM station ORDER BY code"#)
.fetch_all(pool) .fetch_all(pool)
.await, .await
}
None => {
sqlx::query_as::<_, Station>(r#"SELECT * FROM station ORDER BY code"#)
.fetch_all(pool)
.await
}
} }
} }
@ -180,12 +184,11 @@ pub async fn delete_station_signal(
station_id: Uuid, station_id: Uuid,
signal_role: &str, signal_role: &str,
) -> Result<bool, sqlx::Error> { ) -> Result<bool, sqlx::Error> {
let result = sqlx::query( let result =
r#"DELETE FROM station_signal WHERE station_id = $1 AND signal_role = $2"#, sqlx::query(r#"DELETE FROM station_signal WHERE station_id = $1 AND signal_role = $2"#)
) .bind(station_id)
.bind(station_id) .bind(signal_role)
.bind(signal_role) .execute(pool)
.execute(pool) .await?;
.await?;
Ok(result.rows_affected() > 0) Ok(result.rows_affected() > 0)
} }