Reclaim stale resource leases and refresh heartbeats

Adds ResourceRegistry::sweep_stale and runs it on each supervisor tick
so a panicked or stuck segment task can't keep a shared resource
locked indefinitely. The per-segment task refreshes heartbeat on every
iteration for each key in runtime.held_resources, distinguishing live
owners from dead ones.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-05-19 08:49:27 +08:00
parent e3e7917078
commit ed1067f6e5
2 changed files with 98 additions and 0 deletions

View File

@ -31,6 +31,10 @@ use crate::{
const APP_NAME: &str = "operation-system";
const SUPERVISOR_INTERVAL_SECS: u64 = 10;
const FAULT_TICK_MS: u64 = 500;
/// Resource leases older than this with no heartbeat are reclaimed by the
/// supervisor. Three supervisor ticks is enough headroom for a slow segment
/// task to refresh, but short enough to recover quickly from panics.
const RESOURCE_LEASE_MAX_AGE_SECS: i64 = 30;
/// Start the engine supervisor. Mirrors the feeder entry point.
pub fn start(state: AppState, store: Arc<SegmentRuntimeStore>) {
@ -66,6 +70,19 @@ async fn supervise(state: AppState, store: Arc<SegmentRuntimeStore>) {
}
Err(err) => tracing::error!("Engine supervisor: list_segments failed: {}", err),
}
// Reclaim stale resource leases (design doc §7 recovery path).
let reclaimed = state
.resource_registry
.sweep_stale(chrono::Duration::seconds(RESOURCE_LEASE_MAX_AGE_SECS))
.await;
for (key, owner) in reclaimed {
tracing::warn!(
"Engine: reclaimed stale resource '{}' previously held by segment {}",
key,
owner
);
}
}
}
@ -190,6 +207,13 @@ async fn segment_task(state: AppState, store: Arc<SegmentRuntimeStore>, segment_
None => false,
};
// Refresh heartbeat on every tick we hold resources. Keeps the
// supervisor sweep from reclaiming a live but slow segment.
let snapshot_for_heartbeat = store.get_or_init(segment_id).await;
for key in &snapshot_for_heartbeat.held_resources {
state.resource_registry.heartbeat(key, segment_id).await;
}
// 5. Decide how long to sleep based on next state.
let snapshot = store.get_or_init(segment_id).await;
if !runtime_changed && should_wait(&snapshot, segment.mode.as_str()) {

View File

@ -87,6 +87,29 @@ impl ResourceRegistry {
pub async fn snapshot(&self) -> HashMap<String, ResourceLease> {
self.inner.read().await.clone()
}
/// Drop any lease whose heartbeat is older than `max_age`. Returns the keys
/// that were reclaimed so the caller can log or alarm.
///
/// Recovery path from design doc §7 — a panicked or stuck segment task can
/// otherwise keep a public resource locked indefinitely.
pub async fn sweep_stale(
&self,
max_age: chrono::Duration,
) -> Vec<(String, Uuid)> {
let cutoff = Utc::now() - max_age;
let mut reclaimed = Vec::new();
let mut inner = self.inner.write().await;
inner.retain(|key, lease| {
if lease.heartbeat_at < cutoff {
reclaimed.push((key.clone(), lease.owner_segment_id));
false
} else {
true
}
});
reclaimed
}
}
#[cfg(test)]
@ -124,4 +147,55 @@ mod tests {
assert!(!snap.contains_key("r2"));
assert_eq!(snap.get("r3").unwrap().owner_segment_id, seg_b);
}
#[tokio::test]
async fn heartbeat_refreshes_only_owner_lease() {
let registry = ResourceRegistry::new();
let seg_a = Uuid::new_v4();
let seg_b = Uuid::new_v4();
registry.try_acquire("r1", seg_a).await;
// Force the existing lease to look ancient.
{
let mut inner = registry.inner.write().await;
if let Some(lease) = inner.get_mut("r1") {
lease.heartbeat_at = Utc::now() - chrono::Duration::seconds(120);
}
}
// Other-owner heartbeat is rejected.
registry.heartbeat("r1", seg_b).await;
let before = registry.snapshot().await.get("r1").unwrap().heartbeat_at;
assert!(Utc::now() - before > chrono::Duration::seconds(60));
// Owner heartbeat updates timestamp.
registry.heartbeat("r1", seg_a).await;
let after = registry.snapshot().await.get("r1").unwrap().heartbeat_at;
assert!(Utc::now() - after < chrono::Duration::seconds(10));
}
#[tokio::test]
async fn sweep_stale_reclaims_only_old_leases() {
let registry = ResourceRegistry::new();
let seg_a = Uuid::new_v4();
let seg_b = Uuid::new_v4();
registry.try_acquire("stale", seg_a).await;
registry.try_acquire("fresh", seg_b).await;
// Age out the "stale" lease.
{
let mut inner = registry.inner.write().await;
if let Some(lease) = inner.get_mut("stale") {
lease.heartbeat_at = Utc::now() - chrono::Duration::seconds(60);
}
}
let reclaimed = registry.sweep_stale(chrono::Duration::seconds(30)).await;
assert_eq!(reclaimed.len(), 1);
assert_eq!(reclaimed[0].0, "stale");
assert_eq!(reclaimed[0].1, seg_a);
let snap = registry.snapshot().await;
assert!(!snap.contains_key("stale"));
assert!(snap.contains_key("fresh"));
}
}