plc_control/src/event.rs

556 lines
21 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use std::collections::HashMap;
use tokio::sync::mpsc;
use uuid::Uuid;
/// Capacity of the bounded channel that carries control events
/// (the channel created for `AppEvent` values in `EventManager::new`).
const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024;
/// Capacity of the bounded channel that carries `PointNewValue` telemetry payloads.
const TELEMETRY_EVENT_CHANNEL_CAPACITY: usize = 4096;
/// Application events published through `EventManager::send`.
///
/// `PointNewValue` travels on the telemetry channel; every other variant travels on the
/// control channel and is handled by `handle_control_event`, which also asks
/// `persist_event_if_needed` to store an audit record for it.
#[derive(Debug, Clone)]
pub enum AppEvent {
/// A source was created; the control worker calls `connect_from_source` for it.
SourceCreate {
source_id: Uuid,
},
/// A source was updated; the control worker calls `reconnect` for it.
SourceUpdate {
source_id: Uuid,
},
/// A source was deleted; the control worker calls `disconnect` for it.
/// `source_name` is carried in the event and used verbatim in the persisted message.
SourceDelete {
source_id: Uuid,
source_name: String,
},
/// Points were created on a source; the control worker subscribes them via
/// `subscribe_points_from_source`.
PointCreateBatch {
source_id: Uuid,
point_ids: Vec<Uuid>,
},
/// Points were deleted from a source; the control worker unsubscribes them via
/// `unsubscribe_points_from_source`.
PointDeleteBatch {
source_id: Uuid,
point_ids: Vec<Uuid>,
},
/// A start command was sent to a piece of equipment (logged and persisted only).
EquipmentStartCommandSent {
equipment_id: Uuid,
unit_id: Option<Uuid>,
point_id: Uuid,
},
/// A stop command was sent to a piece of equipment (logged and persisted only).
EquipmentStopCommandSent {
equipment_id: Uuid,
unit_id: Option<Uuid>,
point_id: Uuid,
},
/// Automatic control was started for a unit (logged and persisted only).
AutoControlStarted { unit_id: Uuid },
/// Automatic control was stopped for a unit (logged and persisted only).
AutoControlStopped { unit_id: Uuid },
/// A unit entered fault lock; `equipment_id` is recorded as the triggering equipment.
FaultLocked { unit_id: Uuid, equipment_id: Uuid },
/// A unit fault was acknowledged (persisted at `info` level).
FaultAcked { unit_id: Uuid },
/// A unit entered communication lock (persisted at `warn` level).
CommLocked { unit_id: Uuid },
/// Communication recovered for a unit (persisted at `info` level).
CommRecovered { unit_id: Uuid },
/// A unit switched to local control; `equipment_id` is recorded as the triggering equipment.
RemLocal { unit_id: Uuid, equipment_id: Uuid },
/// A unit switched back to remote control (persisted at `warn` level).
RemRecovered { unit_id: Uuid },
/// A unit changed state; this variant is logged but never persisted.
UnitStateChanged { unit_id: Uuid, from_state: String, to_state: String },
/// A new point value arrived; routed to the telemetry worker and never persisted.
PointNewValue(crate::telemetry::PointNewValue),
}
/// Publishing handle for [`AppEvent`]s.
///
/// Holds the sending halves of the two bounded channels whose receivers are owned by the
/// background tasks spawned in `EventManager::new`.
pub struct EventManager {
/// Sender for every event except `AppEvent::PointNewValue`.
control_sender: mpsc::Sender<AppEvent>,
/// Sender for the payload of `AppEvent::PointNewValue`.
telemetry_sender: mpsc::Sender<crate::telemetry::PointNewValue>,
}
impl EventManager {
pub fn new(
pool: sqlx::PgPool,
connection_manager: std::sync::Arc<crate::connection::ConnectionManager>,
ws_manager: Option<std::sync::Arc<crate::websocket::WebSocketManager>>,
) -> Self {
let (control_sender, mut control_receiver) =
mpsc::channel::<AppEvent>(CONTROL_EVENT_CHANNEL_CAPACITY);
let (telemetry_sender, mut telemetry_receiver) =
mpsc::channel::<crate::telemetry::PointNewValue>(TELEMETRY_EVENT_CHANNEL_CAPACITY);
let control_cm = connection_manager.clone();
let control_pool = pool.clone();
let control_ws_manager = ws_manager.clone();
tokio::spawn(async move {
while let Some(event) = control_receiver.recv().await {
handle_control_event(event, &control_pool, &control_cm, control_ws_manager.as_ref())
.await;
}
});
let ws_manager_clone = ws_manager.clone();
let telemetry_cm = connection_manager.clone();
tokio::spawn(async move {
while let Some(payload) = telemetry_receiver.recv().await {
let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> =
HashMap::new();
latest_by_key.insert((payload.source_id, payload.client_handle), payload);
loop {
match telemetry_receiver.try_recv() {
Ok(next_payload) => {
latest_by_key.insert(
(next_payload.source_id, next_payload.client_handle),
next_payload,
);
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
break;
}
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
break;
}
}
}
for point_payload in latest_by_key.into_values() {
process_point_new_value(point_payload, &telemetry_cm, ws_manager_clone.as_ref())
.await;
}
}
});
Self {
control_sender,
telemetry_sender,
}
}
pub fn send(&self, event: AppEvent) -> Result<(), String> {
match event {
AppEvent::PointNewValue(payload) => match self.telemetry_sender.try_send(payload) {
Ok(()) => Ok(()),
Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
Err(format!("Failed to send telemetry event: channel closed ({e:?})"))
}
Err(tokio::sync::mpsc::error::TrySendError::Full(payload)) => {
// High-frequency telemetry is lossy by design under sustained pressure.
tracing::warn!(
"Dropping PointNewValue due to full telemetry queue: source={}, client_handle={}",
payload.source_id,
payload.client_handle
);
Ok(())
}
},
control_event => match self.control_sender.try_send(control_event) {
Ok(()) => Ok(()),
Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
Err(format!("Failed to send control event: channel closed ({e:?})"))
}
Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => {
Err(format!("Failed to send control event: queue full ({e:?})"))
}
},
}
}
}
async fn handle_control_event(
event: AppEvent,
pool: &sqlx::PgPool,
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
) {
persist_event_if_needed(&event, pool, ws_manager).await;
match event {
AppEvent::SourceCreate { source_id } => {
tracing::info!("Processing SourceCreate event for {}", source_id);
if let Err(e) = connection_manager.connect_from_source(pool, source_id).await {
tracing::error!("Failed to connect to source {}: {}", source_id, e);
}
}
AppEvent::SourceUpdate { source_id } => {
tracing::info!("Processing SourceUpdate event for {}", source_id);
if let Err(e) = connection_manager.reconnect(pool, source_id).await {
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
}
}
AppEvent::SourceDelete { source_id, .. } => {
tracing::info!("Processing SourceDelete event for {}", source_id);
if let Err(e) = connection_manager.disconnect(source_id).await {
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
}
}
AppEvent::PointCreateBatch { source_id, point_ids } => {
let requested_count = point_ids.len();
match connection_manager
.subscribe_points_from_source(source_id, Some(point_ids), pool)
.await
{
Ok(stats) => {
let subscribed = *stats.get("subscribed").unwrap_or(&0);
let polled = *stats.get("polled").unwrap_or(&0);
let total = *stats.get("total").unwrap_or(&0);
tracing::info!(
"PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}",
source_id,
requested_count,
subscribed,
polled,
total
);
}
Err(e) => {
tracing::error!("Failed to subscribe to points: {}", e);
}
}
}
AppEvent::PointDeleteBatch { source_id, point_ids } => {
tracing::info!(
"Processing PointDeleteBatch event for source {} with {} points",
source_id,
point_ids.len()
);
if let Err(e) = connection_manager
.unsubscribe_points_from_source(source_id, point_ids)
.await
{
tracing::error!("Failed to unsubscribe points: {}", e);
}
}
AppEvent::EquipmentStartCommandSent {
equipment_id,
unit_id,
point_id,
} => {
tracing::info!(
"Equipment start command sent: equipment={}, unit={:?}, point={}",
equipment_id,
unit_id,
point_id
);
}
AppEvent::EquipmentStopCommandSent {
equipment_id,
unit_id,
point_id,
} => {
tracing::info!(
"Equipment stop command sent: equipment={}, unit={:?}, point={}",
equipment_id,
unit_id,
point_id
);
}
AppEvent::AutoControlStarted { unit_id } => {
tracing::info!("Auto control started for unit {}", unit_id);
}
AppEvent::AutoControlStopped { unit_id } => {
tracing::info!("Auto control stopped for unit {}", unit_id);
}
AppEvent::FaultLocked { unit_id, equipment_id } => {
tracing::warn!("Fault locked: unit={}, equipment={}", unit_id, equipment_id);
}
AppEvent::FaultAcked { unit_id } => {
tracing::info!("Fault acked for unit {}", unit_id);
}
AppEvent::CommLocked { unit_id } => {
tracing::warn!("Comm locked for unit {}", unit_id);
}
AppEvent::CommRecovered { unit_id } => {
tracing::info!("Comm recovered for unit {}", unit_id);
}
AppEvent::RemLocal { unit_id, equipment_id } => {
tracing::warn!("REM local: unit={}, equipment={}", unit_id, equipment_id);
}
AppEvent::RemRecovered { unit_id } => {
tracing::info!("REM recovered for unit {}", unit_id);
}
AppEvent::UnitStateChanged { unit_id, from_state, to_state } => {
tracing::info!("Unit {} state: {} → {}", unit_id, from_state, to_state);
}
AppEvent::PointNewValue(_) => {
tracing::warn!("PointNewValue routed to control worker unexpectedly");
}
}
}
/// Looks up `source.name` for `id`.
///
/// Returns the UUID rendered as text when no row is found or the query fails.
async fn fetch_source_name(pool: &sqlx::PgPool, id: Uuid) -> String {
    let lookup = sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1")
        .bind(id)
        .fetch_optional(pool)
        .await;
    match lookup {
        Ok(Some(name)) => name,
        Ok(None) | Err(_) => id.to_string(),
    }
}
/// Looks up `unit.code` for `id`.
///
/// Returns the UUID rendered as text when no row is found or the query fails.
async fn fetch_unit_code(pool: &sqlx::PgPool, id: Uuid) -> String {
    let lookup = sqlx::query_scalar::<_, String>("SELECT code FROM unit WHERE id = $1")
        .bind(id)
        .fetch_optional(pool)
        .await;
    match lookup {
        Ok(Some(code)) => code,
        Ok(None) | Err(_) => id.to_string(),
    }
}
/// Looks up `equipment.code` for `id`.
///
/// Returns the UUID rendered as text when no row is found or the query fails.
async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String {
    let lookup = sqlx::query_scalar::<_, String>("SELECT code FROM equipment WHERE id = $1")
        .bind(id)
        .fetch_optional(pool)
        .await;
    match lookup {
        Ok(Some(code)) => code,
        Ok(None) | Err(_) => id.to_string(),
    }
}
/// Stores an audit row for `event` in the `event` table and broadcasts the stored row.
///
/// Each persisted variant is mapped to an event type, a level, optional unit / equipment /
/// source ids, a human-readable message and a JSON payload. Display names and codes are
/// resolved with `fetch_source_name`, `fetch_unit_code` and `fetch_equipment_code`.
/// `UnitStateChanged` and `PointNewValue` produce no row.
///
/// After a successful insert the returned row is sent to the public websocket room when
/// `ws_manager` is `Some`. Insert and broadcast failures are logged at `warn` level and
/// never propagated.
async fn persist_event_if_needed(
    event: &AppEvent,
    pool: &sqlx::PgPool,
    ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
) {
    // (event_type, level, unit_id, equipment_id, source_id, message, payload)
    type EventRow = (
        &'static str,
        &'static str,
        Option<Uuid>,
        Option<Uuid>,
        Option<Uuid>,
        String,
        serde_json::Value,
    );

    let record: Option<EventRow> = match event {
        AppEvent::SourceCreate { source_id } => {
            let name = fetch_source_name(pool, *source_id).await;
            Some((
                "source.created", "info",
                None, None, Some(*source_id),
                format!("数据源【{}】已创建", name),
                serde_json::json!({ "source_id": source_id }),
            ))
        }
        AppEvent::SourceUpdate { source_id } => {
            let name = fetch_source_name(pool, *source_id).await;
            Some((
                "source.updated", "info",
                None, None, Some(*source_id),
                format!("数据源【{}】已更新", name),
                serde_json::json!({ "source_id": source_id }),
            ))
        }
        // The source id is kept only in the payload; the row's source_id column stays empty.
        AppEvent::SourceDelete { source_id, source_name } => Some((
            "source.deleted", "warn",
            None, None, None,
            format!("数据源【{}】已删除", source_name),
            serde_json::json!({ "source_id": source_id }),
        )),
        AppEvent::PointCreateBatch { source_id, point_ids } => {
            let name = fetch_source_name(pool, *source_id).await;
            Some((
                "point.batch_created", "info",
                None, None, Some(*source_id),
                format!("批量创建 {} 个测点(数据源:{})", point_ids.len(), name),
                serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
            ))
        }
        AppEvent::PointDeleteBatch { source_id, point_ids } => {
            let name = fetch_source_name(pool, *source_id).await;
            Some((
                "point.batch_deleted", "warn",
                None, None, Some(*source_id),
                format!("批量删除 {} 个测点(数据源:{})", point_ids.len(), name),
                serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
            ))
        }
        AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => {
            let code = fetch_equipment_code(pool, *equipment_id).await;
            Some((
                "equipment.start_command_sent", "info",
                *unit_id, Some(*equipment_id), None,
                format!("已发送启动指令(设备:{})", code),
                serde_json::json!({
                    "equipment_id": equipment_id,
                    "unit_id": unit_id,
                    "point_id": point_id
                }),
            ))
        }
        AppEvent::EquipmentStopCommandSent { equipment_id, unit_id, point_id } => {
            let code = fetch_equipment_code(pool, *equipment_id).await;
            Some((
                "equipment.stop_command_sent", "info",
                *unit_id, Some(*equipment_id), None,
                format!("已发送停止指令(设备:{})", code),
                serde_json::json!({
                    "equipment_id": equipment_id,
                    "unit_id": unit_id,
                    "point_id": point_id
                }),
            ))
        }
        AppEvent::AutoControlStarted { unit_id } => {
            let code = fetch_unit_code(pool, *unit_id).await;
            Some((
                "unit.auto_control_started", "info",
                Some(*unit_id), None, None,
                format!("已启动自动控制(单元:{})", code),
                serde_json::json!({ "unit_id": unit_id }),
            ))
        }
        AppEvent::AutoControlStopped { unit_id } => {
            let code = fetch_unit_code(pool, *unit_id).await;
            Some((
                "unit.auto_control_stopped", "info",
                Some(*unit_id), None, None,
                format!("已停止自动控制(单元:{})", code),
                serde_json::json!({ "unit_id": unit_id }),
            ))
        }
        AppEvent::FaultLocked { unit_id, equipment_id } => {
            let unit_code = fetch_unit_code(pool, *unit_id).await;
            let eq_code = fetch_equipment_code(pool, *equipment_id).await;
            Some((
                "unit.fault_locked", "error",
                Some(*unit_id), Some(*equipment_id), None,
                format!("单元【{}】发生故障锁定,触发设备:{}", unit_code, eq_code),
                serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }),
            ))
        }
        AppEvent::FaultAcked { unit_id } => {
            let code = fetch_unit_code(pool, *unit_id).await;
            Some((
                "unit.fault_acked", "info",
                Some(*unit_id), None, None,
                format!("单元【{}】故障已人工确认", code),
                serde_json::json!({ "unit_id": unit_id }),
            ))
        }
        AppEvent::CommLocked { unit_id } => {
            let code = fetch_unit_code(pool, *unit_id).await;
            Some((
                "unit.comm_locked", "warn",
                Some(*unit_id), None, None,
                format!("单元【{}】通讯中断", code),
                serde_json::json!({ "unit_id": unit_id }),
            ))
        }
        AppEvent::CommRecovered { unit_id } => {
            let code = fetch_unit_code(pool, *unit_id).await;
            Some((
                "unit.comm_recovered", "info",
                Some(*unit_id), None, None,
                format!("单元【{}】通讯恢复", code),
                serde_json::json!({ "unit_id": unit_id }),
            ))
        }
        AppEvent::RemLocal { unit_id, equipment_id } => {
            let unit_code = fetch_unit_code(pool, *unit_id).await;
            let eq_code = fetch_equipment_code(pool, *equipment_id).await;
            Some((
                "unit.rem_local", "warn",
                Some(*unit_id), Some(*equipment_id), None,
                format!("单元【{}】切换为本地控制,触发设备:{},自动控制已停止", unit_code, eq_code),
                serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }),
            ))
        }
        AppEvent::RemRecovered { unit_id } => {
            let code = fetch_unit_code(pool, *unit_id).await;
            Some((
                "unit.rem_recovered", "warn",
                Some(*unit_id), None, None,
                format!("单元【{}】已切换回远程控制,自动控制需手动重新启动", code),
                serde_json::json!({ "unit_id": unit_id }),
            ))
        }
        // Not persisted.
        AppEvent::UnitStateChanged { .. } => None,
        AppEvent::PointNewValue(_) => None,
    };

    let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record
    else {
        return;
    };

    let inserted = sqlx::query_as::<_, crate::model::EventRecord>(
        r#"
INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
"#,
    )
    .bind(event_type)
    .bind(level)
    .bind(unit_id)
    .bind(equipment_id)
    .bind(source_id)
    .bind(message)
    .bind(sqlx::types::Json(payload))
    .fetch_one(pool)
    .await;

    match inserted {
        Ok(record) => {
            // Broadcast only rows that were actually stored.
            if let Some(ws_manager) = ws_manager {
                let ws_message = crate::websocket::WsMessage::EventCreated(record);
                if let Err(err) = ws_manager.send_to_public(ws_message).await {
                    tracing::warn!("Failed to broadcast event websocket message: {}", err);
                }
            }
        }
        Err(err) => {
            tracing::warn!("Failed to persist event: {}", err);
        }
    }
}
/// Applies one telemetry payload to the point-monitor cache and broadcasts the result.
///
/// Steps:
/// 1. Resolve the point id: use `payload.point_id` when present, otherwise look the
///    `client_handle` up in the connection status entry for `source_id`.
/// 2. Read the cached entry for that point to obtain the previous value and timestamp, and
///    flag a change when either differs from the payload. With no cached entry the previous
///    value and timestamp are `None` and the change flag is `false`.
/// 3. Store the new `PointMonitorInfo` through the connection manager (failure is logged).
/// 4. Send it to the public websocket room when `ws_manager` is `Some` (failure is logged).
///
/// When the point id cannot be resolved, a warning is logged and nothing else happens.
async fn process_point_new_value(
    payload: crate::telemetry::PointNewValue,
    connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
    ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
) {
    let source_id = payload.source_id;
    let client_handle = payload.client_handle;

    let resolved = match payload.point_id {
        Some(known) => Some(known),
        None => {
            let status = connection_manager.get_status_read_guard().await;
            status
                .get(&source_id)
                .and_then(|entry| entry.client_handle_map.get(&client_handle).copied())
        }
    };

    let Some(point_id) = resolved else {
        tracing::warn!(
            "Point not found for source {} client_handle {}",
            source_id,
            client_handle
        );
        return;
    };

    // Read the previous value from the cache; the read guard is released at the end of
    // this block, before the cache is updated below.
    let (old_value, old_timestamp, value_changed) = {
        let monitor_data = connection_manager.get_point_monitor_data_read_guard().await;
        let cached = monitor_data.get(&point_id);
        match cached {
            None => (None, None, false),
            Some(previous) => {
                let changed =
                    previous.value != payload.value || previous.timestamp != payload.timestamp;
                (previous.value.clone(), previous.timestamp, changed)
            }
        }
    };

    let monitor = crate::telemetry::PointMonitorInfo {
        protocol: payload.protocol,
        source_id,
        point_id,
        client_handle,
        scan_mode: payload.scan_mode,
        timestamp: payload.timestamp,
        quality: payload.quality,
        value: payload.value,
        value_type: payload.value_type,
        value_text: payload.value_text,
        old_value,
        old_timestamp,
        value_changed,
    };

    let stored = connection_manager
        .update_point_monitor_data(monitor.clone())
        .await;
    if let Err(err) = stored {
        tracing::error!(
            "Failed to update point monitor data for point {}: {}",
            point_id,
            err
        );
    }

    // The broadcast is attempted even when the cache update failed.
    let Some(ws_manager) = ws_manager else {
        return;
    };
    let ws_message = crate::websocket::WsMessage::PointNewValue(monitor);
    if let Err(err) = ws_manager.send_to_public(ws_message).await {
        tracing::warn!(
            "Failed to send WebSocket message to public room: {}",
            err
        );
    }
}