feat(log): add file-based log APIs and switch file logs to JSON

This commit is contained in:
caoqianming 2026-03-09 14:47:19 +08:00
parent 63bcf679c2
commit 1374abe550
6 changed files with 323 additions and 7 deletions

36
Cargo.lock generated
View File

@ -180,6 +180,28 @@ dependencies = [
"uuid", "uuid",
] ]
[[package]]
name = "async-stream"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.89" version = "0.1.89"
@ -808,6 +830,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-opcua", "async-opcua",
"async-stream",
"axum", "axum",
"chrono", "chrono",
"dotenv", "dotenv",
@ -2548,6 +2571,16 @@ dependencies = [
"tracing-core", "tracing-core",
] ]
[[package]]
name = "tracing-serde"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1"
dependencies = [
"serde",
"tracing-core",
]
[[package]] [[package]]
name = "tracing-subscriber" name = "tracing-subscriber"
version = "0.3.22" version = "0.3.22"
@ -2558,6 +2591,8 @@ dependencies = [
"nu-ansi-term", "nu-ansi-term",
"once_cell", "once_cell",
"regex-automata", "regex-automata",
"serde",
"serde_json",
"sharded-slab", "sharded-slab",
"smallvec", "smallvec",
"thread_local", "thread_local",
@ -2565,6 +2600,7 @@ dependencies = [
"tracing", "tracing",
"tracing-core", "tracing-core",
"tracing-log", "tracing-log",
"tracing-serde",
] ]
[[package]] [[package]]

View File

@ -18,6 +18,7 @@ sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uu
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
serde_with = "3.0" serde_with = "3.0"
async-stream = "0.3"
# Time handling # Time handling
chrono = "0.4" chrono = "0.4"
@ -31,7 +32,7 @@ async-opcua = { version = "0.18", features = ["client"] }
# Logging # Logging
tracing = "0.1" tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "json"] }
tracing-appender = "0.2" tracing-appender = "0.2"
# Environment variables # Environment variables

View File

@ -1,3 +1,4 @@
pub mod source; pub mod source;
pub mod point; pub mod point;
pub mod tag; pub mod tag;
pub mod log;

275
src/handler/log.rs Normal file
View File

@ -0,0 +1,275 @@
use std::{
convert::Infallible,
path::{Path, PathBuf},
time::SystemTime,
};
use async_stream::stream;
use axum::{
extract::Query,
response::{
sse::{Event, KeepAlive, Sse},
IntoResponse,
},
Json,
};
use serde::{Deserialize, Serialize};
use tokio::{
fs,
io::{AsyncReadExt, AsyncSeekExt, SeekFrom},
time::{Duration, interval},
};
use crate::util::response::ApiErr;
// Directory that tracing-appender writes rolling "app.log*" files into.
const LOG_DIR: &str = "./logs";
// Default and hard cap for the number of lines in an initial tail read.
const DEFAULT_TAIL_LINES: usize = 200;
const MAX_TAIL_LINES: usize = 2000;
// Byte budgets: polling default, SSE per-tick default, absolute ceiling.
const DEFAULT_MAX_BYTES: usize = 64 * 1024;
const STREAM_MAX_BYTES: usize = 32 * 1024;
const MAX_MAX_BYTES: usize = 512 * 1024;
/// Query parameters shared by the log polling (`get_logs`) and SSE
/// (`stream_logs`) endpoints.
#[derive(Debug, Deserialize)]
pub struct LogQuery {
    // Log file name under LOG_DIR; defaults to the newest app.log* file.
    pub file: Option<String>,
    // Byte offset to resume from; omitted => tail (GET) or end-of-file (SSE).
    pub cursor: Option<u64>,
    // Number of trailing lines for an initial tail read (GET only).
    pub tail_lines: Option<usize>,
    // Upper bound on bytes read per request / per stream tick.
    pub max_bytes: Option<usize>,
}
/// One chunk of log output returned to the client.
#[derive(Debug, Serialize)]
pub struct LogChunkResponse {
    // Name of the file this chunk was read from.
    pub file: String,
    // Byte offset just past the returned data; send back as `cursor` to resume.
    pub cursor: u64,
    // Log lines contained in this chunk.
    pub lines: Vec<String>,
    // True when more data exists beyond `cursor` (byte budget was hit).
    pub truncated: bool,
    // True when the file shrank below the cursor (rotation) and reading
    // restarted from offset 0.
    pub reset: bool,
}
/// GET handler: return one chunk of a log file. With a `cursor` it resumes
/// reading from that offset; without one it tails the last `tail_lines`
/// lines of the file.
pub async fn get_logs(Query(query): Query<LogQuery>) -> Result<impl IntoResponse, ApiErr> {
    let path = resolve_log_file(query.file.as_deref()).await?;
    let file_name = file_name_of(&path);
    let budget = query
        .max_bytes
        .unwrap_or(DEFAULT_MAX_BYTES)
        .clamp(1, MAX_MAX_BYTES);

    let chunk = match query.cursor {
        Some(offset) => read_since(&path, &file_name, offset, budget).await?,
        None => {
            let count = query
                .tail_lines
                .unwrap_or(DEFAULT_TAIL_LINES)
                .clamp(1, MAX_TAIL_LINES);
            read_tail(&path, &file_name, count).await?
        }
    };
    Ok(Json(chunk))
}
/// SSE handler: poll the selected log file every 800 ms and push newly
/// appended lines as `log` events. Starts at `cursor` when provided,
/// otherwise at the current end of the file (so only fresh output streams).
pub async fn stream_logs(Query(query): Query<LogQuery>) -> Result<impl IntoResponse, ApiErr> {
    let path = resolve_log_file(query.file.as_deref()).await?;
    let file_name = file_name_of(&path);
    let max_bytes = query
        .max_bytes
        .unwrap_or(STREAM_MAX_BYTES)
        .clamp(1, MAX_MAX_BYTES);
    // Default start: current file length, i.e. skip everything already written.
    let start_cursor = query.cursor.unwrap_or(file_len(&path).await?);
    let event_stream = stream! {
        let mut ticker = interval(Duration::from_millis(800));
        let mut cursor = start_cursor;
        loop {
            ticker.tick().await;
            match read_since(&path, &file_name, cursor, max_bytes).await {
                Ok(chunk) => {
                    cursor = chunk.cursor;
                    // Only emit when something happened: new lines, or the file
                    // was rotated/truncated (reset) and the client must resync.
                    if chunk.reset || !chunk.lines.is_empty() {
                        match Event::default().event("log").json_data(&chunk) {
                            Ok(event) => yield Ok::<Event, Infallible>(event),
                            Err(_) => {
                                // Serialization failure is unrecoverable for this
                                // stream: report once, then terminate.
                                yield Ok::<Event, Infallible>(
                                    Event::default().event("error").data("serialize log event failed")
                                );
                                break;
                            }
                        }
                    }
                }
                Err(_) => {
                    // File vanished or became unreadable; end the stream.
                    yield Ok::<Event, Infallible>(
                        Event::default().event("error").data("log stream read failed")
                    );
                    break;
                }
            }
        }
    };
    Ok(
        Sse::new(event_stream)
            .keep_alive(KeepAlive::new().interval(Duration::from_secs(10)).text("keepalive")),
    )
}
/// Map an optional client-supplied file name to a path inside the log
/// directory; with no name, fall back to the most recently modified log.
async fn resolve_log_file(file: Option<&str>) -> Result<PathBuf, ApiErr> {
    let log_dir = PathBuf::from(LOG_DIR);
    match file {
        Some(name) => {
            validate_file_name(name)?;
            let candidate = log_dir.join(name);
            ensure_exists(&candidate).await?;
            Ok(candidate)
        }
        None => latest_log_file(&log_dir).await,
    }
}
/// Reject names that could escape the log directory (path separators or
/// `..`) or that reference files outside the rolling `app.log*` family.
fn validate_file_name(file_name: &str) -> Result<(), ApiErr> {
    if file_name.is_empty() {
        return Err(ApiErr::BadRequest("file cannot be empty".to_string(), None));
    }
    let has_separator = file_name.contains(['/', '\\']);
    let has_parent_ref = file_name.contains("..");
    if has_separator || has_parent_ref {
        return Err(ApiErr::BadRequest("invalid log file name".to_string(), None));
    }
    if !file_name.starts_with("app.log") {
        return Err(ApiErr::BadRequest("only app.log* files are allowed".to_string(), None));
    }
    Ok(())
}
/// Verify that `path` can be stat'ed, translating I/O failures into API errors.
async fn ensure_exists(path: &Path) -> Result<(), ApiErr> {
    match fs::metadata(path).await {
        Ok(_) => Ok(()),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            Err(ApiErr::NotFound("log file not found".to_string(), None))
        }
        Err(_) => Err(ApiErr::Internal("failed to access log file".to_string(), None)),
    }
}
/// Scan the log directory and pick the `app.log*` regular file with the
/// newest modification time.
async fn latest_log_file(log_dir: &Path) -> Result<PathBuf, ApiErr> {
    let mut dir = fs::read_dir(log_dir).await.map_err(|e| {
        if e.kind() == std::io::ErrorKind::NotFound {
            ApiErr::NotFound("log directory not found".to_string(), None)
        } else {
            ApiErr::Internal("failed to read log directory".to_string(), None)
        }
    })?;
    let mut best: Option<(SystemTime, PathBuf)> = None;
    loop {
        let next = dir
            .next_entry()
            .await
            .map_err(|_| ApiErr::Internal("failed to enumerate log files".to_string(), None))?;
        let Some(entry) = next else { break };
        let name = entry.file_name();
        if !name.to_string_lossy().starts_with("app.log") {
            continue;
        }
        let meta = entry
            .metadata()
            .await
            .map_err(|_| ApiErr::Internal("failed to read log metadata".to_string(), None))?;
        if !meta.is_file() {
            continue;
        }
        // A missing mtime sorts the file to the very beginning (oldest).
        let modified = meta.modified().unwrap_or(SystemTime::UNIX_EPOCH);
        let is_newer = best.as_ref().map_or(true, |(ts, _)| modified > *ts);
        if is_newer {
            best = Some((modified, entry.path()));
        }
    }
    best.map(|(_, path)| path)
        .ok_or_else(|| ApiErr::NotFound("no app.log files found".to_string(), None))
}
async fn read_tail(path: &Path, file_name: &str, tail_lines: usize) -> Result<LogChunkResponse, ApiErr> {
let mut file = fs::File::open(path).await.map_err(map_open_err)?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)
.await
.map_err(|_| ApiErr::Internal("failed to read log file".to_string(), None))?;
let cursor = buffer.len() as u64;
let text = String::from_utf8_lossy(&buffer);
let mut lines: Vec<String> = text.lines().map(|line| line.to_string()).collect();
let dropped = lines.len().saturating_sub(tail_lines);
if dropped > 0 {
lines = lines.into_iter().skip(dropped).collect();
}
Ok(LogChunkResponse {
file: file_name.to_string(),
cursor,
lines,
truncated: false,
reset: false,
})
}
/// Read up to `max_bytes` of data starting at byte offset `cursor`.
///
/// If the file shrank below `cursor` (log rotation/truncation), reading
/// restarts from offset 0 and `reset` is flagged. When the byte budget cuts
/// the read short, the chunk is trimmed back to the last complete line and
/// the cursor rewound accordingly, so a log line is never split across two
/// chunks (the previous version emitted the head of a split line as one
/// "line" and its tail as another on the next poll). If the chunk contains
/// no newline at all (one oversized line), it is passed through untrimmed
/// to avoid stalling the cursor forever.
async fn read_since(
    path: &Path,
    file_name: &str,
    cursor: u64,
    max_bytes: usize,
) -> Result<LogChunkResponse, ApiErr> {
    let mut file = fs::File::open(path).await.map_err(map_open_err)?;
    let metadata = file
        .metadata()
        .await
        .map_err(|_| ApiErr::Internal("failed to read log metadata".to_string(), None))?;
    let file_size = metadata.len();
    // A cursor past EOF means the file was rotated or truncated: start over.
    let (start, reset) = if cursor > file_size {
        (0, true)
    } else {
        (cursor, false)
    };
    let mut end = (start + max_bytes as u64).min(file_size);
    file.seek(SeekFrom::Start(start))
        .await
        .map_err(|_| ApiErr::Internal("failed to seek log file".to_string(), None))?;
    let mut buffer = vec![0u8; (end - start) as usize];
    if !buffer.is_empty() {
        file.read_exact(&mut buffer)
            .await
            .map_err(|_| ApiErr::Internal("failed to read log file chunk".to_string(), None))?;
    }
    // Budget stopped us mid-file: drop the trailing partial line and rewind
    // the cursor so the next read delivers it whole.
    if end < file_size {
        if let Some(pos) = buffer.iter().rposition(|&b| b == b'\n') {
            buffer.truncate(pos + 1);
            end = start + buffer.len() as u64;
        }
    }
    let text = String::from_utf8_lossy(&buffer);
    let lines = text.lines().map(|line| line.to_string()).collect();
    Ok(LogChunkResponse {
        file: file_name.to_string(),
        cursor: end,
        lines,
        truncated: end < file_size,
        reset,
    })
}
/// Current size of the file in bytes.
async fn file_len(path: &Path) -> Result<u64, ApiErr> {
    fs::metadata(path)
        .await
        .map(|meta| meta.len())
        .map_err(map_open_err)
}
/// Final path component as an owned `String`; empty when the path has no
/// file name or it is not valid UTF-8.
fn file_name_of(path: &Path) -> String {
    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name.to_string(),
        None => String::new(),
    }
}
/// Translate an open/stat I/O error into the API error space.
fn map_open_err(err: std::io::Error) -> ApiErr {
    if err.kind() == std::io::ErrorKind::NotFound {
        ApiErr::NotFound("log file not found".to_string(), None)
    } else {
        ApiErr::Internal("failed to access log file".to_string(), None)
    }
}

View File

@ -128,7 +128,9 @@ fn build_router(state: AppState) -> Router {
.route("/api/point/{point_id}", get(handler::point::get_point).put(handler::point::update_point).delete(handler::point::delete_point)) .route("/api/point/{point_id}", get(handler::point::get_point).put(handler::point::update_point).delete(handler::point::delete_point))
.route("/api/point/batch/set-tags", put(handler::point::batch_set_point_tags)) .route("/api/point/batch/set-tags", put(handler::point::batch_set_point_tags))
.route("/api/tag", get(handler::tag::get_tag_list).post(handler::tag::create_tag)) .route("/api/tag", get(handler::tag::get_tag_list).post(handler::tag::create_tag))
.route("/api/tag/{tag_id}", get(handler::tag::get_tag_points).put(handler::tag::update_tag).delete(handler::tag::delete_tag)); .route("/api/tag/{tag_id}", get(handler::tag::get_tag_points).put(handler::tag::update_tag).delete(handler::tag::delete_tag))
.route("/api/logs", get(handler::log::get_logs))
.route("/api/logs/stream", get(handler::log::stream_logs));
Router::new() Router::new()
.merge(all_route) .merge(all_route)

View File

@ -1,6 +1,6 @@
use std::sync::OnceLock;
use tracing_subscriber::{fmt, prelude::*, EnvFilter}; use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use tracing_appender::{rolling, non_blocking}; use tracing_appender::{rolling, non_blocking};
use std::sync::OnceLock;
use time::UtcOffset; use time::UtcOffset;
static LOG_GUARD: OnceLock<non_blocking::WorkerGuard> = OnceLock::new(); static LOG_GUARD: OnceLock<non_blocking::WorkerGuard> = OnceLock::new();
@ -11,7 +11,6 @@ pub fn init_logger() {
let file_appender = rolling::daily("./logs", "app.log"); let file_appender = rolling::daily("./logs", "app.log");
let (file_writer, guard) = non_blocking(file_appender); let (file_writer, guard) = non_blocking(file_appender);
LOG_GUARD.set(guard).ok(); LOG_GUARD.set(guard).ok();
let timer = fmt::time::OffsetTime::new( let timer = fmt::time::OffsetTime::new(
UtcOffset::from_hms(8, 0, 0).unwrap(), UtcOffset::from_hms(8, 0, 0).unwrap(),
time::format_description::well_known::Rfc3339, time::format_description::well_known::Rfc3339,
@ -27,10 +26,12 @@ pub fn init_logger() {
) )
.with( .with(
fmt::layer() fmt::layer()
.compact() .json()
.with_timer(timer) .with_timer(timer)
.with_writer(file_writer) .with_writer(file_writer)
.with_ansi(false), .with_ansi(false)
.with_current_span(false)
.with_span_list(false),
) )
.init(); .init();
} }