Handle ordering of first last aggregation inside aggregator#8662
Handle ordering of first last aggregation inside aggregator#8662ozankabak merged 10 commits intoapache:mainfrom synnada-ai:feature/first_last_non_order_sensitive
Conversation
| SELECT DISTINCT ON (c1) c2 FROM aggregate_test_100 ORDER BY c1, c3; | ||
| ---- | ||
| 5 | ||
| 4 |
There was a problem hiding this comment.
I ran same query in the postgre, it gave the same result with the new version.
| // Append ordering requirements to expressions' results. | ||
| // This way order sensitive aggregators can satisfy requirement | ||
| // themselves. | ||
| if let Some(ordering_req) = agg.order_bys() { |
There was a problem hiding this comment.
Since aggregators themselves handle ordering. We append ordering expression values to the field also for all modes.
| } | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
| let indices = lexsort_to_indices(&sort_columns, Some(1))?; |
There was a problem hiding this comment.
If there is a min max alternative to this we can use that one also. However, as far as I know there is no util for this support. Maybe @tustvold can answer this, if he is familiar with.
There was a problem hiding this comment.
I'm not aware of a min/max kernel that returns the ordinal position of the min/max
There was a problem hiding this comment.
BTW I had the same basic need (find the position of min/max so I could find a value in a corresponding column) while implementing our special selector_first, selector_last, etc functions in InfluxDB 3.0 (I also had to code them specially)
There was a problem hiding this comment.
Do you think you implementation is more efficient? If that is the case, maybe we can use that code instead?
There was a problem hiding this comment.
I think our implementation is (slightly) more efficient, but it is less general (only works for timestamp columns). You can see the basic idea here
https://github.com/influxdata/influxdb/blob/main/query_functions/src/selectors.rs
And the comparision is here: https://github.com/influxdata/influxdb/blob/acfef87659c9a8c4c49e4628264369569e04cad1/query_functions/src/selectors/internal.rs#L119-L127
I think we should stay with the ScalarValue implementation unless we find some query where this calculation is taking most of the time
| ----------------------CoalesceBatchesExec: target_batch_size=8192 | ||
| ------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 | ||
| --------------------------MemoryExec: partitions=1, partition_sizes=[3] | ||
| ------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)] |
There was a problem hiding this comment.
Since first_value and last_value no longer requires ordering at its input. SortExecs are removed from the plan.
|
|
||
| query III | ||
| SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c | ||
| SELECT a, b, LAST_VALUE(c ORDER BY a DESC, c ASC) as last_c |
There was a problem hiding this comment.
The result of this test was not unique according to specifications (Since column a is not unique). I changed test to make result unique.
| // - There is a more recent entry in terms of requirement | ||
| if !self.is_set | ||
| || self.orderings.is_empty() | ||
| || compare_rows( |
There was a problem hiding this comment.
I'm sure you are aware but https://docs.rs/arrow-row/latest/arrow_row/ will be a much faster way to perform row-based comparisons than relying on ScalarValue
There was a problem hiding this comment.
Indeed, however, here we are checking just a single row (row that have lowest value). Hence I don't think it is worth to conversion here.
There was a problem hiding this comment.
I agree that since it is a single column max comparison this is probably fine (and no worse than the current implementation). If we need to optimize performance we could probably implement specialized implementations (like FirstValue<ArrowPrimitiveType> and skip the copying entirely.
That is likely a premature optimization at this point
Update: Row format may well be a good idea (not for this PR). I will wait until I have reviewed this code to offer a more informed opinion
There was a problem hiding this comment.
I re-reviewed and I agree that the RowFormat is not needed here (and in fact it may actually be slower) because, as @mustafasrepo points out, this code uses ScalarValue to compare a single row per batch (it finds the largest/smallest row per batch using lexsort_to_indices). We would have to benchmark to be sure.
|
I plan to review this carefully either later today or tomorrow. I want to get a draft of #8491 first |
alamb
left a comment
There was a problem hiding this comment.
Thank you @mustafasrepo and @ozankabak -- this PR looks good to me. ❤️
I believe if we applied the same change to ArrayAgg I think we can remove the limitation of a single compatible ORDER BY in a query -- aka #8582 -- is that your understanding too?
I am sorry for the delay in reviewing, I am partly on holiday this week so don't have as much time to devote to these endeavors as normal.
I think that in many common queries, this implementation is likely faster than what is on main because it doesn't potentially re-sort the entire input (it instead used lexsort_to_indices)
As we discussed in the design document the potential downside of this approach is that if multiple aggregates share the same ORDER BY clause, they will each independently sort the input batches, which is unfortunate but could be optimized in future PTs
| }; | ||
| // Update when there is no entry in the state, or we have an "earlier" | ||
| // entry according to sort requirements. | ||
| if !self.is_set |
There was a problem hiding this comment.
In theory, we may be able to use a Option<ScalarValue> instead of ScalarValue and is_set flag, but I don't think it matters for performance and this PR follows the existing implementation as well 👍
| // - There is a more recent entry in terms of requirement | ||
| if !self.is_set | ||
| || self.orderings.is_empty() | ||
| || compare_rows( |
There was a problem hiding this comment.
I re-reviewed and I agree that the RowFormat is not needed here (and in fact it may actually be slower) because, as @mustafasrepo points out, this code uses ScalarValue to compare a single row per batch (it finds the largest/smallest row per batch using lexsort_to_indices). We would have to benchmark to be sure.
| } | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
| let indices = lexsort_to_indices(&sort_columns, Some(1))?; |
There was a problem hiding this comment.
I think our implementation is (slightly) more efficient, but it is less general (only works for timestamp columns). You can see the basic idea here
https://github.com/influxdata/influxdb/blob/main/query_functions/src/selectors.rs
And the comparision is here: https://github.com/influxdata/influxdb/blob/acfef87659c9a8c4c49e4628264369569e04cad1/query_functions/src/selectors/internal.rs#L119-L127
I think we should stay with the ScalarValue implementation unless we find some query where this calculation is taking most of the time
| aggr_expr.as_any().is::<FirstValue>() | ||
| || aggr_expr.as_any().is::<LastValue>() | ||
| || aggr_expr.as_any().is::<OrderSensitiveArrayAgg>() | ||
| aggr_expr.as_any().is::<OrderSensitiveArrayAgg>() |
There was a problem hiding this comment.
Eventually this would be a nice thing to move into the AggregateExpr trait directly so we could override it and avoid special casing built in functions. Not for this PR though :)
| --------CoalesceBatchesExec: target_batch_size=8192 | ||
| ----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4 | ||
| ------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)], ordering_mode=Sorted | ||
| --------------SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST] |
There was a problem hiding this comment.
I do love the lack of Sort here
Yes, if we make
Exactly. We will explore both the split/diamond approach and the approach above in the upcoming weeks |
Which issue does this PR close?
Closes #.
Improves situation on #8662
Related to #8582
Rationale for this change
This PR implements the observation by @alamb at the PR that for first and last value aggregation we do not need to sort entire data at its input.
In other words, This PR is the
FIRST_VALUEandLAST_VALUEaggregation support of the approach 3 in the design documentWhat changes are included in this PR?
Are these changes tested?
Yes
Are there any user-facing changes?