Conversation
datafusion/src/physical_plan/sort.rs
Outdated
 /// Get the output partitioning of this plan
 fn output_partitioning(&self) -> Partitioning {
-    Partitioning::UnknownPartitioning(1)
+    self.input.output_partitioning()
Force-pushed from ea493f8 to f090a0a
alamb
left a comment
Thank you @tustvold -- I think this is the last missing physical operator we need in DataFusion to start enabling sort based optimizations (e.g. sort-merge-join, etc)
I think this is pretty amazing work -- I am sure there will be more work to optimize this, but I like the overall structure and I think it is looking very cool.
I think we should let at least one other pair of eyes read it carefully so I will hold off on clicking approve until that happens. But from what I can see at this point, this PR is basically ready to go
}

/// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender
pub(crate) fn spawn_execution(
this is a nice abstraction (and we can probably use it elsewhere)
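For readers unfamiliar with the pattern, a minimal self-contained sketch of this kind of helper follows (the generic names here are invented for illustration, not the PR's actual `spawn_execution`): spawn a tokio task that drives a stream to completion and forwards each item through an mpsc channel, stopping early if the receiver is dropped.

```rust
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

/// Drive `stream` on the tokio threadpool and forward every item to `sender`.
/// Returns the task's JoinHandle so the caller can await or abort it.
fn spawn_forward<T, S>(mut stream: S, sender: mpsc::Sender<T>) -> JoinHandle<()>
where
    T: Send + 'static,
    S: Stream<Item = T> + Send + Unpin + 'static,
{
    tokio::spawn(async move {
        while let Some(item) = stream.next().await {
            // If the receiving side has hung up there is no point continuing.
            if sender.send(item).await.is_err() {
                break;
            }
        }
    })
}
```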
    Partitioning::UnknownPartitioning(1)
}

fn required_child_distribution(&self) -> Distribution {
Eventually (not as part of this PR) we should add something like `required_child_sort_order` so that operators can report the sortedness they assume from their children.
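Purely as an illustration of what such an API could look like (a hypothetical sketch, not part of DataFusion or this PR), a trait method could return, per child, either "no requirement" or the ordering the child must be sorted by:

```rust
/// Hypothetical sketch only: a simplified description of one sort key,
/// mirroring the descending / nulls_first options used elsewhere in this PR.
pub struct SortRequirement {
    pub column: String,
    pub descending: bool,
    pub nulls_first: bool,
}

/// Hypothetical extension: each child reports either `None`
/// (no sortedness required) or the ordering it must satisfy.
pub trait RequiredSortOrder {
    fn required_child_sort_order(&self) -> Vec<Option<Vec<SortRequirement>>>;
}
```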
(true, false) => return Ok(Ordering::Less),
(false, false) => {}
(true, true) => {
    // TODO: Building the predicate each time is sub-optimal
I predict this line will be the bottleneck of this operator.
However, I feel like getting it in and working and then optimizing as a follow on is the correct course of action in this case.
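To illustrate the kind of follow-on optimization being suggested (a sketch under assumed types, not the PR's code): build the per-column comparison closures once, store them alongside the cursor state, and reuse them for every row comparison instead of rebuilding the predicate on each call.

```rust
use std::cmp::Ordering;

/// Illustrative "build once, reuse" comparator: one closure per sort column,
/// each comparing row `l` of the left batch against row `r` of the right batch.
struct CachedComparator {
    comparators: Vec<Box<dyn Fn(usize, usize) -> Ordering + Send>>,
}

impl CachedComparator {
    /// Compare two rows lexicographically across the cached per-column closures.
    fn compare(&self, l: usize, r: usize) -> Ordering {
        for cmp in &self.comparators {
            match cmp(l, r) {
                Ordering::Equal => continue,
                other => return other,
            }
        }
        Ordering::Equal
    }
}
```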
| "+---+---+-------------------------------+", | ||
| "| 1 | | 1970-01-01 00:00:00.000000008 |", | ||
| "| 1 | | 1970-01-01 00:00:00.000000008 |", | ||
| "| 2 | a | |", |
In order to cover the nulls_first: false case for "c", I think you need several rows with a tie on a and b, and both a null and a non-null value for c. I didn't see any such case (though I may have missed it).
Perhaps adding a row like the following would be enough
"| 7 | b | NULL |",
The sort key is just b and c, so don't the lines
"| 7 | b | 1970-01-01 00:00:00.000000006 |",
"| 2 | b | |",
already test this?
    assert_eq!(basic, partition);
}

// Split the provided record batch into multiple batch_size record batches
This might be a function that we could add to RecordBatch itself? I can file a ticket to do so if you would like
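For reference, splitting can be done with zero-copy slices; a minimal sketch of such a helper (not the PR's test code, assuming only the arrow `RecordBatch::slice` and `num_rows` APIs) might look like:

```rust
use arrow::record_batch::RecordBatch;

/// Split `batch` into chunks of at most `batch_size` rows using zero-copy slicing.
fn split_batch(batch: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    let mut output = Vec::new();
    let mut offset = 0;
    while offset < batch.num_rows() {
        let length = batch_size.min(batch.num_rows() - offset);
        output.push(batch.slice(offset, length));
        offset += length;
    }
    output
}
```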
}

#[tokio::test]
async fn test_partition_sort_streaming_input_output() {
I think this test covers the case where each input stream has more than one RecordBatch, right? (Each input partition has three record batches.)
Is there any value to another test that has input streams with differing numbers of input batches (I am thinking of an input with 3 partitions: 0 record batches, 1 record batch, and "many" (aka 2 or 3))?
Codecov Report

@@            Coverage Diff             @@
##           master     #379      +/-   ##
==========================================
+ Coverage   74.85%   75.39%   +0.54%
==========================================
  Files         146      148       +2
  Lines       24565    25242     +677
==========================================
+ Hits        18387    19031     +644
- Misses       6178     6211      +33
/// if all cursors for all streams are exhausted
fn next_stream_idx(&self) -> Result<Option<usize>> {
    let mut min_cursor: Option<(usize, &SortKeyCursor)> = None;
    for (idx, candidate) in self.cursors.iter().enumerate() {
For a larger number of partitions, storing the cursors in a BinaryHeap, sorted by their current item, would be beneficial.
A Rust implementation of that approach can be seen in this blog post and the first comment under it. I have implemented the same approach in Java before. I agree with @alamb, though, that we should make it work first and optimize later.
great suggestion @jhorstmann -- thank you -- I filed #416 so it is more visible
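For context, here is a small self-contained illustration of the heap-based k-way merge idea over plain `Vec<i32>` inputs (nothing DataFusion-specific; in the PR the cursors would play the role of the `(value, input, position)` tuples). The next row to yield is always at the top of a min-heap, so selection costs O(log k) instead of a linear scan over all cursors.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge ascending-sorted inputs into one ascending-sorted output.
fn merge_sorted(inputs: Vec<Vec<i32>>) -> Vec<i32> {
    // Heap entries are (current value, input index, position within input),
    // wrapped in Reverse to turn std's max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (idx, input) in inputs.iter().enumerate() {
        if let Some(&value) = input.first() {
            heap.push(Reverse((value, idx, 0usize)));
        }
    }

    let mut output = Vec::new();
    while let Some(Reverse((value, idx, pos))) = heap.pop() {
        output.push(value);
        // Re-insert the same input with its next value, if any remain.
        if let Some(&next) = inputs[idx].get(pos + 1) {
            heap.push(Reverse((next, idx, pos + 1)));
        }
    }
    output
}
```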
Will rebase to remove merges
Force-pushed from 9999e4d to 29c767b
This PR appears to need some rebasing / test fixing love: https://github.com/apache/arrow-datafusion/pull/379/checks?check_run_id=2674096854
Apologies - I stripped out the merge that fixed the logical conflict 🤦 Pushed a commit that fixes it 😄
alamb
left a comment
I think this PR is ready -- thanks again @tustvold
What do you think @Dandandan / @andygrove ? Any objections to merging this (as a step towards a more sorted future in DataFusion)?
I just fixed a merge conflict -- if the tests pass I plan to merge this PR in
Closes #362.
Creating as a draft since it currently builds on top of #378, as it uses a partitioned SortExec in its tests.
This PR adds a SortPreservingMergeExec operator that allows merging together multiple sorted partitions into a single partition.
The main implementation is contained within SortPreservingMergeStream and SortKeyCursor:
- `SortKeyCursor` provides the ability to compare the sort keys of the next row that could be yielded for each stream, in order to determine which one to yield.
- `SortPreservingMergeStream` maintains a `SortKeyCursor` for each stream and builds up a list of sorted indices identifying rows within these cursors. When it reads the last row of a `RecordBatch`, it fetches another from the input. Once it has accumulated `target_batch_size` row indices (or exhausted all input streams) it will combine the relevant rows from the buffered `RecordBatch`es into a single `RecordBatch`, drop any cursors it no longer needs, and yield the batch.
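As a rough illustration of the cursor idea described above (simplified to a single `i64` sort key; this is not the actual `SortKeyCursor` implementation), each input stream gets a cursor tracking the next un-yielded row of its current batch, and the merge repeatedly picks the cursor whose next key is smallest:

```rust
/// Simplified stand-in for a per-stream cursor over one batch's sort keys.
struct Cursor {
    /// Sort-key values of the current batch, in row order.
    keys: Vec<i64>,
    /// Next row to yield from `keys`.
    row: usize,
}

impl Cursor {
    /// True once every row of the current batch has been yielded.
    fn is_exhausted(&self) -> bool {
        self.row >= self.keys.len()
    }

    /// Peek at the sort key of the next row, if any.
    fn peek(&self) -> Option<i64> {
        self.keys.get(self.row).copied()
    }

    /// Record that the current row has been chosen and return its index.
    fn advance(&mut self) -> usize {
        let row = self.row;
        self.row += 1;
        row
    }
}
```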