Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 156 additions & 91 deletions src/compute/src/sink/correction_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@

use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
use std::collections::BinaryHeap;
use std::fmt;
use std::rc::Rc;

use columnation::Columnation;
use differential_dataflow::trace::implementations::BatchContainer;
use mz_ore::cast::CastLossy;
use mz_ore::cast::{CastFrom, CastLossy};
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use mz_timely_util::columnation::ColumnationStack;
Expand Down Expand Up @@ -700,8 +700,8 @@ impl<D: Data> Chain<D> {

/// Convert the chain into a cursor over the contained updates.
fn into_cursor(self) -> Option<Cursor<D>> {
let chunks = self.chunks.into_iter().map(Rc::new).collect();
Cursor::new(chunks)
let chunks: Vec<_> = self.chunks.into_iter().map(Rc::new).collect();
Cursor::new(Rc::new(chunks))
}

/// Return an iterator over the contained updates.
Expand Down Expand Up @@ -743,6 +743,25 @@ impl<D: Data> Extend<(D, Timestamp, Diff)> for Chain<D> {
}
}

/// Identifies one update slot inside the chunk list shared by [`Cursor`]s.
///
/// A position is the pair `(chunk, offset)`. The fields are declared in that order so the derived
/// `PartialOrd`/`Ord` compare the chunk index first and the in-chunk offset second. Both fields are
/// `u32` to keep [`Cursor`] small, which matters because `advance_by` can return one cursor per
/// distinct pre-advance time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct CursorPos {
    /// Which entry of the cursor's `chunks` this position refers to.
    chunk: u32,
    /// How far into `chunks[chunk]` this position points.
    offset: u32,
}

impl CursorPos {
    /// The position of the first update of the first chunk.
    const ZERO: CursorPos = CursorPos {
        chunk: 0,
        offset: 0,
    };
}

/// A cursor over updates in a chain.
///
/// A cursor provides two guarantees:
Expand All @@ -753,17 +772,20 @@ impl<D: Data> Extend<(D, Timestamp, Diff)> for Chain<D> {
/// forward consumes `self` and returns an `Option<Cursor>` that's `None` if the operation stepped
/// over the last update.
///
/// A cursor holds on to `Rc<Chunk>`s, allowing multiple cursors to produce updates from the same
/// chunks concurrently. As soon as a cursor is done producing updates from a [`Chunk`] it drops
/// its reference. Once the last cursor is done with a [`Chunk`] its memory can be reclaimed.
/// All cursors derived from a single chain share that chain's chunks via `Rc<Vec<Rc<Chunk>>>`. A
/// cursor is therefore a small descriptor (an `Rc` pointer, a pair of [`CursorPos`], and an
/// optional timestamp override): `cursor.clone()` is an `Rc` bump, and `advance_by` produces its
/// splits without copying a chunk list per produced slice. This bounds the per-cursor memory cost
/// to a constant independent of the underlying chain's length, which is what keeps the
/// per-distinct-time splitting in `advance_by` from blowing up memory. A cursor keeps the whole
/// shared chunk list alive for as long as it exists, even after stepping past earlier chunks.
#[derive(Clone, Debug)]
struct Cursor<D: Data> {
/// The chunks from which updates can still be produced.
chunks: VecDeque<Rc<Chunk<D>>>,
/// The current offset into `chunks.front()`.
chunk_offset: usize,
/// An optional limit for the number of updates the cursor will produce.
limit: Option<usize>,
/// The chain's chunks, shared across all cursors derived from it.
chunks: Rc<Vec<Rc<Chunk<D>>>>,
/// The position of the next update the cursor will yield.
pos: CursorPos,
/// The exclusive end position. The cursor is exhausted once `pos` reaches or passes it.
end: CursorPos,
/// An optional overwrite for the timestamp of produced updates.
overwrite_ts: Option<Timestamp>,
}
Expand All @@ -772,80 +794,52 @@ impl<D: Data> Cursor<D> {
/// Construct a cursor over a list of chunks.
///
/// Returns `None` if `chunks` is empty.
fn new(chunks: VecDeque<Rc<Chunk<D>>>) -> Option<Self> {
fn new(chunks: Rc<Vec<Rc<Chunk<D>>>>) -> Option<Self> {
if chunks.is_empty() {
return None;
}

let end_chunk = u32::try_from(chunks.len()).expect("chunk count fits in u32");
Some(Self {
chunks,
chunk_offset: 0,
limit: None,
pos: CursorPos::ZERO,
end: CursorPos {
chunk: end_chunk,
offset: 0,
},
overwrite_ts: None,
})
}

/// Set a limit for the number of updates this cursor will produce.
///
/// # Panics
///
/// Panics if there is already a limit lower than the new one.
fn set_limit(mut self, limit: usize) -> Option<Self> {
assert!(self.limit.is_none_or(|l| l >= limit));

if limit == 0 {
return None;
}

// Release chunks made unreachable by the limit.
let mut count = 0;
let mut idx = 0;
let mut offset = self.chunk_offset;
while idx < self.chunks.len() && count < limit {
let chunk = &self.chunks[idx];
count += chunk.len() - offset;
idx += 1;
offset = 0;
}
self.chunks.truncate(idx);

if count > limit {
self.limit = Some(limit);
}

Some(self)
}

/// Get a reference to the current update.
fn get(&self) -> (&D, Timestamp, Diff) {
let chunk = self.get_chunk();
let (d, t, r) = chunk.index(self.chunk_offset);
let (d, t, r) = chunk.index(usize::cast_from(self.pos.offset));
let t = self.overwrite_ts.unwrap_or(t);
(d, t, r)
}

/// Get a reference to the current chunk.
fn get_chunk(&self) -> &Chunk<D> {
&self.chunks[0]
&self.chunks[usize::cast_from(self.pos.chunk)]
}

/// Step to the next update.
///
/// Returns the stepped cursor, or `None` if the step was over the last update.
fn step(mut self) -> Option<Self> {
if self.chunk_offset == self.get_chunk().len() - 1 {
return self.skip_chunk().map(|(c, _)| c);
let chunk_len = self.get_chunk().len();
let next_offset = usize::cast_from(self.pos.offset) + 1;
if next_offset < chunk_len {
self.pos.offset = u32::try_from(next_offset).expect("offset fits in u32");
} else {
self.pos.chunk += 1;
self.pos.offset = 0;
}

self.chunk_offset += 1;

if let Some(limit) = &mut self.limit {
*limit -= 1;
if *limit == 0 {
return None;
}
if self.pos >= self.end {
return None;
}

Some(self)
}

Expand All @@ -854,22 +848,15 @@ impl<D: Data> Cursor<D> {
/// Returns the forwarded cursor and the number of updates skipped, or `None` if no chunks are
/// left after the skip.
fn skip_chunk(mut self) -> Option<(Self, usize)> {
let chunk = self.chunks.pop_front().expect("cursor invariant");
let chunk_len = self.get_chunk().len();
let skipped = chunk_len - usize::cast_from(self.pos.offset);

if self.chunks.is_empty() {
return None;
}
self.pos.chunk += 1;
self.pos.offset = 0;

let skipped = chunk.len() - self.chunk_offset;
self.chunk_offset = 0;

if let Some(limit) = &mut self.limit {
if skipped >= *limit {
return None;
}
*limit -= skipped;
if self.pos >= self.end {
return None;
}

Some((self, skipped))
}

Expand All @@ -886,27 +873,34 @@ impl<D: Data> Cursor<D> {

let mut skipped = 0;

let new_offset = loop {
loop {
let chunk = self.get_chunk();
if let Some(index) = chunk.find_time_greater_than(time) {
break index;
if let Some(idx) = chunk.find_time_greater_than(time) {
skipped += idx - usize::cast_from(self.pos.offset);
self.pos.offset = u32::try_from(idx).expect("offset fits in u32");
break;
}

let (cursor, count) = self.skip_chunk()?;
self = cursor;
skipped += count;
};

skipped += new_offset - self.chunk_offset;
self.chunk_offset = new_offset;
}

if self.pos >= self.end {
return None;
}
Some((self, skipped))
}

/// Advance all updates in this cursor by the given `since_ts`.
///
/// Returns a list of cursors, each of which yields ordered and consolidated updates that have
/// been advanced by `since_ts`.
///
/// The number of returned cursors is bounded by the number of distinct pre-advance timestamps
/// `< since_ts`, plus one residual cursor at times `>= since_ts`. All returned cursors share
/// the same `Rc<Vec<Rc<Chunk>>>` as the input, so producing them does not allocate per-cursor
/// chunk storage.
fn advance_by(mut self, since_ts: Timestamp) -> Vec<Self> {
// If the cursor has an `overwrite_ts`, all its updates are at the same time already. We
// only need to advance the `overwrite_ts` by the `since_ts`.
Expand All @@ -930,10 +924,18 @@ impl<D: Data> Cursor<D> {
break;
}

// Compute the post-advance position by walking past `time`. Both `current` and the
// new `remaining` share the same chunks via `Rc`, so this is allocation-free.
let mut current = cursor.clone();
if let Some((cursor, skipped)) = cursor.skip_time(time) {
remaining = Some(cursor);
current = current.set_limit(skipped).expect("skipped at least 1");
match cursor.skip_time(time) {
Some((advanced, _)) => {
current.end = advanced.pos;
remaining = Some(advanced);
}
None => {
// Reached the end of the cursor at this time. `current` retains the existing
// `end` boundary and covers the trailing slice.
}
}
current.overwrite_ts = Some(since_ts);
splits.push(current);
Expand All @@ -953,7 +955,12 @@ impl<D: Data> Cursor<D> {

let before = self.clone();
match self.skip_time(skip_ts) {
Some((beyond, skipped)) => (before.set_limit(skipped), Some(beyond)),
Some((beyond, _)) => {
let mut before = before;
before.end = beyond.pos;
let before = (before.pos < before.end).then_some(before);
(before, Some(beyond))
}
None => (Some(before), None),
}
}
Expand All @@ -977,17 +984,25 @@ impl<D: Data> Cursor<D> {
/// This operation efficiently reuses chunks by directly inserting them into the output chain
/// where possible.
///
/// An unwrap is only successful if the cursor's `limit` and `overwrite_ts` are both `None` and
/// the cursor has unique references to its chunks. If the unwrap fails, this method returns an
/// `Err` containing the cursor in an unchanged state, allowing the caller to convert it into a
/// An unwrap is only successful if the cursor's `overwrite_ts` is `None`, the cursor covers
/// the full range of its `chunks`, and both the outer `Rc<Vec<...>>` and each inner
/// `Rc<Chunk>` are uniquely held. If the unwrap fails, this method returns an `Err`
/// containing the cursor in an unchanged state, allowing the caller to convert it into a
/// chain by copying chunks rather than reusing them.
fn try_unwrap(self, chunk_capacity: usize) -> Result<Chain<D>, (&'static str, Self)> {
if self.limit.is_some() {
return Err(("cursor with limit", self));
}
if self.overwrite_ts.is_some() {
return Err(("cursor with overwrite_ts", self));
}
let full_end = CursorPos {
chunk: u32::try_from(self.chunks.len()).expect("chunk count fits in u32"),
offset: 0,
};
if self.end != full_end {
return Err(("cursor with limited end", self));
}
if Rc::strong_count(&self.chunks) != 1 {
return Err(("cursor on shared chunk vec", self));
}
if self.chunks.iter().any(|c| Rc::strong_count(c) != 1) {
return Err(("cursor on shared chunks", self));
}
Expand All @@ -998,7 +1013,7 @@ impl<D: Data> Cursor<D> {
// We might be partway through the first chunk, in which case we can't reuse it but need to
// allocate a new one to contain only the updates the cursor can still yield.
while let Some(cursor) = remaining.take() {
if cursor.chunk_offset == 0 {
if cursor.pos.offset == 0 {
remaining = Some(cursor);
break;
}
Expand All @@ -1008,7 +1023,9 @@ impl<D: Data> Cursor<D> {
}

if let Some(cursor) = remaining {
for chunk in cursor.chunks {
let start_chunk = usize::cast_from(cursor.pos.chunk);
let chunks = Rc::into_inner(cursor.chunks).expect("checked above");
for chunk in chunks.into_iter().skip(start_chunk) {
let chunk = Rc::into_inner(chunk).expect("checked above");
chain.push_chunk(chunk);
}
Expand Down Expand Up @@ -1382,3 +1399,51 @@ impl<D: Data> Ord for MergeCursor<D> {
(t1, d1).cmp(&(t2, d2)).reverse()
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use mz_ore::cast::CastFrom;

    /// CLU-77 stage-1 check: `Cursor::advance_by` produces one cursor per distinct timestamp
    /// `< since`. With `N` distinct uncompacted times this is `O(N)` cursors regardless of the
    /// underlying update count, which is the algorithmic shape behind the customer OOM in
    /// `Cursor::advance_by`. Each split must therefore stay a small descriptor over one shared
    /// chunk list instead of carrying chunk storage of its own.
    #[mz_ore::test]
    fn advance_by_splits_per_distinct_time() {
        let n_times: u64 = 1024;
        let chunk_capacity = 64;

        // One update per timestamp in `0..n_times`, all for the same key.
        let mut chain: Chain<String> = Chain::new(chunk_capacity);
        for t in 0..n_times {
            chain.push((String::from("k"), Timestamp::from(t), Diff::ONE));
        }

        let cursor = chain.into_cursor().expect("non-empty");
        // Strictly greater than every pushed time, so every update has to be advanced.
        let since = Timestamp::from(n_times);
        let splits = cursor.advance_by(since);

        // Algorithmic claim: one resulting cursor per distinct input time.
        assert_eq!(
            u64::try_from(splits.len()).unwrap(),
            n_times,
            "advance_by produced one cursor per distinct timestamp",
        );

        for split in &splits {
            // Every split was advanced, so it reports `since` rather than its stored time.
            let (_, time, _) = split.get();
            assert_eq!(time, since, "split reports the advanced timestamp");

            // Splits are descriptors over the same chunk list, not copies of it.
            assert!(
                Rc::ptr_eq(&split.chunks, &splits[0].chunks),
                "all splits share one chunk list",
            );
        }

        // Lower bound on the memory held by the splits themselves: each split is one `Cursor`
        // struct, and the chunk list is shared rather than duplicated per split. The cost scales
        // with the number of distinct times, not with the number of updates per split.
        let cursor_struct_bytes = std::mem::size_of::<Cursor<String>>();
        let approx_bytes = splits.len() * cursor_struct_bytes;
        let chunks_per_chain = usize::cast_from(n_times).div_ceil(chunk_capacity);
        eprintln!(
            "splits={} cursor_struct={}B approx_total>={}B \
             chunks_per_chain={}",
            splits.len(),
            cursor_struct_bytes,
            approx_bytes,
            chunks_per_chain,
        );
    }
}
Loading