use axum::{ extract::{Path, Query, State}, http::HeaderMap, response::IntoResponse, Json, }; use serde::{Deserialize, Serialize}; use serde_with::rust::double_option; use sqlx::{QueryBuilder, Row}; use std::collections::{HashMap, HashSet}; use uuid::Uuid; use validator::Validate; use plc_platform_core::util::{ pagination::{PaginatedResponse, PaginationParams}, response::ApiErr, }; use crate::{ AppState, }; use plc_platform_core::model::{Node, Point}; async fn notify_units( state: &AppState, unit_ids: impl IntoIterator, ) { let mut seen = std::collections::HashSet::new(); for unit_id in unit_ids { if seen.insert(unit_id) { state.control_runtime.notify_unit(unit_id).await; } } } /// List all points. #[derive(Deserialize, Validate)] pub struct GetPointListQuery { pub source_id: Option, pub equipment_id: Option, #[serde(flatten)] pub pagination: PaginationParams, } #[derive(Serialize)] pub struct PointWithMonitor { #[serde(flatten)] pub point: Point, pub point_monitor: Option, } #[derive(Deserialize, Validate)] pub struct GetPointHistoryQuery { pub limit: Option, } #[derive(Serialize)] pub struct PointHistoryItem { #[serde(serialize_with = "plc_platform_core::util::datetime::option_utc_to_local_str")] pub timestamp: Option>, pub quality: crate::telemetry::PointQuality, pub value: Option, pub value_text: Option, pub value_number: Option, } pub async fn get_point_list( State(state): State, Query(query): Query, ) -> Result { query.validate()?; let pool = &state.pool; // Count total rows. let total = crate::service::get_points_count(pool, query.source_id, query.equipment_id).await?; // Load current page rows. let points = crate::service::get_points_paginated( pool, query.source_id, query.equipment_id, query.pagination.page_size, query.pagination.offset(), ) .await?; let monitor_guard = state .connection_manager .get_point_monitor_data_read_guard() .await; let data: Vec = points .into_iter() .map(|point| { let point_monitor = monitor_guard.get(&point.id).cloned(); PointWithMonitor { point, point_monitor, } }) .collect(); let response = PaginatedResponse::new( data, total, query.pagination.page, query.pagination.page_size, ); Ok(Json(response)) } /// Get a point by id. pub async fn get_point( State(state): State, Path(point_id): Path, ) -> Result { let pool = &state.pool; let point = crate::service::get_point_by_id(pool, point_id).await?; Ok(Json(point)) } pub async fn get_point_history( State(state): State, Path(point_id): Path, Query(query): Query, ) -> Result { let pool = &state.pool; let point = crate::service::get_point_by_id(pool, point_id).await?; if point.is_none() { return Err(ApiErr::NotFound("Point not found".to_string(), None)); } let limit = query.limit.unwrap_or(120).clamp(1, 1000); let history = state .connection_manager .get_point_history(point_id, limit) .await; let items: Vec = history .into_iter() .map(|item| { let value_number = monitor_value_to_number(&item); PointHistoryItem { timestamp: item.timestamp, quality: item.quality, value_number, value: item.value, value_text: item.value_text, } }) .collect(); Ok(Json(items)) } /// Request payload for updating editable point fields. #[derive(Deserialize, Validate)] pub struct UpdatePointReq { pub name: Option, #[serde(default, with = "double_option")] pub description: Option>, #[serde(default, with = "double_option")] pub unit: Option>, #[serde(default, with = "double_option")] pub tag_id: Option>, #[serde(default, with = "double_option")] pub equipment_id: Option>, #[serde(default, with = "double_option")] pub signal_role: Option>, } /// Request payload for batch setting point tags. #[derive(Deserialize, Validate)] pub struct BatchSetPointTagsReq { #[validate(length(min = 1, max = 500))] pub point_ids: Vec, pub tag_id: Option, } #[derive(Deserialize, Validate)] pub struct BatchSetPointEquipmentReq { #[validate(length(min = 1, max = 500))] pub point_ids: Vec, pub equipment_id: Option, pub signal_role: Option, } /// Update point metadata (name/description/unit only). pub async fn update_point( State(state): State, Path(point_id): Path, Json(payload): Json, ) -> Result { payload.validate()?; let pool = &state.pool; if payload.name.is_none() && payload.description.is_none() && payload.unit.is_none() && payload.tag_id.is_none() && payload.equipment_id.is_none() && payload.signal_role.is_none() { return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); } // If tag_id is provided, ensure tag exists. if let Some(Some(tag_id)) = payload.tag_id { let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#) .bind(tag_id) .fetch_optional(pool) .await? .is_some(); if !tag_exists { return Err(ApiErr::NotFound("Tag not found".to_string(), None)); } } if let Some(Some(equipment_id)) = payload.equipment_id { let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#) .bind(equipment_id) .fetch_optional(pool) .await? .is_some(); if !equipment_exists { return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); } } // Ensure target point exists. let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#) .bind(point_id) .fetch_optional(pool) .await?; if existing_point.is_none() { return Err(ApiErr::NotFound("Point not found".to_string(), None)); } let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; let mut qb: QueryBuilder = QueryBuilder::new("UPDATE point SET "); let mut wrote_field = false; if let Some(name) = &payload.name { if wrote_field { qb.push(", "); } qb.push("name = ").push_bind(name); wrote_field = true; } if let Some(description) = &payload.description { if wrote_field { qb.push(", "); } qb.push("description = ").push_bind(description.as_deref()); wrote_field = true; } if let Some(unit) = &payload.unit { if wrote_field { qb.push(", "); } qb.push("unit = ").push_bind(unit.as_deref()); wrote_field = true; } if let Some(tag_id) = &payload.tag_id { if wrote_field { qb.push(", "); } qb.push("tag_id = ").push_bind(tag_id.as_ref()); wrote_field = true; } if let Some(equipment_id) = &payload.equipment_id { if wrote_field { qb.push(", "); } qb.push("equipment_id = ").push_bind(equipment_id.as_ref()); wrote_field = true; } if let Some(signal_role) = &payload.signal_role { if wrote_field { qb.push(", "); } qb.push("signal_role = ").push_bind(signal_role.as_deref()); wrote_field = true; } if wrote_field { qb.push(", "); } qb.push("updated_at = NOW()"); qb.push(" WHERE id = ").push_bind(point_id); qb.build().execute(pool).await?; let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await; Ok(Json( serde_json::json!({"ok_msg": "Point updated successfully"}), )) } /// Batch set point tags. pub async fn batch_set_point_tags( State(state): State, Json(payload): Json, ) -> Result { payload.validate()?; if payload.point_ids.is_empty() { return Err(ApiErr::BadRequest( "point_ids cannot be empty".to_string(), None, )); } let pool = &state.pool; // If tag_id is provided, ensure tag exists. if let Some(tag_id) = payload.tag_id { let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#) .bind(tag_id) .fetch_optional(pool) .await? .is_some(); if !tag_exists { return Err(ApiErr::NotFound("Tag not found".to_string(), None)); } } // Check which points exist let existing_points: Vec = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#) .bind(&payload.point_ids) .fetch_all(pool) .await? .into_iter() .map(|row: sqlx::postgres::PgRow| row.get::("id")) .collect(); if existing_points.is_empty() { return Err(ApiErr::NotFound("No valid points found".to_string(), None)); } // Update tag_id for all existing points let result = sqlx::query(r#"UPDATE point SET tag_id = $1, updated_at = NOW() WHERE id = ANY($2)"#) .bind(payload.tag_id) .bind(&existing_points) .execute(pool) .await?; Ok(Json(serde_json::json!({ "ok_msg": "Point tags updated successfully", "updated_count": result.rows_affected() }))) } pub async fn batch_set_point_equipment( State(state): State, Json(payload): Json, ) -> Result { payload.validate()?; if payload.point_ids.is_empty() { return Err(ApiErr::BadRequest( "point_ids cannot be empty".to_string(), None, )); } let pool = &state.pool; if let Some(equipment_id) = payload.equipment_id { let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#) .bind(equipment_id) .fetch_optional(pool) .await? .is_some(); if !equipment_exists { return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); } } let existing_points: Vec = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#) .bind(&payload.point_ids) .fetch_all(pool) .await? .into_iter() .map(|row: sqlx::postgres::PgRow| row.get::("id")) .collect(); if existing_points.is_empty() { return Err(ApiErr::NotFound("No valid points found".to_string(), None)); } let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?; let result = sqlx::query( r#" UPDATE point SET equipment_id = $1, signal_role = $2, updated_at = NOW() WHERE id = ANY($3) "#, ) .bind(payload.equipment_id) .bind(payload.signal_role.as_deref()) .bind(&existing_points) .execute(pool) .await?; let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?; notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await; Ok(Json(serde_json::json!({ "ok_msg": "Point equipment updated successfully", "updated_count": result.rows_affected() }))) } /// Delete one point by id. pub async fn delete_point( State(state): State, Path(point_id): Path, ) -> Result { let pool = &state.pool; let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?; let source_id = { let grouped = crate::service::get_points_grouped_by_source(pool, &[point_id]).await?; grouped.keys().next().copied() }; // Ensure target point exists. let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#) .bind(point_id) .fetch_optional(pool) .await?; if existing_point.is_none() { return Err(ApiErr::NotFound("Point not found".to_string(), None)); } // Delete point. sqlx::query(r#"delete from point WHERE id = $1"#) .bind(point_id) .execute(pool) .await?; if let Some(source_id) = source_id { if let Err(e) = state .event_manager .send(crate::event::AppEvent::PointDeleteBatch { source_id, point_ids: vec![point_id], }) { tracing::error!("Failed to send PointDeleteBatch event: {}", e); } } notify_units(&state, affected_unit_ids).await; Ok(Json( serde_json::json!({"ok_msg": "Point deleted successfully"}), )) } #[derive(Deserialize, Validate)] /// Request payload for batch point creation from node ids. pub struct BatchCreatePointsReq { #[validate(length(min = 1, max = 500))] pub node_ids: Vec, } #[derive(Serialize)] /// Response payload for batch point creation. pub struct BatchCreatePointsRes { pub success_count: usize, pub failed_count: usize, pub failed_node_ids: Vec, pub created_point_ids: Vec, } /// Batch create points by node ids. pub async fn batch_create_points( State(state): State, Json(payload): Json, ) -> Result { payload.validate()?; let pool = &state.pool; if payload.node_ids.is_empty() { return Err(ApiErr::BadRequest( "node_ids cannot be empty".to_string(), None, )); } // Use one transaction for the full batch. let mut tx = pool.begin().await?; let node_ids = payload.node_ids; let nodes: Vec = sqlx::query_as::<_, Node>(r#"SELECT * FROM node WHERE id = ANY($1)"#) .bind(&node_ids) .fetch_all(&mut *tx) .await?; let node_map: HashMap = nodes.into_iter().map(|node| (node.id, node)).collect(); let existing_node_ids: HashSet = node_map.keys().copied().collect(); let mut failed_node_ids = Vec::new(); for node_id in &node_ids { if !existing_node_ids.contains(node_id) { failed_node_ids.push(*node_id); } } let existing_point_node_ids: HashSet = sqlx::query_scalar::<_, Uuid>(r#"SELECT node_id FROM point WHERE node_id = ANY($1)"#) .bind(&node_ids) .fetch_all(&mut *tx) .await? .into_iter() .collect(); let mut to_create = Vec::new(); let mut seen_creatable = HashSet::new(); for node_id in node_ids { if !existing_node_ids.contains(&node_id) || existing_point_node_ids.contains(&node_id) { continue; } if !seen_creatable.insert(node_id) { continue; } let name = node_map .get(&node_id) .map(|node| node.browse_name.clone()) .unwrap_or_else(|| format!("Point_{}", node_id)); to_create.push((Uuid::new_v4(), node_id, name)); } let mut created_point_ids = Vec::with_capacity(to_create.len()); if !to_create.is_empty() { let mut qb = QueryBuilder::new("INSERT INTO point (id, node_id, name) "); qb.push_values(to_create.iter(), |mut b, (id, node_id, name)| { b.push_bind(*id).push_bind(*node_id).push_bind(name); }); qb.build().execute(&mut *tx).await?; created_point_ids.extend(to_create.into_iter().map(|(id, _, _)| id)); } // Commit the transaction. tx.commit().await?; // Emit grouped create events by source. if !created_point_ids.is_empty() { let grouped = crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?; for (source_id, points) in grouped { let point_ids: Vec = points.into_iter().map(|p| p.point_id).collect(); if let Err(e) = state .event_manager .send(crate::event::AppEvent::PointCreateBatch { source_id, point_ids, }) { tracing::error!("Failed to send PointCreateBatch event: {}", e); } } } Ok(Json(BatchCreatePointsRes { success_count: created_point_ids.len(), failed_count: failed_node_ids.len(), failed_node_ids, created_point_ids, })) } #[derive(Deserialize, Validate)] /// Request payload for batch point deletion. pub struct BatchDeletePointsReq { #[validate(length(min = 1, max = 500))] pub point_ids: Vec, } #[derive(Serialize)] /// Response payload for batch point deletion. pub struct BatchDeletePointsRes { pub deleted_count: u64, } /// Batch delete points and emit grouped delete events by source. pub async fn batch_delete_points( State(state): State, Json(payload): Json, ) -> Result { payload.validate()?; if payload.point_ids.is_empty() { return Err(ApiErr::BadRequest( "point_ids cannot be empty".to_string(), None, )); } let pool = &state.pool; let point_ids = payload.point_ids; let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?; let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &point_ids).await?; let existing_point_ids: Vec = grouped .values() .flat_map(|points| points.iter().map(|p| p.point_id)) .collect(); if existing_point_ids.is_empty() { return Ok(Json(BatchDeletePointsRes { deleted_count: 0 })); } let result = sqlx::query(r#"DELETE FROM point WHERE id = ANY($1)"#) .bind(&existing_point_ids) .execute(pool) .await?; for (source_id, points) in grouped { let ids: Vec = points.into_iter().map(|p| p.point_id).collect(); if let Err(e) = state .event_manager .send(crate::event::AppEvent::PointDeleteBatch { source_id, point_ids: ids, }) { tracing::error!("Failed to send PointDeleteBatch event: {}", e); } } notify_units(&state, affected_unit_ids).await; Ok(Json(BatchDeletePointsRes { deleted_count: result.rows_affected(), })) } pub async fn batch_set_point_value( State(state): State, headers: HeaderMap, Json(payload): Json, ) -> Result { let write_key = headers .get("X-Write-Key") .and_then(|v| v.to_str().ok()) .unwrap_or_default(); if !state.config.verify_write_key(write_key) { return Err(ApiErr::Forbidden( "write permission denied".to_string(), Some(serde_json::json!({ "hint": "set WRITE_API_KEY (or legacy WRITE_KEY) and pass header X-Write-Key" })), )); } let result = state .connection_manager .write_point_values_batch(payload) .await .map_err(|e| ApiErr::Internal(e, None))?; Ok(Json(result)) } fn monitor_value_to_number(item: &crate::telemetry::PointMonitorInfo) -> Option { match item.value.as_ref()? { crate::telemetry::DataValue::Int(v) => Some(*v as f64), crate::telemetry::DataValue::UInt(v) => Some(*v as f64), crate::telemetry::DataValue::Float(v) => Some(*v), crate::telemetry::DataValue::Bool(v) => Some(if *v { 1.0 } else { 0.0 }), crate::telemetry::DataValue::Text(v) => v.parse::().ok(), _ => None, } }