Implement GROUPING aggregate function (following Postgres behavior.)#12565
bgjackma wants to merge 2 commits into apache:main
Conversation
| // The PhysicalExprs of grouping_exprs must be Column PhysicalExpr. Because if | ||
| // the group by PhysicalExpr in SQL is non-Column PhysicalExpr, then there is | ||
| // a ProjectionExec before AggregateExec to convert the non-column PhysicalExpr | ||
| // to Column PhysicalExpr. | ||
| let column_index = | ||
| |expr: &Arc<dyn PhysicalExpr>| match expr.as_any().downcast_ref::<Column>() { | ||
| Some(column) => Ok(column.index()), | ||
| None => internal_err!("Grouping doesn't support expr: {}", expr), | ||
| }; |
There was a problem hiding this comment.
This is only true when the optimizer rule CommonSubexprEliminate is enabled. It does not seem acceptable to depend on optimizer rules for correctness or basic support.
There was a problem hiding this comment.
Can we look for equal PhysicalExprs?
The Postgres docs imply they do ~text comparison but I'm not sure how accessible that info is at this layer.
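To make the "look for equal PhysicalExprs" idea concrete, here is a minimal sketch that resolves each GROUPING argument to its position among the GROUP BY expressions by structural equality instead of by downcasting to Column. The `Expr` enum and `resolve_grouping_args` are hypothetical stand-ins for illustration only; they are not DataFusion types, and whether `PhysicalExpr` equality is precise enough at this layer is still an open question.

```rust
// Hypothetical stand-in for a physical expression; NOT DataFusion's `PhysicalExpr`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(usize),
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
}

/// Resolve each GROUPING argument to the index of the structurally equal
/// GROUP BY expression. Fails if an argument is not a GROUP BY expression.
fn resolve_grouping_args(args: &[Expr], group_exprs: &[Expr]) -> Result<Vec<usize>, String> {
    args.iter()
        .map(|arg| {
            group_exprs
                .iter()
                .position(|g| g == arg)
                .ok_or_else(|| format!("GROUPING argument {arg:?} is not a GROUP BY expression"))
        })
        .collect()
}

fn main() {
    // GROUP BY (col0 + 1), col1 with GROUPING(col1, col0 + 1)
    let col0_plus_1 = Expr::Add(Box::new(Expr::Column(0)), Box::new(Expr::Literal(1)));
    let group_exprs = vec![col0_plus_1.clone(), Expr::Column(1)];
    let resolved = resolve_grouping_args(&[Expr::Column(1), col0_plus_1], &group_exprs);
    println!("{resolved:?}");
}
```

With this approach a non-column GROUP BY expression such as `col0 + 1` resolves without relying on a preceding projection to turn it into a column.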
| // GROUPING is a special fxn that exposes info about group organization | ||
| if let Some(grouping) = agg_expr.fun().inner().as_any().downcast_ref::<Grouping>() { | ||
| let args = agg_expr.all_expressions().args; | ||
| return grouping.create_grouping_accumulator(&args, &group_by.expr); | ||
| } |
There was a problem hiding this comment.
If we need special handling like this, it seems to me that we should consider just making Grouping a built-in.
Or we should probably make it more generic so it can be used to implement some other function. But since the input is just the bitmasks and the output is the same, I wonder if there are any conceivable functions that could not just be implemented as a transformation on a built-in grouping function.
There was a problem hiding this comment.
It's kind of in a weird place, it's sort of not a real aggregation function but instead a way to leak metadata. That might be a reason to make it a built-in.
Do you have ideas about how and when to go about doing that?
There's another function called GROUP_ID (not to be confused with GROUPING_ID) which disambiguates duplicate rows, it might be relevant.
There was a problem hiding this comment.
> It's kind of in a weird place, it's sort of not a real aggregation function but instead a way to leak metadata. That might be a reason to make it a built-in.
> Do you have ideas about how and when to go about doing that?
One way might be to expose the grouping_id column used in #12571 and implement the function as a transformation on that. This should be possible, as that column should "leak" the needed metadata. This is what was proposed in #5749.
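As a rough illustration of the "transformation on grouping_id" idea, the sketch below derives a Postgres-style GROUPING value from a per-row grouping id. The bit layout is an assumption made for this example (one bit per GROUP BY expression, expression 0 in the most significant of those bits, bit set when the expression is not part of the current grouping set); the actual column in #12571 may be laid out differently. `grouping_from_id` is a hypothetical name.

```rust
/// Compute GROUPING(args...) from a grouping id.
///
/// Assumed layout (not confirmed by the PR): `grouping_id` uses
/// `n_group_exprs` bits, GROUP BY expression 0 is the most significant of
/// those bits, and a set bit means the expression is aggregated away.
/// `arg_positions[k]` is the GROUP BY position of the k-th GROUPING argument.
fn grouping_from_id(grouping_id: u32, n_group_exprs: usize, arg_positions: &[usize]) -> u32 {
    assert!(n_group_exprs <= 32, "at most 32 GROUP BY expressions fit in a u32");
    arg_positions.iter().fold(0, |acc, &pos| {
        assert!(pos < n_group_exprs, "argument position out of range");
        // Extract the bit that belongs to this GROUP BY expression.
        let bit = (grouping_id >> (n_group_exprs - 1 - pos)) & 1;
        // The leftmost GROUPING argument ends up in the most significant bit.
        (acc << 1) | bit
    })
}

fn main() {
    // GROUP BY ROLLUP(a, b) produces grouping sets (a, b), (a), ()
    // with assumed ids 0b00, 0b01, 0b11.
    for id in [0b00, 0b01, 0b11] {
        // GROUPING(b, a): b is GROUP BY position 1, a is position 0.
        println!("id={id:#04b} GROUPING(b, a)={}", grouping_from_id(id, 2, &[1, 0]));
    }
}
```

Under these assumptions GROUPING needs no accumulator of its own: it only selects and reorders bits of a value the aggregation already produces.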
There was a problem hiding this comment.
I pushed an initial implementation of this here: #12704
I think someone with more experience with this project should decide what the best way forward is.
Looks like there is a minor clippy failure on this PR.
382bb23 to 9c0ed8a
| grouping_args: &[Arc<dyn PhysicalExpr>], | ||
| group_exprs: &[(Arc<dyn PhysicalExpr>, String)], | ||
| ) -> Result<Box<dyn GroupsAccumulator>> { | ||
| if grouping_args.len() > 32 { |
| }; | ||
| let group_by_columns: Result<Vec<_>> = | ||
| group_exprs.iter().map(|(e, _)| column_index(e)).collect(); | ||
| let group_by_columns = group_by_columns?; |
| struct GroupingAccumulator { | ||
| // Grouping ID value for each group | ||
| grouping_ids: Vec<u32>, | ||
| // Indices of GROUPING arguments as they appear in the GROUPING SET |
There was a problem hiding this comment.
Can we have more details or an example for the indices?
| } | ||
| impl GroupingAccumulator { | ||
| fn mask_to_id(&self, mask: &[bool]) -> Result<u32> { |
There was a problem hiding this comment.
Please add more description of this method and how it transforms the mask.
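For reference, here is one way the mask-to-id conversion could be described, written as a standalone function. This is a sketch under assumptions, not the PR's code: `mask[i]` is taken to be true when GROUP BY expression `i` is absent from the current grouping set, `expr_indices[k]` is the GROUP BY position of the k-th GROUPING argument, and the error type is simplified to `String`.

```rust
/// Hypothetical free-function version of `mask_to_id`.
///
/// Builds the Postgres-style GROUPING bitfield: the leftmost GROUPING
/// argument becomes the most significant bit, and a bit is 1 when that
/// argument is not part of the current grouping set.
fn mask_to_id(expr_indices: &[usize], mask: &[bool]) -> Result<u32, String> {
    if expr_indices.len() > 32 {
        return Err("GROUPING supports at most 32 arguments".to_string());
    }
    let mut id = 0u32;
    for &idx in expr_indices {
        // An index outside the mask means the argument does not refer to
        // any GROUP BY expression.
        let absent = *mask
            .get(idx)
            .ok_or_else(|| format!("index {idx} out of range for mask of length {}", mask.len()))?;
        id = (id << 1) | u32::from(absent);
    }
    Ok(id)
}

fn main() {
    // GROUP BY GROUPING SETS over (a, b, c); current set is (a),
    // so b and c are masked out.
    let mask = [false, true, true];
    // GROUPING(c, a): c is GROUP BY position 2, a is position 0.
    println!("{:?}", mask_to_id(&[2, 0], &mask));
    // An index outside the mask is reported as an error.
    println!("{:?}", mask_to_id(&[5], &[false]));
}
```

In this example `expr_indices` is `[2, 0]` because the arguments `c` and `a` sit at positions 2 and 0 of the GROUP BY list; the indices follow the order of the GROUPING arguments, not the order of the GROUP BY expressions.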
| _opt_filter: Option<&BooleanArray>, | ||
| total_num_groups: usize, | ||
| ) -> Result<()> { | ||
| assert_eq!(values.len(), 1, "single argument to merge_batch"); |
There was a problem hiding this comment.
So we always expect only one array?
| expr_indices: vec![5], | ||
| }; | ||
| let res = grouping.mask_to_id(&[false]); | ||
| assert!(res.is_err()) |
There was a problem hiding this comment.
You may want to check the error message as well.
| .iter() | ||
| .map(|group| { | ||
| group | ||
| let v = group |
There was a problem hiding this comment.
Let's have a more meaningful name?
| agg_expr: &AggregateFunctionExpr, | ||
| group_by: &PhysicalGroupBy, | ||
| ) -> Result<Box<dyn GroupsAccumulator>> { | ||
| // GROUPING is a special fxn that exposes info about group organization |
There was a problem hiding this comment.
| // GROUPING is a special fxn that exposes info about group organization | |
| // GROUPING is a special function that exposes info about group organization |
?
| | AggregateMode::SinglePartitioned => output.push(acc.evaluate(emit_to)?), | ||
| } | ||
| } | ||
| debug!("Output: {:?}", output); |
There was a problem hiding this comment.
| debug!("Output: {:?}", output); |
| })?; | ||
| let mut output = group_values | ||
| .first() | ||
| .map(|gs| gs.values.clone()) |
Which issue does this PR close?
Closes #5647.
Rationale for this change
Implements the GROUPING function as per Postgres.
https://www.postgresql.org/docs/15/functions-aggregate.html#FUNCTIONS-GROUPING-TABLE
This is in contrast to other implementations including Databricks and Oracle where GROUPING takes only one column and there is a GROUPING_ID function that yields a similar bitfield.
What changes are included in this PR?
Implement the aggregate function in the Physical Planning stage.
Are these changes tested?
A few unit tests and an integration test provided by @JasonLi-cn in a previous unfinished PR. May add more.
Are there any user-facing changes?