Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/logfwd-core/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,13 @@ impl FormatParser for RawParser {
match b {
b'"' => out.extend_from_slice(b"\\\""),
b'\\' => out.extend_from_slice(b"\\\\"),
b'\n' => out.extend_from_slice(b"\\n"),
b'\r' => out.extend_from_slice(b"\\r"),
b'\t' => out.extend_from_slice(b"\\t"),
b if b < 0x20 => {
// Escape control characters per RFC 8259.
let _ = std::io::Write::write_fmt(out, format_args!("\\u{:04x}", b));
}
_ => out.push(b),
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/logfwd-core/src/streaming_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ impl StreamingBuilder {
}
self.written_bits |= bit;
let fc = &mut self.fields[idx];
fc.has_int = true;
if let Some(v) = parse_int_fast(value) {
fc.has_int = true;
fc.int_values.push((self.row_count, v));
}
}
Expand All @@ -187,8 +187,8 @@ impl StreamingBuilder {
}
self.written_bits |= bit;
let fc = &mut self.fields[idx];
fc.has_float = true;
if let Some(v) = parse_float_fast(value) {
fc.has_float = true;
fc.float_values.push((self.row_count, v));
}
}
Expand Down
26 changes: 22 additions & 4 deletions crates/logfwd-output/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,35 @@ impl FanOut {

impl OutputSink for FanOut {
fn send_batch(&mut self, batch: &RecordBatch, meta: &BatchMetadata) -> io::Result<()> {
// Try ALL sinks before returning an error. Don't short-circuit.
let mut first_err: Option<io::Error> = None;
for sink in &mut self.sinks {
sink.send_batch(batch, meta)?;
if let Err(e) = sink.send_batch(batch, meta) {
eprintln!("fanout: sink '{}' failed: {e}", sink.name());
if first_err.is_none() {
first_err = Some(e);
}
}
}
match first_err {
Some(e) => Err(e),
None => Ok(()),
}
Ok(())
}

fn flush(&mut self) -> io::Result<()> {
let mut first_err: Option<io::Error> = None;
for sink in &mut self.sinks {
sink.flush()?;
if let Err(e) = sink.flush()
&& first_err.is_none()
{
first_err = Some(e);
}
}
match first_err {
Some(e) => Err(e),
None => Ok(()),
}
Ok(())
}

fn name(&self) -> &str {
Expand Down
8 changes: 7 additions & 1 deletion crates/logfwd-output/src/json_lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,21 @@ pub struct JsonLinesSink {
url: String,
headers: Vec<(String, String)>,
pub batch_buf: Vec<u8>,
http_agent: ureq::Agent,
}

impl JsonLinesSink {
pub fn new(name: String, url: String, headers: Vec<(String, String)>) -> Self {
let http_agent = ureq::config::Config::builder()
.timeout_global(Some(std::time::Duration::from_secs(30)))
.build()
.new_agent();
JsonLinesSink {
name,
url,
headers,
batch_buf: Vec::with_capacity(64 * 1024),
http_agent,
}
}

Expand Down Expand Up @@ -87,7 +93,7 @@ impl OutputSink for JsonLinesSink {
return Ok(());
}

let mut req = ureq::post(&self.url);
let mut req = self.http_agent.post(&self.url);
for (k, v) in &self.headers {
req = req.header(k.as_str(), v.as_str());
}
Expand Down
8 changes: 7 additions & 1 deletion crates/logfwd-output/src/otlp_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct OtlpSink {
pub encoder_buf: Vec<u8>,
compress_buf: Vec<u8>,
compressor: Option<ChunkCompressor>,
http_agent: ureq::Agent,
}

impl OtlpSink {
Expand All @@ -45,6 +46,10 @@ impl OtlpSink {
Compression::Zstd => Some(ChunkCompressor::new(1)),
_ => None,
};
let http_agent = ureq::config::Config::builder()
.timeout_global(Some(std::time::Duration::from_secs(30)))
.build()
.new_agent();
OtlpSink {
name,
endpoint,
Expand All @@ -53,6 +58,7 @@ impl OtlpSink {
encoder_buf: Vec::with_capacity(64 * 1024),
compress_buf: Vec::with_capacity(64 * 1024),
compressor,
http_agent,
}
}

Expand Down Expand Up @@ -155,7 +161,7 @@ impl OutputSink for OtlpSink {
OtlpProtocol::Http => "application/x-protobuf",
};

let mut req = ureq::post(&self.endpoint);
let mut req = self.http_agent.post(&self.endpoint);
req = req.header("Content-Type", content_type);
if self.compression == Compression::Zstd {
req = req.header("Content-Encoding", "zstd");
Expand Down
21 changes: 10 additions & 11 deletions crates/logfwd-transform/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,22 +528,21 @@ impl SqlTransform {
}

/// Add an enrichment table that will be registered in each DataFusion
/// session alongside the `logs` table. Panics if a table with the same
/// name is already registered or if the name conflicts with "logs".
/// session alongside the `logs` table. Returns an error if a table with
/// the same name is already registered or if the name conflicts with "logs".
pub fn add_enrichment_table(
&mut self,
table: Arc<dyn logfwd_core::enrichment::EnrichmentTable>,
) {
) -> Result<(), String> {
let name = table.name();
assert!(
name != "logs",
"enrichment table cannot be named 'logs' (reserved)"
);
assert!(
!self.enrichment_tables.iter().any(|t| t.name() == name),
"duplicate enrichment table name: '{name}'"
);
if name == "logs" {
return Err("enrichment table cannot be named 'logs' (reserved)".to_string());
}
if self.enrichment_tables.iter().any(|t| t.name() == name) {
return Err(format!("duplicate enrichment table name: '{name}'"));
}
self.enrichment_tables.push(table);
Ok(())
}

/// Execute the SQL transform on a RecordBatch.
Expand Down
Loading