Compare commits

...

13 Commits

Author SHA1 Message Date
caoqianming d156108148 feat: 为 get_tag_list 接口添加分页功能 2026-03-04 15:19:39 +08:00
caoqianming f9284303f6 fix: 改进错误处理机制,支持查询参数反序列化和多种Axum响应类型转换 2026-03-04 14:11:59 +08:00
caoqianming 550c7b3974 修改migrate名称 2026-03-04 13:57:40 +08:00
caoqianming 606b57eb73 refactor: 优化点位列表查询功能 2026-03-04 13:39:06 +08:00
caoqianming fdb4f10ba4 feat: 添加通用的分页工具模块 2026-03-04 13:37:58 +08:00
caoqianming 3311823800 优化数据序列化:时间戳使用本地时间格式,简化 DataValue 输出 2026-03-04 12:41:14 +08:00
caoqianming 1ddb707a9b 优化 WebSocket 消息发送:移除不必要的 clone,注释掉 send_to_client 2026-03-04 12:35:22 +08:00
caoqianming 173814416f 修复潜在的死锁问题:将读锁的作用域限制在块内,避免在持有读锁时尝试获取写锁 2026-03-04 12:20:13 +08:00
caoqianming 6f62d753a5 修复编译警告:将 PollPointInfo 改为公开,移除未使用的字段 2026-03-04 11:26:38 +08:00
caoqianming 475ac02322 重构:将 poll_task_handles 和 poll_points 合并为一个双向映射,使用 PollPointInfo 结构 2026-03-04 11:24:13 +08:00
caoqianming b22225ad72 重构:将 poll_points_by_source 的内层 HashSet 移动到 ConnectionStatus 中,简化数据结构 2026-03-04 11:16:08 +08:00
caoqianming 4bb9bdd27d 优化 event.rs 中的克隆操作,减少 monitor 的克隆次数从 3 次减少到 1 次 2026-03-04 08:47:24 +08:00
caoqianming 562a2d566b feat: 为 PointMonitorInfo 添加旧值追踪和值变化检测 2026-03-04 08:35:28 +08:00
12 changed files with 542 additions and 112 deletions

157
Cargo.lock generated
View File

@ -467,8 +467,18 @@ version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
dependencies = [
"darling_core",
"darling_macro",
"darling_core 0.20.11",
"darling_macro 0.20.11",
]
[[package]]
name = "darling"
version = "0.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0"
dependencies = [
"darling_core 0.21.3",
"darling_macro 0.21.3",
]
[[package]]
@ -485,13 +495,38 @@ dependencies = [
"syn",
]
[[package]]
name = "darling_core"
version = "0.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim",
"syn",
]
[[package]]
name = "darling_macro"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
dependencies = [
"darling_core",
"darling_core 0.20.11",
"quote",
"syn",
]
[[package]]
name = "darling_macro"
version = "0.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81"
dependencies = [
"darling_core 0.21.3",
"quote",
"syn",
]
@ -533,6 +568,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c"
dependencies = [
"powerfmt",
"serde_core",
]
[[package]]
@ -570,6 +606,12 @@ version = "0.15.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
[[package]]
name = "dyn-clone"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555"
[[package]]
name = "either"
version = "1.15.0"
@ -771,6 +813,7 @@ dependencies = [
"dotenv",
"serde",
"serde_json",
"serde_with",
"sqlx",
"time",
"tokio",
@ -838,6 +881,12 @@ dependencies = [
"wasip3",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.15.5"
@ -1122,6 +1171,17 @@ dependencies = [
"icu_properties",
]
[[package]]
name = "indexmap"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
dependencies = [
"autocfg",
"hashbrown 0.12.3",
"serde",
]
[[package]]
name = "indexmap"
version = "2.13.0"
@ -1604,6 +1664,26 @@ dependencies = [
"bitflags",
]
[[package]]
name = "ref-cast"
version = "1.0.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d"
dependencies = [
"ref-cast-impl",
]
[[package]]
name = "ref-cast-impl"
version = "1.0.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "regex"
version = "1.12.3"
@ -1680,6 +1760,30 @@ version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f"
[[package]]
name = "schemars"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cd191f9397d57d581cddd31014772520aa448f65ef991055d7f61582c65165f"
dependencies = [
"dyn-clone",
"ref-cast",
"serde",
"serde_json",
]
[[package]]
name = "schemars"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc"
dependencies = [
"dyn-clone",
"ref-cast",
"serde",
"serde_json",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -1758,13 +1862,44 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_with"
version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "381b283ce7bc6b476d903296fb59d0d36633652b633b27f64db4fb46dcbfc3b9"
dependencies = [
"base64",
"chrono",
"hex",
"indexmap 1.9.3",
"indexmap 2.13.0",
"schemars 0.9.0",
"schemars 1.2.1",
"serde_core",
"serde_json",
"serde_with_macros",
"time",
]
[[package]]
name = "serde_with_macros"
version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6d4e30573c8cb306ed6ab1dca8423eec9a463ea0e155f45399455e0368b27e0"
dependencies = [
"darling 0.21.3",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_yaml"
version = "0.9.34+deprecated"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
dependencies = [
"indexmap",
"indexmap 2.13.0",
"itoa",
"ryu",
"serde",
@ -1904,7 +2039,7 @@ dependencies = [
"futures-util",
"hashbrown 0.15.5",
"hashlink",
"indexmap",
"indexmap 2.13.0",
"log",
"memchr",
"once_cell",
@ -2558,7 +2693,7 @@ version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7df16e474ef958526d1205f6dda359fdfab79d9aa6d54bafcb92dcd07673dca"
dependencies = [
"darling",
"darling 0.20.11",
"once_cell",
"proc-macro-error2",
"proc-macro2",
@ -2676,7 +2811,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909"
dependencies = [
"anyhow",
"indexmap",
"indexmap 2.13.0",
"wasm-encoder",
"wasmparser",
]
@ -2689,7 +2824,7 @@ checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe"
dependencies = [
"bitflags",
"hashbrown 0.15.5",
"indexmap",
"indexmap 2.13.0",
"semver",
]
@ -3012,7 +3147,7 @@ checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21"
dependencies = [
"anyhow",
"heck",
"indexmap",
"indexmap 2.13.0",
"prettyplease",
"syn",
"wasm-metadata",
@ -3043,7 +3178,7 @@ checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2"
dependencies = [
"anyhow",
"bitflags",
"indexmap",
"indexmap 2.13.0",
"log",
"serde",
"serde_derive",
@ -3062,7 +3197,7 @@ checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736"
dependencies = [
"anyhow",
"id-arena",
"indexmap",
"indexmap 2.13.0",
"log",
"semver",
"serde",

View File

@ -17,6 +17,7 @@ sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uu
# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_with = "3.0"
# Time handling
chrono = "0.4"

View File

@ -59,7 +59,31 @@ struct PointWriteTarget {
external_id: String,
}
#[derive(Clone)]
pub struct PollPointInfo {
handle: JoinHandle<()>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ConnectionStatusView {
pub is_connected: bool,
pub last_error: Option<String>,
pub last_time: DateTime<Utc>,
pub subscription_id: Option<u32>,
pub next_client_handle: u32,
}
impl From<&ConnectionStatus> for ConnectionStatusView {
fn from(status: &ConnectionStatus) -> Self {
Self {
is_connected: status.is_connected,
last_error: status.last_error.clone(),
last_time: status.last_time,
subscription_id: status.subscription_id,
next_client_handle: status.next_client_handle,
}
}
}
pub struct ConnectionStatus {
pub session: Option<Arc<Session>>,
pub is_connected: bool,
@ -69,6 +93,7 @@ pub struct ConnectionStatus {
pub next_client_handle: u32,
pub client_handle_map: HashMap<u32, Uuid>, // client_handle -> point_id
pub monitored_item_map: HashMap<Uuid, u32>, // point_id -> monitored_item_id
pub poll_points: HashMap<Uuid, PollPointInfo>, // 正在轮询的点集合
}
#[derive(Clone)]
@ -77,8 +102,6 @@ pub struct ConnectionManager {
point_monitor_data: Arc<RwLock<HashMap<Uuid, PointMonitorInfo>>>,
point_history_data: Arc<RwLock<HashMap<Uuid, VecDeque<PointMonitorInfo>>>>,
point_write_target_cache: Arc<RwLock<HashMap<Uuid, PointWriteTarget>>>,
poll_task_handles: Arc<RwLock<HashMap<Uuid, JoinHandle<()>>>>,
poll_points_by_source: Arc<RwLock<HashMap<Uuid, HashSet<Uuid>>>>,
pool: Option<sqlx::PgPool>,
event_manager: Option<std::sync::Arc<crate::event::EventManager>>,
}
@ -136,8 +159,6 @@ impl ConnectionManager {
point_monitor_data: Arc::new(RwLock::new(HashMap::new())),
point_history_data: Arc::new(RwLock::new(HashMap::new())),
point_write_target_cache: Arc::new(RwLock::new(HashMap::new())),
poll_task_handles: Arc::new(RwLock::new(HashMap::new())),
poll_points_by_source: Arc::new(RwLock::new(HashMap::new())),
event_manager: None,
}
}
@ -218,9 +239,11 @@ impl ConnectionManager {
.ok_or_else(|| "Event manager is not initialized".to_string())?;
{
let poll_tasks = self.poll_task_handles.read().await;
if poll_tasks.contains_key(&point.point_id) {
return Ok(());
let status = self.status.read().await;
if let Some(conn_status) = status.get(&source_id) {
if conn_status.poll_points.contains_key(&point.point_id) {
return Ok(());
}
}
}
@ -296,60 +319,42 @@ impl ConnectionManager {
});
{
let mut poll_tasks = self.poll_task_handles.write().await;
poll_tasks.insert(point_id, handle);
}
{
let mut points_by_source = self.poll_points_by_source.write().await;
points_by_source
.entry(source_id)
.or_insert_with(HashSet::new)
.insert(point_id);
let mut status = self.status.write().await;
if let Some(conn_status) = status.get_mut(&source_id) {
conn_status.poll_points.insert(
point_id,
PollPointInfo { handle },
);
}
}
Ok(())
}
async fn stop_polling_for_point(&self, point_id: Uuid) {
if let Some(handle) = self.poll_task_handles.write().await.remove(&point_id) {
handle.abort();
}
let mut points_by_source = self.poll_points_by_source.write().await;
let source_ids: Vec<Uuid> = points_by_source
.iter()
.filter(|(_, point_set)| point_set.contains(&point_id))
.map(|(source_id, _)| *source_id)
.collect();
for source_id in source_ids {
if let Some(point_set) = points_by_source.get_mut(&source_id) {
point_set.remove(&point_id);
if point_set.is_empty() {
points_by_source.remove(&source_id);
}
let mut status = self.status.write().await;
for conn_status in status.values_mut() {
if let Some(poll_info) = conn_status.poll_points.remove(&point_id) {
poll_info.handle.abort();
}
}
}
async fn stop_polling_for_source(&self, source_id: Uuid) {
let point_ids = {
let mut points_by_source = self.poll_points_by_source.write().await;
points_by_source
.remove(&source_id)
.map(|set| set.into_iter().collect::<Vec<_>>())
let poll_infos = {
let mut status = self.status.write().await;
status
.get_mut(&source_id)
.map(|conn_status| conn_status.poll_points.drain().collect::<Vec<_>>())
.unwrap_or_default()
};
if point_ids.is_empty() {
if poll_infos.is_empty() {
return;
}
let mut poll_tasks = self.poll_task_handles.write().await;
for point_id in point_ids {
if let Some(handle) = poll_tasks.remove(&point_id) {
handle.abort();
}
for (_, poll_info) in poll_infos {
poll_info.handle.abort();
}
}
@ -496,6 +501,7 @@ impl ConnectionManager {
next_client_handle: 1000,
client_handle_map: HashMap::new(),
monitored_item_map: HashMap::new(),
poll_points: HashMap::new(),
},
);
@ -516,6 +522,7 @@ impl ConnectionManager {
client_handle_map: HashMap::new(),
monitored_item_map: HashMap::new(),
next_client_handle: 1000,
poll_points: HashMap::new(),
},
);
}
@ -577,14 +584,14 @@ impl ConnectionManager {
}
}
pub async fn get_status(&self, source_id: Uuid) -> Option<ConnectionStatus> {
pub async fn get_status(&self, source_id: Uuid) -> Option<ConnectionStatusView> {
let status = self.status.read().await;
status.get(&source_id).cloned()
status.get(&source_id).map(ConnectionStatusView::from)
}
pub async fn get_all_status(&self) -> Vec<(Uuid, ConnectionStatus)> {
pub async fn get_all_status(&self) -> Vec<(Uuid, ConnectionStatusView)> {
let status = self.status.read().await;
status.iter().map(|(source_id, conn_status)| (*source_id, conn_status.clone())).collect()
status.iter().map(|(source_id, conn_status)| (*source_id, ConnectionStatusView::from(conn_status))).collect()
}
pub async fn write_point_values_batch(
@ -657,7 +664,7 @@ impl ConnectionManager {
let source_connected = self
.get_status(*source_id)
.await
.map(|s| s.is_connected && s.session.is_some())
.map(|s| s.is_connected)
.unwrap_or(false);
if !source_connected {
return Ok(Self::write_value_batch_result(

View File

@ -129,8 +129,21 @@ impl EventManager {
.get(&source_id)
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
};
if let Some(point_id) = point_id {
// 从缓存中读取旧值
let (old_value, old_timestamp, value_changed) = {
let monitor_data = connection_manager.get_point_monitor_data_read_guard().await;
let old_monitor_info = monitor_data.get(&point_id);
if let Some(old_info) = old_monitor_info {
let changed = old_info.value != payload.value ||
old_info.timestamp != payload.timestamp;
(old_info.value.clone(), old_info.timestamp, changed)
} else {
(None, None, false)
}
};
let monitor = crate::telemetry::PointMonitorInfo {
protocol: payload.protocol.clone(),
source_id,
@ -142,25 +155,28 @@ impl EventManager {
value: payload.value.clone(),
value_type: payload.value_type.clone(),
value_text: payload.value_text.clone(),
old_value,
old_timestamp,
value_changed,
};
if let Err(e) = connection_manager.update_point_monitor_data(monitor.clone()).await {
// 只克隆一次 monitor减少内存分配
let monitor_clone = monitor.clone();
if let Err(e) = connection_manager.update_point_monitor_data(monitor_clone).await {
tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e);
}
if let Some(ws_manager) = &ws_manager_clone {
let ws_message = crate::websocket::WsMessage::PointNewValue(
monitor.clone(),
);
if let Err(e) = ws_manager.send_to_public(ws_message.clone()).await {
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor);
if let Err(e) = ws_manager.send_to_public(ws_message).await {
tracing::error!("Failed to send WebSocket message to public room: {}", e);
}
if let Err(e) = ws_manager.send_to_client(point_id, ws_message).await {
tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e);
}
}
// 暂时注释掉 send_to_client因为现在信息只需发送到 public
// if let Err(e) = ws_manager.send_to_client(point_id, ws_message).await {
// tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e);
// }
}
} else {
tracing::warn!("Point not found for source {} client_handle {}", source_id, client_handle);
}

View File

@ -4,7 +4,7 @@ use uuid::Uuid;
use validator::Validate;
use sqlx::Row;
use crate::util::response::ApiErr;
use crate::util::{response::ApiErr, pagination::{PaginatedResponse, PaginationParams}};
use crate::{
AppState,
@ -12,9 +12,11 @@ use crate::{
};
/// List all points.
#[derive(Deserialize)]
#[derive(Deserialize, Validate)]
pub struct GetPointListQuery {
pub source_id: Option<Uuid>,
#[serde(flatten)]
pub pagination: PaginationParams,
}
#[derive(Serialize)]
@ -28,37 +30,26 @@ pub async fn get_point_list(
State(state): State<AppState>,
Query(query): Query<GetPointListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let pool = &state.pool;
let points: Vec<Point> = match query.source_id {
Some(source_id) => {
sqlx::query_as::<_, Point>(
r#"
SELECT p.*
FROM point p
INNER JOIN node n ON p.node_id = n.id
WHERE n.source_id = $1
ORDER BY p.created_at
"#,
)
.bind(source_id)
.fetch_all(pool)
.await?
}
None => {
sqlx::query_as::<_, Point>(
r#"SELECT * FROM point ORDER BY created_at"#,
)
.fetch_all(pool)
.await?
}
};
// 获取总数
let total = crate::service::get_points_count(pool, query.source_id).await?;
// 获取分页数据
let points = crate::service::get_points_paginated(
pool,
query.source_id,
query.pagination.page_size,
query.pagination.offset(),
).await?;
let monitor_guard = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
let resp: Vec<PointWithMonitor> = points
let data: Vec<PointWithMonitor> = points
.into_iter()
.map(|point| {
let point_monitor = monitor_guard
@ -68,7 +59,9 @@ pub async fn get_point_list(
})
.collect();
Ok(Json(resp))
let response = PaginatedResponse::new(data, total, query.pagination.page, query.pagination.page_size);
Ok(Json(response))
}
/// Get a point by id.
pub async fn get_point(

View File

@ -1,17 +1,38 @@
use axum::{Json, extract::{Path, State}, http::StatusCode, response::IntoResponse};
use axum::{Json, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse};
use serde::Deserialize;
use uuid::Uuid;
use validator::Validate;
use crate::util::response::ApiErr;
use crate::util::{response::ApiErr, pagination::{PaginatedResponse, PaginationParams}};
use crate::{AppState};
/// 获取所有标签
#[derive(Deserialize, Validate)]
pub struct GetTagListQuery {
#[serde(flatten)]
pub pagination: PaginationParams,
}
pub async fn get_tag_list(
State(state): State<AppState>,
Query(query): Query<GetTagListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
let tags = crate::service::get_all_tags(&state.pool).await?;
Ok(Json(tags))
query.validate()?;
let pool = &state.pool;
// 获取总数
let total = crate::service::get_tags_count(pool).await?;
// 获取分页数据
let tags = crate::service::get_tags_paginated(
pool,
query.pagination.page_size,
query.pagination.offset(),
).await?;
let response = PaginatedResponse::new(tags, total, query.pagination.page, query.pagination.page_size);
Ok(Json(response))
}
/// 获取标签下的点位信息

View File

@ -115,19 +115,150 @@ pub async fn get_points_with_ids(
.collect())
}
// ==================== Point 相关服务函数 ====================
/// 获取点位总数
pub async fn get_points_count(
pool: &PgPool,
source_id: Option<uuid::Uuid>,
) -> Result<i64, sqlx::Error> {
match source_id {
Some(source_id) => {
sqlx::query_scalar::<_, i64>(
r#"
SELECT COUNT(*)
FROM point p
INNER JOIN node n ON p.node_id = n.id
WHERE n.source_id = $1
"#,
)
.bind(source_id)
.fetch_one(pool)
.await
}
None => {
sqlx::query_scalar::<_, i64>(
r#"SELECT COUNT(*) FROM point"#,
)
.fetch_one(pool)
.await
}
}
}
/// 获取分页点位列表
pub async fn get_points_paginated(
pool: &PgPool,
source_id: Option<uuid::Uuid>,
page_size: i32,
offset: u32,
) -> Result<Vec<crate::model::Point>, sqlx::Error> {
match source_id {
Some(source_id) => {
if page_size == 0 {
Ok(vec![])
} else if page_size == -1 {
sqlx::query_as::<_, crate::model::Point>(
r#"
SELECT p.*
FROM point p
INNER JOIN node n ON p.node_id = n.id
WHERE n.source_id = $1
ORDER BY p.created_at
"#,
)
.bind(source_id)
.fetch_all(pool)
.await
} else {
sqlx::query_as::<_, crate::model::Point>(
r#"
SELECT p.*
FROM point p
INNER JOIN node n ON p.node_id = n.id
WHERE n.source_id = $1
ORDER BY p.created_at
LIMIT $2 OFFSET $3
"#,
)
.bind(source_id)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
.await
}
}
None => {
if page_size == 0 {
Ok(vec![])
} else if page_size == -1 {
sqlx::query_as::<_, crate::model::Point>(
r#"
SELECT * FROM point
ORDER BY created_at
"#,
)
.fetch_all(pool)
.await
} else {
sqlx::query_as::<_, crate::model::Point>(
r#"
SELECT * FROM point
ORDER BY created_at
LIMIT $1 OFFSET $2
"#,
)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
.await
}
}
}
}
// ==================== Tag 相关服务函数 ====================
/// 获取所有标签
pub async fn get_all_tags(
/// 获取标签总数
pub async fn get_tags_count(
pool: &PgPool,
) -> Result<Vec<crate::model::Tag>, sqlx::Error> {
query_as::<_, crate::model::Tag>(
r#"SELECT * FROM tag ORDER BY created_at"#
) -> Result<i64, sqlx::Error> {
sqlx::query_scalar::<_, i64>(
r#"SELECT COUNT(*) FROM tag"#,
)
.fetch_all(pool)
.fetch_one(pool)
.await
}
/// 获取分页标签列表
pub async fn get_tags_paginated(
pool: &PgPool,
page_size: i32,
offset: u32,
) -> Result<Vec<crate::model::Tag>, sqlx::Error> {
if page_size == 0 {
Ok(vec![])
} else if page_size == -1 {
sqlx::query_as::<_, crate::model::Tag>(
r#"SELECT * FROM tag ORDER BY created_at"#,
)
.fetch_all(pool)
.await
} else {
sqlx::query_as::<_, crate::model::Tag>(
r#"
SELECT * FROM tag
ORDER BY created_at
LIMIT $1 OFFSET $2
"#,
)
.bind(page_size)
.bind(offset as i64)
.fetch_all(pool)
.await
}
}
/// 根据ID获取标签
pub async fn get_tag_by_id(
pool: &PgPool,

View File

@ -41,8 +41,8 @@ impl PointQuality {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum DataValue {
Null,
Bool(bool),
@ -80,11 +80,16 @@ pub struct PointMonitorInfo {
pub point_id: Uuid,
pub client_handle: u32,
pub scan_mode: ScanMode,
#[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")]
pub timestamp: Option<DateTime<Utc>>,
pub quality: PointQuality,
pub value: Option<DataValue>,
pub value_type: Option<ValueType>,
pub value_text: Option<String>,
pub old_value: Option<DataValue>,
#[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")]
pub old_timestamp: Option<DateTime<Utc>>,
pub value_changed: bool,
}
impl PointMonitorInfo {

View File

@ -1,4 +1,5 @@
pub mod datetime;
pub mod log;
pub mod pagination;
pub mod response;
pub mod validator;

61
src/util/pagination.rs Normal file
View File

@ -0,0 +1,61 @@
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use validator::Validate;
/// 分页响应结构
#[derive(Serialize)]
pub struct PaginatedResponse<T> {
pub data: Vec<T>,
pub total: i64,
pub page: u32,
pub page_size: i32,
pub total_pages: u32,
}
impl<T> PaginatedResponse<T> {
/// 创建分页响应
pub fn new(data: Vec<T>, total: i64, page: u32, page_size: i32) -> Self {
let total_pages = if page_size > 0 {
((total as f64) / (page_size as f64)).ceil() as u32
} else {
0
};
Self {
data,
total,
page,
page_size,
total_pages,
}
}
}
/// 分页查询参数
#[serde_as]
#[derive(Deserialize, Validate)]
pub struct PaginationParams {
#[validate(range(min = 1))]
#[serde_as(as = "serde_with::DisplayFromStr")]
#[serde(default = "default_page")]
pub page: u32,
#[validate(range(min = -1, max = 100))]
#[serde_as(as = "serde_with::DisplayFromStr")]
#[serde(default = "default_page_size")]
pub page_size: i32,
}
fn default_page() -> u32 {
1
}
fn default_page_size() -> i32 {
20
}
impl PaginationParams {
/// 计算偏移量
pub fn offset(&self) -> u32 {
(self.page - 1) * self.page_size.max(0) as u32
}
}

View File

@ -1,5 +1,15 @@
use anyhow::Error;
use axum::{Json, http::StatusCode, response::IntoResponse};
use axum::{
Json,
http::StatusCode,
response::IntoResponse,
extract::rejection::{
QueryRejection,
PathRejection,
JsonRejection,
FormRejection,
},
};
use serde::Serialize;
use serde_json::Value;
use sqlx::Error as SqlxError;
@ -89,3 +99,52 @@ impl From<SqlxError> for ApiErr {
}
}
}
impl From<QueryRejection> for ApiErr {
fn from(rejection: QueryRejection) -> Self {
tracing::warn!("Query parameter error: {}", rejection);
ApiErr::BadRequest(
"Invalid query parameters".to_string(),
Some(serde_json::json!({
"detail": rejection.to_string()
}))
)
}
}
impl From<PathRejection> for ApiErr {
fn from(rejection: PathRejection) -> Self {
tracing::warn!("Path parameter error: {}", rejection);
ApiErr::BadRequest(
"Invalid path parameter".to_string(),
Some(serde_json::json!({
"detail": rejection.to_string()
}))
)
}
}
impl From<JsonRejection> for ApiErr {
fn from(rejection: JsonRejection) -> Self {
tracing::warn!("JSON parsing error: {}", rejection);
ApiErr::BadRequest(
"Invalid JSON format".to_string(),
Some(serde_json::json!({
"detail": rejection.to_string()
}))
)
}
}
impl From<FormRejection> for ApiErr {
fn from(rejection: FormRejection) -> Self {
tracing::warn!("Form data error: {}", rejection);
ApiErr::BadRequest(
"Invalid form data".to_string(),
Some(serde_json::json!({
"detail": rejection.to_string()
}))
)
}
}