diff --git a/Cargo.lock b/Cargo.lock index 041d3e3..9dcf1bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -135,14 +135,20 @@ dependencies = [ name = "app_operation_system" version = "0.1.0" dependencies = [ + "anyhow", "axum", + "chrono", "dotenv", "plc_platform_core", + "serde", + "serde_json", "sqlx", "tokio", "tower", "tower-http", "tracing", + "uuid", + "validator", ] [[package]] diff --git a/crates/app_operation_system/Cargo.toml b/crates/app_operation_system/Cargo.toml index e6c68a5..5aa6ddd 100644 --- a/crates/app_operation_system/Cargo.toml +++ b/crates/app_operation_system/Cargo.toml @@ -10,7 +10,13 @@ axum = { version = "0.8", features = ["ws"] } tower-http = { version = "0.6", features = ["cors", "fs"] } tracing = "0.1" dotenv = "0.15" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +chrono = "0.4" sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid", "json"] } +uuid = { version = "1.21", features = ["serde", "v4"] } +validator = { version = "0.20", features = ["derive"] } +anyhow = "1.0" [dev-dependencies] tower = { version = "0.5", features = ["util"] } diff --git a/crates/app_operation_system/src/app.rs b/crates/app_operation_system/src/app.rs index 79d6f91..7b94b9e 100644 --- a/crates/app_operation_system/src/app.rs +++ b/crates/app_operation_system/src/app.rs @@ -1,6 +1,14 @@ -use crate::router::build_router; +use std::sync::Arc; + use axum::extract::FromRef; use plc_platform_core::{bootstrap, platform_context::PlatformContext}; +use tokio::sync::mpsc; + +use crate::{ + control::{resource::ResourceRegistry, runtime::SegmentRuntimeStore}, + event::EventManager, + router::build_router, +}; #[derive(Clone, Debug)] pub struct AppConfig { @@ -26,6 +34,9 @@ pub struct AppState { pub app_name: &'static str, pub config: AppConfig, pub platform: PlatformContext, + pub event_manager: Arc, + pub segment_runtime: Arc, + pub resource_registry: Arc, } impl FromRef for PlatformContext { @@ -48,16 +59,46 @@ pub async fn run() { .expect("Failed to bootstrap platform"); let platform = builder.build(); + let event_manager = Arc::new(EventManager::new( + platform.pool.clone(), + Some(platform.ws_manager.clone()), + platform.metadata.clone(), + )); + let segment_runtime = Arc::new(SegmentRuntimeStore::new()); + let resource_registry = Arc::new(ResourceRegistry::new()); + + bootstrap::connect_all_enabled_sources(&platform) + .await + .expect("Failed to connect enabled sources"); + let state = AppState { app_name: "operation-system", - config, + config: config.clone(), platform, + event_manager, + segment_runtime: segment_runtime.clone(), + resource_registry, }; - let app = build_router(state.clone()); - bootstrap::serve_app(&state.config.server, "operation-system", app) - .await - .expect("operation-system server should run"); + crate::control::engine::start(state.clone(), segment_runtime); + + let app = build_router(state.clone()); + let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); + let connection_manager_for_shutdown = state.platform.connection_manager.clone(); + bootstrap::install_ctrl_c_shutdown(shutdown_tx); + + bootstrap::serve_app_with_graceful_shutdown( + &state.config.server, + "operation-system", + app, + bootstrap::disconnect_all_on_shutdown( + shutdown_rx, + connection_manager_for_shutdown, + "operation-system", + ), + ) + .await + .expect("operation-system server should run"); } pub fn test_state() -> AppState { @@ -68,6 +109,12 @@ pub fn test_state() -> AppState { let connection_manager = std::sync::Arc::new(plc_platform_core::connection::ConnectionManager::new()); let ws_manager = std::sync::Arc::new(plc_platform_core::websocket::WebSocketManager::new()); + let platform = PlatformContext::new(pool.clone(), connection_manager, ws_manager.clone()); + let event_manager = Arc::new(EventManager::new( + pool, + Some(ws_manager), + platform.metadata.clone(), + )); AppState { app_name: "operation-system", @@ -78,6 +125,9 @@ pub fn test_state() -> AppState { server_port: 0, }, }, - platform: PlatformContext::new(pool, connection_manager, ws_manager), + platform, + event_manager, + segment_runtime: Arc::new(SegmentRuntimeStore::new()), + resource_registry: Arc::new(ResourceRegistry::new()), } } diff --git a/crates/app_operation_system/src/control/engine.rs b/crates/app_operation_system/src/control/engine.rs new file mode 100644 index 0000000..ef405e1 --- /dev/null +++ b/crates/app_operation_system/src/control/engine.rs @@ -0,0 +1,27 @@ +use std::sync::Arc; + +use tokio::time::Duration; + +use crate::{control::runtime::SegmentRuntimeStore, AppState}; + +/// Start the segment engine supervisor. +/// +/// Skeleton only: at P0 there are no segment tables yet (P1 lands the schema), +/// so the supervisor logs and idles. P3 will replace this with a per-segment +/// task spawner that mirrors feeder's per-unit task model. +pub fn start(state: AppState, store: Arc) { + tokio::spawn(async move { + supervise(state, store).await; + }); +} + +async fn supervise(_state: AppState, _store: Arc) { + let mut interval = tokio::time::interval(Duration::from_secs(10)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + tracing::info!("Operation-system engine supervisor started (skeleton; awaiting P1 schema)"); + + loop { + interval.tick().await; + // Segment supervision will live here once P1 lands process_segment. + } +} diff --git a/crates/app_operation_system/src/control/mod.rs b/crates/app_operation_system/src/control/mod.rs new file mode 100644 index 0000000..3af9eb4 --- /dev/null +++ b/crates/app_operation_system/src/control/mod.rs @@ -0,0 +1,6 @@ +pub use plc_platform_core::control::command; + +pub mod engine; +pub mod resource; +pub mod runtime; +pub mod state; diff --git a/crates/app_operation_system/src/control/resource.rs b/crates/app_operation_system/src/control/resource.rs new file mode 100644 index 0000000..05ff1a7 --- /dev/null +++ b/crates/app_operation_system/src/control/resource.rs @@ -0,0 +1,127 @@ +use std::{collections::HashMap, sync::Arc}; + +use chrono::{DateTime, Utc}; +use tokio::sync::RwLock; +use uuid::Uuid; + +/// Resource lease held by a segment task. +/// +/// `acquired_at` and `heartbeat_at` exist to support the recovery strategy in +/// design doc §7. Resources whose owner task has died (no heartbeat) can be +/// reclaimed by the supervisor. +#[derive(Debug, Clone)] +pub struct ResourceLease { + pub owner_segment_id: Uuid, + pub acquired_at: DateTime, + pub heartbeat_at: DateTime, +} + +/// Named-lock registry for shared resources (transfer car, robot arm, unload +/// position, return line, etc.). +/// +/// Segment configuration declares which resources it needs via the +/// `segment_resource` table. The engine acquires before `Executing` and +/// releases on `Completed` (or safe `Faulted`). +#[derive(Clone, Default)] +pub struct ResourceRegistry { + inner: Arc>>, +} + +impl ResourceRegistry { + pub fn new() -> Self { + Self::default() + } + + /// Attempt to take the resource for the given segment. + /// + /// Returns `true` if the lease was granted (either freshly or already held + /// by the same segment). Returns `false` if another segment holds it. + pub async fn try_acquire(&self, key: &str, segment_id: Uuid) -> bool { + let mut inner = self.inner.write().await; + match inner.get(key) { + Some(lease) if lease.owner_segment_id != segment_id => false, + _ => { + let now = Utc::now(); + inner.insert( + key.to_string(), + ResourceLease { + owner_segment_id: segment_id, + acquired_at: now, + heartbeat_at: now, + }, + ); + true + } + } + } + + /// Refresh the heartbeat for a resource the segment already holds. + /// No-op if the resource is held by someone else or not held. + pub async fn heartbeat(&self, key: &str, segment_id: Uuid) { + let mut inner = self.inner.write().await; + if let Some(lease) = inner.get_mut(key) { + if lease.owner_segment_id == segment_id { + lease.heartbeat_at = Utc::now(); + } + } + } + + /// Release a resource held by the given segment. + /// No-op if the resource is held by someone else or not held. + pub async fn release(&self, key: &str, segment_id: Uuid) { + let mut inner = self.inner.write().await; + if let Some(lease) = inner.get(key) { + if lease.owner_segment_id == segment_id { + inner.remove(key); + } + } + } + + /// Release every resource held by the given segment. + /// Used when a segment task exits or transitions to `Completed`. + pub async fn release_all_for(&self, segment_id: Uuid) { + let mut inner = self.inner.write().await; + inner.retain(|_, lease| lease.owner_segment_id != segment_id); + } + + pub async fn snapshot(&self) -> HashMap { + self.inner.read().await.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn acquire_blocks_other_segment_until_released() { + let registry = ResourceRegistry::new(); + let seg_a = Uuid::new_v4(); + let seg_b = Uuid::new_v4(); + + assert!(registry.try_acquire("transfer_front", seg_a).await); + assert!(!registry.try_acquire("transfer_front", seg_b).await); + // same owner can re-acquire (idempotent) + assert!(registry.try_acquire("transfer_front", seg_a).await); + + registry.release("transfer_front", seg_a).await; + assert!(registry.try_acquire("transfer_front", seg_b).await); + } + + #[tokio::test] + async fn release_all_for_drops_only_owner_leases() { + let registry = ResourceRegistry::new(); + let seg_a = Uuid::new_v4(); + let seg_b = Uuid::new_v4(); + + registry.try_acquire("r1", seg_a).await; + registry.try_acquire("r2", seg_a).await; + registry.try_acquire("r3", seg_b).await; + + registry.release_all_for(seg_a).await; + let snap = registry.snapshot().await; + assert!(!snap.contains_key("r1")); + assert!(!snap.contains_key("r2")); + assert_eq!(snap.get("r3").unwrap().owner_segment_id, seg_b); + } +} diff --git a/crates/app_operation_system/src/control/runtime.rs b/crates/app_operation_system/src/control/runtime.rs new file mode 100644 index 0000000..b40bd25 --- /dev/null +++ b/crates/app_operation_system/src/control/runtime.rs @@ -0,0 +1,101 @@ +use std::{collections::HashMap, sync::Arc}; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use tokio::sync::{Notify, RwLock}; +use uuid::Uuid; + +use super::state::SegmentState; + +/// Per-segment runtime as defined in design doc §4.2.6. +/// +/// Held in memory only; reset on restart. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SegmentRuntime { + pub segment_id: Uuid, + pub state: SegmentState, + pub auto_enabled: bool, + pub current_step_no: Option, + pub step_started_at: Option>, + pub last_completed_at: Option>, + pub blocked_reason: Option, + pub fault_message: Option, + pub manual_ack_required: bool, + pub comm_locked: bool, + pub rem_local: bool, + pub held_resources: Vec, +} + +impl SegmentRuntime { + pub fn new(segment_id: Uuid) -> Self { + Self { + segment_id, + state: SegmentState::Idle, + auto_enabled: false, + current_step_no: None, + step_started_at: None, + last_completed_at: None, + blocked_reason: None, + fault_message: None, + manual_ack_required: false, + comm_locked: false, + rem_local: false, + held_resources: Vec::new(), + } + } +} + +#[derive(Clone, Default)] +pub struct SegmentRuntimeStore { + inner: Arc>>, + notifiers: Arc>>>, +} + +impl SegmentRuntimeStore { + pub fn new() -> Self { + Self::default() + } + + pub async fn get(&self, segment_id: Uuid) -> Option { + self.inner.read().await.get(&segment_id).cloned() + } + + pub async fn get_or_init(&self, segment_id: Uuid) -> SegmentRuntime { + if let Some(runtime) = self.get(segment_id).await { + return runtime; + } + + let runtime = SegmentRuntime::new(segment_id); + self.inner + .write() + .await + .insert(segment_id, runtime.clone()); + runtime + } + + pub async fn upsert(&self, runtime: SegmentRuntime) { + self.inner + .write() + .await + .insert(runtime.segment_id, runtime); + } + + pub async fn get_or_create_notify(&self, segment_id: Uuid) -> Arc { + self.notifiers + .write() + .await + .entry(segment_id) + .or_insert_with(|| Arc::new(Notify::new())) + .clone() + } + + pub async fn get_all(&self) -> HashMap { + self.inner.read().await.clone() + } + + pub async fn notify_segment(&self, segment_id: Uuid) { + if let Some(notify) = self.notifiers.read().await.get(&segment_id) { + notify.notify_one(); + } + } +} diff --git a/crates/app_operation_system/src/control/state.rs b/crates/app_operation_system/src/control/state.rs new file mode 100644 index 0000000..ae1c2fd --- /dev/null +++ b/crates/app_operation_system/src/control/state.rs @@ -0,0 +1,18 @@ +use serde::{Deserialize, Serialize}; + +/// Segment execution state per design doc §5.2. +/// +/// Matches the 9-state machine derived from spec §13.6. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum SegmentState { + Idle, + Checking, + Executing, + Confirming, + Resetting, + Completed, + Blocked, + Faulted, + ManualAckRequired, +} diff --git a/crates/app_operation_system/src/event.rs b/crates/app_operation_system/src/event.rs new file mode 100644 index 0000000..8b7585d --- /dev/null +++ b/crates/app_operation_system/src/event.rs @@ -0,0 +1,266 @@ +use std::sync::Arc; + +use plc_platform_core::{ + event::{record_event, EventInsert, MetadataCache}, + websocket::WebSocketManager, +}; +use tokio::sync::mpsc; +use uuid::Uuid; + +const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024; + +/// Operation-system business events. +/// +/// Variants here will grow as engine phases land. Each variant maps to a +/// row in the `event` table (via `record_event`) and follows the `ops.*` +/// namespace agreed in the design doc §8.1. +#[derive(Debug, Clone)] +pub enum AppEvent { + SegmentAutoStarted { + segment_id: Uuid, + }, + SegmentAutoStopped { + segment_id: Uuid, + }, + SegmentStepAdvanced { + segment_id: Uuid, + step_no: i32, + }, + SegmentCompleted { + segment_id: Uuid, + }, + SegmentBlocked { + segment_id: Uuid, + reason: String, + }, + SegmentFaultLocked { + segment_id: Uuid, + message: String, + }, + SegmentFaultAcked { + segment_id: Uuid, + }, + SegmentCommLocked { + segment_id: Uuid, + }, + SegmentCommRecovered { + segment_id: Uuid, + }, + StationStateChanged { + station_id: Uuid, + presence: bool, + vacancy: bool, + }, + AlarmActionTimeout { + segment_id: Uuid, + step_no: i32, + }, + AlarmSignalConflict { + segment_id: Uuid, + message: String, + }, + AlarmResourceBusy { + segment_id: Uuid, + resource_key: String, + }, +} + +pub struct EventManager { + sender: mpsc::Sender, +} + +impl EventManager { + pub fn new( + pool: sqlx::PgPool, + ws_manager: Option>, + metadata: Arc, + ) -> Self { + let (sender, mut receiver) = mpsc::channel::(CONTROL_EVENT_CHANNEL_CAPACITY); + + let pool_for_task = pool.clone(); + let ws_for_task = ws_manager.clone(); + tokio::spawn(async move { + while let Some(event) = receiver.recv().await { + handle_event(event, &pool_for_task, ws_for_task.as_ref(), &metadata).await; + } + }); + + Self { sender } + } + + pub fn send(&self, event: AppEvent) -> Result<(), String> { + match self.sender.try_send(event) { + Ok(()) => Ok(()), + Err(mpsc::error::TrySendError::Closed(e)) => { + Err(format!("ops event channel closed ({e:?})")) + } + Err(mpsc::error::TrySendError::Full(e)) => Err(format!("ops event queue full ({e:?})")), + } + } +} + +async fn handle_event( + event: AppEvent, + pool: &sqlx::PgPool, + ws_manager: Option<&Arc>, + _metadata: &MetadataCache, +) { + let record: Option = match &event { + AppEvent::SegmentAutoStarted { segment_id } => Some(EventInsert { + event_type: "ops.segment.auto_started", + level: "info", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!("Segment {} auto control started", segment_id), + payload: serde_json::json!({ "segment_id": segment_id }), + }), + AppEvent::SegmentAutoStopped { segment_id } => Some(EventInsert { + event_type: "ops.segment.auto_stopped", + level: "info", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!("Segment {} auto control stopped", segment_id), + payload: serde_json::json!({ "segment_id": segment_id }), + }), + AppEvent::SegmentStepAdvanced { + segment_id, + step_no, + } => Some(EventInsert { + event_type: "ops.segment.step_advanced", + level: "info", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!("Segment {} advanced to step {}", segment_id, step_no), + payload: serde_json::json!({ "segment_id": segment_id, "step_no": step_no }), + }), + AppEvent::SegmentCompleted { segment_id } => Some(EventInsert { + event_type: "ops.segment.completed", + level: "info", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!("Segment {} completed", segment_id), + payload: serde_json::json!({ "segment_id": segment_id }), + }), + AppEvent::SegmentBlocked { segment_id, reason } => Some(EventInsert { + event_type: "ops.segment.blocked", + level: "warn", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!("Segment {} blocked: {}", segment_id, reason), + payload: serde_json::json!({ "segment_id": segment_id, "reason": reason }), + }), + AppEvent::SegmentFaultLocked { + segment_id, + message, + } => Some(EventInsert { + event_type: "ops.segment.fault_locked", + level: "error", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!("Segment {} fault locked: {}", segment_id, message), + payload: serde_json::json!({ "segment_id": segment_id, "message": message }), + }), + AppEvent::SegmentFaultAcked { segment_id } => Some(EventInsert { + event_type: "ops.segment.fault_acked", + level: "info", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!("Segment {} fault acknowledged", segment_id), + payload: serde_json::json!({ "segment_id": segment_id }), + }), + AppEvent::SegmentCommLocked { segment_id } => Some(EventInsert { + event_type: "ops.segment.comm_locked", + level: "warn", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!("Segment {} communication locked", segment_id), + payload: serde_json::json!({ "segment_id": segment_id }), + }), + AppEvent::SegmentCommRecovered { segment_id } => Some(EventInsert { + event_type: "ops.segment.comm_recovered", + level: "info", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!("Segment {} communication recovered", segment_id), + payload: serde_json::json!({ "segment_id": segment_id }), + }), + AppEvent::StationStateChanged { + station_id, + presence, + vacancy, + } => Some(EventInsert { + event_type: "ops.station.state_changed", + level: "info", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!( + "Station {} state changed (presence={}, vacancy={})", + station_id, presence, vacancy + ), + payload: serde_json::json!({ + "station_id": station_id, + "presence": presence, + "vacancy": vacancy + }), + }), + AppEvent::AlarmActionTimeout { + segment_id, + step_no, + } => Some(EventInsert { + event_type: "ops.alarm.action_timeout", + level: "error", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!( + "Action timeout on segment {} step {}", + segment_id, step_no + ), + payload: serde_json::json!({ "segment_id": segment_id, "step_no": step_no }), + }), + AppEvent::AlarmSignalConflict { + segment_id, + message, + } => Some(EventInsert { + event_type: "ops.alarm.signal_conflict", + level: "error", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!("Signal conflict on segment {}: {}", segment_id, message), + payload: serde_json::json!({ "segment_id": segment_id, "message": message }), + }), + AppEvent::AlarmResourceBusy { + segment_id, + resource_key, + } => Some(EventInsert { + event_type: "ops.alarm.resource_busy", + level: "warn", + unit_id: None, + equipment_id: None, + source_id: None, + message: format!( + "Resource {} busy for segment {}", + resource_key, segment_id + ), + payload: serde_json::json!({ + "segment_id": segment_id, + "resource_key": resource_key + }), + }), + }; + + if let Some(record) = record { + record_event(pool, ws_manager.map(Arc::as_ref), record).await; + } +} diff --git a/crates/app_operation_system/src/lib.rs b/crates/app_operation_system/src/lib.rs index fc8028d..f54e2f1 100644 --- a/crates/app_operation_system/src/lib.rs +++ b/crates/app_operation_system/src/lib.rs @@ -1,4 +1,6 @@ pub mod app; +pub mod control; +pub mod event; pub mod handler; pub mod router; diff --git a/crates/app_operation_system/src/router.rs b/crates/app_operation_system/src/router.rs index 61703cf..28ed1f9 100644 --- a/crates/app_operation_system/src/router.rs +++ b/crates/app_operation_system/src/router.rs @@ -22,6 +22,18 @@ pub fn build_router(state: AppState) -> Router { "/ui", plc_platform_core::http::static_ui_routes("web/ops", "web/core"), ) + .route( + "/ws/public", + get(plc_platform_core::websocket::public_websocket_handler::), + ) + .route( + "/ws/client/{client_id}", + get(plc_platform_core::websocket::client_websocket_handler::), + ) + .layer(axum::middleware::from_fn( + plc_platform_core::http::simple_logger, + )) + .layer(plc_platform_core::http::permissive_cors()) .with_state(state) }