fix(control): refresh unit mappings on config changes

This commit is contained in:
caoqianming 2026-03-26 13:30:14 +08:00
parent dbfa673468
commit 9f833f3a5e
5 changed files with 255 additions and 31 deletions

View File

@ -57,18 +57,6 @@ async fn supervise(state: AppState, store: Arc<ControlRuntimeStore>) {
async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uuid) {
let notify = store.get_or_create_notify(unit_id).await;
// Load equipment maps once at task start.
// If equipment config changes, the supervisor's next scan will restart the task.
let (kind_roles, kind_eq_ids, all_roles) = loop {
match load_equipment_maps(&state, unit_id).await {
Ok(maps) => break maps,
Err(e) => {
tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
};
// Fault/comm check ticker — still need periodic polling of point monitor data.
let mut fault_tick = tokio::time::interval(Duration::from_millis(500));
fault_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@ -89,6 +77,15 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
};
// ── Fault / comm check ────────────────────────────────────────────────
let (kind_roles, kind_eq_ids, all_roles) = match load_equipment_maps(&state, unit_id).await {
Ok(maps) => maps,
Err(e) => {
tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
};
let mut runtime = store.get_or_init(unit_id).await;
if check_fault_comm(&state, &mut runtime, &unit, &all_roles).await {
store.upsert(runtime.clone()).await;
@ -369,14 +366,40 @@ type EquipMaps = (
async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result<EquipMaps, sqlx::Error> {
let equipment_list = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?;
let equipment_ids: Vec<Uuid> = equipment_list.iter().map(|equip| equip.id).collect();
let role_point_rows =
crate::service::get_signal_role_points_batch(&state.pool, &equipment_ids).await?;
let mut role_points_by_equipment: HashMap<Uuid, Vec<EquipmentRolePoint>> = HashMap::new();
for row in role_point_rows {
role_points_by_equipment
.entry(row.equipment_id)
.or_default()
.push(EquipmentRolePoint {
point_id: row.point_id,
signal_role: row.signal_role,
});
}
Ok(build_equipment_maps(
unit_id,
&equipment_list,
role_points_by_equipment,
))
}
fn build_equipment_maps(
unit_id: Uuid,
equipment_list: &[crate::model::Equipment],
mut role_points_by_equipment: HashMap<Uuid, Vec<EquipmentRolePoint>>,
) -> EquipMaps {
let mut kind_roles: HashMap<String, HashMap<String, EquipmentRolePoint>> = HashMap::new();
let mut kind_eq_ids: HashMap<String, Uuid> = HashMap::new();
let mut all_roles: Vec<(Uuid, HashMap<String, EquipmentRolePoint>)> = Vec::new();
for equip in &equipment_list {
let role_points = crate::service::get_equipment_role_points(&state.pool, equip.id).await?;
let role_map: HashMap<String, EquipmentRolePoint> = role_points
for equip in equipment_list {
let role_map: HashMap<String, EquipmentRolePoint> = role_points_by_equipment
.remove(&equip.id)
.unwrap_or_default()
.into_iter()
.map(|rp| (rp.signal_role.clone(), rp))
.collect();
@ -395,7 +418,7 @@ async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result<EquipMap
all_roles.push((equip.id, role_map));
}
Ok((kind_roles, kind_eq_ids, all_roles))
(kind_roles, kind_eq_ids, all_roles)
}
/// Find a command point by role. Returns `None` if REM==0, FLT==1, or quality is bad.
@ -428,3 +451,64 @@ fn find_cmd(
}
}
#[cfg(test)]
mod tests {
use super::build_equipment_maps;
use crate::model::Equipment;
use crate::service::EquipmentRolePoint;
use chrono::Utc;
use std::collections::HashMap;
use uuid::Uuid;
fn equipment(id: Uuid, unit_id: Uuid, kind: &str) -> Equipment {
Equipment {
id,
unit_id: Some(unit_id),
code: format!("EQ-{id}"),
name: format!("Equipment-{id}"),
kind: Some(kind.to_string()),
description: None,
created_at: Utc::now(),
updated_at: Utc::now(),
}
}
#[test]
fn build_equipment_maps_reflects_latest_role_bindings() {
let unit_id = Uuid::new_v4();
let equipment_id = Uuid::new_v4();
let first_start_point = Uuid::new_v4();
let second_start_point = Uuid::new_v4();
let equipment_list = vec![equipment(equipment_id, unit_id, "coal_feeder")];
let mut first_roles = HashMap::new();
first_roles.insert(
equipment_id,
vec![EquipmentRolePoint {
point_id: first_start_point,
signal_role: "start_cmd".to_string(),
}],
);
let (first_kind_roles, _, _) = build_equipment_maps(unit_id, &equipment_list, first_roles);
let mut second_roles = HashMap::new();
second_roles.insert(
equipment_id,
vec![EquipmentRolePoint {
point_id: second_start_point,
signal_role: "start_cmd".to_string(),
}],
);
let (second_kind_roles, _, _) =
build_equipment_maps(unit_id, &equipment_list, second_roles);
assert_eq!(
first_kind_roles["coal_feeder"]["start_cmd"].point_id,
first_start_point
);
assert_eq!(
second_kind_roles["coal_feeder"]["start_cmd"].point_id,
second_start_point
);
}
}

View File

@ -35,6 +35,10 @@ fn validate_unit_timing_order(
Ok(())
}
fn auto_control_start_blocked(runtime: &crate::control::runtime::UnitRuntime) -> bool {
runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required
}
#[derive(Debug, Deserialize, Validate)]
pub struct GetUnitListQuery {
#[validate(length(min = 1, max = 100))]
@ -534,17 +538,15 @@ pub async fn start_auto_unit(
}
let mut runtime = state.control_runtime.get_or_init(unit_id).await;
if runtime.fault_locked {
return Err(ApiErr::BadRequest(
"Unit is fault locked, cannot start auto control".to_string(),
None,
));
}
if runtime.manual_ack_required {
return Err(ApiErr::BadRequest(
"Fault acknowledgement required before starting auto control".to_string(),
None,
));
if auto_control_start_blocked(&runtime) {
let message = if runtime.fault_locked {
"Unit is fault locked, cannot start auto control"
} else if runtime.comm_locked {
"Unit communication is locked, cannot start auto control"
} else {
"Fault acknowledgement required before starting auto control"
};
return Err(ApiErr::BadRequest(message.to_string(), None));
}
runtime.auto_enabled = true;
runtime.state = crate::control::runtime::UnitRuntimeState::Stopped;
@ -587,7 +589,7 @@ pub async fn batch_start_auto(
skipped.push(unit.id);
continue;
}
if runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required {
if auto_control_start_blocked(&runtime) {
skipped.push(unit.id);
continue;
}
@ -675,7 +677,11 @@ pub async fn get_unit_runtime(
#[cfg(test)]
mod tests {
use super::{validate_unit_timing_order, CreateUnitReq, UpdateUnitReq};
use super::{
auto_control_start_blocked, validate_unit_timing_order, CreateUnitReq, UpdateUnitReq,
};
use crate::control::runtime::{UnitRuntime, UnitRuntimeState};
use uuid::Uuid;
use validator::Validate;
#[test]
@ -721,4 +727,21 @@ mod tests {
fn update_unit_req_rejects_acc_time_not_greater_than_run_time_when_both_present() {
assert!(validate_unit_timing_order(20, 15).is_err());
}
#[test]
fn auto_control_start_is_blocked_by_comm_lock() {
let runtime = UnitRuntime {
unit_id: Uuid::new_v4(),
state: UnitRuntimeState::Stopped,
auto_enabled: false,
accumulated_run_sec: 0,
display_acc_sec: 0,
fault_locked: false,
flt_active: false,
comm_locked: true,
manual_ack_required: false,
};
assert!(auto_control_start_blocked(&runtime));
}
}

View File

@ -14,6 +14,18 @@ use crate::util::{
};
use crate::AppState;
async fn notify_units(
state: &AppState,
unit_ids: impl IntoIterator<Item = Uuid>,
) {
let mut seen = std::collections::HashSet::new();
for unit_id in unit_ids {
if seen.insert(unit_id) {
state.control_runtime.notify_unit(unit_id).await;
}
}
}
#[derive(Deserialize, Validate)]
pub struct GetEquipmentListQuery {
#[validate(length(min = 1, max = 100))]
@ -176,6 +188,10 @@ pub async fn create_equipment(
)
.await?;
if let Some(unit_id) = payload.unit_id {
notify_units(&state, [unit_id]).await;
}
Ok((
StatusCode::CREATED,
Json(serde_json::json!({
@ -202,9 +218,11 @@ pub async fn update_equipment(
}
let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?;
if exists.is_none() {
let existing_equipment = if let Some(equipment) = exists {
equipment
} else {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}
};
if let Some(Some(unit_id)) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?;
@ -237,6 +255,19 @@ pub async fn update_equipment(
)
.await?;
let mut unit_ids = Vec::new();
if let Some(unit_id) = existing_equipment.unit_id {
unit_ids.push(unit_id);
}
let next_unit_id = match payload.unit_id {
Some(next) => next,
None => existing_equipment.unit_id,
};
if let Some(unit_id) = next_unit_id {
unit_ids.push(unit_id);
}
notify_units(&state, unit_ids).await;
Ok(Json(serde_json::json!({
"ok_msg": "Equipment updated successfully"
})))
@ -262,6 +293,9 @@ pub async fn batch_set_equipment_unit(
}
}
let before_unit_ids =
crate::service::get_unit_ids_by_equipment_ids(&state.pool, &payload.equipment_ids).await?;
let updated_count = crate::service::batch_set_equipment_unit(
&state.pool,
&payload.equipment_ids,
@ -269,6 +303,12 @@ pub async fn batch_set_equipment_unit(
)
.await?;
let mut unit_ids = before_unit_ids;
if let Some(unit_id) = payload.unit_id {
unit_ids.push(unit_id);
}
notify_units(&state, unit_ids).await;
Ok(Json(serde_json::json!({
"ok_msg": "Equipment unit updated successfully",
"updated_count": updated_count
@ -279,10 +319,13 @@ pub async fn delete_equipment(
State(state): State<AppState>,
Path(equipment_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.pool, &[equipment_id]).await?;
let deleted = crate::service::delete_equipment(&state.pool, equipment_id).await?;
if !deleted {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}
notify_units(&state, unit_ids).await;
Ok(StatusCode::NO_CONTENT)
}

View File

@ -21,6 +21,18 @@ use crate::{
AppState,
};
async fn notify_units(
state: &AppState,
unit_ids: impl IntoIterator<Item = Uuid>,
) {
let mut seen = std::collections::HashSet::new();
for unit_id in unit_ids {
if seen.insert(unit_id) {
state.control_runtime.notify_unit(unit_id).await;
}
}
}
/// List all points.
#[derive(Deserialize, Validate)]
pub struct GetPointListQuery {
@ -227,6 +239,7 @@ pub async fn update_point(
if existing_point.is_none() {
return Err(ApiErr::NotFound("Point not found".to_string(), None));
}
let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
let mut qb: QueryBuilder<sqlx::Postgres> = QueryBuilder::new("UPDATE point SET ");
let mut wrote_field = false;
@ -282,6 +295,9 @@ pub async fn update_point(
qb.push(" WHERE id = ").push_bind(point_id);
qb.build().execute(pool).await?;
let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await;
Ok(Json(
serde_json::json!({"ok_msg": "Point updated successfully"}),
))
@ -382,6 +398,8 @@ pub async fn batch_set_point_equipment(
return Err(ApiErr::NotFound("No valid points found".to_string(), None));
}
let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?;
let result = sqlx::query(
r#"
UPDATE point
@ -397,6 +415,9 @@ pub async fn batch_set_point_equipment(
.execute(pool)
.await?;
let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?;
notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await;
Ok(Json(serde_json::json!({
"ok_msg": "Point equipment updated successfully",
"updated_count": result.rows_affected()
@ -409,6 +430,7 @@ pub async fn delete_point(
Path(point_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
let source_id = {
let grouped = crate::service::get_points_grouped_by_source(pool, &[point_id]).await?;
@ -442,6 +464,8 @@ pub async fn delete_point(
}
}
notify_units(&state, affected_unit_ids).await;
Ok(Json(
serde_json::json!({"ok_msg": "Point deleted successfully"}),
))
@ -594,6 +618,7 @@ pub async fn batch_delete_points(
let point_ids = payload.point_ids;
let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?;
let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &point_ids).await?;
let existing_point_ids: Vec<Uuid> = grouped
.values()
.flat_map(|points| points.iter().map(|p| p.point_id))
@ -621,6 +646,8 @@ pub async fn batch_delete_points(
}
}
notify_units(&state, affected_unit_ids).await;
Ok(Json(BatchDeletePointsRes {
deleted_count: result.rows_affected(),
}))

View File

@ -380,6 +380,53 @@ pub async fn get_points_by_equipment_ids(
.await
}
pub async fn get_unit_ids_by_equipment_ids(
pool: &PgPool,
equipment_ids: &[Uuid],
) -> Result<Vec<Uuid>, sqlx::Error> {
if equipment_ids.is_empty() {
return Ok(vec![]);
}
let rows = sqlx::query_scalar::<_, Uuid>(
r#"
SELECT DISTINCT unit_id
FROM equipment
WHERE id = ANY($1)
AND unit_id IS NOT NULL
"#,
)
.bind(equipment_ids)
.fetch_all(pool)
.await?;
Ok(rows)
}
pub async fn get_unit_ids_by_point_ids(
pool: &PgPool,
point_ids: &[Uuid],
) -> Result<Vec<Uuid>, sqlx::Error> {
if point_ids.is_empty() {
return Ok(vec![]);
}
let rows = sqlx::query_scalar::<_, Uuid>(
r#"
SELECT DISTINCT e.unit_id
FROM point p
INNER JOIN equipment e ON e.id = p.equipment_id
WHERE p.id = ANY($1)
AND e.unit_id IS NOT NULL
"#,
)
.bind(point_ids)
.fetch_all(pool)
.await?;
Ok(rows)
}
pub struct EquipmentSignalRole {
pub equipment_id: Uuid,
pub point_id: Uuid,