feat(event): stream created events over websocket
This commit is contained in:
parent
97d2f6ebf8
commit
c50127b9d0
33
src/event.rs
33
src/event.rs
|
|
@ -55,9 +55,11 @@ impl EventManager {
|
||||||
|
|
||||||
let control_cm = connection_manager.clone();
|
let control_cm = connection_manager.clone();
|
||||||
let control_pool = pool.clone();
|
let control_pool = pool.clone();
|
||||||
|
let control_ws_manager = ws_manager.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while let Some(event) = control_receiver.recv().await {
|
while let Some(event) = control_receiver.recv().await {
|
||||||
handle_control_event(event, &control_pool, &control_cm).await;
|
handle_control_event(event, &control_pool, &control_cm, control_ws_manager.as_ref())
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -133,8 +135,9 @@ async fn handle_control_event(
|
||||||
event: AppEvent,
|
event: AppEvent,
|
||||||
pool: &sqlx::PgPool,
|
pool: &sqlx::PgPool,
|
||||||
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
|
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
|
||||||
|
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
|
||||||
) {
|
) {
|
||||||
persist_event_if_needed(&event, pool).await;
|
persist_event_if_needed(&event, pool, ws_manager).await;
|
||||||
|
|
||||||
match event {
|
match event {
|
||||||
AppEvent::SourceCreate { source_id } => {
|
AppEvent::SourceCreate { source_id } => {
|
||||||
|
|
@ -222,7 +225,11 @@ async fn handle_control_event(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn persist_event_if_needed(event: &AppEvent, pool: &sqlx::PgPool) {
|
async fn persist_event_if_needed(
|
||||||
|
event: &AppEvent,
|
||||||
|
pool: &sqlx::PgPool,
|
||||||
|
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
|
||||||
|
) {
|
||||||
let record = match event {
|
let record = match event {
|
||||||
AppEvent::SourceCreate { source_id } => Some((
|
AppEvent::SourceCreate { source_id } => Some((
|
||||||
"source.created",
|
"source.created",
|
||||||
|
|
@ -310,10 +317,11 @@ async fn persist_event_if_needed(event: &AppEvent, pool: &sqlx::PgPool) {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Err(err) = sqlx::query(
|
let inserted = sqlx::query_as::<_, crate::model::EventRecord>(
|
||||||
r#"
|
r#"
|
||||||
INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload)
|
INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||||
|
RETURNING *
|
||||||
"#,
|
"#,
|
||||||
)
|
)
|
||||||
.bind(event_type)
|
.bind(event_type)
|
||||||
|
|
@ -323,12 +331,23 @@ async fn persist_event_if_needed(event: &AppEvent, pool: &sqlx::PgPool) {
|
||||||
.bind(source_id)
|
.bind(source_id)
|
||||||
.bind(message)
|
.bind(message)
|
||||||
.bind(sqlx::types::Json(payload))
|
.bind(sqlx::types::Json(payload))
|
||||||
.execute(pool)
|
.fetch_one(pool)
|
||||||
.await
|
.await;
|
||||||
{
|
|
||||||
|
match inserted {
|
||||||
|
Ok(record) => {
|
||||||
|
if let Some(ws_manager) = ws_manager {
|
||||||
|
let ws_message = crate::websocket::WsMessage::EventCreated(record);
|
||||||
|
if let Err(err) = ws_manager.send_to_public(ws_message).await {
|
||||||
|
tracing::warn!("Failed to broadcast event websocket message: {}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
tracing::warn!("Failed to persist event: {}", err);
|
tracing::warn!("Failed to persist event: {}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn process_point_new_value(
|
async fn process_point_new_value(
|
||||||
payload: crate::telemetry::PointNewValue,
|
payload: crate::telemetry::PointNewValue,
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ use uuid::Uuid;
|
||||||
pub enum WsMessage {
|
pub enum WsMessage {
|
||||||
PointNewValue(crate::telemetry::PointMonitorInfo),
|
PointNewValue(crate::telemetry::PointMonitorInfo),
|
||||||
PointSetValueBatchResult(crate::connection::BatchSetPointValueRes),
|
PointSetValueBatchResult(crate::connection::BatchSetPointValueRes),
|
||||||
|
EventCreated(crate::model::EventRecord),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,22 @@ export function renderEvents() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function matchesCurrentFilter(item) {
|
||||||
|
if (state.selectedUnitId && item.unit_id !== state.selectedUnitId) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function prependEvent(item) {
|
||||||
|
if (!matchesCurrentFilter(item)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
state.events = [item, ...state.events.filter((existing) => existing.id !== item.id)].slice(0, 20);
|
||||||
|
renderEvents();
|
||||||
|
}
|
||||||
|
|
||||||
export async function loadEvents() {
|
export async function loadEvents() {
|
||||||
const params = new URLSearchParams({
|
const params = new URLSearchParams({
|
||||||
page: "1",
|
page: "1",
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
import { appendChartPoint } from "./chart.js";
|
import { appendChartPoint } from "./chart.js";
|
||||||
import { dom } from "./dom.js";
|
import { dom } from "./dom.js";
|
||||||
|
import { prependEvent } from "./events.js";
|
||||||
import { formatValue } from "./points.js";
|
import { formatValue } from "./points.js";
|
||||||
import { state } from "./state.js";
|
import { state } from "./state.js";
|
||||||
|
|
||||||
|
|
@ -74,10 +75,7 @@ export function startPointSocket() {
|
||||||
ws.onmessage = (event) => {
|
ws.onmessage = (event) => {
|
||||||
try {
|
try {
|
||||||
const payload = JSON.parse(event.data);
|
const payload = JSON.parse(event.data);
|
||||||
if (payload.type !== "PointNewValue" && payload.type !== "point_new_value") {
|
if (payload.type === "PointNewValue" || payload.type === "point_new_value") {
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const data = payload.data;
|
const data = payload.data;
|
||||||
const entry = state.pointEls.get(data.point_id);
|
const entry = state.pointEls.get(data.point_id);
|
||||||
if (entry) {
|
if (entry) {
|
||||||
|
|
@ -90,6 +88,12 @@ export function startPointSocket() {
|
||||||
if (state.chartPointId === data.point_id) {
|
if (state.chartPointId === data.point_id) {
|
||||||
appendChartPoint(data);
|
appendChartPoint(data);
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payload.type === "EventCreated" || payload.type === "event_created") {
|
||||||
|
prependEvent(payload.data);
|
||||||
|
}
|
||||||
} catch {
|
} catch {
|
||||||
// ignore malformed messages
|
// ignore malformed messages
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue