From e92fee98d770970ee85b7951057a4f9c1598ccd7 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 5 Feb 2026 11:08:08 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E4=BB=A5=E8=8E=B7=E5=8F=96=E6=80=BB=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 3 ++- src/main.rs | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c71659f..926030f 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [dependencies] salvo = { version = "0.68", features = ["logging", "cors"]} -tokio = { version = "1", features = ["macros"] } +tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } tracing = "0.1" tracing-subscriber = "0.3" sqlx = { version = "0.7", features = [ "runtime-tokio-native-tls" , "postgres", "chrono" ] } @@ -16,3 +16,4 @@ chrono = { version = "0.4", features = ["serde"] } serde_with = "3.8.1" dotenv = "0.15.0" tracing-appender = "0.2.3" +tokio-cron-scheduler = "0.10" diff --git a/src/main.rs b/src/main.rs index 30d0bde..c99f2cf 100755 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ use chrono::{DateTime, Utc, FixedOffset}; +use tokio_cron_scheduler::{Job, JobScheduler}; use once_cell::sync::OnceCell; use salvo::logging::Logger; use salvo::prelude::*; @@ -119,6 +120,31 @@ async fn get_mplogx(req: &mut Request, res: &mut Response) { res.render(serde_json::to_string(&result).unwrap()); } +// 定时分析TimescaleDB的chunks +async fn analyze_chunks() { + let sql = r#" + DO $$ + DECLARE + c RECORD; + BEGIN + FOR c IN ( + SELECT chunk_schema, chunk_name + FROM timescaledb_information.chunks + WHERE hypertable_name = 'mplogx' + AND range_end > now() - interval '1 day' + ) LOOP + EXECUTE 'ANALYZE ' || quote_ident(c.chunk_schema) || '.' || quote_ident(c.chunk_name); + END LOOP; + END; + $$ LANGUAGE plpgsql; + "#; + + match sqlx::query(sql).execute(get_postgres()).await { + Ok(_) => tracing::info!("Successfully analyzed TimescaleDB chunks"), + Err(e) => tracing::error!("Failed to analyze TimescaleDB chunks: {}", e), + } +} + #[tokio::main] async fn main() { dotenv().ok(); @@ -133,6 +159,24 @@ async fn main() { let db_url = env::var("DB_URL").expect("DB_URL must be set"); let pool = PgPool::connect(&db_url).await.unwrap(); POSTGRES.set(pool).unwrap(); + + // 创建定时任务调度器 + let scheduler = JobScheduler::new().await.expect("Failed to create scheduler"); + + // 创建每10分钟执行一次的任务 + let analyze_job = Job::new_async("0 */10 * * * *", move |_uuid, _l| { + Box::pin(async move { + analyze_chunks().await; + }) + }).expect("Failed to create job"); + + // 添加任务到调度器 + scheduler.add(analyze_job).await.expect("Failed to add job to scheduler"); + + // 启动调度器 + scheduler.start().await.expect("Failed to start scheduler"); + + // 启动Web服务器 let cors_hander = Cors::new().allow_origin("*").allow_methods( vec![Method::GET, Method::POST, Method::OPTIONS, Method::PUT, Method::DELETE]).allow_headers( vec!["Content-Type", "Authorization"]).into_handler();