Compare commits

...

4 Commits

Author SHA1 Message Date
caoqianming 503aefc4cb refactor(event): rename ReloadEvent to AppEvent and split event channels
Clarify event semantics by renaming ReloadEvent to AppEvent and route control vs telemetry traffic through dedicated channels. This keeps control events isolated from high-frequency PointNewValue updates while preserving the existing send() call pattern.

Made-with: Cursor
2026-03-13 14:44:30 +08:00
caoqianming 5fa63ad6dd fix(opcua): stabilize reconnect loop and coalesce telemetry events
Always clear reconnect-in-progress markers after reconnect attempts so heartbeat-triggered retries are not blocked. Reduce high-frequency event overhead by coalescing consecutive point updates in the event worker and processing only the latest value per source/client handle.

Made-with: Cursor
2026-03-13 14:26:50 +08:00
caoqianming 5406568969 fix: harden event handling and source safety
Improve runtime resilience by bounding the reload event queue and processing telemetry updates without per-point spawned tasks. Also reduce security risk by sanitizing source responses, avoiding internal error detail leaks, and standardizing write-key configuration with backward compatibility.

Made-with: Cursor
2026-03-13 14:22:16 +08:00
caoqianming 6f215162a3 feat(page): add page table and CRUD handlers 2026-03-11 13:54:14 +08:00
13 changed files with 542 additions and 356 deletions

View File

@ -12,7 +12,7 @@ axum = { version = "0.8", features = ["ws"] }
tower-http = { version = "0.6", features = ["cors", "fs"] } tower-http = { version = "0.6", features = ["cors", "fs"] }
# Database # Database
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid"] } sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid", "json"] }
# Serialization # Serialization
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

View File

@ -0,0 +1,11 @@
-- Page table for UI bindings
CREATE TABLE page (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
data JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Common indexes
CREATE INDEX idx_page_name ON page(name);

View File

@ -17,7 +17,10 @@ impl AppConfig {
.unwrap_or_else(|_| "60309".to_string()) .unwrap_or_else(|_| "60309".to_string())
.parse::<u16>() .parse::<u16>()
.map_err(|_| "PORT must be a number")?; .map_err(|_| "PORT must be a number")?;
let write_api_key = env::var("WRITE_KEY").ok(); // Prefer WRITE_API_KEY, keep WRITE_KEY as backward-compatible fallback.
let write_api_key = env::var("WRITE_API_KEY")
.ok()
.or_else(|| env::var("WRITE_KEY").ok());
Ok(Self { Ok(Self {
database_url, database_url,

View File

@ -436,7 +436,7 @@ impl ConnectionManager {
.map(crate::telemetry::PointQuality::from_status_code) .map(crate::telemetry::PointQuality::from_status_code)
.unwrap_or(crate::telemetry::PointQuality::Unknown); .unwrap_or(crate::telemetry::PointQuality::Unknown);
let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue( let _ = event_manager.send(crate::event::AppEvent::PointNewValue(
crate::telemetry::PointNewValue { crate::telemetry::PointNewValue {
source_id, source_id,
point_id: Some(poll_point.point_id), point_id: Some(poll_point.point_id),
@ -702,11 +702,9 @@ impl ConnectionManager {
// 再重新连接 // 再重新连接
let result = self.connect_from_source(pool, source_id).await; let result = self.connect_from_source(pool, source_id).await;
// 清除重连标记 // 无论成功还是失败都清除重连标记,以便心跳检测到问题后可以再次触发重连
if result.is_ok() { let mut reconnecting = self.reconnecting.write().await;
let mut reconnecting = self.reconnecting.write().await; reconnecting.remove(&source_id);
reconnecting.remove(&source_id);
}
result result
} }
@ -988,7 +986,7 @@ impl ConnectionManager {
// Emit local updates only when the full batch succeeds. // Emit local updates only when the full batch succeeds.
if let Some(event_manager) = &self.event_manager { if let Some(event_manager) = &self.event_manager {
for (source_id, point_id, variant) in success_events { for (source_id, point_id, variant) in success_events {
if let Err(e) = event_manager.send(crate::event::ReloadEvent::PointNewValue( if let Err(e) = event_manager.send(crate::event::AppEvent::PointNewValue(
crate::telemetry::PointNewValue { crate::telemetry::PointNewValue {
source_id, source_id,
point_id: Some(point_id), point_id: Some(point_id),
@ -1128,7 +1126,7 @@ impl ConnectionManager {
.unwrap_or(crate::telemetry::PointQuality::Unknown); .unwrap_or(crate::telemetry::PointQuality::Unknown);
if let Some(event_manager) = &data_manager.event_manager { if let Some(event_manager) = &data_manager.event_manager {
let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue( let _ = event_manager.send(crate::event::AppEvent::PointNewValue(
crate::telemetry::PointNewValue { crate::telemetry::PointNewValue {
source_id: current_source_id, source_id: current_source_id,
point_id: None, point_id: None,

View File

@ -1,8 +1,12 @@
use std::collections::HashMap;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024;
const TELEMETRY_EVENT_CHANNEL_CAPACITY: usize = 4096;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum ReloadEvent { pub enum AppEvent {
SourceCreate { SourceCreate {
source_id: Uuid, source_id: Uuid,
}, },
@ -24,7 +28,8 @@ pub enum ReloadEvent {
} }
pub struct EventManager { pub struct EventManager {
sender: mpsc::UnboundedSender<ReloadEvent>, control_sender: mpsc::Sender<AppEvent>,
telemetry_sender: mpsc::Sender<crate::telemetry::PointNewValue>,
} }
impl EventManager { impl EventManager {
@ -33,153 +38,224 @@ impl EventManager {
connection_manager: std::sync::Arc<crate::connection::ConnectionManager>, connection_manager: std::sync::Arc<crate::connection::ConnectionManager>,
ws_manager: Option<std::sync::Arc<crate::websocket::WebSocketManager>>, ws_manager: Option<std::sync::Arc<crate::websocket::WebSocketManager>>,
) -> Self { ) -> Self {
let (sender, mut receiver) = mpsc::unbounded_channel::<ReloadEvent>(); let (control_sender, mut control_receiver) =
let ws_manager_clone = ws_manager.clone(); mpsc::channel::<AppEvent>(CONTROL_EVENT_CHANNEL_CAPACITY);
let (telemetry_sender, mut telemetry_receiver) =
mpsc::channel::<crate::telemetry::PointNewValue>(TELEMETRY_EVENT_CHANNEL_CAPACITY);
let control_cm = connection_manager.clone();
let control_pool = pool.clone();
tokio::spawn(async move { tokio::spawn(async move {
// 在循环外克隆,避免在循环中移动 while let Some(event) = control_receiver.recv().await {
let connection_manager_clone = connection_manager.clone(); handle_control_event(event, &control_pool, &control_cm).await;
while let Some(event) = receiver.recv().await { }
match event { });
ReloadEvent::SourceCreate { source_id } => {
tracing::info!("Processing SourceCreate event for {}", source_id); let ws_manager_clone = ws_manager.clone();
if let Err(e) = connection_manager_clone.connect_from_source(&pool, source_id).await { let telemetry_cm = connection_manager.clone();
tracing::error!("Failed to connect to source {}: {}", source_id, e); tokio::spawn(async move {
while let Some(payload) = telemetry_receiver.recv().await {
let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> =
HashMap::new();
latest_by_key.insert((payload.source_id, payload.client_handle), payload);
loop {
match telemetry_receiver.try_recv() {
Ok(next_payload) => {
latest_by_key.insert(
(next_payload.source_id, next_payload.client_handle),
next_payload,
);
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
break;
}
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
break;
} }
} }
ReloadEvent::SourceUpdate { source_id } => { }
tracing::info!("Processing SourceUpdate event for {}", source_id);
if let Err(e) = connection_manager_clone.reconnect(&pool, source_id).await {
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
}
}
ReloadEvent::SourceDelete { source_id } => {
tracing::info!("Processing SourceDelete event for {}", source_id);
if let Err(e) = connection_manager_clone.disconnect(source_id).await {
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
}
}
ReloadEvent::PointCreateBatch { source_id, point_ids } => {
let requested_count = point_ids.len();
match connection_manager_clone
.subscribe_points_from_source(source_id, Some(point_ids), &pool)
.await
{
Ok(stats) => {
let subscribed = *stats.get("subscribed").unwrap_or(&0);
let polled = *stats.get("polled").unwrap_or(&0);
let total = *stats.get("total").unwrap_or(&0);
tracing::info!(
"PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}",
source_id,
requested_count,
subscribed,
polled,
total
);
}
Err(e) => {
tracing::error!("Failed to subscribe to points: {}", e);
}
}
}
ReloadEvent::PointDeleteBatch { source_id, point_ids } => {
tracing::info!(
"Processing PointDeleteBatch event for source {} with {} points",
source_id,
point_ids.len()
);
if let Err(e) = connection_manager_clone
.unsubscribe_points_from_source(source_id, point_ids)
.await
{
tracing::error!("Failed to unsubscribe points: {}", e);
}
}
ReloadEvent::PointNewValue(payload) => {
let source_id = payload.source_id;
let client_handle = payload.client_handle;
let point_id = if let Some(point_id) = payload.point_id {
Some(point_id)
} else {
let status = connection_manager_clone.get_status_read_guard().await;
status
.get(&source_id)
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
};
if let Some(point_id) = point_id {
// 从缓存中读取旧值
let (old_value, old_timestamp, value_changed) = {
let monitor_data = connection_manager_clone.get_point_monitor_data_read_guard().await;
let old_monitor_info = monitor_data.get(&point_id);
if let Some(old_info) = old_monitor_info { for point_payload in latest_by_key.into_values() {
let changed = old_info.value != payload.value || process_point_new_value(point_payload, &telemetry_cm, ws_manager_clone.as_ref())
old_info.timestamp != payload.timestamp; .await;
(old_info.value.clone(), old_info.timestamp, changed)
} else {
(None, None, false)
}
};
let monitor = crate::telemetry::PointMonitorInfo {
protocol: payload.protocol.clone(),
source_id,
point_id,
client_handle,
scan_mode: payload.scan_mode.clone(),
timestamp: payload.timestamp,
quality: payload.quality.clone(),
value: payload.value.clone(),
value_type: payload.value_type.clone(),
value_text: payload.value_text.clone(),
old_value,
old_timestamp,
value_changed,
};
// 克隆 monitor用于并行执行
let monitor_for_ws = monitor.clone();
let monitor_for_db = monitor.clone();
// 并行执行 update_point_monitor_data 和 send_to_public不等待完成
let cm_clone = connection_manager_clone.clone();
tokio::spawn(async move {
// 更新监控数据
if let Err(e) = cm_clone.update_point_monitor_data(monitor_for_db).await {
tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e);
}
});
let ws_clone = ws_manager_clone.clone();
tokio::spawn(async move {
// 发送WebSocket消息
if let Some(ws_manager) = ws_clone {
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor_for_ws);
if let Err(e) = ws_manager.send_to_public(ws_message).await {
tracing::error!("Failed to send WebSocket message to public room: {}", e);
}
// 暂时注释掉 send_to_client因为现在信息只需发送到 public
// if let Err(e) = ws_manager.send_to_client(point_id, ws_message).await {
// tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e);
// }
}
});
} else {
tracing::warn!("Point not found for source {} client_handle {}", source_id, client_handle);
}
}
} }
} }
}); });
Self { sender } Self {
control_sender,
telemetry_sender,
}
} }
pub fn send(&self, event: ReloadEvent) -> Result<(), String> { pub fn send(&self, event: AppEvent) -> Result<(), String> {
self.sender match event {
.send(event) AppEvent::PointNewValue(payload) => match self.telemetry_sender.try_send(payload) {
.map_err(|e| format!("Failed to send event: {}", e)) Ok(()) => Ok(()),
Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
Err(format!("Failed to send telemetry event: channel closed ({e:?})"))
}
Err(tokio::sync::mpsc::error::TrySendError::Full(payload)) => {
// High-frequency telemetry is lossy by design under sustained pressure.
tracing::warn!(
"Dropping PointNewValue due to full telemetry queue: source={}, client_handle={}",
payload.source_id,
payload.client_handle
);
Ok(())
}
},
control_event => match self.control_sender.try_send(control_event) {
Ok(()) => Ok(()),
Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
Err(format!("Failed to send control event: channel closed ({e:?})"))
}
Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => {
Err(format!("Failed to send control event: queue full ({e:?})"))
}
},
}
}
}
async fn handle_control_event(
event: AppEvent,
pool: &sqlx::PgPool,
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
) {
match event {
AppEvent::SourceCreate { source_id } => {
tracing::info!("Processing SourceCreate event for {}", source_id);
if let Err(e) = connection_manager.connect_from_source(pool, source_id).await {
tracing::error!("Failed to connect to source {}: {}", source_id, e);
}
}
AppEvent::SourceUpdate { source_id } => {
tracing::info!("Processing SourceUpdate event for {}", source_id);
if let Err(e) = connection_manager.reconnect(pool, source_id).await {
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
}
}
AppEvent::SourceDelete { source_id } => {
tracing::info!("Processing SourceDelete event for {}", source_id);
if let Err(e) = connection_manager.disconnect(source_id).await {
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
}
}
AppEvent::PointCreateBatch { source_id, point_ids } => {
let requested_count = point_ids.len();
match connection_manager
.subscribe_points_from_source(source_id, Some(point_ids), pool)
.await
{
Ok(stats) => {
let subscribed = *stats.get("subscribed").unwrap_or(&0);
let polled = *stats.get("polled").unwrap_or(&0);
let total = *stats.get("total").unwrap_or(&0);
tracing::info!(
"PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}",
source_id,
requested_count,
subscribed,
polled,
total
);
}
Err(e) => {
tracing::error!("Failed to subscribe to points: {}", e);
}
}
}
AppEvent::PointDeleteBatch { source_id, point_ids } => {
tracing::info!(
"Processing PointDeleteBatch event for source {} with {} points",
source_id,
point_ids.len()
);
if let Err(e) = connection_manager
.unsubscribe_points_from_source(source_id, point_ids)
.await
{
tracing::error!("Failed to unsubscribe points: {}", e);
}
}
AppEvent::PointNewValue(_) => {
tracing::warn!("PointNewValue routed to control worker unexpectedly");
}
}
}
async fn process_point_new_value(
payload: crate::telemetry::PointNewValue,
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
) {
let source_id = payload.source_id;
let client_handle = payload.client_handle;
let point_id = if let Some(point_id) = payload.point_id {
Some(point_id)
} else {
let status = connection_manager.get_status_read_guard().await;
status
.get(&source_id)
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
};
if let Some(point_id) = point_id {
// 从缓存中读取旧值
let (old_value, old_timestamp, value_changed) = {
let monitor_data = connection_manager.get_point_monitor_data_read_guard().await;
let old_monitor_info = monitor_data.get(&point_id);
if let Some(old_info) = old_monitor_info {
let changed = old_info.value != payload.value || old_info.timestamp != payload.timestamp;
(old_info.value.clone(), old_info.timestamp, changed)
} else {
(None, None, false)
}
};
let monitor = crate::telemetry::PointMonitorInfo {
protocol: payload.protocol,
source_id,
point_id,
client_handle,
scan_mode: payload.scan_mode,
timestamp: payload.timestamp,
quality: payload.quality,
value: payload.value,
value_type: payload.value_type,
value_text: payload.value_text,
old_value,
old_timestamp,
value_changed,
};
if let Err(e) = connection_manager
.update_point_monitor_data(monitor.clone())
.await
{
tracing::error!(
"Failed to update point monitor data for point {}: {}",
point_id,
e
);
}
if let Some(ws_manager) = ws_manager {
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor);
if let Err(e) = ws_manager.send_to_public(ws_message).await {
tracing::warn!(
"Failed to send WebSocket message to public room: {}",
e
);
}
}
} else {
tracing::warn!(
"Point not found for source {} client_handle {}",
source_id,
client_handle
);
} }
} }

View File

@ -2,3 +2,4 @@ pub mod source;
pub mod point; pub mod point;
pub mod tag; pub mod tag;
pub mod log; pub mod log;
pub mod page;

169
src/handler/page.rs Normal file
View File

@ -0,0 +1,169 @@
use axum::{Json, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse};
use serde::Deserialize;
use std::collections::HashMap;
use sqlx::types::Json as SqlxJson;
use uuid::Uuid;
use validator::Validate;
use crate::model::Page;
use crate::util::response::ApiErr;
use crate::AppState;
#[derive(Deserialize, Validate)]
pub struct GetPageListQuery {
#[validate(length(min = 1, max = 100))]
pub name: Option<String>,
}
pub async fn get_page_list(
State(state): State<AppState>,
Query(query): Query<GetPageListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let pool = &state.pool;
let pages: Vec<Page> = if let Some(name) = query.name {
sqlx::query_as::<_, Page>(
r#"
SELECT * FROM page
WHERE name ILIKE $1
ORDER BY created_at
"#,
)
.bind(format!("%{}%", name))
.fetch_all(pool)
.await?
} else {
sqlx::query_as::<_, Page>(
r#"SELECT * FROM page ORDER BY created_at"#,
)
.fetch_all(pool)
.await?
};
Ok(Json(pages))
}
pub async fn get_page(
State(state): State<AppState>,
Path(page_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let page = sqlx::query_as::<_, Page>("SELECT * FROM page WHERE id = $1")
.bind(page_id)
.fetch_optional(&state.pool)
.await?;
match page {
Some(p) => Ok(Json(p)),
None => Err(ApiErr::NotFound("Page not found".to_string(), None)),
}
}
#[derive(Debug, Deserialize, Validate)]
pub struct CreatePageReq {
#[validate(length(min = 1, max = 100))]
pub name: String,
pub data: HashMap<String, Uuid>,
}
#[derive(Debug, Deserialize, Validate)]
pub struct UpdatePageReq {
#[validate(length(min = 1, max = 100))]
pub name: Option<String>,
pub data: Option<HashMap<String, Uuid>>,
}
pub async fn create_page(
State(state): State<AppState>,
Json(payload): Json<CreatePageReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let page_id = sqlx::query_scalar::<_, Uuid>(
r#"
INSERT INTO page (name, data)
VALUES ($1, $2)
RETURNING id
"#,
)
.bind(&payload.name)
.bind(SqlxJson(payload.data))
.fetch_one(&state.pool)
.await?;
Ok((StatusCode::CREATED, Json(serde_json::json!({
"id": page_id,
"ok_msg": "Page created successfully"
}))))
}
pub async fn update_page(
State(state): State<AppState>,
Path(page_id): Path<Uuid>,
Json(payload): Json<UpdatePageReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let exists = sqlx::query("SELECT 1 FROM page WHERE id = $1")
.bind(page_id)
.fetch_optional(&state.pool)
.await?;
if exists.is_none() {
return Err(ApiErr::NotFound("Page not found".to_string(), None));
}
if payload.name.is_none() && payload.data.is_none() {
return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
}
let mut updates = Vec::new();
let mut param_count = 1;
if payload.name.is_some() {
updates.push(format!("name = ${}", param_count));
param_count += 1;
}
if payload.data.is_some() {
updates.push(format!("data = ${}", param_count));
param_count += 1;
}
updates.push("updated_at = NOW()".to_string());
let sql = format!(
r#"UPDATE page SET {} WHERE id = ${}"#,
updates.join(", "),
param_count
);
let mut query = sqlx::query(&sql);
if let Some(name) = payload.name {
query = query.bind(name);
}
if let Some(data) = payload.data {
query = query.bind(SqlxJson(data));
}
query = query.bind(page_id);
query.execute(&state.pool).await?;
Ok(Json(serde_json::json!({
"ok_msg": "Page updated successfully"
})))
}
pub async fn delete_page(
State(state): State<AppState>,
Path(page_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let result = sqlx::query("DELETE FROM page WHERE id = $1")
.bind(page_id)
.execute(&state.pool)
.await?;
if result.rows_affected() == 0 {
return Err(ApiErr::NotFound("Page not found".to_string(), None));
}
Ok(StatusCode::NO_CONTENT)
}

View File

@ -2,7 +2,7 @@ use axum::{Json, extract::{Path, Query, State}, http::HeaderMap, response::IntoR
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid; use uuid::Uuid;
use validator::Validate; use validator::Validate;
use sqlx::Row; use sqlx::{Row, QueryBuilder};
use crate::util::{response::ApiErr, pagination::{PaginatedResponse, PaginationParams}}; use crate::util::{response::ApiErr, pagination::{PaginatedResponse, PaginationParams}};
@ -69,12 +69,7 @@ pub async fn get_point(
Path(point_id): Path<Uuid>, Path(point_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> { ) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool; let pool = &state.pool;
let point = sqlx::query_as::<_, Point>( let point = crate::service::get_point_by_id(pool, point_id).await?;
r#"SELECT * FROM point WHERE id = $1"#,
)
.bind(point_id)
.fetch_optional(pool)
.await?;
Ok(Json(point)) Ok(Json(point))
} }
@ -136,51 +131,26 @@ pub async fn update_point(
return Err(ApiErr::NotFound("Point not found".to_string(), None)); return Err(ApiErr::NotFound("Point not found".to_string(), None));
} }
// Build dynamic UPDATE SQL for provided fields. let mut qb = QueryBuilder::new("UPDATE point SET ");
let mut updates = Vec::new(); let mut sep = qb.separated(", ");
let mut values: Vec<String> = Vec::new();
let mut param_count = 1;
if let Some(name) = &payload.name { if let Some(name) = &payload.name {
updates.push(format!("name = ${}", param_count)); sep.push("name = ").push_bind(name);
values.push(name.clone());
param_count += 1;
} }
if let Some(description) = &payload.description { if let Some(description) = &payload.description {
updates.push(format!("description = ${}", param_count)); sep.push("description = ").push_bind(description);
values.push(description.clone());
param_count += 1;
} }
if let Some(unit) = &payload.unit { if let Some(unit) = &payload.unit {
updates.push(format!("unit = ${}", param_count)); sep.push("unit = ").push_bind(unit);
values.push(unit.clone());
param_count += 1;
} }
if let Some(tag_id) = &payload.tag_id { if let Some(tag_id) = &payload.tag_id {
updates.push(format!("tag_id = ${}", param_count)); sep.push("tag_id = ").push_bind(tag_id);
values.push(tag_id.to_string());
param_count += 1;
} }
// Always update timestamp. sep.push("updated_at = NOW()");
updates.push("updated_at = NOW()".to_string());
let sql = format!( qb.push(" WHERE id = ").push_bind(point_id);
"UPDATE point SET {} WHERE id = ${}", qb.build().execute(pool).await?;
updates.join(", "),
param_count
);
values.push(point_id.to_string());
let mut query = sqlx::query(&sql);
for value in &values {
query = query.bind(value);
}
query.execute(pool).await?;
Ok(Json(serde_json::json!({"ok_msg": "Point updated successfully"}))) Ok(Json(serde_json::json!({"ok_msg": "Point updated successfully"})))
} }
@ -277,7 +247,7 @@ pub async fn delete_point(
if let Some(source_id) = source_id { if let Some(source_id) = source_id {
if let Err(e) = state if let Err(e) = state
.event_manager .event_manager
.send(crate::event::ReloadEvent::PointDeleteBatch { .send(crate::event::AppEvent::PointDeleteBatch {
source_id, source_id,
point_ids: vec![point_id], point_ids: vec![point_id],
}) })
@ -395,7 +365,7 @@ pub async fn batch_create_points(
let point_ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect(); let point_ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
if let Err(e) = state if let Err(e) = state
.event_manager .event_manager
.send(crate::event::ReloadEvent::PointCreateBatch { source_id, point_ids }) .send(crate::event::AppEvent::PointCreateBatch { source_id, point_ids })
{ {
tracing::error!("Failed to send PointCreateBatch event: {}", e); tracing::error!("Failed to send PointCreateBatch event: {}", e);
} }
@ -457,7 +427,7 @@ pub async fn batch_delete_points(
let ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect(); let ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
if let Err(e) = state if let Err(e) = state
.event_manager .event_manager
.send(crate::event::ReloadEvent::PointDeleteBatch { .send(crate::event::AppEvent::PointDeleteBatch {
source_id, source_id,
point_ids: ids, point_ids: ids,
}) })
@ -489,7 +459,7 @@ pub async fn batch_set_point_value(
return Err(ApiErr::Forbidden( return Err(ApiErr::Forbidden(
"write permission denied".to_string(), "write permission denied".to_string(),
Some(serde_json::json!({ Some(serde_json::json!({
"hint": "set WRITE_API_KEY and pass header X-Write-Key" "hint": "set WRITE_API_KEY (or legacy WRITE_KEY) and pass header X-Write-Key"
})), })),
)); ));
} }

View File

@ -58,18 +58,47 @@ impl TreeNode {
#[derive(Debug, Serialize, Clone)] #[derive(Debug, Serialize, Clone)]
pub struct SourceWithStatus { pub struct SourceWithStatus {
#[serde(flatten)] #[serde(flatten)]
pub source: Source, pub source: SourcePublic,
pub is_connected: bool, pub is_connected: bool,
pub last_error: Option<String>, pub last_error: Option<String>,
#[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")] #[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")]
pub last_time: Option<DateTime<Utc>>, pub last_time: Option<DateTime<Utc>>,
} }
#[derive(Debug, Serialize, Clone)]
pub struct SourcePublic {
pub id: Uuid,
pub name: String,
pub protocol: String,
pub endpoint: String,
pub security_policy: Option<String>,
pub security_mode: Option<String>,
pub enabled: bool,
#[serde(serialize_with = "crate::util::datetime::utc_to_local_str")]
pub created_at: DateTime<Utc>,
#[serde(serialize_with = "crate::util::datetime::utc_to_local_str")]
pub updated_at: DateTime<Utc>,
}
impl From<Source> for SourcePublic {
fn from(source: Source) -> Self {
Self {
id: source.id,
name: source.name,
protocol: source.protocol,
endpoint: source.endpoint,
security_policy: source.security_policy,
security_mode: source.security_mode,
enabled: source.enabled,
created_at: source.created_at,
updated_at: source.updated_at,
}
}
}
pub async fn get_source_list(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> { pub async fn get_source_list(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool; let pool = &state.pool;
let sources: Vec<Source> = sqlx::query_as( let sources: Vec<Source> = crate::service::get_all_enabled_sources(pool).await?;
r#"SELECT * FROM source where enabled is true"#,
).fetch_all(pool).await?;
// 获取所有连接状态 // 获取所有连接状态
let status_map: std::collections::HashMap<Uuid, (bool, Option<String>, Option<DateTime<Utc>>)> = let status_map: std::collections::HashMap<Uuid, (bool, Option<String>, Option<DateTime<Utc>>)> =
@ -87,7 +116,7 @@ pub async fn get_source_list(State(state): State<AppState>) -> Result<impl IntoR
.map(|(connected, error, time)| (*connected, error.clone(), *time)) .map(|(connected, error, time)| (*connected, error.clone(), *time))
.unwrap_or((false, None, None)); .unwrap_or((false, None, None));
SourceWithStatus { SourceWithStatus {
source, source: source.into(),
is_connected, is_connected,
last_error, last_error,
last_time, last_time,
@ -196,7 +225,7 @@ pub async fn create_source(
.await?; .await?;
// 触发 SourceCreate 事件 // 触发 SourceCreate 事件
let _ = state.event_manager.send(crate::event::ReloadEvent::SourceCreate { source_id: new_id }); let _ = state.event_manager.send(crate::event::AppEvent::SourceCreate { source_id: new_id });
Ok((StatusCode::CREATED, Json(CreateSourceRes { id: new_id }))) Ok((StatusCode::CREATED, Json(CreateSourceRes { id: new_id })))
} }
@ -271,7 +300,7 @@ pub async fn update_source(
qb.push(" WHERE id = ").push_bind(source_id); qb.push(" WHERE id = ").push_bind(source_id);
qb.build().execute(pool).await?; qb.build().execute(pool).await?;
let _ = state.event_manager.send(crate::event::ReloadEvent::SourceUpdate { source_id }); let _ = state.event_manager.send(crate::event::AppEvent::SourceUpdate { source_id });
Ok(Json(serde_json::json!({"ok_msg": "Source updated successfully"}))) Ok(Json(serde_json::json!({"ok_msg": "Source updated successfully"})))
} }
@ -294,7 +323,7 @@ pub async fn delete_source(
} }
// 触发 SourceDelete 事件 // 触发 SourceDelete 事件
let _ = state.event_manager.send(crate::event::ReloadEvent::SourceDelete { source_id }); let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id });
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }
@ -444,159 +473,67 @@ async fn process_reference(
let display_name = ref_desc.display_name.text.to_string(); let display_name = ref_desc.display_name.text.to_string();
let node_class = format!("{:?}", ref_desc.node_class); let node_class = format!("{:?}", ref_desc.node_class);
let now = Utc::now(); let effective_parent_id = if let Some(pid) = parent_id {
let node_uuid = Uuid::new_v4(); let parent_exists = sqlx::query(r#"SELECT 1 FROM node WHERE id = $1"#)
.bind(pid)
// ?? 关键优化:直接 UPSERT避免 SELECT .fetch_optional(tx.as_mut())
// 注意:如果 parent_id 存在,则必须确保该父节点已存在于数据库中 .await?;
// 否则会触发外键约束失败 if parent_exists.is_some() {
if parent_id.is_some() { Some(pid)
// 检查父节点是否已存在于数据库中
let parent_exists = sqlx::query(
r#"SELECT 1 FROM node WHERE id = $1"#,
)
.bind(parent_id.unwrap())
.fetch_optional(tx.as_mut())
.await?;
if parent_exists.is_none() {
// 如果父节点不存在,则暂时不设置 parent_id
// 这样可以避免外键约束失败
sqlx::query(
r#"
INSERT INTO node (
id,
source_id,
external_id,
namespace_uri,
namespace_index,
identifier_type,
identifier,
browse_name,
display_name,
node_class,
parent_id,
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,NULL)
ON CONFLICT(source_id, external_id) DO UPDATE SET
namespace_uri = excluded.namespace_uri,
namespace_index = excluded.namespace_index,
identifier_type = excluded.identifier_type,
identifier = excluded.identifier,
browse_name = excluded.browse_name,
display_name = excluded.display_name,
node_class = excluded.node_class,
updated_at = NOW()
"#
)
.bind(node_uuid)
.bind(source_id)
.bind(&node_id_str)
.bind(&namespace_uri)
.bind(namespace_index.map(|v| v as i32))
.bind(&identifier_type)
.bind(&identifier)
.bind(&browse_name)
.bind(&display_name)
.bind(&node_class)
.execute(tx.as_mut())
.await
.context("Failed to execute UPSERT query")?;
} else { } else {
// 如果父节点存在,则正常设置 parent_id None
sqlx::query(
r#"
INSERT INTO node (
id,
source_id,
external_id,
namespace_uri,
namespace_index,
identifier_type,
identifier,
browse_name,
display_name,
node_class,
parent_id,
created_at,
updated_at
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,NOW(),NOW())
ON CONFLICT(source_id, external_id) DO UPDATE SET
namespace_uri = excluded.namespace_uri,
namespace_index = excluded.namespace_index,
identifier_type = excluded.identifier_type,
identifier = excluded.identifier,
browse_name = excluded.browse_name,
display_name = excluded.display_name,
node_class = excluded.node_class,
parent_id = excluded.parent_id,
updated_at = NOW()
"#
)
.bind(node_uuid)
.bind(source_id)
.bind(&node_id_str)
.bind(&namespace_uri)
.bind(namespace_index.map(|v| v as i32))
.bind(&identifier_type)
.bind(&identifier)
.bind(&browse_name)
.bind(&display_name)
.bind(&node_class)
.bind(parent_id)
.bind(now)
.bind(now)
.execute(tx.as_mut())
.await
.context("Failed to execute UPSERT query")?;
} }
} else { } else {
// 如果没有 parent_id则正常插入 None
sqlx::query( };
r#"
INSERT INTO node ( // Use RETURNING id so queue always carries the actual DB node id.
id, let persisted_node_id = sqlx::query_scalar::<_, Uuid>(
source_id, r#"
external_id, INSERT INTO node (
namespace_uri, id,
namespace_index, source_id,
identifier_type, external_id,
identifier, namespace_uri,
browse_name, namespace_index,
display_name, identifier_type,
node_class, identifier,
parent_id browse_name,
) display_name,
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,NULL) node_class,
ON CONFLICT(source_id, external_id) DO UPDATE SET parent_id
namespace_uri = excluded.namespace_uri,
namespace_index = excluded.namespace_index,
identifier_type = excluded.identifier_type,
identifier = excluded.identifier,
browse_name = excluded.browse_name,
display_name = excluded.display_name,
node_class = excluded.node_class,
updated_at = NOW()
"#
) )
.bind(node_uuid) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
.bind(source_id) ON CONFLICT(source_id, external_id) DO UPDATE SET
.bind(&node_id_str) namespace_uri = excluded.namespace_uri,
.bind(&namespace_uri) namespace_index = excluded.namespace_index,
.bind(namespace_index.map(|v| v as i32)) identifier_type = excluded.identifier_type,
.bind(&identifier_type) identifier = excluded.identifier,
.bind(&identifier) browse_name = excluded.browse_name,
.bind(&browse_name) display_name = excluded.display_name,
.bind(&display_name) node_class = excluded.node_class,
.bind(&node_class) parent_id = COALESCE(excluded.parent_id, node.parent_id),
.execute(tx.as_mut()) updated_at = NOW()
.await RETURNING id
.context("Failed to execute UPSERT query")?; "#,
} )
.bind(Uuid::new_v4())
.bind(source_id)
.bind(&node_id_str)
.bind(&namespace_uri)
.bind(namespace_index.map(|v| v as i32))
.bind(&identifier_type)
.bind(&identifier)
.bind(&browse_name)
.bind(&display_name)
.bind(&node_class)
.bind(effective_parent_id)
.fetch_one(tx.as_mut())
.await
.context("Failed to execute UPSERT query")?;
processed_nodes.insert(node_id_str.clone(), ()); processed_nodes.insert(node_id_str.clone(), ());
queue.push_back((node_id_obj.clone(), Some(node_uuid))); queue.push_back((node_id_obj.clone(), Some(persisted_node_id)));
Ok(()) Ok(())
} }

View File

@ -142,6 +142,8 @@ fn build_router(state: AppState) -> Router {
.route("/api/point/batch/set-tags", put(handler::point::batch_set_point_tags)) .route("/api/point/batch/set-tags", put(handler::point::batch_set_point_tags))
.route("/api/tag", get(handler::tag::get_tag_list).post(handler::tag::create_tag)) .route("/api/tag", get(handler::tag::get_tag_list).post(handler::tag::create_tag))
.route("/api/tag/{tag_id}", get(handler::tag::get_tag_points).put(handler::tag::update_tag).delete(handler::tag::delete_tag)) .route("/api/tag/{tag_id}", get(handler::tag::get_tag_points).put(handler::tag::update_tag).delete(handler::tag::delete_tag))
.route("/api/page", get(handler::page::get_page_list).post(handler::page::create_page))
.route("/api/page/{page_id}", get(handler::page::get_page).put(handler::page::update_page).delete(handler::page::delete_page))
.route("/api/logs", get(handler::log::get_logs)) .route("/api/logs", get(handler::log::get_logs))
.route("/api/logs/stream", get(handler::log::stream_logs)); .route("/api/logs/stream", get(handler::log::stream_logs));

View File

@ -1,6 +1,8 @@
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sqlx::FromRow; use sqlx::FromRow;
use sqlx::types::Json;
use std::collections::HashMap;
use uuid::Uuid; use uuid::Uuid;
use crate::util::datetime::utc_to_local_str; use crate::util::datetime::utc_to_local_str;
@ -116,3 +118,14 @@ pub struct Tag {
pub updated_at: DateTime<Utc>, pub updated_at: DateTime<Utc>,
} }
#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
pub struct Page {
pub id: Uuid,
pub name: String,
pub data: Json<HashMap<String, Uuid>>,
#[serde(serialize_with = "utc_to_local_str")]
pub created_at: DateTime<Utc>,
#[serde(serialize_with = "utc_to_local_str")]
pub updated_at: DateTime<Utc>,
}

View File

@ -17,6 +17,18 @@ pub async fn get_all_enabled_sources(pool: &PgPool) -> Result<Vec<Source>, sqlx:
.await .await
} }
pub async fn get_point_by_id(
pool: &PgPool,
point_id: uuid::Uuid,
) -> Result<Option<crate::model::Point>, sqlx::Error> {
query_as::<_, crate::model::Point>(
r#"SELECT * FROM point WHERE id = $1"#,
)
.bind(point_id)
.fetch_optional(pool)
.await
}
pub async fn get_points_grouped_by_source( pub async fn get_points_grouped_by_source(
pool: &PgPool, pool: &PgPool,
point_ids: &[uuid::Uuid], point_ids: &[uuid::Uuid],

View File

@ -68,13 +68,7 @@ impl IntoResponse for ApiErr {
impl From<Error> for ApiErr { impl From<Error> for ApiErr {
fn from(err: Error) -> Self { fn from(err: Error) -> Self {
tracing::error!("Error: {:?}; root_cause: {}", err, err.root_cause()); tracing::error!("Error: {:?}; root_cause: {}", err, err.root_cause());
ApiErr::Internal( ApiErr::Internal("internal server error".to_string(), None)
err.to_string(),
Some(serde_json::json!({
"root_cause": err.root_cause().to_string(),
"chain": err.chain().map(|e| e.to_string()).collect::<Vec<_>>()
})),
)
} }
} }