From 5699cf4eadc6957156833a5bc369ae5af2f32236 Mon Sep 17 00:00:00 2001 From: Yu Zhong Date: Thu, 21 Jan 2021 11:28:14 +0800 Subject: [PATCH 1/3] SPARK-33933 partial fix: start materialization for BroadcastQueryState first to avoid broadcast timeout in AQE --- .../adaptive/AdaptiveSparkPlanExec.scala | 14 ++++++++++- .../execution/adaptive/QueryStageExec.scala | 1 + .../adaptive/AdaptiveQueryExecSuite.scala | 25 +++++++++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 89d3b53510469..b98e194d5d3e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -189,8 +189,20 @@ case class AdaptiveSparkPlanExec( stagesToReplace = result.newStages ++ stagesToReplace executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan))) + // SPARK-33933: we should submit tasks of broadcast stages first, to avoid waiting + // for tasks to be scheduled and leading to broadcast timeout. + // This partial fix only grantee the start of materialization for BroadcastQueryStage + // is prior to others, but because the submission of collect job for broadcasting is + // run in another thread, the issue is not completely solved. + val reorderedNewStages = result.newStages + .sortWith { + case (_: BroadcastQueryStageExec, _: BroadcastQueryStageExec) => false + case (_: BroadcastQueryStageExec, _) => true + case _ => false + } + // Start materialization of all new stages and fail fast if any stages failed eagerly - result.newStages.foreach { stage => + reorderedNewStages.foreach { stage => try { stage.materialize().onComplete { res => if (res.isSuccess) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index 901daf6995b3c..f9c696c3342c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -77,6 +77,7 @@ abstract class QueryStageExec extends LeafExecNode { * stage is ready. */ final def materialize(): Future[Any] = executeQuery { + logDebug(s"Materialize query stage ${this.getClass.getSimpleName}: $id") doMaterialize() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 92f7f40b54770..fe18d900d24c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1460,4 +1460,29 @@ class AdaptiveQueryExecSuite } } } + + test("SPARK-33933: Materialize BroadcastQueryStage first in AQE") { + val testAppender = new LogAppender("aqe query stage materialization order test") + val df = spark.sparkContext.parallelize(Range(0, 10)) + .flatMap(x => { + for (i <- Range(0, 100)) yield (x % 26, x % 10) + }).toDF("index", "pv") + val dim = Range(0, 26).map(x => (x, ('a' + x).toChar.toString)) + .toDF("index", "name") + val testDf = df.groupBy("index") + .agg(sum($"pv").alias("pv")) + .join(dim, Seq("index")) + withLogAppender(testAppender, level = Some(Level.DEBUG)) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val result = testDf.collect() + assert(result.length == 26) + } + } + val materializeLogs = testAppender.loggingEvents + .map(_.getRenderedMessage) + .filter(_.startsWith("Materialize query stage")) + .toArray + assert(materializeLogs(0).startsWith("Materialize query stage BroadcastQueryStageExec")) + assert(materializeLogs(1).startsWith("Materialize query stage ShuffleQueryStageExec")) + } } From 08edec529bfc4fc363b0c91da75e0656c1440bc6 Mon Sep 17 00:00:00 2001 From: Yu Zhong Date: Thu, 21 Jan 2021 15:41:44 +0800 Subject: [PATCH 2/3] simplify UT --- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index fe18d900d24c7..6138323dfb0f6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1463,10 +1463,8 @@ class AdaptiveQueryExecSuite test("SPARK-33933: Materialize BroadcastQueryStage first in AQE") { val testAppender = new LogAppender("aqe query stage materialization order test") - val df = spark.sparkContext.parallelize(Range(0, 10)) - .flatMap(x => { - for (i <- Range(0, 100)) yield (x % 26, x % 10) - }).toDF("index", "pv") + val df = spark.range(1000).select($"id" % 26, $"id" % 10) + .toDF("index", "pv") val dim = Range(0, 26).map(x => (x, ('a' + x).toChar.toString)) .toDF("index", "name") val testDf = df.groupBy("index") From d0b8ee3776d2ed8dd966c96a6e4f1409430256a3 Mon Sep 17 00:00:00 2001 From: Yu Zhong Date: Thu, 21 Jan 2021 22:14:15 +0800 Subject: [PATCH 3/3] fix typo --- .../spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index b98e194d5d3e4..bc56d153ff856 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -191,9 +191,9 @@ case class AdaptiveSparkPlanExec( // SPARK-33933: we should submit tasks of broadcast stages first, to avoid waiting // for tasks to be scheduled and leading to broadcast timeout. - // This partial fix only grantee the start of materialization for BroadcastQueryStage + // This partial fix only guarantees the start of materialization for BroadcastQueryStage // is prior to others, but because the submission of collect job for broadcasting is - // run in another thread, the issue is not completely solved. + // running in another thread, the issue is not completely resolved. val reorderedNewStages = result.newStages .sortWith { case (_: BroadcastQueryStageExec, _: BroadcastQueryStageExec) => false