From 0891907399fbea508bfe5d644ec4d5b0eef6b6be Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Mon, 23 Dec 2019 16:20:27 -0800 Subject: [PATCH 01/15] only reset delay schedule timer if taskset didn't reject any resources due to delay scheduling --- .../spark/scheduler/TaskSchedulerImpl.scala | 45 ++++- .../spark/scheduler/TaskSetManager.scala | 34 ++-- .../CoarseGrainedSchedulerBackend.scala | 2 +- .../scheduler/TaskSchedulerImplSuite.scala | 158 +++++++++++++++++- 4 files changed, 212 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index f0f84fe63d1cf..9cbbc28af38a5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -36,7 +36,7 @@ import org.apache.spark.rpc.RpcEndpoint import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils} +import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, ThreadUtils, Utils} /** * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend. @@ -61,7 +61,8 @@ import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils} private[spark] class TaskSchedulerImpl( val sc: SparkContext, val maxTaskFailures: Int, - isLocal: Boolean = false) + isLocal: Boolean = false, + clock: Clock = new SystemClock) extends TaskScheduler with Logging { import TaskSchedulerImpl._ @@ -125,7 +126,6 @@ private[spark] class TaskSchedulerImpl( protected val executorIdToHost = new HashMap[String, String] private val abortTimer = new Timer(true) - private val clock = new SystemClock // Exposed for testing val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long] @@ -331,8 +331,11 @@ private[spark] class TaskSchedulerImpl( availableCpus: Array[Int], availableResources: Array[Map[String, Buffer[String]]], tasks: IndexedSeq[ArrayBuffer[TaskDescription]], - addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : Boolean = { + addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : + (Boolean, Boolean, Option[TaskLocality]) = { var launchedTask = false + var hasDelayScheduleReject = false + var minLaunchedLocality: Option[TaskLocality] = None // nodes and executors that are blacklisted for the entire application have already been // filtered out by this point for (i <- 0 until shuffledOffers.size) { @@ -348,11 +351,15 @@ private[spark] class TaskSchedulerImpl( try { val prof = sc.resourceProfileManager.resourceProfileFromId(taskSetRpID) val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf) - val taskDescOption = taskSet.resourceOffer(execId, host, maxLocality, + val (taskDescOption, reject) = taskSet.resourceOfferInternal(execId, host, maxLocality, taskResAssignments) + hasDelayScheduleReject |= reject for (task <- taskDescOption) { tasks(i) += task val tid = task.taskId + val locality = taskSet.taskInfos(task.taskId).taskLocality + minLaunchedLocality = minLaunchedLocality + .map(min => if (locality < min) locality else min).orElse(Some(locality)) taskIdToTaskSetManager.put(tid, taskSet) taskIdToExecutorId(tid) = execId executorIdToRunningTaskIds(execId).add(tid) @@ -379,12 +386,12 @@ private[spark] class TaskSchedulerImpl( logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") // Do not offer resources for this task, but don't throw an error to allow other // task sets to be submitted. - return launchedTask + return (launchedTask, hasDelayScheduleReject, minLaunchedLocality) } } } } - launchedTask + (launchedTask, hasDelayScheduleReject, minLaunchedLocality) } /** @@ -471,7 +478,9 @@ private[spark] class TaskSchedulerImpl( * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so * that tasks are balanced across the cluster. */ - def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { + def resourceOffers( + offers: IndexedSeq[WorkerOffer], + isAllFreeResources: Boolean = false): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added var newExecAvail = false @@ -544,18 +553,36 @@ private[spark] class TaskSchedulerImpl( s"number of available slots is $numBarrierSlotsAvailable.") } else { var launchedAnyTask = false + var delaySchedulingRejectedAnyTask = false + var globalMinLocality: Option[TaskLocality] = None // Record all the executor IDs assigned barrier tasks on. val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]() for (currentMaxLocality <- taskSet.myLocalityLevels) { var launchedTaskAtCurrentMaxLocality = false do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, + val (launched, hadDelayScheduleReject, minLocality) = + resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, availableResources, tasks, addressesWithDescs) + launchedTaskAtCurrentMaxLocality = launched launchedAnyTask |= launchedTaskAtCurrentMaxLocality + delaySchedulingRejectedAnyTask |= hadDelayScheduleReject + globalMinLocality = + if (minLocality.isEmpty) { globalMinLocality } + else if (globalMinLocality.isEmpty) { + minLocality + } else if (minLocality.get < globalMinLocality.get) { + minLocality + } else { + globalMinLocality + } } while (launchedTaskAtCurrentMaxLocality) } + if (isAllFreeResources && !delaySchedulingRejectedAnyTask) { + taskSet.resetDelayScheduleTimer(globalMinLocality) + } + if (!launchedAnyTask) { taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex => // If the taskSet is unschedulable we try to find an existing idle blacklisted diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 2c792338b5295..f3d64a8fdac03 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -386,6 +386,22 @@ private[spark] class TaskSetManager( None } + def resourceOffer( + execId: String, + host: String, + maxLocality: TaskLocality.TaskLocality, + availableResources: Map[String, Seq[String]] = Map.empty) + : Option[TaskDescription] = { + resourceOfferInternal(execId, host, maxLocality, availableResources)._1 + } + + def resetDelayScheduleTimer(minLocality: Option[TaskLocality.TaskLocality]): Unit = { + lastLaunchTime = clock.getTimeMillis() + for (locality <- minLocality) { + currentLocalityIndex = getLocalityIndex(locality) + } + } + /** * Respond to an offer of a single executor from the scheduler by finding a task * @@ -398,12 +414,12 @@ private[spark] class TaskSetManager( * @param maxLocality the maximum locality we want to schedule the tasks at */ @throws[TaskNotSerializableException] - def resourceOffer( + def resourceOfferInternal( execId: String, host: String, maxLocality: TaskLocality.TaskLocality, taskResourceAssignments: Map[String, ResourceInformation] = Map.empty) - : Option[TaskDescription] = + : (Option[TaskDescription], Boolean) = { val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist => blacklist.isNodeBlacklistedForTaskSet(host) || @@ -422,7 +438,9 @@ private[spark] class TaskSetManager( } } - dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) => + val taskDescription = + dequeueTask(execId, host, allowedLocality) + .map { case ((index, taskLocality, speculative)) => // Found a task; do some bookkeeping and return a task description val task = tasks(index) val taskId = sched.newTaskId() @@ -433,12 +451,6 @@ private[spark] class TaskSetManager( execId, host, taskLocality, speculative) taskInfos(taskId) = info taskAttempts(index) = info :: taskAttempts(index) - // Update our locality level for delay scheduling - // NO_PREF will not affect the variables related to delay scheduling - if (maxLocality != TaskLocality.NO_PREF) { - currentLocalityIndex = getLocalityIndex(taskLocality) - lastLaunchTime = curTime - } // Serialize and return the task val serializedTask: ByteBuffer = try { ser.serialize(task) @@ -482,8 +494,10 @@ private[spark] class TaskSetManager( taskResourceAssignments, serializedTask) } + (taskDescription, + taskDescription.isEmpty && maxLocality == TaskLocality.ANY && pendingTasks.all.nonEmpty) } else { - None + (None, false) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index cca8e86b48691..778c59022a37f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -303,7 +303,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp (rName, rInfo.availableAddrs.toBuffer) }, executorData.resourceProfileId) }.toIndexedSeq - scheduler.resourceOffers(workOffers) + scheduler.resourceOffers(workOffers, true) } if (taskDescs.nonEmpty) { launchTasks(taskDescs) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index b9a11e7f66c64..6f00899a6549a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -196,6 +196,150 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } + test("SPARK-18886 Delay scheduling should not delay some executors indefinitely " + + "if one task is scheduled before delay timeout") { + val LOCALITY_WAIT_MS = 3000 + val clock = new ManualClock + val conf = new SparkConf() + sc = new SparkContext("local", "TaskSchedulerImplSuite", conf) + val taskScheduler = new TaskSchedulerImpl(sc, + sc.conf.get(config.TASK_MAX_FAILURES), + clock = clock) { + override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { + new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock) + } + override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { + // Don't shuffle the offers around for this test. Instead, we'll just pass in all + // the permutations we care about directly. + offers + } + } + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. + new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {} + + override def executorAdded(execId: String, host: String): Unit = {} + } + val valueSer = SparkEnv.get.serializer.newInstance() + taskScheduler.initialize(new FakeSchedulerBackend) + val taskSet = FakeTask.createTaskSet(8, 1, 1, + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")) + ) + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1))) + taskScheduler.submitTasks(taskSet) + + // First offer host2, exec2: no task should be chosen due to bad data locality + assert(taskScheduler.resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host2", 1)), + isAllFreeResources = true) + .flatten.isEmpty) + + // Offer exec 1 (local) and exec 2 (no local), only exec 1 should be utilized + val taskDescriptions0 = taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1), WorkerOffer("exec2", "host2", 1)), + isAllFreeResources = true) + .flatten + assert(taskDescriptions0.length === 1) + val task0 = taskDescriptions0.head + assert(task0.index === 0) + val result0 = new DirectTaskResult[Int](valueSer.serialize(0), Seq(), Array()) + taskScheduler.statusUpdate(task0.taskId, TaskState.FINISHED, valueSer.serialize(result0)) + + // clock advances, increasing data locality level to ANY + clock.advance(LOCALITY_WAIT_MS * 2) + val taskDescriptions1 = taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1), WorkerOffer("exec2", "host2", 1)), + isAllFreeResources = true) + .flatten + assert(taskDescriptions1.length === 2) + // Local resource is utilized again + val task1 = taskDescriptions1.head + assert(task1.index === 1) + // Even though we launched a local task, we still utilize non-local exec2 + // This is the behavior change to fix SPARK-18886. Also tested below in this same test + // This is because we have not utilized our 2 possible slots (exec1 + exec2) within + // the locality wait period. Timer should be reset, but not locality level of ANY. + val task2 = taskDescriptions1(1) + assert(task2.index === 2) + + // Finish data local task on exec 1 (still 1 task running on exec 2) + val result1 = new DirectTaskResult[Int](valueSer.serialize(1), Seq(), Array()) + taskScheduler.statusUpdate(task1.taskId, TaskState.FINISHED, valueSer.serialize(result1)) + + // Local resource will again be utilized. Data locality is reset to PROCESS_LOCAL + val taskDescriptions3 = taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = true) + .flatten + assert(taskDescriptions3.length === 1) + val task3 = taskDescriptions3.head + assert(task3.index === 3) + // Complete task 2 and task 3, no more tasks running + val result2 = new DirectTaskResult[Int](valueSer.serialize(2), Seq(), Array()) + taskScheduler.statusUpdate(task2.taskId, TaskState.FINISHED, valueSer.serialize(result2)) + val result3 = new DirectTaskResult[Int](valueSer.serialize(3), Seq(), Array()) + taskScheduler.statusUpdate(task3.taskId, TaskState.FINISHED, valueSer.serialize(result3)) + + // Non-local resource will not be utilized, since data locality was reset, + // and locality wait has not expired + assert(taskScheduler.resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host2", 1)), + isAllFreeResources = true) + .flatten.isEmpty) + + // Offer a total of 2 slots on exec 3, non are taken since non-local + assert(taskScheduler.resourceOffers( + IndexedSeq(WorkerOffer("exec3", "host3", 2)), + isAllFreeResources = true) + .flatten.isEmpty) + + taskScheduler.executorLost("exec1", LossReasonPending) + taskScheduler.executorLost("exec2", LossReasonPending) + + // clock advances, increasing data locality level to ANY + clock.advance(LOCALITY_WAIT_MS * 2) + + // All resources, non-local and local, are accepted. + // Locality timer is reset and locality level is set to PROCESS_LOCAL + val taskDescriptions4 = taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1), WorkerOffer("exec3", "host3", 2)), + isAllFreeResources = true) + .flatten + assert(taskDescriptions4.length === 3) + val task4 = taskDescriptions4.head + assert(task4.index === 4) + val task5 = taskDescriptions4(1) + assert(task5.index === 5) + val task6 = taskDescriptions4(2) + assert(task6.index === 6) + + // Finish data local task. Only 2 running tasks now + val result5 = new DirectTaskResult[Int](valueSer.serialize(5), Seq(), Array()) + taskScheduler.statusUpdate(task5.taskId, TaskState.FINISHED, valueSer.serialize(result5)) + + // Non-local resource (exec3) will not be utilized, since data locality was reset, + // and locality wait has not expired + val taskDescriptions5 = taskScheduler.resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1), WorkerOffer("exec3", "host3", 1)), + isAllFreeResources = true) + .flatten + + assert(taskDescriptions5.length === 1) + val tasks7 = taskDescriptions5.head + assert(tasks7.index === 7) + } + test("Scheduler does not crash when tasks are not serializable") { val taskCpus = 2 val taskScheduler = setupSchedulerWithMaster( @@ -683,9 +827,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B val tsm = stageToMockTaskSetManager(0) // submit an offer with one executor - var taskAttempts = taskScheduler.resourceOffers(IndexedSeq( - WorkerOffer("executor0", "host0", 1) - )).flatten + var taskAttempts = taskScheduler.resourceOffers( + IndexedSeq(WorkerOffer("executor0", "host0", 1)), + isAllFreeResources = true).flatten // Fail the running task val failedTask = taskAttempts.head @@ -913,14 +1057,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("SPARK-16106 locality levels updated if executor added to existing host") { val taskScheduler = setupScheduler() + taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))) taskScheduler.submitTasks(FakeTask.createTaskSet(2, stageId = 0, stageAttemptId = 0, (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _* )) - val taskDescs = taskScheduler.resourceOffers(IndexedSeq( - new WorkerOffer("executor0", "host0", 1), - new WorkerOffer("executor1", "host1", 1) - )).flatten + val taskDescs = taskScheduler.resourceOffers( + IndexedSeq(WorkerOffer("executor0", "host0", 1), WorkerOffer("executor1", "host1", 1)), + isAllFreeResources = true).flatten // only schedule one task because of locality assert(taskDescs.size === 1) From 512b15fe3d32e8861b03224a97c0e1c8a4839d32 Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Wed, 25 Dec 2019 17:56:16 -0800 Subject: [PATCH 02/15] also reset on partial resource offers if possible --- .../spark/scheduler/TaskSchedulerImpl.scala | 47 ++-- .../spark/scheduler/TaskSetManager.scala | 3 +- .../scheduler/TaskSchedulerImplSuite.scala | 246 ++++++++++-------- 3 files changed, 176 insertions(+), 120 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 9cbbc28af38a5..3574a5f384674 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -98,6 +98,11 @@ private[spark] class TaskSchedulerImpl( // on this class. Protected by `this` private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] + // keyed by task set stage id + // value is true if there have been no resources rejected due to delay scheduling + // since the last "full" resource offer + private val noDelayScheduleRejects = new mutable.HashMap[Int, Boolean]() + // Protected by `this` private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager] // Protected by `this` @@ -319,6 +324,7 @@ private[spark] class TaskSchedulerImpl( taskSetsByStageIdAndAttempt -= manager.taskSet.stageId } } + noDelayScheduleRejects -= manager.taskSet.stageId manager.parent.removeSchedulable(manager) logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" + s" ${manager.parent.name}") @@ -473,6 +479,19 @@ private[spark] class TaskSchedulerImpl( }.sum } + def minTaskLocality(l1: Option[TaskLocality], l2: Option[TaskLocality]) : + Option[TaskLocality] = { + if (l1.isEmpty) { + l2 + } else if (l2.isEmpty) { + l1 + } else if (l1.get < l2.get) { + l1 + } else { + l2 + } + } + /** * Called by cluster manager to offer resources on slaves. We respond by asking our active task * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so @@ -553,34 +572,30 @@ private[spark] class TaskSchedulerImpl( s"number of available slots is $numBarrierSlotsAvailable.") } else { var launchedAnyTask = false - var delaySchedulingRejectedAnyTask = false + var hadAnyDelaySchedulingReject = false var globalMinLocality: Option[TaskLocality] = None // Record all the executor IDs assigned barrier tasks on. val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]() for (currentMaxLocality <- taskSet.myLocalityLevels) { var launchedTaskAtCurrentMaxLocality = false do { - val (launched, hadDelayScheduleReject, minLocality) = - resourceOfferSingleTaskSet(taskSet, - currentMaxLocality, shuffledOffers, availableCpus, + val (launched, hadDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet( + taskSet, currentMaxLocality, shuffledOffers, availableCpus, availableResources, tasks, addressesWithDescs) launchedTaskAtCurrentMaxLocality = launched launchedAnyTask |= launchedTaskAtCurrentMaxLocality - delaySchedulingRejectedAnyTask |= hadDelayScheduleReject - globalMinLocality = - if (minLocality.isEmpty) { globalMinLocality } - else if (globalMinLocality.isEmpty) { - minLocality - } else if (minLocality.get < globalMinLocality.get) { - minLocality - } else { - globalMinLocality - } + hadAnyDelaySchedulingReject |= hadDelayScheduleReject + globalMinLocality = minTaskLocality(globalMinLocality, minLocality) } while (launchedTaskAtCurrentMaxLocality) } - if (isAllFreeResources && !delaySchedulingRejectedAnyTask) { - taskSet.resetDelayScheduleTimer(globalMinLocality) + if (!hadAnyDelaySchedulingReject) { + if (isAllFreeResources || noDelayScheduleRejects.getOrElse(taskSet.stageId, true)) { + taskSet.resetDelayScheduleTimer(globalMinLocality) + noDelayScheduleRejects.update(taskSet.stageId, true) + } + } else { + noDelayScheduleRejects.update(taskSet.stageId, false) } if (!launchedAnyTask) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index f3d64a8fdac03..2803d03d7c1f8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -395,7 +395,8 @@ private[spark] class TaskSetManager( resourceOfferInternal(execId, host, maxLocality, availableResources)._1 } - def resetDelayScheduleTimer(minLocality: Option[TaskLocality.TaskLocality]): Unit = { + private[scheduler] def resetDelayScheduleTimer( + minLocality: Option[TaskLocality.TaskLocality]): Unit = { lastLaunchTime = clock.getTimeMillis() for (locality <- minLocality) { currentLocalityIndex = getLocalityIndex(locality) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 6f00899a6549a..23bc362429baa 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -196,10 +196,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } - test("SPARK-18886 Delay scheduling should not delay some executors indefinitely " + - "if one task is scheduled before delay timeout") { - val LOCALITY_WAIT_MS = 3000 - val clock = new ManualClock + def setupTaskScheduler(clock: ManualClock): TaskSchedulerImpl = { val conf = new SparkConf() sc = new SparkContext("local", "TaskSchedulerImplSuite", conf) val taskScheduler = new TaskSchedulerImpl(sc, @@ -220,7 +217,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B override def executorAdded(execId: String, host: String): Unit = {} } - val valueSer = SparkEnv.get.serializer.newInstance() taskScheduler.initialize(new FakeSchedulerBackend) val taskSet = FakeTask.createTaskSet(8, 1, 1, Seq(TaskLocation("host1", "exec1")), @@ -234,110 +230,155 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B ) taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1))) taskScheduler.submitTasks(taskSet) + taskScheduler + } + + test("SPARK-18886 - partial offers (isAllFreeResources = false) reset timer before " + + "any resources have been rejected") { + val clock = new ManualClock() + val taskScheduler = setupTaskScheduler(clock) + val advanceAmount = 2000 + + // by default, new partial resource (isAllFreeResources = false) offers reset timer + // if the resource is accepted + clock.advance(advanceAmount) + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = false) + .flatten.length === 1) - // First offer host2, exec2: no task should be chosen due to bad data locality - assert(taskScheduler.resourceOffers( - IndexedSeq(WorkerOffer("exec2", "host2", 1)), - isAllFreeResources = true) + // would advance to NODE_LOCAL locality if timer wasn't reset above + // Verifying this node local task is not accepted + clock.advance(advanceAmount) + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host1", 1))) .flatten.isEmpty) + } + + test("SPARK-18886 - delay scheduling timer is reset when it accepts all resources offered when" + + "isAllFreeResources = true") { + val clock = new ManualClock() + val taskScheduler = setupTaskScheduler(clock) + val advanceAmount = 2000 - // Offer exec 1 (local) and exec 2 (no local), only exec 1 should be utilized - val taskDescriptions0 = taskScheduler + // timer is reset when tsm accepts all free resources offered to it + clock.advance(advanceAmount) + assert(taskScheduler .resourceOffers( - IndexedSeq(WorkerOffer("exec1", "host1", 1), WorkerOffer("exec2", "host2", 1)), + IndexedSeq(WorkerOffer("exec1", "host1", 1)), isAllFreeResources = true) - .flatten - assert(taskDescriptions0.length === 1) - val task0 = taskDescriptions0.head - assert(task0.index === 0) - val result0 = new DirectTaskResult[Int](valueSer.serialize(0), Seq(), Array()) - taskScheduler.statusUpdate(task0.taskId, TaskState.FINISHED, valueSer.serialize(result0)) - - // clock advances, increasing data locality level to ANY - clock.advance(LOCALITY_WAIT_MS * 2) - val taskDescriptions1 = taskScheduler + .flatten.length === 1) + + // would advance to NODE_LOCAL locality if timer wasn't reset above + // Verifying this node local task is not accepted + clock.advance(advanceAmount) + assert(taskScheduler .resourceOffers( - IndexedSeq(WorkerOffer("exec1", "host1", 1), WorkerOffer("exec2", "host2", 1)), - isAllFreeResources = true) - .flatten - assert(taskDescriptions1.length === 2) - // Local resource is utilized again - val task1 = taskDescriptions1.head - assert(task1.index === 1) - // Even though we launched a local task, we still utilize non-local exec2 - // This is the behavior change to fix SPARK-18886. Also tested below in this same test - // This is because we have not utilized our 2 possible slots (exec1 + exec2) within - // the locality wait period. Timer should be reset, but not locality level of ANY. - val task2 = taskDescriptions1(1) - assert(task2.index === 2) - - // Finish data local task on exec 1 (still 1 task running on exec 2) - val result1 = new DirectTaskResult[Int](valueSer.serialize(1), Seq(), Array()) - taskScheduler.statusUpdate(task1.taskId, TaskState.FINISHED, valueSer.serialize(result1)) - - // Local resource will again be utilized. Data locality is reset to PROCESS_LOCAL - val taskDescriptions3 = taskScheduler + IndexedSeq(WorkerOffer("exec2", "host1", 1))) + .flatten.isEmpty) + } + + test("SPARK-18886 - partial resource offers (isAllFreeResources = false) reset " + + "time if last full resource offer (isAllResources = true) was accepted as well as any " + + "following partial resource offers") { + val clock = new ManualClock() + val taskScheduler = setupTaskScheduler(clock) + val advanceAmount = 2000 + + assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec1", "host1", 1)), isAllFreeResources = true) - .flatten - assert(taskDescriptions3.length === 1) - val task3 = taskDescriptions3.head - assert(task3.index === 3) - // Complete task 2 and task 3, no more tasks running - val result2 = new DirectTaskResult[Int](valueSer.serialize(2), Seq(), Array()) - taskScheduler.statusUpdate(task2.taskId, TaskState.FINISHED, valueSer.serialize(result2)) - val result3 = new DirectTaskResult[Int](valueSer.serialize(3), Seq(), Array()) - taskScheduler.statusUpdate(task3.taskId, TaskState.FINISHED, valueSer.serialize(result3)) - - // Non-local resource will not be utilized, since data locality was reset, - // and locality wait has not expired - assert(taskScheduler.resourceOffers( - IndexedSeq(WorkerOffer("exec2", "host2", 1)), - isAllFreeResources = true) + .flatten.length === 1) + + clock.advance(advanceAmount) + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = false) + .flatten.length === 1) + + clock.advance(advanceAmount) + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = false) + .flatten.length === 1) + + clock.advance(advanceAmount) + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host1", 1))) .flatten.isEmpty) + } + - // Offer a total of 2 slots on exec 3, non are taken since non-local - assert(taskScheduler.resourceOffers( - IndexedSeq(WorkerOffer("exec3", "host3", 2)), - isAllFreeResources = true) + // This tests two cases + // 1. partial resource offer doesn't reset timer after full resource offer had rejected resources + // 2. partial resource offer doesn't reset timer after partial resource offer + // had rejected resources + test("SPARK-18886 - partial resource offers (isAllFreeResources = false) do not reset " + + "time if any offer was rejected since last full offer was fully accepted") { + val clock = new ManualClock() + val taskScheduler = setupTaskScheduler(clock) + val advanceAmount = 2000 + + // case 1 from test description above + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host1", 1)), + isAllFreeResources = true) .flatten.isEmpty) - taskScheduler.executorLost("exec1", LossReasonPending) - taskScheduler.executorLost("exec2", LossReasonPending) + clock.advance(advanceAmount) + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = false) + .flatten.length === 1) + + // Even though we launched a local task above, we still utilize non-local exec2 + // This is the behavior change to fix SPARK-18886. + // Locality level is NODE_LOCAL after this clock advance + clock.advance(advanceAmount) + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host1", 1))) + .flatten.length === 1) - // clock advances, increasing data locality level to ANY - clock.advance(LOCALITY_WAIT_MS * 2) - // All resources, non-local and local, are accepted. - // Locality timer is reset and locality level is set to PROCESS_LOCAL - val taskDescriptions4 = taskScheduler + // case 2 from test description above + // Reset timer and locality level to PROCESS_LOCAL + clock.advance(advanceAmount) + assert(taskScheduler .resourceOffers( - IndexedSeq(WorkerOffer("exec1", "host1", 1), WorkerOffer("exec3", "host3", 2)), + IndexedSeq(WorkerOffer("exec1", "host1", 1)), isAllFreeResources = true) - .flatten - assert(taskDescriptions4.length === 3) - val task4 = taskDescriptions4.head - assert(task4.index === 4) - val task5 = taskDescriptions4(1) - assert(task5.index === 5) - val task6 = taskDescriptions4(2) - assert(task6.index === 6) - - // Finish data local task. Only 2 running tasks now - val result5 = new DirectTaskResult[Int](valueSer.serialize(5), Seq(), Array()) - taskScheduler.statusUpdate(task5.taskId, TaskState.FINISHED, valueSer.serialize(result5)) - - // Non-local resource (exec3) will not be utilized, since data locality was reset, - // and locality wait has not expired - val taskDescriptions5 = taskScheduler.resourceOffers( - IndexedSeq(WorkerOffer("exec1", "host1", 1), WorkerOffer("exec3", "host3", 1)), - isAllFreeResources = true) - .flatten - - assert(taskDescriptions5.length === 1) - val tasks7 = taskDescriptions5.head - assert(tasks7.index === 7) + .flatten.length === 1) + + // partial resource has rejected offer + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host1", 1)), + isAllFreeResources = false) + .flatten.isEmpty) + + // This does not reset timer since last partial resource offer was rejected + clock.advance(advanceAmount) + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = false) + .flatten.length === 1) + + // Locality level is NODE_LOCAL after this clock advance + clock.advance(advanceAmount) + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host1", 1))) + .flatten.length === 1) } test("Scheduler does not crash when tasks are not serializable") { @@ -827,9 +868,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B val tsm = stageToMockTaskSetManager(0) // submit an offer with one executor - var taskAttempts = taskScheduler.resourceOffers( - IndexedSeq(WorkerOffer("executor0", "host0", 1)), - isAllFreeResources = true).flatten + var taskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten // Fail the running task val failedTask = taskAttempts.head @@ -1045,12 +1086,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } // Here is the main check of this test -- we have the same offers again, and we schedule it - // successfully. Because the scheduler first tries to schedule with locality in mind, at first - // it won't schedule anything on executor1. But despite that, we don't abort the job. Then the - // scheduler tries for ANY locality, and successfully schedules tasks on executor1. + // successfully. Because the scheduler tries to schedule with locality in mind, at first + // it won't schedule anything on executor1. But despite that, we don't abort the job. val secondTaskAttempts = taskScheduler.resourceOffers(offers).flatten - assert(secondTaskAttempts.size == 2) - secondTaskAttempts.foreach { taskAttempt => assert("executor1" === taskAttempt.executorId) } + assert(secondTaskAttempts.isEmpty) assert(!failedTaskSet) } @@ -1062,9 +1101,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _* )) - val taskDescs = taskScheduler.resourceOffers( - IndexedSeq(WorkerOffer("executor0", "host0", 1), WorkerOffer("executor1", "host1", 1)), - isAllFreeResources = true).flatten + val taskDescs = taskScheduler.resourceOffers(IndexedSeq( + new WorkerOffer("executor0", "host0", 1), + new WorkerOffer("executor1", "host1", 1) + )).flatten // only schedule one task because of locality assert(taskDescs.size === 1) From 270718ac46d1d1030001c4bf2d04c8cc93a23a18 Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Wed, 25 Dec 2019 23:09:16 -0800 Subject: [PATCH 03/15] local backend always submits all free resources --- .../apache/spark/scheduler/local/LocalSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index e2b1198060f76..0ffe032296b0d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -88,7 +88,7 @@ private[spark] class LocalEndpoint( // local mode doesn't support extra resources like GPUs right now val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores, Some(rpcEnv.address.hostPort))) - for (task <- scheduler.resourceOffers(offers).flatten) { + for (task <- scheduler.resourceOffers(offers, true).flatten) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, task) } From 27c1e8ce689b87c69cf1082f637a8c6408c0726d Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Sat, 1 Feb 2020 20:30:28 -0800 Subject: [PATCH 04/15] only reset if task is launched add config to fall back to legacy behavior rename variables --- .../spark/internal/config/package.scala | 5 +++ .../spark/scheduler/TaskSchedulerImpl.scala | 42 ++++++++++--------- .../spark/scheduler/TaskSetManager.scala | 10 +++-- .../CoarseGrainedSchedulerBackend.scala | 2 +- 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index f70ee2e5c71c7..73941304f01a8 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -543,6 +543,11 @@ package object config { .version("1.2.0") .fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT) + private[spark] val LEGACY_LOCALITY_WAIT_RESET = + ConfigBuilder("spark.locality.wait.legacyResetOnTaskLaunch") + .booleanConf + .createWithDefault(false) + private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait") .version("0.5.0") .timeConf(TimeUnit.MILLISECONDS) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 3574a5f384674..3c2ef9412e10e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -99,9 +99,9 @@ private[spark] class TaskSchedulerImpl( private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] // keyed by task set stage id - // value is true if there have been no resources rejected due to delay scheduling - // since the last "full" resource offer - private val noDelayScheduleRejects = new mutable.HashMap[Int, Boolean]() + // value is true if the task set's locality wait timer was reset on the last resource offer + private val resetOnPreviousOffer = new mutable.HashMap[Int, Boolean]() + private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET) // Protected by `this` private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager] @@ -324,7 +324,7 @@ private[spark] class TaskSchedulerImpl( taskSetsByStageIdAndAttempt -= manager.taskSet.stageId } } - noDelayScheduleRejects -= manager.taskSet.stageId + resetOnPreviousOffer -= manager.taskSet.stageId manager.parent.removeSchedulable(manager) logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" + s" ${manager.parent.name}") @@ -340,7 +340,7 @@ private[spark] class TaskSchedulerImpl( addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : (Boolean, Boolean, Option[TaskLocality]) = { var launchedTask = false - var hasDelayScheduleReject = false + var noDelayScheduleRejects = true var minLaunchedLocality: Option[TaskLocality] = None // nodes and executors that are blacklisted for the entire application have already been // filtered out by this point @@ -357,9 +357,9 @@ private[spark] class TaskSchedulerImpl( try { val prof = sc.resourceProfileManager.resourceProfileFromId(taskSetRpID) val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf) - val (taskDescOption, reject) = taskSet.resourceOfferInternal(execId, host, maxLocality, - taskResAssignments) - hasDelayScheduleReject |= reject + val (taskDescOption, didReject) = + taskSet.resourceOfferInternal(execId, host, maxLocality, availableResources(i)) + noDelayScheduleRejects &= !didReject for (task <- taskDescOption) { tasks(i) += task val tid = task.taskId @@ -392,12 +392,12 @@ private[spark] class TaskSchedulerImpl( logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") // Do not offer resources for this task, but don't throw an error to allow other // task sets to be submitted. - return (launchedTask, hasDelayScheduleReject, minLaunchedLocality) + return (launchedTask, noDelayScheduleRejects, minLaunchedLocality) } } } } - (launchedTask, hasDelayScheduleReject, minLaunchedLocality) + (launchedTask, noDelayScheduleRejects, minLaunchedLocality) } /** @@ -499,7 +499,7 @@ private[spark] class TaskSchedulerImpl( */ def resourceOffers( offers: IndexedSeq[WorkerOffer], - isAllFreeResources: Boolean = false): Seq[Seq[TaskDescription]] = synchronized { + isAllFreeResources: Boolean = true): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added var newExecAvail = false @@ -572,30 +572,32 @@ private[spark] class TaskSchedulerImpl( s"number of available slots is $numBarrierSlotsAvailable.") } else { var launchedAnyTask = false - var hadAnyDelaySchedulingReject = false + var noDelaySchedulingRejects = true var globalMinLocality: Option[TaskLocality] = None // Record all the executor IDs assigned barrier tasks on. val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]() for (currentMaxLocality <- taskSet.myLocalityLevels) { var launchedTaskAtCurrentMaxLocality = false do { - val (launched, hadDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet( + val (launched, noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet( taskSet, currentMaxLocality, shuffledOffers, availableCpus, availableResources, tasks, addressesWithDescs) launchedTaskAtCurrentMaxLocality = launched launchedAnyTask |= launchedTaskAtCurrentMaxLocality - hadAnyDelaySchedulingReject |= hadDelayScheduleReject + noDelaySchedulingRejects &= noDelayScheduleReject globalMinLocality = minTaskLocality(globalMinLocality, minLocality) } while (launchedTaskAtCurrentMaxLocality) } - if (!hadAnyDelaySchedulingReject) { - if (isAllFreeResources || noDelayScheduleRejects.getOrElse(taskSet.stageId, true)) { - taskSet.resetDelayScheduleTimer(globalMinLocality) - noDelayScheduleRejects.update(taskSet.stageId, true) + if (!legacyLocalityWaitReset) { + if (noDelaySchedulingRejects && launchedAnyTask) { + if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.stageId, true)) { + taskSet.resetDelayScheduleTimer(globalMinLocality) + resetOnPreviousOffer.update(taskSet.stageId, true) + } + } else { + resetOnPreviousOffer.update(taskSet.stageId, false) } - } else { - noDelayScheduleRejects.update(taskSet.stageId, false) } if (!launchedAnyTask) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 2803d03d7c1f8..623ce1dbce124 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -221,10 +221,11 @@ private[spark] class TaskSetManager( private[scheduler] var localityWaits = myLocalityLevels.map(getLocalityWait) // Delay scheduling variables: we keep track of our current locality level and the time we - // last launched a task at that level, and move up a level when localityWaits[curLevel] expires. - // We then move down if we manage to launch a "more local" task. + // last reset the locality wait timer, and move up a level when localityWaits[curLevel] expires. + // We then move down if we manage to launch a "more local" task when resetting the timer + private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET) private var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels - private var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level + private var lastLaunchTime = clock.getTimeMillis() // Time we last reset locality wait override def schedulableQueue: ConcurrentLinkedQueue[Schedulable] = null @@ -452,6 +453,9 @@ private[spark] class TaskSetManager( execId, host, taskLocality, speculative) taskInfos(taskId) = info taskAttempts(index) = info :: taskAttempts(index) + if (legacyLocalityWaitReset && maxLocality != TaskLocality.NO_PREF) { + resetDelayScheduleTimer(Some(taskLocality)) + } // Serialize and return the task val serializedTask: ByteBuffer = try { ser.serialize(task) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 778c59022a37f..ad6f806a540bf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -331,7 +331,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorData.resourcesInfo.map { case (rName, rInfo) => (rName, rInfo.availableAddrs.toBuffer) }, executorData.resourceProfileId)) - scheduler.resourceOffers(workOffers) + scheduler.resourceOffers(workOffers, false) } else { Seq.empty } From a15b2e0d396597d9af766a1923b322f32fe5352c Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Sat, 1 Feb 2020 20:38:41 -0800 Subject: [PATCH 05/15] more variable renaming --- .../apache/spark/scheduler/TaskSetManager.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 623ce1dbce124..2ee8c188cbe7b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -225,7 +225,7 @@ private[spark] class TaskSetManager( // We then move down if we manage to launch a "more local" task when resetting the timer private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET) private var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels - private var lastLaunchTime = clock.getTimeMillis() // Time we last reset locality wait + private var lastLocalityWaitResetTime = clock.getTimeMillis() // Time we last reset locality wait override def schedulableQueue: ConcurrentLinkedQueue[Schedulable] = null @@ -398,7 +398,7 @@ private[spark] class TaskSetManager( private[scheduler] def resetDelayScheduleTimer( minLocality: Option[TaskLocality.TaskLocality]): Unit = { - lastLaunchTime = clock.getTimeMillis() + lastLocalityWaitResetTime = clock.getTimeMillis() for (locality <- minLocality) { currentLocalityIndex = getLocalityIndex(locality) } @@ -566,14 +566,14 @@ private[spark] class TaskSetManager( // This is a performance optimization: if there are no more tasks that can // be scheduled at a particular locality level, there is no point in waiting // for the locality wait timeout (SPARK-4939). - lastLaunchTime = curTime + lastLocalityWaitResetTime = curTime logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " + s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}") currentLocalityIndex += 1 - } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) { - // Jump to the next locality level, and reset lastLaunchTime so that the next locality - // wait timer doesn't immediately expire - lastLaunchTime += localityWaits(currentLocalityIndex) + } else if (curTime - lastLocalityWaitResetTime >= localityWaits(currentLocalityIndex)) { + // Jump to the next locality level, and reset lastLocalityWaitResetTime so that the next + // locality wait timer doesn't immediately expire + lastLocalityWaitResetTime += localityWaits(currentLocalityIndex) logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " + s"${localityWaits(currentLocalityIndex)}ms") currentLocalityIndex += 1 From ec3d3c6450ef40cb37b1be3fd8358976090912fb Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Sun, 9 Feb 2020 20:25:47 -0800 Subject: [PATCH 06/15] update tests include test with legacy locality wait behavior use taskset in map instead of stage id formatting --- .../spark/scheduler/TaskSchedulerImpl.scala | 18 +- .../spark/scheduler/TaskSetManager.scala | 21 +- .../CoarseGrainedSchedulerBackendSuite.scala | 2 +- .../scheduler/TaskSchedulerImplSuite.scala | 19 +- .../spark/scheduler/TaskSetManagerSuite.scala | 229 +++++++++--------- ...esosFineGrainedSchedulerBackendSuite.scala | 2 +- 6 files changed, 146 insertions(+), 145 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 3c2ef9412e10e..c1634273d6990 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -98,9 +98,9 @@ private[spark] class TaskSchedulerImpl( // on this class. Protected by `this` private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] - // keyed by task set stage id + // keyed by taskset // value is true if the task set's locality wait timer was reset on the last resource offer - private val resetOnPreviousOffer = new mutable.HashMap[Int, Boolean]() + private val resetOnPreviousOffer = new mutable.HashMap[TaskSet, Boolean]() private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET) // Protected by `this` @@ -324,7 +324,7 @@ private[spark] class TaskSchedulerImpl( taskSetsByStageIdAndAttempt -= manager.taskSet.stageId } } - resetOnPreviousOffer -= manager.taskSet.stageId + resetOnPreviousOffer -= manager.taskSet manager.parent.removeSchedulable(manager) logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" + s" ${manager.parent.name}") @@ -337,8 +337,8 @@ private[spark] class TaskSchedulerImpl( availableCpus: Array[Int], availableResources: Array[Map[String, Buffer[String]]], tasks: IndexedSeq[ArrayBuffer[TaskDescription]], - addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : - (Boolean, Boolean, Option[TaskLocality]) = { + addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) + : (Boolean, Boolean, Option[TaskLocality]) = { var launchedTask = false var noDelayScheduleRejects = true var minLaunchedLocality: Option[TaskLocality] = None @@ -358,7 +358,7 @@ private[spark] class TaskSchedulerImpl( val prof = sc.resourceProfileManager.resourceProfileFromId(taskSetRpID) val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf) val (taskDescOption, didReject) = - taskSet.resourceOfferInternal(execId, host, maxLocality, availableResources(i)) + taskSet.resourceOffer(execId, host, maxLocality, availableResources(i)) noDelayScheduleRejects &= !didReject for (task <- taskDescOption) { tasks(i) += task @@ -591,12 +591,12 @@ private[spark] class TaskSchedulerImpl( if (!legacyLocalityWaitReset) { if (noDelaySchedulingRejects && launchedAnyTask) { - if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.stageId, true)) { + if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.taskSet, true)) { taskSet.resetDelayScheduleTimer(globalMinLocality) - resetOnPreviousOffer.update(taskSet.stageId, true) + resetOnPreviousOffer.update(taskSet.taskSet, true) } } else { - resetOnPreviousOffer.update(taskSet.stageId, false) + resetOnPreviousOffer.update(taskSet.taskSet, false) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 2ee8c188cbe7b..914983910d425 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -387,15 +387,6 @@ private[spark] class TaskSetManager( None } - def resourceOffer( - execId: String, - host: String, - maxLocality: TaskLocality.TaskLocality, - availableResources: Map[String, Seq[String]] = Map.empty) - : Option[TaskDescription] = { - resourceOfferInternal(execId, host, maxLocality, availableResources)._1 - } - private[scheduler] def resetDelayScheduleTimer( minLocality: Option[TaskLocality.TaskLocality]): Unit = { lastLocalityWaitResetTime = clock.getTimeMillis() @@ -414,9 +405,12 @@ private[spark] class TaskSetManager( * @param execId the executor Id of the offered resource * @param host the host Id of the offered resource * @param maxLocality the maximum locality we want to schedule the tasks at + * + * @return Tuple containing: + * (TaskDescription of launched task if any, rejected resource due to delay scheduling?) */ @throws[TaskNotSerializableException] - def resourceOfferInternal( + def resourceOffer( execId: String, host: String, maxLocality: TaskLocality.TaskLocality, @@ -442,7 +436,7 @@ private[spark] class TaskSetManager( val taskDescription = dequeueTask(execId, host, allowedLocality) - .map { case ((index, taskLocality, speculative)) => + .map { case (index, taskLocality, speculative) => // Found a task; do some bookkeeping and return a task description val task = tasks(index) val taskId = sched.newTaskId() @@ -499,8 +493,9 @@ private[spark] class TaskSetManager( taskResourceAssignments, serializedTask) } - (taskDescription, - taskDescription.isEmpty && maxLocality == TaskLocality.ANY && pendingTasks.all.nonEmpty) + val hasScheduleDelayReject = + taskDescription.isEmpty && maxLocality == TaskLocality.ANY && pendingTasks.all.nonEmpty + (taskDescription, hasScheduleDelayReject) } else { (None, false) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index f4745dbb5a747..058f4013005e5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -248,7 +248,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo "t1", 0, 1, mutable.Map.empty[String, Long], mutable.Map.empty[String, Long], new Properties(), taskResources, bytebuffer))) val ts = backend.getTaskSchedulerImpl() - when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]])).thenReturn(taskDescs) + when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs) backend.driverEndpoint.send(ReviveOffers) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 23bc362429baa..c1f7a5eba4217 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -233,7 +233,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler } - test("SPARK-18886 - partial offers (isAllFreeResources = false) reset timer before " + + test("SPARK-18886 - partial offers (isAllFreeResources = false) reset timer before " + "any resources have been rejected") { val clock = new ManualClock() val taskScheduler = setupTaskScheduler(clock) @@ -253,11 +253,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B clock.advance(advanceAmount) assert(taskScheduler .resourceOffers( - IndexedSeq(WorkerOffer("exec2", "host1", 1))) + IndexedSeq(WorkerOffer("exec2", "host1", 1)), + isAllFreeResources = false) .flatten.isEmpty) } - test("SPARK-18886 - delay scheduling timer is reset when it accepts all resources offered when" + + test("SPARK-18886 - delay scheduling timer is reset when it accepts all resources offered when" + "isAllFreeResources = true") { val clock = new ManualClock() val taskScheduler = setupTaskScheduler(clock) @@ -276,7 +277,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B clock.advance(advanceAmount) assert(taskScheduler .resourceOffers( - IndexedSeq(WorkerOffer("exec2", "host1", 1))) + IndexedSeq(WorkerOffer("exec2", "host1", 1)), + isAllFreeResources = false) .flatten.isEmpty) } @@ -310,7 +312,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B clock.advance(advanceAmount) assert(taskScheduler .resourceOffers( - IndexedSeq(WorkerOffer("exec2", "host1", 1))) + IndexedSeq(WorkerOffer("exec2", "host1", 1)), + isAllFreeResources = false) .flatten.isEmpty) } @@ -345,7 +348,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B clock.advance(advanceAmount) assert(taskScheduler .resourceOffers( - IndexedSeq(WorkerOffer("exec2", "host1", 1))) + IndexedSeq(WorkerOffer("exec2", "host1", 1)), + isAllFreeResources = false) .flatten.length === 1) @@ -377,7 +381,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B clock.advance(advanceAmount) assert(taskScheduler .resourceOffers( - IndexedSeq(WorkerOffer("exec2", "host1", 1))) + IndexedSeq(WorkerOffer("exec2", "host1", 1)), + isAllFreeResources = false) .flatten.length === 1) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 4566e3ca7b3ee..b04bae42b5544 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -225,7 +225,7 @@ class TaskSetManagerSuite // Offer a host with NO_PREF as the constraint, // we should get a nopref task immediately since that's what we only have - val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption.isDefined) clock.advance(1) @@ -246,7 +246,7 @@ class TaskSetManagerSuite // First three offers should all find tasks for (i <- 0 until 3) { - val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === "exec1") @@ -254,7 +254,7 @@ class TaskSetManagerSuite assert(sched.startedTasks.toSet === Set(0, 1, 2)) // Re-offer the host -- now we should get no more tasks - assert(manager.resourceOffer("exec1", "host1", NO_PREF) === None) + assert(manager.resourceOffer("exec1", "host1", NO_PREF)._1 === None) // Finish the first two tasks manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdatesByTask(0))) @@ -277,12 +277,12 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // An executor that is not NODE_LOCAL should be rejected. - assert(manager.resourceOffer("execC", "host2", ANY) === None) + assert(manager.resourceOffer("execC", "host2", ANY)._1 === None) // Because there are no alive PROCESS_LOCAL executors, the base locality level should be // NODE_LOCAL. So, we should schedule the task on this offered NODE_LOCAL executor before // any of the locality wait timers expire. - assert(manager.resourceOffer("execA", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("execA", "host1", ANY)._1.get.index === 0) } test("basic delay scheduling") { @@ -297,22 +297,22 @@ class TaskSetManagerSuite val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // First offer host1, exec1: first task should be chosen - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) - assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1 === None) clock.advance(LOCALITY_WAIT_MS) // Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 3) should // get chosen before the noPref task - assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2) + assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL)._1.get.index == 2) // Offer host2, exec2, at NODE_LOCAL level: we should choose task 2 - assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).get.index == 1) + assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL)._1.get.index == 1) // Offer host2, exec2 again, at NODE_LOCAL level: we should get noPref task // after failing to find a node_Local task - assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None) + assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL)._1 === None) clock.advance(LOCALITY_WAIT_MS) - assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3) + assert(manager.resourceOffer("exec2", "host2", NO_PREF)._1.get.index == 3) } test("we do not need to delay scheduling when we only have noPref tasks in the queue") { @@ -326,10 +326,10 @@ class TaskSetManagerSuite val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // First offer host1, exec1: first task should be chosen - assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0) - assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL).get.index === 1) - assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL) == None) - assert(manager.resourceOffer("exec3", "host2", NO_PREF).get.index === 2) + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1.get.index === 0) + assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL)._1.get.index === 1) + assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL)._1 === None) + assert(manager.resourceOffer("exec3", "host2", NO_PREF)._1.get.index === 2) } test("delay scheduling with fallback") { @@ -343,33 +343,34 @@ class TaskSetManagerSuite Seq(TaskLocation("host3")), Seq(TaskLocation("host2")) ) + sc.conf.set(config.LEGACY_LOCALITY_WAIT_RESET, true) val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // First offer host1: first task should be chosen - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) // Offer host1 again: nothing should get chosen - assert(manager.resourceOffer("exec1", "host1", ANY) === None) + assert(manager.resourceOffer("exec1", "host1", ANY)._1 === None) clock.advance(LOCALITY_WAIT_MS) // Offer host1 again: second task (on host2) should get chosen - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 1) // Offer host1 again: third task (on host2) should get chosen - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 2) // Offer host2: fifth task (also on host2) should get chosen - assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 4) + assert(manager.resourceOffer("exec2", "host2", ANY)._1.get.index === 4) // Now that we've launched a local task, we should no longer launch the task for host3 - assert(manager.resourceOffer("exec2", "host2", ANY) === None) + assert(manager.resourceOffer("exec2", "host2", ANY)._1 === None) clock.advance(LOCALITY_WAIT_MS) // After another delay, we can go ahead and launch that task non-locally - assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3) + assert(manager.resourceOffer("exec2", "host2", ANY)._1.get.index === 3) } test("delay scheduling with failed hosts") { @@ -385,28 +386,28 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // First offer host1: first task should be chosen - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) // After this, nothing should get chosen, because we have separated tasks with unavailable // preference from the noPrefPendingTasks - assert(manager.resourceOffer("exec1", "host1", ANY) === None) + assert(manager.resourceOffer("exec1", "host1", ANY)._1 === None) // Now mark host2 as dead sched.removeExecutor("exec2") manager.executorLost("exec2", "host2", SlaveLost()) // nothing should be chosen - assert(manager.resourceOffer("exec1", "host1", ANY) === None) + assert(manager.resourceOffer("exec1", "host1", ANY)._1 === None) clock.advance(LOCALITY_WAIT_MS * 2) // task 1 and 2 would be scheduled as nonLocal task - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1) - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 1) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 2) // all finished - assert(manager.resourceOffer("exec1", "host1", ANY) === None) - assert(manager.resourceOffer("exec2", "host2", ANY) === None) + assert(manager.resourceOffer("exec1", "host1", ANY)._1 === None) + assert(manager.resourceOffer("exec2", "host2", ANY)._1 === None) } test("task result lost") { @@ -417,14 +418,14 @@ class TaskSetManagerSuite clock.advance(1) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) // Tell it the task has finished but the result was lost. manager.handleFailedTask(0, TaskState.FINISHED, TaskResultLost) assert(sched.endedTasks(0) === TaskResultLost) // Re-offer the host -- now we should get task 0 again. - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) } test("repeated failures lead to task set abortion") { @@ -438,7 +439,7 @@ class TaskSetManagerSuite // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted // after the last failure. (1 to manager.maxTaskFailures).foreach { index => - val offerResult = manager.resourceOffer("exec1", "host1", ANY) + val offerResult = manager.resourceOffer("exec1", "host1", ANY)._1 assert(offerResult.isDefined, "Expect resource offer on iteration %s to return a task".format(index)) assert(offerResult.get.index === 0) @@ -474,7 +475,7 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock) { - val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) + val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1 assert(offerResult.isDefined, "Expect resource offer to return a task") assert(offerResult.get.index === 0) @@ -485,15 +486,15 @@ class TaskSetManagerSuite assert(!sched.taskSetsFailed.contains(taskSet.id)) // Ensure scheduling on exec1 fails after failure 1 due to blacklist - assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty) - assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).isEmpty) - assert(manager.resourceOffer("exec1", "host1", RACK_LOCAL).isEmpty) - assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1.isEmpty) + assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL)._1.isEmpty) + assert(manager.resourceOffer("exec1", "host1", RACK_LOCAL)._1.isEmpty) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty) } // Run the task on exec1.1 - should work, and then fail it on exec1.1 { - val offerResult = manager.resourceOffer("exec1.1", "host1", NODE_LOCAL) + val offerResult = manager.resourceOffer("exec1.1", "host1", NODE_LOCAL)._1 assert(offerResult.isDefined, "Expect resource offer to return a task for exec1.1, offerResult = " + offerResult) @@ -505,12 +506,12 @@ class TaskSetManagerSuite assert(!sched.taskSetsFailed.contains(taskSet.id)) // Ensure scheduling on exec1.1 fails after failure 2 due to blacklist - assert(manager.resourceOffer("exec1.1", "host1", NODE_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1.1", "host1", NODE_LOCAL)._1.isEmpty) } // Run the task on exec2 - should work, and then fail it on exec2 { - val offerResult = manager.resourceOffer("exec2", "host2", ANY) + val offerResult = manager.resourceOffer("exec2", "host2", ANY)._1 assert(offerResult.isDefined, "Expect resource offer to return a task") assert(offerResult.get.index === 0) @@ -521,7 +522,7 @@ class TaskSetManagerSuite assert(!sched.taskSetsFailed.contains(taskSet.id)) // Ensure scheduling on exec2 fails after failure 3 due to blacklist - assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty) + assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty) } // Despite advancing beyond the time for expiring executors from within the blacklist, @@ -529,17 +530,17 @@ class TaskSetManagerSuite clock.advance(rescheduleDelay) { - val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) + val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1 assert(offerResult.isEmpty) } { - val offerResult = manager.resourceOffer("exec3", "host3", ANY) + val offerResult = manager.resourceOffer("exec3", "host3", ANY)._1 assert(offerResult.isDefined) assert(offerResult.get.index === 0) assert(offerResult.get.executorId === "exec3") - assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty) + assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty) // Cause exec3 to fail : failure 4 manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) @@ -598,14 +599,14 @@ class TaskSetManagerSuite manager.executorAdded() sched.addExecutor("execC", "host2") manager.executorAdded() - assert(manager.resourceOffer("exec1", "host1", ANY).isDefined) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.isDefined) sched.removeExecutor("execA") manager.executorLost( "execA", "host1", ExecutorExited(143, false, "Terminated for reason unrelated to running tasks")) assert(!sched.taskSetsFailed.contains(taskSet.id)) - assert(manager.resourceOffer("execC", "host2", ANY).isDefined) + assert(manager.resourceOffer("execC", "host2", ANY)._1.isDefined) sched.removeExecutor("execC") manager.executorLost( "execC", "host2", ExecutorExited(1, true, "Terminated due to issue with running tasks")) @@ -633,12 +634,12 @@ class TaskSetManagerSuite clock.advance(LOCALITY_WAIT_MS * 3) // Offer host3 // No task is scheduled if we restrict locality to RACK_LOCAL - assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None) + assert(manager.resourceOffer("execC", "host3", RACK_LOCAL)._1 === None) // Task 0 can be scheduled with ANY - assert(manager.resourceOffer("execC", "host3", ANY).get.index === 0) + assert(manager.resourceOffer("execC", "host3", ANY)._1.get.index === 0) // Offer host2 // Task 1 can be scheduled with RACK_LOCAL - assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1) + assert(manager.resourceOffer("execB", "host2", RACK_LOCAL)._1.get.index === 1) } test("do not emit warning when serialized task is small") { @@ -649,7 +650,7 @@ class TaskSetManagerSuite assert(!manager.emittedTaskSizeWarning) - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) assert(!manager.emittedTaskSizeWarning) } @@ -664,7 +665,7 @@ class TaskSetManagerSuite assert(!manager.emittedTaskSizeWarning) - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) assert(manager.emittedTaskSizeWarning) } @@ -752,13 +753,13 @@ class TaskSetManagerSuite // Offer host1, which should be accepted as a PROCESS_LOCAL location // by the one task in the task set - val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get + val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL)._1.get // Mark the task as available for speculation, and then offer another resource, // which should be used to launch a speculative copy of the task. manager.speculatableTasks += singleTask.partitionId manager.addPendingTask(singleTask.partitionId, speculatable = true) - val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get + val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY)._1.get assert(manager.runningTasks === 2) assert(manager.isZombie === false) @@ -844,7 +845,7 @@ class TaskSetManagerSuite "exec1" -> "host1", "exec3" -> "host3", "exec2" -> "host2")) { - val taskOption = manager.resourceOffer(exec, host, NO_PREF) + val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1 assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === exec) @@ -870,7 +871,7 @@ class TaskSetManagerSuite assert(sched.speculativeTasks.toSet === Set(2, 3)) // Offer resource to start the speculative attempt for the running task 2.0 - val taskOption = manager.resourceOffer("exec2", "host2", ANY) + val taskOption = manager.resourceOffer("exec2", "host2", ANY)._1 assert(taskOption.isDefined) val task4 = taskOption.get assert(task4.index === 2) @@ -899,20 +900,20 @@ class TaskSetManagerSuite val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) - assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0) - assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) - assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1) + assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL)._1.get.index === 0) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1 === None) + assert(manager.resourceOffer("execA", "host1", NO_PREF)._1.get.index == 1) manager.speculatableTasks += 1 manager.addPendingTask(1, speculatable = true) clock.advance(LOCALITY_WAIT_MS) // schedule the nonPref task - assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2) + assert(manager.resourceOffer("execA", "host1", NO_PREF)._1.get.index === 2) // schedule the speculative task - assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1) + assert(manager.resourceOffer("execB", "host2", NO_PREF)._1.get.index === 1) clock.advance(LOCALITY_WAIT_MS * 3) // schedule non-local tasks - assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3) + assert(manager.resourceOffer("execB", "host2", ANY)._1.get.index === 3) } test("node-local tasks should be scheduled right away " + @@ -929,13 +930,13 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // node-local tasks are scheduled without delay - assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0) - assert(manager.resourceOffer("execA", "host2", NODE_LOCAL).get.index === 1) - assert(manager.resourceOffer("execA", "host3", NODE_LOCAL).get.index === 3) - assert(manager.resourceOffer("execA", "host3", NODE_LOCAL) === None) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1.get.index === 0) + assert(manager.resourceOffer("execA", "host2", NODE_LOCAL)._1.get.index === 1) + assert(manager.resourceOffer("execA", "host3", NODE_LOCAL)._1.get.index === 3) + assert(manager.resourceOffer("execA", "host3", NODE_LOCAL)._1 === None) // schedule no-preference after node local ones - assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2) + assert(manager.resourceOffer("execA", "host3", NO_PREF)._1.get.index === 2) } test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished") @@ -951,13 +952,13 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // process-local tasks are scheduled first - assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 2) - assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 3) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1.get.index === 2) + assert(manager.resourceOffer("execB", "host2", NODE_LOCAL)._1.get.index === 3) // node-local tasks are scheduled without delay - assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0) - assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 1) - assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) - assert(manager.resourceOffer("execB", "host2", NODE_LOCAL) == None) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1.get.index === 0) + assert(manager.resourceOffer("execB", "host2", NODE_LOCAL)._1.get.index === 1) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1 === None) + assert(manager.resourceOffer("execB", "host2", NODE_LOCAL)._1 === None) } test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") { @@ -971,13 +972,13 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // process-local tasks are scheduled first - assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 1) - assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 2) + assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL)._1.get.index === 1) + assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL)._1.get.index === 2) // no-pref tasks are scheduled without delay - assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) == None) - assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) - assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 0) - assert(manager.resourceOffer("execA", "host1", ANY) == None) + assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL)._1 === None) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1 === None) + assert(manager.resourceOffer("execA", "host1", NO_PREF)._1.get.index === 0) + assert(manager.resourceOffer("execA", "host1", ANY)._1 === None) } test("Ensure TaskSetManager is usable after addition of levels") { @@ -1061,7 +1062,7 @@ class TaskSetManagerSuite "exec1" -> "host1", "exec2" -> "host2", "exec2" -> "host2")) { - val taskOption = manager.resourceOffer(k, v, NO_PREF) + val taskOption = manager.resourceOffer(k, v, NO_PREF)._1 assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === k) @@ -1082,7 +1083,7 @@ class TaskSetManagerSuite assert(sched.speculativeTasks.toSet === Set(3)) // Offer resource to start the speculative attempt for the running task - val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption5.isDefined) val task5 = taskOption5.get assert(task5.index === 3) @@ -1121,7 +1122,7 @@ class TaskSetManagerSuite "exec1" -> "host1", "exec2" -> "host2", "exec2" -> "host2")) { - val taskOption = manager.resourceOffer(k, v, NO_PREF) + val taskOption = manager.resourceOffer(k, v, NO_PREF)._1 assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === k) @@ -1154,7 +1155,7 @@ class TaskSetManagerSuite manager.handleFailedTask(task.taskId, TaskState.FAILED, endReason) sched.endedTasks(task.taskId) = endReason assert(!manager.isZombie) - val nextTask = manager.resourceOffer(s"exec2", s"host2", NO_PREF) + val nextTask = manager.resourceOffer(s"exec2", s"host2", NO_PREF)._1 assert(nextTask.isDefined, s"no offer for attempt $attempt of $index") tasks += nextTask.get } @@ -1170,7 +1171,7 @@ class TaskSetManagerSuite assert(manager.checkSpeculatableTasks(0)) assert(sched.speculativeTasks.toSet === Set(3, 4)) // Offer resource to start the speculative attempt for the running task - val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption5.isDefined) val speculativeTask = taskOption5.get assert(speculativeTask.index === 3 || speculativeTask.index === 4) @@ -1195,7 +1196,7 @@ class TaskSetManagerSuite assert(!manager.isZombie) // now run another speculative task - val taskOpt6 = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOpt6 = manager.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOpt6.isDefined) val speculativeTask2 = taskOpt6.get assert(speculativeTask2.index === 3 || speculativeTask2.index === 4) @@ -1226,7 +1227,7 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1)) when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any(), any())).thenAnswer( (invocationOnMock: InvocationOnMock) => assert(manager.isZombie)) - val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption.isDefined) // this would fail, inside our mock dag scheduler, if it calls dagScheduler.taskEnded() too soon manager.handleSuccessfulTask(0, createTaskResult(0)) @@ -1271,7 +1272,7 @@ class TaskSetManagerSuite "exec2" -> "host1" ).flatMap { case (exec, host) => // offer each executor twice (simulating 2 cores per executor) - (0 until 2).flatMap{ _ => tsmSpy.resourceOffer(exec, host, TaskLocality.ANY)} + (0 until 2).flatMap{ _ => tsmSpy.resourceOffer(exec, host, TaskLocality.ANY)._1} } assert(taskDescs.size === 4) @@ -1308,7 +1309,7 @@ class TaskSetManagerSuite "exec2" -> "host2" ).flatMap { case (exec, host) => // offer each executor twice (simulating 2 cores per executor) - (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)._1} } assert(taskDescs.size === 4) @@ -1344,7 +1345,7 @@ class TaskSetManagerSuite val taskSetManager = new TaskSetManager(sched, taskSet, 1, Some(blacklistTracker)) val taskSetManagerSpy = spy(taskSetManager) - val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY) + val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY)._1 // Assert the task has been black listed on the executor it was last executed on. when(taskSetManagerSpy.addPendingTask(anyInt(), anyBoolean(), anyBoolean())).thenAnswer( @@ -1372,9 +1373,9 @@ class TaskSetManagerSuite val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES, clock = new ManualClock) // all tasks from the first taskset have the same jars - val taskOption1 = manager1.resourceOffer("exec1", "host1", NO_PREF) + val taskOption1 = manager1.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption1.get.addedJars === addedJarsPreTaskSet) - val taskOption2 = manager1.resourceOffer("exec1", "host1", NO_PREF) + val taskOption2 = manager1.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption2.get.addedJars === addedJarsPreTaskSet) // even with a jar added mid-TaskSet @@ -1382,7 +1383,7 @@ class TaskSetManagerSuite sc.addJar(jarPath.toString) val addedJarsMidTaskSet = Map[String, Long](sc.addedJars.toSeq: _*) assert(addedJarsPreTaskSet !== addedJarsMidTaskSet) - val taskOption3 = manager1.resourceOffer("exec1", "host1", NO_PREF) + val taskOption3 = manager1.resourceOffer("exec1", "host1", NO_PREF)._1 // which should have the old version of the jars list assert(taskOption3.get.addedJars === addedJarsPreTaskSet) @@ -1390,7 +1391,7 @@ class TaskSetManagerSuite val taskSet2 = FakeTask.createTaskSet(1) val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, clock = new ManualClock) - val taskOption4 = manager2.resourceOffer("exec1", "host1", NO_PREF) + val taskOption4 = manager2.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } @@ -1488,7 +1489,7 @@ class TaskSetManagerSuite "exec1" -> "host1", "exec3" -> "host3", "exec2" -> "host2")) { - val taskOption = manager.resourceOffer(exec, host, NO_PREF) + val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1 assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === exec) @@ -1514,7 +1515,7 @@ class TaskSetManagerSuite assert(sched.speculativeTasks.toSet === Set(2, 3)) // Offer resource to start the speculative attempt for the running task 2.0 - val taskOption = manager.resourceOffer("exec2", "host2", ANY) + val taskOption = manager.resourceOffer("exec2", "host2", ANY)._1 assert(taskOption.isDefined) val task4 = taskOption.get assert(task4.index === 2) @@ -1560,7 +1561,7 @@ class TaskSetManagerSuite "exec1" -> "host1", "exec2" -> "host2", "exec2" -> "host2")) { - val taskOption = manager.resourceOffer(k, v, NO_PREF) + val taskOption = manager.resourceOffer(k, v, NO_PREF)._1 assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === k) @@ -1580,7 +1581,7 @@ class TaskSetManagerSuite assert(sched.speculativeTasks.toSet === Set(3)) // Offer resource to start the speculative attempt for the running task - val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption5.isDefined) val task5 = taskOption5.get assert(task5.index === 3) @@ -1640,7 +1641,7 @@ class TaskSetManagerSuite assert(FakeRackUtil.numBatchInvocation === 1) assert(FakeRackUtil.numSingleHostInvocation === 0) // with rack locality, reject an offer on a host with an unknown rack - assert(manager.resourceOffer("otherExec", "otherHost", TaskLocality.RACK_LOCAL).isEmpty) + assert(manager.resourceOffer("otherExec", "otherHost", TaskLocality.RACK_LOCAL)._1.isEmpty) (0 until 20).foreach { rackIdx => (0 until 5).foreach { offerIdx => // if we offer hosts which are not in preferred locations, @@ -1648,9 +1649,9 @@ class TaskSetManagerSuite // but accept them at RACK_LOCAL level if they're on OK racks val hostIdx = 100 + rackIdx assert(manager.resourceOffer("exec" + hostIdx, "host" + hostIdx, TaskLocality.NODE_LOCAL) - .isEmpty) + ._1.isEmpty) assert(manager.resourceOffer("exec" + hostIdx, "host" + hostIdx, TaskLocality.RACK_LOCAL) - .isDefined) + ._1.isDefined) } } // check no more expensive calls to the rack resolution. manager.resourceOffer() will call @@ -1670,7 +1671,7 @@ class TaskSetManagerSuite val taskResourceAssignments = Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))) val taskOption = - manager.resourceOffer("exec1", "host1", NO_PREF, taskResourceAssignments) + manager.resourceOffer("exec1", "host1", NO_PREF, taskResourceAssignments)._1 assert(taskOption.isDefined) val allocatedResources = taskOption.get.resources assert(allocatedResources.size == 1) @@ -1693,7 +1694,7 @@ class TaskSetManagerSuite // Offer resources for 4 tasks to start, 2 on each exec Seq("exec1" -> "host1", "exec2" -> "host2").foreach { case (exec, host) => (0 until 2).foreach { _ => - val taskOption = manager.resourceOffer(exec, host, NO_PREF) + val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1 assert(taskOption.isDefined) assert(taskOption.get.executorId === exec) } @@ -1717,8 +1718,8 @@ class TaskSetManagerSuite // Offer resource to start the speculative attempt for the running task. We offer more // resources, and ensure that speculative tasks get scheduled appropriately -- only one extra // copy per speculatable task - val taskOption2 = manager.resourceOffer("exec1", "host1", NO_PREF) - val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF) + val taskOption2 = manager.resourceOffer("exec1", "host1", NO_PREF)._1 + val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1 assert(taskOption2.isDefined) val task2 = taskOption2.get // Ensure that task index 3 is launched on host1 and task index 4 on host2 @@ -1738,9 +1739,9 @@ class TaskSetManagerSuite assert(manager.copiesRunning(1) === 2) assert(manager.copiesRunning(3) === 2) // Offering additional resources should not lead to any speculative tasks being respawned - assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) - assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty) - assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty) + assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty) + assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty) } test("SPARK-26755 Ensure that a speculative task obeys original locality preferences") { @@ -1763,7 +1764,7 @@ class TaskSetManagerSuite } // Offer resources for 3 tasks to start Seq("exec1" -> "host1", "exec2" -> "host2", "exec3" -> "host3").foreach { case (exec, host) => - val taskOption = manager.resourceOffer(exec, host, NO_PREF) + val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1 assert(taskOption.isDefined) assert(taskOption.get.executorId === exec) } @@ -1776,17 +1777,17 @@ class TaskSetManagerSuite assert(manager.checkSpeculatableTasks(0)) assert(sched.speculativeTasks.toSet === Set(0, 1)) // Ensure that the speculatable tasks obey the original locality preferences - assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty) + assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL)._1.isEmpty) // task 1 does have a node-local preference for host2 -- but we've already got a regular // task running there, so we should not schedule a speculative there as well. - assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty) - assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined) - assert(manager.resourceOffer("exec4", "host4", ANY).isDefined) + assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL)._1.isEmpty) + assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL)._1.isDefined) + assert(manager.resourceOffer("exec4", "host4", ANY)._1.isDefined) // Since, all speculatable tasks have been launched, making another offer // should not schedule any more tasks - assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty) assert(!manager.checkSpeculatableTasks(0)) - assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty) } private def testSpeculationDurationSetup( @@ -1931,7 +1932,7 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) assert(sched.taskSetsFailed.isEmpty) - val offerResult = manager.resourceOffer("exec1", "host1", ANY) + val offerResult = manager.resourceOffer("exec1", "host1", ANY)._1 assert(offerResult.isDefined, "Expect resource offer on iteration 0 to return a task") assert(offerResult.get.index === 0) diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index a5bd34888a0a6..e358c0f2c9ded 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -303,7 +303,7 @@ class MesosFineGrainedSchedulerBackendSuite mesosOffers2.add(createOffer(1, minMem, minCpu)) reset(taskScheduler) reset(driver) - when(taskScheduler.resourceOffers(any(classOf[IndexedSeq[WorkerOffer]]))).thenReturn(Seq(Seq())) + when(taskScheduler.resourceOffers(any(classOf[IndexedSeq[WorkerOffer]]), any[Boolean])).thenReturn(Seq(Seq())) when(taskScheduler.CPUS_PER_TASK).thenReturn(2) when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1)) From 2fceb6299dca90b12eb57b6adc6ab95a54862f62 Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Sun, 9 Feb 2020 20:31:45 -0800 Subject: [PATCH 07/15] update formatting --- .../cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index e358c0f2c9ded..36ed84858dbfb 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -303,7 +303,8 @@ class MesosFineGrainedSchedulerBackendSuite mesosOffers2.add(createOffer(1, minMem, minCpu)) reset(taskScheduler) reset(driver) - when(taskScheduler.resourceOffers(any(classOf[IndexedSeq[WorkerOffer]]), any[Boolean])).thenReturn(Seq(Seq())) + when(taskScheduler.resourceOffers(any(classOf[IndexedSeq[WorkerOffer]]), any[Boolean])) + .thenReturn(Seq(Seq())) when(taskScheduler.CPUS_PER_TASK).thenReturn(2) when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1)) From 913648add8bbac44bb9a573ace0a4622da7d9f94 Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Mon, 10 Feb 2020 23:21:33 -0800 Subject: [PATCH 08/15] add doc include speculative tasks on delay reject check --- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 12 ++++++++++++ .../org/apache/spark/scheduler/TaskSetManager.scala | 5 ++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index c1634273d6990..378c947dcbb57 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -57,6 +57,18 @@ import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, ThreadUtils, Ut * * Periodic revival of all offers from the CoarseGrainedSchedulerBackend, to accommodate delay * scheduling * * task-result-getter threads + * + * Delay Scheduling: + * Delay scheduling is an optimization that sacrifices job fairness for data locality in order to + * improve cluster and workload throughput. One useful definition of "delay" is how much time + * has passed since the TaskSet was using its fair share of resources. Since it is impractical to + * calculate this delay without a full simulation, the heuristic used is the time since the + * TaskSetManager last launched a task and has not rejected any resources due to delay scheduling + * since it was last offered its "fair share". A "fair share" offer is when [[resourceOffers]]'s + * parameter "isAllFreeResources" is set to true. A "delay scheduling reject" is when a resource + * is not utilized despite there being pending tasks (implemented inside [[TaskSetManager]]). + * The legacy heuristic only measured the time since the [[TaskSetManager]] last launched a task, + * and can be re-enabled by setting [[LEGACY_LOCALITY_WAIT_RESET]] to true. */ private[spark] class TaskSchedulerImpl( val sc: SparkContext, diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 914983910d425..8b8a048a48f22 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -494,7 +494,10 @@ private[spark] class TaskSetManager( serializedTask) } val hasScheduleDelayReject = - taskDescription.isEmpty && maxLocality == TaskLocality.ANY && pendingTasks.all.nonEmpty + taskDescription.isEmpty && + maxLocality == TaskLocality.ANY && + pendingTasks.all.nonEmpty && + pendingSpeculatableTasks.all.nonEmpty (taskDescription, hasScheduleDelayReject) } else { (None, false) From 202adac4c2e12a812ca4ea22ef96962894e72bf6 Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Tue, 11 Feb 2020 00:49:27 -0800 Subject: [PATCH 09/15] use config string instead of code reference --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 378c947dcbb57..286893b25e780 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -68,7 +68,7 @@ import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, ThreadUtils, Ut * parameter "isAllFreeResources" is set to true. A "delay scheduling reject" is when a resource * is not utilized despite there being pending tasks (implemented inside [[TaskSetManager]]). * The legacy heuristic only measured the time since the [[TaskSetManager]] last launched a task, - * and can be re-enabled by setting [[LEGACY_LOCALITY_WAIT_RESET]] to true. + * and can be re-enabled by setting spark.locality.wait.legacyResetOnTaskLaunch to true. */ private[spark] class TaskSchedulerImpl( val sc: SparkContext, From 349fddcd60497bfe19c96deba4a11c3ec8d9b52b Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Sun, 1 Mar 2020 19:41:26 -0800 Subject: [PATCH 10/15] add tasksetmanager tests --- .../spark/scheduler/TaskSetManager.scala | 4 ++-- .../scheduler/TaskSchedulerImplSuite.scala | 4 ++-- .../spark/scheduler/TaskSetManagerSuite.scala | 21 +++++++++++++++++++ 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 8b8a048a48f22..a0e84b94735ec 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -493,11 +493,11 @@ private[spark] class TaskSetManager( taskResourceAssignments, serializedTask) } + val hasPendingTasks = pendingTasks.all.nonEmpty || pendingSpeculatableTasks.all.nonEmpty val hasScheduleDelayReject = taskDescription.isEmpty && maxLocality == TaskLocality.ANY && - pendingTasks.all.nonEmpty && - pendingSpeculatableTasks.all.nonEmpty + hasPendingTasks (taskDescription, hasScheduleDelayReject) } else { (None, false) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index c1f7a5eba4217..3fef0a7ec69c4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -365,9 +365,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // partial resource has rejected offer assert(taskScheduler .resourceOffers( - IndexedSeq(WorkerOffer("exec2", "host1", 1)), + IndexedSeq(WorkerOffer("exec2", "host1", 1), WorkerOffer("exec1", "host1", 1)), isAllFreeResources = false) - .flatten.isEmpty) + .flatten.size === 1) // This does not reset timer since last partial resource offer was rejected clock.advance(advanceAmount) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index b04bae42b5544..4978be3e04c1e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -369,8 +369,29 @@ class TaskSetManagerSuite clock.advance(LOCALITY_WAIT_MS) + // offers not accepted due to task set zombies are not delay schedule rejects + manager.isZombie = true + val (taskDesciption, delayReject) = manager.resourceOffer("exec2", "host2", ANY) + assert(taskDesciption.isEmpty) + assert(delayReject === false) + manager.isZombie = false + + // offers not accepted due to blacklisting are not delay schedule rejects + val tsmSpy = spy(manager) + val blacklist = mock(classOf[TaskSetBlacklist]) + when(tsmSpy.taskSetBlacklistHelperOpt).thenReturn(Some(blacklist)) + when(blacklist.isNodeBlacklistedForTaskSet(any())).thenReturn(true) + val (blacklistTask, blackListReject) = tsmSpy.resourceOffer("exec2", "host2", ANY) + assert(blacklistTask.isEmpty) + assert(blackListReject === false) + // After another delay, we can go ahead and launch that task non-locally assert(manager.resourceOffer("exec2", "host2", ANY)._1.get.index === 3) + + // offers not accepted due to no pending tasks are not delay schedule rejects + val (noPendingTask, noPendingReject) = manager.resourceOffer("exec2", "host2", ANY) + assert(noPendingTask.isEmpty) + assert(noPendingReject === false) } test("delay scheduling with failed hosts") { From ebce3b4dff9e9345a4bd6e431d456fec7bf383ce Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Sun, 8 Mar 2020 21:02:29 -0700 Subject: [PATCH 11/15] add docs, remove redundant return value --- .../spark/scheduler/TaskSchedulerImpl.scala | 35 +++++++++++++------ .../scheduler/TaskSchedulerImplSuite.scala | 5 +++ 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 286893b25e780..c1da40297ff75 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -342,6 +342,21 @@ private[spark] class TaskSchedulerImpl( s" ${manager.parent.name}") } + /** + * Offers resources to a single [[TaskSetManager]] at a given max allowed [[TaskLocality]]. + * + * @param taskSet task set to offer resources to + * @param maxLocality max locality to allow when scheduling + * @param shuffledOffers shuffled resource offers to use for scheduling, + * remaining resources are tracked by below fields as tasks are scheduled + * @param availableCpus remaining cpus per offer, + * value at index 'i' corresponds to shuffledOffers[i] + * @param availableResources remaining resources per offer, + * value at index 'i' corresponds to shuffledOffers[i] + * @param tasks tasks scheduled per offer, value at index 'i' corresponds to shuffledOffers[i] + * @param addressesWithDescs tasks scheduler per host:port, used for barrier tasks + * @return tuple of (had delay schedule rejects?, option of min locality of launched task) + */ private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, @@ -350,8 +365,7 @@ private[spark] class TaskSchedulerImpl( availableResources: Array[Map[String, Buffer[String]]], tasks: IndexedSeq[ArrayBuffer[TaskDescription]], addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) - : (Boolean, Boolean, Option[TaskLocality]) = { - var launchedTask = false + : (Boolean, Option[TaskLocality]) = { var noDelayScheduleRejects = true var minLaunchedLocality: Option[TaskLocality] = None // nodes and executors that are blacklisted for the entire application have already been @@ -376,8 +390,7 @@ private[spark] class TaskSchedulerImpl( tasks(i) += task val tid = task.taskId val locality = taskSet.taskInfos(task.taskId).taskLocality - minLaunchedLocality = minLaunchedLocality - .map(min => if (locality < min) locality else min).orElse(Some(locality)) + minLaunchedLocality = minTaskLocality(minLaunchedLocality, Some(locality)) taskIdToTaskSetManager.put(tid, taskSet) taskIdToExecutorId(tid) = execId executorIdToRunningTaskIds(execId).add(tid) @@ -397,19 +410,18 @@ private[spark] class TaskSchedulerImpl( // The executor address is expected to be non empty. addressesWithDescs += (shuffledOffers(i).address.get -> task) } - launchedTask = true } } catch { case e: TaskNotSerializableException => logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") // Do not offer resources for this task, but don't throw an error to allow other // task sets to be submitted. - return (launchedTask, noDelayScheduleRejects, minLaunchedLocality) + return (noDelayScheduleRejects, minLaunchedLocality) } } } } - (launchedTask, noDelayScheduleRejects, minLaunchedLocality) + (noDelayScheduleRejects, minLaunchedLocality) } /** @@ -491,8 +503,9 @@ private[spark] class TaskSchedulerImpl( }.sum } - def minTaskLocality(l1: Option[TaskLocality], l2: Option[TaskLocality]) : - Option[TaskLocality] = { + def minTaskLocality( + l1: Option[TaskLocality], + l2: Option[TaskLocality]) : Option[TaskLocality] = { if (l1.isEmpty) { l2 } else if (l2.isEmpty) { @@ -591,10 +604,10 @@ private[spark] class TaskSchedulerImpl( for (currentMaxLocality <- taskSet.myLocalityLevels) { var launchedTaskAtCurrentMaxLocality = false do { - val (launched, noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet( + val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet( taskSet, currentMaxLocality, shuffledOffers, availableCpus, availableResources, tasks, addressesWithDescs) - launchedTaskAtCurrentMaxLocality = launched + launchedTaskAtCurrentMaxLocality = minLocality.isDefined launchedAnyTask |= launchedTaskAtCurrentMaxLocality noDelaySchedulingRejects &= noDelayScheduleReject globalMinLocality = minTaskLocality(globalMinLocality, minLocality) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 3fef0a7ec69c4..85aad805ca80c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -228,6 +228,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B Seq(TaskLocation("host1", "exec1")), Seq(TaskLocation("host1", "exec1")) ) + + // Offer resources first so that when the taskset is submitted it can initialize + // with proper locality level. Otherwise, ANY would be the only locality level. + // See TaskSetManager.computeValidLocalityLevels() + // This begins the task set as PROCESS_LOCAL locality level taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1))) taskScheduler.submitTasks(taskSet) taskScheduler From d05b8dade915bbafff92e4e636c6217b69be3b41 Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Sun, 22 Mar 2020 16:43:48 -0700 Subject: [PATCH 12/15] add more comments on tests --- .../scheduler/TaskSchedulerImplSuite.scala | 90 +++++++++++++------ 1 file changed, 65 insertions(+), 25 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 85aad805ca80c..05251bf960912 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -241,21 +241,26 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("SPARK-18886 - partial offers (isAllFreeResources = false) reset timer before " + "any resources have been rejected") { val clock = new ManualClock() + // All tasks created here are local to exec1, host1. + // Locality level starts at PROCESS_LOCAL. val taskScheduler = setupTaskScheduler(clock) - val advanceAmount = 2000 + // Locality levels increase at 3000 ms. + val advanceAmount = 3000 - // by default, new partial resource (isAllFreeResources = false) offers reset timer - // if the resource is accepted + // Advancing clock increases locality level to NODE_LOCAL. clock.advance(advanceAmount) + + // If there hasn't yet been any full resource offers, + // partial resource (isAllFreeResources = false) offers reset delay scheduling + // if this and previous offers were accepted. + // This line resets the timer and locality level is reset to PROCESS_LOCAL. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec1", "host1", 1)), isAllFreeResources = false) .flatten.length === 1) - // would advance to NODE_LOCAL locality if timer wasn't reset above - // Verifying this node local task is not accepted - clock.advance(advanceAmount) + // This NODE_LOCAL task should not be accepted. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec2", "host1", 1)), @@ -266,20 +271,24 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("SPARK-18886 - delay scheduling timer is reset when it accepts all resources offered when" + "isAllFreeResources = true") { val clock = new ManualClock() + // All tasks created here are local to exec1, host1. + // Locality level starts at PROCESS_LOCAL. val taskScheduler = setupTaskScheduler(clock) - val advanceAmount = 2000 + // Locality levels increase at 3000 ms. + val advanceAmount = 3000 - // timer is reset when tsm accepts all free resources offered to it + // Advancing clock increases locality level to NODE_LOCAL. clock.advance(advanceAmount) + + // If there are no rejects on an all resource offer, delay scheduling is reset. + // This line resets the timer and locality level is reset to PROCESS_LOCAL. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec1", "host1", 1)), isAllFreeResources = true) .flatten.length === 1) - // would advance to NODE_LOCAL locality if timer wasn't reset above - // Verifying this node local task is not accepted - clock.advance(advanceAmount) + // This NODE_LOCAL task should not be accepted. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec2", "host1", 1)), @@ -291,30 +300,47 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B "time if last full resource offer (isAllResources = true) was accepted as well as any " + "following partial resource offers") { val clock = new ManualClock() + // All tasks created here are local to exec1, host1. + // Locality level starts at PROCESS_LOCAL. val taskScheduler = setupTaskScheduler(clock) - val advanceAmount = 2000 + // Locality levels increase at 3000 ms. + val advanceAmount = 3000 + // PROCESS_LOCAL full resource offer is accepted. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec1", "host1", 1)), isAllFreeResources = true) .flatten.length === 1) + // Advancing clock increases locality level to NODE_LOCAL. clock.advance(advanceAmount) + + // PROCESS_LOCAL partial resource is accepted. + // Since all offers have been accepted since the last full resource offer + // (this one and the previous one), delay scheduling is reset. + // This line resets the timer and locality level is reset to PROCESS_LOCAL. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec1", "host1", 1)), isAllFreeResources = false) .flatten.length === 1) + // Advancing clock increases locality level to NODE_LOCAL clock.advance(advanceAmount) + + // PROCESS_LOCAL partial resource is accepted + // Since all offers have been accepted since the last full resource offer + // (one previous full offer, one previous partial offer, and this partial offer), + // delay scheduling is reset. + // This line resets the timer and locality level is reset to PROCESS_LOCAL. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec1", "host1", 1)), isAllFreeResources = false) .flatten.length === 1) - clock.advance(advanceAmount) + // This NODE_LOCAL task should not be accepted. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec2", "host1", 1)), @@ -330,27 +356,35 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("SPARK-18886 - partial resource offers (isAllFreeResources = false) do not reset " + "time if any offer was rejected since last full offer was fully accepted") { val clock = new ManualClock() + // All tasks created here are local to exec1, host1. + // Locality level starts at PROCESS_LOCAL. val taskScheduler = setupTaskScheduler(clock) - val advanceAmount = 2000 + // Locality levels increase at 3000 ms. + val advanceAmount = 3000 - // case 1 from test description above + // case 1 from test description above. + // NODE_LOCAL full resource offer is rejected, so delay scheduling is not reset. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec2", "host1", 1)), isAllFreeResources = true) .flatten.isEmpty) + // Advancing clock increases locality level to NODE_LOCAL clock.advance(advanceAmount) + + // PROCESS_LOCAL partial resource is accepted, + // but because preceding full resource offer was rejected, delay scheduling is not reset. + // Locality level remains at NODE_LOCAL. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec1", "host1", 1)), isAllFreeResources = false) .flatten.length === 1) - // Even though we launched a local task above, we still utilize non-local exec2 + // Even though we launched a local task above, we still utilize non-local exec2. // This is the behavior change to fix SPARK-18886. - // Locality level is NODE_LOCAL after this clock advance - clock.advance(advanceAmount) + // Locality level remains NODE_LOCAL after this clock advance. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec2", "host1", 1)), @@ -358,32 +392,38 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B .flatten.length === 1) - // case 2 from test description above - // Reset timer and locality level to PROCESS_LOCAL - clock.advance(advanceAmount) + // case 2 from test description above. + // PROCESS_LOCAL full resource offer is accepted, resetting delay scheduling. + // This line resets the timer and locality level is reset to PROCESS_LOCAL. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec1", "host1", 1)), isAllFreeResources = true) .flatten.length === 1) - // partial resource has rejected offer + // Partial resource offer: NODE_LOCAL exec 2 is rejected, PROCESS_LOCAL exec1 is accepted. + // Since there were rejects, delay scheduling is not reset, and follow up partial offers + // will not reset delay scheduling, even if they are accepted. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec2", "host1", 1), WorkerOffer("exec1", "host1", 1)), isAllFreeResources = false) .flatten.size === 1) - // This does not reset timer since last partial resource offer was rejected + // Advancing clock increases locality level to NODE_LOCAL clock.advance(advanceAmount) + + // PROCESS_LOCAL partial resource is accepted, but does not reset delay scheduling + // as described above. + // Locality level remains at NODE_LOCAL. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec1", "host1", 1)), isAllFreeResources = false) .flatten.length === 1) - // Locality level is NODE_LOCAL after this clock advance - clock.advance(advanceAmount) + // NODE_LOCAL partial resource offer is accepted, + // verifying locality level was not reset to PROCESS_LOCAL by above offer. assert(taskScheduler .resourceOffers( IndexedSeq(WorkerOffer("exec2", "host1", 1)), From 8d35ea14edfb8bd7cc276020784c13caf030532a Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Sun, 29 Mar 2020 23:06:09 -0700 Subject: [PATCH 13/15] fix rebase --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index c1da40297ff75..0c25235526599 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -384,7 +384,7 @@ private[spark] class TaskSchedulerImpl( val prof = sc.resourceProfileManager.resourceProfileFromId(taskSetRpID) val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf) val (taskDescOption, didReject) = - taskSet.resourceOffer(execId, host, maxLocality, availableResources(i)) + taskSet.resourceOffer(execId, host, maxLocality, taskResAssignments) noDelayScheduleRejects &= !didReject for (task <- taskDescOption) { tasks(i) += task From a60639a40e025c96ca7d47c6abbb78493cf88320 Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Thu, 2 Apr 2020 22:17:27 -0700 Subject: [PATCH 14/15] address comments around private modifiers, legacy config fields, and more --- .../org/apache/spark/internal/config/package.scala | 5 +++++ .../apache/spark/scheduler/TaskSchedulerImpl.scala | 4 ++-- .../spark/scheduler/TaskSchedulerImplSuite.scala | 13 ++++++------- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 73941304f01a8..848a728bdb4d7 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -545,6 +545,11 @@ package object config { private[spark] val LEGACY_LOCALITY_WAIT_RESET = ConfigBuilder("spark.locality.wait.legacyResetOnTaskLaunch") + .doc("Whether to use the legacy behavior of locality wait, which resets the delay timer " + + "anytime a task is scheduled. See Delay Scheduling section of TaskSchedulerImpl's class " + + "documentation for more details.") + .internal() + .version("3.0.0") .booleanConf .createWithDefault(false) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 0c25235526599..92382e03f6d11 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -345,7 +345,7 @@ private[spark] class TaskSchedulerImpl( /** * Offers resources to a single [[TaskSetManager]] at a given max allowed [[TaskLocality]]. * - * @param taskSet task set to offer resources to + * @param taskSet task set manager to offer resources to * @param maxLocality max locality to allow when scheduling * @param shuffledOffers shuffled resource offers to use for scheduling, * remaining resources are tracked by below fields as tasks are scheduled @@ -503,7 +503,7 @@ private[spark] class TaskSchedulerImpl( }.sum } - def minTaskLocality( + private def minTaskLocality( l1: Option[TaskLocality], l2: Option[TaskLocality]) : Option[TaskLocality] = { if (l1.isEmpty) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 05251bf960912..056c34278c1ea 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -196,7 +196,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } - def setupTaskScheduler(clock: ManualClock): TaskSchedulerImpl = { + private def setupTaskSchedulerForLocalityTests(clock: ManualClock): TaskSchedulerImpl = { val conf = new SparkConf() sc = new SparkContext("local", "TaskSchedulerImplSuite", conf) val taskScheduler = new TaskSchedulerImpl(sc, @@ -243,7 +243,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B val clock = new ManualClock() // All tasks created here are local to exec1, host1. // Locality level starts at PROCESS_LOCAL. - val taskScheduler = setupTaskScheduler(clock) + val taskScheduler = setupTaskSchedulerForLocalityTests(clock) // Locality levels increase at 3000 ms. val advanceAmount = 3000 @@ -268,12 +268,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B .flatten.isEmpty) } - test("SPARK-18886 - delay scheduling timer is reset when it accepts all resources offered when" + + test("SPARK-18886 - delay scheduling timer is reset when it accepts all resources offered when " + "isAllFreeResources = true") { val clock = new ManualClock() // All tasks created here are local to exec1, host1. // Locality level starts at PROCESS_LOCAL. - val taskScheduler = setupTaskScheduler(clock) + val taskScheduler = setupTaskSchedulerForLocalityTests(clock) // Locality levels increase at 3000 ms. val advanceAmount = 3000 @@ -302,7 +302,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B val clock = new ManualClock() // All tasks created here are local to exec1, host1. // Locality level starts at PROCESS_LOCAL. - val taskScheduler = setupTaskScheduler(clock) + val taskScheduler = setupTaskSchedulerForLocalityTests(clock) // Locality levels increase at 3000 ms. val advanceAmount = 3000 @@ -348,7 +348,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B .flatten.isEmpty) } - // This tests two cases // 1. partial resource offer doesn't reset timer after full resource offer had rejected resources // 2. partial resource offer doesn't reset timer after partial resource offer @@ -358,7 +357,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B val clock = new ManualClock() // All tasks created here are local to exec1, host1. // Locality level starts at PROCESS_LOCAL. - val taskScheduler = setupTaskScheduler(clock) + val taskScheduler = setupTaskSchedulerForLocalityTests(clock) // Locality levels increase at 3000 ms. val advanceAmount = 3000 From 24c8ad9ae23d360c7bb5bc813d04e6f5f6715d93 Mon Sep 17 00:00:00 2001 From: Nicholas Marcott <481161+bmarcott@users.noreply.github.com> Date: Thu, 2 Apr 2020 22:28:20 -0700 Subject: [PATCH 15/15] fix StandaloneDynamicAllocationSuite test --- .../apache/spark/deploy/StandaloneDynamicAllocationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 0b2a58d8e135d..57cbda3c0620d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -510,7 +510,7 @@ class StandaloneDynamicAllocationSuite val taskScheduler = mock(classOf[TaskSchedulerImpl]) when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host")) - when(taskScheduler.resourceOffers(any())).thenReturn(Nil) + when(taskScheduler.resourceOffers(any(), any[Boolean])).thenReturn(Nil) when(taskScheduler.sc).thenReturn(sc) val rpcEnv = RpcEnv.create("test-rpcenv", "localhost", 0, conf, securityManager)