Compare commits
No commits in common. "a750a630dfb16ead7be1085665a7d853d1ceeaff" and "d4e616f83cb83d69d4a0b5288ce3e67c0dd780d2" have entirely different histories.
a750a630df
...
d4e616f83c
|
|
@ -315,17 +315,6 @@ version = "2.4.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
|
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "cron"
|
|
||||||
version = "0.12.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07"
|
|
||||||
dependencies = [
|
|
||||||
"chrono",
|
|
||||||
"nom",
|
|
||||||
"once_cell",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crossbeam-channel"
|
name = "crossbeam-channel"
|
||||||
version = "0.5.15"
|
version = "0.5.15"
|
||||||
|
|
@ -826,7 +815,6 @@ dependencies = [
|
||||||
"serde_with",
|
"serde_with",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-cron-scheduler",
|
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-appender",
|
"tracing-appender",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
|
|
@ -1427,17 +1415,6 @@ version = "0.1.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
|
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "num-derive"
|
|
||||||
version = "0.3.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn 1.0.109",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "num-integer"
|
name = "num-integer"
|
||||||
version = "0.1.46"
|
version = "0.1.46"
|
||||||
|
|
@ -2807,21 +2784,6 @@ dependencies = [
|
||||||
"windows-sys 0.52.0",
|
"windows-sys 0.52.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tokio-cron-scheduler"
|
|
||||||
version = "0.10.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a4c2e3a88f827f597799cf70a6f673074e62f3fc5ba5993b2873345c618a29af"
|
|
||||||
dependencies = [
|
|
||||||
"chrono",
|
|
||||||
"cron",
|
|
||||||
"num-derive",
|
|
||||||
"num-traits",
|
|
||||||
"tokio",
|
|
||||||
"tracing",
|
|
||||||
"uuid",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-macros"
|
name = "tokio-macros"
|
||||||
version = "2.5.0"
|
version = "2.5.0"
|
||||||
|
|
@ -3147,17 +3109,6 @@ version = "1.0.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
|
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "uuid"
|
|
||||||
version = "1.20.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f"
|
|
||||||
dependencies = [
|
|
||||||
"getrandom 0.3.3",
|
|
||||||
"js-sys",
|
|
||||||
"wasm-bindgen",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "valuable"
|
name = "valuable"
|
||||||
version = "0.1.1"
|
version = "0.1.1"
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
salvo = { version = "0.68", features = ["logging", "cors"]}
|
salvo = { version = "0.68", features = ["logging", "cors"]}
|
||||||
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
|
tokio = { version = "1", features = ["macros"] }
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-subscriber = "0.3"
|
tracing-subscriber = "0.3"
|
||||||
sqlx = { version = "0.7", features = [ "runtime-tokio-native-tls" , "postgres", "chrono" ] }
|
sqlx = { version = "0.7", features = [ "runtime-tokio-native-tls" , "postgres", "chrono" ] }
|
||||||
|
|
@ -16,4 +16,3 @@ chrono = { version = "0.4", features = ["serde"] }
|
||||||
serde_with = "3.8.1"
|
serde_with = "3.8.1"
|
||||||
dotenv = "0.15.0"
|
dotenv = "0.15.0"
|
||||||
tracing-appender = "0.2.3"
|
tracing-appender = "0.2.3"
|
||||||
tokio-cron-scheduler = "0.10"
|
|
||||||
|
|
|
||||||
73
src/main.rs
73
src/main.rs
|
|
@ -1,5 +1,4 @@
|
||||||
use chrono::{DateTime, Utc, FixedOffset};
|
use chrono::{DateTime, Utc, FixedOffset};
|
||||||
use tokio_cron_scheduler::{Job, JobScheduler};
|
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::OnceCell;
|
||||||
use salvo::logging::Logger;
|
use salvo::logging::Logger;
|
||||||
use salvo::prelude::*;
|
use salvo::prelude::*;
|
||||||
|
|
@ -7,7 +6,6 @@ use serde::{Deserialize, Serialize, Serializer};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use sqlx::{FromRow, PgPool};
|
use sqlx::{FromRow, PgPool};
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::sync::atomic::{AtomicI64, Ordering};
|
|
||||||
use dotenv::dotenv;
|
use dotenv::dotenv;
|
||||||
use tracing::level_filters::LevelFilter;
|
use tracing::level_filters::LevelFilter;
|
||||||
use tracing_subscriber::{
|
use tracing_subscriber::{
|
||||||
|
|
@ -20,7 +18,7 @@ use salvo::cors::Cors;
|
||||||
use salvo::http::Method;
|
use salvo::http::Method;
|
||||||
|
|
||||||
static POSTGRES: OnceCell<PgPool> = OnceCell::new();
|
static POSTGRES: OnceCell<PgPool> = OnceCell::new();
|
||||||
static TOTAL_COUNT: AtomicI64 = AtomicI64::new(0);
|
|
||||||
|
|
||||||
pub fn format_time_8<S>(time: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
|
pub fn format_time_8<S>(time: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
where
|
where
|
||||||
|
|
@ -83,16 +81,10 @@ async fn get_mplogx(req: &mut Request, res: &mut Response) {
|
||||||
.unwrap_or(0)
|
.unwrap_or(0)
|
||||||
},
|
},
|
||||||
None => {
|
None => {
|
||||||
// 使用缓存的TOTAL_COUNT值,如果为0则执行查询
|
sqlx::query_scalar(&count_sql)
|
||||||
let cached_count = TOTAL_COUNT.load(Ordering::SeqCst);
|
.fetch_one(get_postgres())
|
||||||
if cached_count == 0 {
|
.await
|
||||||
sqlx::query_scalar(&count_sql)
|
.unwrap_or(0)
|
||||||
.fetch_one(get_postgres())
|
|
||||||
.await
|
|
||||||
.unwrap_or(0)
|
|
||||||
} else {
|
|
||||||
cached_count
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -127,43 +119,6 @@ async fn get_mplogx(req: &mut Request, res: &mut Response) {
|
||||||
res.render(serde_json::to_string(&result).unwrap());
|
res.render(serde_json::to_string(&result).unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
// 定时分析TimescaleDB的chunks
|
|
||||||
async fn analyze_chunks() {
|
|
||||||
let sql = r#"
|
|
||||||
DO $$
|
|
||||||
DECLARE
|
|
||||||
c RECORD;
|
|
||||||
BEGIN
|
|
||||||
FOR c IN (
|
|
||||||
SELECT chunk_schema, chunk_name
|
|
||||||
FROM timescaledb_information.chunks
|
|
||||||
WHERE hypertable_name = 'mplogx'
|
|
||||||
AND range_end > now() - interval '1 day'
|
|
||||||
) LOOP
|
|
||||||
EXECUTE 'ANALYZE ' || quote_ident(c.chunk_schema) || '.' || quote_ident(c.chunk_name);
|
|
||||||
END LOOP;
|
|
||||||
END;
|
|
||||||
$$ LANGUAGE plpgsql;
|
|
||||||
"#;
|
|
||||||
|
|
||||||
match sqlx::query(sql).execute(get_postgres()).await {
|
|
||||||
Ok(_) => {
|
|
||||||
tracing::info!("Successfully analyzed TimescaleDB chunks");
|
|
||||||
// 更新总行数
|
|
||||||
if let Ok(count) = sqlx::query_scalar::<_, i64>("SELECT approximate_row_count('mplogx');")
|
|
||||||
.fetch_one(get_postgres())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
TOTAL_COUNT.store(count, Ordering::SeqCst);
|
|
||||||
tracing::info!("Updated total_count to {}", count);
|
|
||||||
} else {
|
|
||||||
tracing::error!("Failed to update total_count");
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(e) => tracing::error!("Failed to analyze TimescaleDB chunks: {}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
dotenv().ok();
|
dotenv().ok();
|
||||||
|
|
@ -178,24 +133,6 @@ async fn main() {
|
||||||
let db_url = env::var("DB_URL").expect("DB_URL must be set");
|
let db_url = env::var("DB_URL").expect("DB_URL must be set");
|
||||||
let pool = PgPool::connect(&db_url).await.unwrap();
|
let pool = PgPool::connect(&db_url).await.unwrap();
|
||||||
POSTGRES.set(pool).unwrap();
|
POSTGRES.set(pool).unwrap();
|
||||||
|
|
||||||
// 创建定时任务调度器
|
|
||||||
let scheduler = JobScheduler::new().await.expect("Failed to create scheduler");
|
|
||||||
|
|
||||||
// 创建每10分钟执行一次的任务
|
|
||||||
let analyze_job = Job::new_async("0 */10 * * * *", move |_uuid, _l| {
|
|
||||||
Box::pin(async move {
|
|
||||||
analyze_chunks().await;
|
|
||||||
})
|
|
||||||
}).expect("Failed to create job");
|
|
||||||
|
|
||||||
// 添加任务到调度器
|
|
||||||
scheduler.add(analyze_job).await.expect("Failed to add job to scheduler");
|
|
||||||
|
|
||||||
// 启动调度器
|
|
||||||
scheduler.start().await.expect("Failed to start scheduler");
|
|
||||||
|
|
||||||
// 启动Web服务器
|
|
||||||
let cors_hander = Cors::new().allow_origin("*").allow_methods(
|
let cors_hander = Cors::new().allow_origin("*").allow_methods(
|
||||||
vec![Method::GET, Method::POST, Method::OPTIONS, Method::PUT, Method::DELETE]).allow_headers(
|
vec![Method::GET, Method::POST, Method::OPTIONS, Method::PUT, Method::DELETE]).allow_headers(
|
||||||
vec!["Content-Type", "Authorization"]).into_handler();
|
vec!["Content-Type", "Authorization"]).into_handler();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue