//! plc_control/src/handler/log.rs
//!
//! HTTP handlers for reading application log files: a polled JSON chunk
//! endpoint (`get_logs`) and an SSE tail-follow endpoint (`stream_logs`).

use std::{
convert::Infallible,
path::{Path, PathBuf},
time::SystemTime,
};
use async_stream::stream;
use axum::{
extract::Query,
response::{
sse::{Event, KeepAlive, Sse},
IntoResponse,
},
Json,
};
use serde::{Deserialize, Serialize};
use tokio::{
fs,
io::{AsyncReadExt, AsyncSeekExt, SeekFrom},
time::{Duration, interval},
};
use crate::util::response::ApiErr;
// Directory scanned for `app.log*` files (relative to the working directory).
const LOG_DIR: &str = "./logs";
// Number of trailing lines returned when the client omits `tail_lines`.
const DEFAULT_TAIL_LINES: usize = 200;
// Upper bound on any client-supplied `tail_lines`.
const MAX_TAIL_LINES: usize = 2000;
// Default byte budget for one polled `/logs` chunk.
const DEFAULT_MAX_BYTES: usize = 64 * 1024;
// Default byte budget per SSE poll.
const STREAM_MAX_BYTES: usize = 32 * 1024;
// Hard cap applied to any client-supplied `max_bytes`.
const MAX_MAX_BYTES: usize = 512 * 1024;
/// Query parameters accepted by both the fetch and the stream handlers.
#[derive(Debug, Deserialize)]
pub struct LogQuery {
    /// Specific `app.log*` file to read; when absent the most recently
    /// modified log file is chosen.
    pub file: Option<String>,
    /// Byte offset to resume reading from; absent on the first request.
    pub cursor: Option<u64>,
    /// Trailing line count for a cursor-less fetch (clamped server-side).
    pub tail_lines: Option<usize>,
    /// Byte budget per chunk (clamped server-side).
    pub max_bytes: Option<usize>,
}
/// One chunk of log output, returned as JSON or as an SSE `log` event.
#[derive(Debug, Serialize)]
pub struct LogChunkResponse {
    /// Name of the file the chunk came from.
    pub file: String,
    /// Byte offset to pass back as `cursor` to continue reading.
    pub cursor: u64,
    /// Decoded log lines (bytes are converted with lossy UTF-8).
    pub lines: Vec<String>,
    /// True when more data remained in the file beyond the byte budget.
    pub truncated: bool,
    /// True when the read position was rewound (file shrank/rotated, or the
    /// stream switched to a different file).
    pub reset: bool,
}
/// Bookkeeping for the SSE follow loop: which file is being read and how
/// far into it the stream has progressed.
#[derive(Debug, Clone, PartialEq, Eq)]
struct StreamFileState {
    path: PathBuf,
    file_name: String,
    cursor: u64,
}
/// GET handler returning a chunk of a log file as JSON.
///
/// Without a `cursor` the last `tail_lines` lines are returned; with a
/// `cursor` the bytes appended since that offset are returned instead.
/// Client-supplied budgets are clamped to server limits.
pub async fn get_logs(Query(query): Query<LogQuery>) -> Result<impl IntoResponse, ApiErr> {
    let path = resolve_log_file(query.file.as_deref()).await?;
    let file_name = file_name_of(&path);
    let byte_budget = query
        .max_bytes
        .unwrap_or(DEFAULT_MAX_BYTES)
        .clamp(1, MAX_MAX_BYTES);
    let chunk = match query.cursor {
        Some(offset) => read_since(&path, &file_name, offset, byte_budget).await?,
        None => {
            let line_budget = query
                .tail_lines
                .unwrap_or(DEFAULT_TAIL_LINES)
                .clamp(1, MAX_TAIL_LINES);
            read_tail(&path, &file_name, line_budget).await?
        }
    };
    Ok(Json(chunk))
}
/// GET handler that follows a log file over Server-Sent Events.
///
/// Polls the file every 800 ms and pushes newly appended lines as `log`
/// events. When no `file` query parameter is given, the stream follows the
/// most recently modified `app.log*` file and switches (reporting
/// `reset: true`) when rotation produces a newer one. Read or serialization
/// failures emit a final `error` event and end the stream; a keep-alive
/// comment is sent every 10 s.
pub async fn stream_logs(Query(query): Query<LogQuery>) -> Result<impl IntoResponse, ApiErr> {
    let path = resolve_log_file(query.file.as_deref()).await?;
    let file_name = file_name_of(&path);
    // Per-poll byte budget, clamped so one event cannot grow unbounded.
    let max_bytes = query
        .max_bytes
        .unwrap_or(STREAM_MAX_BYTES)
        .clamp(1, MAX_MAX_BYTES);
    // Only auto-switch across rotated files when the caller did not pin one.
    let follow_latest = query.file.is_none();
    // Default to EOF so the stream starts with new output, not history.
    let start_cursor = query.cursor.unwrap_or(file_len(&path).await?);
    let event_stream = stream! {
        let mut ticker = interval(Duration::from_millis(800));
        let mut stream_file = StreamFileState {
            path,
            file_name,
            cursor: start_cursor,
        };
        loop {
            ticker.tick().await;
            // Check for rotation before reading, so a freshly rotated file is
            // picked up on the same iteration.
            let switched = if follow_latest {
                match latest_log_file(Path::new(LOG_DIR)).await {
                    Ok(latest_path) => {
                        let latest = StreamFileState {
                            file_name: file_name_of(&latest_path),
                            path: latest_path,
                            cursor: 0,
                        };
                        // Same path: keep current state (and cursor).
                        // Different path: move to the new file at offset 0.
                        let (next, switched) = advance_stream_file(&stream_file, &latest);
                        stream_file = next;
                        switched
                    }
                    // Directory scan failed: keep following the current file.
                    Err(_) => false,
                }
            } else {
                false
            };
            match read_since(&stream_file.path, &stream_file.file_name, stream_file.cursor, max_bytes).await {
                Ok(chunk) => {
                    stream_file.cursor = chunk.cursor;
                    let chunk = LogChunkResponse {
                        // Surface a file switch to the client as a reset.
                        reset: chunk.reset || switched,
                        ..chunk
                    };
                    // Skip empty polls so idle files only produce keep-alives;
                    // resets are always forwarded even with no lines.
                    if chunk.reset || !chunk.lines.is_empty() {
                        match Event::default().event("log").json_data(&chunk) {
                            Ok(event) => yield Ok::<Event, Infallible>(event),
                            Err(_) => {
                                yield Ok::<Event, Infallible>(
                                    Event::default().event("error").data("serialize log event failed")
                                );
                                break;
                            }
                        }
                    }
                }
                Err(_) => {
                    yield Ok::<Event, Infallible>(
                        Event::default().event("error").data("log stream read failed")
                    );
                    break;
                }
            }
        }
    };
    Ok(
        Sse::new(event_stream)
            .keep_alive(KeepAlive::new().interval(Duration::from_secs(10)).text("keepalive")),
    )
}
/// Map the optional `file` query value to a concrete path under `LOG_DIR`.
///
/// A named file is validated and must already exist; with no name given,
/// the most recently modified `app.log*` file is used.
async fn resolve_log_file(file: Option<&str>) -> Result<PathBuf, ApiErr> {
    let log_dir = PathBuf::from(LOG_DIR);
    match file {
        Some(file_name) => {
            validate_file_name(file_name)?;
            let path = log_dir.join(file_name);
            ensure_exists(&path).await?;
            Ok(path)
        }
        None => latest_log_file(&log_dir).await,
    }
}
/// Reject log-file names that are empty, contain path separators or `..`,
/// or do not belong to the `app.log*` family.
fn validate_file_name(file_name: &str) -> Result<(), ApiErr> {
    if file_name.is_empty() {
        return Err(ApiErr::BadRequest("file cannot be empty".to_string(), None));
    }
    let looks_like_traversal = file_name.contains(['/', '\\']) || file_name.contains("..");
    if looks_like_traversal {
        return Err(ApiErr::BadRequest("invalid log file name".to_string(), None));
    }
    if !file_name.starts_with("app.log") {
        return Err(ApiErr::BadRequest("only app.log* files are allowed".to_string(), None));
    }
    Ok(())
}
/// Confirm that `path` is accessible, translating I/O failures into API errors.
async fn ensure_exists(path: &Path) -> Result<(), ApiErr> {
    match fs::metadata(path).await {
        Ok(_) => Ok(()),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            Err(ApiErr::NotFound("log file not found".to_string(), None))
        }
        Err(_) => Err(ApiErr::Internal("failed to access log file".to_string(), None)),
    }
}
/// Find the most recently modified `app.log*` regular file in `log_dir`.
///
/// Entries whose metadata cannot be read are skipped rather than failing
/// the whole lookup: a log file may legitimately disappear between
/// `read_dir` and `metadata` during rotation or cleanup, and aborting on
/// that race previously turned a transient condition into a 500.
///
/// # Errors
/// `NotFound` when the directory is missing or contains no matching files;
/// `Internal` when the directory itself cannot be enumerated.
async fn latest_log_file(log_dir: &Path) -> Result<PathBuf, ApiErr> {
    let mut entries = fs::read_dir(log_dir).await.map_err(|e| match e.kind() {
        std::io::ErrorKind::NotFound => ApiErr::NotFound("log directory not found".to_string(), None),
        _ => ApiErr::Internal("failed to read log directory".to_string(), None),
    })?;
    let mut latest: Option<(SystemTime, PathBuf)> = None;
    while let Some(entry) = entries
        .next_entry()
        .await
        .map_err(|_| ApiErr::Internal("failed to enumerate log files".to_string(), None))?
    {
        let file_name = entry.file_name();
        if !file_name.to_string_lossy().starts_with("app.log") {
            continue;
        }
        // Skip entries that vanished (or are otherwise unreadable) between
        // the directory scan and this metadata call.
        let metadata = match entry.metadata().await {
            Ok(metadata) => metadata,
            Err(_) => continue,
        };
        if !metadata.is_file() {
            continue;
        }
        // Filesystems without mtime support fall back to the epoch, which
        // simply ranks those files as oldest.
        let modified = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH);
        match latest.as_ref() {
            Some((latest_modified, _)) if modified <= *latest_modified => {}
            _ => latest = Some((modified, entry.path())),
        }
    }
    latest
        .map(|(_, path)| path)
        .ok_or_else(|| ApiErr::NotFound("no app.log files found".to_string(), None))
}
async fn read_tail(path: &Path, file_name: &str, tail_lines: usize) -> Result<LogChunkResponse, ApiErr> {
let mut file = fs::File::open(path).await.map_err(map_open_err)?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)
.await
.map_err(|_| ApiErr::Internal("failed to read log file".to_string(), None))?;
let cursor = buffer.len() as u64;
let text = String::from_utf8_lossy(&buffer);
let mut lines: Vec<String> = text.lines().map(|line| line.to_string()).collect();
let dropped = lines.len().saturating_sub(tail_lines);
if dropped > 0 {
lines = lines.into_iter().skip(dropped).collect();
}
Ok(LogChunkResponse {
file: file_name.to_string(),
cursor,
lines,
truncated: false,
reset: false,
})
}
/// Read up to `max_bytes` of log data starting at byte offset `cursor`.
///
/// If `cursor` is past the current end of the file (truncation/rotation),
/// reading restarts from offset 0 and `reset` is set. When the byte budget
/// cuts the chunk short of EOF, the chunk is trimmed back to the last
/// complete line so the next poll resumes at a line boundary — previously a
/// line could be split across two chunks and delivered as two bogus
/// "lines". If the capped chunk contains no newline at all (one huge
/// line), the partial data is emitted as-is so the cursor still advances.
async fn read_since(
    path: &Path,
    file_name: &str,
    cursor: u64,
    max_bytes: usize,
) -> Result<LogChunkResponse, ApiErr> {
    let mut file = fs::File::open(path).await.map_err(map_open_err)?;
    let metadata = file
        .metadata()
        .await
        .map_err(|_| ApiErr::Internal("failed to read log metadata".to_string(), None))?;
    let file_size = metadata.len();
    let (start, reset) = if cursor > file_size {
        (0, true)
    } else {
        (cursor, false)
    };
    let mut end = (start + max_bytes as u64).min(file_size);
    file.seek(SeekFrom::Start(start))
        .await
        .map_err(|_| ApiErr::Internal("failed to seek log file".to_string(), None))?;
    let mut buffer = vec![0u8; (end - start) as usize];
    if !buffer.is_empty() {
        file.read_exact(&mut buffer)
            .await
            .map_err(|_| ApiErr::Internal("failed to read log file chunk".to_string(), None))?;
    }
    // Budget-capped mid-file: cut back to the last newline so no line is
    // split across chunks. (At EOF the writer may still be mid-line; keep
    // whatever is there, matching the previous behavior.)
    if end < file_size {
        if let Some(pos) = buffer.iter().rposition(|&b| b == b'\n') {
            buffer.truncate(pos + 1);
            end = start + buffer.len() as u64;
        }
    }
    let text = String::from_utf8_lossy(&buffer);
    let lines = text.lines().map(|line| line.to_string()).collect();
    Ok(LogChunkResponse {
        file: file_name.to_string(),
        cursor: end,
        lines,
        truncated: end < file_size,
        reset,
    })
}
/// Current size of the file at `path`, in bytes.
async fn file_len(path: &Path) -> Result<u64, ApiErr> {
    let metadata = fs::metadata(path).await.map_err(map_open_err)?;
    let size = metadata.len();
    Ok(size)
}
/// Final path component as an owned `String`; empty when the path has no
/// file name or the name is not valid UTF-8.
fn file_name_of(path: &Path) -> String {
    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name.to_string(),
        None => String::new(),
    }
}
/// Decide which file the SSE follow loop should read next.
///
/// Returns the current state unchanged (cursor preserved) when `latest`
/// points at the same path; otherwise returns the latest file with its
/// cursor rewound to 0, plus a flag reporting that a switch happened.
fn advance_stream_file(
    current: &StreamFileState,
    latest: &StreamFileState,
) -> (StreamFileState, bool) {
    let switched = current.path != latest.path;
    if !switched {
        return (current.clone(), false);
    }
    let next = StreamFileState {
        path: latest.path.clone(),
        file_name: latest.file_name.clone(),
        cursor: 0,
    };
    (next, true)
}
/// Translate an I/O error from `open`/`metadata` into an API error.
fn map_open_err(err: std::io::Error) -> ApiErr {
    if err.kind() == std::io::ErrorKind::NotFound {
        ApiErr::NotFound("log file not found".to_string(), None)
    } else {
        ApiErr::Internal("failed to access log file".to_string(), None)
    }
}
#[cfg(test)]
mod tests {
    use super::{advance_stream_file, StreamFileState};
    use std::path::PathBuf;

    /// Build a `StreamFileState` for test fixtures.
    fn state(path: &str, name: &str, cursor: u64) -> StreamFileState {
        StreamFileState {
            path: PathBuf::from(path),
            file_name: name.to_string(),
            cursor,
        }
    }

    /// Rotation to a different file must switch and rewind the cursor.
    #[test]
    fn advance_stream_file_switches_to_latest_file_and_resets_cursor() {
        let current = state("logs/app.log", "app.log", 128);
        let latest = state("logs/app.log.1", "app.log.1", 42);

        let (next, switched) = advance_stream_file(&current, &latest);

        assert!(switched);
        assert_eq!(next.path, latest.path);
        assert_eq!(next.file_name, latest.file_name);
        assert_eq!(next.cursor, 0);
    }

    /// Same path: no switch is reported and the in-progress cursor survives.
    #[test]
    fn advance_stream_file_keeps_cursor_for_same_path() {
        let current = state("logs/app.log", "app.log", 128);
        let latest = state("logs/app.log", "app.log", 0);

        let (next, switched) = advance_stream_file(&current, &latest);

        assert!(!switched);
        assert_eq!(next, current);
    }
}