diff --git a/src/event.rs b/src/event.rs index 509cf5d..f840fe0 100644 --- a/src/event.rs +++ b/src/event.rs @@ -55,9 +55,11 @@ impl EventManager { let control_cm = connection_manager.clone(); let control_pool = pool.clone(); + let control_ws_manager = ws_manager.clone(); tokio::spawn(async move { while let Some(event) = control_receiver.recv().await { - handle_control_event(event, &control_pool, &control_cm).await; + handle_control_event(event, &control_pool, &control_cm, control_ws_manager.as_ref()) + .await; } }); @@ -133,8 +135,9 @@ async fn handle_control_event( event: AppEvent, pool: &sqlx::PgPool, connection_manager: &std::sync::Arc, + ws_manager: Option<&std::sync::Arc>, ) { - persist_event_if_needed(&event, pool).await; + persist_event_if_needed(&event, pool, ws_manager).await; match event { AppEvent::SourceCreate { source_id } => { @@ -222,7 +225,11 @@ async fn handle_control_event( } } -async fn persist_event_if_needed(event: &AppEvent, pool: &sqlx::PgPool) { +async fn persist_event_if_needed( + event: &AppEvent, + pool: &sqlx::PgPool, + ws_manager: Option<&std::sync::Arc>, +) { let record = match event { AppEvent::SourceCreate { source_id } => Some(( "source.created", @@ -310,10 +317,11 @@ async fn persist_event_if_needed(event: &AppEvent, pool: &sqlx::PgPool) { return; }; - if let Err(err) = sqlx::query( + let inserted = sqlx::query_as::<_, crate::model::EventRecord>( r#" INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload) VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING * "#, ) .bind(event_type) @@ -323,10 +331,21 @@ async fn persist_event_if_needed(event: &AppEvent, pool: &sqlx::PgPool) { .bind(source_id) .bind(message) .bind(sqlx::types::Json(payload)) - .execute(pool) - .await - { - tracing::warn!("Failed to persist event: {}", err); + .fetch_one(pool) + .await; + + match inserted { + Ok(record) => { + if let Some(ws_manager) = ws_manager { + let ws_message = crate::websocket::WsMessage::EventCreated(record); + if let Err(err) = ws_manager.send_to_public(ws_message).await { + tracing::warn!("Failed to broadcast event websocket message: {}", err); + } + } + } + Err(err) => { + tracing::warn!("Failed to persist event: {}", err); + } } } diff --git a/src/websocket.rs b/src/websocket.rs index e5627ff..5c9db73 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -17,6 +17,7 @@ use uuid::Uuid; pub enum WsMessage { PointNewValue(crate::telemetry::PointMonitorInfo), PointSetValueBatchResult(crate::connection::BatchSetPointValueRes), + EventCreated(crate::model::EventRecord), } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/web/js/events.js b/web/js/events.js index c3b6b38..e4aa0a4 100644 --- a/web/js/events.js +++ b/web/js/events.js @@ -32,6 +32,22 @@ export function renderEvents() { }); } +function matchesCurrentFilter(item) { + if (state.selectedUnitId && item.unit_id !== state.selectedUnitId) { + return false; + } + return true; +} + +export function prependEvent(item) { + if (!matchesCurrentFilter(item)) { + return; + } + + state.events = [item, ...state.events.filter((existing) => existing.id !== item.id)].slice(0, 20); + renderEvents(); +} + export async function loadEvents() { const params = new URLSearchParams({ page: "1", diff --git a/web/js/logs.js b/web/js/logs.js index 78f6adf..0270261 100644 --- a/web/js/logs.js +++ b/web/js/logs.js @@ -1,5 +1,6 @@ import { appendChartPoint } from "./chart.js"; import { dom } from "./dom.js"; +import { prependEvent } from "./events.js"; import { formatValue } from "./points.js"; import { state } from "./state.js"; @@ -74,21 +75,24 @@ export function startPointSocket() { ws.onmessage = (event) => { try { const payload = JSON.parse(event.data); - if (payload.type !== "PointNewValue" && payload.type !== "point_new_value") { + if (payload.type === "PointNewValue" || payload.type === "point_new_value") { + const data = payload.data; + const entry = state.pointEls.get(data.point_id); + if (entry) { + entry.value.textContent = formatValue(data); + entry.quality.className = `badge quality-${(data.quality || "unknown").toLowerCase()}`; + entry.quality.textContent = (data.quality || "unknown").toUpperCase(); + entry.time.textContent = data.timestamp || "--"; + } + + if (state.chartPointId === data.point_id) { + appendChartPoint(data); + } return; } - const data = payload.data; - const entry = state.pointEls.get(data.point_id); - if (entry) { - entry.value.textContent = formatValue(data); - entry.quality.className = `badge quality-${(data.quality || "unknown").toLowerCase()}`; - entry.quality.textContent = (data.quality || "unknown").toUpperCase(); - entry.time.textContent = data.timestamp || "--"; - } - - if (state.chartPointId === data.point_id) { - appendChartPoint(data); + if (payload.type === "EventCreated" || payload.type === "event_created") { + prependEvent(payload.data); } } catch { // ignore malformed messages