diff --git a/src/control/command.rs b/src/control/command.rs index 06fbb3b..cd40d37 100644 --- a/src/control/command.rs +++ b/src/control/command.rs @@ -1,7 +1,6 @@ use crate::{ connection::{BatchSetPointValueReq, ConnectionManager, SetPointValueReqItem}, telemetry::ValueType, - AppState, }; use serde_json::json; use std::sync::Arc; @@ -43,143 +42,6 @@ pub async fn send_pulse_command( Ok(()) } -/// Simulate RUN signal feedback after a command when SIMULATE_PLC=true. -/// -/// Strategy: -/// 1. Try writing the desired value to the RUN point via the normal OPC UA write path. -/// If the proxy accepts the write, `write_point_values_batch` will emit a local -/// `PointNewValue` event that updates the cache and WebSocket automatically. -/// 2. If the write is rejected (proxy has no write target or returns an error), -/// fall back to directly patching the local monitor cache and broadcasting over WS. -pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: bool) { - let role_points = - match crate::service::get_equipment_role_points(&state.pool, equipment_id).await { - Ok(v) => v, - Err(e) => { - tracing::warn!("simulate_run_feedback: db error: {}", e); - return; - } - }; - let run_point = match role_points.iter().find(|p| p.signal_role == "run") { - Some(p) => p.clone(), - None => return, - }; - - // Determine the write value based on the current known value_type for the point. - let write_json = { - let guard = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - match guard - .get(&run_point.point_id) - .and_then(|m| m.value_type.as_ref()) - { - Some(crate::telemetry::ValueType::Int) | Some(crate::telemetry::ValueType::UInt) => { - serde_json::json!(if run_on { 1 } else { 0 }) - } - _ => serde_json::json!(run_on), - } - }; - - // Try writing to the proxy server first. - let write_ok = match state - .connection_manager - .write_point_values_batch(crate::connection::BatchSetPointValueReq { - items: vec![crate::connection::SetPointValueReqItem { - point_id: run_point.point_id, - value: write_json, - }], - }) - .await - { - Ok(res) => res.success, - Err(e) => { - tracing::debug!("simulate_run_feedback: write attempt failed: {}", e); - false - } - }; - - if write_ok { - // write_point_values_batch already emitted PointNewValue; nothing more to do. - tracing::info!( - "simulate_run_feedback: wrote run={} for equipment={} via OPC UA", - run_on, - equipment_id - ); - return; - } - - // Fallback: patch the local cache and push over WebSocket. - tracing::debug!( - "simulate_run_feedback: OPC UA write rejected, falling back to cache patch for equipment={}", - equipment_id - ); - - let (value, value_type, value_text) = { - let guard = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - match guard - .get(&run_point.point_id) - .and_then(|m| m.value_type.as_ref()) - { - Some(crate::telemetry::ValueType::Int) => ( - crate::telemetry::DataValue::Int(if run_on { 1 } else { 0 }), - Some(crate::telemetry::ValueType::Int), - Some(if run_on { "1" } else { "0" }.to_string()), - ), - Some(crate::telemetry::ValueType::UInt) => ( - crate::telemetry::DataValue::UInt(if run_on { 1 } else { 0 }), - Some(crate::telemetry::ValueType::UInt), - Some(if run_on { "1" } else { "0" }.to_string()), - ), - _ => ( - crate::telemetry::DataValue::Bool(run_on), - Some(crate::telemetry::ValueType::Bool), - Some(run_on.to_string()), - ), - } - }; - - let monitor = crate::telemetry::PointMonitorInfo { - protocol: "simulation".to_string(), - source_id: uuid::Uuid::nil(), - point_id: run_point.point_id, - client_handle: 0, - scan_mode: crate::model::ScanMode::Poll, - timestamp: Some(chrono::Utc::now()), - quality: crate::telemetry::PointQuality::Good, - value: Some(value), - value_type, - value_text, - old_value: None, - old_timestamp: None, - value_changed: true, - }; - - if let Err(e) = state - .connection_manager - .update_point_monitor_data(monitor.clone()) - .await - { - tracing::warn!("simulate_run_feedback: cache update failed: {}", e); - return; - } - - let _ = state - .ws_manager - .send_to_public(crate::websocket::WsMessage::PointNewValue(monitor)) - .await; - - tracing::info!( - "simulate_run_feedback: cache-patched run={} for equipment={}", - run_on, - equipment_id - ); -} - fn pulse_value(high: bool, value_type: Option<&ValueType>) -> serde_json::Value { match value_type { Some(ValueType::Bool) => serde_json::Value::Bool(high), diff --git a/src/control/engine.rs b/src/control/engine.rs index ea6d74e..d376802 100644 --- a/src/control/engine.rs +++ b/src/control/engine.rs @@ -127,7 +127,7 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu } if state.config.simulate_plc { if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { - crate::control::command::simulate_run_feedback(&state, eq_id, true).await; + crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await; } } } @@ -156,7 +156,7 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu } if state.config.simulate_plc { if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { - crate::control::command::simulate_run_feedback(&state, eq_id, false).await; + crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await; } } } @@ -174,7 +174,7 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e); } else if state.config.simulate_plc { if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { - crate::control::command::simulate_run_feedback(&state, eq_id, true).await; + crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await; } } } @@ -201,7 +201,7 @@ async fn unit_task(state: AppState, store: Arc, unit_id: Uu } if state.config.simulate_plc { if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { - crate::control::command::simulate_run_feedback(&state, eq_id, false).await; + crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await; } } } diff --git a/src/control/simulate.rs b/src/control/simulate.rs index c75674a..fdebdd6 100644 --- a/src/control/simulate.rs +++ b/src/control/simulate.rs @@ -101,8 +101,26 @@ async fn run(state: AppState) { } } +/// Simulate RUN signal feedback for an equipment after a manual start/stop command. +/// Called by the engine and control handler when SIMULATE_PLC=true. +pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: bool) { + let role_points = + match crate::service::get_equipment_role_points(&state.pool, equipment_id).await { + Ok(v) => v, + Err(e) => { + tracing::warn!("simulate_run_feedback: db error: {}", e); + return; + } + }; + let run_point = match role_points.iter().find(|p| p.signal_role == "run") { + Some(p) => p.clone(), + None => return, + }; + patch_signal(state, run_point.point_id, run_on).await; +} + /// Patch a signal point value: try OPC UA write first, fall back to cache patch + WS push. -async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) { +pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) { let write_json = serde_json::json!(if value_on { 1 } else { 0 }); let write_ok = match state .connection_manager diff --git a/src/handler/control.rs b/src/handler/control.rs index fe6adbb..d19c350 100644 --- a/src/handler/control.rs +++ b/src/handler/control.rs @@ -98,7 +98,7 @@ async fn send_equipment_command( .map_err(|e| ApiErr::Internal(e, None))?; if state.config.simulate_plc { - crate::control::command::simulate_run_feedback( + crate::control::simulate::simulate_run_feedback( &state, equipment_id, matches!(action, ControlAction::Start),