diff --git a/src/control/engine.rs b/src/control/engine.rs new file mode 100644 index 0000000..2e69c1c --- /dev/null +++ b/src/control/engine.rs @@ -0,0 +1,7 @@ +use std::sync::Arc; + +use crate::{control::runtime::ControlRuntimeStore, AppState}; + +pub fn start(_state: AppState, _runtime_store: Arc<ControlRuntimeStore>) { + // Automatic control state machine will be added in the next step. +} diff --git a/src/control/mod.rs b/src/control/mod.rs new file mode 100644 index 0000000..77d9fe5 --- /dev/null +++ b/src/control/mod.rs @@ -0,0 +1,3 @@ +pub mod engine; +pub mod runtime; +pub mod validator; diff --git a/src/control/runtime.rs b/src/control/runtime.rs new file mode 100644 index 0000000..d99347b --- /dev/null +++ b/src/control/runtime.rs @@ -0,0 +1,75 @@ +use std::{collections::HashMap, sync::Arc}; + +use chrono::{DateTime, Utc}; +use tokio::sync::RwLock; +use uuid::Uuid; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum UnitRuntimeState { + Stopped, + Running, + DistributorRunning, + FaultLocked, + CommLocked, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct UnitRuntime { + pub unit_id: Uuid, + pub state: UnitRuntimeState, + pub accumulated_run_sec: i64, + pub current_run_elapsed_sec: i64, + pub current_stop_elapsed_sec: i64, + pub distributor_run_elapsed_sec: i64, + pub fault_locked: bool, + pub comm_locked: bool, + pub manual_ack_required: bool, + pub last_tick_at: Option<DateTime<Utc>>, +} + +impl UnitRuntime { + pub fn new(unit_id: Uuid) -> Self { + Self { + unit_id, + state: UnitRuntimeState::Stopped, + accumulated_run_sec: 0, + current_run_elapsed_sec: 0, + current_stop_elapsed_sec: 0, + distributor_run_elapsed_sec: 0, + fault_locked: false, + comm_locked: false, + manual_ack_required: false, + last_tick_at: None, + } + } +} + +#[derive(Clone, Default)] +pub struct ControlRuntimeStore { + inner: Arc<RwLock<HashMap<Uuid, UnitRuntime>>>, +} + +impl ControlRuntimeStore { + pub fn new() -> Self { + Self::default() + } + 
+ pub async fn get(&self, unit_id: Uuid) -> Option<UnitRuntime> { + self.inner.read().await.get(&unit_id).cloned() + } + + pub async fn get_or_init(&self, unit_id: Uuid) -> UnitRuntime { + if let Some(runtime) = self.get(unit_id).await { + return runtime; + } + + let runtime = UnitRuntime::new(unit_id); + self.inner.write().await.insert(unit_id, runtime.clone()); + runtime + } + + pub async fn upsert(&self, runtime: UnitRuntime) { + self.inner.write().await.insert(runtime.unit_id, runtime); + } +} diff --git a/src/control/validator.rs b/src/control/validator.rs new file mode 100644 index 0000000..934baa7 --- /dev/null +++ b/src/control/validator.rs @@ -0,0 +1,191 @@ +use std::collections::HashMap; + +use serde_json::json; +use uuid::Uuid; + +use crate::{ + service::EquipmentRolePoint, + telemetry::{DataValue, PointMonitorInfo, PointQuality, ValueType}, + util::response::ApiErr, + AppState, +}; + +#[derive(Debug, Clone, Copy)] +pub enum ControlAction { + Start, + Stop, +} + +impl ControlAction { + pub fn as_str(self) -> &'static str { + match self { + Self::Start => "start", + Self::Stop => "stop", + } + } + + pub fn command_role(self) -> &'static str { + match self { + Self::Start => "start_cmd", + Self::Stop => "stop_cmd", + } + } +} + +pub struct ManualControlContext { + pub unit_id: Option<Uuid>, + pub command_point: EquipmentRolePoint, + pub command_value_type: Option<ValueType>, +} + +pub async fn validate_manual_control( + state: &AppState, + equipment_id: Uuid, + action: ControlAction, +) -> Result<ManualControlContext, ApiErr> { + let equipment = crate::service::get_equipment_by_id(&state.pool, equipment_id) + .await? 
+ .ok_or_else(|| ApiErr::NotFound("Equipment not found".to_string(), None))?; + + let role_points = crate::service::get_equipment_role_points(&state.pool, equipment_id).await?; + if role_points.is_empty() { + return Err(ApiErr::BadRequest( + "Equipment has no bound role points".to_string(), + Some(json!({ "equipment_id": equipment_id })), + )); + } + + let role_map: HashMap<&str, &EquipmentRolePoint> = role_points + .iter() + .map(|point| (point.signal_role.as_str(), point)) + .collect(); + + let command_point = role_map + .get(action.command_role()) + .copied() + .ok_or_else(|| { + ApiErr::BadRequest( + format!("Equipment missing role point {}", action.command_role()), + Some(json!({ + "equipment_id": equipment_id, + "required_role": action.command_role() + })), + ) + })? + .clone(); + + let monitor_guard = state + .connection_manager + .get_point_monitor_data_read_guard() + .await; + + validate_quality( + role_map.get("rem").copied(), + &monitor_guard, + "REM", + equipment_id, + )?; + validate_quality( + role_map.get("flt").copied(), + &monitor_guard, + "FLT", + equipment_id, + )?; + if let Some(rem_point) = role_map.get("rem").copied() { + let rem_monitor = monitor_guard + .get(&rem_point.point_id) + .ok_or_else(|| missing_monitor_err("REM", equipment_id))?; + if !monitor_value_as_bool(rem_monitor) { + return Err(ApiErr::Forbidden( + "Remote control not allowed, REM is not enabled".to_string(), + Some(json!({ "equipment_id": equipment_id })), + )); + } + } + + if let Some(flt_point) = role_map.get("flt").copied() { + let flt_monitor = monitor_guard + .get(&flt_point.point_id) + .ok_or_else(|| missing_monitor_err("FLT", equipment_id))?; + if monitor_value_as_bool(flt_monitor) { + return Err(ApiErr::Forbidden( + "Equipment fault is active, command denied".to_string(), + Some(json!({ "equipment_id": equipment_id })), + )); + } + } + + if let Some(estop_point) = role_map.get("estop").copied() { + let estop_monitor = monitor_guard + .get(&estop_point.point_id) + 
.ok_or_else(|| missing_monitor_err("ESTOP", equipment_id))?; + if monitor_value_as_bool(estop_monitor) { + return Err(ApiErr::Forbidden( + "Emergency stop is active, command denied".to_string(), + Some(json!({ "equipment_id": equipment_id })), + )); + } + } + + let command_value_type = monitor_guard + .get(&command_point.point_id) + .and_then(|item| item.value_type.clone()); + + Ok(ManualControlContext { + unit_id: equipment.unit_id, + command_point, + command_value_type, + }) +} + +fn validate_quality( + role_point: Option<&EquipmentRolePoint>, + monitor_map: &HashMap<Uuid, PointMonitorInfo>, + role: &str, + equipment_id: Uuid, +) -> Result<(), ApiErr> { + let Some(role_point) = role_point else { + return Ok(()); + }; + + let monitor = monitor_map + .get(&role_point.point_id) + .ok_or_else(|| missing_monitor_err(role, equipment_id))?; + + if monitor.quality != PointQuality::Good { + return Err(ApiErr::Forbidden( + format!("Communication abnormal for role {}", role), + Some(json!({ + "equipment_id": equipment_id, + "role": role, + "quality": monitor.quality + })), + )); + } + + Ok(()) +} + +fn missing_monitor_err(role: &str, equipment_id: Uuid) -> ApiErr { + ApiErr::Forbidden( + format!("No realtime value for role {}", role), + Some(json!({ + "equipment_id": equipment_id, + "role": role + })), + ) +} + +fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool { + match monitor.value.as_ref() { + Some(DataValue::Bool(value)) => *value, + Some(DataValue::Int(value)) => *value != 0, + Some(DataValue::UInt(value)) => *value != 0, + Some(DataValue::Float(value)) => *value != 0.0, + Some(DataValue::Text(value)) => matches!( + value.trim().to_ascii_lowercase().as_str(), + "1" | "true" | "on" | "yes" + ), + _ => false, + } +} diff --git a/src/event.rs b/src/event.rs index c6b0004..509cf5d 100644 --- a/src/event.rs +++ b/src/event.rs @@ -24,6 +24,16 @@ pub enum AppEvent { source_id: Uuid, point_ids: Vec<Uuid>, }, + EquipmentStartCommandSent { + equipment_id: Uuid, + unit_id: Option<Uuid>, + point_id: 
Uuid, + }, + EquipmentStopCommandSent { + equipment_id: Uuid, + unit_id: Option<Uuid>, + point_id: Uuid, + }, PointNewValue(crate::telemetry::PointNewValue), } @@ -182,6 +192,30 @@ async fn handle_control_event( tracing::error!("Failed to unsubscribe points: {}", e); } } + AppEvent::EquipmentStartCommandSent { + equipment_id, + unit_id, + point_id, + } => { + tracing::info!( + "Equipment start command sent: equipment={}, unit={:?}, point={}", + equipment_id, + unit_id, + point_id + ); + } + AppEvent::EquipmentStopCommandSent { + equipment_id, + unit_id, + point_id, + } => { + tracing::info!( + "Equipment stop command sent: equipment={}, unit={:?}, point={}", + equipment_id, + unit_id, + point_id + ); + } AppEvent::PointNewValue(_) => { tracing::warn!("PointNewValue routed to control worker unexpectedly"); } @@ -213,7 +247,7 @@ async fn persist_event_if_needed(event: &AppEvent, pool: &sqlx::PgPool) { "warn", None, None, - Some(*source_id), + None, format!("Source {} deleted", source_id), serde_json::json!({ "source_id": source_id }), )), @@ -235,6 +269,40 @@ format!("{} points deleted for source {}", point_ids.len(), source_id), serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), )), + AppEvent::EquipmentStartCommandSent { + equipment_id, + unit_id, + point_id, + } => Some(( + "equipment.start_command_sent", + "info", + *unit_id, + Some(*equipment_id), + None, + format!("Start command sent to equipment {}", equipment_id), + serde_json::json!({ + "equipment_id": equipment_id, + "unit_id": unit_id, + "point_id": point_id + }), + )), + AppEvent::EquipmentStopCommandSent { + equipment_id, + unit_id, + point_id, + } => Some(( + "equipment.stop_command_sent", + "info", + *unit_id, + Some(*equipment_id), + None, + format!("Stop command sent to equipment {}", equipment_id), + serde_json::json!({ + "equipment_id": equipment_id, + "unit_id": unit_id, + "point_id": point_id + }), + )), 
AppEvent::PointNewValue(_) => None, }; diff --git a/src/handler/control.rs b/src/handler/control.rs index 2dea52b..b12ff04 100644 --- a/src/handler/control.rs +++ b/src/handler/control.rs @@ -5,10 +5,14 @@ use axum::{ Json, }; use serde::Deserialize; +use serde_json::json; use uuid::Uuid; use validator::Validate; use crate::{ + connection::{BatchSetPointValueReq, SetPointValueReqItem}, + control::validator::{validate_manual_control, ControlAction}, + telemetry::ValueType, util::{ pagination::{PaginatedResponse, PaginationParams}, response::ApiErr, @@ -47,6 +51,106 @@ pub async fn get_unit_list( ))) } +pub async fn start_equipment( + State(state): State<AppState>, + Path(equipment_id): Path<Uuid>, +) -> Result<Json<serde_json::Value>, ApiErr> { + send_equipment_command(state, equipment_id, ControlAction::Start).await +} + +pub async fn stop_equipment( + State(state): State<AppState>, + Path(equipment_id): Path<Uuid>, +) -> Result<Json<serde_json::Value>, ApiErr> { + send_equipment_command(state, equipment_id, ControlAction::Stop).await +} + +async fn send_equipment_command( + state: AppState, + equipment_id: Uuid, + action: ControlAction, +) -> Result<Json<serde_json::Value>, ApiErr> { + let context = validate_manual_control(&state, equipment_id, action).await?; + let pulse_ms = 300u64; + + let high_value = pulse_value(true, context.command_value_type.as_ref()); + let low_value = pulse_value(false, context.command_value_type.as_ref()); + + let high_result = state + .connection_manager + .write_point_values_batch(BatchSetPointValueReq { + items: vec![SetPointValueReqItem { + point_id: context.command_point.point_id, + value: high_value, + }], + }) + .await + .map_err(|e| ApiErr::Internal(e, None))?; + + if !high_result.success { + return Err(ApiErr::Internal( + "Failed to write pulse high level".to_string(), + Some(json!(high_result)), + )); + } + + tokio::time::sleep(std::time::Duration::from_millis(pulse_ms)).await; + + let low_result = state + .connection_manager + .write_point_values_batch(BatchSetPointValueReq { + items: vec![SetPointValueReqItem { + point_id: 
context.command_point.point_id, + value: low_value, + }], + }) + .await + .map_err(|e| ApiErr::Internal(e, None))?; + + if !low_result.success { + return Err(ApiErr::Internal( + "Pulse reset failed after command high level succeeded".to_string(), + Some(json!(low_result)), + )); + } + + let event = match action { + ControlAction::Start => crate::event::AppEvent::EquipmentStartCommandSent { + equipment_id, + unit_id: context.unit_id, + point_id: context.command_point.point_id, + }, + ControlAction::Stop => crate::event::AppEvent::EquipmentStopCommandSent { + equipment_id, + unit_id: context.unit_id, + point_id: context.command_point.point_id, + }, + }; + let _ = state.event_manager.send(event); + + Ok(Json(json!({ + "ok_msg": format!("Equipment {} command sent", action.as_str()), + "equipment_id": equipment_id, + "unit_id": context.unit_id, + "command_role": context.command_point.signal_role, + "command_point_id": context.command_point.point_id, + "pulse_ms": pulse_ms + }))) +} + +fn pulse_value(high: bool, value_type: Option<&ValueType>) -> serde_json::Value { + match value_type { + Some(ValueType::Bool) => serde_json::Value::Bool(high), + _ => { + if high { + json!(1) + } else { + json!(0) + } + } + } +} + pub async fn get_unit( State(state): State<AppState>, Path(unit_id): Path<Uuid>, diff --git a/src/main.rs b/src/main.rs index 5aadce9..f1a3a82 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +mod control; mod config; mod connection; mod db; @@ -10,7 +11,7 @@ mod telemetry; mod util; mod websocket; use axum::{ - routing::{get, put}, + routing::{get, post, put}, Router, }; use config::AppConfig; @@ -30,6 +31,7 @@ pub struct AppState { pub connection_manager: Arc, pub event_manager: Arc, pub ws_manager: Arc, + pub control_runtime: Arc<control::runtime::ControlRuntimeStore>, } #[tokio::main] async fn main() { @@ -52,6 +54,7 @@ connection_manager.set_pool_and_start_reconnect_task(Arc::new(pool.clone())); let connection_manager = Arc::new(connection_manager); + let control_runtime = 
Arc::new(control::runtime::ControlRuntimeStore::new()); // Connect to all enabled sources concurrently let sources = service::get_all_enabled_sources(&pool) .await @@ -88,7 +91,9 @@ connection_manager: connection_manager.clone(), event_manager, ws_manager, + control_runtime: control_runtime.clone(), }; + control::engine::start(state.clone(), control_runtime); let app = build_router(state.clone()); let addr = format!("{}:{}", config.server_host, config.server_port); tracing::info!("Starting server at http://{}", addr); @@ -204,6 +209,14 @@ fn build_router(state: AppState) -> Router { "/api/event", get(handler::control::get_event_list), ) + .route( + "/api/control/equipment/{equipment_id}/start", + post(handler::control::start_equipment), + ) + .route( + "/api/control/equipment/{equipment_id}/stop", + post(handler::control::stop_equipment), + ) .route( "/api/tag", get(handler::tag::get_tag_list).post(handler::tag::create_tag), diff --git a/src/service/control.rs b/src/service/control.rs index 498259b..7be133b 100644 --- a/src/service/control.rs +++ b/src/service/control.rs @@ -1,7 +1,13 @@ use crate::model::{ControlUnit, EventRecord}; -use sqlx::{PgPool, QueryBuilder}; +use sqlx::{PgPool, QueryBuilder, Row}; use uuid::Uuid; +#[derive(Debug, Clone)] +pub struct EquipmentRolePoint { + pub point_id: Uuid, + pub signal_role: String, +} + pub async fn get_units_count(pool: &PgPool, keyword: Option<&str>) -> Result<i64, sqlx::Error> { match keyword { Some(keyword) => { @@ -301,3 +307,32 @@ pub async fn get_events_paginated( qb.build_query_as::<EventRecord>().fetch_all(pool).await } + +pub async fn get_equipment_role_points( + pool: &PgPool, + equipment_id: Uuid, +) -> Result<Vec<EquipmentRolePoint>, sqlx::Error> { + let rows = sqlx::query( + r#" + SELECT + p.id AS point_id, + p.signal_role + FROM equipment e + INNER JOIN point p ON p.equipment_id = e.id + WHERE e.id = $1 + AND p.signal_role IS NOT NULL + ORDER BY p.created_at + "#, + ) + .bind(equipment_id) + .fetch_all(pool) + .await?; + + Ok(rows + .into_iter() + 
.map(|row| EquipmentRolePoint { + point_id: row.get("point_id"), + signal_role: row.get("signal_role"), + }) + .collect()) +}