feat(events): use Chinese messages with entity names

Event messages are now stored and displayed in Chinese. Names/codes are
resolved via lightweight DB lookups in persist_event_if_needed (entities
still exist at processing time). SourceDelete passes the name explicitly
since the source is deleted before the async event is processed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-03-26 09:53:26 +08:00
parent 0b7f2401bd
commit 9d787e452b
2 changed files with 161 additions and 122 deletions

View File

@ -15,6 +15,7 @@ pub enum AppEvent {
}, },
SourceDelete { SourceDelete {
source_id: Uuid, source_id: Uuid,
source_name: String,
}, },
PointCreateBatch { PointCreateBatch {
source_id: Uuid, source_id: Uuid,
@ -159,7 +160,7 @@ async fn handle_control_event(
tracing::error!("Failed to reconnect source {}: {}", source_id, e); tracing::error!("Failed to reconnect source {}: {}", source_id, e);
} }
} }
AppEvent::SourceDelete { source_id } => { AppEvent::SourceDelete { source_id, .. } => {
tracing::info!("Processing SourceDelete event for {}", source_id); tracing::info!("Processing SourceDelete event for {}", source_id);
if let Err(e) = connection_manager.disconnect(source_id).await { if let Err(e) = connection_manager.disconnect(source_id).await {
tracing::error!("Failed to disconnect from source {}: {}", source_id, e); tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
@ -253,127 +254,165 @@ async fn handle_control_event(
} }
} }
async fn fetch_source_name(pool: &sqlx::PgPool, id: Uuid) -> String {
sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1")
.bind(id)
.fetch_optional(pool)
.await
.ok()
.flatten()
.unwrap_or_else(|| id.to_string())
}
async fn fetch_unit_code(pool: &sqlx::PgPool, id: Uuid) -> String {
sqlx::query_scalar::<_, String>("SELECT code FROM unit WHERE id = $1")
.bind(id)
.fetch_optional(pool)
.await
.ok()
.flatten()
.unwrap_or_else(|| id.to_string())
}
async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String {
sqlx::query_scalar::<_, String>("SELECT code FROM equipment WHERE id = $1")
.bind(id)
.fetch_optional(pool)
.await
.ok()
.flatten()
.unwrap_or_else(|| id.to_string())
}
async fn persist_event_if_needed( async fn persist_event_if_needed(
event: &AppEvent, event: &AppEvent,
pool: &sqlx::PgPool, pool: &sqlx::PgPool,
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>, ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
) { ) {
let record = match event { let record = match event {
AppEvent::SourceCreate { source_id } => Some(( AppEvent::SourceCreate { source_id } => {
"source.created", let name = fetch_source_name(pool, *source_id).await;
"info", Some((
None, "source.created", "info",
None, None, None, Some(*source_id),
Some(*source_id), format!("数据源【{}】已创建", name),
format!("Source {} created", source_id), serde_json::json!({ "source_id": source_id }),
))
}
AppEvent::SourceUpdate { source_id } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"source.updated", "info",
None, None, Some(*source_id),
format!("数据源【{}】已更新", name),
serde_json::json!({ "source_id": source_id }),
))
}
AppEvent::SourceDelete { source_id, source_name } => Some((
"source.deleted", "warn",
None, None, None,
format!("数据源【{}】已删除", source_name),
serde_json::json!({ "source_id": source_id }), serde_json::json!({ "source_id": source_id }),
)), )),
AppEvent::SourceUpdate { source_id } => Some(( AppEvent::PointCreateBatch { source_id, point_ids } => {
"source.updated", let name = fetch_source_name(pool, *source_id).await;
"info", Some((
None, "point.batch_created", "info",
None, None, None, Some(*source_id),
Some(*source_id), format!("批量创建 {} 个测点(数据源:{}", point_ids.len(), name),
format!("Source {} updated", source_id),
serde_json::json!({ "source_id": source_id }),
)),
AppEvent::SourceDelete { source_id } => Some((
"source.deleted",
"warn",
None,
None,
None,
format!("Source {} deleted", source_id),
serde_json::json!({ "source_id": source_id }),
)),
AppEvent::PointCreateBatch { source_id, point_ids } => Some((
"point.batch_created",
"info",
None,
None,
Some(*source_id),
format!("{} points created for source {}", point_ids.len(), source_id),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
)), ))
AppEvent::PointDeleteBatch { source_id, point_ids } => Some(( }
"point.batch_deleted", AppEvent::PointDeleteBatch { source_id, point_ids } => {
"warn", let name = fetch_source_name(pool, *source_id).await;
None, Some((
None, "point.batch_deleted", "warn",
Some(*source_id), None, None, Some(*source_id),
format!("{} points deleted for source {}", point_ids.len(), source_id), format!("批量删除 {} 个测点(数据源:{}", point_ids.len(), name),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
)), ))
AppEvent::EquipmentStartCommandSent { }
equipment_id, AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => {
unit_id, let code = fetch_equipment_code(pool, *equipment_id).await;
point_id, Some((
} => Some(( "equipment.start_command_sent", "info",
"equipment.start_command_sent", *unit_id, Some(*equipment_id), None,
"info", format!("已发送启动指令(设备:{}", code),
*unit_id,
Some(*equipment_id),
None,
format!("Start command sent to equipment {}", equipment_id),
serde_json::json!({ serde_json::json!({
"equipment_id": equipment_id, "equipment_id": equipment_id,
"unit_id": unit_id, "unit_id": unit_id,
"point_id": point_id "point_id": point_id
}), }),
)), ))
AppEvent::EquipmentStopCommandSent { }
equipment_id, AppEvent::EquipmentStopCommandSent { equipment_id, unit_id, point_id } => {
unit_id, let code = fetch_equipment_code(pool, *equipment_id).await;
point_id, Some((
} => Some(( "equipment.stop_command_sent", "info",
"equipment.stop_command_sent", *unit_id, Some(*equipment_id), None,
"info", format!("已发送停止指令(设备:{}", code),
*unit_id,
Some(*equipment_id),
None,
format!("Stop command sent to equipment {}", equipment_id),
serde_json::json!({ serde_json::json!({
"equipment_id": equipment_id, "equipment_id": equipment_id,
"unit_id": unit_id, "unit_id": unit_id,
"point_id": point_id "point_id": point_id
}), }),
)), ))
AppEvent::AutoControlStarted { unit_id } => Some(( }
AppEvent::AutoControlStarted { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.auto_control_started", "info", "unit.auto_control_started", "info",
Some(*unit_id), None, None, Some(*unit_id), None, None,
format!("Auto control started for unit {}", unit_id), format!("已启动自动控制(单元:{}", code),
serde_json::json!({ "unit_id": unit_id }), serde_json::json!({ "unit_id": unit_id }),
)), ))
AppEvent::AutoControlStopped { unit_id } => Some(( }
AppEvent::AutoControlStopped { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.auto_control_stopped", "info", "unit.auto_control_stopped", "info",
Some(*unit_id), None, None, Some(*unit_id), None, None,
format!("Auto control stopped for unit {}", unit_id), format!("已停止自动控制(单元:{}", code),
serde_json::json!({ "unit_id": unit_id }), serde_json::json!({ "unit_id": unit_id }),
)), ))
AppEvent::FaultLocked { unit_id, equipment_id } => Some(( }
AppEvent::FaultLocked { unit_id, equipment_id } => {
let unit_code = fetch_unit_code(pool, *unit_id).await;
let eq_code = fetch_equipment_code(pool, *equipment_id).await;
Some((
"unit.fault_locked", "error", "unit.fault_locked", "error",
Some(*unit_id), Some(*equipment_id), None, Some(*unit_id), Some(*equipment_id), None,
format!("Unit {} fault locked by equipment {}", unit_id, equipment_id), format!("单元【{}】发生故障锁定,触发设备:{}", unit_code, eq_code),
serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }),
)), ))
AppEvent::FaultAcked { unit_id } => Some(( }
AppEvent::FaultAcked { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.fault_acked", "info", "unit.fault_acked", "info",
Some(*unit_id), None, None, Some(*unit_id), None, None,
format!("Unit {} fault acknowledged", unit_id), format!("单元【{}】故障已人工确认", code),
serde_json::json!({ "unit_id": unit_id }), serde_json::json!({ "unit_id": unit_id }),
)), ))
AppEvent::CommLocked { unit_id } => Some(( }
AppEvent::CommLocked { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.comm_locked", "warn", "unit.comm_locked", "warn",
Some(*unit_id), None, None, Some(*unit_id), None, None,
format!("Unit {} communication locked", unit_id), format!("单元【{}】通讯中断", code),
serde_json::json!({ "unit_id": unit_id }), serde_json::json!({ "unit_id": unit_id }),
)), ))
AppEvent::CommRecovered { unit_id } => Some(( }
AppEvent::CommRecovered { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.comm_recovered", "info", "unit.comm_recovered", "info",
Some(*unit_id), None, None, Some(*unit_id), None, None,
format!("Unit {} communication recovered", unit_id), format!("单元【{}】通讯恢复", code),
serde_json::json!({ "unit_id": unit_id }), serde_json::json!({ "unit_id": unit_id }),
)), ))
}
AppEvent::UnitStateChanged { .. } => None, AppEvent::UnitStateChanged { .. } => None,
AppEvent::PointNewValue(_) => None, AppEvent::PointNewValue(_) => None,
}; };

View File

@ -312,19 +312,19 @@ pub async fn delete_source(
) -> Result<impl IntoResponse, ApiErr> { ) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool; let pool = &state.pool;
// 删除source let source_name = sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1")
let result = sqlx::query("DELETE FROM source WHERE id = $1") .bind(source_id)
.fetch_optional(pool)
.await?
.ok_or_else(|| ApiErr::NotFound(format!("Source with id {} not found", source_id), None))?;
sqlx::query("DELETE FROM source WHERE id = $1")
.bind(source_id) .bind(source_id)
.execute(pool) .execute(pool)
.await?; .await?;
// 检查是否删除了记录
if result.rows_affected() == 0 {
return Err(ApiErr::NotFound(format!("Source with id {} not found", source_id), None));
}
// 触发 SourceDelete 事件 // 触发 SourceDelete 事件
let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id }); let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id, source_name });
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }