From a49f6adf9bfaacfb0407eb910c8b9b4942ee9b2a Mon Sep 17 00:00:00 2001 From: caoqianming Date: Tue, 21 Apr 2026 16:24:26 +0800 Subject: [PATCH] Format feeder event module --- crates/app_feeder_distributor/src/event.rs | 200 +++++++++++++++------ 1 file changed, 149 insertions(+), 51 deletions(-) diff --git a/crates/app_feeder_distributor/src/event.rs b/crates/app_feeder_distributor/src/event.rs index 6d53c42..517e479 100644 --- a/crates/app_feeder_distributor/src/event.rs +++ b/crates/app_feeder_distributor/src/event.rs @@ -22,15 +22,37 @@ pub enum AppEvent { unit_id: Option, point_id: Uuid, }, - AutoControlStarted { unit_id: Uuid }, - AutoControlStopped { unit_id: Uuid }, - FaultLocked { unit_id: Uuid, equipment_id: Uuid }, - FaultAcked { unit_id: Uuid }, - CommLocked { unit_id: Uuid }, - CommRecovered { unit_id: Uuid }, - RemLocal { unit_id: Uuid, equipment_id: Uuid }, - RemRecovered { unit_id: Uuid }, - UnitStateChanged { unit_id: Uuid, from_state: String, to_state: String }, + AutoControlStarted { + unit_id: Uuid, + }, + AutoControlStopped { + unit_id: Uuid, + }, + FaultLocked { + unit_id: Uuid, + equipment_id: Uuid, + }, + FaultAcked { + unit_id: Uuid, + }, + CommLocked { + unit_id: Uuid, + }, + CommRecovered { + unit_id: Uuid, + }, + RemLocal { + unit_id: Uuid, + equipment_id: Uuid, + }, + RemRecovered { + unit_id: Uuid, + }, + UnitStateChanged { + unit_id: Uuid, + from_state: String, + to_state: String, + }, } pub struct EventManager { @@ -38,10 +60,7 @@ pub struct EventManager { } impl EventManager { - pub fn new( - pool: sqlx::PgPool, - ws_manager: Option>, - ) -> Self { + pub fn new(pool: sqlx::PgPool, ws_manager: Option>) -> Self { let (control_sender, mut control_receiver) = mpsc::channel::(CONTROL_EVENT_CHANNEL_CAPACITY); @@ -59,9 +78,9 @@ impl EventManager { pub fn send(&self, event: AppEvent) -> Result<(), String> { match self.control_sender.try_send(event) { Ok(()) => Ok(()), - Err(mpsc::error::TrySendError::Closed(e)) => { - Err(format!("Failed to send control event: channel closed ({e:?})")) - } + Err(mpsc::error::TrySendError::Closed(e)) => Err(format!( + "Failed to send control event: channel closed ({e:?})" + )), Err(mpsc::error::TrySendError::Full(e)) => { Err(format!("Failed to send control event: queue full ({e:?})")) } @@ -110,19 +129,27 @@ async fn handle_control_event( match event { AppEvent::EquipmentStartCommandSent { - equipment_id, unit_id, point_id, + equipment_id, + unit_id, + point_id, } => { tracing::info!( "Equipment start command sent: equipment={}, unit={:?}, point={}", - equipment_id, unit_id, point_id + equipment_id, + unit_id, + point_id ); } AppEvent::EquipmentStopCommandSent { - equipment_id, unit_id, point_id, + equipment_id, + unit_id, + point_id, } => { tracing::info!( "Equipment stop command sent: equipment={}, unit={:?}, point={}", - equipment_id, unit_id, point_id + equipment_id, + unit_id, + point_id ); } AppEvent::AutoControlStarted { unit_id } => { @@ -131,7 +158,10 @@ async fn handle_control_event( AppEvent::AutoControlStopped { unit_id } => { tracing::info!("Auto control stopped for unit {}", unit_id); } - AppEvent::FaultLocked { unit_id, equipment_id } => { + AppEvent::FaultLocked { + unit_id, + equipment_id, + } => { tracing::warn!("Fault locked: unit={}, equipment={}", unit_id, equipment_id); } AppEvent::FaultAcked { unit_id } => { @@ -143,13 +173,20 @@ async fn handle_control_event( AppEvent::CommRecovered { unit_id } => { tracing::info!("Comm recovered for unit {}", unit_id); } - AppEvent::RemLocal { unit_id, equipment_id } => { + AppEvent::RemLocal { + unit_id, + equipment_id, + } => { tracing::warn!("REM local: unit={}, equipment={}", unit_id, equipment_id); } AppEvent::RemRecovered { unit_id } => { tracing::info!("REM recovered for unit {}", unit_id); } - AppEvent::UnitStateChanged { unit_id, from_state, to_state } => { + AppEvent::UnitStateChanged { + unit_id, + from_state, + to_state, + } => { tracing::info!("Unit {} state: {} -> {}", unit_id, from_state, to_state); } } @@ -180,12 +217,27 @@ async fn persist_event_if_needed( pool: &sqlx::PgPool, ws_manager: Option<&std::sync::Arc>, ) { - let record: Option<(&str, &str, Option, Option, Option, String, serde_json::Value)> = match event { - AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => { + let record: Option<( + &str, + &str, + Option, + Option, + Option, + String, + serde_json::Value, + )> = match event { + AppEvent::EquipmentStartCommandSent { + equipment_id, + unit_id, + point_id, + } => { let code = fetch_equipment_code(pool, *equipment_id).await; Some(( - "feeder.equipment.start_command_sent", "info", - *unit_id, Some(*equipment_id), None, + "feeder.equipment.start_command_sent", + "info", + *unit_id, + Some(*equipment_id), + None, format!("Start command sent to equipment {}", code), serde_json::json!({ "equipment_id": equipment_id, @@ -194,11 +246,18 @@ async fn persist_event_if_needed( }), )) } - AppEvent::EquipmentStopCommandSent { equipment_id, unit_id, point_id } => { + AppEvent::EquipmentStopCommandSent { + equipment_id, + unit_id, + point_id, + } => { let code = fetch_equipment_code(pool, *equipment_id).await; Some(( - "feeder.equipment.stop_command_sent", "info", - *unit_id, Some(*equipment_id), None, + "feeder.equipment.stop_command_sent", + "info", + *unit_id, + Some(*equipment_id), + None, format!("Stop command sent to equipment {}", code), serde_json::json!({ "equipment_id": equipment_id, @@ -210,8 +269,11 @@ async fn persist_event_if_needed( AppEvent::AutoControlStarted { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( - "feeder.unit.auto_control_started", "info", - Some(*unit_id), None, None, + "feeder.unit.auto_control_started", + "info", + Some(*unit_id), + None, + None, format!("Auto control started for unit {}", code), serde_json::json!({ "unit_id": unit_id }), )) @@ -219,27 +281,42 @@ async fn persist_event_if_needed( AppEvent::AutoControlStopped { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( - "feeder.unit.auto_control_stopped", "info", - Some(*unit_id), None, None, + "feeder.unit.auto_control_stopped", + "info", + Some(*unit_id), + None, + None, format!("Auto control stopped for unit {}", code), serde_json::json!({ "unit_id": unit_id }), )) } - AppEvent::FaultLocked { unit_id, equipment_id } => { + AppEvent::FaultLocked { + unit_id, + equipment_id, + } => { let unit_code = fetch_unit_code(pool, *unit_id).await; let eq_code = fetch_equipment_code(pool, *equipment_id).await; Some(( - "feeder.unit.fault_locked", "error", - Some(*unit_id), Some(*equipment_id), None, - format!("Fault locked for unit {} by equipment {}", unit_code, eq_code), + "feeder.unit.fault_locked", + "error", + Some(*unit_id), + Some(*equipment_id), + None, + format!( + "Fault locked for unit {} by equipment {}", + unit_code, eq_code + ), serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), )) } AppEvent::FaultAcked { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( - "feeder.unit.fault_acked", "info", - Some(*unit_id), None, None, + "feeder.unit.fault_acked", + "info", + Some(*unit_id), + None, + None, format!("Fault acknowledged for unit {}", code), serde_json::json!({ "unit_id": unit_id }), )) @@ -247,8 +324,11 @@ async fn persist_event_if_needed( AppEvent::CommLocked { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( - "feeder.unit.comm_locked", "warn", - Some(*unit_id), None, None, + "feeder.unit.comm_locked", + "warn", + Some(*unit_id), + None, + None, format!("Communication locked for unit {}", code), serde_json::json!({ "unit_id": unit_id }), )) @@ -256,28 +336,46 @@ async fn persist_event_if_needed( AppEvent::CommRecovered { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( - "feeder.unit.comm_recovered", "info", - Some(*unit_id), None, None, + "feeder.unit.comm_recovered", + "info", + Some(*unit_id), + None, + None, format!("Communication recovered for unit {}", code), serde_json::json!({ "unit_id": unit_id }), )) } - AppEvent::RemLocal { unit_id, equipment_id } => { + AppEvent::RemLocal { + unit_id, + equipment_id, + } => { let unit_code = fetch_unit_code(pool, *unit_id).await; let eq_code = fetch_equipment_code(pool, *equipment_id).await; Some(( - "feeder.unit.rem_local", "warn", - Some(*unit_id), Some(*equipment_id), None, - format!("Unit {} switched to local control via equipment {}", unit_code, eq_code), + "feeder.unit.rem_local", + "warn", + Some(*unit_id), + Some(*equipment_id), + None, + format!( + "Unit {} switched to local control via equipment {}", + unit_code, eq_code + ), serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), )) } AppEvent::RemRecovered { unit_id } => { let code = fetch_unit_code(pool, *unit_id).await; Some(( - "feeder.unit.rem_recovered", "warn", - Some(*unit_id), None, None, - format!("Unit {} returned to remote control; auto control requires manual restart", code), + "feeder.unit.rem_recovered", + "warn", + Some(*unit_id), + None, + None, + format!( + "Unit {} returned to remote control; auto control requires manual restart", + code + ), serde_json::json!({ "unit_id": unit_id }), )) }