Compare commits
13 Commits
8127d04855
...
d156108148
| Author | SHA1 | Date |
|---|---|---|
|
|
d156108148 | |
|
|
f9284303f6 | |
|
|
550c7b3974 | |
|
|
606b57eb73 | |
|
|
fdb4f10ba4 | |
|
|
3311823800 | |
|
|
1ddb707a9b | |
|
|
173814416f | |
|
|
6f62d753a5 | |
|
|
475ac02322 | |
|
|
b22225ad72 | |
|
|
4bb9bdd27d | |
|
|
562a2d566b |
|
|
@ -467,8 +467,18 @@ version = "0.20.11"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
|
||||
dependencies = [
|
||||
"darling_core",
|
||||
"darling_macro",
|
||||
"darling_core 0.20.11",
|
||||
"darling_macro 0.20.11",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "darling"
|
||||
version = "0.21.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0"
|
||||
dependencies = [
|
||||
"darling_core 0.21.3",
|
||||
"darling_macro 0.21.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
@ -485,13 +495,38 @@ dependencies = [
|
|||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "darling_core"
|
||||
version = "0.21.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4"
|
||||
dependencies = [
|
||||
"fnv",
|
||||
"ident_case",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"strsim",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "darling_macro"
|
||||
version = "0.20.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
|
||||
dependencies = [
|
||||
"darling_core",
|
||||
"darling_core 0.20.11",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "darling_macro"
|
||||
version = "0.21.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81"
|
||||
dependencies = [
|
||||
"darling_core 0.21.3",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
|
@ -533,6 +568,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c"
|
||||
dependencies = [
|
||||
"powerfmt",
|
||||
"serde_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
@ -570,6 +606,12 @@ version = "0.15.7"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
|
||||
|
||||
[[package]]
|
||||
name = "dyn-clone"
|
||||
version = "1.0.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555"
|
||||
|
||||
[[package]]
|
||||
name = "either"
|
||||
version = "1.15.0"
|
||||
|
|
@ -771,6 +813,7 @@ dependencies = [
|
|||
"dotenv",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"sqlx",
|
||||
"time",
|
||||
"tokio",
|
||||
|
|
@ -838,6 +881,12 @@ dependencies = [
|
|||
"wasip3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.15.5"
|
||||
|
|
@ -1122,6 +1171,17 @@ dependencies = [
|
|||
"icu_properties",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "1.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"hashbrown 0.12.3",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "2.13.0"
|
||||
|
|
@ -1604,6 +1664,26 @@ dependencies = [
|
|||
"bitflags",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ref-cast"
|
||||
version = "1.0.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d"
|
||||
dependencies = [
|
||||
"ref-cast-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ref-cast-impl"
|
||||
version = "1.0.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.12.3"
|
||||
|
|
@ -1680,6 +1760,30 @@ version = "1.0.23"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f"
|
||||
|
||||
[[package]]
|
||||
name = "schemars"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4cd191f9397d57d581cddd31014772520aa448f65ef991055d7f61582c65165f"
|
||||
dependencies = [
|
||||
"dyn-clone",
|
||||
"ref-cast",
|
||||
"serde",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schemars"
|
||||
version = "1.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc"
|
||||
dependencies = [
|
||||
"dyn-clone",
|
||||
"ref-cast",
|
||||
"serde",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.2.0"
|
||||
|
|
@ -1758,13 +1862,44 @@ dependencies = [
|
|||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_with"
|
||||
version = "3.17.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "381b283ce7bc6b476d903296fb59d0d36633652b633b27f64db4fb46dcbfc3b9"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"chrono",
|
||||
"hex",
|
||||
"indexmap 1.9.3",
|
||||
"indexmap 2.13.0",
|
||||
"schemars 0.9.0",
|
||||
"schemars 1.2.1",
|
||||
"serde_core",
|
||||
"serde_json",
|
||||
"serde_with_macros",
|
||||
"time",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_with_macros"
|
||||
version = "3.17.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6d4e30573c8cb306ed6ab1dca8423eec9a463ea0e155f45399455e0368b27e0"
|
||||
dependencies = [
|
||||
"darling 0.21.3",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_yaml"
|
||||
version = "0.9.34+deprecated"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
|
||||
dependencies = [
|
||||
"indexmap",
|
||||
"indexmap 2.13.0",
|
||||
"itoa",
|
||||
"ryu",
|
||||
"serde",
|
||||
|
|
@ -1904,7 +2039,7 @@ dependencies = [
|
|||
"futures-util",
|
||||
"hashbrown 0.15.5",
|
||||
"hashlink",
|
||||
"indexmap",
|
||||
"indexmap 2.13.0",
|
||||
"log",
|
||||
"memchr",
|
||||
"once_cell",
|
||||
|
|
@ -2558,7 +2693,7 @@ version = "0.20.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b7df16e474ef958526d1205f6dda359fdfab79d9aa6d54bafcb92dcd07673dca"
|
||||
dependencies = [
|
||||
"darling",
|
||||
"darling 0.20.11",
|
||||
"once_cell",
|
||||
"proc-macro-error2",
|
||||
"proc-macro2",
|
||||
|
|
@ -2676,7 +2811,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"indexmap",
|
||||
"indexmap 2.13.0",
|
||||
"wasm-encoder",
|
||||
"wasmparser",
|
||||
]
|
||||
|
|
@ -2689,7 +2824,7 @@ checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe"
|
|||
dependencies = [
|
||||
"bitflags",
|
||||
"hashbrown 0.15.5",
|
||||
"indexmap",
|
||||
"indexmap 2.13.0",
|
||||
"semver",
|
||||
]
|
||||
|
||||
|
|
@ -3012,7 +3147,7 @@ checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21"
|
|||
dependencies = [
|
||||
"anyhow",
|
||||
"heck",
|
||||
"indexmap",
|
||||
"indexmap 2.13.0",
|
||||
"prettyplease",
|
||||
"syn",
|
||||
"wasm-metadata",
|
||||
|
|
@ -3043,7 +3178,7 @@ checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2"
|
|||
dependencies = [
|
||||
"anyhow",
|
||||
"bitflags",
|
||||
"indexmap",
|
||||
"indexmap 2.13.0",
|
||||
"log",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
|
|
@ -3062,7 +3197,7 @@ checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736"
|
|||
dependencies = [
|
||||
"anyhow",
|
||||
"id-arena",
|
||||
"indexmap",
|
||||
"indexmap 2.13.0",
|
||||
"log",
|
||||
"semver",
|
||||
"serde",
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uu
|
|||
# Serialization
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
serde_with = "3.0"
|
||||
|
||||
# Time handling
|
||||
chrono = "0.4"
|
||||
|
|
|
|||
|
|
@ -59,7 +59,31 @@ struct PointWriteTarget {
|
|||
external_id: String,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PollPointInfo {
|
||||
handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct ConnectionStatusView {
|
||||
pub is_connected: bool,
|
||||
pub last_error: Option<String>,
|
||||
pub last_time: DateTime<Utc>,
|
||||
pub subscription_id: Option<u32>,
|
||||
pub next_client_handle: u32,
|
||||
}
|
||||
|
||||
impl From<&ConnectionStatus> for ConnectionStatusView {
|
||||
fn from(status: &ConnectionStatus) -> Self {
|
||||
Self {
|
||||
is_connected: status.is_connected,
|
||||
last_error: status.last_error.clone(),
|
||||
last_time: status.last_time,
|
||||
subscription_id: status.subscription_id,
|
||||
next_client_handle: status.next_client_handle,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnectionStatus {
|
||||
pub session: Option<Arc<Session>>,
|
||||
pub is_connected: bool,
|
||||
|
|
@ -69,6 +93,7 @@ pub struct ConnectionStatus {
|
|||
pub next_client_handle: u32,
|
||||
pub client_handle_map: HashMap<u32, Uuid>, // client_handle -> point_id
|
||||
pub monitored_item_map: HashMap<Uuid, u32>, // point_id -> monitored_item_id
|
||||
pub poll_points: HashMap<Uuid, PollPointInfo>, // 正在轮询的点集合
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
|
@ -77,8 +102,6 @@ pub struct ConnectionManager {
|
|||
point_monitor_data: Arc<RwLock<HashMap<Uuid, PointMonitorInfo>>>,
|
||||
point_history_data: Arc<RwLock<HashMap<Uuid, VecDeque<PointMonitorInfo>>>>,
|
||||
point_write_target_cache: Arc<RwLock<HashMap<Uuid, PointWriteTarget>>>,
|
||||
poll_task_handles: Arc<RwLock<HashMap<Uuid, JoinHandle<()>>>>,
|
||||
poll_points_by_source: Arc<RwLock<HashMap<Uuid, HashSet<Uuid>>>>,
|
||||
pool: Option<sqlx::PgPool>,
|
||||
event_manager: Option<std::sync::Arc<crate::event::EventManager>>,
|
||||
}
|
||||
|
|
@ -136,8 +159,6 @@ impl ConnectionManager {
|
|||
point_monitor_data: Arc::new(RwLock::new(HashMap::new())),
|
||||
point_history_data: Arc::new(RwLock::new(HashMap::new())),
|
||||
point_write_target_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
poll_task_handles: Arc::new(RwLock::new(HashMap::new())),
|
||||
poll_points_by_source: Arc::new(RwLock::new(HashMap::new())),
|
||||
event_manager: None,
|
||||
}
|
||||
}
|
||||
|
|
@ -218,9 +239,11 @@ impl ConnectionManager {
|
|||
.ok_or_else(|| "Event manager is not initialized".to_string())?;
|
||||
|
||||
{
|
||||
let poll_tasks = self.poll_task_handles.read().await;
|
||||
if poll_tasks.contains_key(&point.point_id) {
|
||||
return Ok(());
|
||||
let status = self.status.read().await;
|
||||
if let Some(conn_status) = status.get(&source_id) {
|
||||
if conn_status.poll_points.contains_key(&point.point_id) {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -296,60 +319,42 @@ impl ConnectionManager {
|
|||
});
|
||||
|
||||
{
|
||||
let mut poll_tasks = self.poll_task_handles.write().await;
|
||||
poll_tasks.insert(point_id, handle);
|
||||
}
|
||||
{
|
||||
let mut points_by_source = self.poll_points_by_source.write().await;
|
||||
points_by_source
|
||||
.entry(source_id)
|
||||
.or_insert_with(HashSet::new)
|
||||
.insert(point_id);
|
||||
let mut status = self.status.write().await;
|
||||
if let Some(conn_status) = status.get_mut(&source_id) {
|
||||
conn_status.poll_points.insert(
|
||||
point_id,
|
||||
PollPointInfo { handle },
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop_polling_for_point(&self, point_id: Uuid) {
|
||||
if let Some(handle) = self.poll_task_handles.write().await.remove(&point_id) {
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
let mut points_by_source = self.poll_points_by_source.write().await;
|
||||
let source_ids: Vec<Uuid> = points_by_source
|
||||
.iter()
|
||||
.filter(|(_, point_set)| point_set.contains(&point_id))
|
||||
.map(|(source_id, _)| *source_id)
|
||||
.collect();
|
||||
|
||||
for source_id in source_ids {
|
||||
if let Some(point_set) = points_by_source.get_mut(&source_id) {
|
||||
point_set.remove(&point_id);
|
||||
if point_set.is_empty() {
|
||||
points_by_source.remove(&source_id);
|
||||
}
|
||||
let mut status = self.status.write().await;
|
||||
for conn_status in status.values_mut() {
|
||||
if let Some(poll_info) = conn_status.poll_points.remove(&point_id) {
|
||||
poll_info.handle.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn stop_polling_for_source(&self, source_id: Uuid) {
|
||||
let point_ids = {
|
||||
let mut points_by_source = self.poll_points_by_source.write().await;
|
||||
points_by_source
|
||||
.remove(&source_id)
|
||||
.map(|set| set.into_iter().collect::<Vec<_>>())
|
||||
let poll_infos = {
|
||||
let mut status = self.status.write().await;
|
||||
status
|
||||
.get_mut(&source_id)
|
||||
.map(|conn_status| conn_status.poll_points.drain().collect::<Vec<_>>())
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
if point_ids.is_empty() {
|
||||
if poll_infos.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut poll_tasks = self.poll_task_handles.write().await;
|
||||
for point_id in point_ids {
|
||||
if let Some(handle) = poll_tasks.remove(&point_id) {
|
||||
handle.abort();
|
||||
}
|
||||
for (_, poll_info) in poll_infos {
|
||||
poll_info.handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -496,6 +501,7 @@ impl ConnectionManager {
|
|||
next_client_handle: 1000,
|
||||
client_handle_map: HashMap::new(),
|
||||
monitored_item_map: HashMap::new(),
|
||||
poll_points: HashMap::new(),
|
||||
},
|
||||
);
|
||||
|
||||
|
|
@ -516,6 +522,7 @@ impl ConnectionManager {
|
|||
client_handle_map: HashMap::new(),
|
||||
monitored_item_map: HashMap::new(),
|
||||
next_client_handle: 1000,
|
||||
poll_points: HashMap::new(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
|
@ -577,14 +584,14 @@ impl ConnectionManager {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn get_status(&self, source_id: Uuid) -> Option<ConnectionStatus> {
|
||||
pub async fn get_status(&self, source_id: Uuid) -> Option<ConnectionStatusView> {
|
||||
let status = self.status.read().await;
|
||||
status.get(&source_id).cloned()
|
||||
status.get(&source_id).map(ConnectionStatusView::from)
|
||||
}
|
||||
|
||||
pub async fn get_all_status(&self) -> Vec<(Uuid, ConnectionStatus)> {
|
||||
pub async fn get_all_status(&self) -> Vec<(Uuid, ConnectionStatusView)> {
|
||||
let status = self.status.read().await;
|
||||
status.iter().map(|(source_id, conn_status)| (*source_id, conn_status.clone())).collect()
|
||||
status.iter().map(|(source_id, conn_status)| (*source_id, ConnectionStatusView::from(conn_status))).collect()
|
||||
}
|
||||
|
||||
pub async fn write_point_values_batch(
|
||||
|
|
@ -657,7 +664,7 @@ impl ConnectionManager {
|
|||
let source_connected = self
|
||||
.get_status(*source_id)
|
||||
.await
|
||||
.map(|s| s.is_connected && s.session.is_some())
|
||||
.map(|s| s.is_connected)
|
||||
.unwrap_or(false);
|
||||
if !source_connected {
|
||||
return Ok(Self::write_value_batch_result(
|
||||
|
|
|
|||
38
src/event.rs
38
src/event.rs
|
|
@ -129,8 +129,21 @@ impl EventManager {
|
|||
.get(&source_id)
|
||||
.and_then(|s| s.client_handle_map.get(&client_handle).copied())
|
||||
};
|
||||
|
||||
if let Some(point_id) = point_id {
|
||||
// 从缓存中读取旧值
|
||||
let (old_value, old_timestamp, value_changed) = {
|
||||
let monitor_data = connection_manager.get_point_monitor_data_read_guard().await;
|
||||
let old_monitor_info = monitor_data.get(&point_id);
|
||||
|
||||
if let Some(old_info) = old_monitor_info {
|
||||
let changed = old_info.value != payload.value ||
|
||||
old_info.timestamp != payload.timestamp;
|
||||
(old_info.value.clone(), old_info.timestamp, changed)
|
||||
} else {
|
||||
(None, None, false)
|
||||
}
|
||||
};
|
||||
|
||||
let monitor = crate::telemetry::PointMonitorInfo {
|
||||
protocol: payload.protocol.clone(),
|
||||
source_id,
|
||||
|
|
@ -142,25 +155,28 @@ impl EventManager {
|
|||
value: payload.value.clone(),
|
||||
value_type: payload.value_type.clone(),
|
||||
value_text: payload.value_text.clone(),
|
||||
old_value,
|
||||
old_timestamp,
|
||||
value_changed,
|
||||
};
|
||||
|
||||
if let Err(e) = connection_manager.update_point_monitor_data(monitor.clone()).await {
|
||||
// 只克隆一次 monitor,减少内存分配
|
||||
let monitor_clone = monitor.clone();
|
||||
if let Err(e) = connection_manager.update_point_monitor_data(monitor_clone).await {
|
||||
tracing::error!("Failed to update point monitor data for point {}: {}", point_id, e);
|
||||
}
|
||||
|
||||
if let Some(ws_manager) = &ws_manager_clone {
|
||||
let ws_message = crate::websocket::WsMessage::PointNewValue(
|
||||
monitor.clone(),
|
||||
);
|
||||
|
||||
if let Err(e) = ws_manager.send_to_public(ws_message.clone()).await {
|
||||
let ws_message = crate::websocket::WsMessage::PointNewValue(monitor);
|
||||
if let Err(e) = ws_manager.send_to_public(ws_message).await {
|
||||
tracing::error!("Failed to send WebSocket message to public room: {}", e);
|
||||
}
|
||||
|
||||
if let Err(e) = ws_manager.send_to_client(point_id, ws_message).await {
|
||||
tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e);
|
||||
}
|
||||
}
|
||||
// 暂时注释掉 send_to_client,因为现在信息只需发送到 public
|
||||
// if let Err(e) = ws_manager.send_to_client(point_id, ws_message).await {
|
||||
// tracing::error!("Failed to send WebSocket message to client room {}: {}", point_id, e);
|
||||
// }
|
||||
}
|
||||
} else {
|
||||
tracing::warn!("Point not found for source {} client_handle {}", source_id, client_handle);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ use uuid::Uuid;
|
|||
use validator::Validate;
|
||||
use sqlx::Row;
|
||||
|
||||
use crate::util::response::ApiErr;
|
||||
use crate::util::{response::ApiErr, pagination::{PaginatedResponse, PaginationParams}};
|
||||
|
||||
use crate::{
|
||||
AppState,
|
||||
|
|
@ -12,9 +12,11 @@ use crate::{
|
|||
};
|
||||
|
||||
/// List all points.
|
||||
#[derive(Deserialize)]
|
||||
#[derive(Deserialize, Validate)]
|
||||
pub struct GetPointListQuery {
|
||||
pub source_id: Option<Uuid>,
|
||||
#[serde(flatten)]
|
||||
pub pagination: PaginationParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
|
@ -28,37 +30,26 @@ pub async fn get_point_list(
|
|||
State(state): State<AppState>,
|
||||
Query(query): Query<GetPointListQuery>,
|
||||
) -> Result<impl IntoResponse, ApiErr> {
|
||||
query.validate()?;
|
||||
let pool = &state.pool;
|
||||
let points: Vec<Point> = match query.source_id {
|
||||
Some(source_id) => {
|
||||
sqlx::query_as::<_, Point>(
|
||||
r#"
|
||||
SELECT p.*
|
||||
FROM point p
|
||||
INNER JOIN node n ON p.node_id = n.id
|
||||
WHERE n.source_id = $1
|
||||
ORDER BY p.created_at
|
||||
"#,
|
||||
)
|
||||
.bind(source_id)
|
||||
.fetch_all(pool)
|
||||
.await?
|
||||
}
|
||||
None => {
|
||||
sqlx::query_as::<_, Point>(
|
||||
r#"SELECT * FROM point ORDER BY created_at"#,
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
// 获取总数
|
||||
let total = crate::service::get_points_count(pool, query.source_id).await?;
|
||||
|
||||
// 获取分页数据
|
||||
let points = crate::service::get_points_paginated(
|
||||
pool,
|
||||
query.source_id,
|
||||
query.pagination.page_size,
|
||||
query.pagination.offset(),
|
||||
).await?;
|
||||
|
||||
let monitor_guard = state
|
||||
.connection_manager
|
||||
.get_point_monitor_data_read_guard()
|
||||
.await;
|
||||
|
||||
let resp: Vec<PointWithMonitor> = points
|
||||
let data: Vec<PointWithMonitor> = points
|
||||
.into_iter()
|
||||
.map(|point| {
|
||||
let point_monitor = monitor_guard
|
||||
|
|
@ -68,7 +59,9 @@ pub async fn get_point_list(
|
|||
})
|
||||
.collect();
|
||||
|
||||
Ok(Json(resp))
|
||||
let response = PaginatedResponse::new(data, total, query.pagination.page, query.pagination.page_size);
|
||||
|
||||
Ok(Json(response))
|
||||
}
|
||||
/// Get a point by id.
|
||||
pub async fn get_point(
|
||||
|
|
|
|||
|
|
@ -1,17 +1,38 @@
|
|||
use axum::{Json, extract::{Path, State}, http::StatusCode, response::IntoResponse};
|
||||
use axum::{Json, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse};
|
||||
use serde::Deserialize;
|
||||
use uuid::Uuid;
|
||||
use validator::Validate;
|
||||
|
||||
use crate::util::response::ApiErr;
|
||||
use crate::util::{response::ApiErr, pagination::{PaginatedResponse, PaginationParams}};
|
||||
use crate::{AppState};
|
||||
|
||||
/// 获取所有标签
|
||||
#[derive(Deserialize, Validate)]
|
||||
pub struct GetTagListQuery {
|
||||
#[serde(flatten)]
|
||||
pub pagination: PaginationParams,
|
||||
}
|
||||
|
||||
pub async fn get_tag_list(
|
||||
State(state): State<AppState>,
|
||||
Query(query): Query<GetTagListQuery>,
|
||||
) -> Result<impl IntoResponse, ApiErr> {
|
||||
let tags = crate::service::get_all_tags(&state.pool).await?;
|
||||
Ok(Json(tags))
|
||||
query.validate()?;
|
||||
let pool = &state.pool;
|
||||
|
||||
// 获取总数
|
||||
let total = crate::service::get_tags_count(pool).await?;
|
||||
|
||||
// 获取分页数据
|
||||
let tags = crate::service::get_tags_paginated(
|
||||
pool,
|
||||
query.pagination.page_size,
|
||||
query.pagination.offset(),
|
||||
).await?;
|
||||
|
||||
let response = PaginatedResponse::new(tags, total, query.pagination.page, query.pagination.page_size);
|
||||
|
||||
Ok(Json(response))
|
||||
}
|
||||
|
||||
/// 获取标签下的点位信息
|
||||
|
|
|
|||
143
src/service.rs
143
src/service.rs
|
|
@ -115,19 +115,150 @@ pub async fn get_points_with_ids(
|
|||
.collect())
|
||||
}
|
||||
|
||||
// ==================== Point 相关服务函数 ====================
|
||||
|
||||
/// 获取点位总数
|
||||
pub async fn get_points_count(
|
||||
pool: &PgPool,
|
||||
source_id: Option<uuid::Uuid>,
|
||||
) -> Result<i64, sqlx::Error> {
|
||||
match source_id {
|
||||
Some(source_id) => {
|
||||
sqlx::query_scalar::<_, i64>(
|
||||
r#"
|
||||
SELECT COUNT(*)
|
||||
FROM point p
|
||||
INNER JOIN node n ON p.node_id = n.id
|
||||
WHERE n.source_id = $1
|
||||
"#,
|
||||
)
|
||||
.bind(source_id)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
}
|
||||
None => {
|
||||
sqlx::query_scalar::<_, i64>(
|
||||
r#"SELECT COUNT(*) FROM point"#,
|
||||
)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// 获取分页点位列表
|
||||
pub async fn get_points_paginated(
|
||||
pool: &PgPool,
|
||||
source_id: Option<uuid::Uuid>,
|
||||
page_size: i32,
|
||||
offset: u32,
|
||||
) -> Result<Vec<crate::model::Point>, sqlx::Error> {
|
||||
match source_id {
|
||||
Some(source_id) => {
|
||||
if page_size == 0 {
|
||||
Ok(vec![])
|
||||
} else if page_size == -1 {
|
||||
sqlx::query_as::<_, crate::model::Point>(
|
||||
r#"
|
||||
SELECT p.*
|
||||
FROM point p
|
||||
INNER JOIN node n ON p.node_id = n.id
|
||||
WHERE n.source_id = $1
|
||||
ORDER BY p.created_at
|
||||
"#,
|
||||
)
|
||||
.bind(source_id)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
} else {
|
||||
sqlx::query_as::<_, crate::model::Point>(
|
||||
r#"
|
||||
SELECT p.*
|
||||
FROM point p
|
||||
INNER JOIN node n ON p.node_id = n.id
|
||||
WHERE n.source_id = $1
|
||||
ORDER BY p.created_at
|
||||
LIMIT $2 OFFSET $3
|
||||
"#,
|
||||
)
|
||||
.bind(source_id)
|
||||
.bind(page_size as i64)
|
||||
.bind(offset as i64)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
}
|
||||
}
|
||||
None => {
|
||||
if page_size == 0 {
|
||||
Ok(vec![])
|
||||
} else if page_size == -1 {
|
||||
sqlx::query_as::<_, crate::model::Point>(
|
||||
r#"
|
||||
SELECT * FROM point
|
||||
ORDER BY created_at
|
||||
"#,
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
} else {
|
||||
sqlx::query_as::<_, crate::model::Point>(
|
||||
r#"
|
||||
SELECT * FROM point
|
||||
ORDER BY created_at
|
||||
LIMIT $1 OFFSET $2
|
||||
"#,
|
||||
)
|
||||
.bind(page_size as i64)
|
||||
.bind(offset as i64)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== Tag 相关服务函数 ====================
|
||||
|
||||
/// 获取所有标签
|
||||
pub async fn get_all_tags(
|
||||
/// 获取标签总数
|
||||
pub async fn get_tags_count(
|
||||
pool: &PgPool,
|
||||
) -> Result<Vec<crate::model::Tag>, sqlx::Error> {
|
||||
query_as::<_, crate::model::Tag>(
|
||||
r#"SELECT * FROM tag ORDER BY created_at"#
|
||||
) -> Result<i64, sqlx::Error> {
|
||||
sqlx::query_scalar::<_, i64>(
|
||||
r#"SELECT COUNT(*) FROM tag"#,
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
}
|
||||
|
||||
/// 获取分页标签列表
|
||||
pub async fn get_tags_paginated(
|
||||
pool: &PgPool,
|
||||
page_size: i32,
|
||||
offset: u32,
|
||||
) -> Result<Vec<crate::model::Tag>, sqlx::Error> {
|
||||
if page_size == 0 {
|
||||
Ok(vec![])
|
||||
} else if page_size == -1 {
|
||||
sqlx::query_as::<_, crate::model::Tag>(
|
||||
r#"SELECT * FROM tag ORDER BY created_at"#,
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
} else {
|
||||
sqlx::query_as::<_, crate::model::Tag>(
|
||||
r#"
|
||||
SELECT * FROM tag
|
||||
ORDER BY created_at
|
||||
LIMIT $1 OFFSET $2
|
||||
"#,
|
||||
)
|
||||
.bind(page_size)
|
||||
.bind(offset as i64)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// 根据ID获取标签
|
||||
pub async fn get_tag_by_id(
|
||||
pool: &PgPool,
|
||||
|
|
|
|||
|
|
@ -41,8 +41,8 @@ impl PointQuality {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(untagged)]
|
||||
pub enum DataValue {
|
||||
Null,
|
||||
Bool(bool),
|
||||
|
|
@ -80,11 +80,16 @@ pub struct PointMonitorInfo {
|
|||
pub point_id: Uuid,
|
||||
pub client_handle: u32,
|
||||
pub scan_mode: ScanMode,
|
||||
#[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")]
|
||||
pub timestamp: Option<DateTime<Utc>>,
|
||||
pub quality: PointQuality,
|
||||
pub value: Option<DataValue>,
|
||||
pub value_type: Option<ValueType>,
|
||||
pub value_text: Option<String>,
|
||||
pub old_value: Option<DataValue>,
|
||||
#[serde(serialize_with = "crate::util::datetime::option_utc_to_local_str")]
|
||||
pub old_timestamp: Option<DateTime<Utc>>,
|
||||
pub value_changed: bool,
|
||||
}
|
||||
|
||||
impl PointMonitorInfo {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
pub mod datetime;
|
||||
pub mod log;
|
||||
pub mod pagination;
|
||||
pub mod response;
|
||||
pub mod validator;
|
||||
|
|
@ -0,0 +1,61 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::serde_as;
|
||||
use validator::Validate;
|
||||
|
||||
/// 分页响应结构
|
||||
#[derive(Serialize)]
|
||||
pub struct PaginatedResponse<T> {
|
||||
pub data: Vec<T>,
|
||||
pub total: i64,
|
||||
pub page: u32,
|
||||
pub page_size: i32,
|
||||
pub total_pages: u32,
|
||||
}
|
||||
|
||||
impl<T> PaginatedResponse<T> {
|
||||
/// 创建分页响应
|
||||
pub fn new(data: Vec<T>, total: i64, page: u32, page_size: i32) -> Self {
|
||||
let total_pages = if page_size > 0 {
|
||||
((total as f64) / (page_size as f64)).ceil() as u32
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
Self {
|
||||
data,
|
||||
total,
|
||||
page,
|
||||
page_size,
|
||||
total_pages,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// 分页查询参数
|
||||
#[serde_as]
|
||||
#[derive(Deserialize, Validate)]
|
||||
pub struct PaginationParams {
|
||||
#[validate(range(min = 1))]
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
#[serde(default = "default_page")]
|
||||
pub page: u32,
|
||||
#[validate(range(min = -1, max = 100))]
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
#[serde(default = "default_page_size")]
|
||||
pub page_size: i32,
|
||||
}
|
||||
|
||||
fn default_page() -> u32 {
|
||||
1
|
||||
}
|
||||
|
||||
fn default_page_size() -> i32 {
|
||||
20
|
||||
}
|
||||
|
||||
impl PaginationParams {
|
||||
/// 计算偏移量
|
||||
pub fn offset(&self) -> u32 {
|
||||
(self.page - 1) * self.page_size.max(0) as u32
|
||||
}
|
||||
}
|
||||
|
|
@ -1,5 +1,15 @@
|
|||
use anyhow::Error;
|
||||
use axum::{Json, http::StatusCode, response::IntoResponse};
|
||||
use axum::{
|
||||
Json,
|
||||
http::StatusCode,
|
||||
response::IntoResponse,
|
||||
extract::rejection::{
|
||||
QueryRejection,
|
||||
PathRejection,
|
||||
JsonRejection,
|
||||
FormRejection,
|
||||
},
|
||||
};
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use sqlx::Error as SqlxError;
|
||||
|
|
@ -89,3 +99,52 @@ impl From<SqlxError> for ApiErr {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<QueryRejection> for ApiErr {
|
||||
fn from(rejection: QueryRejection) -> Self {
|
||||
tracing::warn!("Query parameter error: {}", rejection);
|
||||
ApiErr::BadRequest(
|
||||
"Invalid query parameters".to_string(),
|
||||
Some(serde_json::json!({
|
||||
"detail": rejection.to_string()
|
||||
}))
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PathRejection> for ApiErr {
|
||||
fn from(rejection: PathRejection) -> Self {
|
||||
tracing::warn!("Path parameter error: {}", rejection);
|
||||
ApiErr::BadRequest(
|
||||
"Invalid path parameter".to_string(),
|
||||
Some(serde_json::json!({
|
||||
"detail": rejection.to_string()
|
||||
}))
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<JsonRejection> for ApiErr {
|
||||
fn from(rejection: JsonRejection) -> Self {
|
||||
tracing::warn!("JSON parsing error: {}", rejection);
|
||||
ApiErr::BadRequest(
|
||||
"Invalid JSON format".to_string(),
|
||||
Some(serde_json::json!({
|
||||
"detail": rejection.to_string()
|
||||
}))
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<FormRejection> for ApiErr {
|
||||
fn from(rejection: FormRejection) -> Self {
|
||||
tracing::warn!("Form data error: {}", rejection);
|
||||
ApiErr::BadRequest(
|
||||
"Invalid form data".to_string(),
|
||||
Some(serde_json::json!({
|
||||
"detail": rejection.to_string()
|
||||
}))
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue