From 5ae6809a1a88e2627752cf0eed0bbf2d09a471da Mon Sep 17 00:00:00 2001 From: zzzzming95 <505306252@qq.com> Date: Mon, 10 Apr 2023 13:59:32 +0800 Subject: [PATCH 1/4] SPARK-43021 --- .../adaptive/AdaptiveSparkPlanExec.scala | 3 ++- .../spark/sql/sources/BucketedReadSuite.scala | 18 +++++++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index a760991ab5180..6d3a9b4ab991f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._ -import org.apache.spark.sql.execution.bucketing.DisableUnnecessaryBucketedScan +import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SQLPlanMetric} @@ -118,6 +118,7 @@ case class AdaptiveSparkPlanExec( val ensureRequirements = EnsureRequirements(requiredDistribution.isDefined, requiredDistribution) Seq( + CoalesceBucketsInJoin, RemoveRedundantProjects, ensureRequirements, AdjustShuffleExchangePosition, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index fc7c4e5761be1..29294983fb509 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, SparkPlan} import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, DisableAdaptiveExecution} import org.apache.spark.sql.execution.datasources.BucketingUtils -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -1050,4 +1050,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } } } + + test("SPARK-43021: Make coalesceBucketsInJoin effective enable AQE") { + withSQLConf( + SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withTable("bucketed_table1", "bucketed_table2") { + df1.write.format("parquet").bucketBy(4, "i").saveAsTable("bucketed_table1") + df2.write.format("parquet").bucketBy(2, "i").saveAsTable("bucketed_table2") + val t1 = spark.table("bucketed_table1") + val t2 = spark.table("bucketed_table2") + val plan = t1.join(t2, t1("i") === t2("i")).queryExecution.executedPlan + assert(collect(plan) { case _: Exchange => true }.size === 0) + } + } + } } From 527e6d88be57c009be73ac0e76dc9cf76c81a27a Mon Sep 17 00:00:00 2001 From: zzzzming95 <505306252@qq.com> Date: Wed, 12 Apr 2023 22:05:32 +0800 Subject: [PATCH 2/4] update AdaptiveSparkPlanExec.scala --- .../spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 6d3a9b4ab991f..fceb9db411200 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -117,6 +117,8 @@ case class AdaptiveSparkPlanExec( // around this case. val ensureRequirements = EnsureRequirements(requiredDistribution.isDefined, requiredDistribution) + // CoalesceBucketsInJoin can help eliminate shuffles and must be run before + // EnsureRequirements Seq( CoalesceBucketsInJoin, RemoveRedundantProjects, From 36257ae8e6039555f982fe93a24815cbffc17ead Mon Sep 17 00:00:00 2001 From: zzzzming95 <505306252@qq.com> Date: Wed, 12 Apr 2023 22:11:42 +0800 Subject: [PATCH 3/4] BucketedReadSuite.scala --- .../spark/sql/sources/BucketedReadSuite.scala | 57 +++++++------------ 1 file changed, 22 insertions(+), 35 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 29294983fb509..d3c159e4d7d93 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -25,9 +25,9 @@ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, SparkPlan} -import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, DisableAdaptiveExecution} +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper} import org.apache.spark.sql.execution.datasources.BucketingUtils -import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -1010,8 +1010,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } } - test("bucket coalescing is applied when join expressions match with partitioning expressions", - DisableAdaptiveExecution("Expected shuffle num mismatched")) { + test("bucket coalescing is applied when join expressions match with partitioning expressions") { withTable("t1", "t2") { df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1") df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2") @@ -1020,21 +1019,25 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { def verify( - query: String, - expectedNumShuffles: Int, - expectedCoalescedNumBuckets: Option[Int]): Unit = { - val plan = sql(query).queryExecution.executedPlan - val shuffles = plan.collect { case s: ShuffleExchangeExec => s } - assert(shuffles.length == expectedNumShuffles) - - val scans = plan.collect { - case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f - } - if (expectedCoalescedNumBuckets.isDefined) { - assert(scans.length == 1) - assert(scans.head.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) - } else { - assert(scans.isEmpty) + query: String, + expectedNumShuffles: Int, + expectedCoalescedNumBuckets: Option[Int]): Unit = { + Seq(true, false).foreach { aqeEnabled => + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) { + val plan = sql(query).queryExecution.executedPlan + val shuffles = collect(plan) { case s: ShuffleExchangeExec => s } + assert(shuffles.length == expectedNumShuffles) + + val scans = collect(plan) { + case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f + } + if (expectedCoalescedNumBuckets.isDefined) { + assert(scans.length == 1) + assert(scans.head.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) + } else { + assert(scans.isEmpty) + } + } } } @@ -1050,20 +1053,4 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } } } - - test("SPARK-43021: Make coalesceBucketsInJoin effective enable AQE") { - withSQLConf( - SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { - withTable("bucketed_table1", "bucketed_table2") { - df1.write.format("parquet").bucketBy(4, "i").saveAsTable("bucketed_table1") - df2.write.format("parquet").bucketBy(2, "i").saveAsTable("bucketed_table2") - val t1 = spark.table("bucketed_table1") - val t2 = spark.table("bucketed_table2") - val plan = t1.join(t2, t1("i") === t2("i")).queryExecution.executedPlan - assert(collect(plan) { case _: Exchange => true }.size === 0) - } - } - } } From 2a55aac16c1f4dba05cfba90d8661fc3349a74b0 Mon Sep 17 00:00:00 2001 From: zzzzming95 <505306252@qq.com> Date: Thu, 13 Apr 2023 22:11:54 +0800 Subject: [PATCH 4/4] update BucketedReadSuite.scala --- .../org/apache/spark/sql/sources/BucketedReadSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index d3c159e4d7d93..a18c681e0fe42 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -1019,9 +1019,9 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { def verify( - query: String, - expectedNumShuffles: Int, - expectedCoalescedNumBuckets: Option[Int]): Unit = { + query: String, + expectedNumShuffles: Int, + expectedCoalescedNumBuckets: Option[Int]): Unit = { Seq(true, false).foreach { aqeEnabled => withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) { val plan = sql(query).queryExecution.executedPlan