From 459bb49c654e44327752a64ac4de66b48cb27bd0 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Tue, 24 Mar 2026 14:53:59 +0800 Subject: [PATCH] feat(control): implement state machine engine with fault/comm monitoring Co-Authored-By: Claude Sonnet 4.6 --- src/control/engine.rs | 382 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 379 insertions(+), 3 deletions(-) diff --git a/src/control/engine.rs b/src/control/engine.rs index 2e69c1c..65b8b21 100644 --- a/src/control/engine.rs +++ b/src/control/engine.rs @@ -1,7 +1,383 @@ +use std::collections::HashMap; use std::sync::Arc; -use crate::{control::runtime::ControlRuntimeStore, AppState}; +use chrono::Utc; +use uuid::Uuid; -pub fn start(_state: AppState, _runtime_store: Arc) { - // Automatic control state machine will be added in the next step. +use crate::{ + control::{ + command::send_pulse_command, + runtime::{ControlRuntimeStore, UnitRuntime, UnitRuntimeState}, + }, + event::AppEvent, + service::EquipmentRolePoint, + telemetry::{DataValue, PointMonitorInfo, PointQuality}, + websocket::WsMessage, + AppState, +}; + +pub fn start(state: AppState, runtime_store: Arc) { + tokio::spawn(async move { + let mut ticker = tokio::time::interval(std::time::Duration::from_millis(500)); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + ticker.tick().await; + tick_all_units(&state, &runtime_store).await; + } + }); +} + +async fn tick_all_units(state: &AppState, store: &ControlRuntimeStore) { + let units = match crate::service::get_all_enabled_units(&state.pool).await { + Ok(u) => u, + Err(e) => { + tracing::error!("Engine: failed to load units: {}", e); + return; + } + }; + for unit in units { + tick_unit(state, store, &unit).await; + } +} + +async fn tick_unit( + state: &AppState, + store: &ControlRuntimeStore, + unit: &crate::model::ControlUnit, +) { + let mut runtime = store.get_or_init(unit.id).await; + + // ── Load equipment role-point maps by kind ─────────────── + let equipment_list = match crate::service::get_equipment_by_unit_id(&state.pool, unit.id).await { + Ok(e) => e, + Err(e) => { + tracing::error!( + "Engine: equipment load failed for unit {}: {}", + unit.id, + e + ); + return; + } + }; + + // kind -> role -> EquipmentRolePoint (first equipment per kind wins) + let mut kind_roles: HashMap> = HashMap::new(); + // all role maps for fault/comm scanning across all equipment + let mut all_roles: Vec> = Vec::new(); + + for equip in &equipment_list { + match crate::service::get_equipment_role_points(&state.pool, equip.id).await { + Ok(role_points) => { + let role_map: HashMap = role_points + .into_iter() + .map(|rp| (rp.signal_role.clone(), rp)) + .collect(); + + if let Some(kind) = &equip.kind { + if kind_roles.contains_key(kind.as_str()) { + tracing::warn!( + "Engine: unit {} has multiple {} equipment; using first", + unit.id, + kind + ); + } else { + kind_roles.insert(kind.clone(), role_map.clone()); + } + } + all_roles.push(role_map); + } + Err(e) => { + tracing::warn!( + "Engine: role points load failed for equipment {}: {}", + equip.id, + e + ); + } + } + } + + let monitor_guard = state + .connection_manager + .get_point_monitor_data_read_guard() + .await; + + // ── Communication check ────────────────────────────────── + let any_bad_quality = all_roles.iter().flat_map(|r| r.values()).any(|rp| { + monitor_guard + .get(&rp.point_id) + .map(|m| m.quality != PointQuality::Good) + .unwrap_or(false) + }); + + let prev_comm = runtime.comm_locked; + runtime.comm_locked = any_bad_quality; + if !prev_comm && runtime.comm_locked { + let _ = state + .event_manager + .send(AppEvent::CommLocked { unit_id: unit.id }); + } else if prev_comm && !runtime.comm_locked { + let _ = state + .event_manager + .send(AppEvent::CommRecovered { unit_id: unit.id }); + } + + // ── Fault check ────────────────────────────────────────── + let any_flt = all_roles.iter().any(|roles| { + roles + .get("flt") + .and_then(|rp| monitor_guard.get(&rp.point_id)) + .map(|m| monitor_value_as_bool(m)) + .unwrap_or(false) + }); + + let prev_flt = runtime.flt_active; + runtime.flt_active = any_flt; + + if any_flt && !runtime.fault_locked { + // Find which equipment triggered the fault + let flt_eq_id = equipment_list + .iter() + .find(|_e| { + all_roles.iter().any(|roles| { + roles + .get("flt") + .and_then(|rp| monitor_guard.get(&rp.point_id)) + .map(|m| monitor_value_as_bool(m)) + .unwrap_or(false) + }) + }) + .map(|e| e.id) + .unwrap_or(Uuid::nil()); + + runtime.fault_locked = true; + let _ = state.event_manager.send(AppEvent::FaultLocked { + unit_id: unit.id, + equipment_id: flt_eq_id, + }); + if runtime.auto_enabled { + runtime.auto_enabled = false; + let _ = state + .event_manager + .send(AppEvent::AutoControlStopped { unit_id: unit.id }); + } + } + + // FLT just cleared → require manual ack if unit is configured that way + if prev_flt && !any_flt && runtime.fault_locked { + if unit.require_manual_ack_after_fault { + runtime.manual_ack_required = true; + } else { + // Auto-clear fault lock + runtime.fault_locked = false; + } + } + + drop(monitor_guard); + + // ── State machine tick ─────────────────────────────────── + if runtime.auto_enabled && !runtime.fault_locked && !runtime.comm_locked { + let now = Utc::now(); + // Accumulate in milliseconds to avoid sub-second truncation + let delta_ms = runtime + .last_tick_at + .map(|t| (now - t).num_milliseconds().max(0)) + .unwrap_or(0); + + let prev_state = runtime.state.clone(); + tick_state_machine(state, &mut runtime, unit, &kind_roles, delta_ms).await; + if runtime.state != prev_state { + let _ = state.event_manager.send(AppEvent::UnitStateChanged { + unit_id: unit.id, + from_state: format!("{:?}", prev_state), + to_state: format!("{:?}", runtime.state), + }); + } + } + + runtime.last_tick_at = Some(Utc::now()); + + store.upsert(runtime.clone()).await; + if let Err(e) = state + .ws_manager + .send_to_public(WsMessage::UnitRuntimeChanged(runtime)) + .await + { + tracing::debug!("Engine: WS push skipped (no subscribers): {}", e); + } +} + +/// Drive one state-machine tick for a unit. +/// All elapsed counters accumulate in **milliseconds**; comparisons use `*_time_sec * 1000`. +async fn tick_state_machine( + state: &AppState, + runtime: &mut UnitRuntime, + unit: &crate::model::ControlUnit, + kind_roles: &HashMap>, + delta_ms: i64, +) { + let feeder_roles = kind_roles.get("coal_feeder"); + let dist_roles = kind_roles.get("distributor"); + + match runtime.state { + UnitRuntimeState::Stopped => { + if unit.stop_time_sec == 0 { + return; + } + runtime.current_stop_elapsed_sec += delta_ms; // field holds ms + if runtime.current_stop_elapsed_sec >= unit.stop_time_sec as i64 * 1000 { + let monitor = state + .connection_manager + .get_point_monitor_data_read_guard() + .await; + if let Some((pid, vt)) = + feeder_roles.and_then(|r| find_cmd(r, "start_cmd", &monitor)) + { + drop(monitor); + if let Err(e) = + send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await + { + tracing::warn!("Engine: auto start coal_feeder failed: {}", e); + return; + } + runtime.state = UnitRuntimeState::Running; + runtime.current_stop_elapsed_sec = 0; + runtime.current_run_elapsed_sec = 0; + } + } + } + + UnitRuntimeState::Running => { + runtime.current_run_elapsed_sec += delta_ms; + runtime.accumulated_run_sec += delta_ms; + + // Check RunTime first — stop feeder before considering distributor trigger + if unit.run_time_sec > 0 + && runtime.current_run_elapsed_sec >= unit.run_time_sec as i64 * 1000 + { + let monitor = state + .connection_manager + .get_point_monitor_data_read_guard() + .await; + if let Some((pid, vt)) = + feeder_roles.and_then(|r| find_cmd(r, "stop_cmd", &monitor)) + { + drop(monitor); + if let Err(e) = + send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await + { + tracing::warn!("Engine: auto stop coal_feeder failed: {}", e); + return; + } + runtime.state = UnitRuntimeState::Stopped; + runtime.current_run_elapsed_sec = 0; + runtime.current_stop_elapsed_sec = 0; + } + return; + } + + // Check AccTime — trigger distributor + if unit.acc_time_sec > 0 + && runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000 + { + runtime.state = UnitRuntimeState::DistributorRunning; + runtime.distributor_run_elapsed_sec = 0; + } + } + + UnitRuntimeState::DistributorRunning => { + // First tick in this state (distributor_run_elapsed_sec == 0): send start pulse then return. + // Time advance happens on subsequent ticks. + if runtime.distributor_run_elapsed_sec == 0 { + let monitor = state + .connection_manager + .get_point_monitor_data_read_guard() + .await; + if let Some((pid, vt)) = + dist_roles.and_then(|r| find_cmd(r, "start_cmd", &monitor)) + { + drop(monitor); + if let Err(e) = + send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await + { + tracing::warn!("Engine: auto start distributor failed: {}", e); + } + } + // Mark as "started" by advancing to 1ms so this branch won't re-fire + runtime.distributor_run_elapsed_sec = 1; + return; + } + + runtime.distributor_run_elapsed_sec += delta_ms; + + if unit.bl_time_sec > 0 + && runtime.distributor_run_elapsed_sec >= unit.bl_time_sec as i64 * 1000 + { + let monitor = state + .connection_manager + .get_point_monitor_data_read_guard() + .await; + if let Some((pid, vt)) = + dist_roles.and_then(|r| find_cmd(r, "stop_cmd", &monitor)) + { + drop(monitor); + if let Err(e) = + send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await + { + tracing::warn!("Engine: auto stop distributor failed: {}", e); + return; + } + } + runtime.accumulated_run_sec = 0; + runtime.distributor_run_elapsed_sec = 0; + runtime.state = UnitRuntimeState::Stopped; + runtime.current_stop_elapsed_sec = 0; + } + } + + UnitRuntimeState::FaultLocked | UnitRuntimeState::CommLocked => {} + } +} + +/// Find a command point by role in a single equipment's role map. +/// Returns `None` if REM==0 or FLT==1 or quality is bad. +fn find_cmd( + roles: &HashMap, + role: &str, + monitor: &HashMap, +) -> Option<(Uuid, Option)> { + let cmd_rp = roles.get(role)?; + + let rem_ok = roles + .get("rem") + .and_then(|rp| monitor.get(&rp.point_id)) + .map(|m| monitor_value_as_bool(m) && m.quality == PointQuality::Good) + .unwrap_or(true); + + let flt_ok = roles + .get("flt") + .and_then(|rp| monitor.get(&rp.point_id)) + .map(|m| !monitor_value_as_bool(m) && m.quality == PointQuality::Good) + .unwrap_or(true); + + if rem_ok && flt_ok { + let vtype = monitor + .get(&cmd_rp.point_id) + .and_then(|m| m.value_type.clone()); + Some((cmd_rp.point_id, vtype)) + } else { + None + } +} + +fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool { + match monitor.value.as_ref() { + Some(DataValue::Bool(v)) => *v, + Some(DataValue::Int(v)) => *v != 0, + Some(DataValue::UInt(v)) => *v != 0, + Some(DataValue::Float(v)) => *v != 0.0, + Some(DataValue::Text(v)) => { + matches!(v.trim().to_ascii_lowercase().as_str(), "1" | "true" | "on") + } + _ => false, + } }