From d02bc04127b8c1253557e46896bdd2b56c891c0f Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Wed, 27 May 2020 22:47:48 +0800 Subject: [PATCH 1/7] fix --- .../spark/scheduler/TaskSetManager.scala | 6 ++++++ .../spark/scheduler/TaskSetManagerSuite.scala | 21 ++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a0e84b94735ec..161a8b7e5d69e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1107,10 +1107,16 @@ private[spark] class TaskSetManager( def recomputeLocality(): Unit = { // A zombie TaskSetManager may reach here while executorLost happens if (isZombie) return + val previousLocalityIndex = currentLocalityIndex val previousLocalityLevel = myLocalityLevels(currentLocalityIndex) myLocalityLevels = computeValidLocalityLevels() localityWaits = myLocalityLevels.map(getLocalityWait) currentLocalityIndex = getLocalityIndex(previousLocalityLevel) + if (currentLocalityIndex > previousLocalityIndex) { + // there's new higher level locality, so shift to the highest + // locality level in terms of better data locality + currentLocalityIndex = 0 + } } def executorAdded(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 4978be3e04c1e..7e7c9abee9569 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -620,7 +620,7 @@ class TaskSetManagerSuite manager.executorAdded() sched.addExecutor("execC", "host2") manager.executorAdded() - assert(manager.resourceOffer("exec1", "host1", ANY)._1.isDefined) + assert(manager.resourceOffer("execB", "host1", ANY)._1.isDefined) sched.removeExecutor("execA") manager.executorLost( "execA", @@ -634,6 +634,25 @@ class TaskSetManagerSuite assert(sched.taskSetsFailed.contains(taskSet.id)) } + test("Shift to the new higher locality level if there is when recomputeLocality") { + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc) + val taskSet = FakeTask.createTaskSet(2, + Seq(TaskLocation("host1", "execA")), + Seq(TaskLocation("host1", "execA"))) + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, 1, clock = clock) + // before any executors are added to TaskScheduler, the manager's + // locality level only has ANY, so tasks can be scheduled anyway. + assert(manager.resourceOffer("execB", "host2", ANY)._1.isDefined) + sched.addExecutor("execA", "host1") + manager.executorAdded() + // after adding a new executor, the manager locality has PROCESS_LOCAL, NODE_LOCAL, ANY. + // And we'll shift to the new highest locality level, which is PROCESS_LOCAL in this case. + assert(manager.resourceOffer("execC", "host3", ANY)._1.isEmpty) + assert(manager.resourceOffer("execA", "host1", ANY)._1.isDefined) + } + test("test RACK_LOCAL tasks") { // Assign host1 to rack1 FakeRackUtil.assignHostToRack("host1", "rack1") From 8c333e1c11a8e630286c5b2bcda3fdd5b316486a Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Wed, 27 May 2020 22:54:35 +0800 Subject: [PATCH 2/7] add jira id --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 4 ++-- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 161a8b7e5d69e..e64058d81d0bf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1113,8 +1113,8 @@ private[spark] class TaskSetManager( localityWaits = myLocalityLevels.map(getLocalityWait) currentLocalityIndex = getLocalityIndex(previousLocalityLevel) if (currentLocalityIndex > previousLocalityIndex) { - // there's new higher level locality, so shift to the highest - // locality level in terms of better data locality + // SPARK-31837: there's new higher level locality, so shift to + // the highest locality level in terms of better data locality currentLocalityIndex = 0 } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 7e7c9abee9569..e4aad58d25064 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -634,7 +634,7 @@ class TaskSetManagerSuite assert(sched.taskSetsFailed.contains(taskSet.id)) } - test("Shift to the new higher locality level if there is when recomputeLocality") { + test("SPARK-31837: Shift to the new highest locality level if there is when recomputeLocality") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc) val taskSet = FakeTask.createTaskSet(2, From c7426668bd4adaf26d44740db80a0cf65153f207 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 28 May 2020 00:22:09 +0800 Subject: [PATCH 3/7] address comment --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index e64058d81d0bf..8b5d4e427bb51 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1113,7 +1113,7 @@ private[spark] class TaskSetManager( localityWaits = myLocalityLevels.map(getLocalityWait) currentLocalityIndex = getLocalityIndex(previousLocalityLevel) if (currentLocalityIndex > previousLocalityIndex) { - // SPARK-31837: there's new higher level locality, so shift to + // SPARK-31837: there's new higher locality level, so shift to // the highest locality level in terms of better data locality currentLocalityIndex = 0 } From 312700a7ba51d4a04b24cfdceda9c5d018767db9 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 28 May 2020 16:18:46 +0800 Subject: [PATCH 4/7] fix SPARK-31485 --- .../org/apache/spark/scheduler/BarrierTaskContextSuite.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala index 54899bfcf34fa..01c82f894cf98 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala @@ -276,9 +276,6 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with test("SPARK-31485: barrier stage should fail if only partial tasks are launched") { initLocalClusterSparkContext(2) - // It's required to reset the delay timer when a task is scheduled, otherwise all the tasks - // could get scheduled at ANY level. - sc.conf.set(config.LEGACY_LOCALITY_WAIT_RESET, true) val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2) val dep = new OneToOneDependency[Int](rdd0) // set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for From 0773806047b1de475b6bb05fd30e9c5fd811d728 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 29 May 2020 15:26:15 +0800 Subject: [PATCH 5/7] remove the hack of spark-16106 test --- .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index a8541cb863478..a75bae56229b4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1208,7 +1208,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("SPARK-16106 locality levels updated if executor added to existing host") { val taskScheduler = setupScheduler() - taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))) taskScheduler.submitTasks(FakeTask.createTaskSet(2, stageId = 0, stageAttemptId = 0, (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _* )) From cb51bb40f417183e84a92e4b0fd918326ad13c89 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 29 May 2020 15:46:57 +0800 Subject: [PATCH 6/7] update --- .../org/apache/spark/scheduler/TaskSetManager.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 8b5d4e427bb51..cd872094efcab 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1109,13 +1109,16 @@ private[spark] class TaskSetManager( if (isZombie) return val previousLocalityIndex = currentLocalityIndex val previousLocalityLevel = myLocalityLevels(currentLocalityIndex) + val previousMyLocalityLevels = myLocalityLevels myLocalityLevels = computeValidLocalityLevels() localityWaits = myLocalityLevels.map(getLocalityWait) currentLocalityIndex = getLocalityIndex(previousLocalityLevel) if (currentLocalityIndex > previousLocalityIndex) { - // SPARK-31837: there's new higher locality level, so shift to - // the highest locality level in terms of better data locality - currentLocalityIndex = 0 + // SPARK-31837: there's more local locality level, so shift to the new most local locality + // level in terms of better data locality. For example, say the previous locality levels + // are [PROCESS, NODE, ANY] and current level is ANY. After recompute, the locality levels + // are [PROCESS, NODE, RACK, ANY]. Then, we'll shift to RACK level. + currentLocalityIndex = getLocalityIndex(myLocalityLevels.diff(previousMyLocalityLevels).head) } } From 8744f1ec02db28975a80410b5a01d2c2557ad216 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 29 May 2020 15:48:57 +0800 Subject: [PATCH 7/7] update comment --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index cd872094efcab..a302f680a272e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1114,10 +1114,10 @@ private[spark] class TaskSetManager( localityWaits = myLocalityLevels.map(getLocalityWait) currentLocalityIndex = getLocalityIndex(previousLocalityLevel) if (currentLocalityIndex > previousLocalityIndex) { - // SPARK-31837: there's more local locality level, so shift to the new most local locality - // level in terms of better data locality. For example, say the previous locality levels - // are [PROCESS, NODE, ANY] and current level is ANY. After recompute, the locality levels - // are [PROCESS, NODE, RACK, ANY]. Then, we'll shift to RACK level. + // SPARK-31837: If the new level is more local, shift to the new most local locality + // level in terms of better data locality. For example, say the previous locality + // levels are [PROCESS, NODE, ANY] and current level is ANY. After recompute, the + // locality levels are [PROCESS, NODE, RACK, ANY]. Then, we'll shift to RACK level. currentLocalityIndex = getLocalityIndex(myLocalityLevels.diff(previousMyLocalityLevels).head) } }