Compare commits

..

4 Commits

Author SHA1 Message Date
caoqianming 503aefc4cb refactor(event): rename ReloadEvent to AppEvent and split event channels
Clarify event semantics by renaming ReloadEvent to AppEvent and route control vs telemetry traffic through dedicated channels. This keeps control events isolated from high-frequency PointNewValue updates while preserving the existing send() call pattern.

Made-with: Cursor
2026-03-13 14:44:30 +08:00
caoqianming 5fa63ad6dd fix(opcua): stabilize reconnect loop and coalesce telemetry events
Always clear reconnect-in-progress markers after reconnect attempts so heartbeat-triggered retries are not blocked. Reduce high-frequency event overhead by coalescing consecutive point updates in the event worker and processing only the latest value per source/client handle.

Made-with: Cursor
2026-03-13 14:26:50 +08:00
caoqianming 5406568969 fix: harden event handling and source safety
Improve runtime resilience by bounding the reload event queue and processing telemetry updates without per-point spawned tasks. Also reduce security risk by sanitizing source responses, avoiding internal error detail leaks, and standardizing write-key configuration with backward compatibility.

Made-with: Cursor
2026-03-13 14:22:16 +08:00
caoqianming 6f215162a3 feat(page): add page table and CRUD handlers 2026-03-11 13:54:14 +08:00
13 changed files with 542 additions and 356 deletions

View File

@ -12,7 +12,7 @@ axum = { version = "0.8", features = ["ws"] }
tower-http = { version = "0.6", features = ["cors", "fs"] }
# Database
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid", "json"] }
# Serialization
serde = { version = "1.0", features = ["derive"] }

View File

@ -0,0 +1,11 @@
-- Page table for UI bindings
CREATE TABLE page (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
data JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Common indexes
CREATE INDEX idx_page_name ON page(name);

View File

@ -17,7 +17,10 @@ impl AppConfig {
.unwrap_or_else(|_| "60309".to_string())
.parse::<u16>()
.map_err(|_| "PORT must be a number")?;
let write_api_key = env::var("WRITE_KEY").ok();
// Prefer WRITE_API_KEY, keep WRITE_KEY as backward-compatible fallback.
let write_api_key = env::var("WRITE_API_KEY")
.ok()
.or_else(|| env::var("WRITE_KEY").ok());
Ok(Self {
database_url,

View File

@ -436,7 +436,7 @@ impl ConnectionManager {
.map(crate::telemetry::PointQuality::from_status_code)
.unwrap_or(crate::telemetry::PointQuality::Unknown);
let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue(
let _ = event_manager.send(crate::event::AppEvent::PointNewValue(
crate::telemetry::PointNewValue {
source_id,
point_id: Some(poll_point.point_id),
@ -702,11 +702,9 @@ impl ConnectionManager {
// 再重新连接
let result = self.connect_from_source(pool, source_id).await;
// 清除重连标记
if result.is_ok() {
let mut reconnecting = self.reconnecting.write().await;
reconnecting.remove(&source_id);
}
// 无论成功还是失败都清除重连标记,以便心跳检测到问题后可以再次触发重连
let mut reconnecting = self.reconnecting.write().await;
reconnecting.remove(&source_id);
result
}
@ -988,7 +986,7 @@ impl ConnectionManager {
// Emit local updates only when the full batch succeeds.
if let Some(event_manager) = &self.event_manager {
for (source_id, point_id, variant) in success_events {
if let Err(e) = event_manager.send(crate::event::ReloadEvent::PointNewValue(
if let Err(e) = event_manager.send(crate::event::AppEvent::PointNewValue(
crate::telemetry::PointNewValue {
source_id,
point_id: Some(point_id),
@ -1128,7 +1126,7 @@ impl ConnectionManager {
.unwrap_or(crate::telemetry::PointQuality::Unknown);
if let Some(event_manager) = &data_manager.event_manager {
let _ = event_manager.send(crate::event::ReloadEvent::PointNewValue(
let _ = event_manager.send(crate::event::AppEvent::PointNewValue(
crate::telemetry::PointNewValue {
source_id: current_source_id,
point_id: None,

View File

@ -1,8 +1,12 @@
use std::collections::HashMap;
use tokio::sync::mpsc;
use uuid::Uuid;
const CONTROL_EVENT_CHANNEL_CAPACITY: usize = 1024;
const TELEMETRY_EVENT_CHANNEL_CAPACITY: usize = 4096;
#[derive(Debug, Clone)]
pub enum ReloadEvent {
pub enum AppEvent {
SourceCreate {
source_id: Uuid,
},
@ -24,7 +28,8 @@ pub enum ReloadEvent {
}
pub struct EventManager {
sender: mpsc::UnboundedSender<ReloadEvent>,
control_sender: mpsc::Sender<AppEvent>,
telemetry_sender: mpsc::Sender<crate::telemetry::PointNewValue>,
}
impl EventManager {
@ -33,153 +38,224 @@ impl EventManager {
connection_manager: std::sync::Arc<crate::connection::ConnectionManager>,
ws_manager: Option<std::sync::Arc<crate::websocket::WebSocketManager>>,
) -> Self {
let (sender, mut receiver) = mpsc::unbounded_channel::<ReloadEvent>();
let ws_manager_clone = ws_manager.clone();
let (control_sender, mut control_receiver) =
mpsc::channel::<AppEvent>(CONTROL_EVENT_CHANNEL_CAPACITY);
let (telemetry_sender, mut telemetry_receiver) =
mpsc::channel::<crate::telemetry::PointNewValue>(TELEMETRY_EVENT_CHANNEL_CAPACITY);
let control_cm = connection_manager.clone();
let control_pool = pool.clone();
tokio::spawn(async move {
// 在循环外克隆,避免在循环中移动
let connection_manager_clone = connection_manager.clone();
while let Some(event) = receiver.recv().await {
match event {
ReloadEvent::SourceCreate { source_id } => {
tracing::info!("Processing SourceCreate event for {}", source_id);
if let Err(e) = connection_manager_clone.connect_from_source(&pool, source_id).await {
tracing::error!("Failed to connect to source {}: {}", source_id, e);
while let Some(event) = control_receiver.recv().await {
handle_control_event(event, &control_pool, &control_cm).await;
}
});
let ws_manager_clone = ws_manager.clone();
let telemetry_cm = connection_manager.clone();
tokio::spawn(async move {
while let Some(payload) = telemetry_receiver.recv().await {
let mut latest_by_key: HashMap<(Uuid, u32), crate::telemetry::PointNewValue> =
HashMap::new();
latest_by_key.insert((payload.source_id, payload.client_handle), payload);
loop {
match telemetry_receiver.try_recv() {
Ok(next_payload) => {
latest_by_key.insert(
(next_payload.source_id, next_payload.client_handle),
next_payload,
);
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
break;
}
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
break;
}
}
ReloadEvent::SourceUpdate { source_id } => {
tracing::info!("Processing SourceUpdate event for {}", source_id);
if let Err(e) = connection_manager_clone.reconnect(&pool, source_id).await {
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
}
}
ReloadEvent::SourceDelete { source_id } => {
tracing::info!("Processing SourceDelete event for {}", source_id);
if let Err(e) = connection_manager_clone.disconnect(source_id).await {
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
}
}
ReloadEvent::PointCreateBatch { source_id, point_ids } => {
let requested_count = point_ids.len();
match connection_manager_clone
.subscribe_points_from_source(source_id, Some(point_ids), &pool)
.await
{
Ok(stats) => {
let subscribed = *stats.get("subscribed").unwrap_or(&0);
let polled = *stats.get("polled").unwrap_or(&0);
let total = *stats.get("total").unwrap_or(&0);
tracing::info!(
"PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}",
source_id,
requested_count,
subscribed,
polled,
total
);
}
Err(e) => {
tracing::error!("Failed to subscribe to points: {}", e);
}
}
}
ReloadEvent::PointDeleteBatch { source_id, point_ids } => {
tracing::info!(
"Processing PointDeleteBatch event for source {} with {} points",
source_id,
point_ids.len()
);
if let Err(e) = connection_manager_clone
.unsubscribe_points_from_source(source_id, point_ids)
.await
{
tracing::error!("Failed to unsubscribe points: {}", e);
}
}
ReloadEvent::PointNewValue(payload) => {
let source_id = payload.source_id;
let client_handle = payload.client_handle;
let point_id = if let Some(point_id) = payload.point_id {
Some(point_id)
} else {
let status = connection_manager_clone.get_status_read_guard().await;
status
.get(&source_id)
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
};
if let Some(point_id) = point_id {
// 从缓存中读取旧值
let (old_value, old_timestamp, value_changed) = {
let monitor_data = connection_manager_clone.get_point_monitor_data_read_guard().await;
let old_monitor_info = monitor_data.get(&point_id);
}
if let Some(old_info) = old_monitor_info {
let changed = old_info.value != payload.value ||
old_info.timestamp != payload.timestamp;
(old_info.value.clone(), old_info.timestamp, changed)
} else {
(None, None, false)
}
};
let monitor = crate::telemetry::PointMonitorInfo {
protocol: payload.protocol.clone(),
source_id,
point_id,
client_handle,
scan_mode: payload.scan_mode.clone(),
timestamp: payload.timestamp,
quality: payload.quality.clone(),
value: payload.value.clone(),
value_type: payload.value_type.clone(),
value_text: payload.value_text.clone(),
old_value,
old_timestamp,
value_changed,
};
// 克隆 monitor用于并行执行
let monitor_for_ws = monitor.clone();
let monitor_for_db = monitor.clone();
// 并行执行 update_point_monitor_data 和 send_to_public不等待完成
let cm_clone = connection_manager_clone.clone();
tokio::spawn(async move {
// 更新监控数据
if let Err(e) = cm_clone.update_point_monitor_data(monitor_for_db).await {
tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e);
}
});
let ws_clone = ws_manager_clone.clone();
tokio::spawn(async move {
// 发送WebSocket消息
if let Some(ws_manager) = ws_clone {
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor_for_ws);
if let Err(e) = ws_manager.send_to_public(ws_message).await {
tracing::error!("Failed to send WebSocket message to public room: {}", e);
}
// 暂时注释掉 send_to_client因为现在信息只需发送到 public
// if let Err(e) = ws_manager.send_to_client(point_id, ws_message).await {
// tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e);
// }
}
});
} else {
tracing::warn!("Point not found for source {} client_handle {}", source_id, client_handle);
}
}
for point_payload in latest_by_key.into_values() {
process_point_new_value(point_payload, &telemetry_cm, ws_manager_clone.as_ref())
.await;
}
}
});
Self { sender }
Self {
control_sender,
telemetry_sender,
}
}
pub fn send(&self, event: ReloadEvent) -> Result<(), String> {
self.sender
.send(event)
.map_err(|e| format!("Failed to send event: {}", e))
pub fn send(&self, event: AppEvent) -> Result<(), String> {
match event {
AppEvent::PointNewValue(payload) => match self.telemetry_sender.try_send(payload) {
Ok(()) => Ok(()),
Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
Err(format!("Failed to send telemetry event: channel closed ({e:?})"))
}
Err(tokio::sync::mpsc::error::TrySendError::Full(payload)) => {
// High-frequency telemetry is lossy by design under sustained pressure.
tracing::warn!(
"Dropping PointNewValue due to full telemetry queue: source={}, client_handle={}",
payload.source_id,
payload.client_handle
);
Ok(())
}
},
control_event => match self.control_sender.try_send(control_event) {
Ok(()) => Ok(()),
Err(tokio::sync::mpsc::error::TrySendError::Closed(e)) => {
Err(format!("Failed to send control event: channel closed ({e:?})"))
}
Err(tokio::sync::mpsc::error::TrySendError::Full(e)) => {
Err(format!("Failed to send control event: queue full ({e:?})"))
}
},
}
}
}
async fn handle_control_event(
event: AppEvent,
pool: &sqlx::PgPool,
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
) {
match event {
AppEvent::SourceCreate { source_id } => {
tracing::info!("Processing SourceCreate event for {}", source_id);
if let Err(e) = connection_manager.connect_from_source(pool, source_id).await {
tracing::error!("Failed to connect to source {}: {}", source_id, e);
}
}
AppEvent::SourceUpdate { source_id } => {
tracing::info!("Processing SourceUpdate event for {}", source_id);
if let Err(e) = connection_manager.reconnect(pool, source_id).await {
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
}
}
AppEvent::SourceDelete { source_id } => {
tracing::info!("Processing SourceDelete event for {}", source_id);
if let Err(e) = connection_manager.disconnect(source_id).await {
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
}
}
AppEvent::PointCreateBatch { source_id, point_ids } => {
let requested_count = point_ids.len();
match connection_manager
.subscribe_points_from_source(source_id, Some(point_ids), pool)
.await
{
Ok(stats) => {
let subscribed = *stats.get("subscribed").unwrap_or(&0);
let polled = *stats.get("polled").unwrap_or(&0);
let total = *stats.get("total").unwrap_or(&0);
tracing::info!(
"PointCreateBatch subscribe finished for source {}: requested={}, subscribed={}, polled={}, total={}",
source_id,
requested_count,
subscribed,
polled,
total
);
}
Err(e) => {
tracing::error!("Failed to subscribe to points: {}", e);
}
}
}
AppEvent::PointDeleteBatch { source_id, point_ids } => {
tracing::info!(
"Processing PointDeleteBatch event for source {} with {} points",
source_id,
point_ids.len()
);
if let Err(e) = connection_manager
.unsubscribe_points_from_source(source_id, point_ids)
.await
{
tracing::error!("Failed to unsubscribe points: {}", e);
}
}
AppEvent::PointNewValue(_) => {
tracing::warn!("PointNewValue routed to control worker unexpectedly");
}
}
}
async fn process_point_new_value(
payload: crate::telemetry::PointNewValue,
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
) {
let source_id = payload.source_id;
let client_handle = payload.client_handle;
let point_id = if let Some(point_id) = payload.point_id {
Some(point_id)
} else {
let status = connection_manager.get_status_read_guard().await;
status
.get(&source_id)
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
};
if let Some(point_id) = point_id {
// 从缓存中读取旧值
let (old_value, old_timestamp, value_changed) = {
let monitor_data = connection_manager.get_point_monitor_data_read_guard().await;
let old_monitor_info = monitor_data.get(&point_id);
if let Some(old_info) = old_monitor_info {
let changed = old_info.value != payload.value || old_info.timestamp != payload.timestamp;
(old_info.value.clone(), old_info.timestamp, changed)
} else {
(None, None, false)
}
};
let monitor = crate::telemetry::PointMonitorInfo {
protocol: payload.protocol,
source_id,
point_id,
client_handle,
scan_mode: payload.scan_mode,
timestamp: payload.timestamp,
quality: payload.quality,
value: payload.value,
value_type: payload.value_type,
value_text: payload.value_text,
old_value,
old_timestamp,
value_changed,
};
if let Err(e) = connection_manager
.update_point_monitor_data(monitor.clone())
.await
{
tracing::error!(
"Failed to update point monitor data for point {}: {}",
point_id,
e
);
}
if let Some(ws_manager) = ws_manager {
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor);
if let Err(e) = ws_manager.send_to_public(ws_message).await {
tracing::warn!(
"Failed to send WebSocket message to public room: {}",
e
);
}
}
} else {
tracing::warn!(
"Point not found for source {} client_handle {}",
source_id,
client_handle
);
}
}

View File

@ -2,3 +2,4 @@ pub mod source;
pub mod point;
pub mod tag;
pub mod log;
pub mod page;

169
src/handler/page.rs Normal file
View File

@ -0,0 +1,169 @@
use axum::{Json, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse};
use serde::Deserialize;
use std::collections::HashMap;
use sqlx::types::Json as SqlxJson;
use uuid::Uuid;
use validator::Validate;
use crate::model::Page;
use crate::util::response::ApiErr;
use crate::AppState;
#[derive(Deserialize, Validate)]
pub struct GetPageListQuery {
#[validate(length(min = 1, max = 100))]
pub name: Option<String>,
}
pub async fn get_page_list(
State(state): State<AppState>,
Query(query): Query<GetPageListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let pool = &state.pool;
let pages: Vec<Page> = if let Some(name) = query.name {
sqlx::query_as::<_, Page>(
r#"
SELECT * FROM page
WHERE name ILIKE $1
ORDER BY created_at
"#,
)
.bind(format!("%{}%", name))
.fetch_all(pool)
.await?
} else {
sqlx::query_as::<_, Page>(
r#"SELECT * FROM page ORDER BY created_at"#,
)
.fetch_all(pool)
.await?
};
Ok(Json(pages))
}
pub async fn get_page(
State(state): State<AppState>,
Path(page_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let page = sqlx::query_as::<_, Page>("SELECT * FROM page WHERE id = $1")
.bind(page_id)
.fetch_optional(&state.pool)
.await?;
match page {
Some(p) => Ok(Json(p)),
None => Err(ApiErr::NotFound("Page not found".to_string(), None)),
}
}
#[derive(Debug, Deserialize, Validate)]
pub struct CreatePageReq {
#[validate(length(min = 1, max = 100))]
pub name: String,
pub data: HashMap<String, Uuid>,
}
#[derive(Debug, Deserialize, Validate)]
pub struct UpdatePageReq {
#[validate(length(min = 1, max = 100))]
pub name: Option<String>,
pub data: Option<HashMap<String, Uuid>>,
}
pub async fn create_page(
State(state): State<AppState>,
Json(payload): Json<CreatePageReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let page_id = sqlx::query_scalar::<_, Uuid>(
r#"
INSERT INTO page (name, data)
VALUES ($1, $2)
RETURNING id
"#,
)
.bind(&payload.name)
.bind(SqlxJson(payload.data))
.fetch_one(&state.pool)
.await?;
Ok((StatusCode::CREATED, Json(serde_json::json!({
"id": page_id,
"ok_msg": "Page created successfully"
}))))
}
pub async fn update_page(
State(state): State<AppState>,
Path(page_id): Path<Uuid>,
Json(payload): Json<UpdatePageReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let exists = sqlx::query("SELECT 1 FROM page WHERE id = $1")
.bind(page_id)
.fetch_optional(&state.pool)
.await?;
if exists.is_none() {
return Err(ApiErr::NotFound("Page not found".to_string(), None));
}
if payload.name.is_none() && payload.data.is_none() {
return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
}
let mut updates = Vec::new();
let mut param_count = 1;
if payload.name.is_some() {
updates.push(format!("name = ${}", param_count));
param_count += 1;
}
if payload.data.is_some() {
updates.push(format!("data = ${}", param_count));
param_count += 1;
}
updates.push("updated_at = NOW()".to_string());
let sql = format!(
r#"UPDATE page SET {} WHERE id = ${}"#,
updates.join(", "),
param_count
);
let mut query = sqlx::query(&sql);
if let Some(name) = payload.name {
query = query.bind(name);
}
if let Some(data) = payload.data {
query = query.bind(SqlxJson(data));
}
query = query.bind(page_id);
query.execute(&state.pool).await?;
Ok(Json(serde_json::json!({
"ok_msg": "Page updated successfully"
})))
}
pub async fn delete_page(
State(state): State<AppState>,
Path(page_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let result = sqlx::query("DELETE FROM page WHERE id = $1")
.bind(page_id)
.execute(&state.pool)
.await?;
if result.rows_affected() == 0 {
return Err(ApiErr::NotFound("Page not found".to_string(), None));
}
Ok(StatusCode::NO_CONTENT)
}

View File

@ -2,7 +2,7 @@ use axum::{Json, extract::{Path, Query, State}, http::HeaderMap, response::IntoR
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use validator::Validate;
use sqlx::Row;
use sqlx::{Row, QueryBuilder};
use crate::util::{response::ApiErr, pagination::{PaginatedResponse, PaginationParams}};
@ -69,12 +69,7 @@ pub async fn get_point(
Path(point_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let point = sqlx::query_as::<_, Point>(
r#"SELECT * FROM point WHERE id = $1"#,
)
.bind(point_id)
.fetch_optional(pool)
.await?;
let point = crate::service::get_point_by_id(pool, point_id).await?;
Ok(Json(point))
}
@ -136,51 +131,26 @@ pub async fn update_point(
return Err(ApiErr::NotFound("Point not found".to_string(), None));
}
// Build dynamic UPDATE SQL for provided fields.
let mut updates = Vec::new();
let mut values: Vec<String> = Vec::new();
let mut param_count = 1;
let mut qb = QueryBuilder::new("UPDATE point SET ");
let mut sep = qb.separated(", ");
if let Some(name) = &payload.name {
updates.push(format!("name = ${}", param_count));
values.push(name.clone());
param_count += 1;
sep.push("name = ").push_bind(name);
}
if let Some(description) = &payload.description {
updates.push(format!("description = ${}", param_count));
values.push(description.clone());
param_count += 1;
sep.push("description = ").push_bind(description);
}
if let Some(unit) = &payload.unit {
updates.push(format!("unit = ${}", param_count));
values.push(unit.clone());
param_count += 1;
sep.push("unit = ").push_bind(unit);
}
if let Some(tag_id) = &payload.tag_id {
updates.push(format!("tag_id = ${}", param_count));
values.push(tag_id.to_string());
param_count += 1;
sep.push("tag_id = ").push_bind(tag_id);
}
// Always update timestamp.
updates.push("updated_at = NOW()".to_string());
sep.push("updated_at = NOW()");
let sql = format!(
"UPDATE point SET {} WHERE id = ${}",
updates.join(", "),
param_count
);
values.push(point_id.to_string());
let mut query = sqlx::query(&sql);
for value in &values {
query = query.bind(value);
}
query.execute(pool).await?;
qb.push(" WHERE id = ").push_bind(point_id);
qb.build().execute(pool).await?;
Ok(Json(serde_json::json!({"ok_msg": "Point updated successfully"})))
}
@ -277,7 +247,7 @@ pub async fn delete_point(
if let Some(source_id) = source_id {
if let Err(e) = state
.event_manager
.send(crate::event::ReloadEvent::PointDeleteBatch {
.send(crate::event::AppEvent::PointDeleteBatch {
source_id,
point_ids: vec![point_id],
})
@ -395,7 +365,7 @@ pub async fn batch_create_points(
let point_ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
if let Err(e) = state
.event_manager
.send(crate::event::ReloadEvent::PointCreateBatch { source_id, point_ids })
.send(crate::event::AppEvent::PointCreateBatch { source_id, point_ids })
{
tracing::error!("Failed to send PointCreateBatch event: {}", e);
}
@ -457,7 +427,7 @@ pub async fn batch_delete_points(
let ids: Vec<Uuid> = points.into_iter().map(|p| p.point_id).collect();
if let Err(e) = state
.event_manager
.send(crate::event::ReloadEvent::PointDeleteBatch {
.send(crate::event::AppEvent::PointDeleteBatch {
source_id,
point_ids: ids,
})
@ -489,7 +459,7 @@ pub async fn batch_set_point_value(
return Err(ApiErr::Forbidden(
"write permission denied".to_string(),
Some(serde_json::json!({
"hint": "set WRITE_API_KEY and pass header X-Write-Key"
"hint": "set WRITE_API_KEY (or legacy WRITE_KEY) and pass header X-Write-Key"
})),
));
}

View File

@ -58,18 +58,47 @@ impl TreeNode {
#[derive(Debug, Serialize, Clone)]
pub struct SourceWithStatus {
#[serde(flatten)]
pub source: Source,
pub source: SourcePublic,
pub is_connected: bool,
pub last_error: Option<String>,
#[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")]
pub last_time: Option<DateTime<Utc>>,
}
#[derive(Debug, Serialize, Clone)]
pub struct SourcePublic {
pub id: Uuid,
pub name: String,
pub protocol: String,
pub endpoint: String,
pub security_policy: Option<String>,
pub security_mode: Option<String>,
pub enabled: bool,
#[serde(serialize_with = "crate::util::datetime::utc_to_local_str")]
pub created_at: DateTime<Utc>,
#[serde(serialize_with = "crate::util::datetime::utc_to_local_str")]
pub updated_at: DateTime<Utc>,
}
impl From<Source> for SourcePublic {
fn from(source: Source) -> Self {
Self {
id: source.id,
name: source.name,
protocol: source.protocol,
endpoint: source.endpoint,
security_policy: source.security_policy,
security_mode: source.security_mode,
enabled: source.enabled,
created_at: source.created_at,
updated_at: source.updated_at,
}
}
}
pub async fn get_source_list(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let sources: Vec<Source> = sqlx::query_as(
r#"SELECT * FROM source where enabled is true"#,
).fetch_all(pool).await?;
let sources: Vec<Source> = crate::service::get_all_enabled_sources(pool).await?;
// 获取所有连接状态
let status_map: std::collections::HashMap<Uuid, (bool, Option<String>, Option<DateTime<Utc>>)> =
@ -87,7 +116,7 @@ pub async fn get_source_list(State(state): State<AppState>) -> Result<impl IntoR
.map(|(connected, error, time)| (*connected, error.clone(), *time))
.unwrap_or((false, None, None));
SourceWithStatus {
source,
source: source.into(),
is_connected,
last_error,
last_time,
@ -196,7 +225,7 @@ pub async fn create_source(
.await?;
// 触发 SourceCreate 事件
let _ = state.event_manager.send(crate::event::ReloadEvent::SourceCreate { source_id: new_id });
let _ = state.event_manager.send(crate::event::AppEvent::SourceCreate { source_id: new_id });
Ok((StatusCode::CREATED, Json(CreateSourceRes { id: new_id })))
}
@ -271,7 +300,7 @@ pub async fn update_source(
qb.push(" WHERE id = ").push_bind(source_id);
qb.build().execute(pool).await?;
let _ = state.event_manager.send(crate::event::ReloadEvent::SourceUpdate { source_id });
let _ = state.event_manager.send(crate::event::AppEvent::SourceUpdate { source_id });
Ok(Json(serde_json::json!({"ok_msg": "Source updated successfully"})))
}
@ -294,7 +323,7 @@ pub async fn delete_source(
}
// 触发 SourceDelete 事件
let _ = state.event_manager.send(crate::event::ReloadEvent::SourceDelete { source_id });
let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id });
Ok(StatusCode::NO_CONTENT)
}
@ -444,159 +473,67 @@ async fn process_reference(
let display_name = ref_desc.display_name.text.to_string();
let node_class = format!("{:?}", ref_desc.node_class);
let now = Utc::now();
let node_uuid = Uuid::new_v4();
// ?? 关键优化:直接 UPSERT避免 SELECT
// 注意:如果 parent_id 存在,则必须确保该父节点已存在于数据库中
// 否则会触发外键约束失败
if parent_id.is_some() {
// 检查父节点是否已存在于数据库中
let parent_exists = sqlx::query(
r#"SELECT 1 FROM node WHERE id = $1"#,
)
.bind(parent_id.unwrap())
.fetch_optional(tx.as_mut())
.await?;
if parent_exists.is_none() {
// 如果父节点不存在,则暂时不设置 parent_id
// 这样可以避免外键约束失败
sqlx::query(
r#"
INSERT INTO node (
id,
source_id,
external_id,
namespace_uri,
namespace_index,
identifier_type,
identifier,
browse_name,
display_name,
node_class,
parent_id,
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,NULL)
ON CONFLICT(source_id, external_id) DO UPDATE SET
namespace_uri = excluded.namespace_uri,
namespace_index = excluded.namespace_index,
identifier_type = excluded.identifier_type,
identifier = excluded.identifier,
browse_name = excluded.browse_name,
display_name = excluded.display_name,
node_class = excluded.node_class,
updated_at = NOW()
"#
)
.bind(node_uuid)
.bind(source_id)
.bind(&node_id_str)
.bind(&namespace_uri)
.bind(namespace_index.map(|v| v as i32))
.bind(&identifier_type)
.bind(&identifier)
.bind(&browse_name)
.bind(&display_name)
.bind(&node_class)
.execute(tx.as_mut())
.await
.context("Failed to execute UPSERT query")?;
let effective_parent_id = if let Some(pid) = parent_id {
let parent_exists = sqlx::query(r#"SELECT 1 FROM node WHERE id = $1"#)
.bind(pid)
.fetch_optional(tx.as_mut())
.await?;
if parent_exists.is_some() {
Some(pid)
} else {
// 如果父节点存在,则正常设置 parent_id
sqlx::query(
r#"
INSERT INTO node (
id,
source_id,
external_id,
namespace_uri,
namespace_index,
identifier_type,
identifier,
browse_name,
display_name,
node_class,
parent_id,
created_at,
updated_at
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,NOW(),NOW())
ON CONFLICT(source_id, external_id) DO UPDATE SET
namespace_uri = excluded.namespace_uri,
namespace_index = excluded.namespace_index,
identifier_type = excluded.identifier_type,
identifier = excluded.identifier,
browse_name = excluded.browse_name,
display_name = excluded.display_name,
node_class = excluded.node_class,
parent_id = excluded.parent_id,
updated_at = NOW()
"#
)
.bind(node_uuid)
.bind(source_id)
.bind(&node_id_str)
.bind(&namespace_uri)
.bind(namespace_index.map(|v| v as i32))
.bind(&identifier_type)
.bind(&identifier)
.bind(&browse_name)
.bind(&display_name)
.bind(&node_class)
.bind(parent_id)
.bind(now)
.bind(now)
.execute(tx.as_mut())
.await
.context("Failed to execute UPSERT query")?;
None
}
} else {
// 如果没有 parent_id则正常插入
sqlx::query(
r#"
INSERT INTO node (
id,
source_id,
external_id,
namespace_uri,
namespace_index,
identifier_type,
identifier,
browse_name,
display_name,
node_class,
parent_id
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,NULL)
ON CONFLICT(source_id, external_id) DO UPDATE SET
namespace_uri = excluded.namespace_uri,
namespace_index = excluded.namespace_index,
identifier_type = excluded.identifier_type,
identifier = excluded.identifier,
browse_name = excluded.browse_name,
display_name = excluded.display_name,
node_class = excluded.node_class,
updated_at = NOW()
"#
None
};
// Use RETURNING id so queue always carries the actual DB node id.
let persisted_node_id = sqlx::query_scalar::<_, Uuid>(
r#"
INSERT INTO node (
id,
source_id,
external_id,
namespace_uri,
namespace_index,
identifier_type,
identifier,
browse_name,
display_name,
node_class,
parent_id
)
.bind(node_uuid)
.bind(source_id)
.bind(&node_id_str)
.bind(&namespace_uri)
.bind(namespace_index.map(|v| v as i32))
.bind(&identifier_type)
.bind(&identifier)
.bind(&browse_name)
.bind(&display_name)
.bind(&node_class)
.execute(tx.as_mut())
.await
.context("Failed to execute UPSERT query")?;
}
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT(source_id, external_id) DO UPDATE SET
namespace_uri = excluded.namespace_uri,
namespace_index = excluded.namespace_index,
identifier_type = excluded.identifier_type,
identifier = excluded.identifier,
browse_name = excluded.browse_name,
display_name = excluded.display_name,
node_class = excluded.node_class,
parent_id = COALESCE(excluded.parent_id, node.parent_id),
updated_at = NOW()
RETURNING id
"#,
)
.bind(Uuid::new_v4())
.bind(source_id)
.bind(&node_id_str)
.bind(&namespace_uri)
.bind(namespace_index.map(|v| v as i32))
.bind(&identifier_type)
.bind(&identifier)
.bind(&browse_name)
.bind(&display_name)
.bind(&node_class)
.bind(effective_parent_id)
.fetch_one(tx.as_mut())
.await
.context("Failed to execute UPSERT query")?;
processed_nodes.insert(node_id_str.clone(), ());
queue.push_back((node_id_obj.clone(), Some(node_uuid)));
queue.push_back((node_id_obj.clone(), Some(persisted_node_id)));
Ok(())
}

View File

@ -142,6 +142,8 @@ fn build_router(state: AppState) -> Router {
.route("/api/point/batch/set-tags", put(handler::point::batch_set_point_tags))
.route("/api/tag", get(handler::tag::get_tag_list).post(handler::tag::create_tag))
.route("/api/tag/{tag_id}", get(handler::tag::get_tag_points).put(handler::tag::update_tag).delete(handler::tag::delete_tag))
.route("/api/page", get(handler::page::get_page_list).post(handler::page::create_page))
.route("/api/page/{page_id}", get(handler::page::get_page).put(handler::page::update_page).delete(handler::page::delete_page))
.route("/api/logs", get(handler::log::get_logs))
.route("/api/logs/stream", get(handler::log::stream_logs));

View File

@ -1,6 +1,8 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use sqlx::types::Json;
use std::collections::HashMap;
use uuid::Uuid;
use crate::util::datetime::utc_to_local_str;
@ -116,3 +118,14 @@ pub struct Tag {
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
pub struct Page {
pub id: Uuid,
pub name: String,
pub data: Json<HashMap<String, Uuid>>,
#[serde(serialize_with = "utc_to_local_str")]
pub created_at: DateTime<Utc>,
#[serde(serialize_with = "utc_to_local_str")]
pub updated_at: DateTime<Utc>,
}

View File

@ -17,6 +17,18 @@ pub async fn get_all_enabled_sources(pool: &PgPool) -> Result<Vec<Source>, sqlx:
.await
}
pub async fn get_point_by_id(
pool: &PgPool,
point_id: uuid::Uuid,
) -> Result<Option<crate::model::Point>, sqlx::Error> {
query_as::<_, crate::model::Point>(
r#"SELECT * FROM point WHERE id = $1"#,
)
.bind(point_id)
.fetch_optional(pool)
.await
}
pub async fn get_points_grouped_by_source(
pool: &PgPool,
point_ids: &[uuid::Uuid],

View File

@ -68,13 +68,7 @@ impl IntoResponse for ApiErr {
impl From<Error> for ApiErr {
fn from(err: Error) -> Self {
tracing::error!("Error: {:?}; root_cause: {}", err, err.root_cause());
ApiErr::Internal(
err.to_string(),
Some(serde_json::json!({
"root_cause": err.root_cause().to_string(),
"chain": err.chain().map(|e| e.to_string()).collect::<Vec<_>>()
})),
)
ApiErr::Internal("internal server error".to_string(), None)
}
}