From 9d787e452b9a7b9702060ddf04aab12e7d9fddc2 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 26 Mar 2026 09:53:26 +0800 Subject: [PATCH] feat(events): use Chinese messages with entity names Event messages are now stored and displayed in Chinese. Names/codes are resolved via lightweight DB lookups in persist_event_if_needed (entities still exist at processing time). SourceDelete passes the name explicitly since the source is deleted before the async event is processed. Co-Authored-By: Claude Sonnet 4.6 --- src/event.rs | 267 ++++++++++++++++++++++++------------------ src/handler/source.rs | 16 +-- 2 files changed, 161 insertions(+), 122 deletions(-) diff --git a/src/event.rs b/src/event.rs index c155761..9c3a026 100644 --- a/src/event.rs +++ b/src/event.rs @@ -15,6 +15,7 @@ pub enum AppEvent { }, SourceDelete { source_id: Uuid, + source_name: String, }, PointCreateBatch { source_id: Uuid, @@ -159,7 +160,7 @@ async fn handle_control_event( tracing::error!("Failed to reconnect source {}: {}", source_id, e); } } - AppEvent::SourceDelete { source_id } => { + AppEvent::SourceDelete { source_id, .. } => { tracing::info!("Processing SourceDelete event for {}", source_id); if let Err(e) = connection_manager.disconnect(source_id).await { tracing::error!("Failed to disconnect from source {}: {}", source_id, e); @@ -253,127 +254,165 @@ async fn handle_control_event( } } +async fn fetch_source_name(pool: &sqlx::PgPool, id: Uuid) -> String { + sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await + .ok() + .flatten() + .unwrap_or_else(|| id.to_string()) +} + +async fn fetch_unit_code(pool: &sqlx::PgPool, id: Uuid) -> String { + sqlx::query_scalar::<_, String>("SELECT code FROM unit WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await + .ok() + .flatten() + .unwrap_or_else(|| id.to_string()) +} + +async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String { + sqlx::query_scalar::<_, String>("SELECT code FROM equipment WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await + .ok() + .flatten() + .unwrap_or_else(|| id.to_string()) +} + async fn persist_event_if_needed( event: &AppEvent, pool: &sqlx::PgPool, ws_manager: Option<&std::sync::Arc>, ) { let record = match event { - AppEvent::SourceCreate { source_id } => Some(( - "source.created", - "info", - None, - None, - Some(*source_id), - format!("Source {} created", source_id), + AppEvent::SourceCreate { source_id } => { + let name = fetch_source_name(pool, *source_id).await; + Some(( + "source.created", "info", + None, None, Some(*source_id), + format!("数据源【{}】已创建", name), + serde_json::json!({ "source_id": source_id }), + )) + } + AppEvent::SourceUpdate { source_id } => { + let name = fetch_source_name(pool, *source_id).await; + Some(( + "source.updated", "info", + None, None, Some(*source_id), + format!("数据源【{}】已更新", name), + serde_json::json!({ "source_id": source_id }), + )) + } + AppEvent::SourceDelete { source_id, source_name } => Some(( + "source.deleted", "warn", + None, None, None, + format!("数据源【{}】已删除", source_name), serde_json::json!({ "source_id": source_id }), )), - AppEvent::SourceUpdate { source_id } => Some(( - "source.updated", - "info", - None, - None, - Some(*source_id), - format!("Source {} updated", source_id), - serde_json::json!({ "source_id": source_id }), - )), - AppEvent::SourceDelete { source_id } => Some(( - "source.deleted", - "warn", - None, - None, - None, - format!("Source {} deleted", source_id), - serde_json::json!({ "source_id": source_id }), - )), - AppEvent::PointCreateBatch { source_id, point_ids } => Some(( - "point.batch_created", - "info", - None, - None, - Some(*source_id), - format!("{} points created for source {}", point_ids.len(), source_id), - serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), - )), - AppEvent::PointDeleteBatch { source_id, point_ids } => Some(( - "point.batch_deleted", - "warn", - None, - None, - Some(*source_id), - format!("{} points deleted for source {}", point_ids.len(), source_id), - serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), - )), - AppEvent::EquipmentStartCommandSent { - equipment_id, - unit_id, - point_id, - } => Some(( - "equipment.start_command_sent", - "info", - *unit_id, - Some(*equipment_id), - None, - format!("Start command sent to equipment {}", equipment_id), - serde_json::json!({ - "equipment_id": equipment_id, - "unit_id": unit_id, - "point_id": point_id - }), - )), - AppEvent::EquipmentStopCommandSent { - equipment_id, - unit_id, - point_id, - } => Some(( - "equipment.stop_command_sent", - "info", - *unit_id, - Some(*equipment_id), - None, - format!("Stop command sent to equipment {}", equipment_id), - serde_json::json!({ - "equipment_id": equipment_id, - "unit_id": unit_id, - "point_id": point_id - }), - )), - AppEvent::AutoControlStarted { unit_id } => Some(( - "unit.auto_control_started", "info", - Some(*unit_id), None, None, - format!("Auto control started for unit {}", unit_id), - serde_json::json!({ "unit_id": unit_id }), - )), - AppEvent::AutoControlStopped { unit_id } => Some(( - "unit.auto_control_stopped", "info", - Some(*unit_id), None, None, - format!("Auto control stopped for unit {}", unit_id), - serde_json::json!({ "unit_id": unit_id }), - )), - AppEvent::FaultLocked { unit_id, equipment_id } => Some(( - "unit.fault_locked", "error", - Some(*unit_id), Some(*equipment_id), None, - format!("Unit {} fault locked by equipment {}", unit_id, equipment_id), - serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), - )), - AppEvent::FaultAcked { unit_id } => Some(( - "unit.fault_acked", "info", - Some(*unit_id), None, None, - format!("Unit {} fault acknowledged", unit_id), - serde_json::json!({ "unit_id": unit_id }), - )), - AppEvent::CommLocked { unit_id } => Some(( - "unit.comm_locked", "warn", - Some(*unit_id), None, None, - format!("Unit {} communication locked", unit_id), - serde_json::json!({ "unit_id": unit_id }), - )), - AppEvent::CommRecovered { unit_id } => Some(( - "unit.comm_recovered", "info", - Some(*unit_id), None, None, - format!("Unit {} communication recovered", unit_id), - serde_json::json!({ "unit_id": unit_id }), - )), + AppEvent::PointCreateBatch { source_id, point_ids } => { + let name = fetch_source_name(pool, *source_id).await; + Some(( + "point.batch_created", "info", + None, None, Some(*source_id), + format!("批量创建 {} 个测点(数据源:{})", point_ids.len(), name), + serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), + )) + } + AppEvent::PointDeleteBatch { source_id, point_ids } => { + let name = fetch_source_name(pool, *source_id).await; + Some(( + "point.batch_deleted", "warn", + None, None, Some(*source_id), + format!("批量删除 {} 个测点(数据源:{})", point_ids.len(), name), + serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), + )) + } + AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => { + let code = fetch_equipment_code(pool, *equipment_id).await; + Some(( + "equipment.start_command_sent", "info", + *unit_id, Some(*equipment_id), None, + format!("已发送启动指令(设备:{})", code), + serde_json::json!({ + "equipment_id": equipment_id, + "unit_id": unit_id, + "point_id": point_id + }), + )) + } + AppEvent::EquipmentStopCommandSent { equipment_id, unit_id, point_id } => { + let code = fetch_equipment_code(pool, *equipment_id).await; + Some(( + "equipment.stop_command_sent", "info", + *unit_id, Some(*equipment_id), None, + format!("已发送停止指令(设备:{})", code), + serde_json::json!({ + "equipment_id": equipment_id, + "unit_id": unit_id, + "point_id": point_id + }), + )) + } + AppEvent::AutoControlStarted { unit_id } => { + let code = fetch_unit_code(pool, *unit_id).await; + Some(( + "unit.auto_control_started", "info", + Some(*unit_id), None, None, + format!("已启动自动控制(单元:{})", code), + serde_json::json!({ "unit_id": unit_id }), + )) + } + AppEvent::AutoControlStopped { unit_id } => { + let code = fetch_unit_code(pool, *unit_id).await; + Some(( + "unit.auto_control_stopped", "info", + Some(*unit_id), None, None, + format!("已停止自动控制(单元:{})", code), + serde_json::json!({ "unit_id": unit_id }), + )) + } + AppEvent::FaultLocked { unit_id, equipment_id } => { + let unit_code = fetch_unit_code(pool, *unit_id).await; + let eq_code = fetch_equipment_code(pool, *equipment_id).await; + Some(( + "unit.fault_locked", "error", + Some(*unit_id), Some(*equipment_id), None, + format!("单元【{}】发生故障锁定,触发设备:{}", unit_code, eq_code), + serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }), + )) + } + AppEvent::FaultAcked { unit_id } => { + let code = fetch_unit_code(pool, *unit_id).await; + Some(( + "unit.fault_acked", "info", + Some(*unit_id), None, None, + format!("单元【{}】故障已人工确认", code), + serde_json::json!({ "unit_id": unit_id }), + )) + } + AppEvent::CommLocked { unit_id } => { + let code = fetch_unit_code(pool, *unit_id).await; + Some(( + "unit.comm_locked", "warn", + Some(*unit_id), None, None, + format!("单元【{}】通讯中断", code), + serde_json::json!({ "unit_id": unit_id }), + )) + } + AppEvent::CommRecovered { unit_id } => { + let code = fetch_unit_code(pool, *unit_id).await; + Some(( + "unit.comm_recovered", "info", + Some(*unit_id), None, None, + format!("单元【{}】通讯恢复", code), + serde_json::json!({ "unit_id": unit_id }), + )) + } AppEvent::UnitStateChanged { .. } => None, AppEvent::PointNewValue(_) => None, }; diff --git a/src/handler/source.rs b/src/handler/source.rs index dfb8e02..52bae8f 100644 --- a/src/handler/source.rs +++ b/src/handler/source.rs @@ -312,19 +312,19 @@ pub async fn delete_source( ) -> Result { let pool = &state.pool; - // 删除source - let result = sqlx::query("DELETE FROM source WHERE id = $1") + let source_name = sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1") + .bind(source_id) + .fetch_optional(pool) + .await? + .ok_or_else(|| ApiErr::NotFound(format!("Source with id {} not found", source_id), None))?; + + sqlx::query("DELETE FROM source WHERE id = $1") .bind(source_id) .execute(pool) .await?; - - // 检查是否删除了记录 - if result.rows_affected() == 0 { - return Err(ApiErr::NotFound(format!("Source with id {} not found", source_id), None)); - } // 触发 SourceDelete 事件 - let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id }); + let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id, source_name }); Ok(StatusCode::NO_CONTENT) }