refactor(core): centralize telemetry, connection management, and event sink in platform
Move TelemetryProcessor (PointNewValue batching/dedup/broadcast) from feeder to plc_platform_core. PlatformBuilder.build() now auto-wires telemetry processing and reconnect task. PlatformContext.emit_event() handles connection management side effects (connect/reconnect/disconnect/subscribe/unsubscribe) directly. Simplify PlatformEventSink trait from 6 methods to single on_event(). Feeder's AppEvent now only contains business events; FeederPlatformEventSink only handles UnitsChanged for control runtime. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
6814e9eae9
commit
1317271e16
|
|
@ -44,27 +44,22 @@ pub async fn run() {
|
||||||
};
|
};
|
||||||
|
|
||||||
let config = AppConfig::from_env().expect("Failed to load configuration");
|
let config = AppConfig::from_env().expect("Failed to load configuration");
|
||||||
let mut builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url)
|
let builder = plc_platform_core::bootstrap::bootstrap_platform(&config.database_url)
|
||||||
.await
|
.await
|
||||||
.expect("Failed to bootstrap platform");
|
.expect("Failed to bootstrap platform");
|
||||||
|
|
||||||
let event_manager = Arc::new(EventManager::new(
|
|
||||||
builder.pool.clone(),
|
|
||||||
Arc::new(builder.connection_manager.clone()),
|
|
||||||
Some(builder.ws_manager.clone()),
|
|
||||||
));
|
|
||||||
|
|
||||||
builder.connection_manager.set_event_manager(event_manager.clone());
|
|
||||||
builder.connection_manager.set_pool_and_start_reconnect_task(Arc::new(builder.pool.clone()));
|
|
||||||
|
|
||||||
let control_runtime = Arc::new(control::runtime::ControlRuntimeStore::new());
|
let control_runtime = Arc::new(control::runtime::ControlRuntimeStore::new());
|
||||||
|
|
||||||
let event_sink = Arc::new(crate::event::FeederPlatformEventSink::new(
|
let event_sink = Arc::new(crate::event::FeederPlatformEventSink::new(
|
||||||
event_manager.clone(),
|
|
||||||
control_runtime.clone(),
|
control_runtime.clone(),
|
||||||
));
|
));
|
||||||
let platform = builder.build().with_event_sink(event_sink);
|
let platform = builder.build().with_event_sink(event_sink);
|
||||||
|
|
||||||
|
let event_manager = Arc::new(EventManager::new(
|
||||||
|
platform.pool.clone(),
|
||||||
|
Some(platform.ws_manager.clone()),
|
||||||
|
));
|
||||||
|
|
||||||
let sources = crate::service::get_all_enabled_sources(&platform.pool)
|
let sources = crate::service::get_all_enabled_sources(&platform.pool)
|
||||||
.await
|
.await
|
||||||
.expect("Failed to fetch sources");
|
.expect("Failed to fetch sources");
|
||||||
|
|
@ -143,7 +138,6 @@ pub fn test_state() -> AppState {
|
||||||
let ws_manager = Arc::new(WebSocketManager::new());
|
let ws_manager = Arc::new(WebSocketManager::new());
|
||||||
let event_manager = Arc::new(EventManager::new(
|
let event_manager = Arc::new(EventManager::new(
|
||||||
pool.clone(),
|
pool.clone(),
|
||||||
connection_manager.clone(),
|
|
||||||
Some(ws_manager.clone()),
|
Some(ws_manager.clone()),
|
||||||
));
|
));
|
||||||
let platform = PlatformContext::new(pool, connection_manager, ws_manager);
|
let platform = PlatformContext::new(pool, connection_manager, ws_manager);
|
||||||
|
|
|
||||||
|
|
@ -1,32 +1,14 @@
|
||||||
use std::collections::HashMap;
|
|
||||||
use plc_platform_core::event::EventEnvelope;
|
use plc_platform_core::event::EventEnvelope;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
use plc_platform_core::model::EventRecord;
|
use plc_platform_core::model::EventRecord;
|
||||||
|
|
||||||
const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024;
|
const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024;
|
||||||
const TELEMETRY_EVENT_CHANNEL_CAPACITY: usize = 4096;
|
|
||||||
|
|
||||||
|
/// Feeder-specific business events only.
|
||||||
|
/// Platform events (source/point lifecycle) are handled by core's emit_event().
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum AppEvent {
|
pub enum AppEvent {
|
||||||
SourceCreate {
|
|
||||||
source_id: Uuid,
|
|
||||||
},
|
|
||||||
SourceUpdate {
|
|
||||||
source_id: Uuid,
|
|
||||||
},
|
|
||||||
SourceDelete {
|
|
||||||
source_id: Uuid,
|
|
||||||
source_name: String,
|
|
||||||
},
|
|
||||||
PointCreateBatch {
|
|
||||||
source_id: Uuid,
|
|
||||||
point_ids: Vec<Uuid>,
|
|
||||||
},
|
|
||||||
PointDeleteBatch {
|
|
||||||
source_id: Uuid,
|
|
||||||
point_ids: Vec<Uuid>,
|
|
||||||
},
|
|
||||||
EquipmentStartCommandSent {
|
EquipmentStartCommandSent {
|
||||||
equipment_id: Uuid,
|
equipment_id: Uuid,
|
||||||
unit_id: Option<Uuid>,
|
unit_id: Option<Uuid>,
|
||||||
|
|
@ -46,244 +28,98 @@ pub enum AppEvent {
|
||||||
RemLocal { unit_id: Uuid, equipment_id: Uuid },
|
RemLocal { unit_id: Uuid, equipment_id: Uuid },
|
||||||
RemRecovered { unit_id: Uuid },
|
RemRecovered { unit_id: Uuid },
|
||||||
UnitStateChanged { unit_id: Uuid, from_state: String, to_state: String },
|
UnitStateChanged { unit_id: Uuid, from_state: String, to_state: String },
|
||||||
PointNewValue(crate::telemetry::PointNewValue),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct EventManager {
|
pub struct EventManager {
|
||||||
control_sender: mpsc::Sender<AppEvent>,
|
control_sender: mpsc::Sender<AppEvent>,
|
||||||
telemetry_sender: mpsc::Sender<crate::telemetry::PointNewValue>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EventManager {
|
impl EventManager {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
pool: sqlx::PgPool,
|
pool: sqlx::PgPool,
|
||||||
connection_manager: std::sync::Arc<crate::connection::ConnectionManager>,
|
|
||||||
ws_manager: Option<std::sync::Arc<crate::websocket::WebSocketManager>>,
|
ws_manager: Option<std::sync::Arc<crate::websocket::WebSocketManager>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (control_sender, mut control_receiver) =
|
let (control_sender, mut control_receiver) =
|
||||||
mpsc::channel::<AppEvent>(CONTROL_EVENT_CHANNEL_CAPACITY);
|
mpsc::channel::<AppEvent>(CONTROL_EVENT_CHANNEL_CAPACITY);
|
||||||
let (telemetry_sender, mut telemetry_receiver) =
|
|
||||||
mpsc::channel::<crate::telemetry::PointNewValue>(TELEMETRY_EVENT_CHANNEL_CAPACITY);
|
|
||||||
|
|
||||||
let control_cm = connection_manager.clone();
|
|
||||||
let control_pool = pool.clone();
|
let control_pool = pool.clone();
|
||||||
let control_ws_manager = ws_manager.clone();
|
let control_ws_manager = ws_manager.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while let Some(event) = control_receiver.recv().await {
|
while let Some(event) = control_receiver.recv().await {
|
||||||
handle_control_event(event, &control_pool, &control_cm, control_ws_manager.as_ref())
|
handle_control_event(event, &control_pool, control_ws_manager.as_ref()).await;
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let ws_manager_clone = ws_manager.clone();
|
Self { control_sender }
|
||||||
let telemetry_cm = connection_manager.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
while let Some(payload) = telemetry_receiver.recv().await {
|
|
||||||
let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> =
|
|
||||||
HashMap::new();
|
|
||||||
latest_by_key.insert((payload.source_id, payload.client_handle), payload);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match telemetry_receiver.try_recv() {
|
|
||||||
Ok(next_payload) => {
|
|
||||||
latest_by_key.insert(
|
|
||||||
(next_payload.source_id, next_payload.client_handle),
|
|
||||||
next_payload,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for point_payload in latest_by_key.into_values() {
|
|
||||||
process_point_new_value(point_payload, &telemetry_cm, ws_manager_clone.as_ref())
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Self {
|
|
||||||
control_sender,
|
|
||||||
telemetry_sender,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn send(&self, event: AppEvent) -> Result<(), String> {
|
pub fn send(&self, event: AppEvent) -> Result<(), String> {
|
||||||
match event {
|
match self.control_sender.try_send(event) {
|
||||||
AppEvent::PointNewValue(payload) => match self.telemetry_sender.try_send(payload) {
|
Ok(()) => Ok(()),
|
||||||
Ok(()) => Ok(()),
|
Err(mpsc::error::TrySendError::Closed(e)) => {
|
||||||
Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
|
Err(format!("Failed to send control event: channel closed ({e:?})"))
|
||||||
Err(format!("Failed to send telemetry event: channel closed ({e:?})"))
|
}
|
||||||
}
|
Err(mpsc::error::TrySendError::Full(e)) => {
|
||||||
Err(tokio::sync::mpsc::error::TrySendError::Full(payload)) => {
|
Err(format!("Failed to send control event: queue full ({e:?})"))
|
||||||
// High-frequency telemetry is lossy by design under sustained pressure.
|
}
|
||||||
tracing::warn!(
|
|
||||||
"Dropping PointNewValue due to full telemetry queue: source={}, client_handle={}",
|
|
||||||
payload.source_id,
|
|
||||||
payload.client_handle
|
|
||||||
);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
},
|
|
||||||
control_event => match self.control_sender.try_send(control_event) {
|
|
||||||
Ok(()) => Ok(()),
|
|
||||||
Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
|
|
||||||
Err(format!("Failed to send control event: channel closed ({e:?})"))
|
|
||||||
}
|
|
||||||
Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => {
|
|
||||||
Err(format!("Failed to send control event: queue full ({e:?})"))
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adapter that bridges platform handler events to feeder's EventManager + ControlRuntime.
|
/// Bridges platform events to feeder-specific side effects.
|
||||||
|
/// Connection management and telemetry are handled by core automatically.
|
||||||
pub struct FeederPlatformEventSink {
|
pub struct FeederPlatformEventSink {
|
||||||
event_manager: std::sync::Arc<EventManager>,
|
|
||||||
control_runtime: std::sync::Arc<crate::control::runtime::ControlRuntimeStore>,
|
control_runtime: std::sync::Arc<crate::control::runtime::ControlRuntimeStore>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FeederPlatformEventSink {
|
impl FeederPlatformEventSink {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
event_manager: std::sync::Arc<EventManager>,
|
|
||||||
control_runtime: std::sync::Arc<crate::control::runtime::ControlRuntimeStore>,
|
control_runtime: std::sync::Arc<crate::control::runtime::ControlRuntimeStore>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self { event_manager, control_runtime }
|
Self { control_runtime }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl plc_platform_core::platform_context::PlatformEventSink for FeederPlatformEventSink {
|
impl plc_platform_core::platform_context::PlatformEventSink for FeederPlatformEventSink {
|
||||||
fn on_source_created(&self, source_id: Uuid) {
|
fn on_event(&self, event: &plc_platform_core::event::PlatformEvent) {
|
||||||
let _ = self.event_manager.send(AppEvent::SourceCreate { source_id });
|
match event {
|
||||||
}
|
plc_platform_core::event::PlatformEvent::UnitsChanged { unit_ids } => {
|
||||||
|
let runtime = self.control_runtime.clone();
|
||||||
fn on_source_updated(&self, source_id: Uuid) {
|
let ids = unit_ids.clone();
|
||||||
let _ = self.event_manager.send(AppEvent::SourceUpdate { source_id });
|
tokio::spawn(async move {
|
||||||
}
|
for unit_id in ids {
|
||||||
|
runtime.notify_unit(unit_id).await;
|
||||||
fn on_source_deleted(&self, source_id: Uuid, source_name: String) {
|
}
|
||||||
let _ = self.event_manager.send(AppEvent::SourceDelete { source_id, source_name });
|
});
|
||||||
}
|
|
||||||
|
|
||||||
fn on_points_created(&self, source_id: Uuid, point_ids: Vec<Uuid>) {
|
|
||||||
let _ = self.event_manager.send(AppEvent::PointCreateBatch { source_id, point_ids });
|
|
||||||
}
|
|
||||||
|
|
||||||
fn on_points_deleted(&self, source_id: Uuid, point_ids: Vec<Uuid>) {
|
|
||||||
let _ = self.event_manager.send(AppEvent::PointDeleteBatch { source_id, point_ids });
|
|
||||||
}
|
|
||||||
|
|
||||||
fn on_units_changed(&self, unit_ids: Vec<Uuid>) {
|
|
||||||
let runtime = self.control_runtime.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
for unit_id in unit_ids {
|
|
||||||
runtime.notify_unit(unit_id).await;
|
|
||||||
}
|
}
|
||||||
});
|
// Other platform events: connection management handled by core.
|
||||||
}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl plc_platform_core::connection::PointEventSink for EventManager {
|
|
||||||
fn send_point_new_value(
|
|
||||||
&self,
|
|
||||||
payload: plc_platform_core::telemetry::PointNewValue,
|
|
||||||
) -> Result<(), String> {
|
|
||||||
self.send(AppEvent::PointNewValue(payload))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_control_event(
|
async fn handle_control_event(
|
||||||
event: AppEvent,
|
event: AppEvent,
|
||||||
pool: &sqlx::PgPool,
|
pool: &sqlx::PgPool,
|
||||||
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
|
|
||||||
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
|
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
|
||||||
) {
|
) {
|
||||||
persist_event_if_needed(&event, pool, ws_manager).await;
|
persist_event_if_needed(&event, pool, ws_manager).await;
|
||||||
|
|
||||||
match event {
|
match event {
|
||||||
AppEvent::SourceCreate { source_id } => {
|
|
||||||
tracing::info!("Processing SourceCreate event for {}", source_id);
|
|
||||||
if let Err(e) = connection_manager.connect_from_source(pool, source_id).await {
|
|
||||||
tracing::error!("Failed to connect to source {}: {}", source_id, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
AppEvent::SourceUpdate { source_id } => {
|
|
||||||
tracing::info!("Processing SourceUpdate event for {}", source_id);
|
|
||||||
if let Err(e) = connection_manager.reconnect(pool, source_id).await {
|
|
||||||
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
AppEvent::SourceDelete { source_id, .. } => {
|
|
||||||
tracing::info!("Processing SourceDelete event for {}", source_id);
|
|
||||||
if let Err(e) = connection_manager.disconnect(source_id).await {
|
|
||||||
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
AppEvent::PointCreateBatch { source_id, point_ids } => {
|
|
||||||
let requested_count = point_ids.len();
|
|
||||||
match connection_manager
|
|
||||||
.subscribe_points_from_source(source_id, Some(point_ids), pool)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(stats) => {
|
|
||||||
let subscribed = *stats.get("subscribed").unwrap_or(&0);
|
|
||||||
let polled = *stats.get("polled").unwrap_or(&0);
|
|
||||||
let total = *stats.get("total").unwrap_or(&0);
|
|
||||||
tracing::info!(
|
|
||||||
"PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}",
|
|
||||||
source_id,
|
|
||||||
requested_count,
|
|
||||||
subscribed,
|
|
||||||
polled,
|
|
||||||
total
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("Failed to subscribe to points: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
AppEvent::PointDeleteBatch { source_id, point_ids } => {
|
|
||||||
tracing::info!(
|
|
||||||
"Processing PointDeleteBatch event for source {} with {} points",
|
|
||||||
source_id,
|
|
||||||
point_ids.len()
|
|
||||||
);
|
|
||||||
if let Err(e) = connection_manager
|
|
||||||
.unsubscribe_points_from_source(source_id, point_ids)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
tracing::error!("Failed to unsubscribe points: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
AppEvent::EquipmentStartCommandSent {
|
AppEvent::EquipmentStartCommandSent {
|
||||||
equipment_id,
|
equipment_id, unit_id, point_id,
|
||||||
unit_id,
|
|
||||||
point_id,
|
|
||||||
} => {
|
} => {
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"Equipment start command sent: equipment={}, unit={:?}, point={}",
|
"Equipment start command sent: equipment={}, unit={:?}, point={}",
|
||||||
equipment_id,
|
equipment_id, unit_id, point_id
|
||||||
unit_id,
|
|
||||||
point_id
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
AppEvent::EquipmentStopCommandSent {
|
AppEvent::EquipmentStopCommandSent {
|
||||||
equipment_id,
|
equipment_id, unit_id, point_id,
|
||||||
unit_id,
|
|
||||||
point_id,
|
|
||||||
} => {
|
} => {
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"Equipment stop command sent: equipment={}, unit={:?}, point={}",
|
"Equipment stop command sent: equipment={}, unit={:?}, point={}",
|
||||||
equipment_id,
|
equipment_id, unit_id, point_id
|
||||||
unit_id,
|
|
||||||
point_id
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
AppEvent::AutoControlStarted { unit_id } => {
|
AppEvent::AutoControlStarted { unit_id } => {
|
||||||
|
|
@ -313,9 +149,6 @@ async fn handle_control_event(
|
||||||
AppEvent::UnitStateChanged { unit_id, from_state, to_state } => {
|
AppEvent::UnitStateChanged { unit_id, from_state, to_state } => {
|
||||||
tracing::info!("Unit {} state: {} -> {}", unit_id, from_state, to_state);
|
tracing::info!("Unit {} state: {} -> {}", unit_id, from_state, to_state);
|
||||||
}
|
}
|
||||||
AppEvent::PointNewValue(_) => {
|
|
||||||
tracing::warn!("PointNewValue routed to control worker unexpectedly");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -345,13 +178,6 @@ async fn persist_event_if_needed(
|
||||||
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
|
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
|
||||||
) {
|
) {
|
||||||
let record: Option<(&str, &str, Option<Uuid>, Option<Uuid>, Option<Uuid>, String, serde_json::Value)> = match event {
|
let record: Option<(&str, &str, Option<Uuid>, Option<Uuid>, Option<Uuid>, String, serde_json::Value)> = match event {
|
||||||
// Platform events — persistence is handled by core's emit_event().
|
|
||||||
AppEvent::SourceCreate { .. } => None,
|
|
||||||
AppEvent::SourceUpdate { .. } => None,
|
|
||||||
AppEvent::SourceDelete { .. } => None,
|
|
||||||
AppEvent::PointCreateBatch { .. } => None,
|
|
||||||
AppEvent::PointDeleteBatch { .. } => None,
|
|
||||||
// Feeder-specific events — persisted here.
|
|
||||||
AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => {
|
AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => {
|
||||||
let code = fetch_equipment_code(pool, *equipment_id).await;
|
let code = fetch_equipment_code(pool, *equipment_id).await;
|
||||||
Some((
|
Some((
|
||||||
|
|
@ -453,10 +279,10 @@ async fn persist_event_if_needed(
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
AppEvent::UnitStateChanged { .. } => None,
|
AppEvent::UnitStateChanged { .. } => None,
|
||||||
AppEvent::PointNewValue(_) => None,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record else {
|
let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record
|
||||||
|
else {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
let envelope = EventEnvelope::new(event_type, payload);
|
let envelope = EventEnvelope::new(event_type, payload);
|
||||||
|
|
@ -492,77 +318,3 @@ async fn persist_event_if_needed(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn process_point_new_value(
|
|
||||||
payload: crate::telemetry::PointNewValue,
|
|
||||||
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
|
|
||||||
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
|
|
||||||
) {
|
|
||||||
let source_id = payload.source_id;
|
|
||||||
let client_handle = payload.client_handle;
|
|
||||||
let point_id = if let Some(point_id) = payload.point_id {
|
|
||||||
Some(point_id)
|
|
||||||
} else {
|
|
||||||
let status = connection_manager.get_status_read_guard().await;
|
|
||||||
status
|
|
||||||
.get(&source_id)
|
|
||||||
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
|
|
||||||
};
|
|
||||||
if let Some(point_id) = point_id {
|
|
||||||
// Read the previous value from the in-memory cache.
|
|
||||||
let (old_value, old_timestamp, value_changed) = {
|
|
||||||
let monitor_data = connection_manager.get_point_monitor_data_read_guard().await;
|
|
||||||
let old_monitor_info = monitor_data.get(&point_id);
|
|
||||||
|
|
||||||
if let Some(old_info) = old_monitor_info {
|
|
||||||
let changed = old_info.value != payload.value || old_info.timestamp != payload.timestamp;
|
|
||||||
(old_info.value.clone(), old_info.timestamp, changed)
|
|
||||||
} else {
|
|
||||||
(None, None, false)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let monitor = crate::telemetry::PointMonitorInfo {
|
|
||||||
protocol: payload.protocol,
|
|
||||||
source_id,
|
|
||||||
point_id,
|
|
||||||
client_handle,
|
|
||||||
scan_mode: payload.scan_mode,
|
|
||||||
timestamp: payload.timestamp,
|
|
||||||
quality: payload.quality,
|
|
||||||
value: payload.value,
|
|
||||||
value_type: payload.value_type,
|
|
||||||
value_text: payload.value_text,
|
|
||||||
old_value,
|
|
||||||
old_timestamp,
|
|
||||||
value_changed,
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(e) = connection_manager
|
|
||||||
.update_point_monitor_data(monitor.clone())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
tracing::error!(
|
|
||||||
"Failed to update point monitor data for point {}: {}",
|
|
||||||
point_id,
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(ws_manager) = ws_manager {
|
|
||||||
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor);
|
|
||||||
if let Err(e) = ws_manager.send_to_public(ws_message).await {
|
|
||||||
tracing::warn!(
|
|
||||||
"Failed to send WebSocket message to public room: {}",
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
tracing::warn!(
|
|
||||||
"Point not found for source {} client_handle {}",
|
|
||||||
source_id,
|
|
||||||
client_handle
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ use std::sync::Arc;
|
||||||
use crate::connection::ConnectionManager;
|
use crate::connection::ConnectionManager;
|
||||||
use crate::db::init_database;
|
use crate::db::init_database;
|
||||||
use crate::platform_context::PlatformContext;
|
use crate::platform_context::PlatformContext;
|
||||||
|
use crate::telemetry_processor::TelemetryProcessor;
|
||||||
use crate::websocket::WebSocketManager;
|
use crate::websocket::WebSocketManager;
|
||||||
|
|
||||||
pub struct PlatformBuilder {
|
pub struct PlatformBuilder {
|
||||||
|
|
@ -12,7 +13,21 @@ pub struct PlatformBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PlatformBuilder {
|
impl PlatformBuilder {
|
||||||
pub fn build(self) -> PlatformContext {
|
/// Finalize the platform: wire up telemetry processing, reconnect task,
|
||||||
|
/// and wrap everything into `PlatformContext`.
|
||||||
|
pub fn build(mut self) -> PlatformContext {
|
||||||
|
// Telemetry processor: handles PointNewValue batching, monitor updates, WS broadcast.
|
||||||
|
let cm_for_telemetry = Arc::new(self.connection_manager.clone());
|
||||||
|
let telemetry_processor = Arc::new(TelemetryProcessor::new(
|
||||||
|
cm_for_telemetry,
|
||||||
|
self.ws_manager.clone(),
|
||||||
|
));
|
||||||
|
self.connection_manager.set_event_manager(telemetry_processor);
|
||||||
|
|
||||||
|
// Start reconnect task (auto-reconnects on connection loss).
|
||||||
|
self.connection_manager
|
||||||
|
.set_pool_and_start_reconnect_task(Arc::new(self.pool.clone()));
|
||||||
|
|
||||||
PlatformContext::new(
|
PlatformContext::new(
|
||||||
self.pool,
|
self.pool,
|
||||||
Arc::new(self.connection_manager),
|
Arc::new(self.connection_manager),
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ pub mod model;
|
||||||
pub mod platform_context;
|
pub mod platform_context;
|
||||||
pub mod service;
|
pub mod service;
|
||||||
pub mod telemetry;
|
pub mod telemetry;
|
||||||
|
pub mod telemetry_processor;
|
||||||
pub mod util;
|
pub mod util;
|
||||||
pub mod websocket;
|
pub mod websocket;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,24 +1,18 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use crate::connection::ConnectionManager;
|
use crate::connection::ConnectionManager;
|
||||||
use crate::event::PlatformEvent;
|
use crate::event::PlatformEvent;
|
||||||
use crate::websocket::WebSocketManager;
|
use crate::websocket::WebSocketManager;
|
||||||
|
|
||||||
/// Callback interface for app-specific side effects on platform events.
|
/// Callback interface for app-specific side effects on platform events.
|
||||||
///
|
///
|
||||||
/// Platform-level concerns (event persistence, WebSocket broadcast) are handled
|
/// Platform-level concerns (event persistence, WebSocket broadcast, connection
|
||||||
/// automatically by `PlatformContext::emit_event()`. Implementations of this trait
|
/// management) are handled automatically by `PlatformContext::emit_event()`.
|
||||||
/// only need to handle app-specific behavior (e.g. connection management,
|
/// Implementations only need to handle truly app-specific behavior
|
||||||
/// control runtime notifications).
|
/// (e.g. feeder's control runtime notifications).
|
||||||
pub trait PlatformEventSink: Send + Sync {
|
pub trait PlatformEventSink: Send + Sync {
|
||||||
fn on_source_created(&self, source_id: Uuid);
|
/// Called for every platform event. Override to add app-specific side effects.
|
||||||
fn on_source_updated(&self, source_id: Uuid);
|
fn on_event(&self, event: &PlatformEvent);
|
||||||
fn on_source_deleted(&self, source_id: Uuid, source_name: String);
|
|
||||||
fn on_points_created(&self, source_id: Uuid, point_ids: Vec<Uuid>);
|
|
||||||
fn on_points_deleted(&self, source_id: Uuid, point_ids: Vec<Uuid>);
|
|
||||||
fn on_units_changed(&self, unit_ids: Vec<Uuid>);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
@ -48,47 +42,88 @@ impl PlatformContext {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Emit a platform event: persists to DB, broadcasts via WebSocket,
|
/// Emit a platform event.
|
||||||
/// then notifies the app-specific event sink for side effects.
|
///
|
||||||
|
/// Synchronously notifies the app-specific sink, then spawns async work for:
|
||||||
|
/// - Event persistence to DB + WebSocket broadcast
|
||||||
|
/// - Connection management side effects (connect, subscribe, etc.)
|
||||||
pub fn emit_event(&self, event: PlatformEvent) {
|
pub fn emit_event(&self, event: PlatformEvent) {
|
||||||
// Notify app-specific sink synchronously (fire-and-forget via channels).
|
// 1. Notify app-specific sink synchronously (fire-and-forget via channels).
|
||||||
if let Some(sink) = &self.event_sink {
|
if let Some(sink) = &self.event_sink {
|
||||||
|
sink.on_event(&event);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Spawn async: persistence + WS broadcast + connection management.
|
||||||
|
let pool = self.pool.clone();
|
||||||
|
let ws_manager = self.ws_manager.clone();
|
||||||
|
let cm = self.connection_manager.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// Persist + broadcast.
|
||||||
|
crate::event::persist_and_broadcast(&event, &pool, &ws_manager).await;
|
||||||
|
|
||||||
|
// Connection management side effects.
|
||||||
match &event {
|
match &event {
|
||||||
PlatformEvent::SourceCreated { source_id } => {
|
PlatformEvent::SourceCreated { source_id } => {
|
||||||
sink.on_source_created(*source_id);
|
tracing::info!("Processing SourceCreated for {}", source_id);
|
||||||
|
if let Err(e) = cm.connect_from_source(&pool, *source_id).await {
|
||||||
|
tracing::error!("Failed to connect to source {}: {}", source_id, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
PlatformEvent::SourceUpdated { source_id } => {
|
PlatformEvent::SourceUpdated { source_id } => {
|
||||||
sink.on_source_updated(*source_id);
|
tracing::info!("Processing SourceUpdated for {}", source_id);
|
||||||
|
if let Err(e) = cm.reconnect(&pool, *source_id).await {
|
||||||
|
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
PlatformEvent::SourceDeleted {
|
PlatformEvent::SourceDeleted { source_id, .. } => {
|
||||||
source_id,
|
tracing::info!("Processing SourceDeleted for {}", source_id);
|
||||||
source_name,
|
if let Err(e) = cm.disconnect(*source_id).await {
|
||||||
} => {
|
tracing::error!("Failed to disconnect source {}: {}", source_id, e);
|
||||||
sink.on_source_deleted(*source_id, source_name.clone());
|
}
|
||||||
}
|
}
|
||||||
PlatformEvent::PointsCreated {
|
PlatformEvent::PointsCreated {
|
||||||
source_id,
|
source_id,
|
||||||
point_ids,
|
point_ids,
|
||||||
} => {
|
} => {
|
||||||
sink.on_points_created(*source_id, point_ids.clone());
|
let requested_count = point_ids.len();
|
||||||
|
match cm
|
||||||
|
.subscribe_points_from_source(*source_id, Some(point_ids.clone()), &pool)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(stats) => {
|
||||||
|
let subscribed = *stats.get("subscribed").unwrap_or(&0);
|
||||||
|
let polled = *stats.get("polled").unwrap_or(&0);
|
||||||
|
let total = *stats.get("total").unwrap_or(&0);
|
||||||
|
tracing::info!(
|
||||||
|
"PointsCreated subscribe for source {}: requested={}, subscribed={}, polled={}, total={}",
|
||||||
|
source_id, requested_count, subscribed, polled, total
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to subscribe points: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
PlatformEvent::PointsDeleted {
|
PlatformEvent::PointsDeleted {
|
||||||
source_id,
|
source_id,
|
||||||
point_ids,
|
point_ids,
|
||||||
} => {
|
} => {
|
||||||
sink.on_points_deleted(*source_id, point_ids.clone());
|
tracing::info!(
|
||||||
|
"Processing PointsDeleted for source {} ({} points)",
|
||||||
|
source_id,
|
||||||
|
point_ids.len()
|
||||||
|
);
|
||||||
|
if let Err(e) = cm
|
||||||
|
.unsubscribe_points_from_source(*source_id, point_ids.clone())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!("Failed to unsubscribe points: {}", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
PlatformEvent::UnitsChanged { unit_ids } => {
|
PlatformEvent::UnitsChanged { .. } => {
|
||||||
sink.on_units_changed(unit_ids.clone());
|
// No platform-level side effect; app handles via on_event().
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Spawn async persistence + WebSocket broadcast.
|
|
||||||
let pool = self.pool.clone();
|
|
||||||
let ws_manager = self.ws_manager.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
crate::event::persist_and_broadcast(&event, &pool, &ws_manager).await;
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,148 @@
|
||||||
|
//! Platform-level telemetry event processor.
|
||||||
|
//!
|
||||||
|
//! Handles `PointNewValue` events from `ConnectionManager`:
|
||||||
|
//! - Batches/deduplicates high-frequency telemetry data
|
||||||
|
//! - Updates point monitor data in `ConnectionManager`
|
||||||
|
//! - Broadcasts changes via WebSocket
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::connection::{ConnectionManager, PointEventSink};
|
||||||
|
use crate::telemetry::{PointMonitorInfo, PointNewValue};
|
||||||
|
use crate::websocket::{WebSocketManager, WsMessage};
|
||||||
|
|
||||||
|
const TELEMETRY_CHANNEL_CAPACITY: usize = 4096;
|
||||||
|
|
||||||
|
pub struct TelemetryProcessor {
|
||||||
|
sender: mpsc::Sender<PointNewValue>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TelemetryProcessor {
|
||||||
|
pub fn new(
|
||||||
|
connection_manager: Arc<ConnectionManager>,
|
||||||
|
ws_manager: Arc<WebSocketManager>,
|
||||||
|
) -> Self {
|
||||||
|
let (sender, mut receiver) = mpsc::channel::<PointNewValue>(TELEMETRY_CHANNEL_CAPACITY);
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Some(payload) = receiver.recv().await {
|
||||||
|
// Batch: drain all pending messages, keeping only the latest per (source, handle).
|
||||||
|
let mut latest_by_key: HashMap<(Uuid, u32), PointNewValue> = HashMap::new();
|
||||||
|
latest_by_key.insert((payload.source_id, payload.client_handle), payload);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match receiver.try_recv() {
|
||||||
|
Ok(next) => {
|
||||||
|
latest_by_key.insert((next.source_id, next.client_handle), next);
|
||||||
|
}
|
||||||
|
Err(mpsc::error::TryRecvError::Empty) => break,
|
||||||
|
Err(mpsc::error::TryRecvError::Disconnected) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for point_payload in latest_by_key.into_values() {
|
||||||
|
process_point_new_value(point_payload, &connection_manager, &ws_manager).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Self { sender }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PointEventSink for TelemetryProcessor {
|
||||||
|
fn send_point_new_value(&self, payload: PointNewValue) -> Result<(), String> {
|
||||||
|
match self.sender.try_send(payload) {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(mpsc::error::TrySendError::Closed(e)) => {
|
||||||
|
Err(format!(
|
||||||
|
"Telemetry channel closed ({e:?})"
|
||||||
|
))
|
||||||
|
}
|
||||||
|
Err(mpsc::error::TrySendError::Full(payload)) => {
|
||||||
|
tracing::warn!(
|
||||||
|
"Dropping PointNewValue due to full telemetry queue: source={}, handle={}",
|
||||||
|
payload.source_id,
|
||||||
|
payload.client_handle
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process_point_new_value(
|
||||||
|
payload: PointNewValue,
|
||||||
|
connection_manager: &Arc<ConnectionManager>,
|
||||||
|
ws_manager: &Arc<WebSocketManager>,
|
||||||
|
) {
|
||||||
|
let source_id = payload.source_id;
|
||||||
|
let client_handle = payload.client_handle;
|
||||||
|
|
||||||
|
let point_id = if let Some(point_id) = payload.point_id {
|
||||||
|
Some(point_id)
|
||||||
|
} else {
|
||||||
|
let status = connection_manager.get_status_read_guard().await;
|
||||||
|
status
|
||||||
|
.get(&source_id)
|
||||||
|
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
|
||||||
|
};
|
||||||
|
|
||||||
|
let Some(point_id) = point_id else {
|
||||||
|
tracing::warn!(
|
||||||
|
"Point not found for source {} client_handle {}",
|
||||||
|
source_id,
|
||||||
|
client_handle
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Read the previous value from the in-memory cache.
|
||||||
|
let (old_value, old_timestamp, value_changed) = {
|
||||||
|
let monitor_data = connection_manager.get_point_monitor_data_read_guard().await;
|
||||||
|
let old_monitor_info = monitor_data.get(&point_id);
|
||||||
|
|
||||||
|
if let Some(old_info) = old_monitor_info {
|
||||||
|
let changed =
|
||||||
|
old_info.value != payload.value || old_info.timestamp != payload.timestamp;
|
||||||
|
(old_info.value.clone(), old_info.timestamp, changed)
|
||||||
|
} else {
|
||||||
|
(None, None, false)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let monitor = PointMonitorInfo {
|
||||||
|
protocol: payload.protocol,
|
||||||
|
source_id,
|
||||||
|
point_id,
|
||||||
|
client_handle,
|
||||||
|
scan_mode: payload.scan_mode,
|
||||||
|
timestamp: payload.timestamp,
|
||||||
|
quality: payload.quality,
|
||||||
|
value: payload.value,
|
||||||
|
value_type: payload.value_type,
|
||||||
|
value_text: payload.value_text,
|
||||||
|
old_value,
|
||||||
|
old_timestamp,
|
||||||
|
value_changed,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = connection_manager
|
||||||
|
.update_point_monitor_data(monitor.clone())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!(
|
||||||
|
"Failed to update point monitor data for point {}: {}",
|
||||||
|
point_id,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let ws_message = WsMessage::PointNewValue(monitor);
|
||||||
|
if let Err(e) = ws_manager.send_to_public(ws_message).await {
|
||||||
|
tracing::warn!("Failed to send WebSocket message to public room: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue