feat(point): add equipment metadata scaffolding

This commit is contained in:
caoqianming 2026-03-23 10:38:20 +08:00
parent a691f07e8e
commit 8be82e372e
4 changed files with 192 additions and 146 deletions

View File

@ -0,0 +1,16 @@
-- Equipment master data: a device/asset that owns one or more points.
-- NOTE(review): gen_random_uuid() is built in from PostgreSQL 13; on older
-- versions it requires the pgcrypto extension — confirm the target version.
CREATE TABLE equipment (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Business identifier; uniqueness enforced by the UNIQUE constraint below.
    code TEXT NOT NULL,
    name TEXT NOT NULL,
    -- Optional free-form equipment category.
    kind TEXT,
    description TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (code)
);
-- Attach points to equipment. ON DELETE SET NULL means deleting an equipment
-- row detaches its points (equipment_id becomes NULL) instead of deleting them.
ALTER TABLE point
    ADD COLUMN equipment_id UUID REFERENCES equipment(id) ON DELETE SET NULL,
    -- Free-form role of this point's signal within the equipment.
    ADD COLUMN signal_role TEXT;
-- Index for looking up all points belonging to one equipment.
CREATE INDEX idx_point_equipment_id ON point(equipment_id);

View File

@ -1,15 +1,23 @@
use axum::{Json, extract::{Path, Query, State}, http::HeaderMap, response::IntoResponse}; use axum::{
extract::{Path, Query, State},
http::HeaderMap,
response::IntoResponse,
Json,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sqlx::{QueryBuilder, Row};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use uuid::Uuid; use uuid::Uuid;
use validator::Validate; use validator::Validate;
use sqlx::{Row, QueryBuilder};
use crate::util::{response::ApiErr, pagination::{PaginatedResponse, PaginationParams}}; use crate::util::{
pagination::{PaginatedResponse, PaginationParams},
response::ApiErr,
};
use crate::{ use crate::{
AppState,
model::{Node, Point}, model::{Node, Point},
AppState,
}; };
/// List all points. /// List all points.
@ -58,7 +66,8 @@ pub async fn get_point_list(
query.source_id, query.source_id,
query.pagination.page_size, query.pagination.page_size,
query.pagination.offset(), query.pagination.offset(),
).await?; )
.await?;
let monitor_guard = state let monitor_guard = state
.connection_manager .connection_manager
@ -68,14 +77,20 @@ pub async fn get_point_list(
let data: Vec<PointWithMonitor> = points let data: Vec<PointWithMonitor> = points
.into_iter() .into_iter()
.map(|point| { .map(|point| {
let point_monitor = monitor_guard let point_monitor = monitor_guard.get(&point.id).cloned();
.get(&point.id) PointWithMonitor {
.cloned(); point,
PointWithMonitor { point, point_monitor } point_monitor,
}
}) })
.collect(); .collect();
let response = PaginatedResponse::new(data, total, query.pagination.page, query.pagination.page_size); let response = PaginatedResponse::new(
data,
total,
query.pagination.page,
query.pagination.page_size,
);
Ok(Json(response)) Ok(Json(response))
} }
@ -124,7 +139,6 @@ pub async fn get_point_history(
Ok(Json(items)) Ok(Json(items))
} }
/// Request payload for updating editable point fields. /// Request payload for updating editable point fields.
#[derive(Deserialize, Validate)] #[derive(Deserialize, Validate)]
pub struct UpdatePointReq { pub struct UpdatePointReq {
@ -132,6 +146,8 @@ pub struct UpdatePointReq {
pub description: Option<String>, pub description: Option<String>,
pub unit: Option<String>, pub unit: Option<String>,
pub tag_id: Option<Uuid>, pub tag_id: Option<Uuid>,
pub equipment_id: Option<Uuid>,
pub signal_role: Option<String>,
} }
/// Request payload for batch setting point tags. /// Request payload for batch setting point tags.
@ -151,15 +167,19 @@ pub async fn update_point(
let pool = &state.pool; let pool = &state.pool;
if payload.name.is_none() && payload.description.is_none() && payload.unit.is_none() && payload.tag_id.is_none() { if payload.name.is_none()
&& payload.description.is_none()
&& payload.unit.is_none()
&& payload.tag_id.is_none()
&& payload.equipment_id.is_none()
&& payload.signal_role.is_none()
{
return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
} }
// If tag_id is provided, ensure tag exists. // If tag_id is provided, ensure tag exists.
if let Some(tag_id) = payload.tag_id { if let Some(tag_id) = payload.tag_id {
let tag_exists = sqlx::query( let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#)
r#"SELECT 1 FROM tag WHERE id = $1"#,
)
.bind(tag_id) .bind(tag_id)
.fetch_optional(pool) .fetch_optional(pool)
.await? .await?
@ -170,10 +190,20 @@ pub async fn update_point(
} }
} }
if let Some(equipment_id) = payload.equipment_id {
let equipment_exists = sqlx::query(r#"SELECT 1 FROM equipment WHERE id = $1"#)
.bind(equipment_id)
.fetch_optional(pool)
.await?
.is_some();
if !equipment_exists {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}
}
// Ensure target point exists. // Ensure target point exists.
let existing_point = sqlx::query_as::<_, Point>( let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#)
r#"SELECT * FROM point WHERE id = $1"#,
)
.bind(point_id) .bind(point_id)
.fetch_optional(pool) .fetch_optional(pool)
.await?; .await?;
@ -196,13 +226,21 @@ pub async fn update_point(
if let Some(tag_id) = &payload.tag_id { if let Some(tag_id) = &payload.tag_id {
sep.push("tag_id = ").push_bind(tag_id); sep.push("tag_id = ").push_bind(tag_id);
} }
if let Some(equipment_id) = &payload.equipment_id {
sep.push("equipment_id = ").push_bind(equipment_id);
}
if let Some(signal_role) = &payload.signal_role {
sep.push("signal_role = ").push_bind(signal_role);
}
sep.push("updated_at = NOW()"); sep.push("updated_at = NOW()");
qb.push(" WHERE id = ").push_bind(point_id); qb.push(" WHERE id = ").push_bind(point_id);
qb.build().execute(pool).await?; qb.build().execute(pool).await?;
Ok(Json(serde_json::json!({"ok_msg": "Point updated successfully"}))) Ok(Json(
serde_json::json!({"ok_msg": "Point updated successfully"}),
))
} }
/// Batch set point tags. /// Batch set point tags.
@ -213,16 +251,17 @@ pub async fn batch_set_point_tags(
payload.validate()?; payload.validate()?;
if payload.point_ids.is_empty() { if payload.point_ids.is_empty() {
return Err(ApiErr::BadRequest("point_ids cannot be empty".to_string(), None)); return Err(ApiErr::BadRequest(
"point_ids cannot be empty".to_string(),
None,
));
} }
let pool = &state.pool; let pool = &state.pool;
// If tag_id is provided, ensure tag exists. // If tag_id is provided, ensure tag exists.
if let Some(tag_id) = payload.tag_id { if let Some(tag_id) = payload.tag_id {
let tag_exists = sqlx::query( let tag_exists = sqlx::query(r#"SELECT 1 FROM tag WHERE id = $1"#)
r#"SELECT 1 FROM tag WHERE id = $1"#,
)
.bind(tag_id) .bind(tag_id)
.fetch_optional(pool) .fetch_optional(pool)
.await? .await?
@ -234,9 +273,7 @@ pub async fn batch_set_point_tags(
} }
// Check which points exist // Check which points exist
let existing_points: Vec<Uuid> = sqlx::query( let existing_points: Vec<Uuid> = sqlx::query(r#"SELECT id FROM point WHERE id = ANY($1)"#)
r#"SELECT id FROM point WHERE id = ANY($1)"#,
)
.bind(&payload.point_ids) .bind(&payload.point_ids)
.fetch_all(pool) .fetch_all(pool)
.await? .await?
@ -249,9 +286,8 @@ pub async fn batch_set_point_tags(
} }
// Update tag_id for all existing points // Update tag_id for all existing points
let result = sqlx::query( let result =
r#"UPDATE point SET tag_id = $1, updated_at = NOW() WHERE id = ANY($2)"#, sqlx::query(r#"UPDATE point SET tag_id = $1, updated_at = NOW() WHERE id = ANY($2)"#)
)
.bind(payload.tag_id) .bind(payload.tag_id)
.bind(&existing_points) .bind(&existing_points)
.execute(pool) .execute(pool)
@ -276,9 +312,7 @@ pub async fn delete_point(
}; };
// Ensure target point exists. // Ensure target point exists.
let existing_point = sqlx::query_as::<_, Point>( let existing_point = sqlx::query_as::<_, Point>(r#"SELECT * FROM point WHERE id = $1"#)
r#"SELECT * FROM point WHERE id = $1"#,
)
.bind(point_id) .bind(point_id)
.fetch_optional(pool) .fetch_optional(pool)
.await?; .await?;
@ -287,9 +321,7 @@ pub async fn delete_point(
} }
// Delete point. // Delete point.
sqlx::query( sqlx::query(r#"delete from point WHERE id = $1"#)
r#"delete from point WHERE id = $1"#,
)
.bind(point_id) .bind(point_id)
.execute(pool) .execute(pool)
.await?; .await?;
@ -306,7 +338,9 @@ pub async fn delete_point(
} }
} }
Ok(Json(serde_json::json!({"ok_msg": "Point deleted successfully"}))) Ok(Json(
serde_json::json!({"ok_msg": "Point deleted successfully"}),
))
} }
#[derive(Deserialize, Validate)] #[derive(Deserialize, Validate)]
@ -334,24 +368,22 @@ pub async fn batch_create_points(
let pool = &state.pool; let pool = &state.pool;
if payload.node_ids.is_empty() { if payload.node_ids.is_empty() {
return Err(ApiErr::BadRequest("node_ids cannot be empty".to_string(), None)); return Err(ApiErr::BadRequest(
"node_ids cannot be empty".to_string(),
None,
));
} }
// Use one transaction for the full batch. // Use one transaction for the full batch.
let mut tx = pool.begin().await?; let mut tx = pool.begin().await?;
let node_ids = payload.node_ids; let node_ids = payload.node_ids;
let nodes: Vec<Node> = sqlx::query_as::<_, Node>( let nodes: Vec<Node> = sqlx::query_as::<_, Node>(r#"SELECT * FROM node WHERE id = ANY($1)"#)
r#"SELECT * FROM node WHERE id = ANY($1)"#,
)
.bind(&node_ids) .bind(&node_ids)
.fetch_all(&mut *tx) .fetch_all(&mut *tx)
.await?; .await?;
let node_map: HashMap<Uuid, Node> = nodes let node_map: HashMap<Uuid, Node> = nodes.into_iter().map(|node| (node.id, node)).collect();
.into_iter()
.map(|node| (node.id, node))
.collect();
let existing_node_ids: HashSet<Uuid> = node_map.keys().copied().collect(); let existing_node_ids: HashSet<Uuid> = node_map.keys().copied().collect();
let mut failed_node_ids = Vec::new(); let mut failed_node_ids = Vec::new();
@ -361,9 +393,8 @@ pub async fn batch_create_points(
} }
} }
let existing_point_node_ids: HashSet<Uuid> = sqlx::query_scalar::<_, Uuid>( let existing_point_node_ids: HashSet<Uuid> =
r#"SELECT node_id FROM point WHERE node_id = ANY($1)"#, sqlx::query_scalar::<_, Uuid>(r#"SELECT node_id FROM point WHERE node_id = ANY($1)"#)
)
.bind(&node_ids) .bind(&node_ids)
.fetch_all(&mut *tx) .fetch_all(&mut *tx)
.await? .await?
@ -403,12 +434,16 @@ pub async fn batch_create_points(
// Emit grouped create events by source. // Emit grouped create events by source.
if !created_point_ids.is_empty() { if !created_point_ids.is_empty() {
let grouped = crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?; let grouped =
crate::service::get_points_grouped_by_source(pool, &created_point_ids).await?;
for (source_id, points) in grouped { for (source_id, points) in grouped {
let point_ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect(); let point_ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
if let Err(e) = state if let Err(e) = state
.event_manager .event_manager
.send(crate::event::AppEvent::PointCreateBatch { source_id, point_ids }) .send(crate::event::AppEvent::PointCreateBatch {
source_id,
point_ids,
})
{ {
tracing::error!("Failed to send PointCreateBatch event: {}", e); tracing::error!("Failed to send PointCreateBatch event: {}", e);
} }
@ -443,7 +478,10 @@ pub async fn batch_delete_points(
payload.validate()?; payload.validate()?;
if payload.point_ids.is_empty() { if payload.point_ids.is_empty() {
return Err(ApiErr::BadRequest("point_ids cannot be empty".to_string(), None)); return Err(ApiErr::BadRequest(
"point_ids cannot be empty".to_string(),
None,
));
} }
let pool = &state.pool; let pool = &state.pool;
@ -459,9 +497,7 @@ pub async fn batch_delete_points(
return Ok(Json(BatchDeletePointsRes { deleted_count: 0 })); return Ok(Json(BatchDeletePointsRes { deleted_count: 0 }));
} }
let result = sqlx::query( let result = sqlx::query(r#"DELETE FROM point WHERE id = ANY($1)"#)
r#"DELETE FROM point WHERE id = ANY($1)"#,
)
.bind(&existing_point_ids) .bind(&existing_point_ids)
.execute(pool) .execute(pool)
.await?; .await?;
@ -484,10 +520,6 @@ pub async fn batch_delete_points(
})) }))
} }
pub async fn batch_set_point_value( pub async fn batch_set_point_value(
State(state): State<AppState>, State(state): State<AppState>,
headers: HeaderMap, headers: HeaderMap,
@ -507,7 +539,9 @@ pub async fn batch_set_point_value(
)); ));
} }
let result = state.connection_manager.write_point_values_batch(payload) let result = state
.connection_manager
.write_point_values_batch(payload)
.await .await
.map_err(|e| ApiErr::Internal(e, None))?; .map_err(|e| ApiErr::Internal(e, None))?;
Ok(Json(result)) Ok(Json(result))

View File

@ -1,10 +1,10 @@
use crate::util::datetime::utc_to_local_str;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use sqlx::types::Json; use sqlx::types::Json;
use sqlx::FromRow;
use std::collections::HashMap; use std::collections::HashMap;
use uuid::Uuid; use uuid::Uuid;
use crate::util::datetime::utc_to_local_str;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
@ -95,6 +95,8 @@ pub struct Point {
pub description: Option<String>, pub description: Option<String>,
pub unit: Option<String>, pub unit: Option<String>,
pub tag_id: Option<Uuid>, pub tag_id: Option<Uuid>,
pub equipment_id: Option<Uuid>,
pub signal_role: Option<String>,
#[serde(serialize_with = "utc_to_local_str")] #[serde(serialize_with = "utc_to_local_str")]
pub created_at: DateTime<Utc>, pub created_at: DateTime<Utc>,
#[serde(serialize_with = "utc_to_local_str")] #[serde(serialize_with = "utc_to_local_str")]
@ -118,6 +120,19 @@ pub struct Tag {
pub updated_at: DateTime<Utc>, pub updated_at: DateTime<Utc>,
} }
/// A piece of equipment (device/asset) that points can be attached to.
///
/// Maps row-for-row onto the `equipment` table via `sqlx::FromRow`.
#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
pub struct Equipment {
    pub id: Uuid,
    // Business identifier; the table declares UNIQUE (code).
    pub code: String,
    pub name: String,
    // Optional free-form equipment category.
    pub kind: Option<String>,
    pub description: Option<String>,
    // Serialized as a local-time string rather than raw UTC.
    #[serde(serialize_with = "utc_to_local_str")]
    pub created_at: DateTime<Utc>,
    #[serde(serialize_with = "utc_to_local_str")]
    pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize, FromRow, Clone)] #[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
pub struct Page { pub struct Page {
pub id: Uuid, pub id: Uuid,
@ -128,4 +143,3 @@ pub struct Page {
#[serde(serialize_with = "utc_to_local_str")] #[serde(serialize_with = "utc_to_local_str")]
pub updated_at: DateTime<Utc>, pub updated_at: DateTime<Utc>,
} }

View File

@ -1,5 +1,5 @@
use crate::model::{PointSubscriptionInfo, Source}; use crate::model::{PointSubscriptionInfo, Source};
use sqlx::{PgPool, query_as}; use sqlx::{query_as, PgPool};
pub async fn get_enabled_source( pub async fn get_enabled_source(
pool: &PgPool, pool: &PgPool,
@ -21,9 +21,7 @@ pub async fn get_point_by_id(
pool: &PgPool, pool: &PgPool,
point_id: uuid::Uuid, point_id: uuid::Uuid,
) -> Result<Option<crate::model::Point>, sqlx::Error> { ) -> Result<Option<crate::model::Point>, sqlx::Error> {
query_as::<_, crate::model::Point>( query_as::<_, crate::model::Point>(r#"SELECT * FROM point WHERE id = $1"#)
r#"SELECT * FROM point WHERE id = $1"#,
)
.bind(point_id) .bind(point_id)
.fetch_optional(pool) .fetch_optional(pool)
.await .await
@ -144,9 +142,7 @@ pub async fn get_points_count(
.await .await
} }
None => { None => {
sqlx::query_scalar::<_, i64>( sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM point"#)
r#"SELECT COUNT(*) FROM point"#,
)
.fetch_one(pool) .fetch_one(pool)
.await .await
} }
@ -227,12 +223,8 @@ pub async fn get_points_paginated(
// ==================== Tag 相关服务函数 ==================== // ==================== Tag 相关服务函数 ====================
/// 获取标签总数 /// 获取标签总数
pub async fn get_tags_count( pub async fn get_tags_count(pool: &PgPool) -> Result<i64, sqlx::Error> {
pool: &PgPool, sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM tag"#)
) -> Result<i64, sqlx::Error> {
sqlx::query_scalar::<_, i64>(
r#"SELECT COUNT(*) FROM tag"#,
)
.fetch_one(pool) .fetch_one(pool)
.await .await
} }
@ -246,9 +238,7 @@ pub async fn get_tags_paginated(
if page_size == 0 { if page_size == 0 {
Ok(vec![]) Ok(vec![])
} else if page_size == -1 { } else if page_size == -1 {
sqlx::query_as::<_, crate::model::Tag>( sqlx::query_as::<_, crate::model::Tag>(r#"SELECT * FROM tag ORDER BY created_at"#)
r#"SELECT * FROM tag ORDER BY created_at"#,
)
.fetch_all(pool) .fetch_all(pool)
.await .await
} else { } else {
@ -271,9 +261,7 @@ pub async fn get_tag_by_id(
pool: &PgPool, pool: &PgPool,
tag_id: uuid::Uuid, tag_id: uuid::Uuid,
) -> Result<Option<crate::model::Tag>, sqlx::Error> { ) -> Result<Option<crate::model::Tag>, sqlx::Error> {
query_as::<_, crate::model::Tag>( query_as::<_, crate::model::Tag>(r#"SELECT * FROM tag WHERE id = $1"#)
r#"SELECT * FROM tag WHERE id = $1"#
)
.bind(tag_id) .bind(tag_id)
.fetch_optional(pool) .fetch_optional(pool)
.await .await
@ -290,7 +278,7 @@ pub async fn get_tag_points(
FROM point FROM point
WHERE tag_id = $1 WHERE tag_id = $1
ORDER BY created_at ORDER BY created_at
"# "#,
) )
.bind(tag_id) .bind(tag_id)
.fetch_all(pool) .fetch_all(pool)
@ -312,7 +300,7 @@ pub async fn create_tag(
r#" r#"
INSERT INTO tag (id, name, description) INSERT INTO tag (id, name, description)
VALUES ($1, $2, $3) VALUES ($1, $2, $3)
"# "#,
) )
.bind(tag_id) .bind(tag_id)
.bind(name) .bind(name)
@ -327,7 +315,7 @@ pub async fn create_tag(
UPDATE point UPDATE point
SET tag_id = $1 SET tag_id = $1
WHERE id = $2 WHERE id = $2
"# "#,
) )
.bind(tag_id) .bind(tag_id)
.bind(point_id) .bind(point_id)
@ -394,7 +382,7 @@ pub async fn update_tag(
UPDATE point UPDATE point
SET tag_id = NULL SET tag_id = NULL
WHERE tag_id = $1 WHERE tag_id = $1
"# "#,
) )
.bind(tag_id) .bind(tag_id)
.execute(&mut *tx) .execute(&mut *tx)
@ -407,7 +395,7 @@ pub async fn update_tag(
UPDATE point UPDATE point
SET tag_id = $1 SET tag_id = $1
WHERE id = $2 WHERE id = $2
"# "#,
) )
.bind(tag_id) .bind(tag_id)
.bind(point_id) .bind(point_id)
@ -422,17 +410,11 @@ pub async fn update_tag(
} }
/// 删除标签 /// 删除标签
pub async fn delete_tag( pub async fn delete_tag(pool: &PgPool, tag_id: uuid::Uuid) -> Result<bool, sqlx::Error> {
pool: &PgPool, let result = sqlx::query(r#"DELETE FROM tag WHERE id = $1"#)
tag_id: uuid::Uuid,
) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
r#"DELETE FROM tag WHERE id = $1"#
)
.bind(tag_id) .bind(tag_id)
.execute(pool) .execute(pool)
.await?; .await?;
Ok(result.rows_affected() > 0) Ok(result.rows_affected() > 0)
} }