From 9eaa2d939c109f663f9a9ffaa849b9a01aa0ff80 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 8 May 2026 17:34:14 +0200 Subject: [PATCH] compute/correction_v2: shrink Cursor to reduce per-split memory `Cursor::advance_by` produces one cursor per distinct pre-advance timestamp `<= since`, so per-cursor memory multiplies linearly with the number of distinct uncompacted timestamps in a chain. At one self-managed customer this contributed to a 1.1 TB allocation inside `Cursor::advance_by` and a cluster OOM (CLU-77, database-issues#11198). Replace the owned `VecDeque>>` with a shared `Rc>>>` and track the cursor's range as a `(pos, end)` pair of `(chunk: u32, offset: u32)`. `cursor.clone()` becomes a single `Rc::clone` with no allocations; the stack footprint drops from 72 to 40 bytes. The chunk-reuse fast path in `try_unwrap` is preserved by checking the strong counts of both the outer `Rc>` and each inner `Rc`. This is a constant-factor mitigation. The algorithmic property of producing one cursor per distinct timestamp is unchanged, so the underlying blowup remains and must be addressed at the compaction layer. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/compute/src/sink/correction_v2.rs | 247 ++++++++++++++++---------- 1 file changed, 156 insertions(+), 91 deletions(-) diff --git a/src/compute/src/sink/correction_v2.rs b/src/compute/src/sink/correction_v2.rs index 05ceb117932e8..235393dcfa1d7 100644 --- a/src/compute/src/sink/correction_v2.rs +++ b/src/compute/src/sink/correction_v2.rs @@ -125,13 +125,13 @@ use std::borrow::Borrow; use std::cmp::Ordering; -use std::collections::{BinaryHeap, VecDeque}; +use std::collections::BinaryHeap; use std::fmt; use std::rc::Rc; use columnation::Columnation; use differential_dataflow::trace::implementations::BatchContainer; -use mz_ore::cast::CastLossy; +use mz_ore::cast::{CastFrom, CastLossy}; use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta}; use mz_repr::{Diff, Timestamp}; use mz_timely_util::columnation::ColumnationStack; @@ -700,8 +700,8 @@ impl Chain { /// Convert the chain into a cursor over the contained updates. fn into_cursor(self) -> Option> { - let chunks = self.chunks.into_iter().map(Rc::new).collect(); - Cursor::new(chunks) + let chunks: Vec<_> = self.chunks.into_iter().map(Rc::new).collect(); + Cursor::new(Rc::new(chunks)) } /// Return an iterator over the contained updates. @@ -743,6 +743,25 @@ impl Extend<(D, Timestamp, Diff)> for Chain { } } +/// A position within a [`Cursor`]'s shared chunks. +/// +/// Stored as `(chunk_index, offset_within_chunk)`. Both fields are `u32` to keep [`Cursor`] small, +/// which matters because `merge_chains` constructs one cursor per distinct pre-advance time. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)] +struct CursorPos { + /// Index into the cursor's `chunks`. + chunk: u32, + /// Offset within `chunks[chunk]`. + offset: u32, +} + +impl CursorPos { + const ZERO: Self = Self { + chunk: 0, + offset: 0, + }; +} + /// A cursor over updates in a chain. /// /// A cursor provides two guarantees: @@ -753,17 +772,20 @@ impl Extend<(D, Timestamp, Diff)> for Chain { /// forward consumes `self` and returns an `Option` that's `None` if the operation stepped /// over the last update. /// -/// A cursor holds on to `Rc`s, allowing multiple cursors to produce updates from the same -/// chunks concurrently. As soon as a cursor is done producing updates from a [`Chunk`] it drops -/// its reference. Once the last cursor is done with a [`Chunk`] its memory can be reclaimed. +/// All cursors derived from a single chain share that chain's chunks via `Rc>>`. A +/// cursor is therefore a small descriptor (`Rc` pointer + a pair of [`CursorPos`] + an optional +/// timestamp override): `cursor.clone()` is an `Rc` bump, and `advance_by` no longer copies a +/// `VecDeque>` per produced slice. This bounds the per-cursor memory cost to a constant +/// independent of the underlying chain's length, which is what keeps the per-distinct-time +/// splitting in `advance_by` from blowing up memory. #[derive(Clone, Debug)] struct Cursor { - /// The chunks from which updates can still be produced. - chunks: VecDeque>>, - /// The current offset into `chunks.front()`. - chunk_offset: usize, - /// An optional limit for the number of updates the cursor will produce. - limit: Option, + /// The chain's chunks, shared across all cursors derived from it. + chunks: Rc>>>, + /// The position of the next update the cursor will yield. + pos: CursorPos, + /// The exclusive end position. Stepping reaches this position to indicate exhaustion. + end: CursorPos, /// An optional overwrite for the timestamp of produced updates. overwrite_ts: Option, } @@ -772,80 +794,52 @@ impl Cursor { /// Construct a cursor over a list of chunks. /// /// Returns `None` if `chunks` is empty. - fn new(chunks: VecDeque>>) -> Option { + fn new(chunks: Rc>>>) -> Option { if chunks.is_empty() { return None; } + let end_chunk = u32::try_from(chunks.len()).expect("chunk count fits in u32"); Some(Self { chunks, - chunk_offset: 0, - limit: None, + pos: CursorPos::ZERO, + end: CursorPos { + chunk: end_chunk, + offset: 0, + }, overwrite_ts: None, }) } - /// Set a limit for the number of updates this cursor will produce. - /// - /// # Panics - /// - /// Panics if there is already a limit lower than the new one. - fn set_limit(mut self, limit: usize) -> Option { - assert!(self.limit.is_none_or(|l| l >= limit)); - - if limit == 0 { - return None; - } - - // Release chunks made unreachable by the limit. - let mut count = 0; - let mut idx = 0; - let mut offset = self.chunk_offset; - while idx < self.chunks.len() && count < limit { - let chunk = &self.chunks[idx]; - count += chunk.len() - offset; - idx += 1; - offset = 0; - } - self.chunks.truncate(idx); - - if count > limit { - self.limit = Some(limit); - } - - Some(self) - } - /// Get a reference to the current update. fn get(&self) -> (&D, Timestamp, Diff) { let chunk = self.get_chunk(); - let (d, t, r) = chunk.index(self.chunk_offset); + let (d, t, r) = chunk.index(usize::cast_from(self.pos.offset)); let t = self.overwrite_ts.unwrap_or(t); (d, t, r) } /// Get a reference to the current chunk. fn get_chunk(&self) -> &Chunk { - &self.chunks[0] + &self.chunks[usize::cast_from(self.pos.chunk)] } /// Step to the next update. /// /// Returns the stepped cursor, or `None` if the step was over the last update. fn step(mut self) -> Option { - if self.chunk_offset == self.get_chunk().len() - 1 { - return self.skip_chunk().map(|(c, _)| c); + let chunk_len = self.get_chunk().len(); + let next_offset = usize::cast_from(self.pos.offset) + 1; + if next_offset < chunk_len { + self.pos.offset = u32::try_from(next_offset).expect("offset fits in u32"); + } else { + self.pos.chunk += 1; + self.pos.offset = 0; } - self.chunk_offset += 1; - - if let Some(limit) = &mut self.limit { - *limit -= 1; - if *limit == 0 { - return None; - } + if self.pos >= self.end { + return None; } - Some(self) } @@ -854,22 +848,15 @@ impl Cursor { /// Returns the forwarded cursor and the number of updates skipped, or `None` if no chunks are /// left after the skip. fn skip_chunk(mut self) -> Option<(Self, usize)> { - let chunk = self.chunks.pop_front().expect("cursor invariant"); + let chunk_len = self.get_chunk().len(); + let skipped = chunk_len - usize::cast_from(self.pos.offset); - if self.chunks.is_empty() { - return None; - } + self.pos.chunk += 1; + self.pos.offset = 0; - let skipped = chunk.len() - self.chunk_offset; - self.chunk_offset = 0; - - if let Some(limit) = &mut self.limit { - if skipped >= *limit { - return None; - } - *limit -= skipped; + if self.pos >= self.end { + return None; } - Some((self, skipped)) } @@ -886,20 +873,22 @@ impl Cursor { let mut skipped = 0; - let new_offset = loop { + loop { let chunk = self.get_chunk(); - if let Some(index) = chunk.find_time_greater_than(time) { - break index; + if let Some(idx) = chunk.find_time_greater_than(time) { + skipped += idx - usize::cast_from(self.pos.offset); + self.pos.offset = u32::try_from(idx).expect("offset fits in u32"); + break; } let (cursor, count) = self.skip_chunk()?; self = cursor; skipped += count; - }; - - skipped += new_offset - self.chunk_offset; - self.chunk_offset = new_offset; + } + if self.pos >= self.end { + return None; + } Some((self, skipped)) } @@ -907,6 +896,11 @@ impl Cursor { /// /// Returns a list of cursors, each of which yields ordered and consolidated updates that have /// been advanced by `since_ts`. + /// + /// The number of returned cursors is bounded by the number of distinct pre-advance timestamps + /// `< since_ts`, plus one residual cursor at times `>= since_ts`. All returned cursors share + /// the same `Rc>>` as the input, so producing them does not allocate per-cursor + /// chunk storage. fn advance_by(mut self, since_ts: Timestamp) -> Vec { // If the cursor has an `overwrite_ts`, all its updates are at the same time already. We // only need to advance the `overwrite_ts` by the `since_ts`. @@ -930,10 +924,18 @@ impl Cursor { break; } + // Compute the post-advance position by walking past `time`. Both `current` and the + // new `remaining` share the same chunks via `Rc`, so this is allocation-free. let mut current = cursor.clone(); - if let Some((cursor, skipped)) = cursor.skip_time(time) { - remaining = Some(cursor); - current = current.set_limit(skipped).expect("skipped at least 1"); + match cursor.skip_time(time) { + Some((advanced, _)) => { + current.end = advanced.pos; + remaining = Some(advanced); + } + None => { + // Reached the end of the cursor at this time. `current` retains the existing + // `end` boundary and covers the trailing slice. + } } current.overwrite_ts = Some(since_ts); splits.push(current); @@ -953,7 +955,12 @@ impl Cursor { let before = self.clone(); match self.skip_time(skip_ts) { - Some((beyond, skipped)) => (before.set_limit(skipped), Some(beyond)), + Some((beyond, _)) => { + let mut before = before; + before.end = beyond.pos; + let before = (before.pos < before.end).then_some(before); + (before, Some(beyond)) + } None => (Some(before), None), } } @@ -977,17 +984,25 @@ impl Cursor { /// This operation efficiently reuses chunks by directly inserting them into the output chain /// where possible. /// - /// An unwrap is only successful if the cursor's `limit` and `overwrite_ts` are both `None` and - /// the cursor has unique references to its chunks. If the unwrap fails, this method returns an - /// `Err` containing the cursor in an unchanged state, allowing the caller to convert it into a + /// An unwrap is only successful if the cursor's `overwrite_ts` is `None`, the cursor covers + /// the full range of its `chunks`, and both the outer `Rc>` and each inner + /// `Rc` are uniquely held. If the unwrap fails, this method returns an `Err` + /// containing the cursor in an unchanged state, allowing the caller to convert it into a /// chain by copying chunks rather than reusing them. fn try_unwrap(self, chunk_capacity: usize) -> Result, (&'static str, Self)> { - if self.limit.is_some() { - return Err(("cursor with limit", self)); - } if self.overwrite_ts.is_some() { return Err(("cursor with overwrite_ts", self)); } + let full_end = CursorPos { + chunk: u32::try_from(self.chunks.len()).expect("chunk count fits in u32"), + offset: 0, + }; + if self.end != full_end { + return Err(("cursor with limited end", self)); + } + if Rc::strong_count(&self.chunks) != 1 { + return Err(("cursor on shared chunk vec", self)); + } if self.chunks.iter().any(|c| Rc::strong_count(c) != 1) { return Err(("cursor on shared chunks", self)); } @@ -998,7 +1013,7 @@ impl Cursor { // We might be partway through the first chunk, in which case we can't reuse it but need to // allocate a new one to contain only the updates the cursor can still yield. while let Some(cursor) = remaining.take() { - if cursor.chunk_offset == 0 { + if cursor.pos.offset == 0 { remaining = Some(cursor); break; } @@ -1008,7 +1023,9 @@ impl Cursor { } if let Some(cursor) = remaining { - for chunk in cursor.chunks { + let start_chunk = usize::cast_from(cursor.pos.chunk); + let chunks = Rc::into_inner(cursor.chunks).expect("checked above"); + for chunk in chunks.into_iter().skip(start_chunk) { let chunk = Rc::into_inner(chunk).expect("checked above"); chain.push_chunk(chunk); } @@ -1382,3 +1399,51 @@ impl Ord for MergeCursor { (t1, d1).cmp(&(t2, d2)).reverse() } } + +#[cfg(test)] +mod tests { + use super::*; + use mz_ore::cast::CastFrom; + + /// CLU-77 stage-1 repro: `Cursor::advance_by` produces one cursor per + /// distinct timestamp `<= since`. With `N` distinct uncompacted times this + /// is `O(N)` cursors regardless of the underlying update count. Captures + /// the algorithmic shape behind the customer OOM in `Cursor::advance_by`. + #[mz_ore::test] + fn advance_by_splits_per_distinct_time() { + let n_times: u64 = 1024; + let chunk_capacity = 64; + + let mut chain: Chain = Chain::new(chunk_capacity); + for t in 0..n_times { + chain.push((String::from("k"), Timestamp::from(t), Diff::ONE)); + } + + let cursor = chain.into_cursor().expect("non-empty"); + let since = Timestamp::from(n_times); + let splits = cursor.advance_by(since); + + // Algorithmic claim: one resulting cursor per distinct input time. + assert_eq!( + u64::try_from(splits.len()).unwrap(), + n_times, + "advance_by produced one cursor per distinct timestamp", + ); + + // Per-split footprint stays near the cursor struct itself + a small + // VecDeque buffer; the cost scales with number of splits, not with + // updates per split. Confirms blowup is proportional to distinct + // times, not total updates. + let cursor_struct_bytes = std::mem::size_of::>(); + let approx_bytes = splits.len() * cursor_struct_bytes; + let chunks_per_chain = usize::cast_from(n_times).div_ceil(chunk_capacity); + eprintln!( + "splits={} cursor_struct={}B approx_total>={}B \ + chunks_per_chain={}", + splits.len(), + cursor_struct_bytes, + approx_bytes, + chunks_per_chain, + ); + } +}