From 1374abe550877546519b7ddfa3ac5c764539dc9c Mon Sep 17 00:00:00 2001 From: caoqianming Date: Mon, 9 Mar 2026 14:47:19 +0800 Subject: [PATCH] feat(log): add file-based log APIs and switch file logs to JSON --- Cargo.lock | 36 ++++++ Cargo.toml | 3 +- src/handler.rs | 3 +- src/handler/log.rs | 275 +++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 4 +- src/util/log.rs | 9 +- 6 files changed, 323 insertions(+), 7 deletions(-) create mode 100644 src/handler/log.rs diff --git a/Cargo.lock b/Cargo.lock index 2b1cb30..c4fd0e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -180,6 +180,28 @@ dependencies = [ "uuid", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -808,6 +830,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-opcua", + "async-stream", "axum", "chrono", "dotenv", @@ -2548,6 +2571,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.22" @@ -2558,6 +2591,8 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex-automata", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", @@ -2565,6 +2600,7 @@ dependencies = [ "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6c7fdf6..5ae1bd0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uu serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_with = "3.0" +async-stream = "0.3" # Time handling chrono = "0.4" @@ -31,7 +32,7 @@ async-opcua = { version = "0.18", features = ["client"] } # Logging tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "json"] } tracing-appender = "0.2" # Environment variables diff --git a/src/handler.rs b/src/handler.rs index 0030a85..dc2be6b 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,3 +1,4 @@ pub mod source; pub mod point; -pub mod tag; \ No newline at end of file +pub mod tag; +pub mod log; diff --git a/src/handler/log.rs b/src/handler/log.rs new file mode 100644 index 0000000..aecb501 --- /dev/null +++ b/src/handler/log.rs @@ -0,0 +1,275 @@ +use std::{ + convert::Infallible, + path::{Path, PathBuf}, + time::SystemTime, +}; + +use async_stream::stream; +use axum::{ + extract::Query, + response::{ + sse::{Event, KeepAlive, Sse}, + IntoResponse, + }, + Json, +}; +use serde::{Deserialize, Serialize}; +use tokio::{ + fs, + io::{AsyncReadExt, AsyncSeekExt, SeekFrom}, + time::{Duration, interval}, +}; + +use crate::util::response::ApiErr; + +const LOG_DIR: &str = "./logs"; +const DEFAULT_TAIL_LINES: usize = 200; +const MAX_TAIL_LINES: usize = 2000; +const DEFAULT_MAX_BYTES: usize = 64 * 1024; +const STREAM_MAX_BYTES: usize = 32 * 1024; +const MAX_MAX_BYTES: usize = 512 * 1024; + +#[derive(Debug, Deserialize)] +pub struct LogQuery { + pub file: Option, + pub cursor: Option, + pub tail_lines: Option, + pub max_bytes: Option, +} + +#[derive(Debug, Serialize)] +pub struct LogChunkResponse { + pub file: String, + pub cursor: u64, + pub lines: Vec, + pub truncated: bool, + pub reset: bool, +} + +pub async fn get_logs(Query(query): Query) -> Result { + let path = resolve_log_file(query.file.as_deref()).await?; + let file_name = file_name_of(&path); + let max_bytes = query + .max_bytes + .unwrap_or(DEFAULT_MAX_BYTES) + .clamp(1, MAX_MAX_BYTES); + + let response = if let Some(cursor) = query.cursor { + read_since(&path, &file_name, cursor, max_bytes).await? + } else { + let tail_lines = query + .tail_lines + .unwrap_or(DEFAULT_TAIL_LINES) + .clamp(1, MAX_TAIL_LINES); + read_tail(&path, &file_name, tail_lines).await? + }; + + Ok(Json(response)) +} + +pub async fn stream_logs(Query(query): Query) -> Result { + let path = resolve_log_file(query.file.as_deref()).await?; + let file_name = file_name_of(&path); + let max_bytes = query + .max_bytes + .unwrap_or(STREAM_MAX_BYTES) + .clamp(1, MAX_MAX_BYTES); + let start_cursor = query.cursor.unwrap_or(file_len(&path).await?); + + let event_stream = stream! { + let mut ticker = interval(Duration::from_millis(800)); + let mut cursor = start_cursor; + + loop { + ticker.tick().await; + match read_since(&path, &file_name, cursor, max_bytes).await { + Ok(chunk) => { + cursor = chunk.cursor; + if chunk.reset || !chunk.lines.is_empty() { + match Event::default().event("log").json_data(&chunk) { + Ok(event) => yield Ok::(event), + Err(_) => { + yield Ok::( + Event::default().event("error").data("serialize log event failed") + ); + break; + } + } + } + } + Err(_) => { + yield Ok::( + Event::default().event("error").data("log stream read failed") + ); + break; + } + } + } + }; + + Ok( + Sse::new(event_stream) + .keep_alive(KeepAlive::new().interval(Duration::from_secs(10)).text("keepalive")), + ) +} + +async fn resolve_log_file(file: Option<&str>) -> Result { + let log_dir = PathBuf::from(LOG_DIR); + + if let Some(file_name) = file { + validate_file_name(file_name)?; + let path = log_dir.join(file_name); + ensure_exists(&path).await?; + return Ok(path); + } + + latest_log_file(&log_dir).await +} + +fn validate_file_name(file_name: &str) -> Result<(), ApiErr> { + if file_name.is_empty() { + return Err(ApiErr::BadRequest("file cannot be empty".to_string(), None)); + } + if file_name.contains(['/', '\\']) || file_name.contains("..") { + return Err(ApiErr::BadRequest("invalid log file name".to_string(), None)); + } + if !file_name.starts_with("app.log") { + return Err(ApiErr::BadRequest("only app.log* files are allowed".to_string(), None)); + } + Ok(()) +} + +async fn ensure_exists(path: &Path) -> Result<(), ApiErr> { + fs::metadata(path).await.map_err(|e| match e.kind() { + std::io::ErrorKind::NotFound => ApiErr::NotFound("log file not found".to_string(), None), + _ => ApiErr::Internal("failed to access log file".to_string(), None), + })?; + Ok(()) +} + +async fn latest_log_file(log_dir: &Path) -> Result { + let mut entries = fs::read_dir(log_dir).await.map_err(|e| match e.kind() { + std::io::ErrorKind::NotFound => ApiErr::NotFound("log directory not found".to_string(), None), + _ => ApiErr::Internal("failed to read log directory".to_string(), None), + })?; + + let mut latest: Option<(SystemTime, PathBuf)> = None; + + while let Some(entry) = entries + .next_entry() + .await + .map_err(|_| ApiErr::Internal("failed to enumerate log files".to_string(), None))? + { + let file_name = entry.file_name(); + let file_name = file_name.to_string_lossy(); + if !file_name.starts_with("app.log") { + continue; + } + + let metadata = entry + .metadata() + .await + .map_err(|_| ApiErr::Internal("failed to read log metadata".to_string(), None))?; + if !metadata.is_file() { + continue; + } + + let modified = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH); + let path = entry.path(); + + match latest.as_ref() { + Some((latest_modified, _)) if modified <= *latest_modified => {} + _ => latest = Some((modified, path)), + } + } + + latest + .map(|(_, path)| path) + .ok_or_else(|| ApiErr::NotFound("no app.log files found".to_string(), None)) +} + +async fn read_tail(path: &Path, file_name: &str, tail_lines: usize) -> Result { + let mut file = fs::File::open(path).await.map_err(map_open_err)?; + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer) + .await + .map_err(|_| ApiErr::Internal("failed to read log file".to_string(), None))?; + + let cursor = buffer.len() as u64; + let text = String::from_utf8_lossy(&buffer); + let mut lines: Vec = text.lines().map(|line| line.to_string()).collect(); + let dropped = lines.len().saturating_sub(tail_lines); + if dropped > 0 { + lines = lines.into_iter().skip(dropped).collect(); + } + + Ok(LogChunkResponse { + file: file_name.to_string(), + cursor, + lines, + truncated: false, + reset: false, + }) +} + +async fn read_since( + path: &Path, + file_name: &str, + cursor: u64, + max_bytes: usize, +) -> Result { + let mut file = fs::File::open(path).await.map_err(map_open_err)?; + let metadata = file + .metadata() + .await + .map_err(|_| ApiErr::Internal("failed to read log metadata".to_string(), None))?; + let file_size = metadata.len(); + + let (start, reset) = if cursor > file_size { + (0, true) + } else { + (cursor, false) + }; + let end = (start + max_bytes as u64).min(file_size); + + file.seek(SeekFrom::Start(start)) + .await + .map_err(|_| ApiErr::Internal("failed to seek log file".to_string(), None))?; + + let mut buffer = vec![0u8; (end - start) as usize]; + if !buffer.is_empty() { + file.read_exact(&mut buffer) + .await + .map_err(|_| ApiErr::Internal("failed to read log file chunk".to_string(), None))?; + } + + let text = String::from_utf8_lossy(&buffer); + let lines = text.lines().map(|line| line.to_string()).collect(); + + Ok(LogChunkResponse { + file: file_name.to_string(), + cursor: end, + lines, + truncated: end < file_size, + reset, + }) +} + +async fn file_len(path: &Path) -> Result { + let metadata = fs::metadata(path).await.map_err(map_open_err)?; + Ok(metadata.len()) +} + +fn file_name_of(path: &Path) -> String { + path.file_name() + .and_then(|name| name.to_str()) + .unwrap_or_default() + .to_string() +} + +fn map_open_err(err: std::io::Error) -> ApiErr { + match err.kind() { + std::io::ErrorKind::NotFound => ApiErr::NotFound("log file not found".to_string(), None), + _ => ApiErr::Internal("failed to access log file".to_string(), None), + } +} diff --git a/src/main.rs b/src/main.rs index 66d306d..1def7d6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -128,7 +128,9 @@ fn build_router(state: AppState) -> Router { .route("/api/point/{point_id}", get(handler::point::get_point).put(handler::point::update_point).delete(handler::point::delete_point)) .route("/api/point/batch/set-tags", put(handler::point::batch_set_point_tags)) .route("/api/tag", get(handler::tag::get_tag_list).post(handler::tag::create_tag)) - .route("/api/tag/{tag_id}", get(handler::tag::get_tag_points).put(handler::tag::update_tag).delete(handler::tag::delete_tag)); + .route("/api/tag/{tag_id}", get(handler::tag::get_tag_points).put(handler::tag::update_tag).delete(handler::tag::delete_tag)) + .route("/api/logs", get(handler::log::get_logs)) + .route("/api/logs/stream", get(handler::log::stream_logs)); Router::new() .merge(all_route) diff --git a/src/util/log.rs b/src/util/log.rs index 9f21683..7ab9159 100644 --- a/src/util/log.rs +++ b/src/util/log.rs @@ -1,6 +1,6 @@ +use std::sync::OnceLock; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; use tracing_appender::{rolling, non_blocking}; -use std::sync::OnceLock; use time::UtcOffset; static LOG_GUARD: OnceLock = OnceLock::new(); @@ -11,7 +11,6 @@ pub fn init_logger() { let file_appender = rolling::daily("./logs", "app.log"); let (file_writer, guard) = non_blocking(file_appender); LOG_GUARD.set(guard).ok(); - let timer = fmt::time::OffsetTime::new( UtcOffset::from_hms(8, 0, 0).unwrap(), time::format_description::well_known::Rfc3339, @@ -27,10 +26,12 @@ pub fn init_logger() { ) .with( fmt::layer() - .compact() + .json() .with_timer(timer) .with_writer(file_writer) - .with_ansi(false), + .with_ansi(false) + .with_current_span(false) + .with_span_list(false), ) .init(); }