From 8be82e372e837d27f09f3cf5cae8fab159f83bd0 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Mon, 23 Mar 2026 10:38:20 +0800 Subject: [PATCH] feat(point): add equipment metadata scaffolding --- ...93000_add_equipment_and_point_metadata.sql | 16 ++ src/handler/point.rs | 226 ++++++++++-------- src/model.rs | 20 +- src/service.rs | 76 +++--- 4 files changed, 192 insertions(+), 146 deletions(-) create mode 100644 migrations/20260323093000_add_equipment_and_point_metadata.sql diff --git a/migrations/20260323093000_add_equipment_and_point_metadata.sql b/migrations/20260323093000_add_equipment_and_point_metadata.sql new file mode 100644 index 0000000..521edfc --- /dev/null +++ b/migrations/20260323093000_add_equipment_and_point_metadata.sql @@ -0,0 +1,16 @@ +CREATE TABLE equipment ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + code TEXT NOT NULL, + name TEXT NOT NULL, + kind TEXT, + description TEXT, + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE (code) +); + +ALTER TABLE point + ADD COLUMN equipment_id UUID REFERENCES equipment(id) ON DELETE SET NULL, + ADD COLUMN signal_role TEXT; + +CREATE INDEX idx_point_equipment_id ON point(equipment_id); diff --git a/src/handler/point.rs b/src/handler/point.rs index 739175d..a7ba175 100644 --- a/src/handler/point.rs +++ b/src/handler/point.rs @@ -1,15 +1,23 @@ -use axum::{Json, extract::{Path, Query, State}, http::HeaderMap, response::IntoResponse}; +use axum::{ + extract::{Path, Query, State}, + http::HeaderMap, + response::IntoResponse, + Json, +}; use serde::{Deserialize, Serialize}; +use sqlx::{QueryBuilder, Row}; use std::collections::{HashMap, HashSet}; use uuid::Uuid; use validator::Validate; -use sqlx::{Row, QueryBuilder}; -use crate::util::{response::ApiErr, pagination::{PaginatedResponse, PaginationParams}}; +use crate::util::{ + pagination::{PaginatedResponse, PaginationParams}, + response::ApiErr, +}; use crate::{ - AppState, model::{Node, Point}, + AppState, }; /// List all points. @@ -58,7 +66,8 @@ pub async fn get_point_list( query.source_id, query.pagination.page_size, query.pagination.offset(), - ).await?; + ) + .await?; let monitor_guard = state .connection_manager @@ -68,14 +77,20 @@ pub async fn get_point_list( let data: Vec = points .into_iter() .map(|point| { - let point_monitor = monitor_guard - .get(&point.id) - .cloned(); - PointWithMonitor { point, point_monitor } + let point_monitor = monitor_guard.get(&point.id).cloned(); + PointWithMonitor { + point, + point_monitor, + } }) .collect(); - let response = PaginatedResponse::new(data, total, query.pagination.page, query.pagination.page_size); + let response = PaginatedResponse::new( + data, + total, + query.pagination.page, + query.pagination.page_size, + ); Ok(Json(response)) } @@ -124,7 +139,6 @@ pub async fn get_point_history( Ok(Json(items)) } - /// Request payload for updating editable point fields. #[derive(Deserialize, Validate)] pub struct UpdatePointReq { @@ -132,6 +146,8 @@ pub struct UpdatePointReq { pub description: Option, pub unit: Option, pub tag_id: Option, + pub equipment_id: Option, + pub signal_role: Option, } /// Request payload for batch setting point tags. @@ -151,32 +167,46 @@ pub async fn update_point( let pool = &state.pool; - if payload.name.is_none() && payload.description.is_none() && payload.unit.is_none() && payload.tag_id.is_none() { + if payload.name.is_none() + && payload.description.is_none() + && payload.unit.is_none() + && payload.tag_id.is_none() + && payload.equipment_id.is_none() + && payload.signal_role.is_none() + { return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); } // If tag_id is provided, ensure tag exists. if let Some(tag_id) = payload.tag_id { - let tag_exists = sqlx::query( - r#"SELECT 1 FROM tag WHERE id = $1"#, - ) - .bind(tag_id) - .fetch_optional(pool) - .await? - .is_some(); + let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#) + .bind(tag_id) + .fetch_optional(pool) + .await? + .is_some(); if !tag_exists { return Err(ApiErr::NotFound("Tag not found".to_string(), None)); } } + if let Some(equipment_id) = payload.equipment_id { + let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#) + .bind(equipment_id) + .fetch_optional(pool) + .await? + .is_some(); + + if !equipment_exists { + return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); + } + } + // Ensure target point exists. - let existing_point = sqlx::query_as::<_, Point>( - r#"SELECT * FROM point WHERE id = $1"#, - ) - .bind(point_id) - .fetch_optional(pool) - .await?; + let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#) + .bind(point_id) + .fetch_optional(pool) + .await?; if existing_point.is_none() { return Err(ApiErr::NotFound("Point not found".to_string(), None)); } @@ -196,13 +226,21 @@ pub async fn update_point( if let Some(tag_id) = &payload.tag_id { sep.push("tag_id = ").push_bind(tag_id); } + if let Some(equipment_id) = &payload.equipment_id { + sep.push("equipment_id = ").push_bind(equipment_id); + } + if let Some(signal_role) = &payload.signal_role { + sep.push("signal_role = ").push_bind(signal_role); + } sep.push("updated_at = NOW()"); qb.push(" WHERE id = ").push_bind(point_id); qb.build().execute(pool).await?; - Ok(Json(serde_json::json!({"ok_msg": "Point updated successfully"}))) + Ok(Json( + serde_json::json!({"ok_msg": "Point updated successfully"}), + )) } /// Batch set point tags. @@ -213,20 +251,21 @@ pub async fn batch_set_point_tags( payload.validate()?; if payload.point_ids.is_empty() { - return Err(ApiErr::BadRequest("point_ids cannot be empty".to_string(), None)); + return Err(ApiErr::BadRequest( + "point_ids cannot be empty".to_string(), + None, + )); } let pool = &state.pool; // If tag_id is provided, ensure tag exists. if let Some(tag_id) = payload.tag_id { - let tag_exists = sqlx::query( - r#"SELECT 1 FROM tag WHERE id = $1"#, - ) - .bind(tag_id) - .fetch_optional(pool) - .await? - .is_some(); + let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#) + .bind(tag_id) + .fetch_optional(pool) + .await? + .is_some(); if !tag_exists { return Err(ApiErr::NotFound("Tag not found".to_string(), None)); @@ -234,28 +273,25 @@ pub async fn batch_set_point_tags( } // Check which points exist - let existing_points: Vec = sqlx::query( - r#"SELECT id FROM point WHERE id = ANY($1)"#, - ) - .bind(&payload.point_ids) - .fetch_all(pool) - .await? - .into_iter() - .map(|row: sqlx::postgres::PgRow| row.get::("id")) - .collect(); + let existing_points: Vec = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#) + .bind(&payload.point_ids) + .fetch_all(pool) + .await? + .into_iter() + .map(|row: sqlx::postgres::PgRow| row.get::("id")) + .collect(); if existing_points.is_empty() { return Err(ApiErr::NotFound("No valid points found".to_string(), None)); } // Update tag_id for all existing points - let result = sqlx::query( - r#"UPDATE point SET tag_id = $1, updated_at = NOW() WHERE id = ANY($2)"#, - ) - .bind(payload.tag_id) - .bind(&existing_points) - .execute(pool) - .await?; + let result = + sqlx::query(r#"UPDATE point SET tag_id = $1, updated_at = NOW() WHERE id = ANY($2)"#) + .bind(payload.tag_id) + .bind(&existing_points) + .execute(pool) + .await?; Ok(Json(serde_json::json!({ "ok_msg": "Point tags updated successfully", @@ -276,23 +312,19 @@ pub async fn delete_point( }; // Ensure target point exists. - let existing_point = sqlx::query_as::<_, Point>( - r#"SELECT * FROM point WHERE id = $1"#, - ) - .bind(point_id) - .fetch_optional(pool) - .await?; + let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#) + .bind(point_id) + .fetch_optional(pool) + .await?; if existing_point.is_none() { return Err(ApiErr::NotFound("Point not found".to_string(), None)); } // Delete point. - sqlx::query( - r#"delete from point WHERE id = $1"#, - ) - .bind(point_id) - .execute(pool) - .await?; + sqlx::query(r#"delete from point WHERE id = $1"#) + .bind(point_id) + .execute(pool) + .await?; if let Some(source_id) = source_id { if let Err(e) = state @@ -306,7 +338,9 @@ pub async fn delete_point( } } - Ok(Json(serde_json::json!({"ok_msg": "Point deleted successfully"}))) + Ok(Json( + serde_json::json!({"ok_msg": "Point deleted successfully"}), + )) } #[derive(Deserialize, Validate)] @@ -334,24 +368,22 @@ pub async fn batch_create_points( let pool = &state.pool; if payload.node_ids.is_empty() { - return Err(ApiErr::BadRequest("node_ids cannot be empty".to_string(), None)); + return Err(ApiErr::BadRequest( + "node_ids cannot be empty".to_string(), + None, + )); } // Use one transaction for the full batch. let mut tx = pool.begin().await?; let node_ids = payload.node_ids; - let nodes: Vec = sqlx::query_as::<_, Node>( - r#"SELECT * FROM node WHERE id = ANY($1)"#, - ) - .bind(&node_ids) - .fetch_all(&mut *tx) - .await?; + let nodes: Vec = sqlx::query_as::<_, Node>(r#"SELECT * FROM node WHERE id = ANY($1)"#) + .bind(&node_ids) + .fetch_all(&mut *tx) + .await?; - let node_map: HashMap = nodes - .into_iter() - .map(|node| (node.id, node)) - .collect(); + let node_map: HashMap = nodes.into_iter().map(|node| (node.id, node)).collect(); let existing_node_ids: HashSet = node_map.keys().copied().collect(); let mut failed_node_ids = Vec::new(); @@ -361,14 +393,13 @@ pub async fn batch_create_points( } } - let existing_point_node_ids: HashSet = sqlx::query_scalar::<_, Uuid>( - r#"SELECT node_id FROM point WHERE node_id = ANY($1)"#, - ) - .bind(&node_ids) - .fetch_all(&mut *tx) - .await? - .into_iter() - .collect(); + let existing_point_node_ids: HashSet = + sqlx::query_scalar::<_, Uuid>(r#"SELECT node_id FROM point WHERE node_id = ANY($1)"#) + .bind(&node_ids) + .fetch_all(&mut *tx) + .await? + .into_iter() + .collect(); let mut to_create = Vec::new(); let mut seen_creatable = HashSet::new(); @@ -403,12 +434,16 @@ pub async fn batch_create_points( // Emit grouped create events by source. if !created_point_ids.is_empty() { - let grouped = crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?; + let grouped = + crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?; for (source_id, points) in grouped { let point_ids: Vec = points.into_iter().map(|p| p.point_id).collect(); if let Err(e) = state .event_manager - .send(crate::event::AppEvent::PointCreateBatch { source_id, point_ids }) + .send(crate::event::AppEvent::PointCreateBatch { + source_id, + point_ids, + }) { tracing::error!("Failed to send PointCreateBatch event: {}", e); } @@ -443,7 +478,10 @@ pub async fn batch_delete_points( payload.validate()?; if payload.point_ids.is_empty() { - return Err(ApiErr::BadRequest("point_ids cannot be empty".to_string(), None)); + return Err(ApiErr::BadRequest( + "point_ids cannot be empty".to_string(), + None, + )); } let pool = &state.pool; @@ -459,12 +497,10 @@ pub async fn batch_delete_points( return Ok(Json(BatchDeletePointsRes { deleted_count: 0 })); } - let result = sqlx::query( - r#"DELETE FROM point WHERE id = ANY($1)"#, - ) - .bind(&existing_point_ids) - .execute(pool) - .await?; + let result = sqlx::query(r#"DELETE FROM point WHERE id = ANY($1)"#) + .bind(&existing_point_ids) + .execute(pool) + .await?; for (source_id, points) in grouped { let ids: Vec = points.into_iter().map(|p| p.point_id).collect(); @@ -484,10 +520,6 @@ pub async fn batch_delete_points( })) } - - - - pub async fn batch_set_point_value( State(state): State, headers: HeaderMap, @@ -507,7 +539,9 @@ pub async fn batch_set_point_value( )); } - let result = state.connection_manager.write_point_values_batch(payload) + let result = state + .connection_manager + .write_point_values_batch(payload) .await .map_err(|e| ApiErr::Internal(e, None))?; Ok(Json(result)) diff --git a/src/model.rs b/src/model.rs index 243a2f2..176ff5d 100644 --- a/src/model.rs +++ b/src/model.rs @@ -1,10 +1,10 @@ +use crate::util::datetime::utc_to_local_str; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use sqlx::FromRow; use sqlx::types::Json; +use sqlx::FromRow; use std::collections::HashMap; use uuid::Uuid; -use crate::util::datetime::utc_to_local_str; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] @@ -95,6 +95,8 @@ pub struct Point { pub description: Option, pub unit: Option, pub tag_id: Option, + pub equipment_id: Option, + pub signal_role: Option, #[serde(serialize_with = "utc_to_local_str")] pub created_at: DateTime, #[serde(serialize_with = "utc_to_local_str")] @@ -118,6 +120,19 @@ pub struct Tag { pub updated_at: DateTime, } +#[derive(Debug, Serialize, Deserialize, FromRow, Clone)] +pub struct Equipment { + pub id: Uuid, + pub code: String, + pub name: String, + pub kind: Option, + pub description: Option, + #[serde(serialize_with = "utc_to_local_str")] + pub created_at: DateTime, + #[serde(serialize_with = "utc_to_local_str")] + pub updated_at: DateTime, +} + #[derive(Debug, Serialize, Deserialize, FromRow, Clone)] pub struct Page { pub id: Uuid, @@ -128,4 +143,3 @@ pub struct Page { #[serde(serialize_with = "utc_to_local_str")] pub updated_at: DateTime, } - diff --git a/src/service.rs b/src/service.rs index 70e3997..6d3225b 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,5 +1,5 @@ use crate::model::{PointSubscriptionInfo, Source}; -use sqlx::{PgPool, query_as}; +use sqlx::{query_as, PgPool}; pub async fn get_enabled_source( pool: &PgPool, @@ -21,12 +21,10 @@ pub async fn get_point_by_id( pool: &PgPool, point_id: uuid::Uuid, ) -> Result, sqlx::Error> { - query_as::<_, crate::model::Point>( - r#"SELECT * FROM point WHERE id = $1"#, - ) - .bind(point_id) - .fetch_optional(pool) - .await + query_as::<_, crate::model::Point>(r#"SELECT * FROM point WHERE id = $1"#) + .bind(point_id) + .fetch_optional(pool) + .await } pub async fn get_points_grouped_by_source( @@ -144,11 +142,9 @@ pub async fn get_points_count( .await } None => { - sqlx::query_scalar::<_, i64>( - r#"SELECT COUNT(*) FROM point"#, - ) - .fetch_one(pool) - .await + sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM point"#) + .fetch_one(pool) + .await } } } @@ -227,14 +223,10 @@ pub async fn get_points_paginated( // ==================== Tag 相关服务函数 ==================== /// 获取标签总数 -pub async fn get_tags_count( - pool: &PgPool, -) -> Result { - sqlx::query_scalar::<_, i64>( - r#"SELECT COUNT(*) FROM tag"#, - ) - .fetch_one(pool) - .await +pub async fn get_tags_count(pool: &PgPool) -> Result { + sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM tag"#) + .fetch_one(pool) + .await } /// 获取分页标签列表 @@ -246,11 +238,9 @@ pub async fn get_tags_paginated( if page_size == 0 { Ok(vec![]) } else if page_size == -1 { - sqlx::query_as::<_, crate::model::Tag>( - r#"SELECT * FROM tag ORDER BY created_at"#, - ) - .fetch_all(pool) - .await + sqlx::query_as::<_, crate::model::Tag>(r#"SELECT * FROM tag ORDER BY created_at"#) + .fetch_all(pool) + .await } else { sqlx::query_as::<_, crate::model::Tag>( r#" @@ -271,12 +261,10 @@ pub async fn get_tag_by_id( pool: &PgPool, tag_id: uuid::Uuid, ) -> Result, sqlx::Error> { - query_as::<_, crate::model::Tag>( - r#"SELECT * FROM tag WHERE id = $1"# - ) - .bind(tag_id) - .fetch_optional(pool) - .await + query_as::<_, crate::model::Tag>(r#"SELECT * FROM tag WHERE id = $1"#) + .bind(tag_id) + .fetch_optional(pool) + .await } /// 获取标签下的点位 @@ -290,7 +278,7 @@ pub async fn get_tag_points( FROM point WHERE tag_id = $1 ORDER BY created_at - "# + "#, ) .bind(tag_id) .fetch_all(pool) @@ -312,7 +300,7 @@ pub async fn create_tag( r#" INSERT INTO tag (id, name, description) VALUES ($1, $2, $3) - "# + "#, ) .bind(tag_id) .bind(name) @@ -327,7 +315,7 @@ pub async fn create_tag( UPDATE point SET tag_id = $1 WHERE id = $2 - "# + "#, ) .bind(tag_id) .bind(point_id) @@ -394,7 +382,7 @@ pub async fn update_tag( UPDATE point SET tag_id = NULL WHERE tag_id = $1 - "# + "#, ) .bind(tag_id) .execute(&mut *tx) @@ -407,7 +395,7 @@ pub async fn update_tag( UPDATE point SET tag_id = $1 WHERE id = $2 - "# + "#, ) .bind(tag_id) .bind(point_id) @@ -422,17 +410,11 @@ pub async fn update_tag( } /// 删除标签 -pub async fn delete_tag( - pool: &PgPool, - tag_id: uuid::Uuid, -) -> Result { - let result = sqlx::query( - r#"DELETE FROM tag WHERE id = $1"# - ) - .bind(tag_id) - .execute(pool) - .await?; +pub async fn delete_tag(pool: &PgPool, tag_id: uuid::Uuid) -> Result { + let result = sqlx::query(r#"DELETE FROM tag WHERE id = $1"#) + .bind(tag_id) + .execute(pool) + .await?; Ok(result.rows_affected() > 0) } -