perf(point): batch point creation queries
This commit is contained in:
parent
7e6c7a7e4c
commit
f33d989905
|
|
@ -1,5 +1,6 @@
|
||||||
use axum::{Json, extract::{Path, Query, State}, http::HeaderMap, response::IntoResponse};
|
use axum::{Json, extract::{Path, Query, State}, http::HeaderMap, response::IntoResponse};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::{HashMap, HashSet};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
use validator::Validate;
|
use validator::Validate;
|
||||||
use sqlx::{Row, QueryBuilder};
|
use sqlx::{Row, QueryBuilder};
|
||||||
|
|
@ -287,72 +288,65 @@ pub async fn batch_create_points(
|
||||||
return Err(ApiErr::BadRequest("node_ids cannot be empty".to_string(), None));
|
return Err(ApiErr::BadRequest("node_ids cannot be empty".to_string(), None));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut success_count = 0;
|
|
||||||
let mut failed_count = 0;
|
|
||||||
let mut failed_node_ids = Vec::new();
|
|
||||||
let mut created_point_ids = Vec::new();
|
|
||||||
|
|
||||||
// Use one transaction for the full batch.
|
// Use one transaction for the full batch.
|
||||||
let mut tx = pool.begin().await?;
|
let mut tx = pool.begin().await?;
|
||||||
|
let node_ids = payload.node_ids;
|
||||||
|
|
||||||
for node_id in payload.node_ids {
|
let nodes: Vec<Node> = sqlx::query_as::<_, Node>(
|
||||||
// Ensure node exists.
|
r#"SELECT * FROM node WHERE id = ANY($1)"#,
|
||||||
let node_exists = sqlx::query(
|
)
|
||||||
r#"SELECT 1 FROM node WHERE id = $1"#,
|
.bind(&node_ids)
|
||||||
)
|
.fetch_all(&mut *tx)
|
||||||
.bind(node_id)
|
.await?;
|
||||||
.fetch_optional(&mut *tx)
|
|
||||||
.await?
|
|
||||||
.is_some();
|
|
||||||
|
|
||||||
if !node_exists {
|
let node_map: HashMap<Uuid, Node> = nodes
|
||||||
failed_count += 1;
|
.into_iter()
|
||||||
failed_node_ids.push(node_id);
|
.map(|node| (node.id, node))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let existing_node_ids: HashSet<Uuid> = node_map.keys().copied().collect();
|
||||||
|
let mut failed_node_ids = Vec::new();
|
||||||
|
for node_id in &node_ids {
|
||||||
|
if !existing_node_ids.contains(node_id) {
|
||||||
|
failed_node_ids.push(*node_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let existing_point_node_ids: HashSet<Uuid> = sqlx::query_scalar::<_, Uuid>(
|
||||||
|
r#"SELECT node_id FROM point WHERE node_id = ANY($1)"#,
|
||||||
|
)
|
||||||
|
.bind(&node_ids)
|
||||||
|
.fetch_all(&mut *tx)
|
||||||
|
.await?
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut to_create = Vec::new();
|
||||||
|
let mut seen_creatable = HashSet::new();
|
||||||
|
for node_id in node_ids {
|
||||||
|
if !existing_node_ids.contains(&node_id) || existing_point_node_ids.contains(&node_id) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Skip nodes that already have a point.
|
if !seen_creatable.insert(node_id) {
|
||||||
let point_exists = sqlx::query(
|
|
||||||
r#"SELECT 1 FROM point WHERE node_id = $1"#,
|
|
||||||
)
|
|
||||||
.bind(node_id)
|
|
||||||
.fetch_optional(&mut *tx)
|
|
||||||
.await?
|
|
||||||
.is_some();
|
|
||||||
|
|
||||||
if point_exists {
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use node browse_name as default point name.
|
let name = node_map
|
||||||
let node_info = sqlx::query_as::<_, Node>(
|
.get(&node_id)
|
||||||
r#"SELECT * FROM node WHERE id = $1"#,
|
.map(|node| node.browse_name.clone())
|
||||||
)
|
.unwrap_or_else(|| format!("Point_{}", node_id));
|
||||||
.bind(node_id)
|
to_create.push((Uuid::new_v4(), node_id, name));
|
||||||
.fetch_optional(&mut *tx)
|
}
|
||||||
.await?;
|
|
||||||
|
|
||||||
let name = match node_info {
|
let mut created_point_ids = Vec::with_capacity(to_create.len());
|
||||||
Some(node) => node.browse_name.clone(),
|
if !to_create.is_empty() {
|
||||||
None => format!("Point_{}", node_id),
|
let mut qb = QueryBuilder::new("INSERT INTO point (id, node_id, name) ");
|
||||||
};
|
qb.push_values(to_create.iter(), |mut b, (id, node_id, name)| {
|
||||||
|
b.push_bind(*id).push_bind(*node_id).push_bind(name);
|
||||||
let new_id = Uuid::new_v4();
|
});
|
||||||
|
qb.build().execute(&mut *tx).await?;
|
||||||
sqlx::query(
|
created_point_ids.extend(to_create.into_iter().map(|(id, _, _)| id));
|
||||||
r#"
|
|
||||||
INSERT INTO point (id, node_id, name)
|
|
||||||
VALUES ($1, $2, $3)
|
|
||||||
"#
|
|
||||||
)
|
|
||||||
.bind(new_id)
|
|
||||||
.bind(node_id)
|
|
||||||
.bind(&name)
|
|
||||||
.execute(&mut *tx)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
success_count += 1;
|
|
||||||
created_point_ids.push(new_id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit the transaction.
|
// Commit the transaction.
|
||||||
|
|
@ -373,8 +367,8 @@ pub async fn batch_create_points(
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Json(BatchCreatePointsRes {
|
Ok(Json(BatchCreatePointsRes {
|
||||||
success_count,
|
success_count: created_point_ids.len(),
|
||||||
failed_count,
|
failed_count: failed_node_ids.len(),
|
||||||
failed_node_ids,
|
failed_node_ids,
|
||||||
created_point_ids,
|
created_point_ids,
|
||||||
}))
|
}))
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue