From ed1067f6e5c272b2c4e65bd6da7de77a81d10097 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Tue, 19 May 2026 08:49:27 +0800 Subject: [PATCH] Reclaim stale resource leases and refresh heartbeats Adds ResourceRegistry::sweep_stale and runs it on each supervisor tick so a panicked or stuck segment task can't keep a shared resource locked indefinitely. The per-segment task refreshes heartbeat on every iteration for each key in runtime.held_resources, distinguishing live owners from dead ones. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/control/engine.rs | 24 ++++++ .../src/control/resource.rs | 74 +++++++++++++++++++ 2 files changed, 98 insertions(+) diff --git a/crates/app_operation_system/src/control/engine.rs b/crates/app_operation_system/src/control/engine.rs index 846c6b9..23682a2 100644 --- a/crates/app_operation_system/src/control/engine.rs +++ b/crates/app_operation_system/src/control/engine.rs @@ -31,6 +31,10 @@ use crate::{ const APP_NAME: &str = "operation-system"; const SUPERVISOR_INTERVAL_SECS: u64 = 10; const FAULT_TICK_MS: u64 = 500; +/// Resource leases older than this with no heartbeat are reclaimed by the +/// supervisor. Three supervisor ticks is enough headroom for a slow segment +/// task to refresh, but short enough to recover quickly from panics. +const RESOURCE_LEASE_MAX_AGE_SECS: i64 = 30; /// Start the engine supervisor. Mirrors the feeder entry point. pub fn start(state: AppState, store: Arc) { @@ -66,6 +70,19 @@ async fn supervise(state: AppState, store: Arc) { } Err(err) => tracing::error!("Engine supervisor: list_segments failed: {}", err), } + + // Reclaim stale resource leases (design doc §7 recovery path). + let reclaimed = state + .resource_registry + .sweep_stale(chrono::Duration::seconds(RESOURCE_LEASE_MAX_AGE_SECS)) + .await; + for (key, owner) in reclaimed { + tracing::warn!( + "Engine: reclaimed stale resource '{}' previously held by segment {}", + key, + owner + ); + } } } @@ -190,6 +207,13 @@ async fn segment_task(state: AppState, store: Arc, segment_ None => false, }; + // Refresh heartbeat on every tick we hold resources. Keeps the + // supervisor sweep from reclaiming a live but slow segment. + let snapshot_for_heartbeat = store.get_or_init(segment_id).await; + for key in &snapshot_for_heartbeat.held_resources { + state.resource_registry.heartbeat(key, segment_id).await; + } + // 5. Decide how long to sleep based on next state. let snapshot = store.get_or_init(segment_id).await; if !runtime_changed && should_wait(&snapshot, segment.mode.as_str()) { diff --git a/crates/app_operation_system/src/control/resource.rs b/crates/app_operation_system/src/control/resource.rs index 05ff1a7..67c7e89 100644 --- a/crates/app_operation_system/src/control/resource.rs +++ b/crates/app_operation_system/src/control/resource.rs @@ -87,6 +87,29 @@ impl ResourceRegistry { pub async fn snapshot(&self) -> HashMap { self.inner.read().await.clone() } + + /// Drop any lease whose heartbeat is older than `max_age`. Returns the keys + /// that were reclaimed so the caller can log or alarm. + /// + /// Recovery path from design doc §7 — a panicked or stuck segment task can + /// otherwise keep a public resource locked indefinitely. + pub async fn sweep_stale( + &self, + max_age: chrono::Duration, + ) -> Vec<(String, Uuid)> { + let cutoff = Utc::now() - max_age; + let mut reclaimed = Vec::new(); + let mut inner = self.inner.write().await; + inner.retain(|key, lease| { + if lease.heartbeat_at < cutoff { + reclaimed.push((key.clone(), lease.owner_segment_id)); + false + } else { + true + } + }); + reclaimed + } } #[cfg(test)] @@ -124,4 +147,55 @@ mod tests { assert!(!snap.contains_key("r2")); assert_eq!(snap.get("r3").unwrap().owner_segment_id, seg_b); } + + #[tokio::test] + async fn heartbeat_refreshes_only_owner_lease() { + let registry = ResourceRegistry::new(); + let seg_a = Uuid::new_v4(); + let seg_b = Uuid::new_v4(); + registry.try_acquire("r1", seg_a).await; + + // Force the existing lease to look ancient. + { + let mut inner = registry.inner.write().await; + if let Some(lease) = inner.get_mut("r1") { + lease.heartbeat_at = Utc::now() - chrono::Duration::seconds(120); + } + } + // Other-owner heartbeat is rejected. + registry.heartbeat("r1", seg_b).await; + let before = registry.snapshot().await.get("r1").unwrap().heartbeat_at; + assert!(Utc::now() - before > chrono::Duration::seconds(60)); + + // Owner heartbeat updates timestamp. + registry.heartbeat("r1", seg_a).await; + let after = registry.snapshot().await.get("r1").unwrap().heartbeat_at; + assert!(Utc::now() - after < chrono::Duration::seconds(10)); + } + + #[tokio::test] + async fn sweep_stale_reclaims_only_old_leases() { + let registry = ResourceRegistry::new(); + let seg_a = Uuid::new_v4(); + let seg_b = Uuid::new_v4(); + registry.try_acquire("stale", seg_a).await; + registry.try_acquire("fresh", seg_b).await; + + // Age out the "stale" lease. + { + let mut inner = registry.inner.write().await; + if let Some(lease) = inner.get_mut("stale") { + lease.heartbeat_at = Utc::now() - chrono::Duration::seconds(60); + } + } + + let reclaimed = registry.sweep_stale(chrono::Duration::seconds(30)).await; + assert_eq!(reclaimed.len(), 1); + assert_eq!(reclaimed[0].0, "stale"); + assert_eq!(reclaimed[0].1, seg_a); + + let snap = registry.snapshot().await; + assert!(!snap.contains_key("stale")); + assert!(snap.contains_key("fresh")); + } }