use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Notify; use tokio::time::Duration; use uuid::Uuid; use crate::{ control::{ command::send_pulse_command, runtime::{ControlRuntimeStore, UnitRuntime, UnitRuntimeState}, }, event::AppEvent, service::EquipmentRolePoint, telemetry::{PointMonitorInfo, PointQuality}, websocket::WsMessage, AppState, }; /// Start the engine: a supervisor spawns one async task per enabled unit. pub fn start(state: AppState, runtime_store: Arc) { tokio::spawn(async move { supervise(state, runtime_store).await; }); } /// Supervisor: scans for enabled units every 10 s and ensures each has a running task. /// Uses JoinHandle to detect exited tasks so disabled-then-re-enabled units are restarted. async fn supervise(state: AppState, store: Arc) { let mut tasks: HashMap> = HashMap::new(); let mut interval = tokio::time::interval(Duration::from_secs(10)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { interval.tick().await; match crate::service::get_all_enabled_units(&state.pool).await { Ok(units) => { for unit in units { let needs_spawn = tasks .get(&unit.id) .map_or(true, |h| h.is_finished()); if needs_spawn { let s = state.clone(); let st = store.clone(); let handle = tokio::spawn(async move { unit_task(s, st, unit.id).await; }); tasks.insert(unit.id, handle); } } } Err(e) => tracing::error!("Engine supervisor: failed to load units: {}", e), } } } // ── Per-unit task ───────────────────────────────────────────────────────────── async fn unit_task(state: AppState, store: Arc, unit_id: Uuid) { let notify = store.get_or_create_notify(unit_id).await; // Fault/comm check ticker — still need periodic polling of point monitor data. let mut fault_tick = tokio::time::interval(Duration::from_millis(500)); fault_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { // Reload unit config on each iteration to detect disable/delete. let unit = match crate::service::get_unit_by_id(&state.pool, unit_id).await { Ok(Some(u)) if u.enabled => u, Ok(_) => { tracing::info!("Engine: unit {} disabled or deleted, task exiting", unit_id); return; } Err(e) => { tracing::error!("Engine: unit {} config reload failed: {}", unit_id, e); tokio::time::sleep(Duration::from_secs(5)).await; continue; } }; // ── Fault / comm check ──────────────────────────────────────────────── let (kind_roles, kind_eq_ids, all_roles) = match load_equipment_maps(&state, unit_id).await { Ok(maps) => maps, Err(e) => { tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e); tokio::time::sleep(Duration::from_secs(5)).await; continue; } }; let mut runtime = store.get_or_init(unit_id).await; if check_fault_comm(&state, &mut runtime, &unit, &all_roles).await { store.upsert(runtime.clone()).await; push_ws(&state, &runtime).await; } // ── Wait when not active ────────────────────────────────────────────── if !runtime.auto_enabled || runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required { tokio::select! { _ = fault_tick.tick() => {} _ = notify.notified() => { // Push fresh runtime immediately so the frontend reflects the change // (e.g. auto_enabled toggled) without waiting for the next state transition. let runtime = store.get_or_init(unit_id).await; push_ws(&state, &runtime).await; } } continue; } // ── State machine step ──────────────────────────────────────────────── match runtime.state { UnitRuntimeState::Stopped => { // Wait stop_time_sec (0 = skip wait, start immediately). if !wait_phase(&state, &store, &unit, &all_roles, ¬ify, &mut fault_tick).await { continue; } // Send feeder start command. let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "start_cmd", &monitor)); drop(monitor); if let Some((pid, vt)) = cmd { if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e); continue; } if state.config.simulate_plc { if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await; } } } let mut runtime = store.get_or_init(unit_id).await; runtime.state = UnitRuntimeState::Running; store.upsert(runtime.clone()).await; push_ws(&state, &runtime).await; } UnitRuntimeState::Running => { // Wait run_time_sec. run_time_sec == 0 means run without a time limit // (relies on acc_time_sec to eventually stop). Treat as a very long phase. let secs = if unit.run_time_sec > 0 { unit.run_time_sec } else { i32::MAX }; let unit_for_wait = crate::model::ControlUnit { run_time_sec: secs, ..unit.clone() }; if !wait_phase(&state, &store, &unit_for_wait, &all_roles, ¬ify, &mut fault_tick).await { continue; } // Stop feeder. let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "stop_cmd", &monitor)); drop(monitor); if let Some((pid, vt)) = cmd { if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e); continue; } if state.config.simulate_plc { if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await; } } } let mut runtime = store.get_or_init(unit_id).await; runtime.accumulated_run_sec += secs as i64 * 1000; runtime.display_acc_sec = runtime.accumulated_run_sec; if unit.acc_time_sec > 0 && runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000 { // Accumulated threshold reached — start distributor. let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; let dist_cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "start_cmd", &monitor)); drop(monitor); if let Some((pid, vt)) = dist_cmd { if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e); } else if state.config.simulate_plc { if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await; } } } runtime.state = UnitRuntimeState::DistributorRunning; } else { runtime.state = UnitRuntimeState::Stopped; } store.upsert(runtime.clone()).await; push_ws(&state, &runtime).await; } UnitRuntimeState::DistributorRunning => { // Wait bl_time_sec then stop distributor. if !wait_phase(&state, &store, &unit, &all_roles, ¬ify, &mut fault_tick).await { continue; } let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; let cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "stop_cmd", &monitor)); drop(monitor); if let Some((pid, vt)) = cmd { if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { tracing::warn!("Engine: stop distributor failed for unit {}: {}", unit_id, e); continue; } if state.config.simulate_plc { if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await; } } } let mut runtime = store.get_or_init(unit_id).await; runtime.accumulated_run_sec = 0; runtime.display_acc_sec = 0; runtime.state = UnitRuntimeState::Stopped; store.upsert(runtime.clone()).await; push_ws(&state, &runtime).await; } UnitRuntimeState::FaultLocked | UnitRuntimeState::CommLocked => { tokio::select! { _ = fault_tick.tick() => {} _ = notify.notified() => {} } } } } } // ── Helpers ─────────────────────────────────────────────────────────────────── /// Sleep for the duration appropriate to the *current* state, interrupting every /// 500 ms to re-check fault/comm. Returns `true` when the full time elapsed, /// `false` if the phase was interrupted (auto disabled, fault, or comm lock). async fn wait_phase( state: &AppState, store: &ControlRuntimeStore, unit: &crate::model::ControlUnit, all_roles: &[(Uuid, HashMap)], notify: &Arc, fault_tick: &mut tokio::time::Interval, ) -> bool { let secs = match store.get_or_init(unit.id).await.state { UnitRuntimeState::Stopped => unit.stop_time_sec, UnitRuntimeState::Running => unit.run_time_sec, UnitRuntimeState::DistributorRunning => unit.bl_time_sec, _ => return false, }; if secs <= 0 { return true; } let deadline = tokio::time::Instant::now() + Duration::from_secs(secs as u64); loop { let completed = tokio::select! { _ = tokio::time::sleep_until(deadline) => true, _ = fault_tick.tick() => false, _ = notify.notified() => false, }; if completed { return true; } // Re-check fault/comm mid-phase. let mut runtime = store.get_or_init(unit.id).await; if check_fault_comm(state, &mut runtime, unit, all_roles).await { store.upsert(runtime.clone()).await; push_ws(state, &runtime).await; } if !runtime.auto_enabled || runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required { return false; } } } async fn push_ws(state: &AppState, runtime: &UnitRuntime) { if let Err(e) = state .ws_manager .send_to_public(WsMessage::UnitRuntimeChanged(runtime.clone())) .await { tracing::debug!("Engine: WS push skipped (no subscribers): {}", e); } } /// Check fault and comm status, mutate runtime, fire events. /// Returns `true` if any field changed. async fn check_fault_comm( state: &AppState, runtime: &mut UnitRuntime, unit: &crate::model::ControlUnit, all_roles: &[(Uuid, HashMap)], ) -> bool { let monitor = state .connection_manager .get_point_monitor_data_read_guard() .await; let any_bad = all_roles.iter().flat_map(|(_, r)| r.values()).any(|rp| { monitor .get(&rp.point_id) .map(|m| m.quality != PointQuality::Good) .unwrap_or(false) }); let any_flt = all_roles.iter().any(|(_, roles)| { roles .get("flt") .and_then(|rp| monitor.get(&rp.point_id)) .map(|m| super::monitor_value_as_bool(m)) .unwrap_or(false) }); let flt_eq_id = if any_flt && !runtime.fault_locked { all_roles .iter() .find(|(_, roles)| { roles .get("flt") .and_then(|rp| monitor.get(&rp.point_id)) .map(|m| super::monitor_value_as_bool(m)) .unwrap_or(false) }) .map(|(eq_id, _)| *eq_id) } else { None }; // REM local: any equipment with a rem point that is explicitly false (local mode) with good quality. let any_rem_local = all_roles.iter().any(|(_, roles)| { roles .get("rem") .and_then(|rp| monitor.get(&rp.point_id)) .map(|m| !super::monitor_value_as_bool(m) && m.quality == PointQuality::Good) .unwrap_or(false) }); // Find the first equipment that just switched to local (for event payload). let rem_local_eq_id = if any_rem_local && !runtime.rem_local { all_roles .iter() .find(|(_, roles)| { roles .get("rem") .and_then(|rp| monitor.get(&rp.point_id)) .map(|m| !super::monitor_value_as_bool(m) && m.quality == PointQuality::Good) .unwrap_or(false) }) .map(|(eq_id, _)| *eq_id) } else { None }; drop(monitor); let prev_comm = runtime.comm_locked; let prev_flt = runtime.flt_active; let prev_fault_locked = runtime.fault_locked; let prev_auto = runtime.auto_enabled; let prev_ack = runtime.manual_ack_required; let prev_rem_local = runtime.rem_local; runtime.comm_locked = any_bad; runtime.flt_active = any_flt; runtime.rem_local = any_rem_local; if !prev_comm && runtime.comm_locked { let _ = state.event_manager.send(AppEvent::CommLocked { unit_id: unit.id }); } else if prev_comm && !runtime.comm_locked { let _ = state.event_manager.send(AppEvent::CommRecovered { unit_id: unit.id }); } if let Some(eq_id) = flt_eq_id { runtime.fault_locked = true; let _ = state.event_manager.send(AppEvent::FaultLocked { unit_id: unit.id, equipment_id: eq_id }); if runtime.auto_enabled { runtime.auto_enabled = false; let _ = state.event_manager.send(AppEvent::AutoControlStopped { unit_id: unit.id }); } } if prev_flt && !any_flt && runtime.fault_locked { if unit.require_manual_ack_after_fault { runtime.manual_ack_required = true; } else { runtime.fault_locked = false; } } // Fire RemLocal event when any equipment first switches to local mode. if let Some(eq_id) = rem_local_eq_id { let _ = state.event_manager.send(AppEvent::RemLocal { unit_id: unit.id, equipment_id: eq_id }); if runtime.auto_enabled { runtime.auto_enabled = false; let _ = state.event_manager.send(AppEvent::AutoControlStopped { unit_id: unit.id }); } } // Fire RemRecovered when all rem signals return to remote. if prev_rem_local && !any_rem_local { let _ = state.event_manager.send(AppEvent::RemRecovered { unit_id: unit.id }); } runtime.comm_locked != prev_comm || runtime.flt_active != prev_flt || runtime.fault_locked != prev_fault_locked || runtime.auto_enabled != prev_auto || runtime.manual_ack_required != prev_ack || runtime.rem_local != prev_rem_local } type EquipMaps = ( HashMap>, HashMap, Vec<(Uuid, HashMap)>, ); async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result { let equipment_list = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?; let equipment_ids: Vec = equipment_list.iter().map(|equip| equip.id).collect(); let role_point_rows = crate::service::get_signal_role_points_batch(&state.pool, &equipment_ids).await?; let mut role_points_by_equipment: HashMap> = HashMap::new(); for row in role_point_rows { role_points_by_equipment .entry(row.equipment_id) .or_default() .push(EquipmentRolePoint { point_id: row.point_id, signal_role: row.signal_role, }); } Ok(build_equipment_maps( unit_id, &equipment_list, role_points_by_equipment, )) } fn build_equipment_maps( unit_id: Uuid, equipment_list: &[crate::model::Equipment], mut role_points_by_equipment: HashMap>, ) -> EquipMaps { let mut kind_roles: HashMap> = HashMap::new(); let mut kind_eq_ids: HashMap = HashMap::new(); let mut all_roles: Vec<(Uuid, HashMap)> = Vec::new(); for equip in equipment_list { let role_map: HashMap = role_points_by_equipment .remove(&equip.id) .unwrap_or_default() .into_iter() .map(|rp| (rp.signal_role.clone(), rp)) .collect(); if let Some(kind) = &equip.kind { if !kind_roles.contains_key(kind.as_str()) { kind_roles.insert(kind.clone(), role_map.clone()); kind_eq_ids.insert(kind.clone(), equip.id); } else { tracing::warn!( "Engine: unit {} has multiple {} equipment; using first", unit_id, kind ); } } all_roles.push((equip.id, role_map)); } (kind_roles, kind_eq_ids, all_roles) } /// Find a command point by role. Returns `None` if REM==0, FLT==1, or quality is bad. fn find_cmd( roles: &HashMap, role: &str, monitor: &HashMap, ) -> Option<(Uuid, Option)> { let cmd_rp = roles.get(role)?; let rem_ok = roles .get("rem") .and_then(|rp| monitor.get(&rp.point_id)) .map(|m| super::monitor_value_as_bool(m) && m.quality == PointQuality::Good) .unwrap_or(true); let flt_ok = roles .get("flt") .and_then(|rp| monitor.get(&rp.point_id)) .map(|m| !super::monitor_value_as_bool(m) && m.quality == PointQuality::Good) .unwrap_or(true); if rem_ok && flt_ok { let vtype = monitor .get(&cmd_rp.point_id) .and_then(|m| m.value_type.clone()); Some((cmd_rp.point_id, vtype)) } else { None } } #[cfg(test)] mod tests { use super::build_equipment_maps; use crate::model::Equipment; use crate::service::EquipmentRolePoint; use chrono::Utc; use std::collections::HashMap; use uuid::Uuid; fn equipment(id: Uuid, unit_id: Uuid, kind: &str) -> Equipment { Equipment { id, unit_id: Some(unit_id), code: format!("EQ-{id}"), name: format!("Equipment-{id}"), kind: Some(kind.to_string()), description: None, created_at: Utc::now(), updated_at: Utc::now(), } } #[test] fn build_equipment_maps_reflects_latest_role_bindings() { let unit_id = Uuid::new_v4(); let equipment_id = Uuid::new_v4(); let first_start_point = Uuid::new_v4(); let second_start_point = Uuid::new_v4(); let equipment_list = vec![equipment(equipment_id, unit_id, "coal_feeder")]; let mut first_roles = HashMap::new(); first_roles.insert( equipment_id, vec![EquipmentRolePoint { point_id: first_start_point, signal_role: "start_cmd".to_string(), }], ); let (first_kind_roles, _, _) = build_equipment_maps(unit_id, &equipment_list, first_roles); let mut second_roles = HashMap::new(); second_roles.insert( equipment_id, vec![EquipmentRolePoint { point_id: second_start_point, signal_role: "start_cmd".to_string(), }], ); let (second_kind_roles, _, _) = build_equipment_maps(unit_id, &equipment_list, second_roles); assert_eq!( first_kind_roles["coal_feeder"]["start_cmd"].point_id, first_start_point ); assert_eq!( second_kind_roles["coal_feeder"]["start_cmd"].point_id, second_start_point ); } }