feat: 添加定时任务以获取总数据量

This commit is contained in:
caoqianming 2026-02-05 11:08:08 +08:00
parent d4e616f83c
commit e92fee98d7
2 changed files with 46 additions and 1 deletions

View File

@ -5,7 +5,7 @@ edition = "2021"
[dependencies]
salvo = { version = "0.68", features = ["logging", "cors"]}
tokio = { version = "1", features = ["macros"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
tracing = "0.1"
tracing-subscriber = "0.3"
sqlx = { version = "0.7", features = [ "runtime-tokio-native-tls" , "postgres", "chrono" ] }
@ -16,3 +16,4 @@ chrono = { version = "0.4", features = ["serde"] }
serde_with = "3.8.1"
dotenv = "0.15.0"
tracing-appender = "0.2.3"
tokio-cron-scheduler = "0.10"

View File

@ -1,4 +1,5 @@
use chrono::{DateTime, Utc, FixedOffset};
use tokio_cron_scheduler::{Job, JobScheduler};
use once_cell::sync::OnceCell;
use salvo::logging::Logger;
use salvo::prelude::*;
@ -119,6 +120,31 @@ async fn get_mplogx(req: &mut Request, res: &mut Response) {
res.render(serde_json::to_string(&result).unwrap());
}
// 定时分析TimescaleDB的chunks
async fn analyze_chunks() {
let sql = r#"
DO $$
DECLARE
c RECORD;
BEGIN
FOR c IN (
SELECT chunk_schema, chunk_name
FROM timescaledb_information.chunks
WHERE hypertable_name = 'mplogx'
AND range_end > now() - interval '1 day'
) LOOP
EXECUTE 'ANALYZE ' || quote_ident(c.chunk_schema) || '.' || quote_ident(c.chunk_name);
END LOOP;
END;
$$ LANGUAGE plpgsql;
"#;
match sqlx::query(sql).execute(get_postgres()).await {
Ok(_) => tracing::info!("Successfully analyzed TimescaleDB chunks"),
Err(e) => tracing::error!("Failed to analyze TimescaleDB chunks: {}", e),
}
}
#[tokio::main]
async fn main() {
dotenv().ok();
@ -133,6 +159,24 @@ async fn main() {
let db_url = env::var("DB_URL").expect("DB_URL must be set");
let pool = PgPool::connect(&db_url).await.unwrap();
POSTGRES.set(pool).unwrap();
// 创建定时任务调度器
let scheduler = JobScheduler::new().await.expect("Failed to create scheduler");
// 创建每10分钟执行一次的任务
let analyze_job = Job::new_async("0 */10 * * * *", move |_uuid, _l| {
Box::pin(async move {
analyze_chunks().await;
})
}).expect("Failed to create job");
// 添加任务到调度器
scheduler.add(analyze_job).await.expect("Failed to add job to scheduler");
// 启动调度器
scheduler.start().await.expect("Failed to start scheduler");
// 启动Web服务器
let cors_hander = Cors::new().allow_origin("*").allow_methods(
vec![Method::GET, Method::POST, Method::OPTIONS, Method::PUT, Method::DELETE]).allow_headers(
vec!["Content-Type", "Authorization"]).into_handler();