diff --git a/src/control/engine.rs b/src/control/engine.rs index ca16717..1824460 100644 --- a/src/control/engine.rs +++ b/src/control/engine.rs @@ -57,18 +57,6 @@ async fn supervise(state: AppState, store: Arc) { async fn unit_task(state: AppState, store: Arc, unit_id: Uuid) { let notify = store.get_or_create_notify(unit_id).await; - // Load equipment maps once at task start. - // If equipment config changes, the supervisor's next scan will restart the task. - let (kind_roles, kind_eq_ids, all_roles) = loop { - match load_equipment_maps(&state, unit_id).await { - Ok(maps) => break maps, - Err(e) => { - tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e); - tokio::time::sleep(Duration::from_secs(5)).await; - } - } - }; - // Fault/comm check ticker — still need periodic polling of point monitor data. let mut fault_tick = tokio::time::interval(Duration::from_millis(500)); fault_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -89,6 +77,15 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu }; // ── Fault / comm check ──────────────────────────────────────────────── + let (kind_roles, kind_eq_ids, all_roles) = match load_equipment_maps(&state, unit_id).await { + Ok(maps) => maps, + Err(e) => { + tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + let mut runtime = store.get_or_init(unit_id).await; if check_fault_comm(&state, &mut runtime, &unit, &all_roles).await { store.upsert(runtime.clone()).await; @@ -369,14 +366,40 @@ type EquipMaps = ( async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result { let equipment_list = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?; + let equipment_ids: Vec = equipment_list.iter().map(|equip| equip.id).collect(); + let role_point_rows = + crate::service::get_signal_role_points_batch(&state.pool, &equipment_ids).await?; + let mut role_points_by_equipment: HashMap> = HashMap::new(); + for row in role_point_rows { + role_points_by_equipment + .entry(row.equipment_id) + .or_default() + .push(EquipmentRolePoint { + point_id: row.point_id, + signal_role: row.signal_role, + }); + } + Ok(build_equipment_maps( + unit_id, + &equipment_list, + role_points_by_equipment, + )) +} + +fn build_equipment_maps( + unit_id: Uuid, + equipment_list: &[crate::model::Equipment], + mut role_points_by_equipment: HashMap>, +) -> EquipMaps { let mut kind_roles: HashMap> = HashMap::new(); let mut kind_eq_ids: HashMap = HashMap::new(); let mut all_roles: Vec<(Uuid, HashMap)> = Vec::new(); - for equip in &equipment_list { - let role_points = crate::service::get_equipment_role_points(&state.pool, equip.id).await?; - let role_map: HashMap = role_points + for equip in equipment_list { + let role_map: HashMap = role_points_by_equipment + .remove(&equip.id) + .unwrap_or_default() .into_iter() .map(|rp| (rp.signal_role.clone(), rp)) .collect(); @@ -395,7 +418,7 @@ async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result Equipment { + Equipment { + id, + unit_id: Some(unit_id), + code: format!("EQ-{id}"), + name: format!("Equipment-{id}"), + kind: Some(kind.to_string()), + description: None, + created_at: Utc::now(), + updated_at: Utc::now(), + } + } + + #[test] + fn build_equipment_maps_reflects_latest_role_bindings() { + let unit_id = Uuid::new_v4(); + let equipment_id = Uuid::new_v4(); + let first_start_point = Uuid::new_v4(); + let second_start_point = Uuid::new_v4(); + let equipment_list = vec![equipment(equipment_id, unit_id, "coal_feeder")]; + + let mut first_roles = HashMap::new(); + first_roles.insert( + equipment_id, + vec![EquipmentRolePoint { + point_id: first_start_point, + signal_role: "start_cmd".to_string(), + }], + ); + let (first_kind_roles, _, _) = build_equipment_maps(unit_id, &equipment_list, first_roles); + + let mut second_roles = HashMap::new(); + second_roles.insert( + equipment_id, + vec![EquipmentRolePoint { + point_id: second_start_point, + signal_role: "start_cmd".to_string(), + }], + ); + let (second_kind_roles, _, _) = + build_equipment_maps(unit_id, &equipment_list, second_roles); + + assert_eq!( + first_kind_roles["coal_feeder"]["start_cmd"].point_id, + first_start_point + ); + assert_eq!( + second_kind_roles["coal_feeder"]["start_cmd"].point_id, + second_start_point + ); + } +} diff --git a/src/handler/control.rs b/src/handler/control.rs index 46c9f6f..2a84987 100644 --- a/src/handler/control.rs +++ b/src/handler/control.rs @@ -35,6 +35,10 @@ fn validate_unit_timing_order( Ok(()) } +fn auto_control_start_blocked(runtime: &crate::control::runtime::UnitRuntime) -> bool { + runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required +} + #[derive(Debug, Deserialize, Validate)] pub struct GetUnitListQuery { #[validate(length(min = 1, max = 100))] @@ -534,17 +538,15 @@ pub async fn start_auto_unit( } let mut runtime = state.control_runtime.get_or_init(unit_id).await; - if runtime.fault_locked { - return Err(ApiErr::BadRequest( - "Unit is fault locked, cannot start auto control".to_string(), - None, - )); - } - if runtime.manual_ack_required { - return Err(ApiErr::BadRequest( - "Fault acknowledgement required before starting auto control".to_string(), - None, - )); + if auto_control_start_blocked(&runtime) { + let message = if runtime.fault_locked { + "Unit is fault locked, cannot start auto control" + } else if runtime.comm_locked { + "Unit communication is locked, cannot start auto control" + } else { + "Fault acknowledgement required before starting auto control" + }; + return Err(ApiErr::BadRequest(message.to_string(), None)); } runtime.auto_enabled = true; runtime.state = crate::control::runtime::UnitRuntimeState::Stopped; @@ -587,7 +589,7 @@ pub async fn batch_start_auto( skipped.push(unit.id); continue; } - if runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required { + if auto_control_start_blocked(&runtime) { skipped.push(unit.id); continue; } @@ -675,7 +677,11 @@ pub async fn get_unit_runtime( #[cfg(test)] mod tests { - use super::{validate_unit_timing_order, CreateUnitReq, UpdateUnitReq}; + use super::{ + auto_control_start_blocked, validate_unit_timing_order, CreateUnitReq, UpdateUnitReq, + }; + use crate::control::runtime::{UnitRuntime, UnitRuntimeState}; + use uuid::Uuid; use validator::Validate; #[test] @@ -721,4 +727,21 @@ mod tests { fn update_unit_req_rejects_acc_time_not_greater_than_run_time_when_both_present() { assert!(validate_unit_timing_order(20, 15).is_err()); } + + #[test] + fn auto_control_start_is_blocked_by_comm_lock() { + let runtime = UnitRuntime { + unit_id: Uuid::new_v4(), + state: UnitRuntimeState::Stopped, + auto_enabled: false, + accumulated_run_sec: 0, + display_acc_sec: 0, + fault_locked: false, + flt_active: false, + comm_locked: true, + manual_ack_required: false, + }; + + assert!(auto_control_start_blocked(&runtime)); + } } diff --git a/src/handler/equipment.rs b/src/handler/equipment.rs index 3c0ad01..c2472d9 100644 --- a/src/handler/equipment.rs +++ b/src/handler/equipment.rs @@ -14,6 +14,18 @@ use crate::util::{ }; use crate::AppState; +async fn notify_units( + state: &AppState, + unit_ids: impl IntoIterator, +) { + let mut seen = std::collections::HashSet::new(); + for unit_id in unit_ids { + if seen.insert(unit_id) { + state.control_runtime.notify_unit(unit_id).await; + } + } +} + #[derive(Deserialize, Validate)] pub struct GetEquipmentListQuery { #[validate(length(min = 1, max = 100))] @@ -176,6 +188,10 @@ pub async fn create_equipment( ) .await?; + if let Some(unit_id) = payload.unit_id { + notify_units(&state, [unit_id]).await; + } + Ok(( StatusCode::CREATED, Json(serde_json::json!({ @@ -202,9 +218,11 @@ pub async fn update_equipment( } let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?; - if exists.is_none() { + let existing_equipment = if let Some(equipment) = exists { + equipment + } else { return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); - } + }; if let Some(Some(unit_id)) = payload.unit_id { let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; @@ -237,6 +255,19 @@ pub async fn update_equipment( ) .await?; + let mut unit_ids = Vec::new(); + if let Some(unit_id) = existing_equipment.unit_id { + unit_ids.push(unit_id); + } + let next_unit_id = match payload.unit_id { + Some(next) => next, + None => existing_equipment.unit_id, + }; + if let Some(unit_id) = next_unit_id { + unit_ids.push(unit_id); + } + notify_units(&state, unit_ids).await; + Ok(Json(serde_json::json!({ "ok_msg": "Equipment updated successfully" }))) @@ -262,6 +293,9 @@ pub async fn batch_set_equipment_unit( } } + let before_unit_ids = + crate::service::get_unit_ids_by_equipment_ids(&state.pool, &payload.equipment_ids).await?; + let updated_count = crate::service::batch_set_equipment_unit( &state.pool, &payload.equipment_ids, @@ -269,6 +303,12 @@ pub async fn batch_set_equipment_unit( ) .await?; + let mut unit_ids = before_unit_ids; + if let Some(unit_id) = payload.unit_id { + unit_ids.push(unit_id); + } + notify_units(&state, unit_ids).await; + Ok(Json(serde_json::json!({ "ok_msg": "Equipment unit updated successfully", "updated_count": updated_count @@ -279,10 +319,13 @@ pub async fn delete_equipment( State(state): State, Path(equipment_id): Path, ) -> Result { + let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.pool, &[equipment_id]).await?; let deleted = crate::service::delete_equipment(&state.pool, equipment_id).await?; if !deleted { return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); } + notify_units(&state, unit_ids).await; + Ok(StatusCode::NO_CONTENT) } diff --git a/src/handler/point.rs b/src/handler/point.rs index f2401f5..6f5e219 100644 --- a/src/handler/point.rs +++ b/src/handler/point.rs @@ -21,6 +21,18 @@ use crate::{ AppState, }; +async fn notify_units( + state: &AppState, + unit_ids: impl IntoIterator, +) { + let mut seen = std::collections::HashSet::new(); + for unit_id in unit_ids { + if seen.insert(unit_id) { + state.control_runtime.notify_unit(unit_id).await; + } + } +} + /// List all points. #[derive(Deserialize, Validate)] pub struct GetPointListQuery { @@ -227,6 +239,7 @@ pub async fn update_point( if existing_point.is_none() { return Err(ApiErr::NotFound("Point not found".to_string(), None)); } + let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; let mut qb: QueryBuilder = QueryBuilder::new("UPDATE point SET "); let mut wrote_field = false; @@ -282,6 +295,9 @@ pub async fn update_point( qb.push(" WHERE id = ").push_bind(point_id); qb.build().execute(pool).await?; + let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; + notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await; + Ok(Json( serde_json::json!({"ok_msg": "Point updated successfully"}), )) @@ -382,6 +398,8 @@ pub async fn batch_set_point_equipment( return Err(ApiErr::NotFound("No valid points found".to_string(), None)); } + let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?; + let result = sqlx::query( r#" UPDATE point @@ -397,6 +415,9 @@ pub async fn batch_set_point_equipment( .execute(pool) .await?; + let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?; + notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await; + Ok(Json(serde_json::json!({ "ok_msg": "Point equipment updated successfully", "updated_count": result.rows_affected() @@ -409,6 +430,7 @@ pub async fn delete_point( Path(point_id): Path, ) -> Result { let pool = &state.pool; + let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; let source_id = { let grouped = crate::service::get_points_grouped_by_source(pool, &[point_id]).await?; @@ -442,6 +464,8 @@ pub async fn delete_point( } } + notify_units(&state, affected_unit_ids).await; + Ok(Json( serde_json::json!({"ok_msg": "Point deleted successfully"}), )) @@ -594,6 +618,7 @@ pub async fn batch_delete_points( let point_ids = payload.point_ids; let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?; + let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &point_ids).await?; let existing_point_ids: Vec = grouped .values() .flat_map(|points| points.iter().map(|p| p.point_id)) @@ -621,6 +646,8 @@ pub async fn batch_delete_points( } } + notify_units(&state, affected_unit_ids).await; + Ok(Json(BatchDeletePointsRes { deleted_count: result.rows_affected(), })) diff --git a/src/service/control.rs b/src/service/control.rs index f539ad8..f0266f6 100644 --- a/src/service/control.rs +++ b/src/service/control.rs @@ -380,6 +380,53 @@ pub async fn get_points_by_equipment_ids( .await } +pub async fn get_unit_ids_by_equipment_ids( + pool: &PgPool, + equipment_ids: &[Uuid], +) -> Result, sqlx::Error> { + if equipment_ids.is_empty() { + return Ok(vec![]); + } + + let rows = sqlx::query_scalar::<_, Uuid>( + r#" + SELECT DISTINCT unit_id + FROM equipment + WHERE id = ANY($1) + AND unit_id IS NOT NULL + "#, + ) + .bind(equipment_ids) + .fetch_all(pool) + .await?; + + Ok(rows) +} + +pub async fn get_unit_ids_by_point_ids( + pool: &PgPool, + point_ids: &[Uuid], +) -> Result, sqlx::Error> { + if point_ids.is_empty() { + return Ok(vec![]); + } + + let rows = sqlx::query_scalar::<_, Uuid>( + r#" + SELECT DISTINCT e.unit_id + FROM point p + INNER JOIN equipment e ON e.id = p.equipment_id + WHERE p.id = ANY($1) + AND e.unit_id IS NOT NULL + "#, + ) + .bind(point_ids) + .fetch_all(pool) + .await?; + + Ok(rows) +} + pub struct EquipmentSignalRole { pub equipment_id: Uuid, pub point_id: Uuid,