Compare commits
4 Commits
efed6aa816...503aefc4cb
| Author | SHA1 | Date |
|---|---|---|
|  | 503aefc4cb |  |
|  | 5fa63ad6dd |  |
|  | 5406568969 |  |
|  | 6f215162a3 |  |
Cargo.toml

```diff
@@ -12,7 +12,7 @@ axum = { version = "0.8", features = ["ws"] }
 tower-http = { version = "0.6", features = ["cors", "fs"] }
 
 # Database
-sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid"] }
+sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid", "json"] }
 
 # Serialization
 serde = { version = "1.0", features = ["derive"] }
```
New file: `page` table migration

```diff
@@ -0,0 +1,11 @@
+-- Page table for UI bindings
+CREATE TABLE page (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    name TEXT NOT NULL,
+    data JSONB NOT NULL,
+    created_at TIMESTAMPTZ DEFAULT NOW(),
+    updated_at TIMESTAMPTZ DEFAULT NOW()
+);
+
+-- Common indexes
+CREATE INDEX idx_page_name ON page(name);
```
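A note on the default: `gen_random_uuid()` ships with PostgreSQL 13 and later; on older servers this migration would also need `CREATE EXTENSION IF NOT EXISTS pgcrypto;` to run first.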
```diff
@@ -17,7 +17,10 @@ impl AppConfig {
             .unwrap_or_else(|_| "60309".to_string())
             .parse::<u16>()
             .map_err(|_| "PORT must be a number")?;
-        let write_api_key = env::var("WRITE_KEY").ok();
+        // Prefer WRITE_API_KEY, keep WRITE_KEY as backward-compatible fallback.
+        let write_api_key = env::var("WRITE_API_KEY")
+            .ok()
+            .or_else(|| env::var("WRITE_KEY").ok());
 
         Ok(Self {
             database_url,
```
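The fallback chain reads new-name-first, which is easy to verify in isolation. A minimal sketch (the helper name `resolve_write_key` is ours, not from the diff):

```rust
use std::env;

// Mirrors the config change: WRITE_API_KEY wins, WRITE_KEY is the legacy fallback.
fn resolve_write_key() -> Option<String> {
    env::var("WRITE_API_KEY")
        .ok()
        .or_else(|| env::var("WRITE_KEY").ok())
}

#[test]
fn write_api_key_takes_precedence() {
    env::set_var("WRITE_KEY", "legacy");
    env::set_var("WRITE_API_KEY", "new");
    assert_eq!(resolve_write_key().as_deref(), Some("new"));

    // With only the legacy variable set, the fallback still applies.
    env::remove_var("WRITE_API_KEY");
    assert_eq!(resolve_write_key().as_deref(), Some("legacy"));
}
```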
```diff
@@ -436,7 +436,7 @@ impl ConnectionManager {
             .map(crate::telemetry::PointQuality::from_status_code)
             .unwrap_or(crate::telemetry::PointQuality::Unknown);
 
-        let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue(
+        let _ = event_manager.send(crate::event::AppEvent::PointNewValue(
             crate::telemetry::PointNewValue {
                 source_id,
                 point_id: Some(poll_point.point_id),
```
```diff
@@ -702,11 +702,9 @@ impl ConnectionManager {
         // Then reconnect.
         let result = self.connect_from_source(pool, source_id).await;
 
-        // Clear the reconnect marker.
-        if result.is_ok() {
-            let mut reconnecting = self.reconnecting.write().await;
-            reconnecting.remove(&source_id);
-        }
+        // Clear the reconnect marker whether we succeeded or failed, so the
+        // heartbeat can trigger another reconnect when it detects a problem.
+        let mut reconnecting = self.reconnecting.write().await;
+        reconnecting.remove(&source_id);
 
         result
     }
```
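The behavior change here is subtle: the old code cleared the reconnect marker only on success, so a source whose reconnect failed once could never be retried by the heartbeat. A minimal sketch of the corrected guard, assuming a `reconnecting: RwLock<HashSet<Uuid>>` field like the one in the diff:

```rust
use std::collections::HashSet;
use tokio::sync::RwLock;
use uuid::Uuid;

struct ConnectionGuard {
    reconnecting: RwLock<HashSet<Uuid>>,
}

impl ConnectionGuard {
    async fn reconnect(&self, source_id: Uuid) -> Result<(), String> {
        // Skip if a reconnect for this source is already in flight.
        if !self.reconnecting.write().await.insert(source_id) {
            return Ok(());
        }

        let result = self.try_connect(source_id).await;

        // Clear the marker on success *and* failure; clearing only on
        // success would permanently block future reconnect attempts.
        self.reconnecting.write().await.remove(&source_id);
        result
    }

    async fn try_connect(&self, _source_id: Uuid) -> Result<(), String> {
        Ok(()) // stand-in for the real connection logic
    }
}
```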
```diff
@@ -988,7 +986,7 @@ impl ConnectionManager {
         // Emit local updates only when the full batch succeeds.
         if let Some(event_manager) = &self.event_manager {
             for (source_id, point_id, variant) in success_events {
-                if let Err(e) = event_manager.send(crate::event::ReloadEvent::PointNewValue(
+                if let Err(e) = event_manager.send(crate::event::AppEvent::PointNewValue(
                     crate::telemetry::PointNewValue {
                         source_id,
                         point_id: Some(point_id),
```
```diff
@@ -1128,7 +1126,7 @@ impl ConnectionManager {
             .unwrap_or(crate::telemetry::PointQuality::Unknown);
 
         if let Some(event_manager) = &data_manager.event_manager {
-            let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue(
+            let _ = event_manager.send(crate::event::AppEvent::PointNewValue(
                 crate::telemetry::PointNewValue {
                     source_id: current_source_id,
                     point_id: None,
```

src/event.rs (208 lines changed)
```diff
@@ -1,8 +1,12 @@
+use std::collections::HashMap;
 use tokio::sync::mpsc;
 use uuid::Uuid;
 
+const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024;
+const TELEMETRY_EVENT_CHANNEL_CAPACITY: usize = 4096;
+
 #[derive(Debug, Clone)]
-pub enum ReloadEvent {
+pub enum AppEvent {
     SourceCreate {
         source_id: Uuid,
     },
```
```diff
@@ -24,7 +28,8 @@ pub enum AppEvent {
 }
 
 pub struct EventManager {
-    sender: mpsc::UnboundedSender<ReloadEvent>,
+    control_sender: mpsc::Sender<AppEvent>,
+    telemetry_sender: mpsc::Sender<crate::telemetry::PointNewValue>,
 }
 
 impl EventManager {
```
```diff
@@ -33,36 +38,115 @@ impl EventManager {
         connection_manager: std::sync::Arc<crate::connection::ConnectionManager>,
         ws_manager: Option<std::sync::Arc<crate::websocket::WebSocketManager>>,
     ) -> Self {
-        let (sender, mut receiver) = mpsc::unbounded_channel::<ReloadEvent>();
-        let ws_manager_clone = ws_manager.clone();
+        let (control_sender, mut control_receiver) =
+            mpsc::channel::<AppEvent>(CONTROL_EVENT_CHANNEL_CAPACITY);
+        let (telemetry_sender, mut telemetry_receiver) =
+            mpsc::channel::<crate::telemetry::PointNewValue>(TELEMETRY_EVENT_CHANNEL_CAPACITY);
+
+        let control_cm = connection_manager.clone();
+        let control_pool = pool.clone();
         tokio::spawn(async move {
-            // Clone outside the loop to avoid moving it inside the loop.
-            let connection_manager_clone = connection_manager.clone();
-            while let Some(event) = receiver.recv().await {
+            while let Some(event) = control_receiver.recv().await {
+                handle_control_event(event, &control_pool, &control_cm).await;
+            }
+        });
+
+        let ws_manager_clone = ws_manager.clone();
+        let telemetry_cm = connection_manager.clone();
+        tokio::spawn(async move {
+            while let Some(payload) = telemetry_receiver.recv().await {
+                let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> =
+                    HashMap::new();
+                latest_by_key.insert((payload.source_id, payload.client_handle), payload);
+
+                loop {
+                    match telemetry_receiver.try_recv() {
+                        Ok(next_payload) => {
+                            latest_by_key.insert(
+                                (next_payload.source_id, next_payload.client_handle),
+                                next_payload,
+                            );
+                        }
+                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
+                            break;
+                        }
+                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
+                            break;
+                        }
+                    }
+                }
+
+                for point_payload in latest_by_key.into_values() {
+                    process_point_new_value(point_payload, &telemetry_cm, ws_manager_clone.as_ref())
+                        .await;
+                }
+            }
+        });
+
+        Self {
+            control_sender,
+            telemetry_sender,
+        }
+    }
+
+    pub fn send(&self, event: AppEvent) -> Result<(), String> {
         match event {
-            ReloadEvent::SourceCreate { source_id } => {
+            AppEvent::PointNewValue(payload) => match self.telemetry_sender.try_send(payload) {
+                Ok(()) => Ok(()),
+                Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
+                    Err(format!("Failed to send telemetry event: channel closed ({e:?})"))
+                }
+                Err(tokio::sync::mpsc::error::TrySendError::Full(payload)) => {
+                    // High-frequency telemetry is lossy by design under sustained pressure.
+                    tracing::warn!(
+                        "Dropping PointNewValue due to full telemetry queue: source={}, client_handle={}",
+                        payload.source_id,
+                        payload.client_handle
+                    );
+                    Ok(())
+                }
+            },
+            control_event => match self.control_sender.try_send(control_event) {
+                Ok(()) => Ok(()),
+                Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
+                    Err(format!("Failed to send control event: channel closed ({e:?})"))
+                }
+                Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => {
+                    Err(format!("Failed to send control event: queue full ({e:?})"))
+                }
+            },
+        }
+    }
+}
+
+async fn handle_control_event(
+    event: AppEvent,
+    pool: &sqlx::PgPool,
+    connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
+) {
+    match event {
+        AppEvent::SourceCreate { source_id } => {
             tracing::info!("Processing SourceCreate event for {}", source_id);
-            if let Err(e) = connection_manager_clone.connect_from_source(&pool, source_id).await {
+            if let Err(e) = connection_manager.connect_from_source(pool, source_id).await {
                 tracing::error!("Failed to connect to source {}: {}", source_id, e);
             }
         }
-        ReloadEvent::SourceUpdate { source_id } => {
+        AppEvent::SourceUpdate { source_id } => {
             tracing::info!("Processing SourceUpdate event for {}", source_id);
-            if let Err(e) = connection_manager_clone.reconnect(&pool, source_id).await {
+            if let Err(e) = connection_manager.reconnect(pool, source_id).await {
                 tracing::error!("Failed to reconnect source {}: {}", source_id, e);
             }
         }
-        ReloadEvent::SourceDelete { source_id } => {
+        AppEvent::SourceDelete { source_id } => {
             tracing::info!("Processing SourceDelete event for {}", source_id);
-            if let Err(e) = connection_manager_clone.disconnect(source_id).await {
+            if let Err(e) = connection_manager.disconnect(source_id).await {
                 tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
             }
         }
-        ReloadEvent::PointCreateBatch { source_id, point_ids } => {
+        AppEvent::PointCreateBatch { source_id, point_ids } => {
             let requested_count = point_ids.len();
-            match connection_manager_clone
-                .subscribe_points_from_source(source_id, Some(point_ids), &pool)
+            match connection_manager
+                .subscribe_points_from_source(source_id, Some(point_ids), pool)
                 .await
             {
                 Ok(stats) => {
```
```diff
@@ -83,26 +167,36 @@ impl EventManager {
                 }
             }
         }
-        ReloadEvent::PointDeleteBatch { source_id, point_ids } => {
+        AppEvent::PointDeleteBatch { source_id, point_ids } => {
             tracing::info!(
                 "Processing PointDeleteBatch event for source {} with {} points",
                 source_id,
                 point_ids.len()
             );
-            if let Err(e) = connection_manager_clone
+            if let Err(e) = connection_manager
                 .unsubscribe_points_from_source(source_id, point_ids)
                 .await
             {
                 tracing::error!("Failed to unsubscribe points: {}", e);
             }
         }
-        ReloadEvent::PointNewValue(payload) => {
+        AppEvent::PointNewValue(_) => {
+            tracing::warn!("PointNewValue routed to control worker unexpectedly");
+        }
+    }
+}
+
+async fn process_point_new_value(
+    payload: crate::telemetry::PointNewValue,
+    connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
+    ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
+) {
     let source_id = payload.source_id;
     let client_handle = payload.client_handle;
     let point_id = if let Some(point_id) = payload.point_id {
         Some(point_id)
     } else {
-        let status = connection_manager_clone.get_status_read_guard().await;
+        let status = connection_manager.get_status_read_guard().await;
         status
             .get(&source_id)
             .and_then(|s| s.client_handle_map.get(&client_handle).copied())
```
```diff
@@ -110,12 +204,11 @@ impl EventManager {
     if let Some(point_id) = point_id {
         // Read the previous value from the cache.
         let (old_value, old_timestamp, value_changed) = {
-            let monitor_data = connection_manager_clone.get_point_monitor_data_read_guard().await;
+            let monitor_data = connection_manager.get_point_monitor_data_read_guard().await;
             let old_monitor_info = monitor_data.get(&point_id);
 
             if let Some(old_info) = old_monitor_info {
-                let changed = old_info.value != payload.value ||
-                    old_info.timestamp != payload.timestamp;
+                let changed = old_info.value != payload.value || old_info.timestamp != payload.timestamp;
                 (old_info.value.clone(), old_info.timestamp, changed)
             } else {
                 (None, None, false)
```
```diff
@@ -123,63 +216,46 @@ impl EventManager {
         };
 
         let monitor = crate::telemetry::PointMonitorInfo {
-            protocol: payload.protocol.clone(),
+            protocol: payload.protocol,
             source_id,
             point_id,
             client_handle,
-            scan_mode: payload.scan_mode.clone(),
+            scan_mode: payload.scan_mode,
             timestamp: payload.timestamp,
-            quality: payload.quality.clone(),
-            value: payload.value.clone(),
-            value_type: payload.value_type.clone(),
-            value_text: payload.value_text.clone(),
+            quality: payload.quality,
+            value: payload.value,
+            value_type: payload.value_type,
+            value_text: payload.value_text,
             old_value,
             old_timestamp,
             value_changed,
         };
 
-        // Clone monitor so the two tasks can run in parallel.
-        let monitor_for_ws = monitor.clone();
-        let monitor_for_db = monitor.clone();
-
-        // Run update_point_monitor_data and send_to_public in parallel, without waiting.
-        let cm_clone = connection_manager_clone.clone();
-        tokio::spawn(async move {
-            // Update the monitor data.
-            if let Err(e) = cm_clone.update_point_monitor_data(monitor_for_db).await {
-                tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e);
-            }
-        });
-
-        let ws_clone = ws_manager_clone.clone();
-        tokio::spawn(async move {
-            // Send the WebSocket message.
-            if let Some(ws_manager) = ws_clone {
-                let ws_message = crate::websocket::WsMessage::PointNewValue(monitor_for_ws);
-                if let Err(e) = ws_manager.send_to_public(ws_message).await {
-                    tracing::error!("Failed to send WebSocket message to public room: {}", e);
-                }
-
-                // send_to_client is disabled for now; values only need to go to the public room.
-                // if let Err(e) = ws_manager.send_to_client(point_id, ws_message).await {
-                //     tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e);
-                // }
-            }
-        });
+        if let Err(e) = connection_manager
+            .update_point_monitor_data(monitor.clone())
+            .await
+        {
+            tracing::error!(
+                "Failed to update point monitor data for point {}: {}",
+                point_id,
+                e
+            );
+        }
+
+        if let Some(ws_manager) = ws_manager {
+            let ws_message = crate::websocket::WsMessage::PointNewValue(monitor);
+            if let Err(e) = ws_manager.send_to_public(ws_message).await {
+                tracing::warn!(
+                    "Failed to send WebSocket message to public room: {}",
+                    e
+                );
+            }
+        }
     } else {
-        tracing::warn!("Point not found for source {} client_handle {}", source_id, client_handle);
-    }
-    }
-    }
-    }
-    });
-
-        Self { sender }
-    }
-
-    pub fn send(&self, event: ReloadEvent) -> Result<(), String> {
-        self.sender
-            .send(event)
-            .map_err(|e| format!("Failed to send event: {}", e))
+        tracing::warn!(
+            "Point not found for source {} client_handle {}",
+            source_id,
+            client_handle
+        );
     }
 }
```
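The telemetry worker above drains its queue in bursts and keeps only the newest value per `(source_id, client_handle)` key, so a flood of updates for one point collapses into a single downstream write. On the producer side, `try_send` plus a logged drop keeps slow consumers from back-pressuring the polling loop, while control events instead surface a queue-full error to the caller. The coalescing pattern in isolation, reduced to a runnable sketch (the `u32` key and `u64` payload are simplified stand-ins):

```rust
use std::collections::HashMap;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<(u32, u64)>(16); // (key, value)

    for v in 0..5 {
        tx.send((7, v)).await.unwrap(); // five updates for the same key
    }
    drop(tx);

    while let Some((key, value)) = rx.recv().await {
        // Seed the batch with the blocking recv, then drain without waiting.
        let mut latest: HashMap<u32, u64> = HashMap::from([(key, value)]);
        while let Ok((k, v)) = rx.try_recv() {
            latest.insert(k, v); // later values overwrite earlier ones
        }
        // Only the newest value per key survives the burst.
        assert_eq!(latest.get(&7), Some(&4));
        println!("coalesced: {latest:?}");
    }
}
```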
```diff
@@ -2,3 +2,4 @@ pub mod source;
 pub mod point;
 pub mod tag;
 pub mod log;
+pub mod page;
```
New file: page handler (`handler::page`)

```diff
@@ -0,0 +1,169 @@
+use axum::{Json, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse};
+use serde::Deserialize;
+use std::collections::HashMap;
+use sqlx::types::Json as SqlxJson;
+use uuid::Uuid;
+use validator::Validate;
+
+use crate::model::Page;
+use crate::util::response::ApiErr;
+use crate::AppState;
+
+#[derive(Deserialize, Validate)]
+pub struct GetPageListQuery {
+    #[validate(length(min = 1, max = 100))]
+    pub name: Option<String>,
+}
+
+pub async fn get_page_list(
+    State(state): State<AppState>,
+    Query(query): Query<GetPageListQuery>,
+) -> Result<impl IntoResponse, ApiErr> {
+    query.validate()?;
+    let pool = &state.pool;
+
+    let pages: Vec<Page> = if let Some(name) = query.name {
+        sqlx::query_as::<_, Page>(
+            r#"
+            SELECT * FROM page
+            WHERE name ILIKE $1
+            ORDER BY created_at
+            "#,
+        )
+        .bind(format!("%{}%", name))
+        .fetch_all(pool)
+        .await?
+    } else {
+        sqlx::query_as::<_, Page>(
+            r#"SELECT * FROM page ORDER BY created_at"#,
+        )
+        .fetch_all(pool)
+        .await?
+    };
+
+    Ok(Json(pages))
+}
+
+pub async fn get_page(
+    State(state): State<AppState>,
+    Path(page_id): Path<Uuid>,
+) -> Result<impl IntoResponse, ApiErr> {
+    let page = sqlx::query_as::<_, Page>("SELECT * FROM page WHERE id = $1")
+        .bind(page_id)
+        .fetch_optional(&state.pool)
+        .await?;
+
+    match page {
+        Some(p) => Ok(Json(p)),
+        None => Err(ApiErr::NotFound("Page not found".to_string(), None)),
+    }
+}
+
+#[derive(Debug, Deserialize, Validate)]
+pub struct CreatePageReq {
+    #[validate(length(min = 1, max = 100))]
+    pub name: String,
+    pub data: HashMap<String, Uuid>,
+}
+
+#[derive(Debug, Deserialize, Validate)]
+pub struct UpdatePageReq {
+    #[validate(length(min = 1, max = 100))]
+    pub name: Option<String>,
+    pub data: Option<HashMap<String, Uuid>>,
+}
+
+pub async fn create_page(
+    State(state): State<AppState>,
+    Json(payload): Json<CreatePageReq>,
+) -> Result<impl IntoResponse, ApiErr> {
+    payload.validate()?;
+
+    let page_id = sqlx::query_scalar::<_, Uuid>(
+        r#"
+        INSERT INTO page (name, data)
+        VALUES ($1, $2)
+        RETURNING id
+        "#,
+    )
+    .bind(&payload.name)
+    .bind(SqlxJson(payload.data))
+    .fetch_one(&state.pool)
+    .await?;
+
+    Ok((StatusCode::CREATED, Json(serde_json::json!({
+        "id": page_id,
+        "ok_msg": "Page created successfully"
+    }))))
+}
+
+pub async fn update_page(
+    State(state): State<AppState>,
+    Path(page_id): Path<Uuid>,
+    Json(payload): Json<UpdatePageReq>,
+) -> Result<impl IntoResponse, ApiErr> {
+    payload.validate()?;
+
+    let exists = sqlx::query("SELECT 1 FROM page WHERE id = $1")
+        .bind(page_id)
+        .fetch_optional(&state.pool)
+        .await?;
+    if exists.is_none() {
+        return Err(ApiErr::NotFound("Page not found".to_string(), None));
+    }
+
+    if payload.name.is_none() && payload.data.is_none() {
+        return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
+    }
+
+    let mut updates = Vec::new();
+    let mut param_count = 1;
+
+    if payload.name.is_some() {
+        updates.push(format!("name = ${}", param_count));
+        param_count += 1;
+    }
+    if payload.data.is_some() {
+        updates.push(format!("data = ${}", param_count));
+        param_count += 1;
+    }
+
+    updates.push("updated_at = NOW()".to_string());
+
+    let sql = format!(
+        r#"UPDATE page SET {} WHERE id = ${}"#,
+        updates.join(", "),
+        param_count
+    );
+
+    let mut query = sqlx::query(&sql);
+    if let Some(name) = payload.name {
+        query = query.bind(name);
+    }
+    if let Some(data) = payload.data {
+        query = query.bind(SqlxJson(data));
+    }
+    query = query.bind(page_id);
+
+    query.execute(&state.pool).await?;
+
+    Ok(Json(serde_json::json!({
+        "ok_msg": "Page updated successfully"
+    })))
+}
+
+pub async fn delete_page(
+    State(state): State<AppState>,
+    Path(page_id): Path<Uuid>,
+) -> Result<impl IntoResponse, ApiErr> {
+    let result = sqlx::query("DELETE FROM page WHERE id = $1")
+        .bind(page_id)
+        .execute(&state.pool)
+        .await?;
+
+    if result.rows_affected() == 0 {
+        return Err(ApiErr::NotFound("Page not found".to_string(), None));
+    }
+
+    Ok(StatusCode::NO_CONTENT)
+}
```
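Since `data` is a `HashMap<String, Uuid>`, a create request body is a flat name-to-point-id map. A sketch of the wire shape (widget names and ids invented for illustration):

```rust
use serde_json::json;

fn main() {
    // Example body for POST /api/page; the UUIDs are placeholders.
    let body = json!({
        "name": "boiler-overview",
        "data": {
            "temperature_widget": "5f8a1c2e-0000-4000-8000-000000000001",
            "pressure_widget":    "5f8a1c2e-0000-4000-8000-000000000002"
        }
    });
    println!("{}", serde_json::to_string_pretty(&body).unwrap());
}
```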
```diff
@@ -2,7 +2,7 @@ use axum::{Json, extract::{Path, Query, State}, http::HeaderMap, response::IntoResponse};
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
 use validator::Validate;
-use sqlx::Row;
+use sqlx::{Row, QueryBuilder};
 
 use crate::util::{response::ApiErr, pagination::{PaginatedResponse, PaginationParams}};
```
```diff
@@ -69,12 +69,7 @@ pub async fn get_point(
     Path(point_id): Path<Uuid>,
 ) -> Result<impl IntoResponse, ApiErr> {
     let pool = &state.pool;
-    let point = sqlx::query_as::<_, Point>(
-        r#"SELECT * FROM point WHERE id = $1"#,
-    )
-    .bind(point_id)
-    .fetch_optional(pool)
-    .await?;
+    let point = crate::service::get_point_by_id(pool, point_id).await?;
 
     Ok(Json(point))
 }
```
```diff
@@ -136,51 +131,26 @@ pub async fn update_point(
         return Err(ApiErr::NotFound("Point not found".to_string(), None));
     }
 
-    // Build dynamic UPDATE SQL for provided fields.
-    let mut updates = Vec::new();
-    let mut values: Vec<String> = Vec::new();
-    let mut param_count = 1;
+    // Build dynamic UPDATE SQL for the provided fields.
+    let mut qb = QueryBuilder::new("UPDATE point SET ");
+    let mut sep = qb.separated(", ");
 
     if let Some(name) = &payload.name {
-        updates.push(format!("name = ${}", param_count));
-        values.push(name.clone());
-        param_count += 1;
+        sep.push("name = ").push_bind_unseparated(name);
     }
 
     if let Some(description) = &payload.description {
-        updates.push(format!("description = ${}", param_count));
-        values.push(description.clone());
-        param_count += 1;
+        sep.push("description = ").push_bind_unseparated(description);
     }
 
     if let Some(unit) = &payload.unit {
-        updates.push(format!("unit = ${}", param_count));
-        values.push(unit.clone());
-        param_count += 1;
+        sep.push("unit = ").push_bind_unseparated(unit);
     }
 
     if let Some(tag_id) = &payload.tag_id {
-        updates.push(format!("tag_id = ${}", param_count));
-        values.push(tag_id.to_string());
-        param_count += 1;
+        sep.push("tag_id = ").push_bind_unseparated(tag_id);
     }
 
-    // Always update the timestamp.
-    updates.push("updated_at = NOW()".to_string());
-
-    let sql = format!(
-        "UPDATE point SET {} WHERE id = ${}",
-        updates.join(", "),
-        param_count
-    );
-    values.push(point_id.to_string());
-
-    let mut query = sqlx::query(&sql);
-    for value in &values {
-        query = query.bind(value);
-    }
-
-    query.execute(pool).await?;
+    // Always update the timestamp.
+    sep.push("updated_at = NOW()");
+
+    qb.push(" WHERE id = ").push_bind(point_id);
+    qb.build().execute(pool).await?;
 
     Ok(Json(serde_json::json!({"ok_msg": "Point updated successfully"})))
 }
```
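One sqlx footgun is worth spelling out here: `Separated::push_bind` inserts the separator before the bind whenever it is not the first call, so `sep.push("name = ").push_bind(name)` would render as `name = , $1`. The fragment-plus-bind pattern needs `push_bind_unseparated`, as used above. A self-contained sketch of the shape this produces (helper name is ours):

```rust
use sqlx::{Postgres, QueryBuilder};

fn build_update(name: Option<&str>, unit: Option<&str>) -> QueryBuilder<'static, Postgres> {
    let mut qb = QueryBuilder::new("UPDATE point SET ");
    let mut sep = qb.separated(", ");
    if let Some(name) = name {
        // push() adds the ", " separator between items; the bind itself
        // must be unseparated so it stays glued to its column fragment.
        sep.push("name = ").push_bind_unseparated(name.to_owned());
    }
    if let Some(unit) = unit {
        sep.push("unit = ").push_bind_unseparated(unit.to_owned());
    }
    sep.push("updated_at = NOW()");
    qb
}

fn main() {
    let qb = build_update(Some("boiler"), None);
    // Renders: UPDATE point SET name = $1, updated_at = NOW()
    println!("{}", qb.sql());
}
```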
```diff
@@ -277,7 +247,7 @@ pub async fn delete_point(
     if let Some(source_id) = source_id {
         if let Err(e) = state
             .event_manager
-            .send(crate::event::ReloadEvent::PointDeleteBatch {
+            .send(crate::event::AppEvent::PointDeleteBatch {
                 source_id,
                 point_ids: vec![point_id],
             })
```
```diff
@@ -395,7 +365,7 @@ pub async fn batch_create_points(
     let point_ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
     if let Err(e) = state
         .event_manager
-        .send(crate::event::ReloadEvent::PointCreateBatch { source_id, point_ids })
+        .send(crate::event::AppEvent::PointCreateBatch { source_id, point_ids })
     {
         tracing::error!("Failed to send PointCreateBatch event: {}", e);
     }
```
```diff
@@ -457,7 +427,7 @@ pub async fn batch_delete_points(
     let ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
     if let Err(e) = state
         .event_manager
-        .send(crate::event::ReloadEvent::PointDeleteBatch {
+        .send(crate::event::AppEvent::PointDeleteBatch {
             source_id,
             point_ids: ids,
         })
```
```diff
@@ -489,7 +459,7 @@ pub async fn batch_set_point_value(
         return Err(ApiErr::Forbidden(
             "write permission denied".to_string(),
             Some(serde_json::json!({
-                "hint": "set WRITE_API_KEY and pass header X-Write-Key"
+                "hint": "set WRITE_API_KEY (or legacy WRITE_KEY) and pass header X-Write-Key"
             })),
         ));
     }
```
```diff
@@ -58,18 +58,47 @@ impl TreeNode {
 #[derive(Debug, Serialize, Clone)]
 pub struct SourceWithStatus {
     #[serde(flatten)]
-    pub source: Source,
+    pub source: SourcePublic,
     pub is_connected: bool,
     pub last_error: Option<String>,
     #[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")]
     pub last_time: Option<DateTime<Utc>>,
 }
 
+#[derive(Debug, Serialize, Clone)]
+pub struct SourcePublic {
+    pub id: Uuid,
+    pub name: String,
+    pub protocol: String,
+    pub endpoint: String,
+    pub security_policy: Option<String>,
+    pub security_mode: Option<String>,
+    pub enabled: bool,
+    #[serde(serialize_with = "crate::util::datetime::utc_to_local_str")]
+    pub created_at: DateTime<Utc>,
+    #[serde(serialize_with = "crate::util::datetime::utc_to_local_str")]
+    pub updated_at: DateTime<Utc>,
+}
+
+impl From<Source> for SourcePublic {
+    fn from(source: Source) -> Self {
+        Self {
+            id: source.id,
+            name: source.name,
+            protocol: source.protocol,
+            endpoint: source.endpoint,
+            security_policy: source.security_policy,
+            security_mode: source.security_mode,
+            enabled: source.enabled,
+            created_at: source.created_at,
+            updated_at: source.updated_at,
+        }
+    }
+}
+
 pub async fn get_source_list(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> {
     let pool = &state.pool;
-    let sources: Vec<Source> = sqlx::query_as(
-        r#"SELECT * FROM source where enabled is true"#,
-    ).fetch_all(pool).await?;
+    let sources: Vec<Source> = crate::service::get_all_enabled_sources(pool).await?;
 
     // Fetch all connection statuses.
     let status_map: std::collections::HashMap<Uuid, (bool, Option<String>, Option<DateTime<Utc>>)> =
```
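The `SourcePublic` projection plus `#[serde(flatten)]` keeps the JSON shape of the source list unchanged while dropping whatever other columns `Source` carries (credentials, presumably; the full `Source` definition is not in this diff). The flatten behavior in miniature:

```rust
use serde::Serialize;

#[derive(Serialize)]
struct Inner {
    id: u32,
    name: String,
}

#[derive(Serialize)]
struct Outer {
    #[serde(flatten)]
    inner: Inner,
    is_connected: bool,
}

fn main() {
    let o = Outer {
        inner: Inner { id: 1, name: "plc-1".into() },
        is_connected: true,
    };
    // Flatten hoists the inner fields to the top level:
    // {"id":1,"name":"plc-1","is_connected":true}
    println!("{}", serde_json::to_string(&o).unwrap());
}
```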
```diff
@@ -87,7 +116,7 @@ pub async fn get_source_list(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> {
             .map(|(connected, error, time)| (*connected, error.clone(), *time))
             .unwrap_or((false, None, None));
         SourceWithStatus {
-            source,
+            source: source.into(),
             is_connected,
             last_error,
             last_time,
```
```diff
@@ -196,7 +225,7 @@ pub async fn create_source(
     .await?;
 
     // Emit the SourceCreate event.
-    let _ = state.event_manager.send(crate::event::ReloadEvent::SourceCreate { source_id: new_id });
+    let _ = state.event_manager.send(crate::event::AppEvent::SourceCreate { source_id: new_id });
 
     Ok((StatusCode::CREATED, Json(CreateSourceRes { id: new_id })))
 }
```
```diff
@@ -271,7 +300,7 @@ pub async fn update_source(
     qb.push(" WHERE id = ").push_bind(source_id);
     qb.build().execute(pool).await?;
 
-    let _ = state.event_manager.send(crate::event::ReloadEvent::SourceUpdate { source_id });
+    let _ = state.event_manager.send(crate::event::AppEvent::SourceUpdate { source_id });
 
     Ok(Json(serde_json::json!({"ok_msg": "Source updated successfully"})))
 }
```
```diff
@@ -294,7 +323,7 @@ pub async fn delete_source(
     }
 
     // Emit the SourceDelete event.
-    let _ = state.event_manager.send(crate::event::ReloadEvent::SourceDelete { source_id });
+    let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id });
 
     Ok(StatusCode::NO_CONTENT)
 }
```
```diff
@@ -444,116 +473,22 @@ async fn process_reference(
     let display_name = ref_desc.display_name.text.to_string();
     let node_class = format!("{:?}", ref_desc.node_class);
 
-    let now = Utc::now();
-    let node_uuid = Uuid::new_v4();
-
-    // Key optimization: UPSERT directly (avoid a SELECT).
-    // Note: if parent_id is set, the parent node must already exist in the
-    // database, otherwise the foreign-key constraint fails.
-    if parent_id.is_some() {
-        // Check whether the parent node already exists in the database.
-        let parent_exists = sqlx::query(
-            r#"SELECT 1 FROM node WHERE id = $1"#,
-        )
-        .bind(parent_id.unwrap())
+    let effective_parent_id = if let Some(pid) = parent_id {
+        let parent_exists = sqlx::query(r#"SELECT 1 FROM node WHERE id = $1"#)
+            .bind(pid)
             .fetch_optional(tx.as_mut())
             .await?;
-
-        if parent_exists.is_none() {
-            // If the parent does not exist, leave parent_id unset for now
-            // to avoid a foreign-key constraint failure.
-            sqlx::query(
-                r#"
-                INSERT INTO node (
-                    id,
-                    source_id,
-                    external_id,
-                    namespace_uri,
-                    namespace_index,
-                    identifier_type,
-                    identifier,
-                    browse_name,
-                    display_name,
-                    node_class,
-                    parent_id
-                )
-                VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,NULL)
-                ON CONFLICT(source_id, external_id) DO UPDATE SET
-                    namespace_uri = excluded.namespace_uri,
-                    namespace_index = excluded.namespace_index,
-                    identifier_type = excluded.identifier_type,
-                    identifier = excluded.identifier,
-                    browse_name = excluded.browse_name,
-                    display_name = excluded.display_name,
-                    node_class = excluded.node_class,
-                    updated_at = NOW()
-                "#
-            )
-            .bind(node_uuid)
-            .bind(source_id)
-            .bind(&node_id_str)
-            .bind(&namespace_uri)
-            .bind(namespace_index.map(|v| v as i32))
-            .bind(&identifier_type)
-            .bind(&identifier)
-            .bind(&browse_name)
-            .bind(&display_name)
-            .bind(&node_class)
-            .execute(tx.as_mut())
-            .await
-            .context("Failed to execute UPSERT query")?;
+        if parent_exists.is_some() {
+            Some(pid)
         } else {
-            // If the parent exists, set parent_id normally.
-            sqlx::query(
-                r#"
-                INSERT INTO node (
-                    id,
-                    source_id,
-                    external_id,
-                    namespace_uri,
-                    namespace_index,
-                    identifier_type,
-                    identifier,
-                    browse_name,
-                    display_name,
-                    node_class,
-                    parent_id,
-                    created_at,
-                    updated_at
-                )
-                VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,NOW(),NOW())
-                ON CONFLICT(source_id, external_id) DO UPDATE SET
-                    namespace_uri = excluded.namespace_uri,
-                    namespace_index = excluded.namespace_index,
-                    identifier_type = excluded.identifier_type,
-                    identifier = excluded.identifier,
-                    browse_name = excluded.browse_name,
-                    display_name = excluded.display_name,
-                    node_class = excluded.node_class,
-                    parent_id = excluded.parent_id,
-                    updated_at = NOW()
-                "#
-            )
-            .bind(node_uuid)
-            .bind(source_id)
-            .bind(&node_id_str)
-            .bind(&namespace_uri)
-            .bind(namespace_index.map(|v| v as i32))
-            .bind(&identifier_type)
-            .bind(&identifier)
-            .bind(&browse_name)
-            .bind(&display_name)
-            .bind(&node_class)
-            .bind(parent_id)
-            .bind(now)
-            .bind(now)
-            .execute(tx.as_mut())
-            .await
-            .context("Failed to execute UPSERT query")?;
+            None
         }
     } else {
-        // Without a parent_id, insert normally.
-        sqlx::query(
+        None
+    };
+
+    // Use RETURNING id so queue always carries the actual DB node id.
+    let persisted_node_id = sqlx::query_scalar::<_, Uuid>(
         r#"
         INSERT INTO node (
             id,
```
```diff
@@ -568,7 +503,7 @@ async fn process_reference(
             node_class,
             parent_id
         )
-        VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,NULL)
+        VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
         ON CONFLICT(source_id, external_id) DO UPDATE SET
             namespace_uri = excluded.namespace_uri,
             namespace_index = excluded.namespace_index,
```
```diff
@@ -577,10 +512,12 @@ async fn process_reference(
             browse_name = excluded.browse_name,
             display_name = excluded.display_name,
             node_class = excluded.node_class,
+            parent_id = COALESCE(excluded.parent_id, node.parent_id),
             updated_at = NOW()
-        "#
+        RETURNING id
+        "#,
     )
-    .bind(node_uuid)
+    .bind(Uuid::new_v4())
     .bind(source_id)
     .bind(&node_id_str)
     .bind(&namespace_uri)
```
```diff
@@ -590,13 +527,13 @@ async fn process_reference(
     .bind(&browse_name)
     .bind(&display_name)
     .bind(&node_class)
-    .execute(tx.as_mut())
+    .bind(effective_parent_id)
+    .fetch_one(tx.as_mut())
     .await
     .context("Failed to execute UPSERT query")?;
-    }
 
     processed_nodes.insert(node_id_str.clone(), ());
-    queue.push_back((node_id_obj.clone(), Some(node_uuid)));
+    queue.push_back((node_id_obj.clone(), Some(persisted_node_id)));
 
     Ok(())
 }
```
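Two details carry the weight in this rewrite, both standard PostgreSQL behavior: `ON CONFLICT ... DO UPDATE ... RETURNING id` returns the id of the surviving row (the existing one on conflict, not the freshly generated `Uuid::new_v4()`), which is why `queue` now gets `persisted_node_id`; and `COALESCE(excluded.parent_id, node.parent_id)` keeps an already-known parent instead of overwriting it with NULL when a node is re-browsed before its parent. A condensed sketch of the same upsert (column list shortened; assumes the `node` schema implied by this diff):

```rust
use sqlx::PgPool;
use uuid::Uuid;

// Hypothetical helper showing the RETURNING-id upsert in isolation.
async fn upsert_node(
    pool: &PgPool,
    source_id: Uuid,
    external_id: &str,
    parent_id: Option<Uuid>,
) -> Result<Uuid, sqlx::Error> {
    sqlx::query_scalar::<_, Uuid>(
        r#"
        INSERT INTO node (id, source_id, external_id, parent_id)
        VALUES ($1, $2, $3, $4)
        ON CONFLICT (source_id, external_id) DO UPDATE SET
            parent_id = COALESCE(excluded.parent_id, node.parent_id),
            updated_at = NOW()
        RETURNING id
        "#,
    )
    .bind(Uuid::new_v4()) // discarded on conflict; RETURNING yields the real id
    .bind(source_id)
    .bind(external_id)
    .bind(parent_id)
    .fetch_one(pool)
    .await
}
```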
```diff
@@ -142,6 +142,8 @@ fn build_router(state: AppState) -> Router {
         .route("/api/point/batch/set-tags", put(handler::point::batch_set_point_tags))
         .route("/api/tag", get(handler::tag::get_tag_list).post(handler::tag::create_tag))
         .route("/api/tag/{tag_id}", get(handler::tag::get_tag_points).put(handler::tag::update_tag).delete(handler::tag::delete_tag))
+        .route("/api/page", get(handler::page::get_page_list).post(handler::page::create_page))
+        .route("/api/page/{page_id}", get(handler::page::get_page).put(handler::page::update_page).delete(handler::page::delete_page))
         .route("/api/logs", get(handler::log::get_logs))
         .route("/api/logs/stream", get(handler::log::stream_logs));
```
src/model.rs (13 lines changed)
```diff
@@ -1,6 +1,8 @@
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use sqlx::FromRow;
+use sqlx::types::Json;
+use std::collections::HashMap;
 use uuid::Uuid;
 use crate::util::datetime::utc_to_local_str;
```
```diff
@@ -116,3 +118,14 @@ pub struct Tag {
     pub updated_at: DateTime<Utc>,
 }
+
+#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
+pub struct Page {
+    pub id: Uuid,
+    pub name: String,
+    pub data: Json<HashMap<String, Uuid>>,
+    #[serde(serialize_with = "utc_to_local_str")]
+    pub created_at: DateTime<Utc>,
+    #[serde(serialize_with = "utc_to_local_str")]
+    pub updated_at: DateTime<Utc>,
+}
```
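Worth noting for API consumers: `sqlx::types::Json<T>` is a transparent serde wrapper (it maps the JSONB column on the sqlx side but serializes as the inner `T`), so `Page::data` appears in responses as a plain object:

```rust
use sqlx::types::Json;
use std::collections::HashMap;
use uuid::Uuid;

fn main() {
    let mut bindings = HashMap::new();
    bindings.insert("temperature_widget".to_string(), Uuid::nil());

    // Serializes as {"temperature_widget":"00000000-0000-0000-0000-000000000000"},
    // with no extra wrapper object around the map.
    println!("{}", serde_json::to_string(&Json(bindings)).unwrap());
}
```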
```diff
@@ -17,6 +17,18 @@ pub async fn get_all_enabled_sources(pool: &PgPool) -> Result<Vec<Source>, sqlx::Error> {
     .await
 }
 
+pub async fn get_point_by_id(
+    pool: &PgPool,
+    point_id: uuid::Uuid,
+) -> Result<Option<crate::model::Point>, sqlx::Error> {
+    query_as::<_, crate::model::Point>(
+        r#"SELECT * FROM point WHERE id = $1"#,
+    )
+    .bind(point_id)
+    .fetch_optional(pool)
+    .await
+}
+
 pub async fn get_points_grouped_by_source(
     pool: &PgPool,
     point_ids: &[uuid::Uuid],
```
```diff
@@ -68,13 +68,7 @@ impl IntoResponse for ApiErr {
 impl From<Error> for ApiErr {
     fn from(err: Error) -> Self {
         tracing::error!("Error: {:?}; root_cause: {}", err, err.root_cause());
-        ApiErr::Internal(
-            err.to_string(),
-            Some(serde_json::json!({
-                "root_cause": err.root_cause().to_string(),
-                "chain": err.chain().map(|e| e.to_string()).collect::<Vec<_>>()
-            })),
-        )
+        ApiErr::Internal("internal server error".to_string(), None)
     }
 }
```
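This last change is a classic log-and-sanitize: the full error chain still lands in the server logs via `tracing::error!`, but clients now get a constant `"internal server error"` instead of `err.to_string()` and the cause chain, which could leak connection strings or SQL fragments. If per-request correlation is needed later, a shared id in both the log line and the response body is the usual compromise; a hypothetical sketch (not part of the diff), keeping the `(String, Option<Value>)` payload shape of `ApiErr::Internal`:

```rust
use uuid::Uuid;

// Hypothetical variant of the From<anyhow::Error> conversion with a correlation id.
fn internal_error(err: anyhow::Error) -> (String, Option<serde_json::Value>) {
    let error_id = Uuid::new_v4();
    // Full detail stays server-side, keyed by the correlation id.
    tracing::error!("error_id={error_id} err={err:?} root_cause={}", err.root_cause());
    (
        "internal server error".to_string(),
        Some(serde_json::json!({ "error_id": error_id })),
    )
}
```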