refactor: migrate platform handlers to core, centralize routes and event persistence

- Move source/point/equipment/tag/page handlers from feeder to plc_platform_core
  using State<PlatformContext>; feeder re-exports via handler modules
- Keep batch_set_point_value in feeder (requires app-specific write key auth)
- Add PlatformEvent enum and persist_and_broadcast() in core for platform event
  persistence to DB + WebSocket broadcast
- Add PlatformContext::emit_event() that handles both sink notification and
  async persistence in one call
- Add platform_routes<S>() in core for centralized route registration;
  both feeder and ops merge it instead of duplicating route definitions
- Implement FromRef<AppState> for PlatformContext in both apps
- Add FeederPlatformEventSink adapter bridging core events to feeder's
  EventManager + ControlRuntimeStore
- Add event namespace prefixes: platform.source.created, feeder.unit.fault_locked, etc.
- Register full platform CRUD routes in ops app

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-04-21 13:45:02 +08:00
parent 093fc5035b
commit 6814e9eae9
16 changed files with 1168 additions and 935 deletions

View File

@ -8,6 +8,7 @@ use crate::{
router::build_router,
websocket::WebSocketManager,
};
use axum::extract::FromRef;
use plc_platform_core::platform_context::PlatformContext;
use tokio::sync::mpsc;
@ -19,6 +20,12 @@ pub struct AppState {
pub control_runtime: Arc<control::runtime::ControlRuntimeStore>,
}
impl FromRef<AppState> for PlatformContext {
fn from_ref(state: &AppState) -> Self {
state.platform.clone()
}
}
pub async fn run() {
dotenv::dotenv().ok();
@ -50,10 +57,14 @@ pub async fn run() {
builder.connection_manager.set_event_manager(event_manager.clone());
builder.connection_manager.set_pool_and_start_reconnect_task(Arc::new(builder.pool.clone()));
let platform = builder.build();
let control_runtime = Arc::new(control::runtime::ControlRuntimeStore::new());
let event_sink = Arc::new(crate::event::FeederPlatformEventSink::new(
event_manager.clone(),
control_runtime.clone(),
));
let platform = builder.build().with_event_sink(event_sink);
let sources = crate::service::get_all_enabled_sources(&platform.pool)
.await
.expect("Failed to fetch sources");

View File

@ -143,6 +143,52 @@ impl EventManager {
}
}
/// Adapter that bridges platform handler events to feeder's EventManager + ControlRuntime.
pub struct FeederPlatformEventSink {
event_manager: std::sync::Arc<EventManager>,
control_runtime: std::sync::Arc<crate::control::runtime::ControlRuntimeStore>,
}
impl FeederPlatformEventSink {
pub fn new(
event_manager: std::sync::Arc<EventManager>,
control_runtime: std::sync::Arc<crate::control::runtime::ControlRuntimeStore>,
) -> Self {
Self { event_manager, control_runtime }
}
}
impl plc_platform_core::platform_context::PlatformEventSink for FeederPlatformEventSink {
fn on_source_created(&self, source_id: Uuid) {
let _ = self.event_manager.send(AppEvent::SourceCreate { source_id });
}
fn on_source_updated(&self, source_id: Uuid) {
let _ = self.event_manager.send(AppEvent::SourceUpdate { source_id });
}
fn on_source_deleted(&self, source_id: Uuid, source_name: String) {
let _ = self.event_manager.send(AppEvent::SourceDelete { source_id, source_name });
}
fn on_points_created(&self, source_id: Uuid, point_ids: Vec<Uuid>) {
let _ = self.event_manager.send(AppEvent::PointCreateBatch { source_id, point_ids });
}
fn on_points_deleted(&self, source_id: Uuid, point_ids: Vec<Uuid>) {
let _ = self.event_manager.send(AppEvent::PointDeleteBatch { source_id, point_ids });
}
fn on_units_changed(&self, unit_ids: Vec<Uuid>) {
let runtime = self.control_runtime.clone();
tokio::spawn(async move {
for unit_id in unit_ids {
runtime.notify_unit(unit_id).await;
}
});
}
}
impl plc_platform_core::connection::PointEventSink for EventManager {
fn send_point_new_value(
&self,
@ -273,16 +319,6 @@ async fn handle_control_event(
}
}
async fn fetch_source_name(pool: &sqlx::PgPool, id: Uuid) -> String {
sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1")
.bind(id)
.fetch_optional(pool)
.await
.ok()
.flatten()
.unwrap_or_else(|| id.to_string())
}
async fn fetch_unit_code(pool: &sqlx::PgPool, id: Uuid) -> String {
sqlx::query_scalar::<_, String>("SELECT code FROM unit WHERE id = $1")
.bind(id)
@ -308,53 +344,18 @@ async fn persist_event_if_needed(
pool: &sqlx::PgPool,
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
) {
let record = match event {
AppEvent::SourceCreate { source_id } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"source.created", "info",
None, None, Some(*source_id),
format!("Source {} created", name),
serde_json::json!({ "source_id": source_id }),
))
}
AppEvent::SourceUpdate { source_id } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"source.updated", "info",
None, None, Some(*source_id),
format!("Source {} updated", name),
serde_json::json!({ "source_id": source_id }),
))
}
AppEvent::SourceDelete { source_id, source_name } => Some((
"source.deleted", "warn",
None, None, None,
format!("Source {} deleted", source_name),
serde_json::json!({ "source_id": source_id }),
)),
AppEvent::PointCreateBatch { source_id, point_ids } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"point.batch_created", "info",
None, None, Some(*source_id),
format!("Created {} points for source {}", point_ids.len(), name),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
))
}
AppEvent::PointDeleteBatch { source_id, point_ids } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"point.batch_deleted", "warn",
None, None, Some(*source_id),
format!("Deleted {} points for source {}", point_ids.len(), name),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
))
}
let record: Option<(&str, &str, Option<Uuid>, Option<Uuid>, Option<Uuid>, String, serde_json::Value)> = match event {
// Platform events — persistence is handled by core's emit_event().
AppEvent::SourceCreate { .. } => None,
AppEvent::SourceUpdate { .. } => None,
AppEvent::SourceDelete { .. } => None,
AppEvent::PointCreateBatch { .. } => None,
AppEvent::PointDeleteBatch { .. } => None,
// Feeder-specific events — persisted here.
AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => {
let code = fetch_equipment_code(pool, *equipment_id).await;
Some((
"equipment.start_command_sent", "info",
"feeder.equipment.start_command_sent", "info",
*unit_id, Some(*equipment_id), None,
format!("Start command sent to equipment {}", code),
serde_json::json!({
@ -367,7 +368,7 @@ async fn persist_event_if_needed(
AppEvent::EquipmentStopCommandSent { equipment_id, unit_id, point_id } => {
let code = fetch_equipment_code(pool, *equipment_id).await;
Some((
"equipment.stop_command_sent", "info",
"feeder.equipment.stop_command_sent", "info",
*unit_id, Some(*equipment_id), None,
format!("Stop command sent to equipment {}", code),
serde_json::json!({
@ -380,7 +381,7 @@ async fn persist_event_if_needed(
AppEvent::AutoControlStarted { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.auto_control_started", "info",
"feeder.unit.auto_control_started", "info",
Some(*unit_id), None, None,
format!("Auto control started for unit {}", code),
serde_json::json!({ "unit_id": unit_id }),
@ -389,7 +390,7 @@ async fn persist_event_if_needed(
AppEvent::AutoControlStopped { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.auto_control_stopped", "info",
"feeder.unit.auto_control_stopped", "info",
Some(*unit_id), None, None,
format!("Auto control stopped for unit {}", code),
serde_json::json!({ "unit_id": unit_id }),
@ -399,7 +400,7 @@ async fn persist_event_if_needed(
let unit_code = fetch_unit_code(pool, *unit_id).await;
let eq_code = fetch_equipment_code(pool, *equipment_id).await;
Some((
"unit.fault_locked", "error",
"feeder.unit.fault_locked", "error",
Some(*unit_id), Some(*equipment_id), None,
format!("Fault locked for unit {} by equipment {}", unit_code, eq_code),
serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }),
@ -408,7 +409,7 @@ async fn persist_event_if_needed(
AppEvent::FaultAcked { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.fault_acked", "info",
"feeder.unit.fault_acked", "info",
Some(*unit_id), None, None,
format!("Fault acknowledged for unit {}", code),
serde_json::json!({ "unit_id": unit_id }),
@ -417,7 +418,7 @@ async fn persist_event_if_needed(
AppEvent::CommLocked { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.comm_locked", "warn",
"feeder.unit.comm_locked", "warn",
Some(*unit_id), None, None,
format!("Communication locked for unit {}", code),
serde_json::json!({ "unit_id": unit_id }),
@ -426,7 +427,7 @@ async fn persist_event_if_needed(
AppEvent::CommRecovered { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.comm_recovered", "info",
"feeder.unit.comm_recovered", "info",
Some(*unit_id), None, None,
format!("Communication recovered for unit {}", code),
serde_json::json!({ "unit_id": unit_id }),
@ -436,7 +437,7 @@ async fn persist_event_if_needed(
let unit_code = fetch_unit_code(pool, *unit_id).await;
let eq_code = fetch_equipment_code(pool, *equipment_id).await;
Some((
"unit.rem_local", "warn",
"feeder.unit.rem_local", "warn",
Some(*unit_id), Some(*equipment_id), None,
format!("Unit {} switched to local control via equipment {}", unit_code, eq_code),
serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }),
@ -445,7 +446,7 @@ async fn persist_event_if_needed(
AppEvent::RemRecovered { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.rem_recovered", "warn",
"feeder.unit.rem_recovered", "warn",
Some(*unit_id), None, None,
format!("Unit {} returned to remote control; auto control requires manual restart", code),
serde_json::json!({ "unit_id": unit_id }),

View File

@ -1,10 +1,18 @@
pub mod control;
pub mod control;
pub mod doc;
pub mod equipment;
pub mod equipment {
pub use plc_platform_core::handler::equipment::*;
}
pub mod log {
pub use plc_platform_core::handler::log::*;
}
pub mod page;
pub mod page {
pub use plc_platform_core::handler::page::*;
}
pub mod point;
pub mod source;
pub mod tag;
pub mod source {
pub use plc_platform_core::handler::source::*;
}
pub mod tag {
pub use plc_platform_core::handler::tag::*;
}

View File

@ -1,662 +1,21 @@
// Re-export all platform point handlers from core.
pub use plc_platform_core::handler::point::*;
use axum::{
extract::{Path, Query, State},
extract::State,
http::HeaderMap,
response::IntoResponse,
Json,
};
use serde::{Deserialize, Serialize};
use serde_with::rust::double_option;
use sqlx::{QueryBuilder, Row};
use std::collections::{HashMap, HashSet};
use uuid::Uuid;
use validator::Validate;
use plc_platform_core::util::response::ApiErr;
use plc_platform_core::util::{
pagination::{PaginatedResponse, PaginationParams},
response::ApiErr,
};
use crate::{
AppState,
};
use plc_platform_core::model::{Node, Point};
async fn notify_units(
state: &AppState,
unit_ids: impl IntoIterator<Item = Uuid>,
) {
let mut seen = std::collections::HashSet::new();
for unit_id in unit_ids {
if seen.insert(unit_id) {
state.control_runtime.notify_unit(unit_id).await;
}
}
}
/// List all points.
#[derive(Deserialize, Validate)]
pub struct GetPointListQuery {
pub source_id: Option<Uuid>,
pub equipment_id: Option<Uuid>,
#[serde(flatten)]
pub pagination: PaginationParams,
}
#[derive(Serialize)]
pub struct PointWithMonitor {
#[serde(flatten)]
pub point: Point,
pub point_monitor: Option<crate::telemetry::PointMonitorInfo>,
}
#[derive(Deserialize, Validate)]
pub struct GetPointHistoryQuery {
pub limit: Option<usize>,
}
#[derive(Serialize)]
pub struct PointHistoryItem {
#[serde(serialize_with = "plc_platform_core::util::datetime::option_utc_to_local_str")]
pub timestamp: Option<chrono::DateTime<chrono::Utc>>,
pub quality: crate::telemetry::PointQuality,
pub value: Option<crate::telemetry::DataValue>,
pub value_text: Option<String>,
pub value_number: Option<f64>,
}
pub async fn get_point_list(
State(state): State<AppState>,
Query(query): Query<GetPointListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let pool = &state.platform.pool;
// Count total rows.
let total = crate::service::get_points_count(pool, query.source_id, query.equipment_id).await?;
// Load current page rows.
let points = crate::service::get_points_paginated(
pool,
query.source_id,
query.equipment_id,
query.pagination.page_size,
query.pagination.offset(),
)
.await?;
let monitor_guard = state
.platform.connection_manager
.get_point_monitor_data_read_guard()
.await;
let data: Vec<PointWithMonitor> = points
.into_iter()
.map(|point| {
let point_monitor = monitor_guard.get(&point.id).cloned();
PointWithMonitor {
point,
point_monitor,
}
})
.collect();
let response = PaginatedResponse::new(
data,
total,
query.pagination.page,
query.pagination.page_size,
);
Ok(Json(response))
}
/// Get a point by id.
pub async fn get_point(
State(state): State<AppState>,
Path(point_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.platform.pool;
let point = crate::service::get_point_by_id(pool, point_id).await?;
Ok(Json(point))
}
pub async fn get_point_history(
State(state): State<AppState>,
Path(point_id): Path<Uuid>,
Query(query): Query<GetPointHistoryQuery>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.platform.pool;
let point = crate::service::get_point_by_id(pool, point_id).await?;
if point.is_none() {
return Err(ApiErr::NotFound("Point not found".to_string(), None));
}
let limit = query.limit.unwrap_or(120).clamp(1, 1000);
let history = state
.platform.connection_manager
.get_point_history(point_id, limit)
.await;
let items: Vec<PointHistoryItem> = history
.into_iter()
.map(|item| {
let value_number = monitor_value_to_number(&item);
PointHistoryItem {
timestamp: item.timestamp,
quality: item.quality,
value_number,
value: item.value,
value_text: item.value_text,
}
})
.collect();
Ok(Json(items))
}
/// Request payload for updating editable point fields.
#[derive(Deserialize, Validate)]
pub struct UpdatePointReq {
pub name: Option<String>,
#[serde(default, with = "double_option")]
pub description: Option<Option<String>>,
#[serde(default, with = "double_option")]
pub unit: Option<Option<String>>,
#[serde(default, with = "double_option")]
pub tag_id: Option<Option<Uuid>>,
#[serde(default, with = "double_option")]
pub equipment_id: Option<Option<Uuid>>,
#[serde(default, with = "double_option")]
pub signal_role: Option<Option<String>>,
}
/// Request payload for batch setting point tags.
#[derive(Deserialize, Validate)]
pub struct BatchSetPointTagsReq {
#[validate(length(min = 1, max = 500))]
pub point_ids: Vec<Uuid>,
pub tag_id: Option<Uuid>,
}
#[derive(Deserialize, Validate)]
pub struct BatchSetPointEquipmentReq {
#[validate(length(min = 1, max = 500))]
pub point_ids: Vec<Uuid>,
pub equipment_id: Option<Uuid>,
pub signal_role: Option<String>,
}
/// Update point metadata (name/description/unit only).
pub async fn update_point(
State(state): State<AppState>,
Path(point_id): Path<Uuid>,
Json(payload): Json<UpdatePointReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let pool = &state.platform.pool;
if payload.name.is_none()
&& payload.description.is_none()
&& payload.unit.is_none()
&& payload.tag_id.is_none()
&& payload.equipment_id.is_none()
&& payload.signal_role.is_none()
{
return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
}
// If tag_id is provided, ensure tag exists.
if let Some(Some(tag_id)) = payload.tag_id {
let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#)
.bind(tag_id)
.fetch_optional(pool)
.await?
.is_some();
if !tag_exists {
return Err(ApiErr::NotFound("Tag not found".to_string(), None));
}
}
if let Some(Some(equipment_id)) = payload.equipment_id {
let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#)
.bind(equipment_id)
.fetch_optional(pool)
.await?
.is_some();
if !equipment_exists {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}
}
// Ensure target point exists.
let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#)
.bind(point_id)
.fetch_optional(pool)
.await?;
if existing_point.is_none() {
return Err(ApiErr::NotFound("Point not found".to_string(), None));
}
let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
let mut qb: QueryBuilder<sqlx::Postgres> = QueryBuilder::new("UPDATE point SET ");
let mut wrote_field = false;
if let Some(name) = &payload.name {
if wrote_field {
qb.push(", ");
}
qb.push("name = ").push_bind(name);
wrote_field = true;
}
if let Some(description) = &payload.description {
if wrote_field {
qb.push(", ");
}
qb.push("description = ").push_bind(description.as_deref());
wrote_field = true;
}
if let Some(unit) = &payload.unit {
if wrote_field {
qb.push(", ");
}
qb.push("unit = ").push_bind(unit.as_deref());
wrote_field = true;
}
if let Some(tag_id) = &payload.tag_id {
if wrote_field {
qb.push(", ");
}
qb.push("tag_id = ").push_bind(tag_id.as_ref());
wrote_field = true;
}
if let Some(equipment_id) = &payload.equipment_id {
if wrote_field {
qb.push(", ");
}
qb.push("equipment_id = ").push_bind(equipment_id.as_ref());
wrote_field = true;
}
if let Some(signal_role) = &payload.signal_role {
if wrote_field {
qb.push(", ");
}
qb.push("signal_role = ").push_bind(signal_role.as_deref());
wrote_field = true;
}
if wrote_field {
qb.push(", ");
}
qb.push("updated_at = NOW()");
qb.push(" WHERE id = ").push_bind(point_id);
qb.build().execute(pool).await?;
let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await;
Ok(Json(
serde_json::json!({"ok_msg": "Point updated successfully"}),
))
}
/// Batch set point tags.
pub async fn batch_set_point_tags(
State(state): State<AppState>,
Json(payload): Json<BatchSetPointTagsReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
if payload.point_ids.is_empty() {
return Err(ApiErr::BadRequest(
"point_ids cannot be empty".to_string(),
None,
));
}
let pool = &state.platform.pool;
// If tag_id is provided, ensure tag exists.
if let Some(tag_id) = payload.tag_id {
let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#)
.bind(tag_id)
.fetch_optional(pool)
.await?
.is_some();
if !tag_exists {
return Err(ApiErr::NotFound("Tag not found".to_string(), None));
}
}
// Check which points exist
let existing_points: Vec<Uuid> = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#)
.bind(&payload.point_ids)
.fetch_all(pool)
.await?
.into_iter()
.map(|row: sqlx::postgres::PgRow| row.get::<Uuid, _>("id"))
.collect();
if existing_points.is_empty() {
return Err(ApiErr::NotFound("No valid points found".to_string(), None));
}
// Update tag_id for all existing points
let result =
sqlx::query(r#"UPDATE point SET tag_id = $1, updated_at = NOW() WHERE id = ANY($2)"#)
.bind(payload.tag_id)
.bind(&existing_points)
.execute(pool)
.await?;
Ok(Json(serde_json::json!({
"ok_msg": "Point tags updated successfully",
"updated_count": result.rows_affected()
})))
}
pub async fn batch_set_point_equipment(
State(state): State<AppState>,
Json(payload): Json<BatchSetPointEquipmentReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
if payload.point_ids.is_empty() {
return Err(ApiErr::BadRequest(
"point_ids cannot be empty".to_string(),
None,
));
}
let pool = &state.platform.pool;
if let Some(equipment_id) = payload.equipment_id {
let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#)
.bind(equipment_id)
.fetch_optional(pool)
.await?
.is_some();
if !equipment_exists {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}
}
let existing_points: Vec<Uuid> = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#)
.bind(&payload.point_ids)
.fetch_all(pool)
.await?
.into_iter()
.map(|row: sqlx::postgres::PgRow| row.get::<Uuid, _>("id"))
.collect();
if existing_points.is_empty() {
return Err(ApiErr::NotFound("No valid points found".to_string(), None));
}
let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?;
let result = sqlx::query(
r#"
UPDATE point
SET equipment_id = $1,
signal_role = $2,
updated_at = NOW()
WHERE id = ANY($3)
"#,
)
.bind(payload.equipment_id)
.bind(payload.signal_role.as_deref())
.bind(&existing_points)
.execute(pool)
.await?;
let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?;
notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await;
Ok(Json(serde_json::json!({
"ok_msg": "Point equipment updated successfully",
"updated_count": result.rows_affected()
})))
}
/// Delete one point by id.
pub async fn delete_point(
State(state): State<AppState>,
Path(point_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.platform.pool;
let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
let source_id = {
let grouped = crate::service::get_points_grouped_by_source(pool, &[point_id]).await?;
grouped.keys().next().copied()
};
// Ensure target point exists.
let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#)
.bind(point_id)
.fetch_optional(pool)
.await?;
if existing_point.is_none() {
return Err(ApiErr::NotFound("Point not found".to_string(), None));
}
// Delete point.
sqlx::query(r#"delete from point WHERE id = $1"#)
.bind(point_id)
.execute(pool)
.await?;
if let Some(source_id) = source_id {
if let Err(e) = state
.event_manager
.send(crate::event::AppEvent::PointDeleteBatch {
source_id,
point_ids: vec![point_id],
})
{
tracing::error!("Failed to send PointDeleteBatch event: {}", e);
}
}
notify_units(&state, affected_unit_ids).await;
Ok(Json(
serde_json::json!({"ok_msg": "Point deleted successfully"}),
))
}
#[derive(Deserialize, Validate)]
/// Request payload for batch point creation from node ids.
pub struct BatchCreatePointsReq {
#[validate(length(min = 1, max = 500))]
pub node_ids: Vec<Uuid>,
}
#[derive(Serialize)]
/// Response payload for batch point creation.
pub struct BatchCreatePointsRes {
pub success_count: usize,
pub failed_count: usize,
pub failed_node_ids: Vec<Uuid>,
pub created_point_ids: Vec<Uuid>,
}
/// Batch create points by node ids.
pub async fn batch_create_points(
State(state): State<AppState>,
Json(payload): Json<BatchCreatePointsReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let pool = &state.platform.pool;
if payload.node_ids.is_empty() {
return Err(ApiErr::BadRequest(
"node_ids cannot be empty".to_string(),
None,
));
}
// Use one transaction for the full batch.
let mut tx = pool.begin().await?;
let node_ids = payload.node_ids;
let nodes: Vec<Node> = sqlx::query_as::<_, Node>(r#"SELECT * FROM node WHERE id = ANY($1)"#)
.bind(&node_ids)
.fetch_all(&mut *tx)
.await?;
let node_map: HashMap<Uuid, Node> = nodes.into_iter().map(|node| (node.id, node)).collect();
let existing_node_ids: HashSet<Uuid> = node_map.keys().copied().collect();
let mut failed_node_ids = Vec::new();
for node_id in &node_ids {
if !existing_node_ids.contains(node_id) {
failed_node_ids.push(*node_id);
}
}
let existing_point_node_ids: HashSet<Uuid> =
sqlx::query_scalar::<_, Uuid>(r#"SELECT node_id FROM point WHERE node_id = ANY($1)"#)
.bind(&node_ids)
.fetch_all(&mut *tx)
.await?
.into_iter()
.collect();
let mut to_create = Vec::new();
let mut seen_creatable = HashSet::new();
for node_id in node_ids {
if !existing_node_ids.contains(&node_id) || existing_point_node_ids.contains(&node_id) {
continue;
}
if !seen_creatable.insert(node_id) {
continue;
}
let name = node_map
.get(&node_id)
.map(|node| node.browse_name.clone())
.unwrap_or_else(|| format!("Point_{}", node_id));
to_create.push((Uuid::new_v4(), node_id, name));
}
let mut created_point_ids = Vec::with_capacity(to_create.len());
if !to_create.is_empty() {
let mut qb = QueryBuilder::new("INSERT INTO point (id, node_id, name) ");
qb.push_values(to_create.iter(), |mut b, (id, node_id, name)| {
b.push_bind(*id).push_bind(*node_id).push_bind(name);
});
qb.build().execute(&mut *tx).await?;
created_point_ids.extend(to_create.into_iter().map(|(id, _, _)| id));
}
// Commit the transaction.
tx.commit().await?;
// Emit grouped create events by source.
if !created_point_ids.is_empty() {
let grouped =
crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?;
for (source_id, points) in grouped {
let point_ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
if let Err(e) = state
.event_manager
.send(crate::event::AppEvent::PointCreateBatch {
source_id,
point_ids,
})
{
tracing::error!("Failed to send PointCreateBatch event: {}", e);
}
}
}
Ok(Json(BatchCreatePointsRes {
success_count: created_point_ids.len(),
failed_count: failed_node_ids.len(),
failed_node_ids,
created_point_ids,
}))
}
#[derive(Deserialize, Validate)]
/// Request payload for batch point deletion.
pub struct BatchDeletePointsReq {
#[validate(length(min = 1, max = 500))]
pub point_ids: Vec<Uuid>,
}
#[derive(Serialize)]
/// Response payload for batch point deletion.
pub struct BatchDeletePointsRes {
pub deleted_count: u64,
}
/// Batch delete points and emit grouped delete events by source.
pub async fn batch_delete_points(
State(state): State<AppState>,
Json(payload): Json<BatchDeletePointsReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
if payload.point_ids.is_empty() {
return Err(ApiErr::BadRequest(
"point_ids cannot be empty".to_string(),
None,
));
}
let pool = &state.platform.pool;
let point_ids = payload.point_ids;
let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?;
let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &point_ids).await?;
let existing_point_ids: Vec<Uuid> = grouped
.values()
.flat_map(|points| points.iter().map(|p| p.point_id))
.collect();
if existing_point_ids.is_empty() {
return Ok(Json(BatchDeletePointsRes { deleted_count: 0 }));
}
let result = sqlx::query(r#"DELETE FROM point WHERE id = ANY($1)"#)
.bind(&existing_point_ids)
.execute(pool)
.await?;
for (source_id, points) in grouped {
let ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
if let Err(e) = state
.event_manager
.send(crate::event::AppEvent::PointDeleteBatch {
source_id,
point_ids: ids,
})
{
tracing::error!("Failed to send PointDeleteBatch event: {}", e);
}
}
notify_units(&state, affected_unit_ids).await;
Ok(Json(BatchDeletePointsRes {
deleted_count: result.rows_affected(),
}))
}
use crate::AppState;
/// Feeder-specific: batch set point values (requires write key auth from app config).
pub async fn batch_set_point_value(
State(state): State<AppState>,
headers: HeaderMap,
Json(payload): Json<crate::connection::BatchSetPointValueReq>,
Json(payload): Json<plc_platform_core::connection::BatchSetPointValueReq>,
) -> Result<impl IntoResponse, ApiErr> {
let write_key = headers
.get("X-Write-Key")
@ -673,21 +32,10 @@ pub async fn batch_set_point_value(
}
let result = state
.platform.connection_manager
.platform
.connection_manager
.write_point_values_batch(payload)
.await
.map_err(|e| ApiErr::Internal(e, None))?;
Ok(Json(result))
}
fn monitor_value_to_number(item: &crate::telemetry::PointMonitorInfo) -> Option<f64> {
match item.value.as_ref()? {
crate::telemetry::DataValue::Int(v) => Some(*v as f64),
crate::telemetry::DataValue::UInt(v) => Some(*v as f64),
crate::telemetry::DataValue::Float(v) => Some(*v),
crate::telemetry::DataValue::Bool(v) => Some(if *v { 1.0 } else { 0.0 }),
crate::telemetry::DataValue::Text(v) => v.parse::<f64>().ok(),
_ => None,
}
}

View File

@ -2,7 +2,7 @@ use axum::{
extract::Request,
middleware::Next,
response::Response,
routing::{get, post, put},
routing::{get, post},
Router,
};
use tower_http::cors::{Any, CorsLayer};
@ -20,74 +20,17 @@ async fn no_cache(req: Request, next: Next) -> Response {
}
pub fn build_router(state: AppState) -> Router {
let all_route = Router::new()
.route(
"/api/source",
get(handler::source::get_source_list).post(handler::source::create_source),
)
.route(
"/api/source/{source_id}",
axum::routing::delete(handler::source::delete_source)
.put(handler::source::update_source),
)
.route(
"/api/source/{source_id}/reconnect",
axum::routing::post(handler::source::reconnect_source),
)
.route(
"/api/source/{source_id}/browse",
axum::routing::post(handler::source::browse_and_save_nodes),
)
.route(
"/api/source/{source_id}/node-tree",
get(handler::source::get_node_tree),
)
.route("/api/point", get(handler::point::get_point_list))
// Platform routes (source, point, equipment, tag, page, logs) from core.
let platform = plc_platform_core::handler::platform_routes::<AppState>();
// Feeder-specific routes.
let feeder_routes = Router::new()
// Feeder-only: batch set point values (requires write key auth).
.route(
"/api/point/value/batch",
axum::routing::post(handler::point::batch_set_point_value),
)
.route(
"/api/point/batch",
axum::routing::post(handler::point::batch_create_points)
.delete(handler::point::batch_delete_points),
)
.route(
"/api/point/{point_id}/history",
get(handler::point::get_point_history),
)
.route(
"/api/point/{point_id}",
get(handler::point::get_point)
.put(handler::point::update_point)
.delete(handler::point::delete_point),
)
.route(
"/api/point/batch/set-tags",
put(handler::point::batch_set_point_tags),
)
.route(
"/api/point/batch/set-equipment",
put(handler::point::batch_set_point_equipment),
)
.route(
"/api/equipment",
get(handler::equipment::get_equipment_list).post(handler::equipment::create_equipment),
)
.route(
"/api/equipment/{equipment_id}",
get(handler::equipment::get_equipment)
.put(handler::equipment::update_equipment)
.delete(handler::equipment::delete_equipment),
)
.route(
"/api/equipment/batch/set-unit",
put(handler::equipment::batch_set_equipment_unit),
)
.route(
"/api/equipment/{equipment_id}/points",
get(handler::equipment::get_equipment_points),
post(handler::point::batch_set_point_value),
)
// Unit / control routes (feeder-specific).
.route(
"/api/unit",
get(handler::control::get_unit_list).post(handler::control::create_unit),
@ -135,33 +78,13 @@ pub fn build_router(state: AppState) -> Router {
"/api/unit/{unit_id}/detail",
get(handler::control::get_unit_detail),
)
.route(
"/api/tag",
get(handler::tag::get_tag_list).post(handler::tag::create_tag),
)
.route(
"/api/tag/{tag_id}",
get(handler::tag::get_tag_points)
.put(handler::tag::update_tag)
.delete(handler::tag::delete_tag),
)
.route(
"/api/page",
get(handler::page::get_page_list).post(handler::page::create_page),
)
.route(
"/api/page/{page_id}",
get(handler::page::get_page)
.put(handler::page::update_page)
.delete(handler::page::delete_page),
)
.route("/api/logs", get(handler::log::get_logs))
.route("/api/logs/stream", get(handler::log::stream_logs))
// Doc routes (feeder-specific doc paths).
.route("/api/docs/api-md", get(handler::doc::get_api_md))
.route("/api/docs/readme-md", get(handler::doc::get_readme_md));
Router::new()
.merge(all_route)
.merge(platform)
.merge(feeder_routes)
.nest(
"/ui",
Router::new()

View File

@ -1,3 +1,4 @@
use axum::extract::FromRef;
use crate::router::build_router;
use plc_platform_core::platform_context::PlatformContext;
@ -30,6 +31,12 @@ pub struct AppState {
pub platform: PlatformContext,
}
impl FromRef<AppState> for PlatformContext {
fn from_ref(state: &AppState) -> Self {
state.platform.clone()
}
}
pub async fn run() {
dotenv::dotenv().ok();
plc_platform_core::util::log::init_logger();

View File

@ -16,12 +16,18 @@ async fn no_cache(
}
pub fn build_router(state: AppState) -> Router {
Router::new()
// Platform routes (source, point, equipment, tag, page, logs) from core.
let platform = plc_platform_core::handler::platform_routes::<AppState>();
// Ops-specific routes.
let ops_routes = Router::new()
.route("/api/health", get(health_check))
.route("/api/logs", get(plc_platform_core::handler::log::get_logs))
.route("/api/logs/stream", get(plc_platform_core::handler::log::stream_logs))
.route("/api/docs/api-md", get(crate::handler::doc::get_api_md))
.route("/api/docs/readme-md", get(crate::handler::doc::get_readme_md))
.route("/api/docs/readme-md", get(crate::handler::doc::get_readme_md));
Router::new()
.merge(platform)
.merge(ops_routes)
.nest(
"/ui",
Router::new()

View File

@ -1,5 +1,9 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
use crate::model::EventRecord;
use crate::websocket::{WebSocketManager, WsMessage};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EventEnvelope {
@ -15,3 +19,116 @@ impl EventEnvelope {
}
}
}
/// Platform-level events emitted by core handlers.
/// Each variant carries enough context for persistence + app-specific side effects.
#[derive(Debug, Clone)]
pub enum PlatformEvent {
SourceCreated { source_id: Uuid },
SourceUpdated { source_id: Uuid },
SourceDeleted { source_id: Uuid, source_name: String },
PointsCreated { source_id: Uuid, point_ids: Vec<Uuid> },
PointsDeleted { source_id: Uuid, point_ids: Vec<Uuid> },
UnitsChanged { unit_ids: Vec<Uuid> },
}
/// Persists platform events to the `event` table and broadcasts via WebSocket.
///
/// Apps get notified via `PlatformEventSink` for their own side effects
/// (connection management, control runtime, etc.).
pub async fn persist_and_broadcast(
event: &PlatformEvent,
pool: &sqlx::PgPool,
ws_manager: &WebSocketManager,
) {
let record = match event {
PlatformEvent::SourceCreated { source_id } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"platform.source.created", "info",
None, None, Some(*source_id),
format!("Source {} created", name),
serde_json::json!({ "source_id": source_id }),
))
}
PlatformEvent::SourceUpdated { source_id } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"platform.source.updated", "info",
None, None, Some(*source_id),
format!("Source {} updated", name),
serde_json::json!({ "source_id": source_id }),
))
}
PlatformEvent::SourceDeleted { source_id, source_name } => Some((
"platform.source.deleted", "warn",
None, None, None,
format!("Source {} deleted", source_name),
serde_json::json!({ "source_id": source_id }),
)),
PlatformEvent::PointsCreated { source_id, point_ids } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"platform.point.batch_created", "info",
None, None, Some(*source_id),
format!("Created {} points for source {}", point_ids.len(), name),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
))
}
PlatformEvent::PointsDeleted { source_id, point_ids } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"platform.point.batch_deleted", "warn",
None, None, Some(*source_id),
format!("Deleted {} points for source {}", point_ids.len(), name),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
))
}
PlatformEvent::UnitsChanged { .. } => None,
};
let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record
else {
return;
};
let envelope = EventEnvelope::new(event_type, payload);
let inserted = sqlx::query_as::<_, EventRecord>(
r#"
INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
"#,
)
.bind(envelope.event_type)
.bind(level)
.bind(unit_id as Option<Uuid>)
.bind(equipment_id as Option<Uuid>)
.bind(source_id)
.bind(message)
.bind(sqlx::types::Json(envelope.payload))
.fetch_one(pool)
.await;
match inserted {
Ok(record) => {
let ws_message = WsMessage::EventCreated(record);
if let Err(err) = ws_manager.send_to_public(ws_message).await {
tracing::warn!("Failed to broadcast platform event: {}", err);
}
}
Err(err) => {
tracing::warn!("Failed to persist platform event: {}", err);
}
}
}
async fn fetch_source_name(pool: &sqlx::PgPool, id: Uuid) -> String {
sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1")
.bind(id)
.fetch_optional(pool)
.await
.ok()
.flatten()
.unwrap_or_else(|| id.to_string())
}

View File

@ -1,2 +1,10 @@
pub mod doc;
pub mod equipment;
pub mod log;
pub mod page;
pub mod point;
pub mod router;
pub mod source;
pub mod tag;
pub use router::platform_routes;

View File

@ -8,21 +8,19 @@ use serde::{Deserialize, Serialize};
use uuid::Uuid;
use validator::Validate;
use plc_platform_core::util::{
use crate::util::{
pagination::{PaginatedResponse, PaginationParams},
response::ApiErr,
};
use crate::AppState;
use crate::platform_context::PlatformContext;
async fn notify_units(
state: &AppState,
unit_ids: impl IntoIterator<Item = Uuid>,
) {
let mut seen = std::collections::HashSet::new();
for unit_id in unit_ids {
if seen.insert(unit_id) {
state.control_runtime.notify_unit(unit_id).await;
}
fn notify_units(state: &PlatformContext, unit_ids: impl IntoIterator<Item = Uuid>) {
let ids: Vec<Uuid> = {
let mut seen = std::collections::HashSet::new();
unit_ids.into_iter().filter(|id| seen.insert(*id)).collect()
};
if !ids.is_empty() {
state.emit_event(crate::event::PlatformEvent::UnitsChanged { unit_ids: ids });
}
}
@ -44,20 +42,20 @@ pub struct SignalRolePoint {
#[derive(Serialize)]
pub struct EquipmentListItem {
#[serde(flatten)]
pub equipment: plc_platform_core::model::Equipment,
pub equipment: crate::model::Equipment,
pub point_count: i64,
pub role_points: Vec<SignalRolePoint>,
}
pub async fn get_equipment_list(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Query(query): Query<GetEquipmentListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let total = crate::service::get_equipment_count(&state.platform.pool, query.keyword.as_deref()).await?;
let total = crate::service::get_equipment_count(&state.pool, query.keyword.as_deref()).await?;
let items = crate::service::get_equipment_paginated(
&state.platform.pool,
&state.pool,
query.keyword.as_deref(),
query.pagination.page_size,
query.pagination.offset(),
@ -66,10 +64,10 @@ pub async fn get_equipment_list(
let equipment_ids: Vec<uuid::Uuid> = items.iter().map(|item| item.equipment.id).collect();
let role_point_rows =
crate::service::get_signal_role_points_batch(&state.platform.pool, &equipment_ids).await?;
crate::service::get_signal_role_points_batch(&state.pool, &equipment_ids).await?;
let monitor_guard = state
.platform.connection_manager
.connection_manager
.get_point_monitor_data_read_guard()
.await;
@ -107,10 +105,10 @@ pub async fn get_equipment_list(
}
pub async fn get_equipment(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(equipment_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let equipment = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id).await?;
let equipment = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?;
match equipment {
Some(item) => Ok(Json(item)),
@ -119,15 +117,15 @@ pub async fn get_equipment(
}
pub async fn get_equipment_points(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(equipment_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let exists = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id).await?;
let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?;
if exists.is_none() {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}
let points = crate::service::get_points_by_equipment_id(&state.platform.pool, equipment_id).await?;
let points = crate::service::get_points_by_equipment_id(&state.pool, equipment_id).await?;
Ok(Json(points))
}
@ -160,12 +158,12 @@ pub struct BatchSetEquipmentUnitReq {
}
pub async fn create_equipment(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Json(payload): Json<CreateEquipmentReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let exists = crate::service::get_equipment_by_code(&state.platform.pool, &payload.code).await?;
let exists = crate::service::get_equipment_by_code(&state.pool, &payload.code).await?;
if exists.is_some() {
return Err(ApiErr::BadRequest(
"Equipment code already exists".to_string(),
@ -174,14 +172,14 @@ pub async fn create_equipment(
}
if let Some(unit_id) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.platform.pool, unit_id).await?;
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?;
if unit_exists.is_none() {
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
}
let equipment_id = crate::service::create_equipment(
&state.platform.pool,
&state.pool,
payload.unit_id,
&payload.code,
&payload.name,
@ -191,7 +189,7 @@ pub async fn create_equipment(
.await?;
if let Some(unit_id) = payload.unit_id {
notify_units(&state, [unit_id]).await;
notify_units(&state, [unit_id]);
}
Ok((
@ -204,7 +202,7 @@ pub async fn create_equipment(
}
pub async fn update_equipment(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(equipment_id): Path<Uuid>,
Json(payload): Json<UpdateEquipmentReq>,
) -> Result<impl IntoResponse, ApiErr> {
@ -219,7 +217,7 @@ pub async fn update_equipment(
return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
}
let exists = crate::service::get_equipment_by_id(&state.platform.pool, equipment_id).await?;
let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?;
let existing_equipment = if let Some(equipment) = exists {
equipment
} else {
@ -227,14 +225,14 @@ pub async fn update_equipment(
};
if let Some(Some(unit_id)) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.platform.pool, unit_id).await?;
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?;
if unit_exists.is_none() {
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
}
if let Some(code) = payload.code.as_deref() {
let duplicate = crate::service::get_equipment_by_code(&state.platform.pool, code).await?;
let duplicate = crate::service::get_equipment_by_code(&state.pool, code).await?;
if duplicate
.as_ref()
.is_some_and(|item| item.id != equipment_id)
@ -247,7 +245,7 @@ pub async fn update_equipment(
}
crate::service::update_equipment(
&state.platform.pool,
&state.pool,
equipment_id,
payload.unit_id,
payload.code.as_deref(),
@ -268,7 +266,7 @@ pub async fn update_equipment(
if let Some(unit_id) = next_unit_id {
unit_ids.push(unit_id);
}
notify_units(&state, unit_ids).await;
notify_units(&state, unit_ids);
Ok(Json(serde_json::json!({
"ok_msg": "Equipment updated successfully"
@ -276,7 +274,7 @@ pub async fn update_equipment(
}
pub async fn batch_set_equipment_unit(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Json(payload): Json<BatchSetEquipmentUnitReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
@ -289,17 +287,17 @@ pub async fn batch_set_equipment_unit(
}
if let Some(unit_id) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.platform.pool, unit_id).await?;
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?;
if unit_exists.is_none() {
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
}
let before_unit_ids =
crate::service::get_unit_ids_by_equipment_ids(&state.platform.pool, &payload.equipment_ids).await?;
crate::service::get_unit_ids_by_equipment_ids(&state.pool, &payload.equipment_ids).await?;
let updated_count = crate::service::batch_set_equipment_unit(
&state.platform.pool,
&state.pool,
&payload.equipment_ids,
payload.unit_id,
)
@ -309,7 +307,7 @@ pub async fn batch_set_equipment_unit(
if let Some(unit_id) = payload.unit_id {
unit_ids.push(unit_id);
}
notify_units(&state, unit_ids).await;
notify_units(&state, unit_ids);
Ok(Json(serde_json::json!({
"ok_msg": "Equipment unit updated successfully",
@ -318,16 +316,16 @@ pub async fn batch_set_equipment_unit(
}
pub async fn delete_equipment(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(equipment_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.platform.pool, &[equipment_id]).await?;
let deleted = crate::service::delete_equipment(&state.platform.pool, equipment_id).await?;
let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.pool, &[equipment_id]).await?;
let deleted = crate::service::delete_equipment(&state.pool, equipment_id).await?;
if !deleted {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}
notify_units(&state, unit_ids).await;
notify_units(&state, unit_ids);
Ok(StatusCode::NO_CONTENT)
}

View File

@ -1,13 +1,13 @@
use axum::{Json, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse};
use axum::{Json, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse};
use serde::Deserialize;
use std::collections::HashMap;
use sqlx::types::Json as SqlxJson;
use uuid::Uuid;
use validator::Validate;
use plc_platform_core::model::Page;
use plc_platform_core::util::response::ApiErr;
use crate::AppState;
use crate::model::Page;
use crate::platform_context::PlatformContext;
use crate::util::response::ApiErr;
#[derive(Deserialize, Validate)]
pub struct GetPageListQuery {
@ -16,11 +16,11 @@ pub struct GetPageListQuery {
}
pub async fn get_page_list(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Query(query): Query<GetPageListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let pool = &state.platform.pool;
let pool = &state.pool;
let pages: Vec<Page> = if let Some(name) = query.name {
sqlx::query_as::<_, Page>(
@ -45,12 +45,12 @@ pub async fn get_page_list(
}
pub async fn get_page(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(page_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let page = sqlx::query_as::<_, Page>("SELECT * FROM page WHERE id = $1")
.bind(page_id)
.fetch_optional(&state.platform.pool)
.fetch_optional(&state.pool)
.await?;
match page {
@ -74,7 +74,7 @@ pub struct UpdatePageReq {
}
pub async fn create_page(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Json(payload): Json<CreatePageReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
@ -88,7 +88,7 @@ pub async fn create_page(
)
.bind(&payload.name)
.bind(SqlxJson(payload.data))
.fetch_one(&state.platform.pool)
.fetch_one(&state.pool)
.await?;
Ok((StatusCode::CREATED, Json(serde_json::json!({
@ -98,7 +98,7 @@ pub async fn create_page(
}
pub async fn update_page(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(page_id): Path<Uuid>,
Json(payload): Json<UpdatePageReq>,
) -> Result<impl IntoResponse, ApiErr> {
@ -106,7 +106,7 @@ pub async fn update_page(
let exists = sqlx::query("SELECT 1 FROM page WHERE id = $1")
.bind(page_id)
.fetch_optional(&state.platform.pool)
.fetch_optional(&state.pool)
.await?;
if exists.is_none() {
return Err(ApiErr::NotFound("Page not found".to_string(), None));
@ -145,7 +145,7 @@ pub async fn update_page(
}
query = query.bind(page_id);
query.execute(&state.platform.pool).await?;
query.execute(&state.pool).await?;
Ok(Json(serde_json::json!({
"ok_msg": "Page updated successfully"
@ -153,12 +153,12 @@ pub async fn update_page(
}
pub async fn delete_page(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(page_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let result = sqlx::query("DELETE FROM page WHERE id = $1")
.bind(page_id)
.execute(&state.platform.pool)
.execute(&state.pool)
.await?;
if result.rows_affected() == 0 {

View File

@ -0,0 +1,639 @@
use axum::{
extract::{Path, Query, State},
response::IntoResponse,
Json,
};
use serde::{Deserialize, Serialize};
use serde_with::rust::double_option;
use sqlx::{QueryBuilder, Row};
use std::collections::{HashMap, HashSet};
use uuid::Uuid;
use validator::Validate;
use crate::util::{
pagination::{PaginatedResponse, PaginationParams},
response::ApiErr,
};
use crate::platform_context::PlatformContext;
use crate::model::{Node, Point};
fn notify_units(state: &PlatformContext, unit_ids: impl IntoIterator<Item = Uuid>) {
let ids: Vec<Uuid> = {
let mut seen = std::collections::HashSet::new();
unit_ids.into_iter().filter(|id| seen.insert(*id)).collect()
};
if !ids.is_empty() {
state.emit_event(crate::event::PlatformEvent::UnitsChanged { unit_ids: ids });
}
}
/// List all points.
#[derive(Deserialize, Validate)]
pub struct GetPointListQuery {
pub source_id: Option<Uuid>,
pub equipment_id: Option<Uuid>,
#[serde(flatten)]
pub pagination: PaginationParams,
}
#[derive(Serialize)]
pub struct PointWithMonitor {
#[serde(flatten)]
pub point: Point,
pub point_monitor: Option<crate::telemetry::PointMonitorInfo>,
}
#[derive(Deserialize, Validate)]
pub struct GetPointHistoryQuery {
pub limit: Option<usize>,
}
#[derive(Serialize)]
pub struct PointHistoryItem {
#[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")]
pub timestamp: Option<chrono::DateTime<chrono::Utc>>,
pub quality: crate::telemetry::PointQuality,
pub value: Option<crate::telemetry::DataValue>,
pub value_text: Option<String>,
pub value_number: Option<f64>,
}
pub async fn get_point_list(
State(state): State<PlatformContext>,
Query(query): Query<GetPointListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let pool = &state.pool;
// Count total rows.
let total = crate::service::get_points_count(pool, query.source_id, query.equipment_id).await?;
// Load current page rows.
let points = crate::service::get_points_paginated(
pool,
query.source_id,
query.equipment_id,
query.pagination.page_size,
query.pagination.offset(),
)
.await?;
let monitor_guard = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
let data: Vec<PointWithMonitor> = points
.into_iter()
.map(|point| {
let point_monitor = monitor_guard.get(&point.id).cloned();
PointWithMonitor {
point,
point_monitor,
}
})
.collect();
let response = PaginatedResponse::new(
data,
total,
query.pagination.page,
query.pagination.page_size,
);
Ok(Json(response))
}
/// Get a point by id.
pub async fn get_point(
State(state): State<PlatformContext>,
Path(point_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let point = crate::service::get_point_by_id(pool, point_id).await?;
Ok(Json(point))
}
pub async fn get_point_history(
State(state): State<PlatformContext>,
Path(point_id): Path<Uuid>,
Query(query): Query<GetPointHistoryQuery>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let point = crate::service::get_point_by_id(pool, point_id).await?;
if point.is_none() {
return Err(ApiErr::NotFound("Point not found".to_string(), None));
}
let limit = query.limit.unwrap_or(120).clamp(1, 1000);
let history = state
.connection_manager
.get_point_history(point_id, limit)
.await;
let items: Vec<PointHistoryItem> = history
.into_iter()
.map(|item| {
let value_number = monitor_value_to_number(&item);
PointHistoryItem {
timestamp: item.timestamp,
quality: item.quality,
value_number,
value: item.value,
value_text: item.value_text,
}
})
.collect();
Ok(Json(items))
}
/// Request payload for updating editable point fields.
#[derive(Deserialize, Validate)]
pub struct UpdatePointReq {
pub name: Option<String>,
#[serde(default, with = "double_option")]
pub description: Option<Option<String>>,
#[serde(default, with = "double_option")]
pub unit: Option<Option<String>>,
#[serde(default, with = "double_option")]
pub tag_id: Option<Option<Uuid>>,
#[serde(default, with = "double_option")]
pub equipment_id: Option<Option<Uuid>>,
#[serde(default, with = "double_option")]
pub signal_role: Option<Option<String>>,
}
/// Request payload for batch setting point tags.
#[derive(Deserialize, Validate)]
pub struct BatchSetPointTagsReq {
#[validate(length(min = 1, max = 500))]
pub point_ids: Vec<Uuid>,
pub tag_id: Option<Uuid>,
}
#[derive(Deserialize, Validate)]
pub struct BatchSetPointEquipmentReq {
#[validate(length(min = 1, max = 500))]
pub point_ids: Vec<Uuid>,
pub equipment_id: Option<Uuid>,
pub signal_role: Option<String>,
}
/// Update point metadata (name/description/unit only).
pub async fn update_point(
State(state): State<PlatformContext>,
Path(point_id): Path<Uuid>,
Json(payload): Json<UpdatePointReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let pool = &state.pool;
if payload.name.is_none()
&& payload.description.is_none()
&& payload.unit.is_none()
&& payload.tag_id.is_none()
&& payload.equipment_id.is_none()
&& payload.signal_role.is_none()
{
return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
}
// If tag_id is provided, ensure tag exists.
if let Some(Some(tag_id)) = payload.tag_id {
let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#)
.bind(tag_id)
.fetch_optional(pool)
.await?
.is_some();
if !tag_exists {
return Err(ApiErr::NotFound("Tag not found".to_string(), None));
}
}
if let Some(Some(equipment_id)) = payload.equipment_id {
let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#)
.bind(equipment_id)
.fetch_optional(pool)
.await?
.is_some();
if !equipment_exists {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}
}
// Ensure target point exists.
let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#)
.bind(point_id)
.fetch_optional(pool)
.await?;
if existing_point.is_none() {
return Err(ApiErr::NotFound("Point not found".to_string(), None));
}
let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
let mut qb: QueryBuilder<sqlx::Postgres> = QueryBuilder::new("UPDATE point SET ");
let mut wrote_field = false;
if let Some(name) = &payload.name {
if wrote_field {
qb.push(", ");
}
qb.push("name = ").push_bind(name);
wrote_field = true;
}
if let Some(description) = &payload.description {
if wrote_field {
qb.push(", ");
}
qb.push("description = ").push_bind(description.as_deref());
wrote_field = true;
}
if let Some(unit) = &payload.unit {
if wrote_field {
qb.push(", ");
}
qb.push("unit = ").push_bind(unit.as_deref());
wrote_field = true;
}
if let Some(tag_id) = &payload.tag_id {
if wrote_field {
qb.push(", ");
}
qb.push("tag_id = ").push_bind(tag_id.as_ref());
wrote_field = true;
}
if let Some(equipment_id) = &payload.equipment_id {
if wrote_field {
qb.push(", ");
}
qb.push("equipment_id = ").push_bind(equipment_id.as_ref());
wrote_field = true;
}
if let Some(signal_role) = &payload.signal_role {
if wrote_field {
qb.push(", ");
}
qb.push("signal_role = ").push_bind(signal_role.as_deref());
wrote_field = true;
}
if wrote_field {
qb.push(", ");
}
qb.push("updated_at = NOW()");
qb.push(" WHERE id = ").push_bind(point_id);
qb.build().execute(pool).await?;
let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids));
Ok(Json(
serde_json::json!({"ok_msg": "Point updated successfully"}),
))
}
/// Batch set point tags.
pub async fn batch_set_point_tags(
State(state): State<PlatformContext>,
Json(payload): Json<BatchSetPointTagsReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
if payload.point_ids.is_empty() {
return Err(ApiErr::BadRequest(
"point_ids cannot be empty".to_string(),
None,
));
}
let pool = &state.pool;
// If tag_id is provided, ensure tag exists.
if let Some(tag_id) = payload.tag_id {
let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#)
.bind(tag_id)
.fetch_optional(pool)
.await?
.is_some();
if !tag_exists {
return Err(ApiErr::NotFound("Tag not found".to_string(), None));
}
}
// Check which points exist
let existing_points: Vec<Uuid> = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#)
.bind(&payload.point_ids)
.fetch_all(pool)
.await?
.into_iter()
.map(|row: sqlx::postgres::PgRow| row.get::<Uuid, _>("id"))
.collect();
if existing_points.is_empty() {
return Err(ApiErr::NotFound("No valid points found".to_string(), None));
}
// Update tag_id for all existing points
let result =
sqlx::query(r#"UPDATE point SET tag_id = $1, updated_at = NOW() WHERE id = ANY($2)"#)
.bind(payload.tag_id)
.bind(&existing_points)
.execute(pool)
.await?;
Ok(Json(serde_json::json!({
"ok_msg": "Point tags updated successfully",
"updated_count": result.rows_affected()
})))
}
pub async fn batch_set_point_equipment(
State(state): State<PlatformContext>,
Json(payload): Json<BatchSetPointEquipmentReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
if payload.point_ids.is_empty() {
return Err(ApiErr::BadRequest(
"point_ids cannot be empty".to_string(),
None,
));
}
let pool = &state.pool;
if let Some(equipment_id) = payload.equipment_id {
let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#)
.bind(equipment_id)
.fetch_optional(pool)
.await?
.is_some();
if !equipment_exists {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}
}
let existing_points: Vec<Uuid> = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#)
.bind(&payload.point_ids)
.fetch_all(pool)
.await?
.into_iter()
.map(|row: sqlx::postgres::PgRow| row.get::<Uuid, _>("id"))
.collect();
if existing_points.is_empty() {
return Err(ApiErr::NotFound("No valid points found".to_string(), None));
}
let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?;
let result = sqlx::query(
r#"
UPDATE point
SET equipment_id = $1,
signal_role = $2,
updated_at = NOW()
WHERE id = ANY($3)
"#,
)
.bind(payload.equipment_id)
.bind(payload.signal_role.as_deref())
.bind(&existing_points)
.execute(pool)
.await?;
let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?;
notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids));
Ok(Json(serde_json::json!({
"ok_msg": "Point equipment updated successfully",
"updated_count": result.rows_affected()
})))
}
/// Delete one point by id.
pub async fn delete_point(
State(state): State<PlatformContext>,
Path(point_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
let source_id = {
let grouped = crate::service::get_points_grouped_by_source(pool, &[point_id]).await?;
grouped.keys().next().copied()
};
// Ensure target point exists.
let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#)
.bind(point_id)
.fetch_optional(pool)
.await?;
if existing_point.is_none() {
return Err(ApiErr::NotFound("Point not found".to_string(), None));
}
// Delete point.
sqlx::query(r#"delete from point WHERE id = $1"#)
.bind(point_id)
.execute(pool)
.await?;
if let Some(source_id) = source_id {
state.emit_event(crate::event::PlatformEvent::PointsDeleted {
source_id,
point_ids: vec![point_id],
});
}
notify_units(&state, affected_unit_ids);
Ok(Json(
serde_json::json!({"ok_msg": "Point deleted successfully"}),
))
}
#[derive(Deserialize, Validate)]
/// Request payload for batch point creation from node ids.
pub struct BatchCreatePointsReq {
#[validate(length(min = 1, max = 500))]
pub node_ids: Vec<Uuid>,
}
#[derive(Serialize)]
/// Response payload for batch point creation.
pub struct BatchCreatePointsRes {
pub success_count: usize,
pub failed_count: usize,
pub failed_node_ids: Vec<Uuid>,
pub created_point_ids: Vec<Uuid>,
}
/// Batch create points by node ids.
pub async fn batch_create_points(
State(state): State<PlatformContext>,
Json(payload): Json<BatchCreatePointsReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let pool = &state.pool;
if payload.node_ids.is_empty() {
return Err(ApiErr::BadRequest(
"node_ids cannot be empty".to_string(),
None,
));
}
// Use one transaction for the full batch.
let mut tx = pool.begin().await?;
let node_ids = payload.node_ids;
let nodes: Vec<Node> = sqlx::query_as::<_, Node>(r#"SELECT * FROM node WHERE id = ANY($1)"#)
.bind(&node_ids)
.fetch_all(&mut *tx)
.await?;
let node_map: HashMap<Uuid, Node> = nodes.into_iter().map(|node| (node.id, node)).collect();
let existing_node_ids: HashSet<Uuid> = node_map.keys().copied().collect();
let mut failed_node_ids = Vec::new();
for node_id in &node_ids {
if !existing_node_ids.contains(node_id) {
failed_node_ids.push(*node_id);
}
}
let existing_point_node_ids: HashSet<Uuid> =
sqlx::query_scalar::<_, Uuid>(r#"SELECT node_id FROM point WHERE node_id = ANY($1)"#)
.bind(&node_ids)
.fetch_all(&mut *tx)
.await?
.into_iter()
.collect();
let mut to_create = Vec::new();
let mut seen_creatable = HashSet::new();
for node_id in node_ids {
if !existing_node_ids.contains(&node_id) || existing_point_node_ids.contains(&node_id) {
continue;
}
if !seen_creatable.insert(node_id) {
continue;
}
let name = node_map
.get(&node_id)
.map(|node| node.browse_name.clone())
.unwrap_or_else(|| format!("Point_{}", node_id));
to_create.push((Uuid::new_v4(), node_id, name));
}
let mut created_point_ids = Vec::with_capacity(to_create.len());
if !to_create.is_empty() {
let mut qb = QueryBuilder::new("INSERT INTO point (id, node_id, name) ");
qb.push_values(to_create.iter(), |mut b, (id, node_id, name)| {
b.push_bind(*id).push_bind(*node_id).push_bind(name);
});
qb.build().execute(&mut *tx).await?;
created_point_ids.extend(to_create.into_iter().map(|(id, _, _)| id));
}
// Commit the transaction.
tx.commit().await?;
// Emit grouped create events by source.
if !created_point_ids.is_empty() {
let grouped =
crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?;
for (source_id, points) in grouped {
let point_ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
state.emit_event(crate::event::PlatformEvent::PointsCreated { source_id, point_ids });
}
}
Ok(Json(BatchCreatePointsRes {
success_count: created_point_ids.len(),
failed_count: failed_node_ids.len(),
failed_node_ids,
created_point_ids,
}))
}
#[derive(Deserialize, Validate)]
/// Request payload for batch point deletion.
pub struct BatchDeletePointsReq {
#[validate(length(min = 1, max = 500))]
pub point_ids: Vec<Uuid>,
}
#[derive(Serialize)]
/// Response payload for batch point deletion.
pub struct BatchDeletePointsRes {
pub deleted_count: u64,
}
/// Batch delete points and emit grouped delete events by source.
pub async fn batch_delete_points(
State(state): State<PlatformContext>,
Json(payload): Json<BatchDeletePointsReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
if payload.point_ids.is_empty() {
return Err(ApiErr::BadRequest(
"point_ids cannot be empty".to_string(),
None,
));
}
let pool = &state.pool;
let point_ids = payload.point_ids;
let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?;
let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &point_ids).await?;
let existing_point_ids: Vec<Uuid> = grouped
.values()
.flat_map(|points| points.iter().map(|p| p.point_id))
.collect();
if existing_point_ids.is_empty() {
return Ok(Json(BatchDeletePointsRes { deleted_count: 0 }));
}
let result = sqlx::query(r#"DELETE FROM point WHERE id = ANY($1)"#)
.bind(&existing_point_ids)
.execute(pool)
.await?;
for (source_id, points) in grouped {
let ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
state.emit_event(crate::event::PlatformEvent::PointsDeleted { source_id, point_ids: ids });
}
notify_units(&state, affected_unit_ids);
Ok(Json(BatchDeletePointsRes {
deleted_count: result.rows_affected(),
}))
}
fn monitor_value_to_number(item: &crate::telemetry::PointMonitorInfo) -> Option<f64> {
match item.value.as_ref()? {
crate::telemetry::DataValue::Int(v) => Some(*v as f64),
crate::telemetry::DataValue::UInt(v) => Some(*v as f64),
crate::telemetry::DataValue::Float(v) => Some(*v),
crate::telemetry::DataValue::Bool(v) => Some(if *v { 1.0 } else { 0.0 }),
crate::telemetry::DataValue::Text(v) => v.parse::<f64>().ok(),
_ => None,
}
}

View File

@ -0,0 +1,112 @@
use axum::{
extract::FromRef,
routing::{get, post, put},
Router,
};
use crate::platform_context::PlatformContext;
/// Returns all platform CRUD routes.
///
/// The generic `S` is the app's top-level state type. It must implement
/// `FromRef<S> for PlatformContext` so that axum can extract `State<PlatformContext>`
/// from handlers registered with the app's state.
pub fn platform_routes<S>() -> Router<S>
where
S: Clone + Send + Sync + 'static,
PlatformContext: FromRef<S>,
{
Router::new()
// Source
.route(
"/api/source",
get(super::source::get_source_list).post(super::source::create_source),
)
.route(
"/api/source/{source_id}",
axum::routing::delete(super::source::delete_source)
.put(super::source::update_source),
)
.route(
"/api/source/{source_id}/reconnect",
post(super::source::reconnect_source),
)
.route(
"/api/source/{source_id}/browse",
post(super::source::browse_and_save_nodes),
)
.route(
"/api/source/{source_id}/node-tree",
get(super::source::get_node_tree),
)
// Point
.route("/api/point", get(super::point::get_point_list))
.route(
"/api/point/batch",
post(super::point::batch_create_points)
.delete(super::point::batch_delete_points),
)
.route(
"/api/point/{point_id}/history",
get(super::point::get_point_history),
)
.route(
"/api/point/{point_id}",
get(super::point::get_point)
.put(super::point::update_point)
.delete(super::point::delete_point),
)
.route(
"/api/point/batch/set-tags",
put(super::point::batch_set_point_tags),
)
.route(
"/api/point/batch/set-equipment",
put(super::point::batch_set_point_equipment),
)
// Equipment
.route(
"/api/equipment",
get(super::equipment::get_equipment_list)
.post(super::equipment::create_equipment),
)
.route(
"/api/equipment/{equipment_id}",
get(super::equipment::get_equipment)
.put(super::equipment::update_equipment)
.delete(super::equipment::delete_equipment),
)
.route(
"/api/equipment/batch/set-unit",
put(super::equipment::batch_set_equipment_unit),
)
.route(
"/api/equipment/{equipment_id}/points",
get(super::equipment::get_equipment_points),
)
// Tag
.route(
"/api/tag",
get(super::tag::get_tag_list).post(super::tag::create_tag),
)
.route(
"/api/tag/{tag_id}",
get(super::tag::get_tag_points)
.put(super::tag::update_tag)
.delete(super::tag::delete_tag),
)
// Page
.route(
"/api/page",
get(super::page::get_page_list).post(super::page::create_page),
)
.route(
"/api/page/{page_id}",
get(super::page::get_page)
.put(super::page::update_page)
.delete(super::page::delete_page),
)
// Logs
.route("/api/logs", get(super::log::get_logs))
.route("/api/logs/stream", get(super::log::stream_logs))
}

View File

@ -11,11 +11,11 @@ use opcua::types::ReferenceTypeId;
use opcua::client::Session;
use std::collections::{HashMap, VecDeque};
use plc_platform_core::util::response::ApiErr;
use crate::util::response::ApiErr;
use anyhow::{Context};
use plc_platform_core::model::{Node, Source};
use crate::AppState;
use crate::model::{Node, Source};
use crate::platform_context::PlatformContext;
use sqlx::QueryBuilder;
// 鏍戣妭鐐圭粨鏋勪綋
@ -62,7 +62,7 @@ pub struct SourceWithStatus {
pub source: SourcePublic,
pub is_connected: bool,
pub last_error: Option<String>,
#[serde(serialize_with = "plc_platform_core::util::datetime::option_utc_to_local_str")]
#[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")]
pub last_time: Option<DateTime<Utc>>,
}
@ -75,9 +75,9 @@ pub struct SourcePublic {
pub security_policy: Option<String>,
pub security_mode: Option<String>,
pub enabled: bool,
#[serde(serialize_with = "plc_platform_core::util::datetime::utc_to_local_str")]
#[serde(serialize_with = "crate::util::datetime::utc_to_local_str")]
pub created_at: DateTime<Utc>,
#[serde(serialize_with = "plc_platform_core::util::datetime::utc_to_local_str")]
#[serde(serialize_with = "crate::util::datetime::utc_to_local_str")]
pub updated_at: DateTime<Utc>,
}
@ -97,13 +97,13 @@ impl From<Source> for SourcePublic {
}
}
pub async fn get_source_list(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.platform.pool;
pub async fn get_source_list(State(state): State<PlatformContext>) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let sources: Vec<Source> = crate::service::get_all_enabled_sources(pool).await?;
// 鑾峰彇鎵€鏈夎繛鎺ョ姸鎬?
let status_map: std::collections::HashMap<Uuid, (bool, Option<String>, Option<DateTime<Utc>>)> =
state.platform.connection_manager.get_all_status().await
state.connection_manager.get_all_status().await
.into_iter()
.map(|(source_id, s)| (source_id, (s.is_connected, s.last_error, Some(s.last_time))))
.collect();
@ -129,10 +129,10 @@ pub async fn get_source_list(State(state): State<AppState>) -> Result<impl IntoR
}
pub async fn get_node_tree(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(source_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.platform.pool;
let pool = &state.pool;
// 鏌ヨ鎵€鏈夊睘浜庤source鐨勮妭鐐?
let nodes: Vec<Node> = sqlx::query_as::<_, Node>(
@ -207,12 +207,12 @@ pub struct CreateSourceRes {
}
pub async fn create_source(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Json(payload): Json<CreateSourceReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let pool = &state.platform.pool;
let pool = &state.pool;
let new_id = Uuid::new_v4();
sqlx::query(
@ -226,8 +226,7 @@ pub async fn create_source(
.execute(pool)
.await?;
// 瑙﹀彂 SourceCreate 浜嬩欢
let _ = state.event_manager.send(crate::event::AppEvent::SourceCreate { source_id: new_id });
state.emit_event(crate::event::PlatformEvent::SourceCreated { source_id: new_id });
Ok((StatusCode::CREATED, Json(CreateSourceRes { id: new_id })))
}
@ -244,7 +243,7 @@ pub struct UpdateSourceReq {
}
pub async fn update_source(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(source_id): Path<Uuid>,
Json(payload): Json<UpdateSourceReq>,
) -> Result<impl IntoResponse, ApiErr> {
@ -261,7 +260,7 @@ pub async fn update_source(
return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
}
let pool = &state.platform.pool;
let pool = &state.pool;
let exists = sqlx::query("SELECT 1 FROM source WHERE id = $1")
.bind(source_id)
@ -302,16 +301,16 @@ pub async fn update_source(
qb.push(" WHERE id = ").push_bind(source_id);
qb.build().execute(pool).await?;
let _ = state.event_manager.send(crate::event::AppEvent::SourceUpdate { source_id });
state.emit_event(crate::event::PlatformEvent::SourceUpdated { source_id });
Ok(Json(serde_json::json!({"ok_msg": "Source updated successfully"})))
}
pub async fn delete_source(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(source_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.platform.pool;
let pool = &state.pool;
let source_name = sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1")
.bind(source_id)
@ -324,17 +323,16 @@ pub async fn delete_source(
.execute(pool)
.await?;
// 瑙﹀彂 SourceDelete 浜嬩欢
let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id, source_name });
state.emit_event(crate::event::PlatformEvent::SourceDeleted { source_id, source_name });
Ok(StatusCode::NO_CONTENT)
}
pub async fn reconnect_source(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(source_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.platform.pool;
let pool = &state.pool;
let exists = sqlx::query("SELECT 1 FROM source WHERE id = $1")
.bind(source_id)
@ -349,7 +347,7 @@ pub async fn reconnect_source(
}
state
.platform.connection_manager
.connection_manager
.reconnect(pool, source_id)
.await
.map_err(|e| ApiErr::Internal(e, None))?;
@ -358,11 +356,11 @@ pub async fn reconnect_source(
}
pub async fn browse_and_save_nodes(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(source_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.platform.pool;
let pool = &state.pool;
// 纭 source 瀛樺湪
sqlx::query("SELECT 1 FROM source WHERE id = $1")
@ -370,7 +368,7 @@ pub async fn browse_and_save_nodes(
.fetch_one(pool)
.await?;
let session = state.platform.connection_manager
let session = state.connection_manager
.get_session(source_id)
.await
.ok_or_else(|| anyhow::anyhow!("Source not connected"))?;

View File

@ -3,13 +3,12 @@ use serde::Deserialize;
use uuid::Uuid;
use validator::Validate;
use plc_platform_core::util::{
use crate::platform_context::PlatformContext;
use crate::util::{
pagination::{PaginatedResponse, PaginationParams},
response::ApiErr,
};
use crate::{AppState};
/// List all tags.
#[derive(Deserialize, Validate)]
pub struct GetTagListQuery {
#[serde(flatten)]
@ -17,16 +16,13 @@ pub struct GetTagListQuery {
}
pub async fn get_tag_list(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Query(query): Query<GetTagListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let pool = &state.platform.pool;
let pool = &state.pool;
// Count total rows.
let total = crate::service::get_tags_count(pool).await?;
// Load current page rows.
let tags = crate::service::get_tags_paginated(
pool,
query.pagination.page_size,
@ -34,16 +30,14 @@ pub async fn get_tag_list(
).await?;
let response = PaginatedResponse::new(tags, total, query.pagination.page, query.pagination.page_size);
Ok(Json(response))
}
/// List points under a tag.
pub async fn get_tag_points(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(tag_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let points = crate::service::get_tag_points(&state.platform.pool, tag_id).await?;
let points = crate::service::get_tag_points(&state.pool, tag_id).await?;
Ok(Json(points))
}
@ -63,16 +57,15 @@ pub struct UpdateTagReq {
pub point_ids: Option<Vec<Uuid>>,
}
/// Create a tag.
pub async fn create_tag(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Json(payload): Json<CreateTagReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let point_ids = payload.point_ids.as_deref().unwrap_or(&[]);
let tag_id = crate::service::create_tag(
&state.platform.pool,
&state.pool,
&payload.name,
payload.description.as_deref(),
point_ids,
@ -84,22 +77,20 @@ pub async fn create_tag(
}))))
}
/// Update a tag.
pub async fn update_tag(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(tag_id): Path<Uuid>,
Json(payload): Json<UpdateTagReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
// Ensure the target tag exists.
let exists = crate::service::get_tag_by_id(&state.platform.pool, tag_id).await?;
let exists = crate::service::get_tag_by_id(&state.pool, tag_id).await?;
if exists.is_none() {
return Err(ApiErr::NotFound("Tag not found".to_string(), None));
}
crate::service::update_tag(
&state.platform.pool,
&state.pool,
tag_id,
payload.name.as_deref(),
payload.description.as_deref(),
@ -111,16 +102,13 @@ pub async fn update_tag(
})))
}
/// Delete a tag.
pub async fn delete_tag(
State(state): State<AppState>,
State(state): State<PlatformContext>,
Path(tag_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let deleted = crate::service::delete_tag(&state.platform.pool, tag_id).await?;
let deleted = crate::service::delete_tag(&state.pool, tag_id).await?;
if !deleted {
return Err(ApiErr::NotFound("Tag not found".to_string(), None));
}
Ok(StatusCode::NO_CONTENT)
}

View File

@ -1,13 +1,32 @@
use std::sync::Arc;
use uuid::Uuid;
use crate::connection::ConnectionManager;
use crate::event::PlatformEvent;
use crate::websocket::WebSocketManager;
/// Callback interface for app-specific side effects on platform events.
///
/// Platform-level concerns (event persistence, WebSocket broadcast) are handled
/// automatically by `PlatformContext::emit_event()`. Implementations of this trait
/// only need to handle app-specific behavior (e.g. connection management,
/// control runtime notifications).
pub trait PlatformEventSink: Send + Sync {
fn on_source_created(&self, source_id: Uuid);
fn on_source_updated(&self, source_id: Uuid);
fn on_source_deleted(&self, source_id: Uuid, source_name: String);
fn on_points_created(&self, source_id: Uuid, point_ids: Vec<Uuid>);
fn on_points_deleted(&self, source_id: Uuid, point_ids: Vec<Uuid>);
fn on_units_changed(&self, unit_ids: Vec<Uuid>);
}
#[derive(Clone)]
pub struct PlatformContext {
pub pool: sqlx::PgPool,
pub connection_manager: Arc<ConnectionManager>,
pub ws_manager: Arc<WebSocketManager>,
pub event_sink: Option<Arc<dyn PlatformEventSink>>,
}
impl PlatformContext {
@ -20,6 +39,56 @@ impl PlatformContext {
pool,
connection_manager,
ws_manager,
event_sink: None,
}
}
pub fn with_event_sink(mut self, sink: Arc<dyn PlatformEventSink>) -> Self {
self.event_sink = Some(sink);
self
}
/// Emit a platform event: persists to DB, broadcasts via WebSocket,
/// then notifies the app-specific event sink for side effects.
pub fn emit_event(&self, event: PlatformEvent) {
// Notify app-specific sink synchronously (fire-and-forget via channels).
if let Some(sink) = &self.event_sink {
match &event {
PlatformEvent::SourceCreated { source_id } => {
sink.on_source_created(*source_id);
}
PlatformEvent::SourceUpdated { source_id } => {
sink.on_source_updated(*source_id);
}
PlatformEvent::SourceDeleted {
source_id,
source_name,
} => {
sink.on_source_deleted(*source_id, source_name.clone());
}
PlatformEvent::PointsCreated {
source_id,
point_ids,
} => {
sink.on_points_created(*source_id, point_ids.clone());
}
PlatformEvent::PointsDeleted {
source_id,
point_ids,
} => {
sink.on_points_deleted(*source_id, point_ids.clone());
}
PlatformEvent::UnitsChanged { unit_ids } => {
sink.on_units_changed(unit_ids.clone());
}
}
}
// Spawn async persistence + WebSocket broadcast.
let pool = self.pool.clone();
let ws_manager = self.ws_manager.clone();
tokio::spawn(async move {
crate::event::persist_and_broadcast(&event, &pool, &ws_manager).await;
});
}
}