diff --git a/crates/app_feeder_distributor/src/control/engine.rs b/crates/app_feeder_distributor/src/control/engine.rs index 9476524..08eb8c4 100644 --- a/crates/app_feeder_distributor/src/control/engine.rs +++ b/crates/app_feeder_distributor/src/control/engine.rs @@ -38,7 +38,7 @@ async fn supervise(state: AppState, store: Arc) { match plc_platform_core::service::get_all_enabled_units(&state.platform.pool).await { Ok(units) => { for unit in units { - let needs_spawn = tasks.get(&unit.id).map_or(true, |h| h.is_finished()); + let needs_spawn = tasks.get(&unit.id).is_none_or(|h| h.is_finished()); if needs_spawn { let s = state.clone(); let st = store.clone(); @@ -370,7 +370,7 @@ async fn check_fault_comm( roles .get("flt") .and_then(|rp| monitor.get(&rp.point_id)) - .map(|m| super::monitor_value_as_bool(m)) + .map(super::monitor_value_as_bool) .unwrap_or(false) }); @@ -381,7 +381,7 @@ async fn check_fault_comm( roles .get("flt") .and_then(|rp| monitor.get(&rp.point_id)) - .map(|m| super::monitor_value_as_bool(m)) + .map(super::monitor_value_as_bool) .unwrap_or(false) }) .map(|(eq_id, _)| *eq_id) diff --git a/crates/app_feeder_distributor/src/event.rs b/crates/app_feeder_distributor/src/event.rs index 9e32dc7..19d5498 100644 --- a/crates/app_feeder_distributor/src/event.rs +++ b/crates/app_feeder_distributor/src/event.rs @@ -180,39 +180,41 @@ async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String { .unwrap_or_else(|| id.to_string()) } +struct PersistableEvent { + event_type: &'static str, + level: &'static str, + unit_id: Option, + equipment_id: Option, + source_id: Option, + message: String, + payload: serde_json::Value, +} + async fn persist_event_if_needed( event: &AppEvent, pool: &sqlx::PgPool, ws_manager: Option<&std::sync::Arc>, ) { - let record: Option<( - &str, - &str, - Option, - Option, - Option, - String, - serde_json::Value, - )> = match event { + let record: Option = match event { AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id, } => { let code = fetch_equipment_code(pool, *equipment_id).await; - Some(( - "feeder.equipment.start_command_sent", - "info", - *unit_id, - Some(*equipment_id), - None, - format!("Start command sent to equipment {}", code), - serde_json::json!({ + Some(PersistableEvent { + event_type: "feeder.equipment.start_command_sent", + level: "info", + unit_id: *unit_id, + equipment_id: Some(*equipment_id), + source_id: None, + message: format!("Start command sent to equipment {}", code), + payload: serde_json::json!({ "equipment_id": equipment_id, "unit_id": unit_id, "point_id": point_id }), - )) + }) } AppEvent::EquipmentStopCommandSent { equipment_id, @@ -220,43 +222,43 @@ async fn persist_event_if_needed( point_id, } => { let code = fetch_equipment_code(pool, *equipment_id).await; - Some(( - "feeder.equipment.stop_command_sent", - "info", - *unit_id, - Some(*equipment_id), - None, - format!("Stop command sent to equipment {}", code), - serde_json::json!({ + Some(PersistableEvent { + event_type: "feeder.equipment.stop_command_sent", + level: "info", + unit_id: *unit_id, + equipment_id: Some(*equipment_id), + source_id: None, + message: format!("Stop command sent to equipment {}", code), + payload: serde_json::json!({ "equipment_id": equipment_id, "unit_id": unit_id, "point_id": point_id }), - )) + }) } AppEvent::AutoControlStarted { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; - Some(( - "feeder.unit.auto_control_started", - "info", - Some(*unit_id), - None, - None, - format!("Auto control started for unit {}", code), - serde_json::json!({ "unit_id": unit_id }), - )) + Some(PersistableEvent { + event_type: "feeder.unit.auto_control_started", + level: "info", + unit_id: Some(*unit_id), + equipment_id: None, + source_id: None, + message: format!("Auto control started for unit {}", code), + payload: serde_json::json!({ "unit_id": unit_id }), + }) } AppEvent::AutoControlStopped { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; - Some(( - "feeder.unit.auto_control_stopped", - "info", - Some(*unit_id), - None, - None, - format!("Auto control stopped for unit {}", code), - serde_json::json!({ "unit_id": unit_id }), - )) + Some(PersistableEvent { + event_type: "feeder.unit.auto_control_stopped", + level: "info", + unit_id: Some(*unit_id), + equipment_id: None, + source_id: None, + message: format!("Auto control stopped for unit {}", code), + payload: serde_json::json!({ "unit_id": unit_id }), + }) } AppEvent::FaultLocked { unit_id, @@ -264,54 +266,54 @@ async fn persist_event_if_needed( } => { let unit_code = fetch_unit_code(pool, *unit_id).await; let eq_code = fetch_equipment_code(pool, *equipment_id).await; - Some(( - "feeder.unit.fault_locked", - "error", - Some(*unit_id), - Some(*equipment_id), - None, - format!( + Some(PersistableEvent { + event_type: "feeder.unit.fault_locked", + level: "error", + unit_id: Some(*unit_id), + equipment_id: Some(*equipment_id), + source_id: None, + message: format!( "Fault locked for unit {} by equipment {}", unit_code, eq_code ), - serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), - )) + payload: serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), + }) } AppEvent::FaultAcked { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; - Some(( - "feeder.unit.fault_acked", - "info", - Some(*unit_id), - None, - None, - format!("Fault acknowledged for unit {}", code), - serde_json::json!({ "unit_id": unit_id }), - )) + Some(PersistableEvent { + event_type: "feeder.unit.fault_acked", + level: "info", + unit_id: Some(*unit_id), + equipment_id: None, + source_id: None, + message: format!("Fault acknowledged for unit {}", code), + payload: serde_json::json!({ "unit_id": unit_id }), + }) } AppEvent::CommLocked { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; - Some(( - "feeder.unit.comm_locked", - "warn", - Some(*unit_id), - None, - None, - format!("Communication locked for unit {}", code), - serde_json::json!({ "unit_id": unit_id }), - )) + Some(PersistableEvent { + event_type: "feeder.unit.comm_locked", + level: "warn", + unit_id: Some(*unit_id), + equipment_id: None, + source_id: None, + message: format!("Communication locked for unit {}", code), + payload: serde_json::json!({ "unit_id": unit_id }), + }) } AppEvent::CommRecovered { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; - Some(( - "feeder.unit.comm_recovered", - "info", - Some(*unit_id), - None, - None, - format!("Communication recovered for unit {}", code), - serde_json::json!({ "unit_id": unit_id }), - )) + Some(PersistableEvent { + event_type: "feeder.unit.comm_recovered", + level: "info", + unit_id: Some(*unit_id), + equipment_id: None, + source_id: None, + message: format!("Communication recovered for unit {}", code), + payload: serde_json::json!({ "unit_id": unit_id }), + }) } AppEvent::RemLocal { unit_id, @@ -319,42 +321,42 @@ async fn persist_event_if_needed( } => { let unit_code = fetch_unit_code(pool, *unit_id).await; let eq_code = fetch_equipment_code(pool, *equipment_id).await; - Some(( - "feeder.unit.rem_local", - "warn", - Some(*unit_id), - Some(*equipment_id), - None, - format!( + Some(PersistableEvent { + event_type: "feeder.unit.rem_local", + level: "warn", + unit_id: Some(*unit_id), + equipment_id: Some(*equipment_id), + source_id: None, + message: format!( "Unit {} switched to local control via equipment {}", unit_code, eq_code ), - serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), - )) + payload: serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), + }) } AppEvent::RemRecovered { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; - Some(( - "feeder.unit.rem_recovered", - "warn", - Some(*unit_id), - None, - None, - format!( + Some(PersistableEvent { + event_type: "feeder.unit.rem_recovered", + level: "warn", + unit_id: Some(*unit_id), + equipment_id: None, + source_id: None, + message: format!( "Unit {} returned to remote control; auto control requires manual restart", code ), - serde_json::json!({ "unit_id": unit_id }), - )) + payload: serde_json::json!({ "unit_id": unit_id }), + }) } AppEvent::UnitStateChanged { .. } => None, }; - let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record + let Some(record) = record else { return; }; - let envelope = EventEnvelope::new(event_type, payload); + let envelope = EventEnvelope::new(record.event_type, record.payload); let inserted = sqlx::query_as::<_, EventRecord>( r#" @@ -364,11 +366,11 @@ async fn persist_event_if_needed( "#, ) .bind(envelope.event_type) - .bind(level) - .bind(unit_id as Option) - .bind(equipment_id as Option) - .bind(source_id) - .bind(message) + .bind(record.level) + .bind(record.unit_id) + .bind(record.equipment_id) + .bind(record.source_id) + .bind(record.message) .bind(sqlx::types::Json(envelope.payload)) .fetch_one(pool) .await; diff --git a/crates/plc_platform_core/src/service/control.rs b/crates/plc_platform_core/src/service/control.rs index 530ea14..08d0fac 100644 --- a/crates/plc_platform_core/src/service/control.rs +++ b/crates/plc_platform_core/src/service/control.rs @@ -431,21 +431,6 @@ pub async fn get_signal_role_points_batch( .collect()) } -#[cfg(test)] -mod tests { - use super::{equipment_order_clause_with_unit, unit_order_clause}; - - #[test] - fn unit_ordering_defaults_to_code() { - assert_eq!(unit_order_clause(), "code"); - } - - #[test] - fn unit_equipment_ordering_uses_code_within_unit() { - assert_eq!(equipment_order_clause_with_unit(), "unit_id, code"); - } -} - pub async fn get_equipment_role_points( pool: &PgPool, equipment_id: Uuid, @@ -474,3 +459,18 @@ pub async fn get_equipment_role_points( }) .collect()) } + +#[cfg(test)] +mod tests { + use super::{equipment_order_clause_with_unit, unit_order_clause}; + + #[test] + fn unit_ordering_defaults_to_code() { + assert_eq!(unit_order_clause(), "code"); + } + + #[test] + fn unit_equipment_ordering_uses_code_within_unit() { + assert_eq!(equipment_order_clause_with_unit(), "unit_id, code"); + } +}