Bootstrap operation-system app skeleton

Wires AppState with EventManager, SegmentRuntimeStore, ResourceRegistry
and an engine supervisor that idles until P1 lands the segment schema.
The run() bootstrap connects enabled sources, installs a Ctrl+C handler,
and disconnects on shutdown, matching the feeder app lifecycle. The
router exposes /ws/public, /ws/client/{id}, simple_logger middleware
and a permissive CORS layer.

AppEvent covers the full ops.* taxonomy from the spec; resource lease
tracking includes heartbeat timestamps for the §7 recovery strategy and
has two unit tests for acquire/release semantics.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-05-18 21:39:09 +08:00
parent 19ace9c2be
commit fd028b1320
11 changed files with 628 additions and 7 deletions

6
Cargo.lock generated
View File

@ -135,14 +135,20 @@ dependencies = [
name = "app_operation_system" name = "app_operation_system"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"axum", "axum",
"chrono",
"dotenv", "dotenv",
"plc_platform_core", "plc_platform_core",
"serde",
"serde_json",
"sqlx", "sqlx",
"tokio", "tokio",
"tower", "tower",
"tower-http", "tower-http",
"tracing", "tracing",
"uuid",
"validator",
] ]
[[package]] [[package]]

View File

@ -10,7 +10,13 @@ axum = { version = "0.8", features = ["ws"] }
tower-http = { version = "0.6", features = ["cors", "fs"] } tower-http = { version = "0.6", features = ["cors", "fs"] }
tracing = "0.1" tracing = "0.1"
dotenv = "0.15" dotenv = "0.15"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = "0.4"
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid", "json"] } sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid", "json"] }
uuid = { version = "1.21", features = ["serde", "v4"] }
validator = { version = "0.20", features = ["derive"] }
anyhow = "1.0"
[dev-dependencies] [dev-dependencies]
tower = { version = "0.5", features = ["util"] } tower = { version = "0.5", features = ["util"] }

View File

@ -1,6 +1,14 @@
use crate::router::build_router; use std::sync::Arc;
use axum::extract::FromRef; use axum::extract::FromRef;
use plc_platform_core::{bootstrap, platform_context::PlatformContext}; use plc_platform_core::{bootstrap, platform_context::PlatformContext};
use tokio::sync::mpsc;
use crate::{
control::{resource::ResourceRegistry, runtime::SegmentRuntimeStore},
event::EventManager,
router::build_router,
};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct AppConfig { pub struct AppConfig {
@ -26,6 +34,9 @@ pub struct AppState {
pub app_name: &'static str, pub app_name: &'static str,
pub config: AppConfig, pub config: AppConfig,
pub platform: PlatformContext, pub platform: PlatformContext,
pub event_manager: Arc<EventManager>,
pub segment_runtime: Arc<SegmentRuntimeStore>,
pub resource_registry: Arc<ResourceRegistry>,
} }
impl FromRef<AppState> for PlatformContext { impl FromRef<AppState> for PlatformContext {
@ -48,14 +59,44 @@ pub async fn run() {
.expect("Failed to bootstrap platform"); .expect("Failed to bootstrap platform");
let platform = builder.build(); let platform = builder.build();
let event_manager = Arc::new(EventManager::new(
platform.pool.clone(),
Some(platform.ws_manager.clone()),
platform.metadata.clone(),
));
let segment_runtime = Arc::new(SegmentRuntimeStore::new());
let resource_registry = Arc::new(ResourceRegistry::new());
bootstrap::connect_all_enabled_sources(&platform)
.await
.expect("Failed to connect enabled sources");
let state = AppState { let state = AppState {
app_name: "operation-system", app_name: "operation-system",
config, config: config.clone(),
platform, platform,
event_manager,
segment_runtime: segment_runtime.clone(),
resource_registry,
}; };
let app = build_router(state.clone());
bootstrap::serve_app(&state.config.server, "operation-system", app) crate::control::engine::start(state.clone(), segment_runtime);
let app = build_router(state.clone());
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
let connection_manager_for_shutdown = state.platform.connection_manager.clone();
bootstrap::install_ctrl_c_shutdown(shutdown_tx);
bootstrap::serve_app_with_graceful_shutdown(
&state.config.server,
"operation-system",
app,
bootstrap::disconnect_all_on_shutdown(
shutdown_rx,
connection_manager_for_shutdown,
"operation-system",
),
)
.await .await
.expect("operation-system server should run"); .expect("operation-system server should run");
} }
@ -68,6 +109,12 @@ pub fn test_state() -> AppState {
let connection_manager = let connection_manager =
std::sync::Arc::new(plc_platform_core::connection::ConnectionManager::new()); std::sync::Arc::new(plc_platform_core::connection::ConnectionManager::new());
let ws_manager = std::sync::Arc::new(plc_platform_core::websocket::WebSocketManager::new()); let ws_manager = std::sync::Arc::new(plc_platform_core::websocket::WebSocketManager::new());
let platform = PlatformContext::new(pool.clone(), connection_manager, ws_manager.clone());
let event_manager = Arc::new(EventManager::new(
pool,
Some(ws_manager),
platform.metadata.clone(),
));
AppState { AppState {
app_name: "operation-system", app_name: "operation-system",
@ -78,6 +125,9 @@ pub fn test_state() -> AppState {
server_port: 0, server_port: 0,
}, },
}, },
platform: PlatformContext::new(pool, connection_manager, ws_manager), platform,
event_manager,
segment_runtime: Arc::new(SegmentRuntimeStore::new()),
resource_registry: Arc::new(ResourceRegistry::new()),
} }
} }

View File

@ -0,0 +1,27 @@
use std::sync::Arc;
use tokio::time::Duration;
use crate::{control::runtime::SegmentRuntimeStore, AppState};
/// Start the segment engine supervisor.
///
/// Skeleton only: at P0 there are no segment tables yet (P1 lands the schema),
/// so the supervisor logs and idles. P3 will replace this with a per-segment
/// task spawner that mirrors feeder's per-unit task model.
pub fn start(state: AppState, store: Arc<SegmentRuntimeStore>) {
tokio::spawn(async move {
supervise(state, store).await;
});
}
async fn supervise(_state: AppState, _store: Arc<SegmentRuntimeStore>) {
let mut interval = tokio::time::interval(Duration::from_secs(10));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
tracing::info!("Operation-system engine supervisor started (skeleton; awaiting P1 schema)");
loop {
interval.tick().await;
// Segment supervision will live here once P1 lands process_segment.
}
}

View File

@ -0,0 +1,6 @@
pub use plc_platform_core::control::command;
pub mod engine;
pub mod resource;
pub mod runtime;
pub mod state;

View File

@ -0,0 +1,127 @@
use std::{collections::HashMap, sync::Arc};
use chrono::{DateTime, Utc};
use tokio::sync::RwLock;
use uuid::Uuid;
/// Resource lease held by a segment task.
///
/// `acquired_at` and `heartbeat_at` exist to support the recovery strategy in
/// design doc §7. Resources whose owner task has died (no heartbeat) can be
/// reclaimed by the supervisor.
#[derive(Debug, Clone)]
pub struct ResourceLease {
pub owner_segment_id: Uuid,
pub acquired_at: DateTime<Utc>,
pub heartbeat_at: DateTime<Utc>,
}
/// Named-lock registry for shared resources (transfer car, robot arm, unload
/// position, return line, etc.).
///
/// Segment configuration declares which resources it needs via the
/// `segment_resource` table. The engine acquires before `Executing` and
/// releases on `Completed` (or safe `Faulted`).
#[derive(Clone, Default)]
pub struct ResourceRegistry {
inner: Arc<RwLock<HashMap<String, ResourceLease>>>,
}
impl ResourceRegistry {
pub fn new() -> Self {
Self::default()
}
/// Attempt to take the resource for the given segment.
///
/// Returns `true` if the lease was granted (either freshly or already held
/// by the same segment). Returns `false` if another segment holds it.
pub async fn try_acquire(&self, key: &str, segment_id: Uuid) -> bool {
let mut inner = self.inner.write().await;
match inner.get(key) {
Some(lease) if lease.owner_segment_id != segment_id => false,
_ => {
let now = Utc::now();
inner.insert(
key.to_string(),
ResourceLease {
owner_segment_id: segment_id,
acquired_at: now,
heartbeat_at: now,
},
);
true
}
}
}
/// Refresh the heartbeat for a resource the segment already holds.
/// No-op if the resource is held by someone else or not held.
pub async fn heartbeat(&self, key: &str, segment_id: Uuid) {
let mut inner = self.inner.write().await;
if let Some(lease) = inner.get_mut(key) {
if lease.owner_segment_id == segment_id {
lease.heartbeat_at = Utc::now();
}
}
}
/// Release a resource held by the given segment.
/// No-op if the resource is held by someone else or not held.
pub async fn release(&self, key: &str, segment_id: Uuid) {
let mut inner = self.inner.write().await;
if let Some(lease) = inner.get(key) {
if lease.owner_segment_id == segment_id {
inner.remove(key);
}
}
}
/// Release every resource held by the given segment.
/// Used when a segment task exits or transitions to `Completed`.
pub async fn release_all_for(&self, segment_id: Uuid) {
let mut inner = self.inner.write().await;
inner.retain(|_, lease| lease.owner_segment_id != segment_id);
}
pub async fn snapshot(&self) -> HashMap<String, ResourceLease> {
self.inner.read().await.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn acquire_blocks_other_segment_until_released() {
let registry = ResourceRegistry::new();
let seg_a = Uuid::new_v4();
let seg_b = Uuid::new_v4();
assert!(registry.try_acquire("transfer_front", seg_a).await);
assert!(!registry.try_acquire("transfer_front", seg_b).await);
// same owner can re-acquire (idempotent)
assert!(registry.try_acquire("transfer_front", seg_a).await);
registry.release("transfer_front", seg_a).await;
assert!(registry.try_acquire("transfer_front", seg_b).await);
}
#[tokio::test]
async fn release_all_for_drops_only_owner_leases() {
let registry = ResourceRegistry::new();
let seg_a = Uuid::new_v4();
let seg_b = Uuid::new_v4();
registry.try_acquire("r1", seg_a).await;
registry.try_acquire("r2", seg_a).await;
registry.try_acquire("r3", seg_b).await;
registry.release_all_for(seg_a).await;
let snap = registry.snapshot().await;
assert!(!snap.contains_key("r1"));
assert!(!snap.contains_key("r2"));
assert_eq!(snap.get("r3").unwrap().owner_segment_id, seg_b);
}
}

View File

@ -0,0 +1,101 @@
use std::{collections::HashMap, sync::Arc};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::{Notify, RwLock};
use uuid::Uuid;
use super::state::SegmentState;
/// Per-segment runtime as defined in design doc §4.2.6.
///
/// Held in memory only; reset on restart.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentRuntime {
pub segment_id: Uuid,
pub state: SegmentState,
pub auto_enabled: bool,
pub current_step_no: Option<i32>,
pub step_started_at: Option<DateTime<Utc>>,
pub last_completed_at: Option<DateTime<Utc>>,
pub blocked_reason: Option<String>,
pub fault_message: Option<String>,
pub manual_ack_required: bool,
pub comm_locked: bool,
pub rem_local: bool,
pub held_resources: Vec<String>,
}
impl SegmentRuntime {
pub fn new(segment_id: Uuid) -> Self {
Self {
segment_id,
state: SegmentState::Idle,
auto_enabled: false,
current_step_no: None,
step_started_at: None,
last_completed_at: None,
blocked_reason: None,
fault_message: None,
manual_ack_required: false,
comm_locked: false,
rem_local: false,
held_resources: Vec::new(),
}
}
}
#[derive(Clone, Default)]
pub struct SegmentRuntimeStore {
inner: Arc<RwLock<HashMap<Uuid, SegmentRuntime>>>,
notifiers: Arc<RwLock<HashMap<Uuid, Arc<Notify>>>>,
}
impl SegmentRuntimeStore {
pub fn new() -> Self {
Self::default()
}
pub async fn get(&self, segment_id: Uuid) -> Option<SegmentRuntime> {
self.inner.read().await.get(&segment_id).cloned()
}
pub async fn get_or_init(&self, segment_id: Uuid) -> SegmentRuntime {
if let Some(runtime) = self.get(segment_id).await {
return runtime;
}
let runtime = SegmentRuntime::new(segment_id);
self.inner
.write()
.await
.insert(segment_id, runtime.clone());
runtime
}
pub async fn upsert(&self, runtime: SegmentRuntime) {
self.inner
.write()
.await
.insert(runtime.segment_id, runtime);
}
pub async fn get_or_create_notify(&self, segment_id: Uuid) -> Arc<Notify> {
self.notifiers
.write()
.await
.entry(segment_id)
.or_insert_with(|| Arc::new(Notify::new()))
.clone()
}
pub async fn get_all(&self) -> HashMap<Uuid, SegmentRuntime> {
self.inner.read().await.clone()
}
pub async fn notify_segment(&self, segment_id: Uuid) {
if let Some(notify) = self.notifiers.read().await.get(&segment_id) {
notify.notify_one();
}
}
}

View File

@ -0,0 +1,18 @@
use serde::{Deserialize, Serialize};
/// Segment execution state per design doc §5.2.
///
/// Matches the 9-state machine derived from spec §13.6.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SegmentState {
Idle,
Checking,
Executing,
Confirming,
Resetting,
Completed,
Blocked,
Faulted,
ManualAckRequired,
}

View File

@ -0,0 +1,266 @@
use std::sync::Arc;
use plc_platform_core::{
event::{record_event, EventInsert, MetadataCache},
websocket::WebSocketManager,
};
use tokio::sync::mpsc;
use uuid::Uuid;
const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024;
/// Operation-system business events.
///
/// Variants here will grow as engine phases land. Each variant maps to a
/// row in the `event` table (via `record_event`) and follows the `ops.*`
/// namespace agreed in the design doc §8.1.
#[derive(Debug, Clone)]
pub enum AppEvent {
SegmentAutoStarted {
segment_id: Uuid,
},
SegmentAutoStopped {
segment_id: Uuid,
},
SegmentStepAdvanced {
segment_id: Uuid,
step_no: i32,
},
SegmentCompleted {
segment_id: Uuid,
},
SegmentBlocked {
segment_id: Uuid,
reason: String,
},
SegmentFaultLocked {
segment_id: Uuid,
message: String,
},
SegmentFaultAcked {
segment_id: Uuid,
},
SegmentCommLocked {
segment_id: Uuid,
},
SegmentCommRecovered {
segment_id: Uuid,
},
StationStateChanged {
station_id: Uuid,
presence: bool,
vacancy: bool,
},
AlarmActionTimeout {
segment_id: Uuid,
step_no: i32,
},
AlarmSignalConflict {
segment_id: Uuid,
message: String,
},
AlarmResourceBusy {
segment_id: Uuid,
resource_key: String,
},
}
pub struct EventManager {
sender: mpsc::Sender<AppEvent>,
}
impl EventManager {
pub fn new(
pool: sqlx::PgPool,
ws_manager: Option<Arc<WebSocketManager>>,
metadata: Arc<MetadataCache>,
) -> Self {
let (sender, mut receiver) = mpsc::channel::<AppEvent>(CONTROL_EVENT_CHANNEL_CAPACITY);
let pool_for_task = pool.clone();
let ws_for_task = ws_manager.clone();
tokio::spawn(async move {
while let Some(event) = receiver.recv().await {
handle_event(event, &pool_for_task, ws_for_task.as_ref(), &metadata).await;
}
});
Self { sender }
}
pub fn send(&self, event: AppEvent) -> Result<(), String> {
match self.sender.try_send(event) {
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Closed(e)) => {
Err(format!("ops event channel closed ({e:?})"))
}
Err(mpsc::error::TrySendError::Full(e)) => Err(format!("ops event queue full ({e:?})")),
}
}
}
async fn handle_event(
event: AppEvent,
pool: &sqlx::PgPool,
ws_manager: Option<&Arc<WebSocketManager>>,
_metadata: &MetadataCache,
) {
let record: Option<EventInsert> = match &event {
AppEvent::SegmentAutoStarted { segment_id } => Some(EventInsert {
event_type: "ops.segment.auto_started",
level: "info",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!("Segment {} auto control started", segment_id),
payload: serde_json::json!({ "segment_id": segment_id }),
}),
AppEvent::SegmentAutoStopped { segment_id } => Some(EventInsert {
event_type: "ops.segment.auto_stopped",
level: "info",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!("Segment {} auto control stopped", segment_id),
payload: serde_json::json!({ "segment_id": segment_id }),
}),
AppEvent::SegmentStepAdvanced {
segment_id,
step_no,
} => Some(EventInsert {
event_type: "ops.segment.step_advanced",
level: "info",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!("Segment {} advanced to step {}", segment_id, step_no),
payload: serde_json::json!({ "segment_id": segment_id, "step_no": step_no }),
}),
AppEvent::SegmentCompleted { segment_id } => Some(EventInsert {
event_type: "ops.segment.completed",
level: "info",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!("Segment {} completed", segment_id),
payload: serde_json::json!({ "segment_id": segment_id }),
}),
AppEvent::SegmentBlocked { segment_id, reason } => Some(EventInsert {
event_type: "ops.segment.blocked",
level: "warn",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!("Segment {} blocked: {}", segment_id, reason),
payload: serde_json::json!({ "segment_id": segment_id, "reason": reason }),
}),
AppEvent::SegmentFaultLocked {
segment_id,
message,
} => Some(EventInsert {
event_type: "ops.segment.fault_locked",
level: "error",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!("Segment {} fault locked: {}", segment_id, message),
payload: serde_json::json!({ "segment_id": segment_id, "message": message }),
}),
AppEvent::SegmentFaultAcked { segment_id } => Some(EventInsert {
event_type: "ops.segment.fault_acked",
level: "info",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!("Segment {} fault acknowledged", segment_id),
payload: serde_json::json!({ "segment_id": segment_id }),
}),
AppEvent::SegmentCommLocked { segment_id } => Some(EventInsert {
event_type: "ops.segment.comm_locked",
level: "warn",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!("Segment {} communication locked", segment_id),
payload: serde_json::json!({ "segment_id": segment_id }),
}),
AppEvent::SegmentCommRecovered { segment_id } => Some(EventInsert {
event_type: "ops.segment.comm_recovered",
level: "info",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!("Segment {} communication recovered", segment_id),
payload: serde_json::json!({ "segment_id": segment_id }),
}),
AppEvent::StationStateChanged {
station_id,
presence,
vacancy,
} => Some(EventInsert {
event_type: "ops.station.state_changed",
level: "info",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!(
"Station {} state changed (presence={}, vacancy={})",
station_id, presence, vacancy
),
payload: serde_json::json!({
"station_id": station_id,
"presence": presence,
"vacancy": vacancy
}),
}),
AppEvent::AlarmActionTimeout {
segment_id,
step_no,
} => Some(EventInsert {
event_type: "ops.alarm.action_timeout",
level: "error",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!(
"Action timeout on segment {} step {}",
segment_id, step_no
),
payload: serde_json::json!({ "segment_id": segment_id, "step_no": step_no }),
}),
AppEvent::AlarmSignalConflict {
segment_id,
message,
} => Some(EventInsert {
event_type: "ops.alarm.signal_conflict",
level: "error",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!("Signal conflict on segment {}: {}", segment_id, message),
payload: serde_json::json!({ "segment_id": segment_id, "message": message }),
}),
AppEvent::AlarmResourceBusy {
segment_id,
resource_key,
} => Some(EventInsert {
event_type: "ops.alarm.resource_busy",
level: "warn",
unit_id: None,
equipment_id: None,
source_id: None,
message: format!(
"Resource {} busy for segment {}",
resource_key, segment_id
),
payload: serde_json::json!({
"segment_id": segment_id,
"resource_key": resource_key
}),
}),
};
if let Some(record) = record {
record_event(pool, ws_manager.map(Arc::as_ref), record).await;
}
}

View File

@ -1,4 +1,6 @@
pub mod app; pub mod app;
pub mod control;
pub mod event;
pub mod handler; pub mod handler;
pub mod router; pub mod router;

View File

@ -22,6 +22,18 @@ pub fn build_router(state: AppState) -> Router {
"/ui", "/ui",
plc_platform_core::http::static_ui_routes("web/ops", "web/core"), plc_platform_core::http::static_ui_routes("web/ops", "web/core"),
) )
.route(
"/ws/public",
get(plc_platform_core::websocket::public_websocket_handler::<AppState>),
)
.route(
"/ws/client/{client_id}",
get(plc_platform_core::websocket::client_websocket_handler::<AppState>),
)
.layer(axum::middleware::from_fn(
plc_platform_core::http::simple_logger,
))
.layer(plc_platform_core::http::permissive_cors())
.with_state(state) .with_state(state)
} }