diff --git a/crates/logfwd-core/src/format.rs b/crates/logfwd-core/src/format.rs index dcf11d251..3c97bd99b 100644 --- a/crates/logfwd-core/src/format.rs +++ b/crates/logfwd-core/src/format.rs @@ -108,8 +108,13 @@ impl FormatParser for RawParser { match b { b'"' => out.extend_from_slice(b"\\\""), b'\\' => out.extend_from_slice(b"\\\\"), + b'\n' => out.extend_from_slice(b"\\n"), b'\r' => out.extend_from_slice(b"\\r"), b'\t' => out.extend_from_slice(b"\\t"), + b if b < 0x20 => { + // Escape control characters per RFC 8259. + let _ = std::io::Write::write_fmt(out, format_args!("\\u{:04x}", b)); + } _ => out.push(b), } } diff --git a/crates/logfwd-core/src/streaming_builder.rs b/crates/logfwd-core/src/streaming_builder.rs index c64c940f8..794748ea2 100644 --- a/crates/logfwd-core/src/streaming_builder.rs +++ b/crates/logfwd-core/src/streaming_builder.rs @@ -173,8 +173,8 @@ impl StreamingBuilder { } self.written_bits |= bit; let fc = &mut self.fields[idx]; - fc.has_int = true; if let Some(v) = parse_int_fast(value) { + fc.has_int = true; fc.int_values.push((self.row_count, v)); } } @@ -187,8 +187,8 @@ impl StreamingBuilder { } self.written_bits |= bit; let fc = &mut self.fields[idx]; - fc.has_float = true; if let Some(v) = parse_float_fast(value) { + fc.has_float = true; fc.float_values.push((self.row_count, v)); } } diff --git a/crates/logfwd-output/src/fanout.rs b/crates/logfwd-output/src/fanout.rs index 24970e861..a74634da0 100644 --- a/crates/logfwd-output/src/fanout.rs +++ b/crates/logfwd-output/src/fanout.rs @@ -21,17 +21,35 @@ impl FanOut { impl OutputSink for FanOut { fn send_batch(&mut self, batch: &RecordBatch, meta: &BatchMetadata) -> io::Result<()> { + // Try ALL sinks before returning an error. Don't short-circuit. + let mut first_err: Option = None; for sink in &mut self.sinks { - sink.send_batch(batch, meta)?; + if let Err(e) = sink.send_batch(batch, meta) { + eprintln!("fanout: sink '{}' failed: {e}", sink.name()); + if first_err.is_none() { + first_err = Some(e); + } + } + } + match first_err { + Some(e) => Err(e), + None => Ok(()), } - Ok(()) } fn flush(&mut self) -> io::Result<()> { + let mut first_err: Option = None; for sink in &mut self.sinks { - sink.flush()?; + if let Err(e) = sink.flush() + && first_err.is_none() + { + first_err = Some(e); + } + } + match first_err { + Some(e) => Err(e), + None => Ok(()), } - Ok(()) } fn name(&self) -> &str { diff --git a/crates/logfwd-output/src/json_lines.rs b/crates/logfwd-output/src/json_lines.rs index 1b009529b..cada27198 100644 --- a/crates/logfwd-output/src/json_lines.rs +++ b/crates/logfwd-output/src/json_lines.rs @@ -15,15 +15,21 @@ pub struct JsonLinesSink { url: String, headers: Vec<(String, String)>, pub batch_buf: Vec, + http_agent: ureq::Agent, } impl JsonLinesSink { pub fn new(name: String, url: String, headers: Vec<(String, String)>) -> Self { + let http_agent = ureq::config::Config::builder() + .timeout_global(Some(std::time::Duration::from_secs(30))) + .build() + .new_agent(); JsonLinesSink { name, url, headers, batch_buf: Vec::with_capacity(64 * 1024), + http_agent, } } @@ -87,7 +93,7 @@ impl OutputSink for JsonLinesSink { return Ok(()); } - let mut req = ureq::post(&self.url); + let mut req = self.http_agent.post(&self.url); for (k, v) in &self.headers { req = req.header(k.as_str(), v.as_str()); } diff --git a/crates/logfwd-output/src/otlp_sink.rs b/crates/logfwd-output/src/otlp_sink.rs index 0b15517ff..b00039a84 100644 --- a/crates/logfwd-output/src/otlp_sink.rs +++ b/crates/logfwd-output/src/otlp_sink.rs @@ -32,6 +32,7 @@ pub struct OtlpSink { pub encoder_buf: Vec, compress_buf: Vec, compressor: Option, + http_agent: ureq::Agent, } impl OtlpSink { @@ -45,6 +46,10 @@ impl OtlpSink { Compression::Zstd => Some(ChunkCompressor::new(1)), _ => None, }; + let http_agent = ureq::config::Config::builder() + .timeout_global(Some(std::time::Duration::from_secs(30))) + .build() + .new_agent(); OtlpSink { name, endpoint, @@ -53,6 +58,7 @@ impl OtlpSink { encoder_buf: Vec::with_capacity(64 * 1024), compress_buf: Vec::with_capacity(64 * 1024), compressor, + http_agent, } } @@ -155,7 +161,7 @@ impl OutputSink for OtlpSink { OtlpProtocol::Http => "application/x-protobuf", }; - let mut req = ureq::post(&self.endpoint); + let mut req = self.http_agent.post(&self.endpoint); req = req.header("Content-Type", content_type); if self.compression == Compression::Zstd { req = req.header("Content-Encoding", "zstd"); diff --git a/crates/logfwd-transform/src/lib.rs b/crates/logfwd-transform/src/lib.rs index 3c4d366d1..bb6264dca 100644 --- a/crates/logfwd-transform/src/lib.rs +++ b/crates/logfwd-transform/src/lib.rs @@ -528,22 +528,21 @@ impl SqlTransform { } /// Add an enrichment table that will be registered in each DataFusion - /// session alongside the `logs` table. Panics if a table with the same - /// name is already registered or if the name conflicts with "logs". + /// session alongside the `logs` table. Returns an error if a table with + /// the same name is already registered or if the name conflicts with "logs". pub fn add_enrichment_table( &mut self, table: Arc, - ) { + ) -> Result<(), String> { let name = table.name(); - assert!( - name != "logs", - "enrichment table cannot be named 'logs' (reserved)" - ); - assert!( - !self.enrichment_tables.iter().any(|t| t.name() == name), - "duplicate enrichment table name: '{name}'" - ); + if name == "logs" { + return Err("enrichment table cannot be named 'logs' (reserved)".to_string()); + } + if self.enrichment_tables.iter().any(|t| t.name() == name) { + return Err(format!("duplicate enrichment table name: '{name}'")); + } self.enrichment_tables.push(table); + Ok(()) } /// Execute the SQL transform on a RecordBatch.