refactor(event): rename ReloadEvent to AppEvent and split event channels

Clarify event semantics by renaming ReloadEvent to AppEvent and route control vs telemetry traffic through dedicated channels. This keeps control events isolated from high-frequency PointNewValue updates while preserving the existing send() call pattern.

Made-with: Cursor
This commit is contained in:
caoqianming 2026-03-13 14:44:30 +08:00
parent 5fa63ad6dd
commit 503aefc4cb
4 changed files with 90 additions and 84 deletions

View File

@ -436,7 +436,7 @@ impl ConnectionManager {
.map(crate::telemetry::PointQuality::from_status_code)
.unwrap_or(crate::telemetry::PointQuality::Unknown);
let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue(
let _ = event_manager.send(crate::event::AppEvent::PointNewValue(
crate::telemetry::PointNewValue {
source_id,
point_id: Some(poll_point.point_id),
@ -986,7 +986,7 @@ impl ConnectionManager {
// Emit local updates only when the full batch succeeds.
if let Some(event_manager) = &self.event_manager {
for (source_id, point_id, variant) in success_events {
if let Err(e) = event_manager.send(crate::event::ReloadEvent::PointNewValue(
if let Err(e) = event_manager.send(crate::event::AppEvent::PointNewValue(
crate::telemetry::PointNewValue {
source_id,
point_id: Some(point_id),
@ -1126,7 +1126,7 @@ impl ConnectionManager {
.unwrap_or(crate::telemetry::PointQuality::Unknown);
if let Some(event_manager) = &data_manager.event_manager {
let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue(
let _ = event_manager.send(crate::event::AppEvent::PointNewValue(
crate::telemetry::PointNewValue {
source_id: current_source_id,
point_id: None,

View File

@ -1,11 +1,12 @@
use std::collections::HashMap;
use tokio::sync::mpsc;
use uuid::Uuid;
use std::collections::HashMap;
const EVENT_CHANNEL_CAPACITY: usize = 4096;
const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024;
const TELEMETRY_EVENT_CHANNEL_CAPACITY: usize = 4096;
#[derive(Debug, Clone)]
pub enum ReloadEvent {
pub enum AppEvent {
SourceCreate {
source_id: Uuid,
},
@ -27,7 +28,8 @@ pub enum ReloadEvent {
}
pub struct EventManager {
sender: mpsc::Sender<ReloadEvent>,
control_sender: mpsc::Sender<AppEvent>,
telemetry_sender: mpsc::Sender<crate::telemetry::PointNewValue>,
}
impl EventManager {
@ -36,35 +38,35 @@ impl EventManager {
connection_manager: std::sync::Arc<crate::connection::ConnectionManager>,
ws_manager: Option<std::sync::Arc<crate::websocket::WebSocketManager>>,
) -> Self {
let (sender, mut receiver) = mpsc::channel::<ReloadEvent>(EVENT_CHANNEL_CAPACITY);
let ws_manager_clone = ws_manager.clone();
let (control_sender, mut control_receiver) =
mpsc::channel::<AppEvent>(CONTROL_EVENT_CHANNEL_CAPACITY);
let (telemetry_sender, mut telemetry_receiver) =
mpsc::channel::<crate::telemetry::PointNewValue>(TELEMETRY_EVENT_CHANNEL_CAPACITY);
let control_cm = connection_manager.clone();
let control_pool = pool.clone();
tokio::spawn(async move {
// 在循环外克隆,避免在循环中移动
let connection_manager_clone = connection_manager.clone();
while let Some(event) = receiver.recv().await {
match event {
ReloadEvent::PointNewValue(payload) => {
while let Some(event) = control_receiver.recv().await {
handle_control_event(event, &control_pool, &control_cm).await;
}
});
let ws_manager_clone = ws_manager.clone();
let telemetry_cm = connection_manager.clone();
tokio::spawn(async move {
while let Some(payload) = telemetry_receiver.recv().await {
let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> =
HashMap::new();
latest_by_key.insert((payload.source_id, payload.client_handle), payload);
loop {
match receiver.try_recv() {
Ok(ReloadEvent::PointNewValue(next_payload)) => {
match telemetry_receiver.try_recv() {
Ok(next_payload) => {
latest_by_key.insert(
(next_payload.source_id, next_payload.client_handle),
next_payload,
);
}
Ok(other_event) => {
handle_control_event(
other_event,
&pool,
&connection_manager_clone,
)
.await;
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
break;
}
@ -75,71 +77,73 @@ impl EventManager {
}
for point_payload in latest_by_key.into_values() {
process_point_new_value(
point_payload,
&connection_manager_clone,
ws_manager_clone.as_ref(),
)
process_point_new_value(point_payload, &telemetry_cm, ws_manager_clone.as_ref())
.await;
}
}
other_event => {
handle_control_event(other_event, &pool, &connection_manager_clone).await;
}
}
}
});
Self { sender }
Self {
control_sender,
telemetry_sender,
}
}
pub fn send(&self, event: ReloadEvent) -> Result<(), String> {
match self.sender.try_send(event) {
pub fn send(&self, event: AppEvent) -> Result<(), String> {
match event {
AppEvent::PointNewValue(payload) => match self.telemetry_sender.try_send(payload) {
Ok(()) => Ok(()),
Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
Err(format!("Failed to send event: channel closed ({e:?})"))
Err(format!("Failed to send telemetry event: channel closed ({e:?})"))
}
Err(tokio::sync::mpsc::error::TrySendError::Full(ReloadEvent::PointNewValue(payload))) => {
Err(tokio::sync::mpsc::error::TrySendError::Full(payload)) => {
// High-frequency telemetry is lossy by design under sustained pressure.
tracing::warn!(
"Dropping PointNewValue due to full event queue: source={}, client_handle={}",
"Dropping PointNewValue due to full telemetry queue: source={}, client_handle={}",
payload.source_id,
payload.client_handle
);
Ok(())
}
Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => {
Err(format!("Failed to send event: queue full ({e:?})"))
},
control_event => match self.control_sender.try_send(control_event) {
Ok(()) => Ok(()),
Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
Err(format!("Failed to send control event: channel closed ({e:?})"))
}
Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => {
Err(format!("Failed to send control event: queue full ({e:?})"))
}
},
}
}
}
async fn handle_control_event(
event: ReloadEvent,
event: AppEvent,
pool: &sqlx::PgPool,
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
) {
match event {
ReloadEvent::SourceCreate { source_id } => {
AppEvent::SourceCreate { source_id } => {
tracing::info!("Processing SourceCreate event for {}", source_id);
if let Err(e) = connection_manager.connect_from_source(pool, source_id).await {
tracing::error!("Failed to connect to source {}: {}", source_id, e);
}
}
ReloadEvent::SourceUpdate { source_id } => {
AppEvent::SourceUpdate { source_id } => {
tracing::info!("Processing SourceUpdate event for {}", source_id);
if let Err(e) = connection_manager.reconnect(pool, source_id).await {
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
}
}
ReloadEvent::SourceDelete { source_id } => {
AppEvent::SourceDelete { source_id } => {
tracing::info!("Processing SourceDelete event for {}", source_id);
if let Err(e) = connection_manager.disconnect(source_id).await {
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
}
}
ReloadEvent::PointCreateBatch { source_id, point_ids } => {
AppEvent::PointCreateBatch { source_id, point_ids } => {
let requested_count = point_ids.len();
match connection_manager
.subscribe_points_from_source(source_id, Some(point_ids), pool)
@ -163,7 +167,7 @@ async fn handle_control_event(
}
}
}
ReloadEvent::PointDeleteBatch { source_id, point_ids } => {
AppEvent::PointDeleteBatch { source_id, point_ids } => {
tracing::info!(
"Processing PointDeleteBatch event for source {} with {} points",
source_id,
@ -176,7 +180,9 @@ async fn handle_control_event(
tracing::error!("Failed to unsubscribe points: {}", e);
}
}
ReloadEvent::PointNewValue(_) => {}
AppEvent::PointNewValue(_) => {
tracing::warn!("PointNewValue routed to control worker unexpectedly");
}
}
}
@ -239,7 +245,7 @@ async fn process_point_new_value(
if let Some(ws_manager) = ws_manager {
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor);
if let Err(e) = ws_manager.send_to_public(ws_message).await {
tracing::error!(
tracing::warn!(
"Failed to send WebSocket message to public room: {}",
e
);

View File

@ -247,7 +247,7 @@ pub async fn delete_point(
if let Some(source_id) = source_id {
if let Err(e) = state
.event_manager
.send(crate::event::ReloadEvent::PointDeleteBatch {
.send(crate::event::AppEvent::PointDeleteBatch {
source_id,
point_ids: vec![point_id],
})
@ -365,7 +365,7 @@ pub async fn batch_create_points(
let point_ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
if let Err(e) = state
.event_manager
.send(crate::event::ReloadEvent::PointCreateBatch { source_id, point_ids })
.send(crate::event::AppEvent::PointCreateBatch { source_id, point_ids })
{
tracing::error!("Failed to send PointCreateBatch event: {}", e);
}
@ -427,7 +427,7 @@ pub async fn batch_delete_points(
let ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
if let Err(e) = state
.event_manager
.send(crate::event::ReloadEvent::PointDeleteBatch {
.send(crate::event::AppEvent::PointDeleteBatch {
source_id,
point_ids: ids,
})

View File

@ -225,7 +225,7 @@ pub async fn create_source(
.await?;
// 触发 SourceCreate 事件
let _ = state.event_manager.send(crate::event::ReloadEvent::SourceCreate { source_id: new_id });
let _ = state.event_manager.send(crate::event::AppEvent::SourceCreate { source_id: new_id });
Ok((StatusCode::CREATED, Json(CreateSourceRes { id: new_id })))
}
@ -300,7 +300,7 @@ pub async fn update_source(
qb.push(" WHERE id = ").push_bind(source_id);
qb.build().execute(pool).await?;
let _ = state.event_manager.send(crate::event::ReloadEvent::SourceUpdate { source_id });
let _ = state.event_manager.send(crate::event::AppEvent::SourceUpdate { source_id });
Ok(Json(serde_json::json!({"ok_msg": "Source updated successfully"})))
}
@ -323,7 +323,7 @@ pub async fn delete_source(
}
// 触发 SourceDelete 事件
let _ = state.event_manager.send(crate::event::ReloadEvent::SourceDelete { source_id });
let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id });
Ok(StatusCode::NO_CONTENT)
}