//! HTTP handlers for point management (`plc_control/src/handler/point.rs`):
//! listing, history, metadata updates, batch create/delete, tag/equipment
//! assignment, and guarded point-value writes.

use axum::{
extract::{Path, Query, State},
http::HeaderMap,
response::IntoResponse,
Json,
};
use serde::{Deserialize, Serialize};
use serde_with::rust::double_option;
use sqlx::{QueryBuilder, Row};
use std::collections::{HashMap, HashSet};
use uuid::Uuid;
use validator::Validate;
use crate::util::{
pagination::{PaginatedResponse, PaginationParams},
response::ApiErr,
};
use crate::{
model::{Node, Point},
AppState,
};
/// Query parameters for `get_point_list` ("list all points"): optional
/// source/equipment filters plus standard pagination fields, all read from
/// the same query string.
#[derive(Deserialize, Validate)]
pub struct GetPointListQuery {
    // Restrict results to points belonging to this source, when present.
    pub source_id: Option<Uuid>,
    // Restrict results to points attached to this equipment, when present.
    pub equipment_id: Option<Uuid>,
    // Page/page-size parameters, flattened into the top-level query string.
    #[serde(flatten)]
    pub pagination: PaginationParams,
}
/// A persisted point joined with its live monitor snapshot, as returned by
/// `get_point_list`.
#[derive(Serialize)]
pub struct PointWithMonitor {
    // The database row, flattened into the top-level JSON object.
    #[serde(flatten)]
    pub point: Point,
    // Latest in-memory telemetry for this point; `None` when the connection
    // manager is not currently tracking it.
    pub point_monitor: Option<crate::telemetry::PointMonitorInfo>,
}
/// Query parameters for `get_point_history`.
#[derive(Deserialize, Validate)]
pub struct GetPointHistoryQuery {
    // Maximum number of samples to return; the handler defaults this to 120
    // and clamps it to the range 1..=1000.
    pub limit: Option<usize>,
}
/// One telemetry history sample, serialized for API consumers.
#[derive(Serialize)]
pub struct PointHistoryItem {
    // Sample time; rendered via the project's UTC-to-local string serializer.
    #[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")]
    pub timestamp: Option<chrono::DateTime<chrono::Utc>>,
    // Quality flag reported with the sample.
    pub quality: crate::telemetry::PointQuality,
    // Raw typed value as captured from telemetry, if any.
    pub value: Option<crate::telemetry::DataValue>,
    // Textual rendering of the value, when one was recorded.
    pub value_text: Option<String>,
    // Numeric projection of `value` (computed by `monitor_value_to_number`).
    pub value_number: Option<f64>,
}
/// List points with pagination, attaching each point's live monitor snapshot
/// when the connection manager has one.
pub async fn get_point_list(
    State(state): State<AppState>,
    Query(query): Query<GetPointListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
    query.validate()?;
    let pool = &state.pool;

    // Total row count for the pagination envelope.
    let total =
        crate::service::get_points_count(pool, query.source_id, query.equipment_id).await?;

    // Fetch the requested page of points.
    let page_of_points = crate::service::get_points_paginated(
        pool,
        query.source_id,
        query.equipment_id,
        query.pagination.page_size,
        query.pagination.offset(),
    )
    .await?;

    // Enrich each point with its current monitor snapshot (if one exists).
    let guard = state
        .connection_manager
        .get_point_monitor_data_read_guard()
        .await;
    let mut data = Vec::with_capacity(page_of_points.len());
    for point in page_of_points {
        let point_monitor = guard.get(&point.id).cloned();
        data.push(PointWithMonitor {
            point,
            point_monitor,
        });
    }

    Ok(Json(PaginatedResponse::new(
        data,
        total,
        query.pagination.page,
        query.pagination.page_size,
    )))
}
/// Get a point by id.
pub async fn get_point(
State(state): State<AppState>,
Path(point_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let point = crate::service::get_point_by_id(pool, point_id).await?;
Ok(Json(point))
}
/// Return the recent in-memory history samples for one point.
///
/// 404s when the point does not exist; otherwise returns up to `limit`
/// samples (default 120, clamped to 1..=1000).
pub async fn get_point_history(
    State(state): State<AppState>,
    Path(point_id): Path<Uuid>,
    Query(query): Query<GetPointHistoryQuery>,
) -> Result<impl IntoResponse, ApiErr> {
    let pool = &state.pool;

    // Guard: the point must exist before we consult the monitor buffers.
    if crate::service::get_point_by_id(pool, point_id)
        .await?
        .is_none()
    {
        return Err(ApiErr::NotFound("Point not found".to_string(), None));
    }

    // Default to 120 samples and keep the window within [1, 1000].
    let limit = match query.limit {
        Some(requested) => requested.clamp(1, 1000),
        None => 120,
    };

    let raw = state
        .connection_manager
        .get_point_history(point_id, limit)
        .await;

    // Project each monitor sample into the API shape, deriving the numeric
    // view of the value where possible.
    let mut items = Vec::with_capacity(raw.len());
    for entry in raw {
        let value_number = monitor_value_to_number(&entry);
        items.push(PointHistoryItem {
            timestamp: entry.timestamp,
            quality: entry.quality,
            value: entry.value,
            value_text: entry.value_text,
            value_number,
        });
    }
    Ok(Json(items))
}
/// Request payload for updating editable point fields.
///
/// The `double_option` fields distinguish three client intents: field absent
/// (leave the column unchanged), explicit `null` (set the column to NULL),
/// and a concrete value (overwrite).
#[derive(Deserialize, Validate)]
pub struct UpdatePointReq {
    // New display name; absent means "keep the current name".
    pub name: Option<String>,
    #[serde(default, with = "double_option")]
    pub description: Option<Option<String>>,
    #[serde(default, with = "double_option")]
    pub unit: Option<Option<String>>,
    // Tag assignment; inner `None` clears the tag.
    #[serde(default, with = "double_option")]
    pub tag_id: Option<Option<Uuid>>,
    // Equipment assignment; inner `None` detaches the point.
    #[serde(default, with = "double_option")]
    pub equipment_id: Option<Option<Uuid>>,
    #[serde(default, with = "double_option")]
    pub signal_role: Option<Option<String>>,
}
/// Request payload for batch setting point tags.
#[derive(Deserialize, Validate)]
pub struct BatchSetPointTagsReq {
    // Points to update; must be non-empty (enforced in the handler).
    pub point_ids: Vec<Uuid>,
    // Tag to assign; `None` clears the tag on all listed points.
    pub tag_id: Option<Uuid>,
}
/// Request payload for batch assigning points to a piece of equipment.
#[derive(Deserialize, Validate)]
pub struct BatchSetPointEquipmentReq {
    // Points to update; must be non-empty (enforced in the handler).
    pub point_ids: Vec<Uuid>,
    // Equipment to assign; `None` detaches the points.
    pub equipment_id: Option<Uuid>,
    // Signal role applied uniformly to every listed point; `None` clears it.
    pub signal_role: Option<String>,
}
/// Update a point's editable fields: name, description, unit, tag,
/// equipment and signal role. (The previous doc said "name/description/unit
/// only", which no longer matched the implementation.)
///
/// Fields absent from the payload are left untouched; the `double_option`
/// fields may carry an explicit inner `None` to NULL the column.
pub async fn update_point(
    State(state): State<AppState>,
    Path(point_id): Path<Uuid>,
    Json(payload): Json<UpdatePointReq>,
) -> Result<impl IntoResponse, ApiErr> {
    payload.validate()?;
    let pool = &state.pool;
    // No-op request: acknowledge without touching the database (note this
    // also skips the `updated_at` refresh).
    if payload.name.is_none()
        && payload.description.is_none()
        && payload.unit.is_none()
        && payload.tag_id.is_none()
        && payload.equipment_id.is_none()
        && payload.signal_role.is_none()
    {
        return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
    }
    // If tag_id is provided (and non-null), ensure the referenced tag exists.
    if let Some(Some(tag_id)) = payload.tag_id {
        let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#)
            .bind(tag_id)
            .fetch_optional(pool)
            .await?
            .is_some();
        if !tag_exists {
            return Err(ApiErr::NotFound("Tag not found".to_string(), None));
        }
    }
    // Same referential check for a concrete equipment assignment.
    if let Some(Some(equipment_id)) = payload.equipment_id {
        let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#)
            .bind(equipment_id)
            .fetch_optional(pool)
            .await?
            .is_some();
        if !equipment_exists {
            return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
        }
    }
    // Ensure target point exists.
    let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#)
        .bind(point_id)
        .fetch_optional(pool)
        .await?;
    if existing_point.is_none() {
        return Err(ApiErr::NotFound("Point not found".to_string(), None));
    }
    // Build the UPDATE dynamically so only the supplied columns are written.
    // `wrote_field` tracks whether a ", " separator is needed before the next
    // column assignment.
    let mut qb: QueryBuilder<sqlx::Postgres> = QueryBuilder::new("UPDATE point SET ");
    let mut wrote_field = false;
    if let Some(name) = &payload.name {
        if wrote_field {
            qb.push(", ");
        }
        qb.push("name = ").push_bind(name);
        wrote_field = true;
    }
    if let Some(description) = &payload.description {
        if wrote_field {
            qb.push(", ");
        }
        // Binding the inner Option writes NULL when the client sent null.
        qb.push("description = ").push_bind(description.as_deref());
        wrote_field = true;
    }
    if let Some(unit) = &payload.unit {
        if wrote_field {
            qb.push(", ");
        }
        qb.push("unit = ").push_bind(unit.as_deref());
        wrote_field = true;
    }
    if let Some(tag_id) = &payload.tag_id {
        if wrote_field {
            qb.push(", ");
        }
        qb.push("tag_id = ").push_bind(tag_id.as_ref());
        wrote_field = true;
    }
    if let Some(equipment_id) = &payload.equipment_id {
        if wrote_field {
            qb.push(", ");
        }
        qb.push("equipment_id = ").push_bind(equipment_id.as_ref());
        wrote_field = true;
    }
    if let Some(signal_role) = &payload.signal_role {
        if wrote_field {
            qb.push(", ");
        }
        qb.push("signal_role = ").push_bind(signal_role.as_deref());
        wrote_field = true;
    }
    // `updated_at` is always refreshed; a separator is needed iff at least one
    // column assignment was written above.
    if wrote_field {
        qb.push(", ");
    }
    qb.push("updated_at = NOW()");
    qb.push(" WHERE id = ").push_bind(point_id);
    qb.build().execute(pool).await?;
    Ok(Json(
        serde_json::json!({"ok_msg": "Point updated successfully"}),
    ))
}
/// Batch set point tags.
///
/// Assigns (or clears, when `tag_id` is null) the tag on every existing point
/// among `point_ids`; ids that don't resolve to a point are silently skipped.
pub async fn batch_set_point_tags(
    State(state): State<AppState>,
    Json(payload): Json<BatchSetPointTagsReq>,
) -> Result<impl IntoResponse, ApiErr> {
    payload.validate()?;
    if payload.point_ids.is_empty() {
        return Err(ApiErr::BadRequest(
            "point_ids cannot be empty".to_string(),
            None,
        ));
    }
    let pool = &state.pool;

    // A concrete tag id must reference an existing tag row.
    if let Some(tag_id) = payload.tag_id {
        let found = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#)
            .bind(tag_id)
            .fetch_optional(pool)
            .await?;
        if found.is_none() {
            return Err(ApiErr::NotFound("Tag not found".to_string(), None));
        }
    }

    // Narrow the request down to point ids that actually exist.
    let rows = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#)
        .bind(&payload.point_ids)
        .fetch_all(pool)
        .await?;
    let existing: Vec<Uuid> = rows.iter().map(|row| row.get::<Uuid, _>("id")).collect();
    if existing.is_empty() {
        return Err(ApiErr::NotFound("No valid points found".to_string(), None));
    }

    // Apply the tag (or NULL) to every surviving point in one statement.
    let result =
        sqlx::query(r#"UPDATE point SET tag_id = $1, updated_at = NOW() WHERE id = ANY($2)"#)
            .bind(payload.tag_id)
            .bind(&existing)
            .execute(pool)
            .await?;

    Ok(Json(serde_json::json!({
        "ok_msg": "Point tags updated successfully",
        "updated_count": result.rows_affected()
    })))
}
/// Batch assign points to a piece of equipment (and optionally a signal role).
///
/// A null `equipment_id` detaches the points; unknown point ids are skipped.
pub async fn batch_set_point_equipment(
    State(state): State<AppState>,
    Json(payload): Json<BatchSetPointEquipmentReq>,
) -> Result<impl IntoResponse, ApiErr> {
    payload.validate()?;
    if payload.point_ids.is_empty() {
        return Err(ApiErr::BadRequest(
            "point_ids cannot be empty".to_string(),
            None,
        ));
    }
    let pool = &state.pool;

    // A concrete equipment id must reference an existing equipment row.
    if let Some(equipment_id) = payload.equipment_id {
        let found = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#)
            .bind(equipment_id)
            .fetch_optional(pool)
            .await?;
        if found.is_none() {
            return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
        }
    }

    // Keep only the requested ids that resolve to real points.
    let rows = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#)
        .bind(&payload.point_ids)
        .fetch_all(pool)
        .await?;
    let existing: Vec<Uuid> = rows.iter().map(|row| row.get::<Uuid, _>("id")).collect();
    if existing.is_empty() {
        return Err(ApiErr::NotFound("No valid points found".to_string(), None));
    }

    // Single UPDATE covering equipment, role and the timestamp.
    let result = sqlx::query(
        r#"
        UPDATE point
        SET equipment_id = $1,
            signal_role = $2,
            updated_at = NOW()
        WHERE id = ANY($3)
        "#,
    )
    .bind(payload.equipment_id)
    .bind(payload.signal_role.as_deref())
    .bind(&existing)
    .execute(pool)
    .await?;

    Ok(Json(serde_json::json!({
        "ok_msg": "Point equipment updated successfully",
        "updated_count": result.rows_affected()
    })))
}
/// Delete one point by id.
///
/// On success, emits a `PointDeleteBatch` event for the point's source so
/// downstream listeners can react.
///
/// Fixes over the previous version: the existence check now runs first and
/// uses `SELECT 1` instead of materializing a full unused `Point` row, the
/// grouped-by-source lookup no longer runs for nonexistent ids, and the
/// DELETE statement casing matches the rest of the file.
pub async fn delete_point(
    State(state): State<AppState>,
    Path(point_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
    let pool = &state.pool;

    // Ensure the target point exists before doing any further work.
    let point_exists = sqlx::query(r#"SELECT 1 FROM point WHERE id = $1"#)
        .bind(point_id)
        .fetch_optional(pool)
        .await?
        .is_some();
    if !point_exists {
        return Err(ApiErr::NotFound("Point not found".to_string(), None));
    }

    // Resolve the owning source before the row disappears; needed for the
    // delete event below.
    let source_id = {
        let grouped = crate::service::get_points_grouped_by_source(pool, &[point_id]).await?;
        grouped.keys().next().copied()
    };

    // Delete the point row.
    sqlx::query(r#"DELETE FROM point WHERE id = $1"#)
        .bind(point_id)
        .execute(pool)
        .await?;

    // Notify listeners; a failed send is logged, not fatal.
    if let Some(source_id) = source_id {
        if let Err(e) = state
            .event_manager
            .send(crate::event::AppEvent::PointDeleteBatch {
                source_id,
                point_ids: vec![point_id],
            })
        {
            tracing::error!("Failed to send PointDeleteBatch event: {}", e);
        }
    }

    Ok(Json(
        serde_json::json!({"ok_msg": "Point deleted successfully"}),
    ))
}
/// Request payload for batch point creation from node ids.
#[derive(Deserialize, Validate)]
pub struct BatchCreatePointsReq {
    // Node ids to turn into points; must be non-empty (enforced in the
    // handler). Duplicates and unknown ids are tolerated.
    pub node_ids: Vec<Uuid>,
}
/// Response payload for batch point creation.
#[derive(Serialize)]
pub struct BatchCreatePointsRes {
    // Number of point rows actually inserted.
    pub success_count: usize,
    // Number of requested node ids that could not be resolved.
    pub failed_count: usize,
    // Requested node ids that do not exist in the node table.
    pub failed_node_ids: Vec<Uuid>,
    // Ids of the freshly created point rows.
    pub created_point_ids: Vec<Uuid>,
}
/// Batch create points by node ids.
///
/// For each requested node id: ids missing from `node` are reported as
/// failures; nodes that already have a point are skipped silently; repeats
/// within the request are de-duplicated. The remainder is inserted in a
/// single transaction, and one `PointCreateBatch` event is emitted per
/// affected source after commit.
pub async fn batch_create_points(
    State(state): State<AppState>,
    Json(payload): Json<BatchCreatePointsReq>,
) -> Result<impl IntoResponse, ApiErr> {
    payload.validate()?;
    let pool = &state.pool;
    if payload.node_ids.is_empty() {
        return Err(ApiErr::BadRequest(
            "node_ids cannot be empty".to_string(),
            None,
        ));
    }
    // Use one transaction for the full batch.
    let mut tx = pool.begin().await?;
    let node_ids = payload.node_ids;
    // Load all referenced nodes in one query.
    let nodes: Vec<Node> = sqlx::query_as::<_, Node>(r#"SELECT * FROM node WHERE id = ANY($1)"#)
        .bind(&node_ids)
        .fetch_all(&mut *tx)
        .await?;
    let node_map: HashMap<Uuid, Node> = nodes.into_iter().map(|node| (node.id, node)).collect();
    let existing_node_ids: HashSet<Uuid> = node_map.keys().copied().collect();
    // Any requested id not found in `node` is reported back as failed.
    let mut failed_node_ids = Vec::new();
    for node_id in &node_ids {
        if !existing_node_ids.contains(node_id) {
            failed_node_ids.push(*node_id);
        }
    }
    // Nodes that already own a point are skipped (not counted as failures).
    let existing_point_node_ids: HashSet<Uuid> =
        sqlx::query_scalar::<_, Uuid>(r#"SELECT node_id FROM point WHERE node_id = ANY($1)"#)
            .bind(&node_ids)
            .fetch_all(&mut *tx)
            .await?
            .into_iter()
            .collect();
    let mut to_create = Vec::new();
    // Guards against duplicate node ids within the same request.
    let mut seen_creatable = HashSet::new();
    for node_id in node_ids {
        if !existing_node_ids.contains(&node_id) || existing_point_node_ids.contains(&node_id) {
            continue;
        }
        if !seen_creatable.insert(node_id) {
            continue;
        }
        // Point name defaults to the node's browse name; the fallback branch
        // is defensive, since node_id is known to be in node_map here.
        let name = node_map
            .get(&node_id)
            .map(|node| node.browse_name.clone())
            .unwrap_or_else(|| format!("Point_{}", node_id));
        to_create.push((Uuid::new_v4(), node_id, name));
    }
    let mut created_point_ids = Vec::with_capacity(to_create.len());
    if !to_create.is_empty() {
        // One multi-row INSERT for the whole batch.
        let mut qb = QueryBuilder::new("INSERT INTO point (id, node_id, name) ");
        qb.push_values(to_create.iter(), |mut b, (id, node_id, name)| {
            b.push_bind(*id).push_bind(*node_id).push_bind(name);
        });
        qb.build().execute(&mut *tx).await?;
        created_point_ids.extend(to_create.into_iter().map(|(id, _, _)| id));
    }
    // Commit the transaction.
    tx.commit().await?;
    // Emit grouped create events by source; send failures are logged only.
    if !created_point_ids.is_empty() {
        let grouped =
            crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?;
        for (source_id, points) in grouped {
            let point_ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
            if let Err(e) = state
                .event_manager
                .send(crate::event::AppEvent::PointCreateBatch {
                    source_id,
                    point_ids,
                })
            {
                tracing::error!("Failed to send PointCreateBatch event: {}", e);
            }
        }
    }
    Ok(Json(BatchCreatePointsRes {
        success_count: created_point_ids.len(),
        failed_count: failed_node_ids.len(),
        failed_node_ids,
        created_point_ids,
    }))
}
/// Request payload for batch point deletion.
#[derive(Deserialize, Validate)]
pub struct BatchDeletePointsReq {
    // Points to delete; must be non-empty (enforced in the handler).
    // Ids that don't resolve to an existing point are ignored.
    pub point_ids: Vec<Uuid>,
}
/// Response payload for batch point deletion.
#[derive(Serialize)]
pub struct BatchDeletePointsRes {
    // Number of rows actually removed (as reported by the database).
    pub deleted_count: u64,
}
/// Batch delete points and emit grouped delete events by source.
pub async fn batch_delete_points(
    State(state): State<AppState>,
    Json(payload): Json<BatchDeletePointsReq>,
) -> Result<impl IntoResponse, ApiErr> {
    payload.validate()?;
    if payload.point_ids.is_empty() {
        return Err(ApiErr::BadRequest(
            "point_ids cannot be empty".to_string(),
            None,
        ));
    }
    let pool = &state.pool;
    let point_ids = payload.point_ids;

    // Resolve which of the requested points exist, grouped by owning source;
    // the grouping doubles as the event payload after deletion.
    let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?;
    let existing_point_ids = grouped
        .values()
        .flat_map(|points| points.iter().map(|p| p.point_id))
        .collect::<Vec<Uuid>>();
    if existing_point_ids.is_empty() {
        // Nothing matched: report zero deletions rather than an error.
        return Ok(Json(BatchDeletePointsRes { deleted_count: 0 }));
    }

    let result = sqlx::query(r#"DELETE FROM point WHERE id = ANY($1)"#)
        .bind(&existing_point_ids)
        .execute(pool)
        .await?;

    // One event per source, carrying that source's deleted point ids; send
    // failures are logged, not fatal.
    for (source_id, points) in grouped {
        let ids = points
            .into_iter()
            .map(|p| p.point_id)
            .collect::<Vec<Uuid>>();
        let event = crate::event::AppEvent::PointDeleteBatch {
            source_id,
            point_ids: ids,
        };
        if let Err(e) = state.event_manager.send(event) {
            tracing::error!("Failed to send PointDeleteBatch event: {}", e);
        }
    }

    Ok(Json(BatchDeletePointsRes {
        deleted_count: result.rows_affected(),
    }))
}
/// Write values to points through the connection manager.
///
/// Guarded by the `X-Write-Key` header, which must pass
/// `state.config.verify_write_key` before any write is attempted.
pub async fn batch_set_point_value(
    State(state): State<AppState>,
    headers: HeaderMap,
    Json(payload): Json<crate::connection::BatchSetPointValueReq>,
) -> Result<impl IntoResponse, ApiErr> {
    // Extract the caller-supplied key; a missing header or non-UTF-8 value
    // degrades to the empty string (which verify_write_key will reject unless
    // no key is configured).
    let supplied_key = match headers.get("X-Write-Key").and_then(|v| v.to_str().ok()) {
        Some(key) => key,
        None => "",
    };
    if !state.config.verify_write_key(supplied_key) {
        return Err(ApiErr::Forbidden(
            "write permission denied".to_string(),
            Some(serde_json::json!({
                "hint": "set WRITE_API_KEY (or legacy WRITE_KEY) and pass header X-Write-Key"
            })),
        ));
    }
    let outcome = state
        .connection_manager
        .write_point_values_batch(payload)
        .await;
    match outcome {
        Ok(result) => Ok(Json(result)),
        Err(e) => Err(ApiErr::Internal(e, None)),
    }
}
fn monitor_value_to_number(item: &crate::telemetry::PointMonitorInfo) -> Option<f64> {
match item.value.as_ref()? {
crate::telemetry::DataValue::Int(v) => Some(*v as f64),
crate::telemetry::DataValue::UInt(v) => Some(*v as f64),
crate::telemetry::DataValue::Float(v) => Some(*v),
crate::telemetry::DataValue::Bool(v) => Some(if *v { 1.0 } else { 0.0 }),
crate::telemetry::DataValue::Text(v) => v.parse::<f64>().ok(),
_ => None,
}
}