From f33d989905cc6207687d47eaa9bcaccf5c14d6c6 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Tue, 17 Mar 2026 08:25:04 +0800 Subject: [PATCH] perf(point): batch point creation queries --- src/handler/point.rs | 108 ++++++++++++++++++++----------------------- 1 file changed, 51 insertions(+), 57 deletions(-) diff --git a/src/handler/point.rs b/src/handler/point.rs index d8d6835..4146e4b 100644 --- a/src/handler/point.rs +++ b/src/handler/point.rs @@ -1,5 +1,6 @@ use axum::{Json, extract::{Path, Query, State}, http::HeaderMap, response::IntoResponse}; use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; use uuid::Uuid; use validator::Validate; use sqlx::{Row, QueryBuilder}; @@ -287,72 +288,65 @@ pub async fn batch_create_points( return Err(ApiErr::BadRequest("node_ids cannot be empty".to_string(), None)); } - let mut success_count = 0; - let mut failed_count = 0; - let mut failed_node_ids = Vec::new(); - let mut created_point_ids = Vec::new(); - // Use one transaction for the full batch. let mut tx = pool.begin().await?; + let node_ids = payload.node_ids; - for node_id in payload.node_ids { - // Ensure node exists. - let node_exists = sqlx::query( - r#"SELECT 1 FROM node WHERE id = $1"#, - ) - .bind(node_id) - .fetch_optional(&mut *tx) - .await? - .is_some(); + let nodes: Vec = sqlx::query_as::<_, Node>( + r#"SELECT * FROM node WHERE id = ANY($1)"#, + ) + .bind(&node_ids) + .fetch_all(&mut *tx) + .await?; - if !node_exists { - failed_count += 1; - failed_node_ids.push(node_id); + let node_map: HashMap = nodes + .into_iter() + .map(|node| (node.id, node)) + .collect(); + + let existing_node_ids: HashSet = node_map.keys().copied().collect(); + let mut failed_node_ids = Vec::new(); + for node_id in &node_ids { + if !existing_node_ids.contains(node_id) { + failed_node_ids.push(*node_id); + } + } + + let existing_point_node_ids: HashSet = sqlx::query_scalar::<_, Uuid>( + r#"SELECT node_id FROM point WHERE node_id = ANY($1)"#, + ) + .bind(&node_ids) + .fetch_all(&mut *tx) + .await? + .into_iter() + .collect(); + + let mut to_create = Vec::new(); + let mut seen_creatable = HashSet::new(); + for node_id in node_ids { + if !existing_node_ids.contains(&node_id) || existing_point_node_ids.contains(&node_id) { continue; } - // Skip nodes that already have a point. - let point_exists = sqlx::query( - r#"SELECT 1 FROM point WHERE node_id = $1"#, - ) - .bind(node_id) - .fetch_optional(&mut *tx) - .await? - .is_some(); - - if point_exists { + if !seen_creatable.insert(node_id) { continue; } - // Use node browse_name as default point name. - let node_info = sqlx::query_as::<_, Node>( - r#"SELECT * FROM node WHERE id = $1"#, - ) - .bind(node_id) - .fetch_optional(&mut *tx) - .await?; + let name = node_map + .get(&node_id) + .map(|node| node.browse_name.clone()) + .unwrap_or_else(|| format!("Point_{}", node_id)); + to_create.push((Uuid::new_v4(), node_id, name)); + } - let name = match node_info { - Some(node) => node.browse_name.clone(), - None => format!("Point_{}", node_id), - }; - - let new_id = Uuid::new_v4(); - - sqlx::query( - r#" - INSERT INTO point (id, node_id, name) - VALUES ($1, $2, $3) - "# - ) - .bind(new_id) - .bind(node_id) - .bind(&name) - .execute(&mut *tx) - .await?; - - success_count += 1; - created_point_ids.push(new_id); + let mut created_point_ids = Vec::with_capacity(to_create.len()); + if !to_create.is_empty() { + let mut qb = QueryBuilder::new("INSERT INTO point (id, node_id, name) "); + qb.push_values(to_create.iter(), |mut b, (id, node_id, name)| { + b.push_bind(*id).push_bind(*node_id).push_bind(name); + }); + qb.build().execute(&mut *tx).await?; + created_point_ids.extend(to_create.into_iter().map(|(id, _, _)| id)); } // Commit the transaction. @@ -373,8 +367,8 @@ pub async fn batch_create_points( } Ok(Json(BatchCreatePointsRes { - success_count, - failed_count, + success_count: created_point_ids.len(), + failed_count: failed_node_ids.len(), failed_node_ids, created_point_ids, }))