Clean up clippy warnings and improve code organization
- engine.rs: use is_none_or instead of map_or, remove redundant closures - service/control.rs: move get_equipment_role_points before test module - event.rs: replace complex 7-element tuple with PersistableEvent struct Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
706fb4f72a
commit
58fdb9f58e
|
|
@ -38,7 +38,7 @@ async fn supervise(state: AppState, store: Arc<ControlRuntimeStore>) {
|
|||
match plc_platform_core::service::get_all_enabled_units(&state.platform.pool).await {
|
||||
Ok(units) => {
|
||||
for unit in units {
|
||||
let needs_spawn = tasks.get(&unit.id).map_or(true, |h| h.is_finished());
|
||||
let needs_spawn = tasks.get(&unit.id).is_none_or(|h| h.is_finished());
|
||||
if needs_spawn {
|
||||
let s = state.clone();
|
||||
let st = store.clone();
|
||||
|
|
@ -370,7 +370,7 @@ async fn check_fault_comm(
|
|||
roles
|
||||
.get("flt")
|
||||
.and_then(|rp| monitor.get(&rp.point_id))
|
||||
.map(|m| super::monitor_value_as_bool(m))
|
||||
.map(super::monitor_value_as_bool)
|
||||
.unwrap_or(false)
|
||||
});
|
||||
|
||||
|
|
@ -381,7 +381,7 @@ async fn check_fault_comm(
|
|||
roles
|
||||
.get("flt")
|
||||
.and_then(|rp| monitor.get(&rp.point_id))
|
||||
.map(|m| super::monitor_value_as_bool(m))
|
||||
.map(super::monitor_value_as_bool)
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.map(|(eq_id, _)| *eq_id)
|
||||
|
|
|
|||
|
|
@ -180,39 +180,41 @@ async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String {
|
|||
.unwrap_or_else(|| id.to_string())
|
||||
}
|
||||
|
||||
struct PersistableEvent {
|
||||
event_type: &'static str,
|
||||
level: &'static str,
|
||||
unit_id: Option<Uuid>,
|
||||
equipment_id: Option<Uuid>,
|
||||
source_id: Option<Uuid>,
|
||||
message: String,
|
||||
payload: serde_json::Value,
|
||||
}
|
||||
|
||||
async fn persist_event_if_needed(
|
||||
event: &AppEvent,
|
||||
pool: &sqlx::PgPool,
|
||||
ws_manager: Option<&std::sync::Arc<WebSocketManager>>,
|
||||
) {
|
||||
let record: Option<(
|
||||
&str,
|
||||
&str,
|
||||
Option<Uuid>,
|
||||
Option<Uuid>,
|
||||
Option<Uuid>,
|
||||
String,
|
||||
serde_json::Value,
|
||||
)> = match event {
|
||||
let record: Option<PersistableEvent> = match event {
|
||||
AppEvent::EquipmentStartCommandSent {
|
||||
equipment_id,
|
||||
unit_id,
|
||||
point_id,
|
||||
} => {
|
||||
let code = fetch_equipment_code(pool, *equipment_id).await;
|
||||
Some((
|
||||
"feeder.equipment.start_command_sent",
|
||||
"info",
|
||||
*unit_id,
|
||||
Some(*equipment_id),
|
||||
None,
|
||||
format!("Start command sent to equipment {}", code),
|
||||
serde_json::json!({
|
||||
Some(PersistableEvent {
|
||||
event_type: "feeder.equipment.start_command_sent",
|
||||
level: "info",
|
||||
unit_id: *unit_id,
|
||||
equipment_id: Some(*equipment_id),
|
||||
source_id: None,
|
||||
message: format!("Start command sent to equipment {}", code),
|
||||
payload: serde_json::json!({
|
||||
"equipment_id": equipment_id,
|
||||
"unit_id": unit_id,
|
||||
"point_id": point_id
|
||||
}),
|
||||
))
|
||||
})
|
||||
}
|
||||
AppEvent::EquipmentStopCommandSent {
|
||||
equipment_id,
|
||||
|
|
@ -220,43 +222,43 @@ async fn persist_event_if_needed(
|
|||
point_id,
|
||||
} => {
|
||||
let code = fetch_equipment_code(pool, *equipment_id).await;
|
||||
Some((
|
||||
"feeder.equipment.stop_command_sent",
|
||||
"info",
|
||||
*unit_id,
|
||||
Some(*equipment_id),
|
||||
None,
|
||||
format!("Stop command sent to equipment {}", code),
|
||||
serde_json::json!({
|
||||
Some(PersistableEvent {
|
||||
event_type: "feeder.equipment.stop_command_sent",
|
||||
level: "info",
|
||||
unit_id: *unit_id,
|
||||
equipment_id: Some(*equipment_id),
|
||||
source_id: None,
|
||||
message: format!("Stop command sent to equipment {}", code),
|
||||
payload: serde_json::json!({
|
||||
"equipment_id": equipment_id,
|
||||
"unit_id": unit_id,
|
||||
"point_id": point_id
|
||||
}),
|
||||
))
|
||||
})
|
||||
}
|
||||
AppEvent::AutoControlStarted { unit_id } => {
|
||||
let code = fetch_unit_code(pool, *unit_id).await;
|
||||
Some((
|
||||
"feeder.unit.auto_control_started",
|
||||
"info",
|
||||
Some(*unit_id),
|
||||
None,
|
||||
None,
|
||||
format!("Auto control started for unit {}", code),
|
||||
serde_json::json!({ "unit_id": unit_id }),
|
||||
))
|
||||
Some(PersistableEvent {
|
||||
event_type: "feeder.unit.auto_control_started",
|
||||
level: "info",
|
||||
unit_id: Some(*unit_id),
|
||||
equipment_id: None,
|
||||
source_id: None,
|
||||
message: format!("Auto control started for unit {}", code),
|
||||
payload: serde_json::json!({ "unit_id": unit_id }),
|
||||
})
|
||||
}
|
||||
AppEvent::AutoControlStopped { unit_id } => {
|
||||
let code = fetch_unit_code(pool, *unit_id).await;
|
||||
Some((
|
||||
"feeder.unit.auto_control_stopped",
|
||||
"info",
|
||||
Some(*unit_id),
|
||||
None,
|
||||
None,
|
||||
format!("Auto control stopped for unit {}", code),
|
||||
serde_json::json!({ "unit_id": unit_id }),
|
||||
))
|
||||
Some(PersistableEvent {
|
||||
event_type: "feeder.unit.auto_control_stopped",
|
||||
level: "info",
|
||||
unit_id: Some(*unit_id),
|
||||
equipment_id: None,
|
||||
source_id: None,
|
||||
message: format!("Auto control stopped for unit {}", code),
|
||||
payload: serde_json::json!({ "unit_id": unit_id }),
|
||||
})
|
||||
}
|
||||
AppEvent::FaultLocked {
|
||||
unit_id,
|
||||
|
|
@ -264,54 +266,54 @@ async fn persist_event_if_needed(
|
|||
} => {
|
||||
let unit_code = fetch_unit_code(pool, *unit_id).await;
|
||||
let eq_code = fetch_equipment_code(pool, *equipment_id).await;
|
||||
Some((
|
||||
"feeder.unit.fault_locked",
|
||||
"error",
|
||||
Some(*unit_id),
|
||||
Some(*equipment_id),
|
||||
None,
|
||||
format!(
|
||||
Some(PersistableEvent {
|
||||
event_type: "feeder.unit.fault_locked",
|
||||
level: "error",
|
||||
unit_id: Some(*unit_id),
|
||||
equipment_id: Some(*equipment_id),
|
||||
source_id: None,
|
||||
message: format!(
|
||||
"Fault locked for unit {} by equipment {}",
|
||||
unit_code, eq_code
|
||||
),
|
||||
serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }),
|
||||
))
|
||||
payload: serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }),
|
||||
})
|
||||
}
|
||||
AppEvent::FaultAcked { unit_id } => {
|
||||
let code = fetch_unit_code(pool, *unit_id).await;
|
||||
Some((
|
||||
"feeder.unit.fault_acked",
|
||||
"info",
|
||||
Some(*unit_id),
|
||||
None,
|
||||
None,
|
||||
format!("Fault acknowledged for unit {}", code),
|
||||
serde_json::json!({ "unit_id": unit_id }),
|
||||
))
|
||||
Some(PersistableEvent {
|
||||
event_type: "feeder.unit.fault_acked",
|
||||
level: "info",
|
||||
unit_id: Some(*unit_id),
|
||||
equipment_id: None,
|
||||
source_id: None,
|
||||
message: format!("Fault acknowledged for unit {}", code),
|
||||
payload: serde_json::json!({ "unit_id": unit_id }),
|
||||
})
|
||||
}
|
||||
AppEvent::CommLocked { unit_id } => {
|
||||
let code = fetch_unit_code(pool, *unit_id).await;
|
||||
Some((
|
||||
"feeder.unit.comm_locked",
|
||||
"warn",
|
||||
Some(*unit_id),
|
||||
None,
|
||||
None,
|
||||
format!("Communication locked for unit {}", code),
|
||||
serde_json::json!({ "unit_id": unit_id }),
|
||||
))
|
||||
Some(PersistableEvent {
|
||||
event_type: "feeder.unit.comm_locked",
|
||||
level: "warn",
|
||||
unit_id: Some(*unit_id),
|
||||
equipment_id: None,
|
||||
source_id: None,
|
||||
message: format!("Communication locked for unit {}", code),
|
||||
payload: serde_json::json!({ "unit_id": unit_id }),
|
||||
})
|
||||
}
|
||||
AppEvent::CommRecovered { unit_id } => {
|
||||
let code = fetch_unit_code(pool, *unit_id).await;
|
||||
Some((
|
||||
"feeder.unit.comm_recovered",
|
||||
"info",
|
||||
Some(*unit_id),
|
||||
None,
|
||||
None,
|
||||
format!("Communication recovered for unit {}", code),
|
||||
serde_json::json!({ "unit_id": unit_id }),
|
||||
))
|
||||
Some(PersistableEvent {
|
||||
event_type: "feeder.unit.comm_recovered",
|
||||
level: "info",
|
||||
unit_id: Some(*unit_id),
|
||||
equipment_id: None,
|
||||
source_id: None,
|
||||
message: format!("Communication recovered for unit {}", code),
|
||||
payload: serde_json::json!({ "unit_id": unit_id }),
|
||||
})
|
||||
}
|
||||
AppEvent::RemLocal {
|
||||
unit_id,
|
||||
|
|
@ -319,42 +321,42 @@ async fn persist_event_if_needed(
|
|||
} => {
|
||||
let unit_code = fetch_unit_code(pool, *unit_id).await;
|
||||
let eq_code = fetch_equipment_code(pool, *equipment_id).await;
|
||||
Some((
|
||||
"feeder.unit.rem_local",
|
||||
"warn",
|
||||
Some(*unit_id),
|
||||
Some(*equipment_id),
|
||||
None,
|
||||
format!(
|
||||
Some(PersistableEvent {
|
||||
event_type: "feeder.unit.rem_local",
|
||||
level: "warn",
|
||||
unit_id: Some(*unit_id),
|
||||
equipment_id: Some(*equipment_id),
|
||||
source_id: None,
|
||||
message: format!(
|
||||
"Unit {} switched to local control via equipment {}",
|
||||
unit_code, eq_code
|
||||
),
|
||||
serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }),
|
||||
))
|
||||
payload: serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }),
|
||||
})
|
||||
}
|
||||
AppEvent::RemRecovered { unit_id } => {
|
||||
let code = fetch_unit_code(pool, *unit_id).await;
|
||||
Some((
|
||||
"feeder.unit.rem_recovered",
|
||||
"warn",
|
||||
Some(*unit_id),
|
||||
None,
|
||||
None,
|
||||
format!(
|
||||
Some(PersistableEvent {
|
||||
event_type: "feeder.unit.rem_recovered",
|
||||
level: "warn",
|
||||
unit_id: Some(*unit_id),
|
||||
equipment_id: None,
|
||||
source_id: None,
|
||||
message: format!(
|
||||
"Unit {} returned to remote control; auto control requires manual restart",
|
||||
code
|
||||
),
|
||||
serde_json::json!({ "unit_id": unit_id }),
|
||||
))
|
||||
payload: serde_json::json!({ "unit_id": unit_id }),
|
||||
})
|
||||
}
|
||||
AppEvent::UnitStateChanged { .. } => None,
|
||||
};
|
||||
|
||||
let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record
|
||||
let Some(record) = record
|
||||
else {
|
||||
return;
|
||||
};
|
||||
let envelope = EventEnvelope::new(event_type, payload);
|
||||
let envelope = EventEnvelope::new(record.event_type, record.payload);
|
||||
|
||||
let inserted = sqlx::query_as::<_, EventRecord>(
|
||||
r#"
|
||||
|
|
@ -364,11 +366,11 @@ async fn persist_event_if_needed(
|
|||
"#,
|
||||
)
|
||||
.bind(envelope.event_type)
|
||||
.bind(level)
|
||||
.bind(unit_id as Option<Uuid>)
|
||||
.bind(equipment_id as Option<Uuid>)
|
||||
.bind(source_id)
|
||||
.bind(message)
|
||||
.bind(record.level)
|
||||
.bind(record.unit_id)
|
||||
.bind(record.equipment_id)
|
||||
.bind(record.source_id)
|
||||
.bind(record.message)
|
||||
.bind(sqlx::types::Json(envelope.payload))
|
||||
.fetch_one(pool)
|
||||
.await;
|
||||
|
|
|
|||
|
|
@ -431,21 +431,6 @@ pub async fn get_signal_role_points_batch(
|
|||
.collect())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{equipment_order_clause_with_unit, unit_order_clause};
|
||||
|
||||
#[test]
|
||||
fn unit_ordering_defaults_to_code() {
|
||||
assert_eq!(unit_order_clause(), "code");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unit_equipment_ordering_uses_code_within_unit() {
|
||||
assert_eq!(equipment_order_clause_with_unit(), "unit_id, code");
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_equipment_role_points(
|
||||
pool: &PgPool,
|
||||
equipment_id: Uuid,
|
||||
|
|
@ -474,3 +459,18 @@ pub async fn get_equipment_role_points(
|
|||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{equipment_order_clause_with_unit, unit_order_clause};
|
||||
|
||||
#[test]
|
||||
fn unit_ordering_defaults_to_code() {
|
||||
assert_eq!(unit_order_clause(), "code");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unit_equipment_ordering_uses_code_within_unit() {
|
||||
assert_eq!(equipment_order_clause_with_unit(), "unit_id, code");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue