Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/logfwd-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub enum Format {
Syslog,
Raw,
Auto,
/// Human-readable colored console output for debugging/testing.
Console,
}

// ---------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions crates/logfwd-output/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ publish = false
logfwd-config = { path = "../logfwd-config" }
logfwd-core = { path = "../logfwd-core" }
arrow = { workspace = true }
libc = "0.2"
ureq = { version = "3", default-features = false, features = ["rustls"] }
1 change: 1 addition & 0 deletions crates/logfwd-output/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ pub fn build_output_sink(name: &str, cfg: &OutputConfig) -> Result<Box<dyn Outpu
OutputType::Stdout => {
let fmt = match cfg.format.as_ref() {
Some(Format::Json) => StdoutFormat::Json,
Some(Format::Console) => StdoutFormat::Console,
_ => StdoutFormat::Text,
};
Ok(Box::new(StdoutSink::new(name.to_string(), fmt)))
Expand Down
140 changes: 140 additions & 0 deletions crates/logfwd-output/src/stdout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,28 @@ use super::{BatchMetadata, OutputSink, build_col_infos, write_row_json};
pub enum StdoutFormat {
Json,
Text,
/// Human-readable colored output for debugging/testing.
Console,
}

/// Writes log records to stdout, one per line.
pub struct StdoutSink {
name: String,
format: StdoutFormat,
buf: Vec<u8>,
color: bool,
}

impl StdoutSink {
pub fn new(name: String, format: StdoutFormat) -> Self {
let color = format == StdoutFormat::Console
&& std::env::var_os("NO_COLOR").is_none()
&& unsafe { libc::isatty(libc::STDOUT_FILENO) != 0 };
StdoutSink {
name,
format,
buf: Vec::with_capacity(8192),
color,
}
}

Expand Down Expand Up @@ -80,9 +87,142 @@ impl StdoutSink {
dest.write_all(&self.buf)?;
}
}
StdoutFormat::Console => {
self.write_console(batch, dest)?;
}
}
Ok(())
}

fn write_console<W: Write>(&mut self, batch: &RecordBatch, dest: &mut W) -> io::Result<()> {
let schema = batch.schema();
let fields = schema.fields();

// Find well-known columns by name (with or without type suffix).
let ts_idx = find_col(fields, &["timestamp_str", "timestamp"]);
let level_idx = find_col(fields, &["level_str", "level"]);
let msg_idx = find_col(fields, &["message_str", "message", "msg_str", "msg"]);

let cols = build_col_infos(batch);

for row in 0..batch.num_rows() {
self.buf.clear();

// Timestamp (dim).
if let Some(idx) = ts_idx {
let arr = batch.column(idx).as_string::<i32>();
if !arr.is_null(row) {
let ts = arr.value(row);
// Show just the time portion if it's a full ISO timestamp.
let short = ts.find('T').map(|i| &ts[i + 1..]).unwrap_or(ts);
if self.color {
self.buf.extend_from_slice(b"\x1b[2m");
}
self.buf.extend_from_slice(short.as_bytes());
if self.color {
self.buf.extend_from_slice(b"\x1b[0m");
}
self.buf.extend_from_slice(b" ");
}
}

// Level (colored).
if let Some(idx) = level_idx {
let arr = batch.column(idx).as_string::<i32>();
if !arr.is_null(row) {
let level = arr.value(row);
if self.color {
let color = match level {
"ERROR" => "\x1b[1;31m", // bold red
"WARN" => "\x1b[33m", // yellow
"INFO" => "\x1b[32m", // green
"DEBUG" => "\x1b[2m", // dim
_ => "",
};
self.buf.extend_from_slice(color.as_bytes());
}
// Pad to 5 chars for alignment.
write!(self.buf, "{:<5}", level)?;
if self.color {
self.buf.extend_from_slice(b"\x1b[0m");
}
self.buf.extend_from_slice(b" ");
}
}

// Message.
if let Some(idx) = msg_idx {
let arr = batch.column(idx).as_string::<i32>();
if !arr.is_null(row) {
self.buf.extend_from_slice(arr.value(row).as_bytes());
}
}

// Remaining fields as key=value pairs (dim).
let mut has_extra = false;
for col in &cols {
// Skip the well-known columns and _raw.
if Some(col.idx) == ts_idx
|| Some(col.idx) == level_idx
|| Some(col.idx) == msg_idx
|| col.field_name == "_raw"
{
continue;
}

let arr = batch.column(col.idx);
if arr.is_null(row) {
continue;
}

if !has_extra {
self.buf.extend_from_slice(b" ");
has_extra = true;
} else {
self.buf.push(b' ');
}

if self.color {
self.buf.extend_from_slice(b"\x1b[2m");
}
self.buf.extend_from_slice(col.field_name.as_bytes());
self.buf.push(b'=');

match col.type_suffix.as_str() {
"int" => {
let arr = arr.as_primitive::<arrow::datatypes::Int64Type>();
write!(self.buf, "{}", arr.value(row))?;
}
"float" => {
let arr = arr.as_primitive::<arrow::datatypes::Float64Type>();
write!(self.buf, "{}", arr.value(row))?;
}
_ => {
let arr = arr.as_string::<i32>();
self.buf.extend_from_slice(arr.value(row).as_bytes());
}
}

if self.color {
self.buf.extend_from_slice(b"\x1b[0m");
}
}

self.buf.push(b'\n');
dest.write_all(&self.buf)?;
}
Ok(())
}
}

/// Find a column index by trying multiple name variants.
fn find_col(fields: &arrow::datatypes::Fields, names: &[&str]) -> Option<usize> {
for name in names {
if let Some(idx) = fields.iter().position(|f| f.name() == *name) {
return Some(idx);
}
}
None
}

impl OutputSink for StdoutSink {
Expand Down
1 change: 1 addition & 0 deletions crates/logfwd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ logfwd-core = { path = "../logfwd-core" }
logfwd-transform = { path = "../logfwd-transform" }
logfwd-output = { path = "../logfwd-output" }
dhat = { version = "0.3", optional = true }
libc = "0.2"
memchr = "2"
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
Expand Down
Loading
Loading