feat(control): implement state machine engine with fault/comm monitoring

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-03-24 14:53:59 +08:00
parent 5c0b99c0d4
commit 459bb49c65
1 changed file with 379 additions and 3 deletions

View File

@ -1,7 +1,383 @@
use std::collections::HashMap;
use std::sync::Arc;
use crate::{control::runtime::ControlRuntimeStore, AppState};
use chrono::Utc;
use uuid::Uuid;
pub fn start(_state: AppState, _runtime_store: Arc<ControlRuntimeStore>) {
// Automatic control state machine will be added in the next step.
use crate::{
control::{
command::send_pulse_command,
runtime::{ControlRuntimeStore, UnitRuntime, UnitRuntimeState},
},
event::AppEvent,
service::EquipmentRolePoint,
telemetry::{DataValue, PointMonitorInfo, PointQuality},
websocket::WsMessage,
AppState,
};
/// Launch the background engine task: every 500 ms, tick all enabled
/// control units. Missed ticks are skipped (not bursted), so a slow
/// tick never causes a catch-up storm.
pub fn start(state: AppState, runtime_store: Arc<ControlRuntimeStore>) {
    tokio::spawn(async move {
        let period = std::time::Duration::from_millis(500);
        let mut ticker = tokio::time::interval(period);
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;
            tick_all_units(&state, &runtime_store).await;
        }
    });
}
/// Run one engine pass over every enabled control unit, sequentially in
/// list order. A failure to load the unit list aborts the whole pass
/// (logged at error level); the next ticker tick will retry.
async fn tick_all_units(state: &AppState, store: &ControlRuntimeStore) {
    match crate::service::get_all_enabled_units(&state.pool).await {
        Ok(units) => {
            for unit in &units {
                tick_unit(state, store, unit).await;
            }
        }
        Err(e) => {
            tracing::error!("Engine: failed to load units: {}", e);
        }
    }
}
async fn tick_unit(
state: &AppState,
store: &ControlRuntimeStore,
unit: &crate::model::ControlUnit,
) {
let mut runtime = store.get_or_init(unit.id).await;
// ── Load equipment role-point maps by kind ───────────────
let equipment_list = match crate::service::get_equipment_by_unit_id(&state.pool, unit.id).await {
Ok(e) => e,
Err(e) => {
tracing::error!(
"Engine: equipment load failed for unit {}: {}",
unit.id,
e
);
return;
}
};
// kind -> role -> EquipmentRolePoint (first equipment per kind wins)
let mut kind_roles: HashMap<String, HashMap<String, EquipmentRolePoint>> = HashMap::new();
// all role maps for fault/comm scanning across all equipment
let mut all_roles: Vec<HashMap<String, EquipmentRolePoint>> = Vec::new();
for equip in &equipment_list {
match crate::service::get_equipment_role_points(&state.pool, equip.id).await {
Ok(role_points) => {
let role_map: HashMap<String, EquipmentRolePoint> = role_points
.into_iter()
.map(|rp| (rp.signal_role.clone(), rp))
.collect();
if let Some(kind) = &equip.kind {
if kind_roles.contains_key(kind.as_str()) {
tracing::warn!(
"Engine: unit {} has multiple {} equipment; using first",
unit.id,
kind
);
} else {
kind_roles.insert(kind.clone(), role_map.clone());
}
}
all_roles.push(role_map);
}
Err(e) => {
tracing::warn!(
"Engine: role points load failed for equipment {}: {}",
equip.id,
e
);
}
}
}
let monitor_guard = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
// ── Communication check ──────────────────────────────────
let any_bad_quality = all_roles.iter().flat_map(|r| r.values()).any(|rp| {
monitor_guard
.get(&rp.point_id)
.map(|m| m.quality != PointQuality::Good)
.unwrap_or(false)
});
let prev_comm = runtime.comm_locked;
runtime.comm_locked = any_bad_quality;
if !prev_comm && runtime.comm_locked {
let _ = state
.event_manager
.send(AppEvent::CommLocked { unit_id: unit.id });
} else if prev_comm && !runtime.comm_locked {
let _ = state
.event_manager
.send(AppEvent::CommRecovered { unit_id: unit.id });
}
// ── Fault check ──────────────────────────────────────────
let any_flt = all_roles.iter().any(|roles| {
roles
.get("flt")
.and_then(|rp| monitor_guard.get(&rp.point_id))
.map(|m| monitor_value_as_bool(m))
.unwrap_or(false)
});
let prev_flt = runtime.flt_active;
runtime.flt_active = any_flt;
if any_flt && !runtime.fault_locked {
// Find which equipment triggered the fault
let flt_eq_id = equipment_list
.iter()
.find(|_e| {
all_roles.iter().any(|roles| {
roles
.get("flt")
.and_then(|rp| monitor_guard.get(&rp.point_id))
.map(|m| monitor_value_as_bool(m))
.unwrap_or(false)
})
})
.map(|e| e.id)
.unwrap_or(Uuid::nil());
runtime.fault_locked = true;
let _ = state.event_manager.send(AppEvent::FaultLocked {
unit_id: unit.id,
equipment_id: flt_eq_id,
});
if runtime.auto_enabled {
runtime.auto_enabled = false;
let _ = state
.event_manager
.send(AppEvent::AutoControlStopped { unit_id: unit.id });
}
}
// FLT just cleared → require manual ack if unit is configured that way
if prev_flt && !any_flt && runtime.fault_locked {
if unit.require_manual_ack_after_fault {
runtime.manual_ack_required = true;
} else {
// Auto-clear fault lock
runtime.fault_locked = false;
}
}
drop(monitor_guard);
// ── State machine tick ───────────────────────────────────
if runtime.auto_enabled && !runtime.fault_locked && !runtime.comm_locked {
let now = Utc::now();
// Accumulate in milliseconds to avoid sub-second truncation
let delta_ms = runtime
.last_tick_at
.map(|t| (now - t).num_milliseconds().max(0))
.unwrap_or(0);
let prev_state = runtime.state.clone();
tick_state_machine(state, &mut runtime, unit, &kind_roles, delta_ms).await;
if runtime.state != prev_state {
let _ = state.event_manager.send(AppEvent::UnitStateChanged {
unit_id: unit.id,
from_state: format!("{:?}", prev_state),
to_state: format!("{:?}", runtime.state),
});
}
}
runtime.last_tick_at = Some(Utc::now());
store.upsert(runtime.clone()).await;
if let Err(e) = state
.ws_manager
.send_to_public(WsMessage::UnitRuntimeChanged(runtime))
.await
{
tracing::debug!("Engine: WS push skipped (no subscribers): {}", e);
}
}
/// Drive one state-machine tick for a unit.
/// All elapsed counters accumulate in **milliseconds**; comparisons use `*_time_sec * 1000`.
///
/// Cycle: Stopped --(stop_time)--> Running --(run_time)--> Stopped, while an
/// accumulated-run counter grows across Running periods; once it reaches
/// acc_time the cycle diverts through DistributorRunning --(bl_time)--> Stopped
/// and the accumulator resets. FaultLocked/CommLocked are managed by the
/// caller (`tick_unit`); their arms are deliberate no-ops here.
async fn tick_state_machine(
    state: &AppState,
    runtime: &mut UnitRuntime,
    unit: &crate::model::ControlUnit,
    kind_roles: &HashMap<String, HashMap<String, EquipmentRolePoint>>,
    delta_ms: i64,
) {
    // Role maps for the two equipment kinds this cycle drives. If a kind is
    // missing, its transitions simply never fire (`and_then` on None below).
    let feeder_roles = kind_roles.get("coal_feeder");
    let dist_roles = kind_roles.get("distributor");
    match runtime.state {
        UnitRuntimeState::Stopped => {
            // stop_time_sec == 0 disables the automatic restart entirely.
            if unit.stop_time_sec == 0 {
                return;
            }
            runtime.current_stop_elapsed_sec += delta_ms; // field holds ms
            if runtime.current_stop_elapsed_sec >= unit.stop_time_sec as i64 * 1000 {
                let monitor = state
                    .connection_manager
                    .get_point_monitor_data_read_guard()
                    .await;
                if let Some((pid, vt)) =
                    feeder_roles.and_then(|r| find_cmd(r, "start_cmd", &monitor))
                {
                    // Release the read guard before awaiting the pulse send.
                    drop(monitor);
                    if let Err(e) =
                        send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await
                    {
                        // On send failure: stay Stopped with the counter intact,
                        // so the start is retried on the next tick.
                        tracing::warn!("Engine: auto start coal_feeder failed: {}", e);
                        return;
                    }
                    runtime.state = UnitRuntimeState::Running;
                    runtime.current_stop_elapsed_sec = 0;
                    runtime.current_run_elapsed_sec = 0;
                }
                // If no usable start_cmd (interlock/quality blocked), remain
                // Stopped; the counter keeps its value and we retry next tick.
            }
        }
        UnitRuntimeState::Running => {
            runtime.current_run_elapsed_sec += delta_ms;
            runtime.accumulated_run_sec += delta_ms;
            // Check RunTime first — stop feeder before considering distributor trigger
            if unit.run_time_sec > 0
                && runtime.current_run_elapsed_sec >= unit.run_time_sec as i64 * 1000
            {
                let monitor = state
                    .connection_manager
                    .get_point_monitor_data_read_guard()
                    .await;
                if let Some((pid, vt)) =
                    feeder_roles.and_then(|r| find_cmd(r, "stop_cmd", &monitor))
                {
                    drop(monitor);
                    if let Err(e) =
                        send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await
                    {
                        tracing::warn!("Engine: auto stop coal_feeder failed: {}", e);
                        return;
                    }
                    runtime.state = UnitRuntimeState::Stopped;
                    runtime.current_run_elapsed_sec = 0;
                    runtime.current_stop_elapsed_sec = 0;
                }
                // The early return skips the AccTime check on the tick the
                // run-time limit fires; the accumulator is not reset here, so
                // the distributor trigger is re-evaluated on later Running ticks.
                return;
            }
            // Check AccTime — trigger distributor
            // NOTE(review): entering DistributorRunning does not stop the coal
            // feeder here — confirm the feeder is meant to keep running while
            // the distributor cycle executes.
            if unit.acc_time_sec > 0
                && runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000
            {
                runtime.state = UnitRuntimeState::DistributorRunning;
                runtime.distributor_run_elapsed_sec = 0;
            }
        }
        UnitRuntimeState::DistributorRunning => {
            // First tick in this state (distributor_run_elapsed_sec == 0): send start pulse then return.
            // Time advance happens on subsequent ticks.
            if runtime.distributor_run_elapsed_sec == 0 {
                let monitor = state
                    .connection_manager
                    .get_point_monitor_data_read_guard()
                    .await;
                if let Some((pid, vt)) =
                    dist_roles.and_then(|r| find_cmd(r, "start_cmd", &monitor))
                {
                    drop(monitor);
                    if let Err(e) =
                        send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await
                    {
                        // Start failure is logged but not retried: the 1 ms
                        // marker below still advances, so this state proceeds
                        // on timing alone.
                        tracing::warn!("Engine: auto start distributor failed: {}", e);
                    }
                }
                // Mark as "started" by advancing to 1ms so this branch won't re-fire
                runtime.distributor_run_elapsed_sec = 1;
                return;
            }
            runtime.distributor_run_elapsed_sec += delta_ms;
            // NOTE(review): bl_time_sec == 0 leaves the unit in
            // DistributorRunning indefinitely — confirm intended.
            if unit.bl_time_sec > 0
                && runtime.distributor_run_elapsed_sec >= unit.bl_time_sec as i64 * 1000
            {
                let monitor = state
                    .connection_manager
                    .get_point_monitor_data_read_guard()
                    .await;
                if let Some((pid, vt)) =
                    dist_roles.and_then(|r| find_cmd(r, "stop_cmd", &monitor))
                {
                    drop(monitor);
                    if let Err(e) =
                        send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await
                    {
                        tracing::warn!("Engine: auto stop distributor failed: {}", e);
                        return;
                    }
                }
                // NOTE(review): unlike the feeder arms, a missing/blocked
                // stop_cmd still falls through to the Stopped transition and
                // accumulator reset — confirm this should not hold the state.
                runtime.accumulated_run_sec = 0;
                runtime.distributor_run_elapsed_sec = 0;
                runtime.state = UnitRuntimeState::Stopped;
                runtime.current_stop_elapsed_sec = 0;
            }
        }
        // Lock states are entered/exited by the caller; nothing to do per tick.
        UnitRuntimeState::FaultLocked | UnitRuntimeState::CommLocked => {}
    }
}
/// Resolve the command point for `role` in a single equipment's role map,
/// gated by that equipment's interlocks:
/// - the "rem" point, if mapped and monitored, must read true with Good quality;
/// - the "flt" point, if mapped and monitored, must read false with Good quality.
/// An unmapped or unmonitored interlock point is treated as permissive.
/// Returns the command point id plus its monitored value type, or `None`
/// when the role is unmapped or an interlock blocks the command.
fn find_cmd(
    roles: &HashMap<String, EquipmentRolePoint>,
    role: &str,
    monitor: &HashMap<Uuid, PointMonitorInfo>,
) -> Option<(Uuid, Option<crate::telemetry::ValueType>)> {
    let cmd_rp = roles.get(role)?;

    // One interlock check: `want_high` is the boolean reading that permits
    // the command; a missing point permits by default.
    let gate = |role_name: &str, want_high: bool| -> bool {
        match roles.get(role_name).and_then(|rp| monitor.get(&rp.point_id)) {
            Some(m) => m.quality == PointQuality::Good && monitor_value_as_bool(m) == want_high,
            None => true,
        }
    };

    if !gate("rem", true) || !gate("flt", false) {
        return None;
    }

    let vtype = monitor
        .get(&cmd_rp.point_id)
        .and_then(|m| m.value_type.clone());
    Some((cmd_rp.point_id, vtype))
}
/// Interpret a monitored point's current value as a boolean signal.
/// Numbers are true when non-zero; text is true for the trimmed,
/// case-insensitive literals "1", "true", or "on". A missing value or any
/// other content reads as false.
fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool {
    let Some(value) = monitor.value.as_ref() else {
        return false;
    };
    match value {
        DataValue::Bool(v) => *v,
        DataValue::Int(v) => *v != 0,
        DataValue::UInt(v) => *v != 0,
        DataValue::Float(v) => *v != 0.0,
        DataValue::Text(v) => {
            let normalized = v.trim().to_ascii_lowercase();
            matches!(normalized.as_str(), "1" | "true" | "on")
        }
        // Any variant not handled above reads as false (mirrors the
        // original catch-all).
        _ => false,
    }
}