feat(control): add manual equipment pulse commands

This commit is contained in:
caoqianming 2026-03-24 11:16:50 +08:00
parent 1f29eb3871
commit 97d2f6ebf8
8 changed files with 499 additions and 3 deletions

7
src/control/engine.rs Normal file
View File

@ -0,0 +1,7 @@
use std::sync::Arc;
use crate::{control::runtime::ControlRuntimeStore, AppState};
pub fn start(_state: AppState, _runtime_store: Arc<ControlRuntimeStore>) {
// Automatic control state machine will be added in the next step.
}

3
src/control/mod.rs Normal file
View File

@ -0,0 +1,3 @@
pub mod engine;
pub mod runtime;
pub mod validator;

75
src/control/runtime.rs Normal file
View File

@ -0,0 +1,75 @@
use std::{collections::HashMap, sync::Arc};
use chrono::{DateTime, Utc};
use tokio::sync::RwLock;
use uuid::Uuid;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum UnitRuntimeState {
Stopped,
Running,
DistributorRunning,
FaultLocked,
CommLocked,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UnitRuntime {
pub unit_id: Uuid,
pub state: UnitRuntimeState,
pub accumulated_run_sec: i64,
pub current_run_elapsed_sec: i64,
pub current_stop_elapsed_sec: i64,
pub distributor_run_elapsed_sec: i64,
pub fault_locked: bool,
pub comm_locked: bool,
pub manual_ack_required: bool,
pub last_tick_at: Option<DateTime<Utc>>,
}
impl UnitRuntime {
pub fn new(unit_id: Uuid) -> Self {
Self {
unit_id,
state: UnitRuntimeState::Stopped,
accumulated_run_sec: 0,
current_run_elapsed_sec: 0,
current_stop_elapsed_sec: 0,
distributor_run_elapsed_sec: 0,
fault_locked: false,
comm_locked: false,
manual_ack_required: false,
last_tick_at: None,
}
}
}
#[derive(Clone, Default)]
pub struct ControlRuntimeStore {
inner: Arc<RwLock<HashMap<Uuid, UnitRuntime>>>,
}
impl ControlRuntimeStore {
pub fn new() -> Self {
Self::default()
}
pub async fn get(&self, unit_id: Uuid) -> Option<UnitRuntime> {
self.inner.read().await.get(&unit_id).cloned()
}
pub async fn get_or_init(&self, unit_id: Uuid) -> UnitRuntime {
if let Some(runtime) = self.get(unit_id).await {
return runtime;
}
let runtime = UnitRuntime::new(unit_id);
self.inner.write().await.insert(unit_id, runtime.clone());
runtime
}
pub async fn upsert(&self, runtime: UnitRuntime) {
self.inner.write().await.insert(runtime.unit_id, runtime);
}
}

191
src/control/validator.rs Normal file
View File

@ -0,0 +1,191 @@
use std::collections::HashMap;
use serde_json::json;
use uuid::Uuid;
use crate::{
service::EquipmentRolePoint,
telemetry::{DataValue, PointMonitorInfo, PointQuality, ValueType},
util::response::ApiErr,
AppState,
};
#[derive(Debug, Clone, Copy)]
pub enum ControlAction {
Start,
Stop,
}
impl ControlAction {
pub fn as_str(self) -> &'static str {
match self {
Self::Start => "start",
Self::Stop => "stop",
}
}
pub fn command_role(self) -> &'static str {
match self {
Self::Start => "start_cmd",
Self::Stop => "stop_cmd",
}
}
}
pub struct ManualControlContext {
pub unit_id: Option<Uuid>,
pub command_point: EquipmentRolePoint,
pub command_value_type: Option<ValueType>,
}
pub async fn validate_manual_control(
state: &AppState,
equipment_id: Uuid,
action: ControlAction,
) -> Result<ManualControlContext, ApiErr> {
let equipment = crate::service::get_equipment_by_id(&state.pool, equipment_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Equipment not found".to_string(), None))?;
let role_points = crate::service::get_equipment_role_points(&state.pool, equipment_id).await?;
if role_points.is_empty() {
return Err(ApiErr::BadRequest(
"Equipment has no bound role points".to_string(),
Some(json!({ "equipment_id": equipment_id })),
));
}
let role_map: HashMap<&str, &EquipmentRolePoint> = role_points
.iter()
.map(|point| (point.signal_role.as_str(), point))
.collect();
let command_point = role_map
.get(action.command_role())
.copied()
.ok_or_else(|| {
ApiErr::BadRequest(
format!("Equipment missing role point {}", action.command_role()),
Some(json!({
"equipment_id": equipment_id,
"required_role": action.command_role()
})),
)
})?
.clone();
let monitor_guard = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
validate_quality(
role_map.get("rem").copied(),
&monitor_guard,
"REM",
equipment_id,
)?;
validate_quality(
role_map.get("flt").copied(),
&monitor_guard,
"FLT",
equipment_id,
)?;
if let Some(rem_point) = role_map.get("rem").copied() {
let rem_monitor = monitor_guard
.get(&rem_point.point_id)
.ok_or_else(|| missing_monitor_err("REM", equipment_id))?;
if !monitor_value_as_bool(rem_monitor) {
return Err(ApiErr::Forbidden(
"Remote control not allowed, REM is not enabled".to_string(),
Some(json!({ "equipment_id": equipment_id })),
));
}
}
if let Some(flt_point) = role_map.get("flt").copied() {
let flt_monitor = monitor_guard
.get(&flt_point.point_id)
.ok_or_else(|| missing_monitor_err("FLT", equipment_id))?;
if monitor_value_as_bool(flt_monitor) {
return Err(ApiErr::Forbidden(
"Equipment fault is active, command denied".to_string(),
Some(json!({ "equipment_id": equipment_id })),
));
}
}
if let Some(estop_point) = role_map.get("estop").copied() {
let estop_monitor = monitor_guard
.get(&estop_point.point_id)
.ok_or_else(|| missing_monitor_err("ESTOP", equipment_id))?;
if monitor_value_as_bool(estop_monitor) {
return Err(ApiErr::Forbidden(
"Emergency stop is active, command denied".to_string(),
Some(json!({ "equipment_id": equipment_id })),
));
}
}
let command_value_type = monitor_guard
.get(&command_point.point_id)
.and_then(|item| item.value_type.clone());
Ok(ManualControlContext {
unit_id: equipment.unit_id,
command_point,
command_value_type,
})
}
fn validate_quality(
role_point: Option<&EquipmentRolePoint>,
monitor_map: &HashMap<Uuid, PointMonitorInfo>,
role: &str,
equipment_id: Uuid,
) -> Result<(), ApiErr> {
let Some(role_point) = role_point else {
return Ok(());
};
let monitor = monitor_map
.get(&role_point.point_id)
.ok_or_else(|| missing_monitor_err(role, equipment_id))?;
if monitor.quality != PointQuality::Good {
return Err(ApiErr::Forbidden(
format!("Communication abnormal for role {}", role),
Some(json!({
"equipment_id": equipment_id,
"role": role,
"quality": monitor.quality
})),
));
}
Ok(())
}
fn missing_monitor_err(role: &str, equipment_id: Uuid) -> ApiErr {
ApiErr::Forbidden(
format!("No realtime value for role {}", role),
Some(json!({
"equipment_id": equipment_id,
"role": role
})),
)
}
fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool {
match monitor.value.as_ref() {
Some(DataValue::Bool(value)) => *value,
Some(DataValue::Int(value)) => *value != 0,
Some(DataValue::UInt(value)) => *value != 0,
Some(DataValue::Float(value)) => *value != 0.0,
Some(DataValue::Text(value)) => matches!(
value.trim().to_ascii_lowercase().as_str(),
"1" | "true" | "on" | "yes"
),
_ => false,
}
}

View File

@ -24,6 +24,16 @@ pub enum AppEvent {
source_id: Uuid,
point_ids: Vec<Uuid>,
},
EquipmentStartCommandSent {
equipment_id: Uuid,
unit_id: Option<Uuid>,
point_id: Uuid,
},
EquipmentStopCommandSent {
equipment_id: Uuid,
unit_id: Option<Uuid>,
point_id: Uuid,
},
PointNewValue(crate::telemetry::PointNewValue),
}
@ -182,6 +192,30 @@ async fn handle_control_event(
tracing::error!("Failed to unsubscribe points: {}", e);
}
}
AppEvent::EquipmentStartCommandSent {
equipment_id,
unit_id,
point_id,
} => {
tracing::info!(
"Equipment start command sent: equipment={}, unit={:?}, point={}",
equipment_id,
unit_id,
point_id
);
}
AppEvent::EquipmentStopCommandSent {
equipment_id,
unit_id,
point_id,
} => {
tracing::info!(
"Equipment stop command sent: equipment={}, unit={:?}, point={}",
equipment_id,
unit_id,
point_id
);
}
AppEvent::PointNewValue(_) => {
tracing::warn!("PointNewValue routed to control worker unexpectedly");
}
@ -213,7 +247,7 @@ async fn persist_event_if_needed(event: &AppEvent, pool: &sqlx::PgPool) {
"warn",
None,
None,
Some(*source_id),
None,
format!("Source {} deleted", source_id),
serde_json::json!({ "source_id": source_id }),
)),
@ -235,6 +269,40 @@ async fn persist_event_if_needed(event: &AppEvent, pool: &sqlx::PgPool) {
format!("{} points deleted for source {}", point_ids.len(), source_id),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
)),
AppEvent::EquipmentStartCommandSent {
equipment_id,
unit_id,
point_id,
} => Some((
"equipment.start_command_sent",
"info",
*unit_id,
Some(*equipment_id),
None,
format!("Start command sent to equipment {}", equipment_id),
serde_json::json!({
"equipment_id": equipment_id,
"unit_id": unit_id,
"point_id": point_id
}),
)),
AppEvent::EquipmentStopCommandSent {
equipment_id,
unit_id,
point_id,
} => Some((
"equipment.stop_command_sent",
"info",
*unit_id,
Some(*equipment_id),
None,
format!("Stop command sent to equipment {}", equipment_id),
serde_json::json!({
"equipment_id": equipment_id,
"unit_id": unit_id,
"point_id": point_id
}),
)),
AppEvent::PointNewValue(_) => None,
};

View File

@ -5,10 +5,14 @@ use axum::{
Json,
};
use serde::Deserialize;
use serde_json::json;
use uuid::Uuid;
use validator::Validate;
use crate::{
connection::{BatchSetPointValueReq, SetPointValueReqItem},
control::validator::{validate_manual_control, ControlAction},
telemetry::ValueType,
util::{
pagination::{PaginatedResponse, PaginationParams},
response::ApiErr,
@ -47,6 +51,106 @@ pub async fn get_unit_list(
)))
}
pub async fn start_equipment(
State(state): State<AppState>,
Path(equipment_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
send_equipment_command(state, equipment_id, ControlAction::Start).await
}
pub async fn stop_equipment(
State(state): State<AppState>,
Path(equipment_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
send_equipment_command(state, equipment_id, ControlAction::Stop).await
}
async fn send_equipment_command(
state: AppState,
equipment_id: Uuid,
action: ControlAction,
) -> Result<impl IntoResponse, ApiErr> {
let context = validate_manual_control(&state, equipment_id, action).await?;
let pulse_ms = 300u64;
let high_value = pulse_value(true, context.command_value_type.as_ref());
let low_value = pulse_value(false, context.command_value_type.as_ref());
let high_result = state
.connection_manager
.write_point_values_batch(BatchSetPointValueReq {
items: vec![SetPointValueReqItem {
point_id: context.command_point.point_id,
value: high_value,
}],
})
.await
.map_err(|e| ApiErr::Internal(e, None))?;
if !high_result.success {
return Err(ApiErr::Internal(
"Failed to write pulse high level".to_string(),
Some(json!(high_result)),
));
}
tokio::time::sleep(std::time::Duration::from_millis(pulse_ms)).await;
let low_result = state
.connection_manager
.write_point_values_batch(BatchSetPointValueReq {
items: vec![SetPointValueReqItem {
point_id: context.command_point.point_id,
value: low_value,
}],
})
.await
.map_err(|e| ApiErr::Internal(e, None))?;
if !low_result.success {
return Err(ApiErr::Internal(
"Pulse reset failed after command high level succeeded".to_string(),
Some(json!(low_result)),
));
}
let event = match action {
ControlAction::Start => crate::event::AppEvent::EquipmentStartCommandSent {
equipment_id,
unit_id: context.unit_id,
point_id: context.command_point.point_id,
},
ControlAction::Stop => crate::event::AppEvent::EquipmentStopCommandSent {
equipment_id,
unit_id: context.unit_id,
point_id: context.command_point.point_id,
},
};
let _ = state.event_manager.send(event);
Ok(Json(json!({
"ok_msg": format!("Equipment {} command sent", action.as_str()),
"equipment_id": equipment_id,
"unit_id": context.unit_id,
"command_role": context.command_point.signal_role,
"command_point_id": context.command_point.point_id,
"pulse_ms": pulse_ms
})))
}
fn pulse_value(high: bool, value_type: Option<&ValueType>) -> serde_json::Value {
match value_type {
Some(ValueType::Bool) => serde_json::Value::Bool(high),
_ => {
if high {
json!(1)
} else {
json!(0)
}
}
}
}
pub async fn get_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,

View File

@ -1,3 +1,4 @@
mod control;
mod config;
mod connection;
mod db;
@ -10,7 +11,7 @@ mod telemetry;
mod util;
mod websocket;
use axum::{
routing::{get, put},
routing::{get, post, put},
Router,
};
use config::AppConfig;
@ -30,6 +31,7 @@ pub struct AppState {
pub connection_manager: Arc<ConnectionManager>,
pub event_manager: Arc<EventManager>,
pub ws_manager: Arc<websocket::WebSocketManager>,
pub control_runtime: Arc<control::runtime::ControlRuntimeStore>,
}
#[tokio::main]
async fn main() {
@ -52,6 +54,7 @@ async fn main() {
connection_manager.set_pool_and_start_reconnect_task(Arc::new(pool.clone()));
let connection_manager = Arc::new(connection_manager);
let control_runtime = Arc::new(control::runtime::ControlRuntimeStore::new());
// Connect to all enabled sources concurrently
let sources = service::get_all_enabled_sources(&pool)
@ -88,7 +91,9 @@ async fn main() {
connection_manager: connection_manager.clone(),
event_manager,
ws_manager,
control_runtime: control_runtime.clone(),
};
control::engine::start(state.clone(), control_runtime);
let app = build_router(state.clone());
let addr = format!("{}:{}", config.server_host, config.server_port);
tracing::info!("Starting server at http://{}", addr);
@ -204,6 +209,14 @@ fn build_router(state: AppState) -> Router {
"/api/event",
get(handler::control::get_event_list),
)
.route(
"/api/control/equipment/{equipment_id}/start",
post(handler::control::start_equipment),
)
.route(
"/api/control/equipment/{equipment_id}/stop",
post(handler::control::stop_equipment),
)
.route(
"/api/tag",
get(handler::tag::get_tag_list).post(handler::tag::create_tag),

View File

@ -1,7 +1,13 @@
use crate::model::{ControlUnit, EventRecord};
use sqlx::{PgPool, QueryBuilder};
use sqlx::{PgPool, QueryBuilder, Row};
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct EquipmentRolePoint {
pub point_id: Uuid,
pub signal_role: String,
}
pub async fn get_units_count(pool: &PgPool, keyword: Option<&str>) -> Result<i64, sqlx::Error> {
match keyword {
Some(keyword) => {
@ -301,3 +307,32 @@ pub async fn get_events_paginated(
qb.build_query_as::<EventRecord>().fetch_all(pool).await
}
pub async fn get_equipment_role_points(
pool: &PgPool,
equipment_id: Uuid,
) -> Result<Vec<EquipmentRolePoint>, sqlx::Error> {
let rows = sqlx::query(
r#"
SELECT
p.id AS point_id,
p.signal_role
FROM equipment e
INNER JOIN point p ON p.equipment_id = e.id
WHERE e.id = $1
AND p.signal_role IS NOT NULL
ORDER BY p.created_at
"#,
)
.bind(equipment_id)
.fetch_all(pool)
.await?;
Ok(rows
.into_iter()
.map(|row| EquipmentRolePoint {
point_id: row.get("point_id"),
signal_role: row.get("signal_role"),
})
.collect())
}