From 169460be6699ce23e2f0982de8b4818f4e184594 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 27 Apr 2021 10:44:53 +0800 Subject: [PATCH 1/3] fix --- .../apache/spark/scheduler/DAGScheduler.scala | 7 +++---- .../spark/scheduler/DAGSchedulerSuite.scala | 16 ++++++++++------ 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a92d9fab6efc6..c3441ddb9a72d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1765,10 +1765,9 @@ private[spark] class DAGScheduler( val abortMessage = if (disallowStageRetryForTest) { "Fetch failure will not retry stage due to testing config" } else { - s"""$failedStage (${failedStage.name}) - |has failed the maximum allowable number of - |times: $maxConsecutiveStageAttempts. - |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") + s"$failedStage (${failedStage.name}) has failed the maximum allowable number of " + + s"times: $maxConsecutiveStageAttempts. Most recent failure reason:\n" + + failureMessage } abortStage(failedStage, abortMessage, None) } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 4c74e4fbb3728..cc881320ce3a8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2655,12 +2655,16 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti } } - failAfter(10.seconds) { - val e = intercept[SparkException] { - runJobWithPersistentFetchFailure - } - assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) - } + runJobWithPersistentFetchFailure + +// failAfter(10.seconds) { +// val e = intercept[SparkException] { +// +// } +// // scalastyle:off +// +// assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) +// } // Run a second job that will fail due to a fetch failure. // This job will hang without the fix for SPARK-17644. From 78caf0a53e60d81e6211faadfc45fede5ef9c941 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 27 Apr 2021 10:49:31 +0800 Subject: [PATCH 2/3] revert testing code --- .../spark/scheduler/DAGSchedulerSuite.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index cc881320ce3a8..4c74e4fbb3728 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2655,16 +2655,12 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti } } - runJobWithPersistentFetchFailure - -// failAfter(10.seconds) { -// val e = intercept[SparkException] { -// -// } -// // scalastyle:off -// -// assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) -// } + failAfter(10.seconds) { + val e = intercept[SparkException] { + runJobWithPersistentFetchFailure + } + assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) + } // Run a second job that will fail due to a fetch failure. // This job will hang without the fix for SPARK-17644. From 0896255f43c622e096cab6b0eee5bc4f715abc40 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 27 Apr 2021 11:25:46 +0800 Subject: [PATCH 3/3] fix another one --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c3441ddb9a72d..b359501793dbd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1950,11 +1950,8 @@ private[spark] class DAGScheduler( "Barrier stage will not retry stage due to testing config. Most recent failure " + s"reason: $message" } else { - s"""$failedStage (${failedStage.name}) - |has failed the maximum allowable number of - |times: $maxConsecutiveStageAttempts. - |Most recent failure reason: $message - """.stripMargin.replaceAll("\n", " ") + s"$failedStage (${failedStage.name}) has failed the maximum allowable number of " + + s"times: $maxConsecutiveStageAttempts. Most recent failure reason: $message" } abortStage(failedStage, abortMessage, None) } else {