diff --git a/Cargo.lock b/Cargo.lock index 46fb0a1be..74f08f7ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1995,6 +1995,7 @@ name = "logfwd" version = "0.1.0" dependencies = [ "dhat", + "libc", "logfwd-config", "logfwd-core", "logfwd-output", @@ -2075,6 +2076,7 @@ name = "logfwd-output" version = "0.1.0" dependencies = [ "arrow", + "libc", "logfwd-config", "logfwd-core", "ureq", diff --git a/crates/logfwd-config/src/lib.rs b/crates/logfwd-config/src/lib.rs index 33c032b44..036cb4995 100644 --- a/crates/logfwd-config/src/lib.rs +++ b/crates/logfwd-config/src/lib.rs @@ -85,6 +85,8 @@ pub enum Format { Syslog, Raw, Auto, + /// Human-readable colored console output for debugging/testing. + Console, } // --------------------------------------------------------------------------- diff --git a/crates/logfwd-output/Cargo.toml b/crates/logfwd-output/Cargo.toml index f10577132..a93261609 100644 --- a/crates/logfwd-output/Cargo.toml +++ b/crates/logfwd-output/Cargo.toml @@ -8,4 +8,5 @@ publish = false logfwd-config = { path = "../logfwd-config" } logfwd-core = { path = "../logfwd-core" } arrow = { workspace = true } +libc = "0.2" ureq = { version = "3", default-features = false, features = ["rustls"] } diff --git a/crates/logfwd-output/src/lib.rs b/crates/logfwd-output/src/lib.rs index e713e7731..db2bc23f4 100644 --- a/crates/logfwd-output/src/lib.rs +++ b/crates/logfwd-output/src/lib.rs @@ -189,6 +189,7 @@ pub fn build_output_sink(name: &str, cfg: &OutputConfig) -> Result { let fmt = match cfg.format.as_ref() { Some(Format::Json) => StdoutFormat::Json, + Some(Format::Console) => StdoutFormat::Console, _ => StdoutFormat::Text, }; Ok(Box::new(StdoutSink::new(name.to_string(), fmt))) diff --git a/crates/logfwd-output/src/stdout.rs b/crates/logfwd-output/src/stdout.rs index 6b6ca9471..ef4458ce0 100644 --- a/crates/logfwd-output/src/stdout.rs +++ b/crates/logfwd-output/src/stdout.rs @@ -14,6 +14,8 @@ use super::{BatchMetadata, OutputSink, build_col_infos, write_row_json}; pub enum StdoutFormat { Json, Text, + /// Human-readable colored output for debugging/testing. + Console, } /// Writes log records to stdout, one per line. @@ -21,14 +23,19 @@ pub struct StdoutSink { name: String, format: StdoutFormat, buf: Vec, + color: bool, } impl StdoutSink { pub fn new(name: String, format: StdoutFormat) -> Self { + let color = format == StdoutFormat::Console + && std::env::var_os("NO_COLOR").is_none() + && unsafe { libc::isatty(libc::STDOUT_FILENO) != 0 }; StdoutSink { name, format, buf: Vec::with_capacity(8192), + color, } } @@ -80,9 +87,142 @@ impl StdoutSink { dest.write_all(&self.buf)?; } } + StdoutFormat::Console => { + self.write_console(batch, dest)?; + } } Ok(()) } + + fn write_console(&mut self, batch: &RecordBatch, dest: &mut W) -> io::Result<()> { + let schema = batch.schema(); + let fields = schema.fields(); + + // Find well-known columns by name (with or without type suffix). + let ts_idx = find_col(fields, &["timestamp_str", "timestamp"]); + let level_idx = find_col(fields, &["level_str", "level"]); + let msg_idx = find_col(fields, &["message_str", "message", "msg_str", "msg"]); + + let cols = build_col_infos(batch); + + for row in 0..batch.num_rows() { + self.buf.clear(); + + // Timestamp (dim). + if let Some(idx) = ts_idx { + let arr = batch.column(idx).as_string::(); + if !arr.is_null(row) { + let ts = arr.value(row); + // Show just the time portion if it's a full ISO timestamp. + let short = ts.find('T').map(|i| &ts[i + 1..]).unwrap_or(ts); + if self.color { + self.buf.extend_from_slice(b"\x1b[2m"); + } + self.buf.extend_from_slice(short.as_bytes()); + if self.color { + self.buf.extend_from_slice(b"\x1b[0m"); + } + self.buf.extend_from_slice(b" "); + } + } + + // Level (colored). + if let Some(idx) = level_idx { + let arr = batch.column(idx).as_string::(); + if !arr.is_null(row) { + let level = arr.value(row); + if self.color { + let color = match level { + "ERROR" => "\x1b[1;31m", // bold red + "WARN" => "\x1b[33m", // yellow + "INFO" => "\x1b[32m", // green + "DEBUG" => "\x1b[2m", // dim + _ => "", + }; + self.buf.extend_from_slice(color.as_bytes()); + } + // Pad to 5 chars for alignment. + write!(self.buf, "{:<5}", level)?; + if self.color { + self.buf.extend_from_slice(b"\x1b[0m"); + } + self.buf.extend_from_slice(b" "); + } + } + + // Message. + if let Some(idx) = msg_idx { + let arr = batch.column(idx).as_string::(); + if !arr.is_null(row) { + self.buf.extend_from_slice(arr.value(row).as_bytes()); + } + } + + // Remaining fields as key=value pairs (dim). + let mut has_extra = false; + for col in &cols { + // Skip the well-known columns and _raw. + if Some(col.idx) == ts_idx + || Some(col.idx) == level_idx + || Some(col.idx) == msg_idx + || col.field_name == "_raw" + { + continue; + } + + let arr = batch.column(col.idx); + if arr.is_null(row) { + continue; + } + + if !has_extra { + self.buf.extend_from_slice(b" "); + has_extra = true; + } else { + self.buf.push(b' '); + } + + if self.color { + self.buf.extend_from_slice(b"\x1b[2m"); + } + self.buf.extend_from_slice(col.field_name.as_bytes()); + self.buf.push(b'='); + + match col.type_suffix.as_str() { + "int" => { + let arr = arr.as_primitive::(); + write!(self.buf, "{}", arr.value(row))?; + } + "float" => { + let arr = arr.as_primitive::(); + write!(self.buf, "{}", arr.value(row))?; + } + _ => { + let arr = arr.as_string::(); + self.buf.extend_from_slice(arr.value(row).as_bytes()); + } + } + + if self.color { + self.buf.extend_from_slice(b"\x1b[0m"); + } + } + + self.buf.push(b'\n'); + dest.write_all(&self.buf)?; + } + Ok(()) + } +} + +/// Find a column index by trying multiple name variants. +fn find_col(fields: &arrow::datatypes::Fields, names: &[&str]) -> Option { + for name in names { + if let Some(idx) = fields.iter().position(|f| f.name() == *name) { + return Some(idx); + } + } + None } impl OutputSink for StdoutSink { diff --git a/crates/logfwd/Cargo.toml b/crates/logfwd/Cargo.toml index 2f0ae93a6..4fc5f12b2 100644 --- a/crates/logfwd/Cargo.toml +++ b/crates/logfwd/Cargo.toml @@ -13,6 +13,7 @@ logfwd-core = { path = "../logfwd-core" } logfwd-transform = { path = "../logfwd-transform" } logfwd-output = { path = "../logfwd-output" } dhat = { version = "0.3", optional = true } +libc = "0.2" memchr = "2" opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true } diff --git a/crates/logfwd/src/main.rs b/crates/logfwd/src/main.rs index 02e6a8715..337d96edf 100644 --- a/crates/logfwd/src/main.rs +++ b/crates/logfwd/src/main.rs @@ -11,73 +11,343 @@ use std::time::{Duration, Instant}; use opentelemetry::metrics::MeterProvider; use opentelemetry_otlp::WithExportConfig; -fn main() -> io::Result<()> { +const VERSION: &str = env!("CARGO_PKG_VERSION"); + +// Exit codes. +const EXIT_OK: i32 = 0; +const EXIT_CONFIG: i32 = 1; +const EXIT_RUNTIME: i32 = 2; + +// --------------------------------------------------------------------------- +// Color support (respects NO_COLOR, checks stderr TTY) +// --------------------------------------------------------------------------- + +fn use_color() -> bool { + env::var_os("NO_COLOR").is_none() && unsafe { libc::isatty(libc::STDERR_FILENO) != 0 } +} + +macro_rules! style { + ($color:expr, $bold:expr) => { + if use_color() { + concat!("\x1b[", $bold, ";", $color, "m") + } else { + "" + } + }; +} + +fn green() -> &'static str { + style!("32", "0") +} +fn red() -> &'static str { + style!("31", "1") +} +fn yellow() -> &'static str { + style!("33", "0") +} +fn bold() -> &'static str { + if use_color() { "\x1b[1m" } else { "" } +} +fn dim() -> &'static str { + if use_color() { "\x1b[2m" } else { "" } +} +fn reset() -> &'static str { + if use_color() { "\x1b[0m" } else { "" } +} + +// --------------------------------------------------------------------------- +// Entry point +// --------------------------------------------------------------------------- + +fn main() { #[cfg(feature = "dhat-heap")] let _profiler = dhat::Profiler::new_heap(); + let args: Vec = env::args().collect(); - if args.len() < 2 { - eprintln!("Usage:"); - eprintln!(); - eprintln!(" Run pipeline from YAML config:"); - eprintln!(" logfwd --config [--validate] [--dry-run]"); - eprintln!(); - eprintln!(" Blackhole OTLP collector (for benchmarks):"); - eprintln!(" logfwd --blackhole [bind_addr] (default: 127.0.0.1:4318)"); - eprintln!(); - eprintln!(" Generate synthetic data:"); - eprintln!(" logfwd --generate-json "); - std::process::exit(1); + if args.len() < 2 || args.iter().any(|a| a == "--help" || a == "-h") { + print_usage(); + std::process::exit(EXIT_OK); + } + + if args.iter().any(|a| a == "--version" || a == "-V") { + println!("logfwd {VERSION}"); + std::process::exit(EXIT_OK); + } + + let result = match args[1].as_str() { + "--config" | "-c" => cmd_config(&args), + "--blackhole" => cmd_blackhole(&args), + "--generate-json" => cmd_generate_json(&args), + other => { + eprintln!("{}error{}: unknown command: {other}", red(), reset()); + eprintln!("Run {}logfwd --help{} for usage.", bold(), reset()); + std::process::exit(EXIT_CONFIG); + } + }; + + if let Err(e) = result { + eprintln!("{}error{}: {e}", red(), reset()); + std::process::exit(EXIT_RUNTIME); + } +} + +fn print_usage() { + eprintln!( + "{}logfwd{} {}v{VERSION}{} -- fast log forwarder with SQL transforms", + bold(), + reset(), + dim(), + reset(), + ); + eprintln!(); + eprintln!("{}USAGE:{}", bold(), reset()); + eprintln!(" logfwd --config [--validate] [--dry-run]"); + eprintln!(" logfwd --blackhole [bind_addr]"); + eprintln!(" logfwd --generate-json "); + eprintln!(); + eprintln!("{}OPTIONS:{}", bold(), reset()); + eprintln!(" -c, --config Run pipeline from YAML config"); + eprintln!(" --validate Validate config and exit (alias: --check)"); + eprintln!(" --dry-run Build pipelines without running"); + eprintln!(" --blackhole [addr] Start blackhole sink (default: 127.0.0.1:4318)"); + eprintln!(" --generate-json Generate synthetic JSON log file"); + eprintln!(" -h, --help Show this help"); + eprintln!(" -V, --version Show version"); + eprintln!(); + eprintln!("{}EXIT CODES:{}", bold(), reset()); + eprintln!(" 0 Success"); + eprintln!(" 1 Configuration error"); + eprintln!(" 2 Runtime error"); + eprintln!(); + eprintln!( + "{}Respects NO_COLOR (https://no-color.org){}", + dim(), + reset(), + ); +} + +// --------------------------------------------------------------------------- +// Commands +// --------------------------------------------------------------------------- + +fn cmd_config(args: &[String]) -> io::Result<()> { + if args.len() < 3 { + eprintln!("{}error{}: --config requires a path", red(), reset(),); + eprintln!(" logfwd --config [--validate] [--dry-run]"); + std::process::exit(EXIT_CONFIG); } - if args[1] == "--generate-json" { - if args.len() < 4 { - eprintln!("Usage: logfwd --generate-json "); - std::process::exit(1); + let config_path = &args[2]; + let mut validate_only = false; + let mut dry_run = false; + + // Parse flags after the config path — reject unknown flags. + for arg in &args[3..] { + match arg.as_str() { + "--validate" | "--check" => validate_only = true, + "--dry-run" => dry_run = true, + other => { + eprintln!("{}error{}: unknown flag: {other}", red(), reset()); + eprintln!(" logfwd --config [--validate] [--dry-run]"); + std::process::exit(EXIT_CONFIG); + } } - let num_lines: usize = args[2].parse().expect("invalid num_lines"); - let output = &args[3]; - return generate_json_log_file(num_lines, output); } - if args[1] == "--blackhole" { - let addr = args.get(2).map(|s| s.as_str()).unwrap_or("127.0.0.1:4318"); - return run_blackhole(addr); + let config = match logfwd_config::Config::load(config_path) { + Ok(c) => c, + Err(e) => { + eprintln!("{}error{}: {e}", red(), reset()); + std::process::exit(EXIT_CONFIG); + } + }; + + if validate_only || dry_run { + // Both --validate and --dry-run build pipelines to catch SQL/wiring errors. + return validate_pipelines(&config, dry_run); + } + + // Startup summary. + eprintln!("{}logfwd{} {}v{VERSION}{}", bold(), reset(), dim(), reset(),); + for (name, pipe_cfg) in &config.pipelines { + let n_in = pipe_cfg.inputs.len(); + let n_out = pipe_cfg.outputs.len(); + let sql = pipe_cfg + .transform + .as_deref() + .unwrap_or("SELECT * FROM logs"); + eprintln!( + " {}pipeline{} {}{name}{}: {n_in} input(s) {dim}-> {sql} ->{r} {n_out} output(s)", + dim(), + reset(), + bold(), + reset(), + dim = dim(), + r = reset(), + ); } - if args[1] == "--config" { - if args.len() < 3 { - eprintln!("Usage: logfwd --config [--validate] [--dry-run]"); - std::process::exit(1); + run_pipelines(config) +} + +fn cmd_blackhole(args: &[String]) -> io::Result<()> { + let addr = args.get(2).map(|s| s.as_str()).unwrap_or("127.0.0.1:4318"); + run_blackhole(addr) +} + +fn cmd_generate_json(args: &[String]) -> io::Result<()> { + if args.len() < 4 { + eprintln!( + "{}error{}: --generate-json requires ", + red(), + reset(), + ); + std::process::exit(EXIT_CONFIG); + } + let num_lines: usize = match args[2].parse() { + Ok(n) => n, + Err(e) => { + eprintln!( + "{}error{}: invalid num_lines '{}': {e}", + red(), + reset(), + args[2] + ); + std::process::exit(EXIT_CONFIG); } - let config_path = &args[2]; - let validate_only = args.iter().any(|a| a == "--validate"); - let dry_run = args.iter().any(|a| a == "--dry-run"); + }; + generate_json_log_file(num_lines, &args[3]) +} + +// --------------------------------------------------------------------------- +// Pipeline runner +// --------------------------------------------------------------------------- - let config = logfwd_config::Config::load(config_path) - .map_err(|e| io::Error::other(e.to_string()))?; +/// Validate config by building all pipelines. Used by --validate and --dry-run. +fn validate_pipelines(config: &logfwd_config::Config, dry_run: bool) -> io::Result<()> { + use logfwd::pipeline::Pipeline; + + // Build a no-op meter for validation (no OTel export needed). + let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder().build(); + let meter = meter_provider.meter("logfwd"); - if validate_only { - eprintln!("Config OK: {} pipeline(s)", config.pipelines.len()); - return Ok(()); + let mut errors = 0; + for (name, pipe_cfg) in &config.pipelines { + match Pipeline::from_config(name, pipe_cfg, &meter) { + Ok(_) => { + eprintln!(" {}ready{}: {}{name}{}", green(), reset(), bold(), reset()); + } + Err(e) => { + eprintln!(" {}error{}: pipeline '{name}': {e}", red(), reset()); + errors += 1; + } } + } - return run_pipelines(config, dry_run); + if errors > 0 { + eprintln!("\n{}validation failed{}: {errors} error(s)", red(), reset(),); + std::process::exit(EXIT_CONFIG); } - eprintln!("Unknown command: {}", args[1]); - eprintln!("Run 'logfwd' with no arguments for usage."); - std::process::exit(1); + let label = if dry_run { "dry run ok" } else { "config ok" }; + eprintln!( + "{}{label}{}: {} pipeline(s)", + green(), + reset(), + config.pipelines.len(), + ); + Ok(()) } +fn run_pipelines(config: logfwd_config::Config) -> io::Result<()> { + use logfwd::pipeline::Pipeline; + use logfwd_core::diagnostics::DiagnosticsServer; + let shutdown = Arc::new(AtomicBool::new(false)); + + let meter_provider = build_meter_provider(&config)?; + let meter = meter_provider.meter("logfwd"); + + let mut pipelines = Vec::new(); + for (name, pipe_cfg) in &config.pipelines { + match Pipeline::from_config(name, pipe_cfg, &meter) { + Ok(pipeline) => { + eprintln!(" {}ready{}: {}{name}{}", green(), reset(), bold(), reset()); + pipelines.push(pipeline); + } + Err(e) => { + eprintln!(" {}error{}: pipeline '{name}': {e}", red(), reset(),); + std::process::exit(EXIT_CONFIG); + } + } + } + + let _diag_handle = if let Some(ref addr) = config.server.diagnostics { + let mut server = DiagnosticsServer::new(addr); + for p in &pipelines { + server.add_pipeline(Arc::clone(p.metrics())); + } + eprintln!(" {}diagnostics{}: http://{addr}", dim(), reset()); + Some(server.start()) + } else { + None + }; + + eprintln!( + "{}logfwd running{} ({} pipeline(s))", + green(), + reset(), + pipelines.len(), + ); + + let mut handles = Vec::new(); + let main_pipeline = pipelines.pop(); + + for mut pipeline in pipelines { + let sd = shutdown.clone(); + handles.push(std::thread::spawn(move || pipeline.run(&sd))); + } + + if let Some(mut main_pipe) = main_pipeline { + main_pipe.run(&shutdown)?; + } + + for h in handles { + let _ = h.join(); + } + + if let Err(e) = meter_provider.shutdown() { + eprintln!( + "{}warning{}: meter provider shutdown: {e}", + yellow(), + reset() + ); + } + + Ok(()) +} + +// --------------------------------------------------------------------------- +// Blackhole sink +// --------------------------------------------------------------------------- + fn run_blackhole(addr: &str) -> io::Result<()> { use std::sync::atomic::{AtomicU64, Ordering}; - eprintln!("logfwd blackhole collector listening on {addr}"); - eprintln!(" Accepts any POST, returns 200, counts bytes/lines."); - eprintln!(" Requests to /_bulk get an ES-compatible response."); - eprintln!(" GET /stats returns JSON counters."); - eprintln!(" Ctrl-C to stop."); + eprintln!( + "{}logfwd blackhole{} listening on {}{}{}", + bold(), + reset(), + bold(), + addr, + reset(), + ); + eprintln!( + " {}POST any path -> 200 OK (counts bytes/lines){}", + dim(), + reset(), + ); + eprintln!(" {}GET /stats -> JSON counters{}", dim(), reset()); let server = tiny_http::Server::http(addr).map_err(|e| io::Error::other(e.to_string()))?; @@ -86,7 +356,6 @@ fn run_blackhole(addr: &str) -> io::Result<()> { let total_lines = Arc::new(AtomicU64::new(0)); let start = Instant::now(); - // Stats reporter thread let reqs_clone = Arc::clone(&total_requests); let bytes_clone = Arc::clone(&total_bytes); let lines_clone = Arc::clone(&total_lines); @@ -116,15 +385,12 @@ fn run_blackhole(addr: &str) -> io::Result<()> { } }); - // ES bulk response: minimal valid response that says "everything succeeded". let es_bulk_response = r#"{"took":0,"errors":false,"items":[]}"#; - let stats_reqs = Arc::clone(&total_requests); let stats_bytes = Arc::clone(&total_bytes); let stats_lines = Arc::clone(&total_lines); for mut request in server.incoming_requests() { - // GET /stats — return JSON counters for programmatic polling if request.method() == &tiny_http::Method::Get && request.url() == "/stats" { let body = format!( r#"{{"requests":{},"lines":{},"bytes":{}}}"#, @@ -177,73 +443,10 @@ fn run_blackhole(addr: &str) -> io::Result<()> { Ok(()) } -fn run_pipelines(config: logfwd_config::Config, dry_run: bool) -> io::Result<()> { - use logfwd::pipeline::Pipeline; - use logfwd_core::diagnostics::DiagnosticsServer; - let shutdown = Arc::new(AtomicBool::new(false)); - - // Initialize OTel metrics. - let meter_provider = build_meter_provider(&config)?; - let meter = meter_provider.meter("logfwd"); - - // Build pipelines. - let mut pipelines = Vec::new(); - for (name, pipe_cfg) in &config.pipelines { - let pipeline = Pipeline::from_config(name, pipe_cfg, &meter).map_err(io::Error::other)?; - eprintln!(" pipeline '{}' ready", name); - pipelines.push(pipeline); - } +// --------------------------------------------------------------------------- +// OTel metrics +// --------------------------------------------------------------------------- - if dry_run { - eprintln!( - "Dry run: {} pipeline(s) constructed successfully", - pipelines.len() - ); - return Ok(()); - } - - // Start diagnostics server if configured. - let _diag_handle = if let Some(ref addr) = config.server.diagnostics { - let mut server = DiagnosticsServer::new(addr); - for p in &pipelines { - server.add_pipeline(Arc::clone(p.metrics())); - } - eprintln!(" diagnostics: http://{addr}"); - Some(server.start()) - } else { - None - }; - - eprintln!("logfwd starting ({} pipeline(s))", pipelines.len()); - - // Run each pipeline on its own thread. - let mut handles = Vec::new(); - let main_pipeline = pipelines.pop(); - - for mut pipeline in pipelines { - let sd = shutdown.clone(); - handles.push(std::thread::spawn(move || pipeline.run(&sd))); - } - - if let Some(mut main_pipe) = main_pipeline { - main_pipe.run(&shutdown)?; - } - - for h in handles { - let _ = h.join(); - } - - // Flush any pending OTLP exports. - if let Err(e) = meter_provider.shutdown() { - eprintln!("warning: meter provider shutdown: {e}"); - } - - Ok(()) -} - -/// Build an OTel MeterProvider. If `metrics_endpoint` is configured, adds a -/// PeriodicReader that pushes metrics via OTLP HTTP. Otherwise returns a -/// no-op provider (OTel counters still work, just nobody reads them). fn build_meter_provider( config: &logfwd_config::Config, ) -> io::Result { @@ -252,7 +455,6 @@ fn build_meter_provider( if let Some(ref endpoint) = config.server.metrics_endpoint { let interval_secs = config.server.metrics_interval_secs.unwrap_or(60); - // Create a tokio runtime for the periodic OTLP export. let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(1) .enable_time() @@ -260,7 +462,6 @@ fn build_meter_provider( .build() .map_err(|e| io::Error::other(format!("tokio runtime: {e}")))?; - // Build OTLP exporter + periodic reader inside the runtime context. let _guard = rt.enter(); let otlp_exporter = opentelemetry_otlp::MetricExporter::builder() @@ -273,24 +474,35 @@ fn build_meter_provider( .with_interval(std::time::Duration::from_secs(interval_secs)) .build(); - eprintln!(" metrics OTLP push: {endpoint} (every {interval_secs}s)"); + eprintln!( + " {}metrics push{}: {endpoint} (every {interval_secs}s)", + dim(), + reset(), + ); - // Leak the runtime so it lives for the process lifetime. - // The single worker thread is lightweight and we need it alive - // for the PeriodicReader's background export task. std::mem::forget(rt); Ok(SdkMeterProvider::builder().with_reader(reader).build()) } else { - // No OTLP endpoint — OTel counters still work (dual-write to atomics). Ok(SdkMeterProvider::builder().build()) } } +// --------------------------------------------------------------------------- +// Data generation +// --------------------------------------------------------------------------- + fn generate_json_log_file(num_lines: usize, output: &str) -> io::Result<()> { use std::io::BufWriter; - eprintln!("Generating {num_lines} JSON log lines to {output}..."); + eprintln!( + "Generating {}{num_lines}{} JSON log lines to {}{output}{}...", + bold(), + reset(), + bold(), + reset(), + ); + let file = std::fs::File::create(output)?; let mut writer = BufWriter::with_capacity(1024 * 1024, file); @@ -326,9 +538,11 @@ fn generate_json_log_file(num_lines: usize, output: &str) -> io::Result<()> { writer.flush()?; let size = std::fs::metadata(output)?.len(); eprintln!( - "Done: {:.1} MB, avg {:.0} bytes/line", + "{}done{}: {:.1} MB, avg {:.0} bytes/line", + green(), + reset(), size as f64 / (1024.0 * 1024.0), - size as f64 / num_lines as f64 + size as f64 / num_lines as f64, ); Ok(()) }