compute: scaffold CollectionEdge for columnar dataflow edges #36507
Draft
antiguru wants to merge 5 commits into
Introduce `CollectionEdge<'scope, T>`, an enum that lets dataflow edges between Plan nodes carry either a row-based `VecCollection` or a columnar `Stream<Column<(Row, T, Diff)>>`. This is the foundation for a consumer-first migration to columnar edges: every Plan-node consumer learns to accept either variant before any producer emits the columnar variant, after which a single switch flips producers end-to-end.

This commit is scaffolding only. No producer emits the columnar variant and `CollectionBundle` is unchanged; the columnar arms of `enter_region`, `negate`, and the `columnar_negate` primitive carry `todo!()` bodies that will be filled in alongside the producer switch.

The design rule baked into the API is that a columnar-to-row decode at a consumer's input is only acceptable when the consumer would have decoded `Row` to `Datum` anyway; pure passthrough consumers must round-trip columnar without decoding.

There is no user-visible change and no behavioural change in this PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
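To illustrate the shape described above, here is a minimal std-only sketch of a two-armed edge whose `negate` preserves the variant. It is not the code in this PR: `Edge`, `Columns`, and the `Row`/`Time`/`Diff` aliases are hypothetical stand-ins for the timely/differential types, and the columnar arm is implemented here only because the toy container makes it trivial (in the PR that arm is still `todo!()`).

```rust
// Toy stand-ins; the real types wrap timely streams and `mz_repr::Row`.
type Row = Vec<i64>;
type Time = u64;
type Diff = i64;

/// Hypothetical column-oriented container: one vector per field.
#[derive(Debug, Clone, PartialEq, Default)]
struct Columns {
    rows: Vec<Row>,
    times: Vec<Time>,
    diffs: Vec<Diff>,
}

/// Simplified model of an edge that is either row-based or columnar.
#[derive(Debug, Clone, PartialEq)]
enum Edge {
    Vec(Vec<(Row, Time, Diff)>),
    Columnar(Columns),
}

impl Edge {
    /// Flip the sign of every diff while keeping the variant unchanged.
    fn negate(self) -> Edge {
        match self {
            Edge::Vec(updates) => Edge::Vec(
                updates.into_iter().map(|(r, t, d)| (r, t, -d)).collect(),
            ),
            Edge::Columnar(mut cols) => {
                // Only the diff column is touched; rows are never decoded.
                for d in cols.diffs.iter_mut() {
                    *d = -*d;
                }
                Edge::Columnar(cols)
            }
        }
    }
}
```

In this model a passthrough consumer such as `negate` never looks inside `rows`, which is the round-trip-without-decoding property the design rule asks for.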
Replace the `ColumnarBatch<T>` type alias with a struct `ColumnarCollection<'scope, T, D, R>` parameterised the same way as differential's `VecCollection<'scope, T, D, R>`, holding a `Stream<'scope, T, Column<(D, T, R)>>` instead of a `Stream<'scope, T, Vec<(D, T, R)>>`.

The `CollectionEdge` variants now read symmetrically: `Vec(VecCollection<'scope, T, Row, Diff>)` and `Columnar(ColumnarCollection<'scope, T, Row, Diff>)`. `columnar_negate` is generalised to operate on any `ColumnarCollection<'scope, T, D, R>`.

Also drop a doc link to a not-yet-defined `concat` method.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Change the oks side of `CollectionBundle.collection` from `VecCollection<'scope, T, Row, Diff>` to `CollectionEdge<'scope, T>`. The bundle now uniformly carries a `CollectionEdge` and exposes a new `from_edge` constructor; `from_collections` is preserved as a thin wrapper that builds a `CollectionEdge::Vec` arm.

This is the bundle-shape commit, intentionally mechanical: every producer site still emits the `Vec` arm and every consumer either keeps using `as_specific_collection` (which extracts the Vec arm internally) or unwraps explicitly via the new transitional `CollectionEdge::expect_vec` / `expect_vec_mut` helpers. The fence sites that needed `expect_vec` are `as_specific_collection`, the unkeyed `flat_map` branch, the three take / store points in `ensure_collections`, the two LetRec bindings in `render.rs`, the `log_operator_hydration` path, and the raw collection branch in `sinks.rs`. Each fence is a future Phase-B PR target.

Convert `Negate` to use `CollectionEdge::negate` natively rather than going through `as_specific_collection`: it now consumes the edge, flips diffs while preserving the variant, and rewraps via `from_edge`. This is the first consumer to handle the edge enum directly.

No behavioural change. The `Columnar` arm of `CollectionEdge::negate` is still wired to `columnar_negate`, which carries a `todo!()` body that will land alongside the producer switch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
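The bundle shape and the transitional fence can be sketched roughly as follows. This is a std-only approximation under stated assumptions: `Bundle`, `Edge`, and `Update` are hypothetical names, errors are modelled as plain strings, and the columnar payload is a placeholder because its layout does not matter for the constructor and fence logic.

```rust
// Toy stand-ins; not the Materialize types.
type Row = Vec<i64>;
type Update = (Row, u64, i64);

#[derive(Debug, Clone, PartialEq)]
enum Edge {
    Vec(Vec<Update>),
    Columnar(Vec<Update>), // placeholder payload for the sketch
}

impl Edge {
    /// Transitional fence: unwrap the `Vec` arm, panic on anything else.
    fn expect_vec(self) -> Vec<Update> {
        match self {
            Edge::Vec(updates) => updates,
            Edge::Columnar(_) => panic!("expected Vec edge, found Columnar"),
        }
    }
}

/// Simplified bundle: an edge for the oks side, a plain list for errors.
struct Bundle {
    oks: Edge,
    errs: Vec<String>,
}

impl Bundle {
    /// New constructor: takes an edge of either variant.
    fn from_edge(oks: Edge, errs: Vec<String>) -> Self {
        Bundle { oks, errs }
    }

    /// Preserved constructor: a thin wrapper that builds the `Vec` arm.
    fn from_collections(oks: Vec<Update>, errs: Vec<String>) -> Self {
        Self::from_edge(Edge::Vec(oks), errs)
    }
}
```

Because every producer still goes through something like `from_collections`, an `expect_vec` fence cannot panic until a producer starts emitting the other arm, which is why each fence can be removed in its own follow-up.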
Add `CollectionEdge::concat_many` and `CollectionEdge::consolidate_named`, both preserving variant. The columnar arms are `todo!()` until producers emit the columnar variant.

Convert `Union` in `render.rs` to operate on `CollectionEdge` natively: pull each input's `(edge, errs)` directly out of the bundle, concatenate edges via `concat_many`, optionally consolidate via `consolidate_named`, and rewrap via `from_edge`. The error stream still goes through differential's `concatenate` since errors stay on the row-formatted side.

Union is the second consumer to handle the edge enum directly, after Negate. No behavioural change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
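A rough std-only model of variant-preserving concatenation followed by consolidation is shown below. All names are hypothetical, and `consolidate` stands in for `consolidate_named` without the name argument. Returning `None` for empty or mixed-variant input is purely an assumption of this sketch; the commit message does not say how the real `concat_many` treats mixed inputs.

```rust
// Toy stand-ins; not the Materialize types.
type Update = (Vec<i64>, u64, i64);

#[derive(Debug, Clone, PartialEq)]
enum Edge {
    Vec(Vec<Update>),
    Columnar(Vec<Update>), // placeholder payload for the sketch
}

/// Sum diffs of updates with equal (row, time) and drop zero results.
fn merge(mut updates: Vec<Update>) -> Vec<Update> {
    updates.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
    let mut out: Vec<Update> = Vec::new();
    for (row, time, diff) in updates {
        if let Some(last) = out.last_mut() {
            if last.0 == row && last.1 == time {
                last.2 += diff;
                continue;
            }
        }
        out.push((row, time, diff));
    }
    out.retain(|u| u.2 != 0);
    out
}

impl Edge {
    /// Concatenate edges that share a variant.
    /// Sketch-only assumption: empty or mixed-variant input yields `None`.
    fn concat_many(edges: Vec<Edge>) -> Option<Edge> {
        let mut iter = edges.into_iter();
        let mut acc = iter.next()?;
        for edge in iter {
            match (&mut acc, edge) {
                (Edge::Vec(a), Edge::Vec(b)) => a.extend(b),
                (Edge::Columnar(a), Edge::Columnar(b)) => a.extend(b),
                _ => return None,
            }
        }
        Some(acc)
    }

    /// Consolidate updates while keeping the variant unchanged.
    fn consolidate(self) -> Edge {
        match self {
            Edge::Vec(u) => Edge::Vec(merge(u)),
            Edge::Columnar(u) => Edge::Columnar(merge(u)),
        }
    }
}
```

A Union-style consumer in this model would call `concat_many` on its inputs' edges and then `consolidate` only when requested, never inspecting which arm it holds.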
`flat_map_datums(max_demand, logic)` is the canonical unified entry point for "decoding consumers", that is, operators that read `Datum`s from each row anyway (MFP, FlatMap, joins, TopK, sinks). The closure receives a borrowed `DatumVecBorrow`, identical to the existing `CollectionBundle::flat_map` else-branch contract.

The Vec arm reuses `DatumVec::borrow_with_limit` over each `Row` from the timely stream. The Columnar arm is `todo!()`; it will iterate `Column<(Row, T, Diff)>::borrow()` via the `Rows<_>::Index` impl (yielding `&RowRef`) and call `DatumVec::borrow_with_limit` per row, all without materialising an owned `Row`.

Route `CollectionBundle::flat_map`'s unkeyed branch through the new method so the per-record decode lives in exactly one place. This deletes one of the transitional `expect_vec` fences.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
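The idea of a single decode entry point shared by both arms can be sketched with std-only stand-ins. Everything here is hypothetical: `Datum` is modelled as `i64`, a row as a slice of datums, `borrow_with_limit` as a free function that borrows a prefix, and `max_demand` as the number of leading datums the closure may see (the real parameter's exact meaning is not stated in the commit message). Output is collected into a `Vec` rather than a stream.

```rust
// Toy stand-ins; not the Materialize types.
type Datum = i64;
type Row = Vec<Datum>;
type Time = u64;
type Diff = i64;

enum Edge {
    Vec(Vec<(Row, Time, Diff)>),
    Columnar { rows: Vec<Row>, times: Vec<Time>, diffs: Vec<Diff> },
}

/// Borrow at most `max_demand` leading datums of a row without cloning it.
fn borrow_with_limit(row: &[Datum], max_demand: usize) -> &[Datum] {
    &row[..max_demand.min(row.len())]
}

impl Edge {
    /// Apply `logic` to the borrowed datums of every update, in either arm.
    fn flat_map_datums<I, L>(&self, max_demand: usize, mut logic: L) -> Vec<I::Item>
    where
        I: IntoIterator,
        L: FnMut(&[Datum], Time, Diff) -> I,
    {
        let mut out = Vec::new();
        match self {
            Edge::Vec(updates) => {
                for (row, time, diff) in updates {
                    out.extend(logic(borrow_with_limit(row, max_demand), *time, *diff));
                }
            }
            Edge::Columnar { rows, times, diffs } => {
                // Walk the columns in lockstep; rows are borrowed, never cloned.
                for ((row, time), diff) in rows.iter().zip(times).zip(diffs) {
                    out.extend(logic(borrow_with_limit(row, max_demand), *time, *diff));
                }
            }
        }
        out
    }
}
```

Since the closure only ever sees a borrowed slice, a consumer written against this method behaves the same for both arms, which is what lets the unkeyed `flat_map` branch drop its `expect_vec` fence.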
Motivation
Today's row-based dataflow edges in compute force per-record consumer handling.
Switching producers to columnar first imposes an unpack tax on every consumer that is not yet columnar-aware; this PR begins a consumer-first migration that avoids the tax by making every consumer ready to absorb columnar batches before any producer emits them.
Description
Adds `CollectionEdge<'scope, T>` with `Vec` and `Columnar` arms in `src/compute/src/render/columnar.rs`, plus a `columnar_negate` stub.
Scaffolding only: no producer emits the columnar variant, `CollectionBundle` is unchanged, and columnar arms carry `todo!()` bodies that land alongside the producer switch.
The design rule encoded in the API is that a columnar-to-row decode at a consumer's input is only acceptable when the consumer would have decoded `Row` to `Datum` anyway; passthrough consumers must round-trip columnar without decoding.