diff --git a/crates/app_feeder_distributor/src/event.rs b/crates/app_feeder_distributor/src/event.rs index 6db4fef..60187ec 100644 --- a/crates/app_feeder_distributor/src/event.rs +++ b/crates/app_feeder_distributor/src/event.rs @@ -118,6 +118,8 @@ async fn handle_control_event( unit_id: *unit_id, equipment_id: Some(*equipment_id), source_id: None, + subject_type: None, + subject_id: None, message: format!("Start command sent to equipment {}", code), payload: serde_json::json!({ "equipment_id": equipment_id, @@ -138,6 +140,8 @@ async fn handle_control_event( unit_id: *unit_id, equipment_id: Some(*equipment_id), source_id: None, + subject_type: None, + subject_id: None, message: format!("Stop command sent to equipment {}", code), payload: serde_json::json!({ "equipment_id": equipment_id, @@ -154,6 +158,8 @@ async fn handle_control_event( unit_id: Some(*unit_id), equipment_id: None, source_id: None, + subject_type: None, + subject_id: None, message: format!("Auto control started for unit {}", code), payload: serde_json::json!({ "unit_id": unit_id }), }) @@ -166,6 +172,8 @@ async fn handle_control_event( unit_id: Some(*unit_id), equipment_id: None, source_id: None, + subject_type: None, + subject_id: None, message: format!("Auto control stopped for unit {}", code), payload: serde_json::json!({ "unit_id": unit_id }), }) @@ -182,6 +190,8 @@ async fn handle_control_event( unit_id: Some(*unit_id), equipment_id: Some(*equipment_id), source_id: None, + subject_type: None, + subject_id: None, message: format!( "Fault locked for unit {} by equipment {}", unit_code, eq_code @@ -197,6 +207,8 @@ async fn handle_control_event( unit_id: Some(*unit_id), equipment_id: None, source_id: None, + subject_type: None, + subject_id: None, message: format!("Fault acknowledged for unit {}", code), payload: serde_json::json!({ "unit_id": unit_id }), }) @@ -209,6 +221,8 @@ async fn handle_control_event( unit_id: Some(*unit_id), equipment_id: None, source_id: None, + subject_type: None, + subject_id: None, message: format!("Communication locked for unit {}", code), payload: serde_json::json!({ "unit_id": unit_id }), }) @@ -221,6 +235,8 @@ async fn handle_control_event( unit_id: Some(*unit_id), equipment_id: None, source_id: None, + subject_type: None, + subject_id: None, message: format!("Communication recovered for unit {}", code), payload: serde_json::json!({ "unit_id": unit_id }), }) @@ -237,6 +253,8 @@ async fn handle_control_event( unit_id: Some(*unit_id), equipment_id: Some(*equipment_id), source_id: None, + subject_type: None, + subject_id: None, message: format!( "Unit {} switched to local control via equipment {}", unit_code, eq_code @@ -252,6 +270,8 @@ async fn handle_control_event( unit_id: Some(*unit_id), equipment_id: None, source_id: None, + subject_type: None, + subject_id: None, message: format!( "Unit {} returned to remote control; auto control requires manual restart", code diff --git a/crates/app_operation_system/src/event.rs b/crates/app_operation_system/src/event.rs index 8b7585d..c9680ef 100644 --- a/crates/app_operation_system/src/event.rs +++ b/crates/app_operation_system/src/event.rs @@ -11,9 +11,10 @@ const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024; /// Operation-system business events. /// -/// Variants here will grow as engine phases land. Each variant maps to a -/// row in the `event` table (via `record_event`) and follows the `ops.*` -/// namespace agreed in the design doc §8.1. +/// Each variant maps to a row in the `event` table (via `record_event`) and +/// follows the `ops.*` namespace from design doc §8.1. Every record carries +/// `subject_type` + `subject_id` so the front-end can filter the timeline +/// for one segment / station without joining on event_type strings. #[derive(Debug, Clone)] pub enum AppEvent { SegmentAutoStarted { @@ -99,6 +100,26 @@ impl EventManager { } } +fn segment_event( + event_type: &'static str, + level: &'static str, + segment_id: Uuid, + message: String, + payload: serde_json::Value, +) -> EventInsert { + EventInsert { + event_type, + level, + unit_id: None, + equipment_id: None, + source_id: None, + subject_type: Some("segment"), + subject_id: Some(segment_id), + message, + payload, + } +} + async fn handle_event( event: AppEvent, pool: &sqlx::PgPool, @@ -106,93 +127,75 @@ async fn handle_event( _metadata: &MetadataCache, ) { let record: Option = match &event { - AppEvent::SegmentAutoStarted { segment_id } => Some(EventInsert { - event_type: "ops.segment.auto_started", - level: "info", - unit_id: None, - equipment_id: None, - source_id: None, - message: format!("Segment {} auto control started", segment_id), - payload: serde_json::json!({ "segment_id": segment_id }), - }), - AppEvent::SegmentAutoStopped { segment_id } => Some(EventInsert { - event_type: "ops.segment.auto_stopped", - level: "info", - unit_id: None, - equipment_id: None, - source_id: None, - message: format!("Segment {} auto control stopped", segment_id), - payload: serde_json::json!({ "segment_id": segment_id }), - }), + AppEvent::SegmentAutoStarted { segment_id } => Some(segment_event( + "ops.segment.auto_started", + "info", + *segment_id, + format!("Segment {} auto control started", segment_id), + serde_json::json!({ "segment_id": segment_id }), + )), + AppEvent::SegmentAutoStopped { segment_id } => Some(segment_event( + "ops.segment.auto_stopped", + "info", + *segment_id, + format!("Segment {} auto control stopped", segment_id), + serde_json::json!({ "segment_id": segment_id }), + )), AppEvent::SegmentStepAdvanced { segment_id, step_no, - } => Some(EventInsert { - event_type: "ops.segment.step_advanced", - level: "info", - unit_id: None, - equipment_id: None, - source_id: None, - message: format!("Segment {} advanced to step {}", segment_id, step_no), - payload: serde_json::json!({ "segment_id": segment_id, "step_no": step_no }), - }), - AppEvent::SegmentCompleted { segment_id } => Some(EventInsert { - event_type: "ops.segment.completed", - level: "info", - unit_id: None, - equipment_id: None, - source_id: None, - message: format!("Segment {} completed", segment_id), - payload: serde_json::json!({ "segment_id": segment_id }), - }), - AppEvent::SegmentBlocked { segment_id, reason } => Some(EventInsert { - event_type: "ops.segment.blocked", - level: "warn", - unit_id: None, - equipment_id: None, - source_id: None, - message: format!("Segment {} blocked: {}", segment_id, reason), - payload: serde_json::json!({ "segment_id": segment_id, "reason": reason }), - }), + } => Some(segment_event( + "ops.segment.step_advanced", + "info", + *segment_id, + format!("Segment {} advanced to step {}", segment_id, step_no), + serde_json::json!({ "segment_id": segment_id, "step_no": step_no }), + )), + AppEvent::SegmentCompleted { segment_id } => Some(segment_event( + "ops.segment.completed", + "info", + *segment_id, + format!("Segment {} completed", segment_id), + serde_json::json!({ "segment_id": segment_id }), + )), + AppEvent::SegmentBlocked { segment_id, reason } => Some(segment_event( + "ops.segment.blocked", + "warn", + *segment_id, + format!("Segment {} blocked: {}", segment_id, reason), + serde_json::json!({ "segment_id": segment_id, "reason": reason }), + )), AppEvent::SegmentFaultLocked { segment_id, message, - } => Some(EventInsert { - event_type: "ops.segment.fault_locked", - level: "error", - unit_id: None, - equipment_id: None, - source_id: None, - message: format!("Segment {} fault locked: {}", segment_id, message), - payload: serde_json::json!({ "segment_id": segment_id, "message": message }), - }), - AppEvent::SegmentFaultAcked { segment_id } => Some(EventInsert { - event_type: "ops.segment.fault_acked", - level: "info", - unit_id: None, - equipment_id: None, - source_id: None, - message: format!("Segment {} fault acknowledged", segment_id), - payload: serde_json::json!({ "segment_id": segment_id }), - }), - AppEvent::SegmentCommLocked { segment_id } => Some(EventInsert { - event_type: "ops.segment.comm_locked", - level: "warn", - unit_id: None, - equipment_id: None, - source_id: None, - message: format!("Segment {} communication locked", segment_id), - payload: serde_json::json!({ "segment_id": segment_id }), - }), - AppEvent::SegmentCommRecovered { segment_id } => Some(EventInsert { - event_type: "ops.segment.comm_recovered", - level: "info", - unit_id: None, - equipment_id: None, - source_id: None, - message: format!("Segment {} communication recovered", segment_id), - payload: serde_json::json!({ "segment_id": segment_id }), - }), + } => Some(segment_event( + "ops.segment.fault_locked", + "error", + *segment_id, + format!("Segment {} fault locked: {}", segment_id, message), + serde_json::json!({ "segment_id": segment_id, "message": message }), + )), + AppEvent::SegmentFaultAcked { segment_id } => Some(segment_event( + "ops.segment.fault_acked", + "info", + *segment_id, + format!("Segment {} fault acknowledged", segment_id), + serde_json::json!({ "segment_id": segment_id }), + )), + AppEvent::SegmentCommLocked { segment_id } => Some(segment_event( + "ops.segment.comm_locked", + "warn", + *segment_id, + format!("Segment {} communication locked", segment_id), + serde_json::json!({ "segment_id": segment_id }), + )), + AppEvent::SegmentCommRecovered { segment_id } => Some(segment_event( + "ops.segment.comm_recovered", + "info", + *segment_id, + format!("Segment {} communication recovered", segment_id), + serde_json::json!({ "segment_id": segment_id }), + )), AppEvent::StationStateChanged { station_id, presence, @@ -203,6 +206,8 @@ async fn handle_event( unit_id: None, equipment_id: None, source_id: None, + subject_type: Some("station"), + subject_id: Some(*station_id), message: format!( "Station {} state changed (presence={}, vacancy={})", station_id, presence, vacancy @@ -216,48 +221,36 @@ async fn handle_event( AppEvent::AlarmActionTimeout { segment_id, step_no, - } => Some(EventInsert { - event_type: "ops.alarm.action_timeout", - level: "error", - unit_id: None, - equipment_id: None, - source_id: None, - message: format!( - "Action timeout on segment {} step {}", - segment_id, step_no - ), - payload: serde_json::json!({ "segment_id": segment_id, "step_no": step_no }), - }), + } => Some(segment_event( + "ops.alarm.action_timeout", + "error", + *segment_id, + format!("Action timeout on segment {} step {}", segment_id, step_no), + serde_json::json!({ "segment_id": segment_id, "step_no": step_no }), + )), AppEvent::AlarmSignalConflict { segment_id, message, - } => Some(EventInsert { - event_type: "ops.alarm.signal_conflict", - level: "error", - unit_id: None, - equipment_id: None, - source_id: None, - message: format!("Signal conflict on segment {}: {}", segment_id, message), - payload: serde_json::json!({ "segment_id": segment_id, "message": message }), - }), + } => Some(segment_event( + "ops.alarm.signal_conflict", + "error", + *segment_id, + format!("Signal conflict on segment {}: {}", segment_id, message), + serde_json::json!({ "segment_id": segment_id, "message": message }), + )), AppEvent::AlarmResourceBusy { segment_id, resource_key, - } => Some(EventInsert { - event_type: "ops.alarm.resource_busy", - level: "warn", - unit_id: None, - equipment_id: None, - source_id: None, - message: format!( - "Resource {} busy for segment {}", - resource_key, segment_id - ), - payload: serde_json::json!({ + } => Some(segment_event( + "ops.alarm.resource_busy", + "warn", + *segment_id, + format!("Resource {} busy for segment {}", resource_key, segment_id), + serde_json::json!({ "segment_id": segment_id, "resource_key": resource_key }), - }), + )), }; if let Some(record) = record { diff --git a/crates/app_operation_system/src/handler.rs b/crates/app_operation_system/src/handler.rs index cd013ec..af9a50a 100644 --- a/crates/app_operation_system/src/handler.rs +++ b/crates/app_operation_system/src/handler.rs @@ -1,5 +1,6 @@ pub mod control; pub mod doc; +pub mod event; pub mod runtime; pub mod segment; pub mod station; diff --git a/crates/app_operation_system/src/handler/event.rs b/crates/app_operation_system/src/handler/event.rs new file mode 100644 index 0000000..4ecad73 --- /dev/null +++ b/crates/app_operation_system/src/handler/event.rs @@ -0,0 +1,67 @@ +//! Event timeline endpoint with subject filtering (design doc §9.3). +//! +//! `event_type` matches exact value or prefix when `event_type=ops.` style is +//! requested. `subject_type` / `subject_id` use the columns added by the P1 +//! migration so the front-end can show a per-segment / per-station timeline +//! without parsing event_type strings. + +use axum::{extract::{Query, State}, response::IntoResponse, Json}; +use serde::Deserialize; +use uuid::Uuid; +use validator::Validate; + +use plc_platform_core::{ + service::EventFilter, + util::{ + pagination::{PaginatedResponse, PaginationParams}, + response::ApiErr, + }, +}; + +use crate::AppState; + +#[derive(Debug, Deserialize, Validate)] +pub struct GetEventListQuery { + #[validate(length(min = 1, max = 100))] + pub event_type: Option, + #[validate(length(min = 1, max = 100))] + pub event_type_prefix: Option, + #[validate(length(min = 1, max = 32))] + pub subject_type: Option, + pub subject_id: Option, + #[serde(flatten)] + pub pagination: PaginationParams, +} + +pub async fn get_event_list( + State(state): State, + Query(query): Query, +) -> Result { + query.validate()?; + + let filter = EventFilter { + unit_id: None, + event_type: query.event_type.as_deref(), + event_type_prefix: query.event_type_prefix.as_deref(), + subject_type: query.subject_type.as_deref(), + subject_id: query.subject_id, + }; + + let total = + plc_platform_core::service::get_events_count_filtered(&state.platform.pool, &filter) + .await?; + let data = plc_platform_core::service::get_events_paginated_filtered( + &state.platform.pool, + &filter, + query.pagination.page_size, + query.pagination.offset(), + ) + .await?; + + Ok(Json(PaginatedResponse::new( + data, + total, + query.pagination.page, + query.pagination.page_size, + ))) +} diff --git a/crates/app_operation_system/src/router.rs b/crates/app_operation_system/src/router.rs index b055060..ee177bf 100644 --- a/crates/app_operation_system/src/router.rs +++ b/crates/app_operation_system/src/router.rs @@ -109,6 +109,10 @@ pub fn build_router(state: AppState) -> Router { .route( "/api/runtime/station/{station_id}", get(crate::handler::runtime::get_station_runtime), + ) + .route( + "/api/event", + get(crate::handler::event::get_event_list), ); let ops_routes = Router::new() diff --git a/crates/app_operation_system/tests/router_smoke.rs b/crates/app_operation_system/tests/router_smoke.rs index fe51747..9b10480 100644 --- a/crates/app_operation_system/tests/router_smoke.rs +++ b/crates/app_operation_system/tests/router_smoke.rs @@ -92,3 +92,20 @@ async fn operation_system_router_exposes_control_batch_routes() { assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED); } + +/// Event timeline endpoint is GET-only — POST should be METHOD_NOT_ALLOWED. +#[tokio::test] +async fn operation_system_router_exposes_event_timeline() { + let response = build_app() + .oneshot( + Request::builder() + .method(Method::POST) + .uri("/api/event") + .body(Body::empty()) + .expect("request should build"), + ) + .await + .expect("router should answer request"); + + assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED); +} diff --git a/crates/plc_platform_core/src/event.rs b/crates/plc_platform_core/src/event.rs index 6ac8b5f..dbd2b4f 100644 --- a/crates/plc_platform_core/src/event.rs +++ b/crates/plc_platform_core/src/event.rs @@ -108,6 +108,10 @@ pub struct EventInsert { pub unit_id: Option, pub equipment_id: Option, pub source_id: Option, + /// Generic owner-type tag (e.g. "segment" / "station") used by ops business + /// events. Design doc §4.2.8 attribution columns. + pub subject_type: Option<&'static str>, + pub subject_id: Option, pub message: String, pub payload: Value, } @@ -131,8 +135,11 @@ pub async fn record_event( let inserted = sqlx::query_as::<_, EventRecord>( r#" - INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload) - VALUES ($1, $2, $3, $4, $5, $6, $7) + INSERT INTO event ( + event_type, level, unit_id, equipment_id, source_id, + subject_type, subject_id, message, payload + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING * "#, ) @@ -141,6 +148,8 @@ pub async fn record_event( .bind(event.unit_id) .bind(event.equipment_id) .bind(event.source_id) + .bind(event.subject_type) + .bind(event.subject_id) .bind(event.message) .bind(sqlx::types::Json(envelope.payload)) .fetch_one(pool) @@ -176,6 +185,8 @@ pub async fn record_platform_event( unit_id: None, equipment_id: None, source_id: Some(*source_id), + subject_type: Some("source"), + subject_id: Some(*source_id), message: format!("Source {} created", name), payload: serde_json::json!({ "source_id": source_id }), }) @@ -188,6 +199,8 @@ pub async fn record_platform_event( unit_id: None, equipment_id: None, source_id: Some(*source_id), + subject_type: Some("source"), + subject_id: Some(*source_id), message: format!("Source {} updated", name), payload: serde_json::json!({ "source_id": source_id }), }) @@ -201,6 +214,8 @@ pub async fn record_platform_event( unit_id: None, equipment_id: None, source_id: None, + subject_type: Some("source"), + subject_id: Some(*source_id), message: format!("Source {} deleted", source_name), payload: serde_json::json!({ "source_id": source_id }), }), @@ -215,6 +230,8 @@ pub async fn record_platform_event( unit_id: None, equipment_id: None, source_id: Some(*source_id), + subject_type: Some("source"), + subject_id: Some(*source_id), message: format!("Created {} points for source {}", point_ids.len(), name), payload: serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), }) @@ -230,6 +247,8 @@ pub async fn record_platform_event( unit_id: None, equipment_id: None, source_id: Some(*source_id), + subject_type: Some("source"), + subject_id: Some(*source_id), message: format!("Deleted {} points for source {}", point_ids.len(), name), payload: serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), }) diff --git a/crates/plc_platform_core/src/model.rs b/crates/plc_platform_core/src/model.rs index 82efe78..97c5088 100644 --- a/crates/plc_platform_core/src/model.rs +++ b/crates/plc_platform_core/src/model.rs @@ -142,6 +142,8 @@ pub struct EventRecord { pub unit_id: Option, pub equipment_id: Option, pub source_id: Option, + pub subject_type: Option, + pub subject_id: Option, pub message: String, pub payload: Option>, #[serde(serialize_with = "utc_to_local_str")] diff --git a/crates/plc_platform_core/src/service/control.rs b/crates/plc_platform_core/src/service/control.rs index 18dc713..9167600 100644 --- a/crates/plc_platform_core/src/service/control.rs +++ b/crates/plc_platform_core/src/service/control.rs @@ -12,21 +12,46 @@ pub struct EquipmentRolePoint { pub signal_role: String, } +#[derive(Debug, Default, Clone)] +pub struct EventFilter<'a> { + pub unit_id: Option, + pub event_type: Option<&'a str>, + /// `event_type` LIKE prefix, e.g. `ops.` matches all ops events. + pub event_type_prefix: Option<&'a str>, + pub subject_type: Option<&'a str>, + pub subject_id: Option, +} + +fn apply_event_filters<'a>(qb: &mut QueryBuilder<'a, sqlx::Postgres>, filter: &EventFilter<'a>) { + if let Some(unit_id) = filter.unit_id { + qb.push(" AND unit_id = ").push_bind(unit_id); + } + if let Some(event_type) = filter.event_type { + qb.push(" AND event_type = ").push_bind(event_type); + } + if let Some(prefix) = filter.event_type_prefix { + let pattern = format!("{}%", prefix); + qb.push(" AND event_type LIKE ").push_bind(pattern); + } + if let Some(subject_type) = filter.subject_type { + qb.push(" AND subject_type = ").push_bind(subject_type); + } + if let Some(subject_id) = filter.subject_id { + qb.push(" AND subject_id = ").push_bind(subject_id); + } +} + pub async fn get_events_count( pool: &PgPool, unit_id: Option, event_type: Option<&str>, ) -> Result { - let mut qb = QueryBuilder::new("SELECT COUNT(*)::BIGINT FROM event WHERE 1 = 1"); - - if let Some(unit_id) = unit_id { - qb.push(" AND unit_id = ").push_bind(unit_id); - } - if let Some(event_type) = event_type { - qb.push(" AND event_type = ").push_bind(event_type); - } - - qb.build_query_scalar().fetch_one(pool).await + let filter = EventFilter { + unit_id, + event_type, + ..EventFilter::default() + }; + get_events_count_filtered(pool, &filter).await } pub async fn get_events_paginated( @@ -36,14 +61,31 @@ pub async fn get_events_paginated( page_size: i32, offset: u32, ) -> Result, sqlx::Error> { - let mut qb = QueryBuilder::new("SELECT * FROM event WHERE 1 = 1"); + let filter = EventFilter { + unit_id, + event_type, + ..EventFilter::default() + }; + get_events_paginated_filtered(pool, &filter, page_size, offset).await +} - if let Some(unit_id) = unit_id { - qb.push(" AND unit_id = ").push_bind(unit_id); - } - if let Some(event_type) = event_type { - qb.push(" AND event_type = ").push_bind(event_type); - } +pub async fn get_events_count_filtered( + pool: &PgPool, + filter: &EventFilter<'_>, +) -> Result { + let mut qb = QueryBuilder::new("SELECT COUNT(*)::BIGINT FROM event WHERE 1 = 1"); + apply_event_filters(&mut qb, filter); + qb.build_query_scalar().fetch_one(pool).await +} + +pub async fn get_events_paginated_filtered( + pool: &PgPool, + filter: &EventFilter<'_>, + page_size: i32, + offset: u32, +) -> Result, sqlx::Error> { + let mut qb = QueryBuilder::new("SELECT * FROM event WHERE 1 = 1"); + apply_event_filters(&mut qb, filter); qb.push(" ORDER BY created_at DESC"); diff --git a/docs/api-ops.md b/docs/api-ops.md index a4f1b9f..0ddc42a 100644 --- a/docs/api-ops.md +++ b/docs/api-ops.md @@ -62,6 +62,11 @@ - `GET /api/runtime/overview` — 所有段 + 资源占用快照 - `GET /api/runtime/segment/{id}` — 单段配置 + runtime - `GET /api/runtime/station/{id}` — 工位信号 + 最新点位监控值 +- `GET /api/event` — 事件时间线,参数: + - `event_type` — 精确匹配,例如 `ops.segment.fault_locked` + - `event_type_prefix` — 前缀匹配,例如 `ops.` 拉取全部 ops 事件 + - `subject_type` / `subject_id` — 设计文档 §4.2.8 归因字段,可按段 / 工位 / 设备过滤 + - 分页参数 `page` / `page_size` ## WebSocket(§8.2)