Implement window functions with partition_by clause #558
Codecov Report
```
@@            Coverage Diff             @@
##           master     #558      +/-   ##
==========================================
- Coverage   76.12%   76.08%   -0.04%
==========================================
  Files         156      156
  Lines       27074    27121      +47
==========================================
+ Hits        20609    20635      +26
- Misses       6465     6486      +21
```
Continue to review full report at Codecov.
@Dandandan and @alamb this is ready now
After this pull request I'll rebase and merge #564 so that we can have a benchmark for future iterations.
```rust
    new_null_array(value.data_type(), num_rows)
} else {
    let value = ScalarValue::try_from_array(value, index)?;
    value.to_array_of_size(num_rows)
```
The same issue as for normal aggregations probably happens here: if the partition by creates a lot of groups, we will create many individual Arrow arrays (which is slow / memory consuming).
In the long run it would probably be better to store the offsets into the values in one contiguous array, store the values contiguously as well, and extend / update them in place.
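The contiguous layout being suggested can be sketched with plain `Vec`s standing in for Arrow buffers (`append_group` is a hypothetical helper, not a DataFusion API); it mirrors the offsets-plus-values layout Arrow itself uses for list arrays:

```rust
// Hypothetical sketch: instead of materializing one small array per group,
// append every group's values into one shared buffer and record where each
// group ends in an offsets vector.
fn append_group(values: &mut Vec<i64>, offsets: &mut Vec<usize>, group: &[i64]) {
    values.extend_from_slice(group); // extend the shared buffer in place
    offsets.push(values.len());      // one offset per finished group
}

fn main() {
    // offsets[0] = 0 marks the start of the first group
    let mut values = Vec::new();
    let mut offsets = vec![0usize];
    append_group(&mut values, &mut offsets, &[1, 2]);
    append_group(&mut values, &mut offsets, &[3]);
    append_group(&mut values, &mut offsets, &[4, 5, 6]);
    // group i occupies values[offsets[i]..offsets[i + 1]]
    assert_eq!(values, vec![1, 2, 3, 4, 5, 6]);
    assert_eq!(offsets, vec![0, 2, 3, 6]);
}
```

With this layout there is a single growing allocation regardless of how many groups the partition by produces.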
Not needed for this PR btw, but just noting there are similar needs/performance issues in both aggregation and window functions.
I agree; not in this pull request, but I believe this could warrant a dedicated compute kernel in Arrow for batched array slice transformation and then concatenation.
@Dandandan as a first step down that road, I've started to work on this:
```diff
-        let results = partition_points
-            .iter()
-            .map(|partition_range| {
-                let sort_partition_points =
-                    find_ranges_in_range(partition_range, &sort_partition_points);
-                let mut window_accumulators = self.create_accumulator()?;
-                sort_partition_points
-                    .iter()
-                    .map(|range| window_accumulators.scan_peers(&values, range))
-                    .collect::<Result<Vec<_>>>()
-            })
-            .collect::<Result<Vec<Vec<ArrayRef>>>>()?
-            .into_iter()
-            .flatten()
-            .collect::<Vec<ArrayRef>>();
-        let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
-        concat(&results).map_err(DataFusionError::ArrowError)
+        let mut result = Vec::with_capacity(num_rows);
+        for partition_range in partition_points {
+            let sort_partition_points =
+                find_ranges_in_range(&partition_range, &sort_partition_points);
+            let mut window_accumulators = self.create_accumulator()?;
+            for range in sort_partition_points {
+                result.extend(window_accumulators.scan_peers(&values, range)?);
+            }
+        }
+        ScalarValue::iter_to_array(result.into_iter())
```
Yes - that should probably already be quite an improvement 👍
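The shape of that change can be illustrated with plain `Vec`s standing in for `ArrayRef`/`ScalarValue` (all names here are simplified stand-ins, not the actual DataFusion types): the old path built one small array per peer group and concatenated them, while the new path extends a single buffer and materializes one array at the end.

```rust
use std::ops::Range;

// e.g. a running SUM over a peer group: every row in the group
// gets the same aggregate value
fn scan_peers(values: &[i64], range: Range<usize>) -> Vec<i64> {
    let sum: i64 = values[range.clone()].iter().sum();
    vec![sum; range.len()]
}

// Old path: one allocation per range, then a concat over all of them.
fn old_path(values: &[i64], ranges: &[Range<usize>]) -> Vec<i64> {
    let parts: Vec<Vec<i64>> = ranges
        .iter()
        .map(|r| scan_peers(values, r.clone()))
        .collect();
    parts.into_iter().flatten().collect()
}

// New path: a single buffer extended in place, one final materialization.
fn new_path(values: &[i64], ranges: &[Range<usize>]) -> Vec<i64> {
    let mut result = Vec::with_capacity(values.len());
    for r in ranges {
        result.extend(scan_peers(values, r.clone()));
    }
    result
}

fn main() {
    let values = [1, 2, 3, 4];
    let ranges = [0..2, 2..4];
    // both paths produce the same rows; only the allocation pattern differs
    assert_eq!(old_path(&values, &ranges), new_path(&values, &ranges));
    assert_eq!(new_path(&values, &ranges), vec![3, 3, 7, 7]);
}
```

The behavior is identical; the win is that the number of intermediate allocations no longer scales with the number of peer groups.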
@Dandandan this is fixed now
Dandandan left a comment:
Looks great again! Two comments about tests, to be a bit more future-proof.
Fixed. Regarding repartition, I'll handle that in #569, but so far I'm seeing performance regressions.
Thanks @jimexist
Which issue does this PR close?
Closes #299
Rationale for this change
With `order by` support already implemented, we can now add `partition by` support.
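To illustrate what `partition by` requires internally, a minimal sketch (plain Rust, hypothetical `partition_ranges` helper, not the actual DataFusion code): given a batch already sorted on the partition column, find the contiguous row ranges sharing the same key, so the window accumulator can be reset per range.

```rust
use std::ops::Range;

// Given partition keys sorted so equal keys are adjacent, return the
// contiguous row range of each partition.
fn partition_ranges(keys: &[&str]) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    for i in 1..=keys.len() {
        // close the current partition at the end of input or on a key change
        if i == keys.len() || keys[i] != keys[start] {
            ranges.push(start..i);
            start = i;
        }
    }
    ranges
}

fn main() {
    // sorted partition keys for 5 rows
    let keys = ["a", "a", "b", "b", "b"];
    assert_eq!(partition_ranges(&keys), vec![0..2, 2..5]);
}
```

Within each returned range, the existing `order by` machinery then determines the peer groups over which the window function is evaluated.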
What changes are included in this PR?
Are there any user-facing changes?