[SPARK-40295][SQL] Allow v2 functions with literal args in write distribution/ordering #37749
aokolnychyi wants to merge 7 commits
@cloud-fan @sunchao @pan3793, could you take a look?
      Literal.create(l.value, l.dataType)
    case arg =>
      throw new AnalysisException(
        s"Only references and literals are supported as transform arguments: $arg")
Technically, transform args can be arbitrary V2 expressions but I am not sure we want to invest into building a framework for supporting those right now.
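The rule in the snippet above (each transform argument must be a reference or a literal, and anything else is rejected) can be sketched in isolation. This is a minimal illustration under stated assumptions, not the code from this PR: `V2Expr`, `Ref`, `Lit`, `Func`, `ResolvedArg`, and `resolveArgs` are hypothetical stand-ins rather than Spark classes, and the sketch returns a `Left` where the real code throws an `AnalysisException`.

```scala
// Simplified stand-ins for illustration only; these are NOT Spark's classes.
object TransformArgsSketch {
  sealed trait V2Expr
  final case class Ref(fieldNames: Seq[String]) extends V2Expr
  final case class Lit(value: Any, dataType: String) extends V2Expr
  final case class Func(name: String, args: Seq[V2Expr]) extends V2Expr

  sealed trait ResolvedArg
  final case class ResolvedColumn(name: String) extends ResolvedArg
  final case class ResolvedLiteral(value: Any, dataType: String) extends ResolvedArg

  // References become columns, literals stay literals,
  // and any other argument (for example a nested function) is rejected.
  def resolveArg(arg: V2Expr): Either[String, ResolvedArg] = arg match {
    case Ref(parts) => Right(ResolvedColumn(parts.mkString(".")))
    case Lit(value, dataType) => Right(ResolvedLiteral(value, dataType))
    case other =>
      Left(s"Only references and literals are supported as transform arguments: $other")
  }

  // Resolve all arguments in order; the first rejected argument wins.
  def resolveArgs(args: Seq[V2Expr]): Either[String, Seq[ResolvedArg]] =
    args.foldLeft[Either[String, Seq[ResolvedArg]]](Right(Vector.empty)) { (acc, arg) =>
      for (done <- acc; next <- resolveArg(arg)) yield done :+ next
    }
}
```

With these stand-ins, a bucket-like call with a literal and a column, such as `resolveArgs(Seq(Lit(4, "int"), Ref(Seq("id"))))`, resolves successfully, while an argument that is itself a function call yields a `Left` carrying the error message.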
pan3793 left a comment:
Thanks @aokolnychyi, the change makes sense to me.
    KeyGroupedPartitioning(expressions, partitionValues.size, Some(partitionValues))
  }

  def supportsExpressions(expressions: Seq[Expression]): Boolean = {
@sunchao @cloud-fan, I went back and forth on where to add this validation. I decided to add it here because it is a current limitation of Catalyst's internal KeyGroupedPartitioning. It is fine if a data source reports a partitioning with multi-arg transforms; we just can't benefit from it right now.
Let me know what you think. I also added a test to KeyGroupedPartitioningSuite.
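As a rough illustration of the kind of validation discussed here, the sketch below accepts a list of partitioning expressions only when every entry is a plain reference or a transform over a single reference. The exact rule is an assumption inferred from this thread, and `Expr`, `ColumnRef`, `Const`, and `Transform` are hypothetical stand-ins, not Catalyst classes; the real `supportsExpressions` may differ.

```scala
// Simplified stand-ins for illustration only; these are NOT Catalyst's classes.
object KeyGroupedSupportSketch {
  sealed trait Expr
  final case class ColumnRef(name: String) extends Expr
  final case class Const(value: Any) extends Expr
  final case class Transform(name: String, children: Seq[Expr]) extends Expr

  private def isReference(e: Expr): Boolean = e match {
    case _: ColumnRef => true
    case _ => false
  }

  // Assumed rule: a transform is usable only with exactly one child,
  // and that child must be a reference.
  private def isSupportedTransform(t: Transform): Boolean =
    t.children.size == 1 && isReference(t.children.head)

  // Every expression must be a supported transform or a plain reference.
  def supportsExpressions(expressions: Seq[Expr]): Boolean = expressions.forall {
    case t: Transform => isSupportedTransform(t)
    case e => isReference(e)
  }
}
```

Under this assumed rule, a single-argument transform over a column passes, while a two-argument transform such as a bucket with a literal bucket count and a column does not, which matches the limitation described in the comment above.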
Is this expected to fail for both BucketTransform and SortedBucketTransform, which always have more than 1 child expression?
I managed to execute a SortMergeJoin of data sources partitioned by (SingleColumnTransform, SortedBucketTransform) that benefits from the partitioning, but I had to transform the DataSourceV2Relation into a DataSourceV2ScanRelation myself, and I think that is the only way because of this check. Could the check be dropped?
I guess this code is supposed to make sure there is only one child, but it was skipped in my execution because sorted was empty. I don't understand what this requirement is for:
https://github.com/apache/spark/pull/35657/files#diff-715d0c2d59a4ddb8a4b5952c1b05be9f035b6d9b0d9670c70b58989dc722b252R86
Making sorted empty let me pass this check, though I need it for the join optimization. I managed to bring this property back at the physical level using org.apache.spark.sql.connector.read.SupportsReportOrdering and executed the SortMergeJoin without the exchanges and sorts.
cloud-fan left a comment:
LGTM except for one suggestion
…s/physical/partitioning.scala Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Merged to master. Thanks @aokolnychyi, @cloud-fan, @pan3793!
Thanks for reviewing, @sunchao @cloud-fan @pan3793!
What changes were proposed in this pull request?
This PR adapts V2ExpressionUtils to support arbitrary transforms with multiple args that are either references or literals.
Why are the changes needed?
After PR #36995, data sources can request distribution and ordering that reference v2 functions. If a data source needs a transform with multiple input args or a transform where not all args are references, Spark will throw an exception.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
This PR adapts the test added recently in PR #36995.