diff --git a/src/config.rs b/src/config.rs deleted file mode 100644 index dfa38e4..0000000 --- a/src/config.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::env; - -#[derive(Clone)] -pub struct AppConfig { - pub database_url: String, - pub server_host: String, - pub server_port: u16, - pub write_api_key: Option, - /// When true, simulate RUN signal feedback after start/stop commands. - /// Set SIMULATE_PLC=true in .env for use with OPC UA proxy simulators. - pub simulate_plc: bool, -} - - -impl AppConfig { - pub fn from_env() -> Result { - let database_url = get_env("DATABASE_URL")?; - let server_host = env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string()); - let server_port = env::var("PORT") - .unwrap_or_else(|_| "60309".to_string()) - .parse::() - .map_err(|_| "PORT must be a number")?; - // Prefer WRITE_API_KEY, keep WRITE_KEY as backward-compatible fallback. - let write_api_key = env::var("WRITE_API_KEY") - .ok() - .or_else(|| env::var("WRITE_KEY").ok()); - - let simulate_plc = env::var("SIMULATE_PLC") - .unwrap_or_default() - .to_lowercase() == "true"; - - Ok(Self { - database_url, - server_host, - server_port, - write_api_key, - simulate_plc, - }) - } - - pub fn verify_write_key(&self, key: &str) -> bool { - self.write_api_key - .as_ref() - .map(|expected| expected == key) - .unwrap_or(false) - } -} - -fn get_env(key: &str) -> Result { - env::var(key).map_err(|_| format!("Missing environment variable: {}", key)) -} diff --git a/src/control/command.rs b/src/control/command.rs deleted file mode 100644 index cd40d37..0000000 --- a/src/control/command.rs +++ /dev/null @@ -1,50 +0,0 @@ -use crate::{ - connection::{BatchSetPointValueReq, ConnectionManager, SetPointValueReqItem}, - telemetry::ValueType, -}; -use serde_json::json; -use std::sync::Arc; -use uuid::Uuid; - -/// Write a pulse (high → delay → low) to a command point. -/// Returns Ok(()) on success, Err(msg) on any failure. -pub async fn send_pulse_command( - connection_manager: &Arc, - point_id: Uuid, - value_type: Option<&ValueType>, - pulse_ms: u64, -) -> Result<(), String> { - let high = pulse_value(true, value_type); - let low = pulse_value(false, value_type); - - let high_result = connection_manager - .write_point_values_batch(BatchSetPointValueReq { - items: vec![SetPointValueReqItem { point_id, value: high }], - }) - .await?; - - if !high_result.success { - return Err(format!("Pulse high write failed: {:?}", high_result.err_msg)); - } - - tokio::time::sleep(std::time::Duration::from_millis(pulse_ms)).await; - - let low_result = connection_manager - .write_point_values_batch(BatchSetPointValueReq { - items: vec![SetPointValueReqItem { point_id, value: low }], - }) - .await?; - - if !low_result.success { - return Err(format!("Pulse low write failed: {:?}", low_result.err_msg)); - } - - Ok(()) -} - -fn pulse_value(high: bool, value_type: Option<&ValueType>) -> serde_json::Value { - match value_type { - Some(ValueType::Bool) => serde_json::Value::Bool(high), - _ => if high { json!(1) } else { json!(0) }, - } -} diff --git a/src/control/engine.rs b/src/control/engine.rs deleted file mode 100644 index 589ff11..0000000 --- a/src/control/engine.rs +++ /dev/null @@ -1,559 +0,0 @@ -use std::collections::HashMap; -use std::sync::Arc; - -use tokio::sync::Notify; -use tokio::time::Duration; -use uuid::Uuid; - -use crate::{ - control::{ - command::send_pulse_command, - runtime::{ControlRuntimeStore, UnitRuntime, UnitRuntimeState}, - }, - event::AppEvent, - service::EquipmentRolePoint, - telemetry::{PointMonitorInfo, PointQuality}, - websocket::WsMessage, - AppState, -}; - -/// Start the engine: a supervisor spawns one async task per enabled unit. -pub fn start(state: AppState, runtime_store: Arc) { - tokio::spawn(async move { - supervise(state, runtime_store).await; - }); -} - -/// Supervisor: scans for enabled units every 10 s and ensures each has a running task. -/// Uses JoinHandle to detect exited tasks so disabled-then-re-enabled units are restarted. -async fn supervise(state: AppState, store: Arc) { - let mut tasks: HashMap> = HashMap::new(); - let mut interval = tokio::time::interval(Duration::from_secs(10)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - loop { - interval.tick().await; - match crate::service::get_all_enabled_units(&state.pool).await { - Ok(units) => { - for unit in units { - let needs_spawn = tasks - .get(&unit.id) - .map_or(true, |h| h.is_finished()); - if needs_spawn { - let s = state.clone(); - let st = store.clone(); - let handle = tokio::spawn(async move { unit_task(s, st, unit.id).await; }); - tasks.insert(unit.id, handle); - } - } - } - Err(e) => tracing::error!("Engine supervisor: failed to load units: {}", e), - } - } -} - -// Per-unit task. - -async fn unit_task(state: AppState, store: Arc, unit_id: Uuid) { - let notify = store.get_or_create_notify(unit_id).await; - - // Fault/comm check ticker; still need periodic polling of point monitor data. - let mut fault_tick = tokio::time::interval(Duration::from_millis(500)); - fault_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - loop { - // Reload unit config on each iteration to detect disable/delete. - let unit = match crate::service::get_unit_by_id(&state.pool, unit_id).await { - Ok(Some(u)) if u.enabled => u, - Ok(_) => { - tracing::info!("Engine: unit {} disabled or deleted, task exiting", unit_id); - return; - } - Err(e) => { - tracing::error!("Engine: unit {} config reload failed: {}", unit_id, e); - tokio::time::sleep(Duration::from_secs(5)).await; - continue; - } - }; - - // Fault / comm check. - let (kind_roles, kind_eq_ids, all_roles) = match load_equipment_maps(&state, unit_id).await { - Ok(maps) => maps, - Err(e) => { - tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e); - tokio::time::sleep(Duration::from_secs(5)).await; - continue; - } - }; - - let mut runtime = store.get_or_init(unit_id).await; - if check_fault_comm(&state, &mut runtime, &unit, &all_roles).await { - store.upsert(runtime.clone()).await; - push_ws(&state, &runtime).await; - } - - // Wait when not active. - if !runtime.auto_enabled || runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required { - tokio::select! { - _ = fault_tick.tick() => {} - _ = notify.notified() => { - // Push fresh runtime immediately so the frontend reflects the change - // (e.g. auto_enabled toggled) without waiting for the next state transition. - let runtime = store.get_or_init(unit_id).await; - push_ws(&state, &runtime).await; - } - } - continue; - } - - // State machine step. - match runtime.state { - UnitRuntimeState::Stopped => { - // Wait stop_time_sec (0 = skip wait, start immediately). - if !wait_phase(&state, &store, &unit, &all_roles, ¬ify, &mut fault_tick).await { - continue; - } - // Send feeder start command. - let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; - let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "start_cmd", &monitor)); - drop(monitor); - if let Some((pid, vt)) = cmd { - if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { - tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e); - continue; - } - if state.config.simulate_plc { - if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { - crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await; - } - } - } - let mut runtime = store.get_or_init(unit_id).await; - runtime.state = UnitRuntimeState::Running; - store.upsert(runtime.clone()).await; - push_ws(&state, &runtime).await; - } - - UnitRuntimeState::Running => { - // Wait run_time_sec. run_time_sec == 0 means run without a time limit - // (relies on acc_time_sec to eventually stop). Treat as a very long phase. - let secs = if unit.run_time_sec > 0 { unit.run_time_sec } else { i32::MAX }; - let unit_for_wait = plc_platform_core::model::ControlUnit { - run_time_sec: secs, - ..unit.clone() - }; - if !wait_phase(&state, &store, &unit_for_wait, &all_roles, ¬ify, &mut fault_tick).await { - continue; - } - // Stop feeder. - let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; - let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "stop_cmd", &monitor)); - drop(monitor); - if let Some((pid, vt)) = cmd { - if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { - tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e); - continue; - } - if state.config.simulate_plc { - if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { - crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await; - } - } - } - let mut runtime = store.get_or_init(unit_id).await; - runtime.accumulated_run_sec += secs as i64 * 1000; - runtime.display_acc_sec = runtime.accumulated_run_sec; - - if unit.acc_time_sec > 0 && runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000 { - // Accumulated threshold reached; start distributor. - let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; - let dist_cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "start_cmd", &monitor)); - drop(monitor); - if let Some((pid, vt)) = dist_cmd { - if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { - tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e); - } else if state.config.simulate_plc { - if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { - crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await; - } - } - } - runtime.state = UnitRuntimeState::DistributorRunning; - } else { - runtime.state = UnitRuntimeState::Stopped; - } - store.upsert(runtime.clone()).await; - push_ws(&state, &runtime).await; - } - - UnitRuntimeState::DistributorRunning => { - // Wait bl_time_sec then stop distributor. - if !wait_phase(&state, &store, &unit, &all_roles, ¬ify, &mut fault_tick).await { - continue; - } - let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; - let cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "stop_cmd", &monitor)); - drop(monitor); - if let Some((pid, vt)) = cmd { - if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { - tracing::warn!("Engine: stop distributor failed for unit {}: {}", unit_id, e); - continue; - } - if state.config.simulate_plc { - if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { - crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await; - } - } - } - let mut runtime = store.get_or_init(unit_id).await; - runtime.accumulated_run_sec = 0; - runtime.display_acc_sec = 0; - runtime.state = UnitRuntimeState::Stopped; - store.upsert(runtime.clone()).await; - push_ws(&state, &runtime).await; - } - - UnitRuntimeState::FaultLocked | UnitRuntimeState::CommLocked => { - tokio::select! { - _ = fault_tick.tick() => {} - _ = notify.notified() => {} - } - } - } - } -} - -// Helpers. - -/// Sleep for the duration appropriate to the *current* state, interrupting every -/// 500 ms to re-check fault/comm. Returns `true` when the full time elapsed, -/// `false` if the phase was interrupted (auto disabled, fault, or comm lock). -async fn wait_phase( - state: &AppState, - store: &ControlRuntimeStore, - unit: &plc_platform_core::model::ControlUnit, - all_roles: &[(Uuid, HashMap)], - notify: &Arc, - fault_tick: &mut tokio::time::Interval, -) -> bool { - let secs = match store.get_or_init(unit.id).await.state { - UnitRuntimeState::Stopped => unit.stop_time_sec, - UnitRuntimeState::Running => unit.run_time_sec, - UnitRuntimeState::DistributorRunning => unit.bl_time_sec, - _ => return false, - }; - if secs <= 0 { - return true; - } - let deadline = tokio::time::Instant::now() + Duration::from_secs(secs as u64); - loop { - let completed = tokio::select! { - _ = tokio::time::sleep_until(deadline) => true, - _ = fault_tick.tick() => false, - _ = notify.notified() => false, - }; - if completed { - return true; - } - // Re-check fault/comm mid-phase. - let mut runtime = store.get_or_init(unit.id).await; - if check_fault_comm(state, &mut runtime, unit, all_roles).await { - store.upsert(runtime.clone()).await; - push_ws(state, &runtime).await; - } - if !runtime.auto_enabled || runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required { - return false; - } - } -} - -async fn push_ws(state: &AppState, runtime: &UnitRuntime) { - if let Err(e) = state - .ws_manager - .send_to_public(WsMessage::UnitRuntimeChanged(runtime.clone())) - .await - { - tracing::debug!("Engine: WS push skipped (no subscribers): {}", e); - } -} - -/// Check fault and comm status, mutate runtime, fire events. -/// Returns `true` if any field changed. -async fn check_fault_comm( - state: &AppState, - runtime: &mut UnitRuntime, - unit: &plc_platform_core::model::ControlUnit, - all_roles: &[(Uuid, HashMap)], -) -> bool { - let monitor = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - - let any_bad = all_roles.iter().flat_map(|(_, r)| r.values()).any(|rp| { - monitor - .get(&rp.point_id) - .map(|m| m.quality != PointQuality::Good) - .unwrap_or(false) - }); - - let any_flt = all_roles.iter().any(|(_, roles)| { - roles - .get("flt") - .and_then(|rp| monitor.get(&rp.point_id)) - .map(|m| super::monitor_value_as_bool(m)) - .unwrap_or(false) - }); - - let flt_eq_id = if any_flt && !runtime.fault_locked { - all_roles - .iter() - .find(|(_, roles)| { - roles - .get("flt") - .and_then(|rp| monitor.get(&rp.point_id)) - .map(|m| super::monitor_value_as_bool(m)) - .unwrap_or(false) - }) - .map(|(eq_id, _)| *eq_id) - } else { - None - }; - - // REM local: any equipment with a rem point that is explicitly false (local mode) with good quality. - let any_rem_local = all_roles.iter().any(|(_, roles)| { - roles - .get("rem") - .and_then(|rp| monitor.get(&rp.point_id)) - .map(|m| !super::monitor_value_as_bool(m) && m.quality == PointQuality::Good) - .unwrap_or(false) - }); - - // Find the first equipment that just switched to local (for event payload). - let rem_local_eq_id = if any_rem_local && !runtime.rem_local { - all_roles - .iter() - .find(|(_, roles)| { - roles - .get("rem") - .and_then(|rp| monitor.get(&rp.point_id)) - .map(|m| !super::monitor_value_as_bool(m) && m.quality == PointQuality::Good) - .unwrap_or(false) - }) - .map(|(eq_id, _)| *eq_id) - } else { - None - }; - - drop(monitor); - - let prev_comm = runtime.comm_locked; - let prev_flt = runtime.flt_active; - let prev_fault_locked = runtime.fault_locked; - let prev_auto = runtime.auto_enabled; - let prev_ack = runtime.manual_ack_required; - let prev_rem_local = runtime.rem_local; - - runtime.comm_locked = any_bad; - runtime.flt_active = any_flt; - runtime.rem_local = any_rem_local; - - if !prev_comm && runtime.comm_locked { - let _ = state.event_manager.send(AppEvent::CommLocked { unit_id: unit.id }); - } else if prev_comm && !runtime.comm_locked { - let _ = state.event_manager.send(AppEvent::CommRecovered { unit_id: unit.id }); - } - - if let Some(eq_id) = flt_eq_id { - runtime.fault_locked = true; - let _ = state.event_manager.send(AppEvent::FaultLocked { unit_id: unit.id, equipment_id: eq_id }); - if runtime.auto_enabled { - runtime.auto_enabled = false; - let _ = state.event_manager.send(AppEvent::AutoControlStopped { unit_id: unit.id }); - } - } - - if prev_flt && !any_flt && runtime.fault_locked { - if unit.require_manual_ack_after_fault { - runtime.manual_ack_required = true; - } else { - runtime.fault_locked = false; - } - } - - // Fire RemLocal event when any equipment first switches to local mode. - if let Some(eq_id) = rem_local_eq_id { - let _ = state.event_manager.send(AppEvent::RemLocal { unit_id: unit.id, equipment_id: eq_id }); - if runtime.auto_enabled { - runtime.auto_enabled = false; - let _ = state.event_manager.send(AppEvent::AutoControlStopped { unit_id: unit.id }); - } - } - - // Fire RemRecovered when all rem signals return to remote. - if prev_rem_local && !any_rem_local { - let _ = state.event_manager.send(AppEvent::RemRecovered { unit_id: unit.id }); - } - - runtime.comm_locked != prev_comm - || runtime.flt_active != prev_flt - || runtime.fault_locked != prev_fault_locked - || runtime.auto_enabled != prev_auto - || runtime.manual_ack_required != prev_ack - || runtime.rem_local != prev_rem_local -} - -type EquipMaps = ( - HashMap>, - HashMap, - Vec<(Uuid, HashMap)>, -); - -async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result { - let equipment_list = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?; - let equipment_ids: Vec = equipment_list.iter().map(|equip| equip.id).collect(); - let role_point_rows = - crate::service::get_signal_role_points_batch(&state.pool, &equipment_ids).await?; - let mut role_points_by_equipment: HashMap> = HashMap::new(); - for row in role_point_rows { - role_points_by_equipment - .entry(row.equipment_id) - .or_default() - .push(EquipmentRolePoint { - point_id: row.point_id, - signal_role: row.signal_role, - }); - } - - Ok(build_equipment_maps( - unit_id, - &equipment_list, - role_points_by_equipment, - )) -} - -fn build_equipment_maps( - unit_id: Uuid, - equipment_list: &[plc_platform_core::model::Equipment], - mut role_points_by_equipment: HashMap>, -) -> EquipMaps { - let mut kind_roles: HashMap> = HashMap::new(); - let mut kind_eq_ids: HashMap = HashMap::new(); - let mut all_roles: Vec<(Uuid, HashMap)> = Vec::new(); - - for equip in equipment_list { - let role_map: HashMap = role_points_by_equipment - .remove(&equip.id) - .unwrap_or_default() - .into_iter() - .map(|rp| (rp.signal_role.clone(), rp)) - .collect(); - - if let Some(kind) = &equip.kind { - if !kind_roles.contains_key(kind.as_str()) { - kind_roles.insert(kind.clone(), role_map.clone()); - kind_eq_ids.insert(kind.clone(), equip.id); - } else { - tracing::warn!( - "Engine: unit {} has multiple {} equipment; using first", - unit_id, kind - ); - } - } - all_roles.push((equip.id, role_map)); - } - - (kind_roles, kind_eq_ids, all_roles) -} - -/// Find a command point by role. Returns `None` if REM==0, FLT==1, or quality is bad. -fn find_cmd( - roles: &HashMap, - role: &str, - monitor: &HashMap, -) -> Option<(Uuid, Option)> { - let cmd_rp = roles.get(role)?; - - let rem_ok = roles - .get("rem") - .and_then(|rp| monitor.get(&rp.point_id)) - .map(|m| super::monitor_value_as_bool(m) && m.quality == PointQuality::Good) - .unwrap_or(true); - - let flt_ok = roles - .get("flt") - .and_then(|rp| monitor.get(&rp.point_id)) - .map(|m| !super::monitor_value_as_bool(m) && m.quality == PointQuality::Good) - .unwrap_or(true); - - if rem_ok && flt_ok { - let vtype = monitor - .get(&cmd_rp.point_id) - .and_then(|m| m.value_type.clone()); - Some((cmd_rp.point_id, vtype)) - } else { - None - } -} - -#[cfg(test)] -mod tests { - use super::build_equipment_maps; - use plc_platform_core::model::Equipment; - use crate::service::EquipmentRolePoint; - use chrono::Utc; - use std::collections::HashMap; - use uuid::Uuid; - - fn equipment(id: Uuid, unit_id: Uuid, kind: &str) -> Equipment { - Equipment { - id, - unit_id: Some(unit_id), - code: format!("EQ-{id}"), - name: format!("Equipment-{id}"), - kind: Some(kind.to_string()), - description: None, - created_at: Utc::now(), - updated_at: Utc::now(), - } - } - - #[test] - fn build_equipment_maps_reflects_latest_role_bindings() { - let unit_id = Uuid::new_v4(); - let equipment_id = Uuid::new_v4(); - let first_start_point = Uuid::new_v4(); - let second_start_point = Uuid::new_v4(); - let equipment_list = vec![equipment(equipment_id, unit_id, "coal_feeder")]; - - let mut first_roles = HashMap::new(); - first_roles.insert( - equipment_id, - vec![EquipmentRolePoint { - point_id: first_start_point, - signal_role: "start_cmd".to_string(), - }], - ); - let (first_kind_roles, _, _) = build_equipment_maps(unit_id, &equipment_list, first_roles); - - let mut second_roles = HashMap::new(); - second_roles.insert( - equipment_id, - vec![EquipmentRolePoint { - point_id: second_start_point, - signal_role: "start_cmd".to_string(), - }], - ); - let (second_kind_roles, _, _) = - build_equipment_maps(unit_id, &equipment_list, second_roles); - - assert_eq!( - first_kind_roles["coal_feeder"]["start_cmd"].point_id, - first_start_point - ); - assert_eq!( - second_kind_roles["coal_feeder"]["start_cmd"].point_id, - second_start_point - ); - } -} diff --git a/src/control/mod.rs b/src/control/mod.rs deleted file mode 100644 index 0b5c26d..0000000 --- a/src/control/mod.rs +++ /dev/null @@ -1,20 +0,0 @@ -pub mod command; -pub mod engine; -pub mod runtime; -pub mod simulate; -pub mod validator; - -use crate::telemetry::{DataValue, PointMonitorInfo}; - -pub(crate) fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool { - match monitor.value.as_ref() { - Some(DataValue::Bool(v)) => *v, - Some(DataValue::Int(v)) => *v != 0, - Some(DataValue::UInt(v)) => *v != 0, - Some(DataValue::Float(v)) => *v != 0.0, - Some(DataValue::Text(v)) => { - matches!(v.trim().to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes") - } - _ => false, - } -} diff --git a/src/control/runtime.rs b/src/control/runtime.rs deleted file mode 100644 index 6a9d26a..0000000 --- a/src/control/runtime.rs +++ /dev/null @@ -1,97 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use tokio::sync::{Notify, RwLock}; -use uuid::Uuid; - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] -#[serde(rename_all = "snake_case")] -pub enum UnitRuntimeState { - Stopped, - Running, - DistributorRunning, - FaultLocked, - CommLocked, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct UnitRuntime { - pub unit_id: Uuid, - pub state: UnitRuntimeState, - pub auto_enabled: bool, - pub accumulated_run_sec: i64, - /// Snapshot updated only on state transitions; used for display to avoid mid-tick jitter. - pub display_acc_sec: i64, - pub fault_locked: bool, - pub flt_active: bool, - pub comm_locked: bool, - pub manual_ack_required: bool, - /// True when any equipment in the unit has REM=false (local mode) with good signal quality. - pub rem_local: bool, -} - -impl UnitRuntime { - pub fn new(unit_id: Uuid) -> Self { - Self { - unit_id, - state: UnitRuntimeState::Stopped, - auto_enabled: false, - accumulated_run_sec: 0, - display_acc_sec: 0, - fault_locked: false, - flt_active: false, - comm_locked: false, - manual_ack_required: false, - rem_local: false, - } - } -} - -#[derive(Clone, Default)] -pub struct ControlRuntimeStore { - inner: Arc>>, - notifiers: Arc>>>, -} - -impl ControlRuntimeStore { - pub fn new() -> Self { - Self::default() - } - - pub async fn get(&self, unit_id: Uuid) -> Option { - self.inner.read().await.get(&unit_id).cloned() - } - - pub async fn get_or_init(&self, unit_id: Uuid) -> UnitRuntime { - if let Some(runtime) = self.get(unit_id).await { - return runtime; - } - - let runtime = UnitRuntime::new(unit_id); - self.inner.write().await.insert(unit_id, runtime.clone()); - runtime - } - - pub async fn upsert(&self, runtime: UnitRuntime) { - self.inner.write().await.insert(runtime.unit_id, runtime); - } - - pub async fn get_or_create_notify(&self, unit_id: Uuid) -> Arc { - self.notifiers - .write() - .await - .entry(unit_id) - .or_insert_with(|| Arc::new(Notify::new())) - .clone() - } - - pub async fn get_all(&self) -> HashMap { - self.inner.read().await.clone() - } - - /// Wake the engine task for a unit (e.g., when auto_enabled or fault_locked changes). - pub async fn notify_unit(&self, unit_id: Uuid) { - if let Some(n) = self.notifiers.read().await.get(&unit_id) { - n.notify_one(); - } - } -} diff --git a/src/control/simulate.rs b/src/control/simulate.rs deleted file mode 100644 index 11462a3..0000000 --- a/src/control/simulate.rs +++ /dev/null @@ -1,213 +0,0 @@ -use tokio::time::Duration; -use uuid::Uuid; - -use crate::{ - connection::{BatchSetPointValueReq, SetPointValueReqItem}, - telemetry::{DataValue, PointMonitorInfo, PointQuality, ValueType}, - websocket::WsMessage, - AppState, -}; - -/// Start the chaos simulation task (only when SIMULATE_PLC=true). -/// Randomly disrupts `rem` or `flt` signals on equipment to exercise the control engine. -pub fn start(state: AppState) { - tokio::spawn(async move { - run(state).await; - }); -} - -async fn run(state: AppState) { - let mut rng = seed_rng(); - - loop { - // Wait a random 15-60 s between events. - let wait_secs = 15 + xorshift(&mut rng) % 46; - tokio::time::sleep(Duration::from_secs(wait_secs)).await; - - // Pick a random enabled unit. - let units = match crate::service::get_all_enabled_units(&state.pool).await { - Ok(u) if !u.is_empty() => u, - _ => continue, - }; - let unit = &units[xorshift(&mut rng) as usize % units.len()]; - - // Only target units with auto control running; otherwise the event is uninteresting. - let runtime = state.control_runtime.get(unit.id).await; - if runtime.map_or(true, |r| !r.auto_enabled) { - continue; - } - - // Pick a random equipment in that unit. - let equipments = - match crate::service::get_equipment_by_unit_id(&state.pool, unit.id).await { - Ok(e) if !e.is_empty() => e, - _ => continue, - }; - let eq = &equipments[xorshift(&mut rng) as usize % equipments.len()]; - - // Find which of rem / flt this equipment has. - let role_points = - match crate::service::get_equipment_role_points(&state.pool, eq.id).await { - Ok(rp) if !rp.is_empty() => rp, - _ => continue, - }; - - let candidates: Vec<&str> = ["flt", "rem"] - .iter() - .filter(|&&r| role_points.iter().any(|p| p.signal_role == r)) - .copied() - .collect(); - - if candidates.is_empty() { - continue; - } - - let target_role = candidates[xorshift(&mut rng) as usize % candidates.len()]; - let target_point = role_points - .iter() - .find(|p| p.signal_role == target_role) - .unwrap(); - - // rem=false means the equipment is not in remote mode. - // flt=true means the equipment reports an active fault. - let trigger_value = target_role == "flt"; - - // Hold duration: 5-15 s for rem, 3-10 s for flt. - let hold_secs = if target_role == "flt" { - 3 + xorshift(&mut rng) % 8 - } else { - 5 + xorshift(&mut rng) % 11 - }; - - tracing::info!( - "[chaos] unit={} eq={} role={} -> {} (hold {}s)", - unit.code, - eq.code, - target_role, - if trigger_value { "FAULT" } else { "REM OFF" }, - hold_secs - ); - patch_signal(&state, target_point.point_id, trigger_value).await; - patch_signal(&state, target_point.point_id, trigger_value).await; - tokio::time::sleep(Duration::from_secs(hold_secs)).await; - patch_signal(&state, target_point.point_id, !trigger_value).await; - - tracing::info!( - "[chaos] unit={} eq={} role={} -> RESTORED", - unit.code, - eq.code, - target_role - ); - } -} - -/// Simulate RUN signal feedback for an equipment after a manual start/stop command. -/// Called by the engine and control handler when SIMULATE_PLC=true. -pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: bool) { - let role_points = - match crate::service::get_equipment_role_points(&state.pool, equipment_id).await { - Ok(v) => v, - Err(e) => { - tracing::warn!("simulate_run_feedback: db error: {}", e); - return; - } - }; - let run_point = match role_points.iter().find(|p| p.signal_role == "run") { - Some(p) => p.clone(), - None => return, - }; - patch_signal(state, run_point.point_id, run_on).await; -} - -/// Patch a signal point value: try OPC UA write first, fall back to cache patch + WS push. -pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) { - let write_json = serde_json::json!(if value_on { 1 } else { 0 }); - let write_ok = match state - .connection_manager - .write_point_values_batch(BatchSetPointValueReq { - items: vec![SetPointValueReqItem { - point_id, - value: write_json, - }], - }) - .await - { - Ok(res) => res.success, - Err(_) => false, - }; - - if write_ok { - return; - } - - // Fallback: patch the monitor cache directly and broadcast over WS. - let (value, value_type, value_text) = { - let guard = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - match guard.get(&point_id).and_then(|m| m.value_type.as_ref()) { - Some(ValueType::Int) => ( - DataValue::Int(if value_on { 1 } else { 0 }), - Some(ValueType::Int), - Some(if value_on { "1" } else { "0" }.to_string()), - ), - Some(ValueType::UInt) => ( - DataValue::UInt(if value_on { 1 } else { 0 }), - Some(ValueType::UInt), - Some(if value_on { "1" } else { "0" }.to_string()), - ), - _ => ( - DataValue::Bool(value_on), - Some(ValueType::Bool), - Some(value_on.to_string()), - ), - } - }; - - let monitor = PointMonitorInfo { - protocol: "simulation".to_string(), - source_id: Uuid::nil(), - point_id, - client_handle: 0, - scan_mode: plc_platform_core::model::ScanMode::Poll, - timestamp: Some(chrono::Utc::now()), - quality: PointQuality::Good, - value: Some(value), - value_type, - value_text, - old_value: None, - old_timestamp: None, - value_changed: true, - }; - - if let Err(e) = state - .connection_manager - .update_point_monitor_data(monitor.clone()) - .await - { - tracing::warn!("[chaos] cache update failed for {}: {}", point_id, e); - return; - } - - let _ = state - .ws_manager - .send_to_public(WsMessage::PointNewValue(monitor)) - .await; -} - -// Minimal XorShift64 PRNG (no external crate needed). - -fn seed_rng() -> u64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.as_nanos() as u64 ^ d.as_secs().wrapping_mul(0x9e37_79b9_7f4a_7c15)) - .unwrap_or(0xdeadbeef) -} - -fn xorshift(s: &mut u64) -> u64 { - *s ^= *s << 13; - *s ^= *s >> 7; - *s ^= *s << 17; - *s -} diff --git a/src/control/validator.rs b/src/control/validator.rs deleted file mode 100644 index b91a398..0000000 --- a/src/control/validator.rs +++ /dev/null @@ -1,201 +0,0 @@ -use std::collections::HashMap; - -use serde_json::json; -use uuid::Uuid; - -use crate::{ - service::EquipmentRolePoint, - telemetry::{PointMonitorInfo, PointQuality, ValueType}, - util::response::ApiErr, - AppState, -}; - -#[derive(Debug, Clone, Copy)] -pub enum ControlAction { - Start, - Stop, -} - -impl ControlAction { - pub fn as_str(self) -> &'static str { - match self { - Self::Start => "start", - Self::Stop => "stop", - } - } - - pub fn command_role(self) -> &'static str { - match self { - Self::Start => "start_cmd", - Self::Stop => "stop_cmd", - } - } -} - -pub struct ManualControlContext { - pub unit_id: Option, - pub command_point: EquipmentRolePoint, - pub command_value_type: Option, -} - -pub async fn validate_manual_control( - state: &AppState, - equipment_id: Uuid, - action: ControlAction, -) -> Result { - let equipment = crate::service::get_equipment_by_id(&state.pool, equipment_id) - .await? - .ok_or_else(|| ApiErr::NotFound("Equipment not found".to_string(), None))?; - - let role_points = crate::service::get_equipment_role_points(&state.pool, equipment_id).await?; - if role_points.is_empty() { - return Err(ApiErr::BadRequest( - "Equipment has no bound role points".to_string(), - Some(json!({ "equipment_id": equipment_id })), - )); - } - - let role_map: HashMap<&str, &EquipmentRolePoint> = role_points - .iter() - .map(|point| (point.signal_role.as_str(), point)) - .collect(); - - let command_point = role_map - .get(action.command_role()) - .copied() - .ok_or_else(|| { - ApiErr::BadRequest( - format!("Equipment missing role point {}", action.command_role()), - Some(json!({ - "equipment_id": equipment_id, - "required_role": action.command_role() - })), - ) - })? - .clone(); - - let monitor_guard = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - - validate_quality( - role_map.get("rem").copied(), - &monitor_guard, - "REM", - equipment_id, - )?; - validate_quality( - role_map.get("flt").copied(), - &monitor_guard, - "FLT", - equipment_id, - )?; - if let Some(rem_point) = role_map.get("rem").copied() { - let rem_monitor = monitor_guard - .get(&rem_point.point_id) - .ok_or_else(|| missing_monitor_err("REM", equipment_id))?; - if !super::monitor_value_as_bool(rem_monitor) { - return Err(ApiErr::Forbidden( - "Remote control not allowed, REM is not enabled".to_string(), - Some(json!({ "equipment_id": equipment_id })), - )); - } - } - - if let Some(flt_point) = role_map.get("flt").copied() { - let flt_monitor = monitor_guard - .get(&flt_point.point_id) - .ok_or_else(|| missing_monitor_err("FLT", equipment_id))?; - if super::monitor_value_as_bool(flt_monitor) { - return Err(ApiErr::Forbidden( - "Equipment fault is active, command denied".to_string(), - Some(json!({ "equipment_id": equipment_id })), - )); - } - } - - drop(monitor_guard); - - // Runtime state checks — block commands if unit is locked - if let Some(unit_id) = equipment.unit_id { - if let Some(runtime) = state.control_runtime.get(unit_id).await { - if runtime.auto_enabled { - return Err(ApiErr::Forbidden( - "Auto control is active; disable auto first".to_string(), - Some(json!({ "unit_id": unit_id })), - )); - } - if runtime.comm_locked { - return Err(ApiErr::Forbidden( - "Unit communication is locked".to_string(), - Some(json!({ "unit_id": unit_id })), - )); - } - if runtime.fault_locked { - return Err(ApiErr::Forbidden( - "Unit is fault locked".to_string(), - Some(json!({ "unit_id": unit_id, "manual_ack_required": runtime.manual_ack_required })), - )); - } - if runtime.manual_ack_required { - return Err(ApiErr::Forbidden( - "Fault acknowledgement required before issuing commands".to_string(), - Some(json!({ "unit_id": unit_id })), - )); - } - } - } - - let command_value_type = state - .connection_manager - .get_point_monitor_data_read_guard() - .await - .get(&command_point.point_id) - .and_then(|item| item.value_type.clone()); - - Ok(ManualControlContext { - unit_id: equipment.unit_id, - command_point, - command_value_type, - }) -} - -fn validate_quality( - role_point: Option<&EquipmentRolePoint>, - monitor_map: &HashMap, - role: &str, - equipment_id: Uuid, -) -> Result<(), ApiErr> { - let Some(role_point) = role_point else { - return Ok(()); - }; - - let monitor = monitor_map - .get(&role_point.point_id) - .ok_or_else(|| missing_monitor_err(role, equipment_id))?; - - if monitor.quality != PointQuality::Good { - return Err(ApiErr::Forbidden( - format!("Communication abnormal for role {}", role), - Some(json!({ - "equipment_id": equipment_id, - "role": role, - "quality": monitor.quality - })), - )); - } - - Ok(()) -} - -fn missing_monitor_err(role: &str, equipment_id: Uuid) -> ApiErr { - ApiErr::Forbidden( - format!("No realtime value for role {}", role), - Some(json!({ - "equipment_id": equipment_id, - "role": role - })), - ) -} - diff --git a/src/event.rs b/src/event.rs deleted file mode 100644 index 23a4e4a..0000000 --- a/src/event.rs +++ /dev/null @@ -1,567 +0,0 @@ -use std::collections::HashMap; -use plc_platform_core::event::EventEnvelope; -use tokio::sync::mpsc; -use uuid::Uuid; -use plc_platform_core::model::EventRecord; - -const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024; -const TELEMETRY_EVENT_CHANNEL_CAPACITY: usize = 4096; - -#[derive(Debug, Clone)] -pub enum AppEvent { - SourceCreate { - source_id: Uuid, - }, - SourceUpdate { - source_id: Uuid, - }, - SourceDelete { - source_id: Uuid, - source_name: String, - }, - PointCreateBatch { - source_id: Uuid, - point_ids: Vec, - }, - PointDeleteBatch { - source_id: Uuid, - point_ids: Vec, - }, - EquipmentStartCommandSent { - equipment_id: Uuid, - unit_id: Option, - point_id: Uuid, - }, - EquipmentStopCommandSent { - equipment_id: Uuid, - unit_id: Option, - point_id: Uuid, - }, - AutoControlStarted { unit_id: Uuid }, - AutoControlStopped { unit_id: Uuid }, - FaultLocked { unit_id: Uuid, equipment_id: Uuid }, - FaultAcked { unit_id: Uuid }, - CommLocked { unit_id: Uuid }, - CommRecovered { unit_id: Uuid }, - RemLocal { unit_id: Uuid, equipment_id: Uuid }, - RemRecovered { unit_id: Uuid }, - UnitStateChanged { unit_id: Uuid, from_state: String, to_state: String }, - PointNewValue(crate::telemetry::PointNewValue), -} - -pub struct EventManager { - control_sender: mpsc::Sender, - telemetry_sender: mpsc::Sender, -} - -impl EventManager { - pub fn new( - pool: sqlx::PgPool, - connection_manager: std::sync::Arc, - ws_manager: Option>, - ) -> Self { - let (control_sender, mut control_receiver) = - mpsc::channel::(CONTROL_EVENT_CHANNEL_CAPACITY); - let (telemetry_sender, mut telemetry_receiver) = - mpsc::channel::(TELEMETRY_EVENT_CHANNEL_CAPACITY); - - let control_cm = connection_manager.clone(); - let control_pool = pool.clone(); - let control_ws_manager = ws_manager.clone(); - tokio::spawn(async move { - while let Some(event) = control_receiver.recv().await { - handle_control_event(event, &control_pool, &control_cm, control_ws_manager.as_ref()) - .await; - } - }); - - let ws_manager_clone = ws_manager.clone(); - let telemetry_cm = connection_manager.clone(); - tokio::spawn(async move { - while let Some(payload) = telemetry_receiver.recv().await { - let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> = - HashMap::new(); - latest_by_key.insert((payload.source_id, payload.client_handle), payload); - - loop { - match telemetry_receiver.try_recv() { - Ok(next_payload) => { - latest_by_key.insert( - (next_payload.source_id, next_payload.client_handle), - next_payload, - ); - } - Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { - break; - } - Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { - break; - } - } - } - - for point_payload in latest_by_key.into_values() { - process_point_new_value(point_payload, &telemetry_cm, ws_manager_clone.as_ref()) - .await; - } - } - }); - - Self { - control_sender, - telemetry_sender, - } - } - - pub fn send(&self, event: AppEvent) -> Result<(), String> { - match event { - AppEvent::PointNewValue(payload) => match self.telemetry_sender.try_send(payload) { - Ok(()) => Ok(()), - Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => { - Err(format!("Failed to send telemetry event: channel closed ({e:?})")) - } - Err(tokio::sync::mpsc::error::TrySendError::Full(payload)) => { - // High-frequency telemetry is lossy by design under sustained pressure. - tracing::warn!( - "Dropping PointNewValue due to full telemetry queue: source={}, client_handle={}", - payload.source_id, - payload.client_handle - ); - Ok(()) - } - }, - control_event => match self.control_sender.try_send(control_event) { - Ok(()) => Ok(()), - Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => { - Err(format!("Failed to send control event: channel closed ({e:?})")) - } - Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => { - Err(format!("Failed to send control event: queue full ({e:?})")) - } - }, - } - } -} - -impl plc_platform_core::connection::PointEventSink for EventManager { - fn send_point_new_value( - &self, - payload: plc_platform_core::telemetry::PointNewValue, - ) -> Result<(), String> { - self.send(AppEvent::PointNewValue(payload)) - } -} - -async fn handle_control_event( - event: AppEvent, - pool: &sqlx::PgPool, - connection_manager: &std::sync::Arc, - ws_manager: Option<&std::sync::Arc>, -) { - persist_event_if_needed(&event, pool, ws_manager).await; - - match event { - AppEvent::SourceCreate { source_id } => { - tracing::info!("Processing SourceCreate event for {}", source_id); - if let Err(e) = connection_manager.connect_from_source(pool, source_id).await { - tracing::error!("Failed to connect to source {}: {}", source_id, e); - } - } - AppEvent::SourceUpdate { source_id } => { - tracing::info!("Processing SourceUpdate event for {}", source_id); - if let Err(e) = connection_manager.reconnect(pool, source_id).await { - tracing::error!("Failed to reconnect source {}: {}", source_id, e); - } - } - AppEvent::SourceDelete { source_id, .. } => { - tracing::info!("Processing SourceDelete event for {}", source_id); - if let Err(e) = connection_manager.disconnect(source_id).await { - tracing::error!("Failed to disconnect from source {}: {}", source_id, e); - } - } - AppEvent::PointCreateBatch { source_id, point_ids } => { - let requested_count = point_ids.len(); - match connection_manager - .subscribe_points_from_source(source_id, Some(point_ids), pool) - .await - { - Ok(stats) => { - let subscribed = *stats.get("subscribed").unwrap_or(&0); - let polled = *stats.get("polled").unwrap_or(&0); - let total = *stats.get("total").unwrap_or(&0); - tracing::info!( - "PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}", - source_id, - requested_count, - subscribed, - polled, - total - ); - } - Err(e) => { - tracing::error!("Failed to subscribe to points: {}", e); - } - } - } - AppEvent::PointDeleteBatch { source_id, point_ids } => { - tracing::info!( - "Processing PointDeleteBatch event for source {} with {} points", - source_id, - point_ids.len() - ); - if let Err(e) = connection_manager - .unsubscribe_points_from_source(source_id, point_ids) - .await - { - tracing::error!("Failed to unsubscribe points: {}", e); - } - } - AppEvent::EquipmentStartCommandSent { - equipment_id, - unit_id, - point_id, - } => { - tracing::info!( - "Equipment start command sent: equipment={}, unit={:?}, point={}", - equipment_id, - unit_id, - point_id - ); - } - AppEvent::EquipmentStopCommandSent { - equipment_id, - unit_id, - point_id, - } => { - tracing::info!( - "Equipment stop command sent: equipment={}, unit={:?}, point={}", - equipment_id, - unit_id, - point_id - ); - } - AppEvent::AutoControlStarted { unit_id } => { - tracing::info!("Auto control started for unit {}", unit_id); - } - AppEvent::AutoControlStopped { unit_id } => { - tracing::info!("Auto control stopped for unit {}", unit_id); - } - AppEvent::FaultLocked { unit_id, equipment_id } => { - tracing::warn!("Fault locked: unit={}, equipment={}", unit_id, equipment_id); - } - AppEvent::FaultAcked { unit_id } => { - tracing::info!("Fault acked for unit {}", unit_id); - } - AppEvent::CommLocked { unit_id } => { - tracing::warn!("Comm locked for unit {}", unit_id); - } - AppEvent::CommRecovered { unit_id } => { - tracing::info!("Comm recovered for unit {}", unit_id); - } - AppEvent::RemLocal { unit_id, equipment_id } => { - tracing::warn!("REM local: unit={}, equipment={}", unit_id, equipment_id); - } - AppEvent::RemRecovered { unit_id } => { - tracing::info!("REM recovered for unit {}", unit_id); - } - AppEvent::UnitStateChanged { unit_id, from_state, to_state } => { - tracing::info!("Unit {} state: {} → {}", unit_id, from_state, to_state); - } - AppEvent::PointNewValue(_) => { - tracing::warn!("PointNewValue routed to control worker unexpectedly"); - } - } -} - -async fn fetch_source_name(pool: &sqlx::PgPool, id: Uuid) -> String { - sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1") - .bind(id) - .fetch_optional(pool) - .await - .ok() - .flatten() - .unwrap_or_else(|| id.to_string()) -} - -async fn fetch_unit_code(pool: &sqlx::PgPool, id: Uuid) -> String { - sqlx::query_scalar::<_, String>("SELECT code FROM unit WHERE id = $1") - .bind(id) - .fetch_optional(pool) - .await - .ok() - .flatten() - .unwrap_or_else(|| id.to_string()) -} - -async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String { - sqlx::query_scalar::<_, String>("SELECT code FROM equipment WHERE id = $1") - .bind(id) - .fetch_optional(pool) - .await - .ok() - .flatten() - .unwrap_or_else(|| id.to_string()) -} - -async fn persist_event_if_needed( - event: &AppEvent, - pool: &sqlx::PgPool, - ws_manager: Option<&std::sync::Arc>, -) { - let record = match event { - AppEvent::SourceCreate { source_id } => { - let name = fetch_source_name(pool, *source_id).await; - Some(( - "source.created", "info", - None, None, Some(*source_id), - format!("数据源【{}】已创建", name), - serde_json::json!({ "source_id": source_id }), - )) - } - AppEvent::SourceUpdate { source_id } => { - let name = fetch_source_name(pool, *source_id).await; - Some(( - "source.updated", "info", - None, None, Some(*source_id), - format!("数据源【{}】已更新", name), - serde_json::json!({ "source_id": source_id }), - )) - } - AppEvent::SourceDelete { source_id, source_name } => Some(( - "source.deleted", "warn", - None, None, None, - format!("数据源【{}】已删除", source_name), - serde_json::json!({ "source_id": source_id }), - )), - AppEvent::PointCreateBatch { source_id, point_ids } => { - let name = fetch_source_name(pool, *source_id).await; - Some(( - "point.batch_created", "info", - None, None, Some(*source_id), - format!("批量创建 {} 个测点(数据源:{})", point_ids.len(), name), - serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), - )) - } - AppEvent::PointDeleteBatch { source_id, point_ids } => { - let name = fetch_source_name(pool, *source_id).await; - Some(( - "point.batch_deleted", "warn", - None, None, Some(*source_id), - format!("批量删除 {} 个测点(数据源:{})", point_ids.len(), name), - serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), - )) - } - AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => { - let code = fetch_equipment_code(pool, *equipment_id).await; - Some(( - "equipment.start_command_sent", "info", - *unit_id, Some(*equipment_id), None, - format!("已发送启动指令(设备:{})", code), - serde_json::json!({ - "equipment_id": equipment_id, - "unit_id": unit_id, - "point_id": point_id - }), - )) - } - AppEvent::EquipmentStopCommandSent { equipment_id, unit_id, point_id } => { - let code = fetch_equipment_code(pool, *equipment_id).await; - Some(( - "equipment.stop_command_sent", "info", - *unit_id, Some(*equipment_id), None, - format!("已发送停止指令(设备:{})", code), - serde_json::json!({ - "equipment_id": equipment_id, - "unit_id": unit_id, - "point_id": point_id - }), - )) - } - AppEvent::AutoControlStarted { unit_id } => { - let code = fetch_unit_code(pool, *unit_id).await; - Some(( - "unit.auto_control_started", "info", - Some(*unit_id), None, None, - format!("已启动自动控制(单元:{})", code), - serde_json::json!({ "unit_id": unit_id }), - )) - } - AppEvent::AutoControlStopped { unit_id } => { - let code = fetch_unit_code(pool, *unit_id).await; - Some(( - "unit.auto_control_stopped", "info", - Some(*unit_id), None, None, - format!("已停止自动控制(单元:{})", code), - serde_json::json!({ "unit_id": unit_id }), - )) - } - AppEvent::FaultLocked { unit_id, equipment_id } => { - let unit_code = fetch_unit_code(pool, *unit_id).await; - let eq_code = fetch_equipment_code(pool, *equipment_id).await; - Some(( - "unit.fault_locked", "error", - Some(*unit_id), Some(*equipment_id), None, - format!("单元【{}】发生故障锁定,触发设备:{}", unit_code, eq_code), - serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), - )) - } - AppEvent::FaultAcked { unit_id } => { - let code = fetch_unit_code(pool, *unit_id).await; - Some(( - "unit.fault_acked", "info", - Some(*unit_id), None, None, - format!("单元【{}】故障已人工确认", code), - serde_json::json!({ "unit_id": unit_id }), - )) - } - AppEvent::CommLocked { unit_id } => { - let code = fetch_unit_code(pool, *unit_id).await; - Some(( - "unit.comm_locked", "warn", - Some(*unit_id), None, None, - format!("单元【{}】通讯中断", code), - serde_json::json!({ "unit_id": unit_id }), - )) - } - AppEvent::CommRecovered { unit_id } => { - let code = fetch_unit_code(pool, *unit_id).await; - Some(( - "unit.comm_recovered", "info", - Some(*unit_id), None, None, - format!("单元【{}】通讯恢复", code), - serde_json::json!({ "unit_id": unit_id }), - )) - } - AppEvent::RemLocal { unit_id, equipment_id } => { - let unit_code = fetch_unit_code(pool, *unit_id).await; - let eq_code = fetch_equipment_code(pool, *equipment_id).await; - Some(( - "unit.rem_local", "warn", - Some(*unit_id), Some(*equipment_id), None, - format!("单元【{}】切换为本地控制,触发设备:{},自动控制已停止", unit_code, eq_code), - serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), - )) - } - AppEvent::RemRecovered { unit_id } => { - let code = fetch_unit_code(pool, *unit_id).await; - Some(( - "unit.rem_recovered", "warn", - Some(*unit_id), None, None, - format!("单元【{}】已切换回远程控制,自动控制需手动重新启动", code), - serde_json::json!({ "unit_id": unit_id }), - )) - } - AppEvent::UnitStateChanged { .. } => None, - AppEvent::PointNewValue(_) => None, - }; - - let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record else { - return; - }; - let envelope = EventEnvelope::new(event_type, payload); - - let inserted = sqlx::query_as::<_, EventRecord>( - r#" - INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload) - VALUES ($1, $2, $3, $4, $5, $6, $7) - RETURNING * - "#, - ) - .bind(envelope.event_type) - .bind(level) - .bind(unit_id as Option) - .bind(equipment_id as Option) - .bind(source_id) - .bind(message) - .bind(sqlx::types::Json(envelope.payload)) - .fetch_one(pool) - .await; - - match inserted { - Ok(record) => { - if let Some(ws_manager) = ws_manager { - let ws_message = crate::websocket::WsMessage::EventCreated(record); - if let Err(err) = ws_manager.send_to_public(ws_message).await { - tracing::warn!("Failed to broadcast event websocket message: {}", err); - } - } - } - Err(err) => { - tracing::warn!("Failed to persist event: {}", err); - } - } -} - -async fn process_point_new_value( - payload: crate::telemetry::PointNewValue, - connection_manager: &std::sync::Arc, - ws_manager: Option<&std::sync::Arc>, -) { - let source_id = payload.source_id; - let client_handle = payload.client_handle; - let point_id = if let Some(point_id) = payload.point_id { - Some(point_id) - } else { - let status = connection_manager.get_status_read_guard().await; - status - .get(&source_id) - .and_then(|s| s.client_handle_map.get(&client_handle).copied()) - }; - if let Some(point_id) = point_id { - // 从缓存中读取旧值 - let (old_value, old_timestamp, value_changed) = { - let monitor_data = connection_manager.get_point_monitor_data_read_guard().await; - let old_monitor_info = monitor_data.get(&point_id); - - if let Some(old_info) = old_monitor_info { - let changed = old_info.value != payload.value || old_info.timestamp != payload.timestamp; - (old_info.value.clone(), old_info.timestamp, changed) - } else { - (None, None, false) - } - }; - - let monitor = crate::telemetry::PointMonitorInfo { - protocol: payload.protocol, - source_id, - point_id, - client_handle, - scan_mode: payload.scan_mode, - timestamp: payload.timestamp, - quality: payload.quality, - value: payload.value, - value_type: payload.value_type, - value_text: payload.value_text, - old_value, - old_timestamp, - value_changed, - }; - - if let Err(e) = connection_manager - .update_point_monitor_data(monitor.clone()) - .await - { - tracing::error!( - "Failed to update point monitor data for point {}: {}", - point_id, - e - ); - } - - if let Some(ws_manager) = ws_manager { - let ws_message = crate::websocket::WsMessage::PointNewValue(monitor); - if let Err(e) = ws_manager.send_to_public(ws_message).await { - tracing::warn!( - "Failed to send WebSocket message to public room: {}", - e - ); - } - } - } else { - tracing::warn!( - "Point not found for source {} client_handle {}", - source_id, - client_handle - ); - } -} diff --git a/src/handler.rs b/src/handler.rs deleted file mode 100644 index c9befdf..0000000 --- a/src/handler.rs +++ /dev/null @@ -1,8 +0,0 @@ -pub mod control; -pub mod doc; -pub mod equipment; -pub mod log; -pub mod page; -pub mod point; -pub mod source; -pub mod tag; diff --git a/src/handler/control.rs b/src/handler/control.rs deleted file mode 100644 index 4cd8b94..0000000 --- a/src/handler/control.rs +++ /dev/null @@ -1,750 +0,0 @@ -use axum::{ - extract::{Path, Query, State}, - http::StatusCode, - response::IntoResponse, - Json, -}; -use serde::Deserialize; -use serde_json::json; -use uuid::Uuid; -use validator::Validate; - -use crate::{ - control::validator::{validate_manual_control, ControlAction}, - util::{ - pagination::{PaginatedResponse, PaginationParams}, - response::ApiErr, - }, - AppState, -}; - -fn validate_unit_timing_order( - run_time_sec: i32, - acc_time_sec: i32, -) -> Result<(), ApiErr> { - if acc_time_sec <= run_time_sec { - return Err(ApiErr::BadRequest( - "acc_time_sec must be greater than run_time_sec".to_string(), - Some(json!({ - "run_time_sec": ["must be less than acc_time_sec"], - "acc_time_sec": ["must be greater than run_time_sec"] - })), - )); - } - - Ok(()) -} - -fn auto_control_start_blocked(runtime: &crate::control::runtime::UnitRuntime) -> bool { - runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required || runtime.rem_local -} - -#[derive(Debug, Deserialize, Validate)] -pub struct GetUnitListQuery { - #[validate(length(min = 1, max = 100))] - pub keyword: Option, - #[serde(flatten)] - pub pagination: PaginationParams, -} - -#[derive(serde::Serialize)] -pub struct UnitEquipmentItem { - #[serde(flatten)] - pub equipment: plc_platform_core::model::Equipment, - pub role_points: Vec, -} - -#[derive(serde::Serialize)] -pub struct UnitWithRuntime { - #[serde(flatten)] - pub unit: plc_platform_core::model::ControlUnit, - pub runtime: Option, - pub equipments: Vec, -} - -pub async fn get_unit_list( - State(state): State, - Query(query): Query, -) -> Result { - query.validate()?; - - let total = crate::service::get_units_count(&state.pool, query.keyword.as_deref()).await?; - let units = crate::service::get_units_paginated( - &state.pool, - query.keyword.as_deref(), - query.pagination.page_size, - query.pagination.offset(), - ) - .await?; - - let all_runtimes = state.control_runtime.get_all().await; - - let unit_ids: Vec = units.iter().map(|u| u.id).collect(); - let all_equipments = - crate::service::get_equipment_by_unit_ids(&state.pool, &unit_ids).await?; - - let eq_ids: Vec = all_equipments.iter().map(|e| e.id).collect(); - let role_point_rows = - crate::service::get_signal_role_points_batch(&state.pool, &eq_ids).await?; - - let monitor_guard = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - - let mut role_points_map: std::collections::HashMap< - Uuid, - Vec, - > = std::collections::HashMap::new(); - for rp in role_point_rows { - role_points_map - .entry(rp.equipment_id) - .or_default() - .push(crate::handler::equipment::SignalRolePoint { - point_id: rp.point_id, - signal_role: rp.signal_role, - point_monitor: monitor_guard.get(&rp.point_id).cloned(), - }); - } - drop(monitor_guard); - - let mut equipments_by_unit: std::collections::HashMap> = - std::collections::HashMap::new(); - for eq in all_equipments { - let role_points = role_points_map.remove(&eq.id).unwrap_or_default(); - if let Some(unit_id) = eq.unit_id { - equipments_by_unit - .entry(unit_id) - .or_default() - .push(UnitEquipmentItem { equipment: eq, role_points }); - } - } - - let data = units - .into_iter() - .map(|unit| { - let runtime = all_runtimes.get(&unit.id).cloned(); - let equipments = equipments_by_unit.remove(&unit.id).unwrap_or_default(); - UnitWithRuntime { unit, runtime, equipments } - }) - .collect::>(); - - Ok(Json(PaginatedResponse::new( - data, - total, - query.pagination.page, - query.pagination.page_size, - ))) -} - -pub async fn start_equipment( - State(state): State, - Path(equipment_id): Path, -) -> Result { - send_equipment_command(state, equipment_id, ControlAction::Start).await -} - -pub async fn stop_equipment( - State(state): State, - Path(equipment_id): Path, -) -> Result { - send_equipment_command(state, equipment_id, ControlAction::Stop).await -} - - -async fn send_equipment_command( - state: AppState, - equipment_id: Uuid, - action: ControlAction, -) -> Result { - let context = validate_manual_control(&state, equipment_id, action).await?; - let pulse_ms = 300u64; - - crate::control::command::send_pulse_command( - &state.connection_manager, - context.command_point.point_id, - context.command_value_type.as_ref(), - pulse_ms, - ) - .await - .map_err(|e| ApiErr::Internal(e, None))?; - - if state.config.simulate_plc { - crate::control::simulate::simulate_run_feedback( - &state, - equipment_id, - matches!(action, ControlAction::Start), - ) - .await; - } - - let event = match action { - ControlAction::Start => crate::event::AppEvent::EquipmentStartCommandSent { - equipment_id, - unit_id: context.unit_id, - point_id: context.command_point.point_id, - }, - ControlAction::Stop => crate::event::AppEvent::EquipmentStopCommandSent { - equipment_id, - unit_id: context.unit_id, - point_id: context.command_point.point_id, - }, - }; - let _ = state.event_manager.send(event); - - Ok(Json(json!({ - "ok_msg": format!("Equipment {} command sent", action.as_str()), - "equipment_id": equipment_id, - "unit_id": context.unit_id, - "command_role": context.command_point.signal_role, - "command_point_id": context.command_point.point_id, - "pulse_ms": pulse_ms - }))) -} - -pub async fn get_unit( - State(state): State, - Path(unit_id): Path, -) -> Result { - let unit = crate::service::get_unit_by_id(&state.pool, unit_id) - .await? - .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; - let runtime = state.control_runtime.get(unit_id).await; - - let all_equipments = - crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?; - let eq_ids: Vec = all_equipments.iter().map(|e| e.id).collect(); - let role_point_rows = - crate::service::get_signal_role_points_batch(&state.pool, &eq_ids).await?; - let monitor_guard = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - let mut role_points_map: std::collections::HashMap< - Uuid, - Vec, - > = std::collections::HashMap::new(); - for rp in role_point_rows { - role_points_map - .entry(rp.equipment_id) - .or_default() - .push(crate::handler::equipment::SignalRolePoint { - point_id: rp.point_id, - signal_role: rp.signal_role, - point_monitor: monitor_guard.get(&rp.point_id).cloned(), - }); - } - drop(monitor_guard); - - let equipments = all_equipments - .into_iter() - .map(|eq| { - let role_points = role_points_map.remove(&eq.id).unwrap_or_default(); - UnitEquipmentItem { equipment: eq, role_points } - }) - .collect(); - - Ok(Json(UnitWithRuntime { unit, runtime, equipments })) -} - -#[derive(serde::Serialize)] -pub struct PointDetail { - #[serde(flatten)] - pub point: plc_platform_core::model::Point, - pub point_monitor: Option, -} - -#[derive(serde::Serialize)] -pub struct EquipmentDetail { - #[serde(flatten)] - pub equipment: plc_platform_core::model::Equipment, - pub points: Vec, -} - -#[derive(serde::Serialize)] -pub struct UnitDetail { - #[serde(flatten)] - pub unit: plc_platform_core::model::ControlUnit, - pub runtime: Option, - pub equipments: Vec, -} - -pub async fn get_unit_detail( - State(state): State, - Path(unit_id): Path, -) -> Result { - let unit = crate::service::get_unit_by_id(&state.pool, unit_id) - .await? - .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; - - let runtime = state.control_runtime.get(unit_id).await; - - let equipments = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?; - let equipment_ids: Vec = equipments.iter().map(|e| e.id).collect(); - let all_points = crate::service::get_points_by_equipment_ids(&state.pool, &equipment_ids).await?; - - let monitor_guard = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - - let equipments = equipments - .into_iter() - .map(|eq| { - let points = all_points - .iter() - .filter(|p| p.equipment_id == Some(eq.id)) - .map(|p| PointDetail { - point_monitor: monitor_guard.get(&p.id).cloned(), - point: p.clone(), - }) - .collect(); - EquipmentDetail { equipment: eq, points } - }) - .collect(); - - Ok(Json(UnitDetail { unit, runtime, equipments })) -} - -#[derive(Debug, Deserialize, Validate)] -pub struct CreateUnitReq { - #[validate(length(min = 1, max = 100))] - pub code: String, - #[validate(length(min = 1, max = 100))] - pub name: String, - pub description: Option, - pub enabled: Option, - #[validate(range(min = 1, message = "must be greater than 0"))] - pub run_time_sec: Option, - #[validate(range(min = 1, message = "must be greater than 0"))] - pub stop_time_sec: Option, - #[validate(range(min = 1, message = "must be greater than 0"))] - pub acc_time_sec: Option, - #[validate(range(min = 1, message = "must be greater than 0"))] - pub bl_time_sec: Option, - pub require_manual_ack_after_fault: Option, -} - -pub async fn create_unit( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - let run_time_sec = payload.run_time_sec.ok_or_else(|| { - ApiErr::BadRequest( - "run_time_sec is required".to_string(), - Some(json!({ "run_time_sec": ["is required"] })), - ) - })?; - let stop_time_sec = payload.stop_time_sec.ok_or_else(|| { - ApiErr::BadRequest( - "stop_time_sec is required".to_string(), - Some(json!({ "stop_time_sec": ["is required"] })), - ) - })?; - let acc_time_sec = payload.acc_time_sec.ok_or_else(|| { - ApiErr::BadRequest( - "acc_time_sec is required".to_string(), - Some(json!({ "acc_time_sec": ["is required"] })), - ) - })?; - let bl_time_sec = payload.bl_time_sec.ok_or_else(|| { - ApiErr::BadRequest( - "bl_time_sec is required".to_string(), - Some(json!({ "bl_time_sec": ["is required"] })), - ) - })?; - - validate_unit_timing_order(run_time_sec, acc_time_sec)?; - - if crate::service::get_unit_by_code(&state.pool, &payload.code) - .await? - .is_some() - { - return Err(ApiErr::BadRequest( - "Unit code already exists".to_string(), - None, - )); - } - - let unit_id = crate::service::create_unit( - &state.pool, - crate::service::CreateUnitParams { - code: &payload.code, - name: &payload.name, - description: payload.description.as_deref(), - enabled: payload.enabled.unwrap_or(true), - run_time_sec, - stop_time_sec, - acc_time_sec, - bl_time_sec, - require_manual_ack_after_fault: payload - .require_manual_ack_after_fault - .unwrap_or(true), - }, - ) - .await?; - - Ok(( - StatusCode::CREATED, - Json(serde_json::json!({ - "id": unit_id, - "ok_msg": "Unit created successfully" - })), - )) -} - -#[derive(Debug, Deserialize, Validate)] -pub struct UpdateUnitReq { - #[validate(length(min = 1, max = 100))] - pub code: Option, - #[validate(length(min = 1, max = 100))] - pub name: Option, - pub description: Option, - pub enabled: Option, - #[validate(range(min = 1, message = "must be greater than 0"))] - pub run_time_sec: Option, - #[validate(range(min = 1, message = "must be greater than 0"))] - pub stop_time_sec: Option, - #[validate(range(min = 1, message = "must be greater than 0"))] - pub acc_time_sec: Option, - #[validate(range(min = 1, message = "must be greater than 0"))] - pub bl_time_sec: Option, - pub require_manual_ack_after_fault: Option, -} - -pub async fn update_unit( - State(state): State, - Path(unit_id): Path, - Json(payload): Json, -) -> Result { - payload.validate()?; - - let existing_unit = crate::service::get_unit_by_id(&state.pool, unit_id) - .await? - .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; - - validate_unit_timing_order( - payload.run_time_sec.unwrap_or(existing_unit.run_time_sec), - payload.acc_time_sec.unwrap_or(existing_unit.acc_time_sec), - )?; - - if let Some(code) = payload.code.as_deref() { - let duplicate = crate::service::get_unit_by_code(&state.pool, code).await?; - if duplicate.as_ref().is_some_and(|item| item.id != unit_id) { - return Err(ApiErr::BadRequest( - "Unit code already exists".to_string(), - None, - )); - } - } - - if payload.code.is_none() - && payload.name.is_none() - && payload.description.is_none() - && payload.enabled.is_none() - && payload.run_time_sec.is_none() - && payload.stop_time_sec.is_none() - && payload.acc_time_sec.is_none() - && payload.bl_time_sec.is_none() - && payload.require_manual_ack_after_fault.is_none() - { - return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); - } - - crate::service::update_unit( - &state.pool, - unit_id, - crate::service::UpdateUnitParams { - code: payload.code.as_deref(), - name: payload.name.as_deref(), - description: payload.description.as_deref(), - enabled: payload.enabled, - run_time_sec: payload.run_time_sec, - stop_time_sec: payload.stop_time_sec, - acc_time_sec: payload.acc_time_sec, - bl_time_sec: payload.bl_time_sec, - require_manual_ack_after_fault: payload.require_manual_ack_after_fault, - }, - ) - .await?; - - Ok(Json(serde_json::json!({ - "ok_msg": "Unit updated successfully" - }))) -} - -pub async fn delete_unit( - State(state): State, - Path(unit_id): Path, -) -> Result { - let deleted = crate::service::delete_unit(&state.pool, unit_id).await?; - if !deleted { - return Err(ApiErr::NotFound("Unit not found".to_string(), None)); - } - - Ok(StatusCode::NO_CONTENT) -} - -#[derive(Debug, Deserialize, Validate)] -pub struct GetEventListQuery { - pub unit_id: Option, - #[validate(length(min = 1, max = 100))] - pub event_type: Option, - #[serde(flatten)] - pub pagination: PaginationParams, -} - -pub async fn get_event_list( - State(state): State, - Query(query): Query, -) -> Result { - query.validate()?; - - let total = crate::service::get_events_count( - &state.pool, - query.unit_id, - query.event_type.as_deref(), - ) - .await?; - let data = crate::service::get_events_paginated( - &state.pool, - query.unit_id, - query.event_type.as_deref(), - query.pagination.page_size, - query.pagination.offset(), - ) - .await?; - - Ok(Json(PaginatedResponse::new( - data, - total, - query.pagination.page, - query.pagination.page_size, - ))) -} - -pub async fn start_auto_unit( - State(state): State, - Path(unit_id): Path, -) -> Result { - let unit = crate::service::get_unit_by_id(&state.pool, unit_id) - .await? - .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; - - if !unit.enabled { - return Err(ApiErr::BadRequest("Unit is disabled".to_string(), None)); - } - - let mut runtime = state.control_runtime.get_or_init(unit_id).await; - if auto_control_start_blocked(&runtime) { - let message = if runtime.fault_locked { - "Unit is fault locked, cannot start auto control" - } else if runtime.comm_locked { - "Unit communication is locked, cannot start auto control" - } else if runtime.rem_local { - "Equipment is in local mode (REM off), cannot start auto control" - } else { - "Fault acknowledgement required before starting auto control" - }; - return Err(ApiErr::BadRequest(message.to_string(), None)); - } - runtime.auto_enabled = true; - runtime.state = crate::control::runtime::UnitRuntimeState::Stopped; - state.control_runtime.upsert(runtime).await; - state.control_runtime.notify_unit(unit_id).await; - - let _ = state.event_manager.send(crate::event::AppEvent::AutoControlStarted { unit_id }); - - Ok(Json(json!({ "ok_msg": "Auto control started", "unit_id": unit_id }))) -} - -pub async fn stop_auto_unit( - State(state): State, - Path(unit_id): Path, -) -> Result { - crate::service::get_unit_by_id(&state.pool, unit_id) - .await? - .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; - - let mut runtime = state.control_runtime.get_or_init(unit_id).await; - runtime.auto_enabled = false; - state.control_runtime.upsert(runtime).await; - state.control_runtime.notify_unit(unit_id).await; - - let _ = state.event_manager.send(crate::event::AppEvent::AutoControlStopped { unit_id }); - - Ok(Json(json!({ "ok_msg": "Auto control stopped", "unit_id": unit_id }))) -} - -pub async fn batch_start_auto( - State(state): State, -) -> Result { - let units = crate::service::get_all_enabled_units(&state.pool).await?; - let mut started = Vec::new(); - let mut skipped = Vec::new(); - - for unit in units { - let mut runtime = state.control_runtime.get_or_init(unit.id).await; - if runtime.auto_enabled { - skipped.push(unit.id); - continue; - } - if auto_control_start_blocked(&runtime) { - skipped.push(unit.id); - continue; - } - runtime.auto_enabled = true; - runtime.state = crate::control::runtime::UnitRuntimeState::Stopped; - state.control_runtime.upsert(runtime).await; - state.control_runtime.notify_unit(unit.id).await; - let _ = state - .event_manager - .send(crate::event::AppEvent::AutoControlStarted { unit_id: unit.id }); - started.push(unit.id); - } - - Ok(Json(json!({ "started": started, "skipped": skipped }))) -} - -pub async fn batch_stop_auto( - State(state): State, -) -> Result { - let units = crate::service::get_all_enabled_units(&state.pool).await?; - let mut stopped = Vec::new(); - - for unit in units { - let mut runtime = state.control_runtime.get_or_init(unit.id).await; - if !runtime.auto_enabled { - continue; - } - runtime.auto_enabled = false; - state.control_runtime.upsert(runtime).await; - state.control_runtime.notify_unit(unit.id).await; - let _ = state - .event_manager - .send(crate::event::AppEvent::AutoControlStopped { unit_id: unit.id }); - stopped.push(unit.id); - } - - Ok(Json(json!({ "stopped": stopped }))) -} - -pub async fn ack_fault_unit( - State(state): State, - Path(unit_id): Path, -) -> Result { - crate::service::get_unit_by_id(&state.pool, unit_id) - .await? - .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; - - let mut runtime = state.control_runtime.get_or_init(unit_id).await; - - if !runtime.fault_locked { - return Err(ApiErr::BadRequest( - "Unit is not fault locked".to_string(), - Some(json!({ "unit_id": unit_id })), - )); - } - if runtime.flt_active { - return Err(ApiErr::BadRequest( - "FLT is still active, cannot acknowledge".to_string(), - Some(json!({ "unit_id": unit_id })), - )); - } - - runtime.fault_locked = false; - runtime.manual_ack_required = false; - runtime.state = crate::control::runtime::UnitRuntimeState::Stopped; - state.control_runtime.upsert(runtime).await; - state.control_runtime.notify_unit(unit_id).await; - - let _ = state.event_manager.send(crate::event::AppEvent::FaultAcked { unit_id }); - - Ok(Json(json!({ "ok_msg": "Fault acknowledged", "unit_id": unit_id }))) -} - -pub async fn get_unit_runtime( - State(state): State, - Path(unit_id): Path, -) -> Result { - crate::service::get_unit_by_id(&state.pool, unit_id) - .await? - .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; - - let runtime = state.control_runtime.get_or_init(unit_id).await; - Ok(Json(runtime)) -} - -#[cfg(test)] -mod tests { - use super::{ - auto_control_start_blocked, validate_unit_timing_order, CreateUnitReq, UpdateUnitReq, - }; - use crate::control::runtime::{UnitRuntime, UnitRuntimeState}; - use uuid::Uuid; - use validator::Validate; - - #[test] - fn create_unit_req_rejects_zero_second_fields() { - let payload = CreateUnitReq { - code: "U-01".to_string(), - name: "Unit 01".to_string(), - description: None, - enabled: Some(true), - run_time_sec: Some(0), - stop_time_sec: Some(10), - acc_time_sec: Some(20), - bl_time_sec: Some(5), - require_manual_ack_after_fault: Some(true), - }; - - assert!(payload.validate().is_err()); - } - - #[test] - fn create_unit_req_rejects_acc_time_not_greater_than_run_time() { - assert!(validate_unit_timing_order(10, 10).is_err()); - } - - #[test] - fn update_unit_req_rejects_zero_second_fields() { - let payload = UpdateUnitReq { - code: None, - name: None, - description: None, - enabled: None, - run_time_sec: None, - stop_time_sec: Some(0), - acc_time_sec: Some(20), - bl_time_sec: Some(5), - require_manual_ack_after_fault: None, - }; - - assert!(payload.validate().is_err()); - } - - #[test] - fn update_unit_req_rejects_acc_time_not_greater_than_run_time_when_both_present() { - assert!(validate_unit_timing_order(20, 15).is_err()); - } - - #[test] - fn auto_control_start_is_blocked_by_comm_lock() { - let runtime = UnitRuntime { - unit_id: Uuid::new_v4(), - state: UnitRuntimeState::Stopped, - auto_enabled: false, - accumulated_run_sec: 0, - display_acc_sec: 0, - fault_locked: false, - flt_active: false, - comm_locked: true, - manual_ack_required: false, - rem_local: false, - }; - - assert!(auto_control_start_blocked(&runtime)); - } -} diff --git a/src/handler/doc.rs b/src/handler/doc.rs deleted file mode 100644 index 49429f9..0000000 --- a/src/handler/doc.rs +++ /dev/null @@ -1,40 +0,0 @@ -use axum::{ - http::{header, HeaderMap, HeaderValue, StatusCode}, - response::IntoResponse, -}; - -use plc_platform_core::util::response::ApiErr; - -pub async fn get_api_md() -> Result { - let content = tokio::fs::read_to_string("API.md") - .await - .map_err(|err| { - tracing::error!("Failed to read API.md: {}", err); - ApiErr::NotFound("API.md not found".to_string(), None) - })?; - - let mut headers = HeaderMap::new(); - headers.insert( - header::CONTENT_TYPE, - HeaderValue::from_static("text/markdown; charset=utf-8"), - ); - - Ok((StatusCode::OK, headers, content)) -} - -pub async fn get_readme_md() -> Result { - let content = tokio::fs::read_to_string("README.md") - .await - .map_err(|err| { - tracing::error!("Failed to read README.md: {}", err); - ApiErr::NotFound("README.md not found".to_string(), None) - })?; - - let mut headers = HeaderMap::new(); - headers.insert( - header::CONTENT_TYPE, - HeaderValue::from_static("text/markdown; charset=utf-8"), - ); - - Ok((StatusCode::OK, headers, content)) -} diff --git a/src/handler/equipment.rs b/src/handler/equipment.rs deleted file mode 100644 index 7ad5232..0000000 --- a/src/handler/equipment.rs +++ /dev/null @@ -1,331 +0,0 @@ -use axum::{ - extract::{Path, Query, State}, - http::StatusCode, - response::IntoResponse, - Json, -}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; -use validator::Validate; - -use plc_platform_core::util::{ - pagination::{PaginatedResponse, PaginationParams}, - response::ApiErr, -}; -use crate::AppState; - -async fn notify_units( - state: &AppState, - unit_ids: impl IntoIterator, -) { - let mut seen = std::collections::HashSet::new(); - for unit_id in unit_ids { - if seen.insert(unit_id) { - state.control_runtime.notify_unit(unit_id).await; - } - } -} - -#[derive(Deserialize, Validate)] -pub struct GetEquipmentListQuery { - #[validate(length(min = 1, max = 100))] - pub keyword: Option, - #[serde(flatten)] - pub pagination: PaginationParams, -} - -#[derive(Serialize)] -pub struct SignalRolePoint { - pub point_id: uuid::Uuid, - pub signal_role: String, - pub point_monitor: Option, -} - -#[derive(Serialize)] -pub struct EquipmentListItem { - #[serde(flatten)] - pub equipment: plc_platform_core::model::Equipment, - pub point_count: i64, - pub role_points: Vec, -} - -pub async fn get_equipment_list( - State(state): State, - Query(query): Query, -) -> Result { - query.validate()?; - - let total = crate::service::get_equipment_count(&state.pool, query.keyword.as_deref()).await?; - let items = crate::service::get_equipment_paginated( - &state.pool, - query.keyword.as_deref(), - query.pagination.page_size, - query.pagination.offset(), - ) - .await?; - - let equipment_ids: Vec = items.iter().map(|item| item.equipment.id).collect(); - let role_point_rows = - crate::service::get_signal_role_points_batch(&state.pool, &equipment_ids).await?; - - let monitor_guard = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - - let mut role_points_map: std::collections::HashMap> = - std::collections::HashMap::new(); - for rp in role_point_rows { - role_points_map - .entry(rp.equipment_id) - .or_default() - .push(SignalRolePoint { - point_id: rp.point_id, - signal_role: rp.signal_role, - point_monitor: monitor_guard.get(&rp.point_id).cloned(), - }); - } - - let data = items - .into_iter() - .map(|item| EquipmentListItem { - role_points: role_points_map - .remove(&item.equipment.id) - .unwrap_or_default(), - ..item - }) - .collect::>(); - - Ok(Json(PaginatedResponse::new( - data, - total, - query.pagination.page, - query.pagination.page_size, - ))) -} - -pub async fn get_equipment( - State(state): State, - Path(equipment_id): Path, -) -> Result { - let equipment = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?; - - match equipment { - Some(item) => Ok(Json(item)), - None => Err(ApiErr::NotFound("Equipment not found".to_string(), None)), - } -} - -pub async fn get_equipment_points( - State(state): State, - Path(equipment_id): Path, -) -> Result { - let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?; - if exists.is_none() { - return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); - } - - let points = crate::service::get_points_by_equipment_id(&state.pool, equipment_id).await?; - Ok(Json(points)) -} - -#[derive(Debug, Deserialize, Validate)] -pub struct CreateEquipmentReq { - pub unit_id: Option, - #[validate(length(min = 1, max = 100))] - pub code: String, - #[validate(length(min = 1, max = 100))] - pub name: String, - pub kind: Option, - pub description: Option, -} - -#[derive(Debug, Deserialize, Validate)] -pub struct UpdateEquipmentReq { - pub unit_id: Option>, - #[validate(length(min = 1, max = 100))] - pub code: Option, - #[validate(length(min = 1, max = 100))] - pub name: Option, - pub kind: Option, - pub description: Option, -} - -#[derive(Debug, Deserialize, Validate)] -pub struct BatchSetEquipmentUnitReq { - pub equipment_ids: Vec, - pub unit_id: Option, -} - -pub async fn create_equipment( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - let exists = crate::service::get_equipment_by_code(&state.pool, &payload.code).await?; - if exists.is_some() { - return Err(ApiErr::BadRequest( - "Equipment code already exists".to_string(), - None, - )); - } - - if let Some(unit_id) = payload.unit_id { - let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; - if unit_exists.is_none() { - return Err(ApiErr::NotFound("Unit not found".to_string(), None)); - } - } - - let equipment_id = crate::service::create_equipment( - &state.pool, - payload.unit_id, - &payload.code, - &payload.name, - payload.kind.as_deref(), - payload.description.as_deref(), - ) - .await?; - - if let Some(unit_id) = payload.unit_id { - notify_units(&state, [unit_id]).await; - } - - Ok(( - StatusCode::CREATED, - Json(serde_json::json!({ - "id": equipment_id, - "ok_msg": "Equipment created successfully" - })), - )) -} - -pub async fn update_equipment( - State(state): State, - Path(equipment_id): Path, - Json(payload): Json, -) -> Result { - payload.validate()?; - - if payload.unit_id.is_none() - && payload.code.is_none() - && payload.name.is_none() - && payload.kind.is_none() - && payload.description.is_none() - { - return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); - } - - let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?; - let existing_equipment = if let Some(equipment) = exists { - equipment - } else { - return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); - }; - - if let Some(Some(unit_id)) = payload.unit_id { - let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; - if unit_exists.is_none() { - return Err(ApiErr::NotFound("Unit not found".to_string(), None)); - } - } - - if let Some(code) = payload.code.as_deref() { - let duplicate = crate::service::get_equipment_by_code(&state.pool, code).await?; - if duplicate - .as_ref() - .is_some_and(|item| item.id != equipment_id) - { - return Err(ApiErr::BadRequest( - "Equipment code already exists".to_string(), - None, - )); - } - } - - crate::service::update_equipment( - &state.pool, - equipment_id, - payload.unit_id, - payload.code.as_deref(), - payload.name.as_deref(), - payload.kind.as_deref(), - payload.description.as_deref(), - ) - .await?; - - let mut unit_ids = Vec::new(); - if let Some(unit_id) = existing_equipment.unit_id { - unit_ids.push(unit_id); - } - let next_unit_id = match payload.unit_id { - Some(next) => next, - None => existing_equipment.unit_id, - }; - if let Some(unit_id) = next_unit_id { - unit_ids.push(unit_id); - } - notify_units(&state, unit_ids).await; - - Ok(Json(serde_json::json!({ - "ok_msg": "Equipment updated successfully" - }))) -} - -pub async fn batch_set_equipment_unit( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - if payload.equipment_ids.is_empty() { - return Err(ApiErr::BadRequest( - "equipment_ids cannot be empty".to_string(), - None, - )); - } - - if let Some(unit_id) = payload.unit_id { - let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; - if unit_exists.is_none() { - return Err(ApiErr::NotFound("Unit not found".to_string(), None)); - } - } - - let before_unit_ids = - crate::service::get_unit_ids_by_equipment_ids(&state.pool, &payload.equipment_ids).await?; - - let updated_count = crate::service::batch_set_equipment_unit( - &state.pool, - &payload.equipment_ids, - payload.unit_id, - ) - .await?; - - let mut unit_ids = before_unit_ids; - if let Some(unit_id) = payload.unit_id { - unit_ids.push(unit_id); - } - notify_units(&state, unit_ids).await; - - Ok(Json(serde_json::json!({ - "ok_msg": "Equipment unit updated successfully", - "updated_count": updated_count - }))) -} - -pub async fn delete_equipment( - State(state): State, - Path(equipment_id): Path, -) -> Result { - let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.pool, &[equipment_id]).await?; - let deleted = crate::service::delete_equipment(&state.pool, equipment_id).await?; - if !deleted { - return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); - } - - notify_units(&state, unit_ids).await; - - Ok(StatusCode::NO_CONTENT) -} diff --git a/src/handler/log.rs b/src/handler/log.rs deleted file mode 100644 index a08eaa0..0000000 --- a/src/handler/log.rs +++ /dev/null @@ -1,354 +0,0 @@ -use std::{ - convert::Infallible, - path::{Path, PathBuf}, - time::SystemTime, -}; - -use async_stream::stream; -use axum::{ - extract::Query, - response::{ - sse::{Event, KeepAlive, Sse}, - IntoResponse, - }, - Json, -}; -use serde::{Deserialize, Serialize}; -use tokio::{ - fs, - io::{AsyncReadExt, AsyncSeekExt, SeekFrom}, - time::{Duration, interval}, -}; - -use plc_platform_core::util::response::ApiErr; - -const LOG_DIR: &str = "./logs"; -const DEFAULT_TAIL_LINES: usize = 200; -const MAX_TAIL_LINES: usize = 2000; -const DEFAULT_MAX_BYTES: usize = 64 * 1024; -const STREAM_MAX_BYTES: usize = 32 * 1024; -const MAX_MAX_BYTES: usize = 512 * 1024; - -#[derive(Debug, Deserialize)] -pub struct LogQuery { - pub file: Option, - pub cursor: Option, - pub tail_lines: Option, - pub max_bytes: Option, -} - -#[derive(Debug, Serialize)] -pub struct LogChunkResponse { - pub file: String, - pub cursor: u64, - pub lines: Vec, - pub truncated: bool, - pub reset: bool, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -struct StreamFileState { - path: PathBuf, - file_name: String, - cursor: u64, -} - -pub async fn get_logs(Query(query): Query) -> Result { - let path = resolve_log_file(query.file.as_deref()).await?; - let file_name = file_name_of(&path); - let max_bytes = query - .max_bytes - .unwrap_or(DEFAULT_MAX_BYTES) - .clamp(1, MAX_MAX_BYTES); - - let response = if let Some(cursor) = query.cursor { - read_since(&path, &file_name, cursor, max_bytes).await? - } else { - let tail_lines = query - .tail_lines - .unwrap_or(DEFAULT_TAIL_LINES) - .clamp(1, MAX_TAIL_LINES); - read_tail(&path, &file_name, tail_lines).await? - }; - - Ok(Json(response)) -} - -pub async fn stream_logs(Query(query): Query) -> Result { - let path = resolve_log_file(query.file.as_deref()).await?; - let file_name = file_name_of(&path); - let max_bytes = query - .max_bytes - .unwrap_or(STREAM_MAX_BYTES) - .clamp(1, MAX_MAX_BYTES); - let follow_latest = query.file.is_none(); - let start_cursor = query.cursor.unwrap_or(file_len(&path).await?); - - let event_stream = stream! { - let mut ticker = interval(Duration::from_millis(800)); - let mut stream_file = StreamFileState { - path, - file_name, - cursor: start_cursor, - }; - - loop { - ticker.tick().await; - let switched = if follow_latest { - match latest_log_file(Path::new(LOG_DIR)).await { - Ok(latest_path) => { - let latest = StreamFileState { - file_name: file_name_of(&latest_path), - path: latest_path, - cursor: 0, - }; - let (next, switched) = advance_stream_file(&stream_file, &latest); - stream_file = next; - switched - } - Err(_) => false, - } - } else { - false - }; - - match read_since(&stream_file.path, &stream_file.file_name, stream_file.cursor, max_bytes).await { - Ok(chunk) => { - stream_file.cursor = chunk.cursor; - let chunk = LogChunkResponse { - reset: chunk.reset || switched, - ..chunk - }; - if chunk.reset || !chunk.lines.is_empty() { - match Event::default().event("log").json_data(&chunk) { - Ok(event) => yield Ok::(event), - Err(_) => { - yield Ok::( - Event::default().event("error").data("serialize log event failed") - ); - break; - } - } - } - } - Err(_) => { - yield Ok::( - Event::default().event("error").data("log stream read failed") - ); - break; - } - } - } - }; - - Ok( - Sse::new(event_stream) - .keep_alive(KeepAlive::new().interval(Duration::from_secs(10)).text("keepalive")), - ) -} - -async fn resolve_log_file(file: Option<&str>) -> Result { - let log_dir = PathBuf::from(LOG_DIR); - - if let Some(file_name) = file { - validate_file_name(file_name)?; - let path = log_dir.join(file_name); - ensure_exists(&path).await?; - return Ok(path); - } - - latest_log_file(&log_dir).await -} - -fn validate_file_name(file_name: &str) -> Result<(), ApiErr> { - if file_name.is_empty() { - return Err(ApiErr::BadRequest("file cannot be empty".to_string(), None)); - } - if file_name.contains(['/', '\\']) || file_name.contains("..") { - return Err(ApiErr::BadRequest("invalid log file name".to_string(), None)); - } - if !file_name.starts_with("app.log") { - return Err(ApiErr::BadRequest("only app.log* files are allowed".to_string(), None)); - } - Ok(()) -} - -async fn ensure_exists(path: &Path) -> Result<(), ApiErr> { - fs::metadata(path).await.map_err(|e| match e.kind() { - std::io::ErrorKind::NotFound => ApiErr::NotFound("log file not found".to_string(), None), - _ => ApiErr::Internal("failed to access log file".to_string(), None), - })?; - Ok(()) -} - -async fn latest_log_file(log_dir: &Path) -> Result { - let mut entries = fs::read_dir(log_dir).await.map_err(|e| match e.kind() { - std::io::ErrorKind::NotFound => ApiErr::NotFound("log directory not found".to_string(), None), - _ => ApiErr::Internal("failed to read log directory".to_string(), None), - })?; - - let mut latest: Option<(SystemTime, PathBuf)> = None; - - while let Some(entry) = entries - .next_entry() - .await - .map_err(|_| ApiErr::Internal("failed to enumerate log files".to_string(), None))? - { - let file_name = entry.file_name(); - let file_name = file_name.to_string_lossy(); - if !file_name.starts_with("app.log") { - continue; - } - - let metadata = entry - .metadata() - .await - .map_err(|_| ApiErr::Internal("failed to read log metadata".to_string(), None))?; - if !metadata.is_file() { - continue; - } - - let modified = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH); - let path = entry.path(); - - match latest.as_ref() { - Some((latest_modified, _)) if modified <= *latest_modified => {} - _ => latest = Some((modified, path)), - } - } - - latest - .map(|(_, path)| path) - .ok_or_else(|| ApiErr::NotFound("no app.log files found".to_string(), None)) -} - -async fn read_tail(path: &Path, file_name: &str, tail_lines: usize) -> Result { - let mut file = fs::File::open(path).await.map_err(map_open_err)?; - let mut buffer = Vec::new(); - file.read_to_end(&mut buffer) - .await - .map_err(|_| ApiErr::Internal("failed to read log file".to_string(), None))?; - - let cursor = buffer.len() as u64; - let text = String::from_utf8_lossy(&buffer); - let mut lines: Vec = text.lines().map(|line| line.to_string()).collect(); - let dropped = lines.len().saturating_sub(tail_lines); - if dropped > 0 { - lines = lines.into_iter().skip(dropped).collect(); - } - - Ok(LogChunkResponse { - file: file_name.to_string(), - cursor, - lines, - truncated: false, - reset: false, - }) -} - -async fn read_since( - path: &Path, - file_name: &str, - cursor: u64, - max_bytes: usize, -) -> Result { - let mut file = fs::File::open(path).await.map_err(map_open_err)?; - let metadata = file - .metadata() - .await - .map_err(|_| ApiErr::Internal("failed to read log metadata".to_string(), None))?; - let file_size = metadata.len(); - - let (start, reset) = if cursor > file_size { - (0, true) - } else { - (cursor, false) - }; - let end = (start + max_bytes as u64).min(file_size); - - file.seek(SeekFrom::Start(start)) - .await - .map_err(|_| ApiErr::Internal("failed to seek log file".to_string(), None))?; - - let mut buffer = vec![0u8; (end - start) as usize]; - if !buffer.is_empty() { - file.read_exact(&mut buffer) - .await - .map_err(|_| ApiErr::Internal("failed to read log file chunk".to_string(), None))?; - } - - let text = String::from_utf8_lossy(&buffer); - let lines = text.lines().map(|line| line.to_string()).collect(); - - Ok(LogChunkResponse { - file: file_name.to_string(), - cursor: end, - lines, - truncated: end < file_size, - reset, - }) -} - -async fn file_len(path: &Path) -> Result { - let metadata = fs::metadata(path).await.map_err(map_open_err)?; - Ok(metadata.len()) -} - -fn file_name_of(path: &Path) -> String { - path.file_name() - .and_then(|name| name.to_str()) - .unwrap_or_default() - .to_string() -} - -fn advance_stream_file( - current: &StreamFileState, - latest: &StreamFileState, -) -> (StreamFileState, bool) { - if current.path == latest.path { - return (current.clone(), false); - } - - ( - StreamFileState { - path: latest.path.clone(), - file_name: latest.file_name.clone(), - cursor: 0, - }, - true, - ) -} - -fn map_open_err(err: std::io::Error) -> ApiErr { - match err.kind() { - std::io::ErrorKind::NotFound => ApiErr::NotFound("log file not found".to_string(), None), - _ => ApiErr::Internal("failed to access log file".to_string(), None), - } -} - -#[cfg(test)] -mod tests { - use super::{advance_stream_file, StreamFileState}; - use std::path::PathBuf; - - #[test] - fn advance_stream_file_switches_to_latest_file_and_resets_cursor() { - let current = StreamFileState { - path: PathBuf::from("logs/app.log"), - file_name: "app.log".to_string(), - cursor: 128, - }; - let latest = StreamFileState { - path: PathBuf::from("logs/app.log.1"), - file_name: "app.log.1".to_string(), - cursor: 42, - }; - - let (next, switched) = advance_stream_file(¤t, &latest); - - assert!(switched); - assert_eq!(next.path, latest.path); - assert_eq!(next.file_name, latest.file_name); - assert_eq!(next.cursor, 0); - } -} diff --git a/src/handler/page.rs b/src/handler/page.rs deleted file mode 100644 index 0aa37fa..0000000 --- a/src/handler/page.rs +++ /dev/null @@ -1,169 +0,0 @@ -use axum::{Json, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse}; -use serde::Deserialize; -use std::collections::HashMap; -use sqlx::types::Json as SqlxJson; -use uuid::Uuid; -use validator::Validate; - -use plc_platform_core::model::Page; -use plc_platform_core::util::response::ApiErr; -use crate::AppState; - -#[derive(Deserialize, Validate)] -pub struct GetPageListQuery { - #[validate(length(min = 1, max = 100))] - pub name: Option, -} - -pub async fn get_page_list( - State(state): State, - Query(query): Query, -) -> Result { - query.validate()?; - let pool = &state.pool; - - let pages: Vec = if let Some(name) = query.name { - sqlx::query_as::<_, Page>( - r#" - SELECT * FROM page - WHERE name ILIKE $1 - ORDER BY created_at - "#, - ) - .bind(format!("%{}%", name)) - .fetch_all(pool) - .await? - } else { - sqlx::query_as::<_, Page>( - r#"SELECT * FROM page ORDER BY created_at"#, - ) - .fetch_all(pool) - .await? - }; - - Ok(Json(pages)) -} - -pub async fn get_page( - State(state): State, - Path(page_id): Path, -) -> Result { - let page = sqlx::query_as::<_, Page>("SELECT * FROM page WHERE id = $1") - .bind(page_id) - .fetch_optional(&state.pool) - .await?; - - match page { - Some(p) => Ok(Json(p)), - None => Err(ApiErr::NotFound("Page not found".to_string(), None)), - } -} - -#[derive(Debug, Deserialize, Validate)] -pub struct CreatePageReq { - #[validate(length(min = 1, max = 100))] - pub name: String, - pub data: HashMap, -} - -#[derive(Debug, Deserialize, Validate)] -pub struct UpdatePageReq { - #[validate(length(min = 1, max = 100))] - pub name: Option, - pub data: Option>, -} - -pub async fn create_page( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - let page_id = sqlx::query_scalar::<_, Uuid>( - r#" - INSERT INTO page (name, data) - VALUES ($1, $2) - RETURNING id - "#, - ) - .bind(&payload.name) - .bind(SqlxJson(payload.data)) - .fetch_one(&state.pool) - .await?; - - Ok((StatusCode::CREATED, Json(serde_json::json!({ - "id": page_id, - "ok_msg": "Page created successfully" - })))) -} - -pub async fn update_page( - State(state): State, - Path(page_id): Path, - Json(payload): Json, -) -> Result { - payload.validate()?; - - let exists = sqlx::query("SELECT 1 FROM page WHERE id = $1") - .bind(page_id) - .fetch_optional(&state.pool) - .await?; - if exists.is_none() { - return Err(ApiErr::NotFound("Page not found".to_string(), None)); - } - - if payload.name.is_none() && payload.data.is_none() { - return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); - } - - let mut updates = Vec::new(); - let mut param_count = 1; - - if payload.name.is_some() { - updates.push(format!("name = ${}", param_count)); - param_count += 1; - } - if payload.data.is_some() { - updates.push(format!("data = ${}", param_count)); - param_count += 1; - } - - updates.push("updated_at = NOW()".to_string()); - - let sql = format!( - r#"UPDATE page SET {} WHERE id = ${}"#, - updates.join(", "), - param_count - ); - - let mut query = sqlx::query(&sql); - if let Some(name) = payload.name { - query = query.bind(name); - } - if let Some(data) = payload.data { - query = query.bind(SqlxJson(data)); - } - query = query.bind(page_id); - - query.execute(&state.pool).await?; - - Ok(Json(serde_json::json!({ - "ok_msg": "Page updated successfully" - }))) -} - -pub async fn delete_page( - State(state): State, - Path(page_id): Path, -) -> Result { - let result = sqlx::query("DELETE FROM page WHERE id = $1") - .bind(page_id) - .execute(&state.pool) - .await?; - - if result.rows_affected() == 0 { - return Err(ApiErr::NotFound("Page not found".to_string(), None)); - } - - Ok(StatusCode::NO_CONTENT) -} diff --git a/src/handler/point.rs b/src/handler/point.rs deleted file mode 100644 index 0f9b39e..0000000 --- a/src/handler/point.rs +++ /dev/null @@ -1,693 +0,0 @@ -use axum::{ - extract::{Path, Query, State}, - http::HeaderMap, - response::IntoResponse, - Json, -}; -use serde::{Deserialize, Serialize}; -use serde_with::rust::double_option; -use sqlx::{QueryBuilder, Row}; -use std::collections::{HashMap, HashSet}; -use uuid::Uuid; -use validator::Validate; - -use plc_platform_core::util::{ - pagination::{PaginatedResponse, PaginationParams}, - response::ApiErr, -}; - -use crate::{ - AppState, -}; -use plc_platform_core::model::{Node, Point}; - -async fn notify_units( - state: &AppState, - unit_ids: impl IntoIterator, -) { - let mut seen = std::collections::HashSet::new(); - for unit_id in unit_ids { - if seen.insert(unit_id) { - state.control_runtime.notify_unit(unit_id).await; - } - } -} - -/// List all points. -#[derive(Deserialize, Validate)] -pub struct GetPointListQuery { - pub source_id: Option, - pub equipment_id: Option, - #[serde(flatten)] - pub pagination: PaginationParams, -} - -#[derive(Serialize)] -pub struct PointWithMonitor { - #[serde(flatten)] - pub point: Point, - pub point_monitor: Option, -} - -#[derive(Deserialize, Validate)] -pub struct GetPointHistoryQuery { - pub limit: Option, -} - -#[derive(Serialize)] -pub struct PointHistoryItem { - #[serde(serialize_with = "plc_platform_core::util::datetime::option_utc_to_local_str")] - pub timestamp: Option>, - pub quality: crate::telemetry::PointQuality, - pub value: Option, - pub value_text: Option, - pub value_number: Option, -} - -pub async fn get_point_list( - State(state): State, - Query(query): Query, -) -> Result { - query.validate()?; - let pool = &state.pool; - - // Count total rows. - let total = crate::service::get_points_count(pool, query.source_id, query.equipment_id).await?; - - // Load current page rows. - let points = crate::service::get_points_paginated( - pool, - query.source_id, - query.equipment_id, - query.pagination.page_size, - query.pagination.offset(), - ) - .await?; - - let monitor_guard = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - - let data: Vec = points - .into_iter() - .map(|point| { - let point_monitor = monitor_guard.get(&point.id).cloned(); - PointWithMonitor { - point, - point_monitor, - } - }) - .collect(); - - let response = PaginatedResponse::new( - data, - total, - query.pagination.page, - query.pagination.page_size, - ); - - Ok(Json(response)) -} -/// Get a point by id. -pub async fn get_point( - State(state): State, - Path(point_id): Path, -) -> Result { - let pool = &state.pool; - let point = crate::service::get_point_by_id(pool, point_id).await?; - - Ok(Json(point)) -} - -pub async fn get_point_history( - State(state): State, - Path(point_id): Path, - Query(query): Query, -) -> Result { - let pool = &state.pool; - let point = crate::service::get_point_by_id(pool, point_id).await?; - if point.is_none() { - return Err(ApiErr::NotFound("Point not found".to_string(), None)); - } - - let limit = query.limit.unwrap_or(120).clamp(1, 1000); - let history = state - .connection_manager - .get_point_history(point_id, limit) - .await; - - let items: Vec = history - .into_iter() - .map(|item| { - let value_number = monitor_value_to_number(&item); - PointHistoryItem { - timestamp: item.timestamp, - quality: item.quality, - value_number, - value: item.value, - value_text: item.value_text, - } - }) - .collect(); - - Ok(Json(items)) -} - -/// Request payload for updating editable point fields. -#[derive(Deserialize, Validate)] -pub struct UpdatePointReq { - pub name: Option, - #[serde(default, with = "double_option")] - pub description: Option>, - #[serde(default, with = "double_option")] - pub unit: Option>, - #[serde(default, with = "double_option")] - pub tag_id: Option>, - #[serde(default, with = "double_option")] - pub equipment_id: Option>, - #[serde(default, with = "double_option")] - pub signal_role: Option>, -} - -/// Request payload for batch setting point tags. -#[derive(Deserialize, Validate)] -pub struct BatchSetPointTagsReq { - #[validate(length(min = 1, max = 500))] - pub point_ids: Vec, - pub tag_id: Option, -} - -#[derive(Deserialize, Validate)] -pub struct BatchSetPointEquipmentReq { - #[validate(length(min = 1, max = 500))] - pub point_ids: Vec, - pub equipment_id: Option, - pub signal_role: Option, -} - -/// Update point metadata (name/description/unit only). -pub async fn update_point( - State(state): State, - Path(point_id): Path, - Json(payload): Json, -) -> Result { - payload.validate()?; - - let pool = &state.pool; - - if payload.name.is_none() - && payload.description.is_none() - && payload.unit.is_none() - && payload.tag_id.is_none() - && payload.equipment_id.is_none() - && payload.signal_role.is_none() - { - return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); - } - - // If tag_id is provided, ensure tag exists. - if let Some(Some(tag_id)) = payload.tag_id { - let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#) - .bind(tag_id) - .fetch_optional(pool) - .await? - .is_some(); - - if !tag_exists { - return Err(ApiErr::NotFound("Tag not found".to_string(), None)); - } - } - - if let Some(Some(equipment_id)) = payload.equipment_id { - let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#) - .bind(equipment_id) - .fetch_optional(pool) - .await? - .is_some(); - - if !equipment_exists { - return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); - } - } - - // Ensure target point exists. - let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#) - .bind(point_id) - .fetch_optional(pool) - .await?; - if existing_point.is_none() { - return Err(ApiErr::NotFound("Point not found".to_string(), None)); - } - let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; - - let mut qb: QueryBuilder = QueryBuilder::new("UPDATE point SET "); - let mut wrote_field = false; - - if let Some(name) = &payload.name { - if wrote_field { - qb.push(", "); - } - qb.push("name = ").push_bind(name); - wrote_field = true; - } - if let Some(description) = &payload.description { - if wrote_field { - qb.push(", "); - } - qb.push("description = ").push_bind(description.as_deref()); - wrote_field = true; - } - if let Some(unit) = &payload.unit { - if wrote_field { - qb.push(", "); - } - qb.push("unit = ").push_bind(unit.as_deref()); - wrote_field = true; - } - if let Some(tag_id) = &payload.tag_id { - if wrote_field { - qb.push(", "); - } - qb.push("tag_id = ").push_bind(tag_id.as_ref()); - wrote_field = true; - } - if let Some(equipment_id) = &payload.equipment_id { - if wrote_field { - qb.push(", "); - } - qb.push("equipment_id = ").push_bind(equipment_id.as_ref()); - wrote_field = true; - } - if let Some(signal_role) = &payload.signal_role { - if wrote_field { - qb.push(", "); - } - qb.push("signal_role = ").push_bind(signal_role.as_deref()); - wrote_field = true; - } - - if wrote_field { - qb.push(", "); - } - qb.push("updated_at = NOW()"); - - qb.push(" WHERE id = ").push_bind(point_id); - qb.build().execute(pool).await?; - - let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; - notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await; - - Ok(Json( - serde_json::json!({"ok_msg": "Point updated successfully"}), - )) -} - -/// Batch set point tags. -pub async fn batch_set_point_tags( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - if payload.point_ids.is_empty() { - return Err(ApiErr::BadRequest( - "point_ids cannot be empty".to_string(), - None, - )); - } - - let pool = &state.pool; - - // If tag_id is provided, ensure tag exists. - if let Some(tag_id) = payload.tag_id { - let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#) - .bind(tag_id) - .fetch_optional(pool) - .await? - .is_some(); - - if !tag_exists { - return Err(ApiErr::NotFound("Tag not found".to_string(), None)); - } - } - - // Check which points exist - let existing_points: Vec = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#) - .bind(&payload.point_ids) - .fetch_all(pool) - .await? - .into_iter() - .map(|row: sqlx::postgres::PgRow| row.get::("id")) - .collect(); - - if existing_points.is_empty() { - return Err(ApiErr::NotFound("No valid points found".to_string(), None)); - } - - // Update tag_id for all existing points - let result = - sqlx::query(r#"UPDATE point SET tag_id = $1, updated_at = NOW() WHERE id = ANY($2)"#) - .bind(payload.tag_id) - .bind(&existing_points) - .execute(pool) - .await?; - - Ok(Json(serde_json::json!({ - "ok_msg": "Point tags updated successfully", - "updated_count": result.rows_affected() - }))) -} - -pub async fn batch_set_point_equipment( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - if payload.point_ids.is_empty() { - return Err(ApiErr::BadRequest( - "point_ids cannot be empty".to_string(), - None, - )); - } - - let pool = &state.pool; - - if let Some(equipment_id) = payload.equipment_id { - let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#) - .bind(equipment_id) - .fetch_optional(pool) - .await? - .is_some(); - - if !equipment_exists { - return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); - } - } - - let existing_points: Vec = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#) - .bind(&payload.point_ids) - .fetch_all(pool) - .await? - .into_iter() - .map(|row: sqlx::postgres::PgRow| row.get::("id")) - .collect(); - - if existing_points.is_empty() { - return Err(ApiErr::NotFound("No valid points found".to_string(), None)); - } - - let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?; - - let result = sqlx::query( - r#" - UPDATE point - SET equipment_id = $1, - signal_role = $2, - updated_at = NOW() - WHERE id = ANY($3) - "#, - ) - .bind(payload.equipment_id) - .bind(payload.signal_role.as_deref()) - .bind(&existing_points) - .execute(pool) - .await?; - - let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?; - notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await; - - Ok(Json(serde_json::json!({ - "ok_msg": "Point equipment updated successfully", - "updated_count": result.rows_affected() - }))) -} - -/// Delete one point by id. -pub async fn delete_point( - State(state): State, - Path(point_id): Path, -) -> Result { - let pool = &state.pool; - let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; - - let source_id = { - let grouped = crate::service::get_points_grouped_by_source(pool, &[point_id]).await?; - grouped.keys().next().copied() - }; - - // Ensure target point exists. - let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#) - .bind(point_id) - .fetch_optional(pool) - .await?; - if existing_point.is_none() { - return Err(ApiErr::NotFound("Point not found".to_string(), None)); - } - - // Delete point. - sqlx::query(r#"delete from point WHERE id = $1"#) - .bind(point_id) - .execute(pool) - .await?; - - if let Some(source_id) = source_id { - if let Err(e) = state - .event_manager - .send(crate::event::AppEvent::PointDeleteBatch { - source_id, - point_ids: vec![point_id], - }) - { - tracing::error!("Failed to send PointDeleteBatch event: {}", e); - } - } - - notify_units(&state, affected_unit_ids).await; - - Ok(Json( - serde_json::json!({"ok_msg": "Point deleted successfully"}), - )) -} - -#[derive(Deserialize, Validate)] -/// Request payload for batch point creation from node ids. -pub struct BatchCreatePointsReq { - #[validate(length(min = 1, max = 500))] - pub node_ids: Vec, -} - -#[derive(Serialize)] -/// Response payload for batch point creation. -pub struct BatchCreatePointsRes { - pub success_count: usize, - pub failed_count: usize, - pub failed_node_ids: Vec, - pub created_point_ids: Vec, -} - -/// Batch create points by node ids. -pub async fn batch_create_points( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - let pool = &state.pool; - - if payload.node_ids.is_empty() { - return Err(ApiErr::BadRequest( - "node_ids cannot be empty".to_string(), - None, - )); - } - - // Use one transaction for the full batch. - let mut tx = pool.begin().await?; - let node_ids = payload.node_ids; - - let nodes: Vec = sqlx::query_as::<_, Node>(r#"SELECT * FROM node WHERE id = ANY($1)"#) - .bind(&node_ids) - .fetch_all(&mut *tx) - .await?; - - let node_map: HashMap = nodes.into_iter().map(|node| (node.id, node)).collect(); - - let existing_node_ids: HashSet = node_map.keys().copied().collect(); - let mut failed_node_ids = Vec::new(); - for node_id in &node_ids { - if !existing_node_ids.contains(node_id) { - failed_node_ids.push(*node_id); - } - } - - let existing_point_node_ids: HashSet = - sqlx::query_scalar::<_, Uuid>(r#"SELECT node_id FROM point WHERE node_id = ANY($1)"#) - .bind(&node_ids) - .fetch_all(&mut *tx) - .await? - .into_iter() - .collect(); - - let mut to_create = Vec::new(); - let mut seen_creatable = HashSet::new(); - for node_id in node_ids { - if !existing_node_ids.contains(&node_id) || existing_point_node_ids.contains(&node_id) { - continue; - } - - if !seen_creatable.insert(node_id) { - continue; - } - - let name = node_map - .get(&node_id) - .map(|node| node.browse_name.clone()) - .unwrap_or_else(|| format!("Point_{}", node_id)); - to_create.push((Uuid::new_v4(), node_id, name)); - } - - let mut created_point_ids = Vec::with_capacity(to_create.len()); - if !to_create.is_empty() { - let mut qb = QueryBuilder::new("INSERT INTO point (id, node_id, name) "); - qb.push_values(to_create.iter(), |mut b, (id, node_id, name)| { - b.push_bind(*id).push_bind(*node_id).push_bind(name); - }); - qb.build().execute(&mut *tx).await?; - created_point_ids.extend(to_create.into_iter().map(|(id, _, _)| id)); - } - - // Commit the transaction. - tx.commit().await?; - - // Emit grouped create events by source. - if !created_point_ids.is_empty() { - let grouped = - crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?; - for (source_id, points) in grouped { - let point_ids: Vec = points.into_iter().map(|p| p.point_id).collect(); - if let Err(e) = state - .event_manager - .send(crate::event::AppEvent::PointCreateBatch { - source_id, - point_ids, - }) - { - tracing::error!("Failed to send PointCreateBatch event: {}", e); - } - } - } - - Ok(Json(BatchCreatePointsRes { - success_count: created_point_ids.len(), - failed_count: failed_node_ids.len(), - failed_node_ids, - created_point_ids, - })) -} - -#[derive(Deserialize, Validate)] -/// Request payload for batch point deletion. -pub struct BatchDeletePointsReq { - #[validate(length(min = 1, max = 500))] - pub point_ids: Vec, -} - -#[derive(Serialize)] -/// Response payload for batch point deletion. -pub struct BatchDeletePointsRes { - pub deleted_count: u64, -} - -/// Batch delete points and emit grouped delete events by source. -pub async fn batch_delete_points( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - if payload.point_ids.is_empty() { - return Err(ApiErr::BadRequest( - "point_ids cannot be empty".to_string(), - None, - )); - } - - let pool = &state.pool; - let point_ids = payload.point_ids; - - let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?; - let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &point_ids).await?; - let existing_point_ids: Vec = grouped - .values() - .flat_map(|points| points.iter().map(|p| p.point_id)) - .collect(); - - if existing_point_ids.is_empty() { - return Ok(Json(BatchDeletePointsRes { deleted_count: 0 })); - } - - let result = sqlx::query(r#"DELETE FROM point WHERE id = ANY($1)"#) - .bind(&existing_point_ids) - .execute(pool) - .await?; - - for (source_id, points) in grouped { - let ids: Vec = points.into_iter().map(|p| p.point_id).collect(); - if let Err(e) = state - .event_manager - .send(crate::event::AppEvent::PointDeleteBatch { - source_id, - point_ids: ids, - }) - { - tracing::error!("Failed to send PointDeleteBatch event: {}", e); - } - } - - notify_units(&state, affected_unit_ids).await; - - Ok(Json(BatchDeletePointsRes { - deleted_count: result.rows_affected(), - })) -} - -pub async fn batch_set_point_value( - State(state): State, - headers: HeaderMap, - Json(payload): Json, -) -> Result { - let write_key = headers - .get("X-Write-Key") - .and_then(|v| v.to_str().ok()) - .unwrap_or_default(); - - if !state.config.verify_write_key(write_key) { - return Err(ApiErr::Forbidden( - "write permission denied".to_string(), - Some(serde_json::json!({ - "hint": "set WRITE_API_KEY (or legacy WRITE_KEY) and pass header X-Write-Key" - })), - )); - } - - let result = state - .connection_manager - .write_point_values_batch(payload) - .await - .map_err(|e| ApiErr::Internal(e, None))?; - Ok(Json(result)) -} - -fn monitor_value_to_number(item: &crate::telemetry::PointMonitorInfo) -> Option { - match item.value.as_ref()? { - crate::telemetry::DataValue::Int(v) => Some(*v as f64), - crate::telemetry::DataValue::UInt(v) => Some(*v as f64), - crate::telemetry::DataValue::Float(v) => Some(*v), - crate::telemetry::DataValue::Bool(v) => Some(if *v { 1.0 } else { 0.0 }), - crate::telemetry::DataValue::Text(v) => v.parse::().ok(), - _ => None, - } -} - diff --git a/src/handler/source.rs b/src/handler/source.rs deleted file mode 100644 index 33f8dc9..0000000 --- a/src/handler/source.rs +++ /dev/null @@ -1,626 +0,0 @@ -use axum::{Json, extract::{Path, State}, http::StatusCode, response::IntoResponse}; -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; -use validator::Validate; -use opcua::types::{ - NodeId, BrowseDescription, ReferenceDescription, - BrowseDirection as OpcuaBrowseDirection, Identifier, ReadValueId, AttributeId, NumericRange, TimestampsToReturn, Variant -}; -use opcua::types::ReferenceTypeId; -use opcua::client::Session; -use std::collections::{HashMap, VecDeque}; - -use plc_platform_core::util::response::ApiErr; - -use anyhow::{Context}; -use plc_platform_core::model::{Node, Source}; -use crate::AppState; -use sqlx::QueryBuilder; - -// 树节点结构体 -#[derive(Debug, Serialize, Clone)] -pub struct TreeNode { - pub id: Uuid, - pub source_id: Uuid, - pub external_id: String, - pub namespace_uri: Option, - pub namespace_index: Option, - pub identifier_type: Option, - pub identifier: Option, - pub browse_name: String, - pub display_name: Option, - pub node_class: String, - pub parent_id: Option, - pub children: Vec, -} - -impl TreeNode { - fn from_node(node: Node) -> Self { - TreeNode { - id: node.id, - source_id: node.source_id, - external_id: node.external_id, - namespace_uri: node.namespace_uri, - namespace_index: node.namespace_index, - identifier_type: node.identifier_type, - identifier: node.identifier, - browse_name: node.browse_name, - display_name: node.display_name, - node_class: node.node_class, - parent_id: node.parent_id, - children: Vec::new(), - } - } -} - - -// 带连接状态的Source响应结构? -#[derive(Debug, Serialize, Clone)] -pub struct SourceWithStatus { - #[serde(flatten)] - pub source: SourcePublic, - pub is_connected: bool, - pub last_error: Option, - #[serde(serialize_with = "plc_platform_core::util::datetime::option_utc_to_local_str")] - pub last_time: Option>, -} - -#[derive(Debug, Serialize, Clone)] -pub struct SourcePublic { - pub id: Uuid, - pub name: String, - pub protocol: String, - pub endpoint: String, - pub security_policy: Option, - pub security_mode: Option, - pub enabled: bool, - #[serde(serialize_with = "plc_platform_core::util::datetime::utc_to_local_str")] - pub created_at: DateTime, - #[serde(serialize_with = "plc_platform_core::util::datetime::utc_to_local_str")] - pub updated_at: DateTime, -} - -impl From for SourcePublic { - fn from(source: Source) -> Self { - Self { - id: source.id, - name: source.name, - protocol: source.protocol, - endpoint: source.endpoint, - security_policy: source.security_policy, - security_mode: source.security_mode, - enabled: source.enabled, - created_at: source.created_at, - updated_at: source.updated_at, - } - } -} - -pub async fn get_source_list(State(state): State) -> Result { - let pool = &state.pool; - let sources: Vec = crate::service::get_all_enabled_sources(pool).await?; - - // 获取所有连接状? - let status_map: std::collections::HashMap, Option>)> = - state.connection_manager.get_all_status().await - .into_iter() - .map(|(source_id, s)| (source_id, (s.is_connected, s.last_error, Some(s.last_time)))) - .collect(); - - // 组合Source和连接状? - let sources_with_status: Vec = sources - .into_iter() - .map(|source| { - let (is_connected, last_error, last_time) = status_map - .get(&source.id) - .map(|(connected, error, time)| (*connected, error.clone(), *time)) - .unwrap_or((false, None, None)); - SourceWithStatus { - source: source.into(), - is_connected, - last_error, - last_time, - } - }) - .collect(); - - Ok(Json(sources_with_status)) -} - -pub async fn get_node_tree( - State(state): State, - Path(source_id): Path, -) -> Result { - let pool = &state.pool; - - // 查询所有属于该source的节? - let nodes: Vec = sqlx::query_as::<_, Node>( - r#"SELECT * FROM node WHERE source_id = $1 ORDER BY created_at"#, - ) - .bind(source_id) - .fetch_all(pool) - .await?; - - // 构建节点? - let tree = build_node_tree(nodes); - - Ok(Json(tree)) -} - -fn build_node_tree(nodes: Vec) -> Vec { - let mut node_map: HashMap = HashMap::new(); - let mut children_map: HashMap> = HashMap::new(); - let mut roots: Vec = Vec::new(); - - // ?转换 + 记录 parent 关系 - for node in nodes { - let tree_node = TreeNode::from_node(node); - let id = tree_node.id; - - if let Some(pid) = tree_node.parent_id { - children_map.entry(pid).or_default().push(id); - } else { - roots.push(id); - } - - node_map.insert(id, tree_node); - } - - // ?递归构建 - fn attach_children( - id: Uuid, - node_map: &mut HashMap, - children_map: &HashMap>, - ) -> Option { - let mut node = node_map.remove(&id)?; - - if let Some(child_ids) = children_map.get(&id) { - for &cid in child_ids { - if let Some(child) = attach_children(cid, node_map, children_map) { - node.children.push(child); - } - } - } - - Some(node) - } - - // ?生成最终树 - roots - .into_iter() - .filter_map(|rid| attach_children(rid, &mut node_map, &children_map)) - .collect() -} - - -#[derive(Deserialize, Validate)] -pub struct CreateSourceReq { - pub name: String, - pub endpoint: String, - pub enabled: bool, -} - -#[derive(Serialize)] -pub struct CreateSourceRes { - pub id: Uuid, -} - -pub async fn create_source( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - let pool = &state.pool; - let new_id = Uuid::new_v4(); - - sqlx::query( - r#"INSERT INTO source (id, name, endpoint, enabled, protocol) VALUES ($1, $2, $3, $4, $5)"#, - ) - .bind(new_id) - .bind(&payload.name) - .bind(&payload.endpoint) - .bind(payload.enabled) - .bind("opcua") //默认opcua协议 - .execute(pool) - .await?; - - // 触发 SourceCreate 事件 - let _ = state.event_manager.send(crate::event::AppEvent::SourceCreate { source_id: new_id }); - - Ok((StatusCode::CREATED, Json(CreateSourceRes { id: new_id }))) -} - -#[derive(Deserialize, Validate)] -pub struct UpdateSourceReq { - pub name: Option, - pub endpoint: Option, - pub enabled: Option, - pub security_policy: Option, - pub security_mode: Option, - pub username: Option, - pub password: Option, -} - -pub async fn update_source( - State(state): State, - Path(source_id): Path, - Json(payload): Json, -) -> Result { - payload.validate()?; - - if payload.name.is_none() - && payload.endpoint.is_none() - && payload.enabled.is_none() - && payload.security_policy.is_none() - && payload.security_mode.is_none() - && payload.username.is_none() - && payload.password.is_none() - { - return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); - } - - let pool = &state.pool; - - let exists = sqlx::query("SELECT 1 FROM source WHERE id = $1") - .bind(source_id) - .fetch_optional(pool) - .await? - .is_some(); - if !exists { - return Err(ApiErr::NotFound(format!("Source with id {} not found", source_id), None)); - } - - let mut qb = QueryBuilder::new("UPDATE source SET "); - let mut sep = qb.separated(", "); - - if let Some(name) = &payload.name { - sep.push("name = ").push_bind(name); - } - if let Some(endpoint) = &payload.endpoint { - sep.push("endpoint = ").push_bind(endpoint); - } - if let Some(enabled) = payload.enabled { - sep.push("enabled = ").push_bind(enabled); - } - if let Some(security_policy) = &payload.security_policy { - sep.push("security_policy = ").push_bind(security_policy); - } - if let Some(security_mode) = &payload.security_mode { - sep.push("security_mode = ").push_bind(security_mode); - } - if let Some(username) = &payload.username { - sep.push("username = ").push_bind(username); - } - if let Some(password) = &payload.password { - sep.push("password = ").push_bind(password); - } - - sep.push("updated_at = NOW()"); - - qb.push(" WHERE id = ").push_bind(source_id); - qb.build().execute(pool).await?; - - let _ = state.event_manager.send(crate::event::AppEvent::SourceUpdate { source_id }); - - Ok(Json(serde_json::json!({"ok_msg": "Source updated successfully"}))) -} - -pub async fn delete_source( - State(state): State, - Path(source_id): Path, -) -> Result { - let pool = &state.pool; - - let source_name = sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1") - .bind(source_id) - .fetch_optional(pool) - .await? - .ok_or_else(|| ApiErr::NotFound(format!("Source with id {} not found", source_id), None))?; - - sqlx::query("DELETE FROM source WHERE id = $1") - .bind(source_id) - .execute(pool) - .await?; - - // 触发 SourceDelete 事件 - let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id, source_name }); - - Ok(StatusCode::NO_CONTENT) -} - -pub async fn reconnect_source( - State(state): State, - Path(source_id): Path, -) -> Result { - let pool = &state.pool; - - let exists = sqlx::query("SELECT 1 FROM source WHERE id = $1") - .bind(source_id) - .fetch_optional(pool) - .await? - .is_some(); - if !exists { - return Err(ApiErr::NotFound( - format!("Source with id {} not found", source_id), - None, - )); - } - - state - .connection_manager - .reconnect(pool, source_id) - .await - .map_err(|e| ApiErr::Internal(e, None))?; - - Ok(Json(serde_json::json!({"ok_msg": "Source reconnected successfully"}))) -} - -pub async fn browse_and_save_nodes( - State(state): State, - Path(source_id): Path, -) -> Result { - - let pool = &state.pool; - - // 确认 source 存在 - sqlx::query("SELECT 1 FROM source WHERE id = $1") - .bind(source_id) - .fetch_one(pool) - .await?; - - let session = state.connection_manager - .get_session(source_id) - .await - .ok_or_else(|| anyhow::anyhow!("Source not connected"))?; - - // 读取 namespace 映射 - let namespace_map = load_namespace_map(&session).await - .context("Failed to load namespace map")?; - - // 开启事务(整次浏览一个事务) - let mut tx = pool.begin().await - .context("Failed to begin transaction")?; - - let mut processed_nodes: HashMap = HashMap::new(); - let mut queue: VecDeque<(NodeId, Option)> = VecDeque::new(); - - queue.push_back((NodeId::objects_folder_id(), None)); - - while let Some((node_id, parent_id)) = queue.pop_front() { - browse_single_node( - &session, - &mut tx, - source_id, - &node_id, - parent_id, - &namespace_map, - &mut processed_nodes, - &mut queue, - ).await - .with_context(|| format!("Failed to browse node: {:?}", node_id))?; - } - - tx.commit().await - .context("Failed to commit transaction")?; - - Ok(Json(serde_json::json!({ - "ok_msg": "Browse completed", - "total_nodes": processed_nodes.len() - }))) -} - -//////////////////////////////////////////////////////////////// -// 浏览单个节点(含 continuation? -//////////////////////////////////////////////////////////////// - -async fn browse_single_node( - session: &Session, - tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, - source_id: Uuid, - node_id: &NodeId, - parent_id: Option, - namespace_map: &HashMap, - processed_nodes: &mut HashMap, - queue: &mut VecDeque<(NodeId, Option)>, -) -> anyhow::Result<()> { - - let browse_desc = BrowseDescription { - node_id: node_id.clone(), - browse_direction: OpcuaBrowseDirection::Forward, - reference_type_id: ReferenceTypeId::HierarchicalReferences.into(), - include_subtypes: true, - node_class_mask: 0, - result_mask: 0x3F, - }; - - let mut results = session.browse(&[browse_desc], 0u32, None).await - .context("Failed to browse node")?; - - loop { - let result = &results[0]; - - if let Some(refs) = &result.references { - for ref_desc in refs { - process_reference( - ref_desc, - tx, - source_id, - parent_id, - namespace_map, - processed_nodes, - queue, - ).await - .with_context(|| format!("Failed to process reference: {:?}", ref_desc.node_id.node_id))?; - } - } - - if !result.continuation_point.is_null() { - let cp = result.continuation_point.clone(); - results = session.browse_next(false, &[cp]).await - .context("Failed to browse next")?; - } else { - break; - } - } - - Ok(()) -} - -//////////////////////////////////////////////////////////////// -// 处理单个 Reference(核心优化版? -//////////////////////////////////////////////////////////////// - -async fn process_reference( - ref_desc: &ReferenceDescription, - tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, - source_id: Uuid, - parent_id: Option, - namespace_map: &HashMap, - processed_nodes: &mut HashMap, - queue: &mut VecDeque<(NodeId, Option)>, -) -> anyhow::Result<()> { - - let node_id_obj = &ref_desc.node_id.node_id; - let node_id_str = node_id_obj.to_string(); - - // 内存去重 - if processed_nodes.contains_key(&node_id_str) { - return Ok(()); - } - - let (namespace_index, identifier_type, identifier) = - parse_node_id(node_id_obj); - - let namespace_uri = namespace_map - .get(&(namespace_index.unwrap_or(0) as i32)) - .cloned() - .unwrap_or_default(); - - let browse_name = ref_desc.browse_name.name.to_string(); - let display_name = ref_desc.display_name.text.to_string(); - let node_class = format!("{:?}", ref_desc.node_class); - - let effective_parent_id = if let Some(pid) = parent_id { - let parent_exists = sqlx::query(r#"SELECT 1 FROM node WHERE id = $1"#) - .bind(pid) - .fetch_optional(tx.as_mut()) - .await?; - if parent_exists.is_some() { - Some(pid) - } else { - None - } - } else { - None - }; - - // Use RETURNING id so queue always carries the actual DB node id. - let persisted_node_id = sqlx::query_scalar::<_, Uuid>( - r#" - INSERT INTO node ( - id, - source_id, - external_id, - namespace_uri, - namespace_index, - identifier_type, - identifier, - browse_name, - display_name, - node_class, - parent_id - ) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) - ON CONFLICT(source_id, external_id) DO UPDATE SET - namespace_uri = excluded.namespace_uri, - namespace_index = excluded.namespace_index, - identifier_type = excluded.identifier_type, - identifier = excluded.identifier, - browse_name = excluded.browse_name, - display_name = excluded.display_name, - node_class = excluded.node_class, - parent_id = COALESCE(excluded.parent_id, node.parent_id), - updated_at = NOW() - RETURNING id - "#, - ) - .bind(Uuid::new_v4()) - .bind(source_id) - .bind(&node_id_str) - .bind(&namespace_uri) - .bind(namespace_index.map(|v| v as i32)) - .bind(&identifier_type) - .bind(&identifier) - .bind(&browse_name) - .bind(&display_name) - .bind(&node_class) - .bind(effective_parent_id) - .fetch_one(tx.as_mut()) - .await - .context("Failed to execute UPSERT query")?; - - processed_nodes.insert(node_id_str.clone(), ()); - queue.push_back((node_id_obj.clone(), Some(persisted_node_id))); - - Ok(()) -} - -//////////////////////////////////////////////////////////////// -// 解析 NodeId -//////////////////////////////////////////////////////////////// - -fn parse_node_id(node_id: &NodeId) -> (Option, Option, String) { - - let namespace_index = Some(node_id.namespace); - - let (identifier_type, identifier) = match &node_id.identifier { - Identifier::Numeric(i) => ("i".to_string(), i.to_string()), - Identifier::String(s) => ("s".to_string(), s.to_string()), - Identifier::Guid(g) => ("g".to_string(), g.to_string()), - Identifier::ByteString(b) => ("b".to_string(), format!("{:?}", b)), - }; - - (namespace_index, Some(identifier_type), identifier) -} - -//////////////////////////////////////////////////////////////// -// 读取 NamespaceArray -//////////////////////////////////////////////////////////////// - -async fn load_namespace_map( - session: &Session, -) -> anyhow::Result> { - // 读取命名空间数组节点 - let ns_node = NodeId::new(0, 2255); - let read_request = ReadValueId { - node_id: ns_node, - attribute_id: AttributeId::Value as u32, - index_range: NumericRange::None, - data_encoding: Default::default(), - }; - - // 执行读取操作 - let result = session.read(&[read_request], TimestampsToReturn::Neither, 0f64).await - .context("Failed to read namespace map")?; - - // 解析并构建命名空间映? - let mut map = HashMap::new(); - if let Some(value) = &result[0].value { - if let Variant::Array(array) = value { - for (i, item) in array.values.iter().enumerate() { - if let Ok(index) = i32::try_from(i) { - if let Variant::String(uri) = item { - map.insert(index, uri.to_string()); - } - } - } - } - } - - Ok(map) -} - - - diff --git a/src/handler/tag.rs b/src/handler/tag.rs deleted file mode 100644 index dcc28fc..0000000 --- a/src/handler/tag.rs +++ /dev/null @@ -1,126 +0,0 @@ -use axum::{Json, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse}; -use serde::Deserialize; -use uuid::Uuid; -use validator::Validate; - -use plc_platform_core::util::{ - pagination::{PaginatedResponse, PaginationParams}, - response::ApiErr, -}; -use crate::{AppState}; - -/// List all tags. -#[derive(Deserialize, Validate)] -pub struct GetTagListQuery { - #[serde(flatten)] - pub pagination: PaginationParams, -} - -pub async fn get_tag_list( - State(state): State, - Query(query): Query, -) -> Result { - query.validate()?; - let pool = &state.pool; - - // Count total rows. - let total = crate::service::get_tags_count(pool).await?; - - // Load current page rows. - let tags = crate::service::get_tags_paginated( - pool, - query.pagination.page_size, - query.pagination.offset(), - ).await?; - - let response = PaginatedResponse::new(tags, total, query.pagination.page, query.pagination.page_size); - - Ok(Json(response)) -} - -/// List points under a tag. -pub async fn get_tag_points( - State(state): State, - Path(tag_id): Path, -) -> Result { - let points = crate::service::get_tag_points(&state.pool, tag_id).await?; - Ok(Json(points)) -} - -#[derive(Debug, Deserialize, Validate)] -pub struct CreateTagReq { - #[validate(length(min = 1, max = 100))] - pub name: String, - pub description: Option, - pub point_ids: Option>, -} - -#[derive(Debug, Deserialize, Validate)] -pub struct UpdateTagReq { - #[validate(length(min = 1, max = 100))] - pub name: Option, - pub description: Option, - pub point_ids: Option>, -} - -/// Create a tag. -pub async fn create_tag( - State(state): State, - Json(payload): Json, -) -> Result { - payload.validate()?; - - let point_ids = payload.point_ids.as_deref().unwrap_or(&[]); - let tag_id = crate::service::create_tag( - &state.pool, - &payload.name, - payload.description.as_deref(), - point_ids, - ).await?; - - Ok((StatusCode::CREATED, Json(serde_json::json!({ - "id": tag_id, - "ok_msg": "Tag created successfully" - })))) -} - -/// Update a tag. -pub async fn update_tag( - State(state): State, - Path(tag_id): Path, - Json(payload): Json, -) -> Result { - payload.validate()?; - - // Ensure the target tag exists. - let exists = crate::service::get_tag_by_id(&state.pool, tag_id).await?; - if exists.is_none() { - return Err(ApiErr::NotFound("Tag not found".to_string(), None)); - } - - crate::service::update_tag( - &state.pool, - tag_id, - payload.name.as_deref(), - payload.description.as_deref(), - payload.point_ids.as_deref(), - ).await?; - - Ok(Json(serde_json::json!({ - "ok_msg": "Tag updated successfully" - }))) -} - -/// Delete a tag. -pub async fn delete_tag( - State(state): State, - Path(tag_id): Path, -) -> Result { - let deleted = crate::service::delete_tag(&state.pool, tag_id).await?; - - if !deleted { - return Err(ApiErr::NotFound("Tag not found".to_string(), None)); - } - - Ok(StatusCode::NO_CONTENT) -} diff --git a/src/middleware.rs b/src/middleware.rs deleted file mode 100644 index a06052d..0000000 --- a/src/middleware.rs +++ /dev/null @@ -1,37 +0,0 @@ -use axum::{ - body::Body, - http::Request, - middleware::Next, - response::Response, -}; -use std::time::Instant; - -pub async fn simple_logger( - req: Request, - next: Next, -) -> Response { - // Borrow the path string directly; no clone needed. - let method = req.method().to_string(); - let uri = req.uri().to_string(); // `Uri::to_string()` allocates the owned string once. - - let start = Instant::now(); - let res = next.run(req).await; - let duration = start.elapsed(); - let status = res.status(); - match status.as_u16() { - 100..=399 => { - tracing::info!("{} {} {} {:?}", method, uri, status, duration); - } - 400..=499 => { - tracing::warn!("{} {} {} {:?}", method, uri, status, duration); - } - 500..=599 => { - tracing::error!("{} {} {} {:?}", method, uri, status, duration); - } - _ => { - tracing::warn!("{} {} {} {:?}", method, uri, status, duration); - } - } - - res -} diff --git a/src/websocket.rs b/src/websocket.rs deleted file mode 100644 index de773f2..0000000 --- a/src/websocket.rs +++ /dev/null @@ -1,271 +0,0 @@ -use axum::{ - extract::{ - ws::{Message, WebSocket, WebSocketUpgrade}, - Path, State, - }, - response::IntoResponse, -}; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::{broadcast, RwLock}; -use uuid::Uuid; - -/// WebSocket message payload types. -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type", content = "data")] -pub enum WsMessage { - PointNewValue(crate::telemetry::PointMonitorInfo), - PointSetValueBatchResult(crate::connection::BatchSetPointValueRes), - EventCreated(plc_platform_core::model::EventRecord), - UnitRuntimeChanged(crate::control::runtime::UnitRuntime), -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type", content = "data", rename_all = "snake_case")] -pub enum WsClientMessage { - AuthWrite(WsAuthWriteReq), - PointSetValueBatch(crate::connection::BatchSetPointValueReq), -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WsAuthWriteReq { - pub key: String, -} - -/// Room manager: room_id -> broadcast sender. -#[derive(Clone)] -pub struct RoomManager { - rooms: Arc>>>, -} - -impl RoomManager { - pub fn new() -> Self { - Self { - rooms: Arc::new(RwLock::new(HashMap::new())), - } - } - - /// Get or create room sender. - pub async fn get_or_create_room(&self, room_id: &str) -> broadcast::Sender { - let mut rooms = self.rooms.write().await; - - if let Some(sender) = rooms.get(room_id) { - return sender.clone(); - } - - let (sender, _) = broadcast::channel(100); - rooms.insert(room_id.to_string(), sender.clone()); - tracing::info!("Created new room: {}", room_id); - sender - } - - /// Get room sender if room exists. - pub async fn get_room(&self, room_id: &str) -> Option> { - let rooms = self.rooms.read().await; - rooms.get(room_id).cloned() - } - - /// Remove room if there are no receivers left. - pub async fn remove_room_if_empty(&self, room_id: &str) { - let mut rooms = self.rooms.write().await; - let should_remove = rooms - .get(room_id) - .map(|sender| sender.receiver_count() == 0) - .unwrap_or(false); - - if should_remove { - rooms.remove(room_id); - tracing::info!("Removed empty room: {}", room_id); - } - } - - /// Send message to room. - /// - /// Returns: - /// - Ok(n): n subscribers received it - /// - Ok(0): room missing or no active subscribers - pub async fn send_to_room(&self, room_id: &str, message: WsMessage) -> Result { - if let Some(sender) = self.get_room(room_id).await { - match sender.send(message) { - Ok(count) => Ok(count), - // No receiver is not exceptional in push scenarios. - Err(broadcast::error::SendError(_)) => Ok(0), - } - } else { - Ok(0) - } - } - -} - -impl Default for RoomManager { - fn default() -> Self { - Self::new() - } -} - -/// WebSocket manager. -#[derive(Clone)] -pub struct WebSocketManager { - public_room: Arc, -} - -impl WebSocketManager { - pub fn new() -> Self { - Self { - public_room: Arc::new(RoomManager::new()), - } - } - - /// Send message to public room. - pub async fn send_to_public(&self, message: WsMessage) -> Result { - self.public_room.get_or_create_room("public").await; - self.public_room.send_to_room("public", message).await - } - - /// Send message to a dedicated client room. - pub async fn send_to_client(&self, client_id: Uuid, message: WsMessage) -> Result { - self.public_room - .send_to_room(&client_id.to_string(), message) - .await - } -} - -impl Default for WebSocketManager { - fn default() -> Self { - Self::new() - } -} - -/// Public websocket handler. -pub async fn public_websocket_handler( - ws: WebSocketUpgrade, - State(state): State, -) -> impl IntoResponse { - let ws_manager = state.ws_manager.clone(); - let app_state = state.clone(); - ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, "public".to_string(), app_state)) -} - -/// Client websocket handler. -pub async fn client_websocket_handler( - ws: WebSocketUpgrade, - Path(client_id): Path, - State(state): State, -) -> impl IntoResponse { - let ws_manager = state.ws_manager.clone(); - let room_id = client_id.to_string(); - let app_state = state.clone(); - ws.on_upgrade(move |socket| handle_socket(socket, ws_manager, room_id, app_state)) -} - -/// Handle websocket connection for one room. -async fn handle_socket( - mut socket: WebSocket, - ws_manager: Arc, - room_id: String, - state: crate::AppState, -) { - let room_sender = ws_manager.public_room.get_or_create_room(&room_id).await; - let mut rx = room_sender.subscribe(); - let mut can_write = false; - - loop { - tokio::select! { - maybe_msg = socket.recv() => { - match maybe_msg { - Some(Ok(msg)) => { - if matches!(msg, Message::Close(_)) { - break; - } - match msg { - Message::Text(text) => { - match serde_json::from_str::(&text) { - Ok(WsClientMessage::AuthWrite(payload)) => { - can_write = state.config.verify_write_key(&payload.key); - if !can_write { - tracing::warn!("WebSocket write auth failed in room {}", room_id); - } - } - Ok(WsClientMessage::PointSetValueBatch(payload)) => { - let response = if !can_write { - crate::connection::BatchSetPointValueRes { - success: false, - err_msg: Some("write permission denied".to_string()), - success_count: 0, - failed_count: 0, - results: vec![], - } - } else { - match state.connection_manager.write_point_values_batch(payload).await { - Ok(v) => v, - Err(e) => crate::connection::BatchSetPointValueRes { - success: false, - err_msg: Some(e), - success_count: 0, - failed_count: 1, - results: vec![crate::connection::SetPointValueResItem { - point_id: Uuid::nil(), - success: false, - err_msg: Some("Internal write error".to_string()), - }], - }, - } - }; - if let Err(e) = ws_manager - .public_room - .send_to_room(&room_id, WsMessage::PointSetValueBatchResult(response)) - .await - { - tracing::error!( - "Failed to send PointSetValueBatchResult to room {}: {}", - room_id, - e - ); - } - } - Err(e) => { - tracing::warn!( - "Invalid websocket message in room {}: {}", - room_id, - e - ); - } - } - } - _ => { - tracing::debug!("Received WebSocket message from room {}: {:?}", room_id, msg); - } - } - } - Some(Err(e)) => { - tracing::error!("WebSocket error in room {}: {}", room_id, e); - break; - } - None => break, - } - } - room_message = rx.recv() => { - match room_message { - Ok(message) => match serde_json::to_string(&message) { - Ok(json_str) => { - if socket.send(Message::Text(json_str.into())).await.is_err() { - break; - } - } - Err(e) => { - tracing::error!("Failed to serialize websocket message: {}", e); - } - }, - Err(broadcast::error::RecvError::Lagged(skipped)) => { - tracing::warn!("WebSocket room {} lagged, skipped {} messages", room_id, skipped); - } - Err(broadcast::error::RecvError::Closed) => break, - } - } - } - } - - ws_manager.public_room.remove_room_if_empty(&room_id).await; -}