use axum::{ extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, Json, }; use serde::Deserialize; use serde_json::json; use uuid::Uuid; use validator::Validate; use crate::{ control::validator::{validate_manual_control, ControlAction}, util::{ pagination::{PaginatedResponse, PaginationParams}, response::ApiErr, }, AppState, }; fn validate_unit_timing_order( run_time_sec: i32, acc_time_sec: i32, ) -> Result<(), ApiErr> { if acc_time_sec <= run_time_sec { return Err(ApiErr::BadRequest( "acc_time_sec must be greater than run_time_sec".to_string(), Some(json!({ "run_time_sec": ["must be less than acc_time_sec"], "acc_time_sec": ["must be greater than run_time_sec"] })), )); } Ok(()) } fn auto_control_start_blocked(runtime: &crate::control::runtime::UnitRuntime) -> bool { runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required } #[derive(Debug, Deserialize, Validate)] pub struct GetUnitListQuery { #[validate(length(min = 1, max = 100))] pub keyword: Option, #[serde(flatten)] pub pagination: PaginationParams, } #[derive(serde::Serialize)] pub struct UnitEquipmentItem { #[serde(flatten)] pub equipment: crate::model::Equipment, pub role_points: Vec, } #[derive(serde::Serialize)] pub struct UnitWithRuntime { #[serde(flatten)] pub unit: crate::model::ControlUnit, pub runtime: Option, pub equipments: Vec, } pub async fn get_unit_list( State(state): State, Query(query): Query, ) -> Result { query.validate()?; let total = crate::service::get_units_count(&state.pool, query.keyword.as_deref()).await?; let units = crate::service::get_units_paginated( &state.pool, query.keyword.as_deref(), query.pagination.page_size, query.pagination.offset(), ) .await?; let all_runtimes = state.control_runtime.get_all().await; let unit_ids: Vec = units.iter().map(|u| u.id).collect(); let all_equipments = crate::service::get_equipment_by_unit_ids(&state.pool, &unit_ids).await?; let eq_ids: Vec = all_equipments.iter().map(|e| e.id).collect(); let role_point_rows = crate::service::get_signal_role_points_batch(&state.pool, &eq_ids).await?; let monitor_guard = state .connection_manager .get_point_monitor_data_read_guard() .await; let mut role_points_map: std::collections::HashMap< Uuid, Vec, > = std::collections::HashMap::new(); for rp in role_point_rows { role_points_map .entry(rp.equipment_id) .or_default() .push(crate::handler::equipment::SignalRolePoint { point_id: rp.point_id, signal_role: rp.signal_role, point_monitor: monitor_guard.get(&rp.point_id).cloned(), }); } drop(monitor_guard); let mut equipments_by_unit: std::collections::HashMap> = std::collections::HashMap::new(); for eq in all_equipments { let role_points = role_points_map.remove(&eq.id).unwrap_or_default(); if let Some(unit_id) = eq.unit_id { equipments_by_unit .entry(unit_id) .or_default() .push(UnitEquipmentItem { equipment: eq, role_points }); } } let data = units .into_iter() .map(|unit| { let runtime = all_runtimes.get(&unit.id).cloned(); let equipments = equipments_by_unit.remove(&unit.id).unwrap_or_default(); UnitWithRuntime { unit, runtime, equipments } }) .collect::>(); Ok(Json(PaginatedResponse::new( data, total, query.pagination.page, query.pagination.page_size, ))) } pub async fn start_equipment( State(state): State, Path(equipment_id): Path, ) -> Result { send_equipment_command(state, equipment_id, ControlAction::Start).await } pub async fn stop_equipment( State(state): State, Path(equipment_id): Path, ) -> Result { send_equipment_command(state, equipment_id, ControlAction::Stop).await } async fn send_equipment_command( state: AppState, equipment_id: Uuid, action: ControlAction, ) -> Result { let context = validate_manual_control(&state, equipment_id, action).await?; let pulse_ms = 300u64; crate::control::command::send_pulse_command( &state.connection_manager, context.command_point.point_id, context.command_value_type.as_ref(), pulse_ms, ) .await .map_err(|e| ApiErr::Internal(e, None))?; if state.config.simulate_plc { crate::control::simulate::simulate_run_feedback( &state, equipment_id, matches!(action, ControlAction::Start), ) .await; } let event = match action { ControlAction::Start => crate::event::AppEvent::EquipmentStartCommandSent { equipment_id, unit_id: context.unit_id, point_id: context.command_point.point_id, }, ControlAction::Stop => crate::event::AppEvent::EquipmentStopCommandSent { equipment_id, unit_id: context.unit_id, point_id: context.command_point.point_id, }, }; let _ = state.event_manager.send(event); Ok(Json(json!({ "ok_msg": format!("Equipment {} command sent", action.as_str()), "equipment_id": equipment_id, "unit_id": context.unit_id, "command_role": context.command_point.signal_role, "command_point_id": context.command_point.point_id, "pulse_ms": pulse_ms }))) } pub async fn get_unit( State(state): State, Path(unit_id): Path, ) -> Result { let unit = crate::service::get_unit_by_id(&state.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; let runtime = state.control_runtime.get(unit_id).await; let all_equipments = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?; let eq_ids: Vec = all_equipments.iter().map(|e| e.id).collect(); let role_point_rows = crate::service::get_signal_role_points_batch(&state.pool, &eq_ids).await?; let monitor_guard = state .connection_manager .get_point_monitor_data_read_guard() .await; let mut role_points_map: std::collections::HashMap< Uuid, Vec, > = std::collections::HashMap::new(); for rp in role_point_rows { role_points_map .entry(rp.equipment_id) .or_default() .push(crate::handler::equipment::SignalRolePoint { point_id: rp.point_id, signal_role: rp.signal_role, point_monitor: monitor_guard.get(&rp.point_id).cloned(), }); } drop(monitor_guard); let equipments = all_equipments .into_iter() .map(|eq| { let role_points = role_points_map.remove(&eq.id).unwrap_or_default(); UnitEquipmentItem { equipment: eq, role_points } }) .collect(); Ok(Json(UnitWithRuntime { unit, runtime, equipments })) } #[derive(serde::Serialize)] pub struct PointDetail { #[serde(flatten)] pub point: crate::model::Point, pub point_monitor: Option, } #[derive(serde::Serialize)] pub struct EquipmentDetail { #[serde(flatten)] pub equipment: crate::model::Equipment, pub points: Vec, } #[derive(serde::Serialize)] pub struct UnitDetail { #[serde(flatten)] pub unit: crate::model::ControlUnit, pub runtime: Option, pub equipments: Vec, } pub async fn get_unit_detail( State(state): State, Path(unit_id): Path, ) -> Result { let unit = crate::service::get_unit_by_id(&state.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; let runtime = state.control_runtime.get(unit_id).await; let equipments = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?; let equipment_ids: Vec = equipments.iter().map(|e| e.id).collect(); let all_points = crate::service::get_points_by_equipment_ids(&state.pool, &equipment_ids).await?; let monitor_guard = state .connection_manager .get_point_monitor_data_read_guard() .await; let equipments = equipments .into_iter() .map(|eq| { let points = all_points .iter() .filter(|p| p.equipment_id == Some(eq.id)) .map(|p| PointDetail { point_monitor: monitor_guard.get(&p.id).cloned(), point: p.clone(), }) .collect(); EquipmentDetail { equipment: eq, points } }) .collect(); Ok(Json(UnitDetail { unit, runtime, equipments })) } #[derive(Debug, Deserialize, Validate)] pub struct CreateUnitReq { #[validate(length(min = 1, max = 100))] pub code: String, #[validate(length(min = 1, max = 100))] pub name: String, pub description: Option, pub enabled: Option, #[validate(range(min = 1, message = "must be greater than 0"))] pub run_time_sec: Option, #[validate(range(min = 1, message = "must be greater than 0"))] pub stop_time_sec: Option, #[validate(range(min = 1, message = "must be greater than 0"))] pub acc_time_sec: Option, #[validate(range(min = 1, message = "must be greater than 0"))] pub bl_time_sec: Option, pub require_manual_ack_after_fault: Option, } pub async fn create_unit( State(state): State, Json(payload): Json, ) -> Result { payload.validate()?; let run_time_sec = payload.run_time_sec.ok_or_else(|| { ApiErr::BadRequest( "run_time_sec is required".to_string(), Some(json!({ "run_time_sec": ["is required"] })), ) })?; let stop_time_sec = payload.stop_time_sec.ok_or_else(|| { ApiErr::BadRequest( "stop_time_sec is required".to_string(), Some(json!({ "stop_time_sec": ["is required"] })), ) })?; let acc_time_sec = payload.acc_time_sec.ok_or_else(|| { ApiErr::BadRequest( "acc_time_sec is required".to_string(), Some(json!({ "acc_time_sec": ["is required"] })), ) })?; let bl_time_sec = payload.bl_time_sec.ok_or_else(|| { ApiErr::BadRequest( "bl_time_sec is required".to_string(), Some(json!({ "bl_time_sec": ["is required"] })), ) })?; validate_unit_timing_order(run_time_sec, acc_time_sec)?; if crate::service::get_unit_by_code(&state.pool, &payload.code) .await? .is_some() { return Err(ApiErr::BadRequest( "Unit code already exists".to_string(), None, )); } let unit_id = crate::service::create_unit( &state.pool, crate::service::CreateUnitParams { code: &payload.code, name: &payload.name, description: payload.description.as_deref(), enabled: payload.enabled.unwrap_or(true), run_time_sec, stop_time_sec, acc_time_sec, bl_time_sec, require_manual_ack_after_fault: payload .require_manual_ack_after_fault .unwrap_or(true), }, ) .await?; Ok(( StatusCode::CREATED, Json(serde_json::json!({ "id": unit_id, "ok_msg": "Unit created successfully" })), )) } #[derive(Debug, Deserialize, Validate)] pub struct UpdateUnitReq { #[validate(length(min = 1, max = 100))] pub code: Option, #[validate(length(min = 1, max = 100))] pub name: Option, pub description: Option, pub enabled: Option, #[validate(range(min = 1, message = "must be greater than 0"))] pub run_time_sec: Option, #[validate(range(min = 1, message = "must be greater than 0"))] pub stop_time_sec: Option, #[validate(range(min = 1, message = "must be greater than 0"))] pub acc_time_sec: Option, #[validate(range(min = 1, message = "must be greater than 0"))] pub bl_time_sec: Option, pub require_manual_ack_after_fault: Option, } pub async fn update_unit( State(state): State, Path(unit_id): Path, Json(payload): Json, ) -> Result { payload.validate()?; let existing_unit = crate::service::get_unit_by_id(&state.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; validate_unit_timing_order( payload.run_time_sec.unwrap_or(existing_unit.run_time_sec), payload.acc_time_sec.unwrap_or(existing_unit.acc_time_sec), )?; if let Some(code) = payload.code.as_deref() { let duplicate = crate::service::get_unit_by_code(&state.pool, code).await?; if duplicate.as_ref().is_some_and(|item| item.id != unit_id) { return Err(ApiErr::BadRequest( "Unit code already exists".to_string(), None, )); } } if payload.code.is_none() && payload.name.is_none() && payload.description.is_none() && payload.enabled.is_none() && payload.run_time_sec.is_none() && payload.stop_time_sec.is_none() && payload.acc_time_sec.is_none() && payload.bl_time_sec.is_none() && payload.require_manual_ack_after_fault.is_none() { return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); } crate::service::update_unit( &state.pool, unit_id, crate::service::UpdateUnitParams { code: payload.code.as_deref(), name: payload.name.as_deref(), description: payload.description.as_deref(), enabled: payload.enabled, run_time_sec: payload.run_time_sec, stop_time_sec: payload.stop_time_sec, acc_time_sec: payload.acc_time_sec, bl_time_sec: payload.bl_time_sec, require_manual_ack_after_fault: payload.require_manual_ack_after_fault, }, ) .await?; Ok(Json(serde_json::json!({ "ok_msg": "Unit updated successfully" }))) } pub async fn delete_unit( State(state): State, Path(unit_id): Path, ) -> Result { let deleted = crate::service::delete_unit(&state.pool, unit_id).await?; if !deleted { return Err(ApiErr::NotFound("Unit not found".to_string(), None)); } Ok(StatusCode::NO_CONTENT) } #[derive(Debug, Deserialize, Validate)] pub struct GetEventListQuery { pub unit_id: Option, #[validate(length(min = 1, max = 100))] pub event_type: Option, #[serde(flatten)] pub pagination: PaginationParams, } pub async fn get_event_list( State(state): State, Query(query): Query, ) -> Result { query.validate()?; let total = crate::service::get_events_count( &state.pool, query.unit_id, query.event_type.as_deref(), ) .await?; let data = crate::service::get_events_paginated( &state.pool, query.unit_id, query.event_type.as_deref(), query.pagination.page_size, query.pagination.offset(), ) .await?; Ok(Json(PaginatedResponse::new( data, total, query.pagination.page, query.pagination.page_size, ))) } pub async fn start_auto_unit( State(state): State, Path(unit_id): Path, ) -> Result { let unit = crate::service::get_unit_by_id(&state.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; if !unit.enabled { return Err(ApiErr::BadRequest("Unit is disabled".to_string(), None)); } let mut runtime = state.control_runtime.get_or_init(unit_id).await; if auto_control_start_blocked(&runtime) { let message = if runtime.fault_locked { "Unit is fault locked, cannot start auto control" } else if runtime.comm_locked { "Unit communication is locked, cannot start auto control" } else { "Fault acknowledgement required before starting auto control" }; return Err(ApiErr::BadRequest(message.to_string(), None)); } runtime.auto_enabled = true; runtime.state = crate::control::runtime::UnitRuntimeState::Stopped; state.control_runtime.upsert(runtime).await; state.control_runtime.notify_unit(unit_id).await; let _ = state.event_manager.send(crate::event::AppEvent::AutoControlStarted { unit_id }); Ok(Json(json!({ "ok_msg": "Auto control started", "unit_id": unit_id }))) } pub async fn stop_auto_unit( State(state): State, Path(unit_id): Path, ) -> Result { crate::service::get_unit_by_id(&state.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; let mut runtime = state.control_runtime.get_or_init(unit_id).await; runtime.auto_enabled = false; state.control_runtime.upsert(runtime).await; state.control_runtime.notify_unit(unit_id).await; let _ = state.event_manager.send(crate::event::AppEvent::AutoControlStopped { unit_id }); Ok(Json(json!({ "ok_msg": "Auto control stopped", "unit_id": unit_id }))) } pub async fn batch_start_auto( State(state): State, ) -> Result { let units = crate::service::get_all_enabled_units(&state.pool).await?; let mut started = Vec::new(); let mut skipped = Vec::new(); for unit in units { let mut runtime = state.control_runtime.get_or_init(unit.id).await; if runtime.auto_enabled { skipped.push(unit.id); continue; } if auto_control_start_blocked(&runtime) { skipped.push(unit.id); continue; } runtime.auto_enabled = true; runtime.state = crate::control::runtime::UnitRuntimeState::Stopped; state.control_runtime.upsert(runtime).await; state.control_runtime.notify_unit(unit.id).await; let _ = state .event_manager .send(crate::event::AppEvent::AutoControlStarted { unit_id: unit.id }); started.push(unit.id); } Ok(Json(json!({ "started": started, "skipped": skipped }))) } pub async fn batch_stop_auto( State(state): State, ) -> Result { let units = crate::service::get_all_enabled_units(&state.pool).await?; let mut stopped = Vec::new(); for unit in units { let mut runtime = state.control_runtime.get_or_init(unit.id).await; if !runtime.auto_enabled { continue; } runtime.auto_enabled = false; state.control_runtime.upsert(runtime).await; state.control_runtime.notify_unit(unit.id).await; let _ = state .event_manager .send(crate::event::AppEvent::AutoControlStopped { unit_id: unit.id }); stopped.push(unit.id); } Ok(Json(json!({ "stopped": stopped }))) } pub async fn ack_fault_unit( State(state): State, Path(unit_id): Path, ) -> Result { crate::service::get_unit_by_id(&state.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; let mut runtime = state.control_runtime.get_or_init(unit_id).await; if !runtime.fault_locked { return Err(ApiErr::BadRequest( "Unit is not fault locked".to_string(), Some(json!({ "unit_id": unit_id })), )); } if runtime.flt_active { return Err(ApiErr::BadRequest( "FLT is still active, cannot acknowledge".to_string(), Some(json!({ "unit_id": unit_id })), )); } runtime.fault_locked = false; runtime.manual_ack_required = false; runtime.state = crate::control::runtime::UnitRuntimeState::Stopped; state.control_runtime.upsert(runtime).await; state.control_runtime.notify_unit(unit_id).await; let _ = state.event_manager.send(crate::event::AppEvent::FaultAcked { unit_id }); Ok(Json(json!({ "ok_msg": "Fault acknowledged", "unit_id": unit_id }))) } pub async fn get_unit_runtime( State(state): State, Path(unit_id): Path, ) -> Result { crate::service::get_unit_by_id(&state.pool, unit_id) .await? .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; let runtime = state.control_runtime.get_or_init(unit_id).await; Ok(Json(runtime)) } #[cfg(test)] mod tests { use super::{ auto_control_start_blocked, validate_unit_timing_order, CreateUnitReq, UpdateUnitReq, }; use crate::control::runtime::{UnitRuntime, UnitRuntimeState}; use uuid::Uuid; use validator::Validate; #[test] fn create_unit_req_rejects_zero_second_fields() { let payload = CreateUnitReq { code: "U-01".to_string(), name: "Unit 01".to_string(), description: None, enabled: Some(true), run_time_sec: Some(0), stop_time_sec: Some(10), acc_time_sec: Some(20), bl_time_sec: Some(5), require_manual_ack_after_fault: Some(true), }; assert!(payload.validate().is_err()); } #[test] fn create_unit_req_rejects_acc_time_not_greater_than_run_time() { assert!(validate_unit_timing_order(10, 10).is_err()); } #[test] fn update_unit_req_rejects_zero_second_fields() { let payload = UpdateUnitReq { code: None, name: None, description: None, enabled: None, run_time_sec: None, stop_time_sec: Some(0), acc_time_sec: Some(20), bl_time_sec: Some(5), require_manual_ack_after_fault: None, }; assert!(payload.validate().is_err()); } #[test] fn update_unit_req_rejects_acc_time_not_greater_than_run_time_when_both_present() { assert!(validate_unit_timing_order(20, 15).is_err()); } #[test] fn auto_control_start_is_blocked_by_comm_lock() { let runtime = UnitRuntime { unit_id: Uuid::new_v4(), state: UnitRuntimeState::Stopped, auto_enabled: false, accumulated_run_sec: 0, display_acc_sec: 0, fault_locked: false, flt_active: false, comm_locked: true, manual_ack_required: false, }; assert!(auto_control_start_blocked(&runtime)); } }