refactor(simulate): consolidate all simulation code into simulate.rs

Moved simulate_run_feedback from command.rs into simulate.rs where it
reuses patch_signal. command.rs now only contains real PLC command logic.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-03-26 10:17:44 +08:00
parent 532eeaba42
commit 5a481a5eb3
4 changed files with 24 additions and 144 deletions

View File

@ -1,7 +1,6 @@
use crate::{ use crate::{
connection::{BatchSetPointValueReq, ConnectionManager, SetPointValueReqItem}, connection::{BatchSetPointValueReq, ConnectionManager, SetPointValueReqItem},
telemetry::ValueType, telemetry::ValueType,
AppState,
}; };
use serde_json::json; use serde_json::json;
use std::sync::Arc; use std::sync::Arc;
@ -43,143 +42,6 @@ pub async fn send_pulse_command(
Ok(()) Ok(())
} }
/// Simulate RUN signal feedback after a command when SIMULATE_PLC=true.
///
/// Strategy:
/// 1. Try writing the desired value to the RUN point via the normal OPC UA write path.
/// If the proxy accepts the write, `write_point_values_batch` will emit a local
/// `PointNewValue` event that updates the cache and WebSocket automatically.
/// 2. If the write is rejected (proxy has no write target or returns an error),
/// fall back to directly patching the local monitor cache and broadcasting over WS.
pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: bool) {
let role_points =
match crate::service::get_equipment_role_points(&state.pool, equipment_id).await {
Ok(v) => v,
Err(e) => {
tracing::warn!("simulate_run_feedback: db error: {}", e);
return;
}
};
let run_point = match role_points.iter().find(|p| p.signal_role == "run") {
Some(p) => p.clone(),
None => return,
};
// Determine the write value based on the current known value_type for the point.
let write_json = {
let guard = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
match guard
.get(&run_point.point_id)
.and_then(|m| m.value_type.as_ref())
{
Some(crate::telemetry::ValueType::Int) | Some(crate::telemetry::ValueType::UInt) => {
serde_json::json!(if run_on { 1 } else { 0 })
}
_ => serde_json::json!(run_on),
}
};
// Try writing to the proxy server first.
let write_ok = match state
.connection_manager
.write_point_values_batch(crate::connection::BatchSetPointValueReq {
items: vec![crate::connection::SetPointValueReqItem {
point_id: run_point.point_id,
value: write_json,
}],
})
.await
{
Ok(res) => res.success,
Err(e) => {
tracing::debug!("simulate_run_feedback: write attempt failed: {}", e);
false
}
};
if write_ok {
// write_point_values_batch already emitted PointNewValue; nothing more to do.
tracing::info!(
"simulate_run_feedback: wrote run={} for equipment={} via OPC UA",
run_on,
equipment_id
);
return;
}
// Fallback: patch the local cache and push over WebSocket.
tracing::debug!(
"simulate_run_feedback: OPC UA write rejected, falling back to cache patch for equipment={}",
equipment_id
);
let (value, value_type, value_text) = {
let guard = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
match guard
.get(&run_point.point_id)
.and_then(|m| m.value_type.as_ref())
{
Some(crate::telemetry::ValueType::Int) => (
crate::telemetry::DataValue::Int(if run_on { 1 } else { 0 }),
Some(crate::telemetry::ValueType::Int),
Some(if run_on { "1" } else { "0" }.to_string()),
),
Some(crate::telemetry::ValueType::UInt) => (
crate::telemetry::DataValue::UInt(if run_on { 1 } else { 0 }),
Some(crate::telemetry::ValueType::UInt),
Some(if run_on { "1" } else { "0" }.to_string()),
),
_ => (
crate::telemetry::DataValue::Bool(run_on),
Some(crate::telemetry::ValueType::Bool),
Some(run_on.to_string()),
),
}
};
let monitor = crate::telemetry::PointMonitorInfo {
protocol: "simulation".to_string(),
source_id: uuid::Uuid::nil(),
point_id: run_point.point_id,
client_handle: 0,
scan_mode: crate::model::ScanMode::Poll,
timestamp: Some(chrono::Utc::now()),
quality: crate::telemetry::PointQuality::Good,
value: Some(value),
value_type,
value_text,
old_value: None,
old_timestamp: None,
value_changed: true,
};
if let Err(e) = state
.connection_manager
.update_point_monitor_data(monitor.clone())
.await
{
tracing::warn!("simulate_run_feedback: cache update failed: {}", e);
return;
}
let _ = state
.ws_manager
.send_to_public(crate::websocket::WsMessage::PointNewValue(monitor))
.await;
tracing::info!(
"simulate_run_feedback: cache-patched run={} for equipment={}",
run_on,
equipment_id
);
}
fn pulse_value(high: bool, value_type: Option<&ValueType>) -> serde_json::Value { fn pulse_value(high: bool, value_type: Option<&ValueType>) -> serde_json::Value {
match value_type { match value_type {
Some(ValueType::Bool) => serde_json::Value::Bool(high), Some(ValueType::Bool) => serde_json::Value::Bool(high),

View File

@ -127,7 +127,7 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
} }
if state.config.simulate_plc { if state.config.simulate_plc {
if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() {
crate::control::command::simulate_run_feedback(&state, eq_id, true).await; crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await;
} }
} }
} }
@ -156,7 +156,7 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
} }
if state.config.simulate_plc { if state.config.simulate_plc {
if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() {
crate::control::command::simulate_run_feedback(&state, eq_id, false).await; crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await;
} }
} }
} }
@ -174,7 +174,7 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e); tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e);
} else if state.config.simulate_plc { } else if state.config.simulate_plc {
if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { if let Some(eq_id) = kind_eq_ids.get("distributor").copied() {
crate::control::command::simulate_run_feedback(&state, eq_id, true).await; crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await;
} }
} }
} }
@ -201,7 +201,7 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
} }
if state.config.simulate_plc { if state.config.simulate_plc {
if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { if let Some(eq_id) = kind_eq_ids.get("distributor").copied() {
crate::control::command::simulate_run_feedback(&state, eq_id, false).await; crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await;
} }
} }
} }

View File

@ -101,8 +101,26 @@ async fn run(state: AppState) {
} }
} }
/// Simulate RUN signal feedback for an equipment after a manual start/stop command.
/// Called by the engine and control handler when SIMULATE_PLC=true.
pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: bool) {
let role_points =
match crate::service::get_equipment_role_points(&state.pool, equipment_id).await {
Ok(v) => v,
Err(e) => {
tracing::warn!("simulate_run_feedback: db error: {}", e);
return;
}
};
let run_point = match role_points.iter().find(|p| p.signal_role == "run") {
Some(p) => p.clone(),
None => return,
};
patch_signal(state, run_point.point_id, run_on).await;
}
/// Patch a signal point value: try OPC UA write first, fall back to cache patch + WS push. /// Patch a signal point value: try OPC UA write first, fall back to cache patch + WS push.
async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) { pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) {
let write_json = serde_json::json!(if value_on { 1 } else { 0 }); let write_json = serde_json::json!(if value_on { 1 } else { 0 });
let write_ok = match state let write_ok = match state
.connection_manager .connection_manager

View File

@ -98,7 +98,7 @@ async fn send_equipment_command(
.map_err(|e| ApiErr::Internal(e, None))?; .map_err(|e| ApiErr::Internal(e, None))?;
if state.config.simulate_plc { if state.config.simulate_plc {
crate::control::command::simulate_run_feedback( crate::control::simulate::simulate_run_feedback(
&state, &state,
equipment_id, equipment_id,
matches!(action, ControlAction::Start), matches!(action, ControlAction::Start),