add order by construct in window function and logical plans#463
add order by construct in window function and logical plans#463alamb merged 1 commit intoapache:masterfrom
order by construct in window function and logical plans#463Conversation
faa28c6 to
c4f2779
Compare
Codecov Report
@@ Coverage Diff @@
## master #463 +/- ##
==========================================
- Coverage 75.84% 75.83% -0.01%
==========================================
Files 153 153
Lines 25876 26078 +202
==========================================
+ Hits 19626 19777 +151
- Misses 6250 6301 +51
Continue to review full report at Codecov.
|
1684b78 to
177506a
Compare
5aa2b4e to
cc51a62
Compare
| message WindowNode { | ||
| LogicalPlanNode input = 1; | ||
| repeated LogicalExprNode window_expr = 2; | ||
| repeated LogicalExprNode partition_by_expr = 3; |
There was a problem hiding this comment.
turns out these are no longer useful
cc51a62 to
3261152
Compare
order by construct in window function and logical plans
| // udaf = 3 | ||
| } | ||
| LogicalExprNode expr = 4; | ||
| // repeated LogicalExprNode partition_by = 5; |
There was a problem hiding this comment.
we can probably just delete the old fields in the protobuf files -- I suspect no one is using them in a way that requires backwards compatibility
There was a problem hiding this comment.
these commented out ones are not old, they are reminders of future fields.
| fun: fun.clone(), | ||
| args: expressions.to_vec(), | ||
| }), | ||
| Expr::WindowFunction { fun, .. } => { |
There was a problem hiding this comment.
this pattern is kind of ugly -- at some point I would love to rewrite all this in terms of the expression visitors. Some day (TM) lol
| self.create_physical_expr(e, physical_input_schema, ctx_state) | ||
| }) | ||
| .collect::<Result<Vec<_>>>()?; | ||
| // if !order_by.is_empty() { |
There was a problem hiding this comment.
Why is this commented out? It seems a better idea to generate an error than to silently error out to me
There was a problem hiding this comment.
i would bring them back when it comes to implementing exec plan for sort, but maybe later
| && window.order_by.is_empty() | ||
| && window.window_frame.is_none() | ||
| { | ||
| if window.partition_by.is_empty() && window.window_frame.is_none() { |
There was a problem hiding this comment.
I don't understand the check for partition_by.is_empty() and window_frame.is_none() -- wouldn't we want to aways process the window clause?
There was a problem hiding this comment.
they need to be empty otherwise it goes to the unsupported error clause which is needed to guard unintended usage.
| quick_test(sql, expected); | ||
| } | ||
|
|
||
| /// psql result |
There was a problem hiding this comment.
Thank you for including the postgres results
| /// -> Seq Scan on orders (cost=0.00..20.00 rows=1000 width=8) | ||
| /// ``` | ||
| /// | ||
| /// FIXME: for now we are not detecting prefix of sorting keys in order to save one sort exec phase |
There was a problem hiding this comment.
FWIW I think the FIXME is fine to do later -- let's get the functionality (with tests) working first and then we can optimize afterwards.
There was a problem hiding this comment.
lots of room for optimization. there are several considerations when it comes to:
- how to compute order by and partition by and re-order them to optimize
- how to compute window aggregations given a possibility of either a ever-growing window or a shifting window (that can shrink and expand, depending on # of rows or the absolute values)
There was a problem hiding this comment.
for 1, e.g. with max(a) over (partition by b order by c) you can either:
- hash partition by b, merge, and then sort by c
- sort by (b, c) so it's easier to implement but you lose parallelism here
There was a problem hiding this comment.
for 2, for ever growing window, accumulative scan can be used, but for shrinking or shifting window, vec dequeue can be used, but also there's segment tree...
| ); | ||
| } | ||
|
|
||
| /// psql result |
There was a problem hiding this comment.
I don't think it really matters, but the max, min and avg window functions don't actually depend on order (and so in theory all of the sorts here could be optimized away.
There was a problem hiding this comment.
they do. with order by they compute accumulative sum/avg/max/min, not a full partition one.
| fun: WindowFunction::AggregateFunction(AggregateFunction::Max), | ||
| args: vec![col("name")], | ||
| order_by: vec![ | ||
| Expr::Sort { |
There was a problem hiding this comment.
FYI you can use the sort method here for less verbosity if you want: https://docs.rs/datafusion/4.0.0/datafusion/logical_plan/enum.Expr.html#method.sort
So something like order_by: vec![col("age").sort(true, true)]
let sort_expr = col("foo").sort(true, true); // SORT ASC NULLS_FIRST
There was a problem hiding this comment.
thanks for the reminder - i plan to optimize this in subsequent PRs - as there would be more to comp
febc427 to
76e7e6d
Compare
| \n WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\ | ||
| \n Sort: #qty ASC NULLS FIRST, #order_id ASC NULLS FIRST\ | ||
| \n WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\ | ||
| \n Sort: #order_id ASC NULLS FIRST, #qty ASC NULLS FIRST\ |
There was a problem hiding this comment.
Might be good to remove the defaults (i.e. nulls first)
|
I am merging this one in so that @jimexist can continue with a minimum number of PRs open. Thanks again for keeping the 🚋 rolling! |
Which issue does this PR close?
add sort_by construct in window function and logical plans.
Related and partly implements #360
Rationale for this change
Implementing
order byconstructs and its logical planning by following a similar approach to PostgreSQL.Coming up: adding window frame unit and type defs, and allow window functions to operate differently depending on
RANGEorROWwindow frame spec.What changes are included in this PR?
order byAre there any user-facing changes?