From 6e8a028fce5d9fe6df69fd79e7a2c6ff748a575f Mon Sep 17 00:00:00 2001 From: "xueyan.li" Date: Fri, 15 Jan 2016 16:28:05 +0800 Subject: [PATCH 1/4] [SPARK-12832][CORE] Fix dispatcher does not have a constraints config --- .../cluster/mesos/MesosClusterScheduler.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 05fda0fded7f8..980b82bf75376 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -142,6 +142,8 @@ private[spark] class MesosClusterScheduler( private val queuedDriversState = engineFactory.createEngine("driverQueue") private val launchedDriversState = engineFactory.createEngine("launchedDrivers") private val pendingRetryDriversState = engineFactory.createEngine("retryList") + // dispactcher constraints + val driverOfferConstraints = parseConstraintString(conf.get("spark.mesos.constraints", "")) // Flag to mark if the scheduler is ready to be called, which is until the scheduler // is registered with Mesos master. @volatile protected var ready = false @@ -518,6 +520,20 @@ private[spark] class MesosClusterScheduler( val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]() val currentTime = new Date() + stateLock.synchronized { + var it = offers.iterator() + while (it.hasNext) { + val offer = it.next() + val offerAttributes = toAttributeMap(offer.getAttributesList) + val meetsConstraints = matchesAttributeRequirements(driverOfferConstraints, offerAttributes) + + if (!meetsConstraints) { + driver.declineOffer(offer.getId) + it.remove() + } + } + } + stateLock.synchronized { // We first schedule all the supervised drivers that are ready to retry. // This list will be empty if none of the drivers are marked as supervise. From ef698a9d87cd5f19240a24a56ae638ac15cf1c0e Mon Sep 17 00:00:00 2001 From: astralidea Date: Sat, 16 Jan 2016 10:56:26 +0800 Subject: [PATCH 2/4] fix bug after review --- .../cluster/mesos/MesosClusterScheduler.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 980b82bf75376..750dce2468ab6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -142,8 +142,7 @@ private[spark] class MesosClusterScheduler( private val queuedDriversState = engineFactory.createEngine("driverQueue") private val launchedDriversState = engineFactory.createEngine("launchedDrivers") private val pendingRetryDriversState = engineFactory.createEngine("retryList") - // dispactcher constraints - val driverOfferConstraints = parseConstraintString(conf.get("spark.mesos.constraints", "")) + private val driverOfferConstraints = parseConstraintString(conf.get("spark.mesos.constraints", "")) // Flag to mark if the scheduler is ready to be called, which is until the scheduler // is registered with Mesos master. @volatile protected var ready = false @@ -512,16 +511,9 @@ private[spark] class MesosClusterScheduler( } override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = { - val currentOffers = offers.asScala.map(o => - new ResourceOffer( - o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem")) - ).toList - logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}") - val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]() - val currentTime = new Date() stateLock.synchronized { - var it = offers.iterator() + val it = offers.iterator() while (it.hasNext) { val offer = it.next() val offerAttributes = toAttributeMap(offer.getAttributesList) @@ -534,6 +526,14 @@ private[spark] class MesosClusterScheduler( } } + val currentOffers = offers.asScala.map(o => + new ResourceOffer( + o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem")) + ).toList + logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}") + val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]() + val currentTime = new Date() + stateLock.synchronized { // We first schedule all the supervised drivers that are ready to retry. // This list will be empty if none of the drivers are marked as supervise. From 94b520ff520513aa89affcd499ea2406c3d0f967 Mon Sep 17 00:00:00 2001 From: astralidea Date: Sat, 16 Jan 2016 11:08:34 +0800 Subject: [PATCH 3/4] for scala style --- .../spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 750dce2468ab6..29e25fedf98ae 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -142,7 +142,8 @@ private[spark] class MesosClusterScheduler( private val queuedDriversState = engineFactory.createEngine("driverQueue") private val launchedDriversState = engineFactory.createEngine("launchedDrivers") private val pendingRetryDriversState = engineFactory.createEngine("retryList") - private val driverOfferConstraints = parseConstraintString(conf.get("spark.mesos.constraints", "")) + private val driverOfferConstraints = + parseConstraintString(conf.get("spark.mesos.constraints", "")) // Flag to mark if the scheduler is ready to be called, which is until the scheduler // is registered with Mesos master. @volatile protected var ready = false @@ -512,6 +513,7 @@ private[spark] class MesosClusterScheduler( override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = { + // filter by mesos constraints stateLock.synchronized { val it = offers.iterator() while (it.hasNext) { From f4d586f733de44104171fa2cc050bafd848c7059 Mon Sep 17 00:00:00 2001 From: astralidea Date: Sat, 16 Jan 2016 11:11:14 +0800 Subject: [PATCH 4/4] upper cap --- .../spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 29e25fedf98ae..87760ab8f0b0d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -513,7 +513,7 @@ private[spark] class MesosClusterScheduler( override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = { - // filter by mesos constraints + // Filter by mesos constraints stateLock.synchronized { val it = offers.iterator() while (it.hasNext) {