Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions _typos.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[default.extend-words]
# Intentional partial strings in test data and example error messages.
caf = "caf"
hel = "hel"
leve = "leve"
# CSS class prefix for pipeline nodes in the dashboard.
Expand Down
4 changes: 4 additions & 0 deletions crates/logfwd-arrow/src/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ impl ScanBuilder for StreamingBuilder {
self.append_str_by_idx(idx, v);
}
/// `ScanBuilder` entry point for a string value that has already been
/// decoded and is passed in as the bytes `v` for the field slot `idx`.
///
/// This is a thin forwarder: `StreamingBuilder` also has an inherent method
/// with the same name, and the body below hands the call to it unchanged.
#[inline(always)]
fn append_decoded_str_by_idx(&mut self, idx: usize, v: &[u8]) {
// Not recursion: Rust method resolution prefers the inherent
// `StreamingBuilder::append_decoded_str_by_idx` over this trait method.
self.append_decoded_str_by_idx(idx, v);
}
/// `ScanBuilder` entry point for an integer value, passed as the raw bytes
/// `v` for the field slot `idx`.
///
/// This is a thin forwarder to the inherent `StreamingBuilder` method of the
/// same name; no work is done here.
#[inline(always)]
fn append_int_by_idx(&mut self, idx: usize, v: &[u8]) {
// Not recursion: method resolution picks the inherent
// `StreamingBuilder::append_int_by_idx` ahead of this trait method.
self.append_int_by_idx(idx, v);
}
Expand Down
67 changes: 62 additions & 5 deletions crates/logfwd-arrow/src/streaming_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use crate::check_dup_bits;

struct FieldColumns {
name: Vec<u8>,
/// String values: (row, offset_in_buffer, len). Views into the shared buffer.
/// String values: (row, offset_in_buffer, len). Views into the shared
/// buffer. Offsets `< buf.len()` reference the original input buffer;
/// offsets `>= buf.len()` reference the decoded-strings buffer at
/// `offset - buf.len()`. See `StreamingBuilder::decoded_buf`.
str_views: Vec<(u32, u32, u32)>,
/// Int values: (row, parsed_value).
int_values: Vec<(u32, i64)>,
Expand Down Expand Up @@ -99,6 +102,10 @@ pub struct StreamingBuilder {
/// Reference-counted buffer. Stored here to compute offsets safely
/// and shared with Arrow StringViewArrays in finish_batch.
buf: bytes::Bytes,
/// Secondary buffer for decoded string values (JSON escape sequences).
/// String views with offsets `>= buf.len()` reference this buffer at
/// `offset - buf.len()`. Allocated lazily; empty when no escapes are decoded.
decoded_buf: Vec<u8>,
/// When true, `append_raw` stores (offset, len) views for the `_raw` column.
keep_raw: bool,
/// Raw line views: (offset_in_buf, len) per row, in row order.
Expand All @@ -123,6 +130,7 @@ impl StreamingBuilder {
row_count: 0,
written_bits: 0,
buf: bytes::Bytes::new(),
decoded_buf: Vec::new(),
keep_raw,
raw_views: Vec::new(),
state: BuilderState::Idle,
Expand All @@ -148,6 +156,7 @@ impl StreamingBuilder {
buf.len()
);
self.buf = buf;
self.decoded_buf.clear();
self.row_count = 0;
// Only clear the slots that were active in the previous batch.
// This preserves the inner-Vec capacity of each FieldColumns for
Expand Down Expand Up @@ -263,6 +272,36 @@ impl StreamingBuilder {
.push((self.row_count, offset, value.len() as u32));
}

/// Append a decoded string value that is NOT a subslice of the input
/// buffer.
///
/// Used for strings whose JSON escape sequences have been decoded (see
/// issue #410). The bytes are copied to the end of `decoded_buf` and a view
/// is recorded in the same `str_views` vector as regular strings, with the
/// offset shifted by `buf.len()` so that `finish_batch` can resolve it
/// against the combined (input + decoded) Arrow buffer.
///
/// The value is dropped, and nothing is recorded for the current row, when:
/// - `check_dup_bits` reports `idx` as already written in `written_bits`,
/// - `value` is not valid UTF-8, or
/// - the shifted offset (`buf.len() + decoded_buf.len()`) or `value.len()`
///   does not fit in the `u32` fields of a string view. Recording a
///   truncated or wrapped offset would make the view point at unrelated
///   bytes, so the value is skipped instead.
#[inline(always)]
pub fn append_decoded_str_by_idx(&mut self, idx: usize, value: &[u8]) {
    debug_assert_eq!(
        self.state,
        BuilderState::InRow,
        "append_decoded_str_by_idx called outside of a row"
    );
    if check_dup_bits(&mut self.written_bits, idx) {
        return;
    }
    if std::str::from_utf8(value).is_err() {
        return;
    }
    // Offset into the combined buffer: original buf bytes come first,
    // decoded bytes follow at buf.len() + <current end of decoded_buf>.
    // Do the arithmetic in usize and bounds-check it before narrowing, so
    // an oversized batch can never produce a silently wrapped u32 offset.
    let combined_offset = match self.buf.len().checked_add(self.decoded_buf.len()) {
        Some(offset) if offset <= u32::MAX as usize => offset as u32,
        _ => return,
    };
    if value.len() > u32::MAX as usize {
        return;
    }
    // Only grow decoded_buf once the view is known to be representable, so
    // a rejected value leaves no orphaned bytes behind.
    self.decoded_buf.extend_from_slice(value);
    let fc = &mut self.fields[idx];
    fc.has_str = true;
    fc.str_views
        .push((self.row_count, combined_offset, value.len() as u32));
}

#[inline(always)]
pub fn append_int_by_idx(&mut self, idx: usize, value: &[u8]) {
debug_assert_eq!(
Expand Down Expand Up @@ -328,16 +367,34 @@ impl StreamingBuilder {

/// Build a RecordBatch of StringViewArrays, zero-copy whenever possible.
///
/// The resulting RecordBatch shares the input buffer via Bytes reference
/// counting -- no string data is copied.
/// When no JSON escape sequences were decoded, the resulting RecordBatch
/// shares the input buffer via Bytes reference counting (zero-copy).
/// When decoded strings exist, a combined buffer is built that appends
/// decoded bytes after the original input so that all str_views offsets
/// resolve into a single contiguous Arrow buffer.
pub fn finish_batch(&mut self) -> Result<RecordBatch, ArrowError> {
debug_assert_eq!(
self.state,
BuilderState::InBatch,
"finish_batch called outside of a batch (call begin_batch first, and ensure all rows are closed with end_row)"
);
let num_rows = self.row_count as usize;
let arrow_buf = Buffer::from(self.buf.clone());

// Build the Arrow buffer. When no decoded strings exist, this is
// zero-copy via Bytes refcount. When decoded strings are present,
// we concatenate the original buffer with the decoded buffer so that
// str_views offsets >= buf.len() resolve correctly.
let arrow_buf = if self.decoded_buf.is_empty() {
Buffer::from(self.buf.clone())
} else {
let mut combined = Vec::with_capacity(self.buf.len() + self.decoded_buf.len());
combined.extend_from_slice(&self.buf);
combined.extend_from_slice(&self.decoded_buf);
Buffer::from(combined)
};
// Separate zero-copy buffer for _raw views (always into the original
// input buffer, never into decoded_buf).
let raw_arrow_buf = Buffer::from(self.buf.clone());

let mut schema_fields: Vec<Field> = Vec::with_capacity(self.num_active);
let mut arrays: Vec<ArrayRef> = Vec::with_capacity(self.num_active);
Expand Down Expand Up @@ -510,7 +567,7 @@ impl StreamingBuilder {
num_rows
);
let mut builder = StringViewBuilder::new();
let block = builder.append_block(arrow_buf.clone());
let block = builder.append_block(raw_arrow_buf);

for row in 0..num_rows {
if row < self.raw_views.len() {
Expand Down
Loading
Loading