From 9c0cb445856cfd4384287c1ac00c4f6a87cdb52a Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Tue, 19 Jul 2016 13:12:30 -0500 Subject: [PATCH 1/6] [SPARK-15703] Make ListenerBus event queue configurable --- .../scala/org/apache/spark/SparkContext.scala | 4 ++-- .../spark/scheduler/LiveListenerBus.scala | 13 ++++------ .../scheduler/EventLoggingListenerSuite.scala | 4 ++-- .../spark/scheduler/SparkListenerSuite.scala | 24 +++++++++---------- .../BlockManagerReplicationSuite.scala | 3 ++- .../spark/storage/BlockManagerSuite.scala | 3 ++- .../spark/ui/storage/StorageTabSuite.scala | 7 +++--- .../streaming/ReceivedBlockHandlerSuite.scala | 3 ++- 8 files changed, 31 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 57d1f09f6b15b..59bb7e9a5e9ce 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def isStopped: Boolean = stopped.get() // An asynchronous listener bus for Spark events - private[spark] val listenerBus = new LiveListenerBus + private[spark] val listenerBus = new LiveListenerBus(this) // This function allows components created by SparkEnv to be mocked in unit tests: private[spark] def createSparkEnv( @@ -2147,7 +2147,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } - listenerBus.start(this) + listenerBus.start() _listenerBusStarted = true } diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 1c21313d1cb17..9606abb67302c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -32,18 +32,17 @@ import org.apache.spark.util.Utils * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when `stop()` is called, and it will drop further events after stopping. */ -private[spark] class LiveListenerBus extends SparkListenerBus { +private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus { self => import LiveListenerBus._ - private var sparkContext: SparkContext = null - // Cap the capacity of the event queue so we get an explicit error (rather than // an OOM exception) if it's perpetually being added to more quickly than it's being drained. - private val EVENT_QUEUE_CAPACITY = 10000 - private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) + private lazy val EVENT_QUEUE_CAPACITY = sparkContext.conf. + getInt("spark.scheduler.listenerbus.eventqueue.size", 10000) + private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) // Indicate if `start()` is called private val started = new AtomicBoolean(false) @@ -96,11 +95,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus { * listens for any additional events asynchronously while the listener bus is still running. * This should only be called once. * - * @param sc Used to stop the SparkContext in case the listener thread dies. */ - def start(sc: SparkContext): Unit = { + def start(): Unit = { if (started.compareAndSet(false, true)) { - sparkContext = sc listenerThread.start() } else { throw new IllegalStateException(s"$name already started!") diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index c4c80b5b57daa..7f4859206e257 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -142,14 +142,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit extraConf.foreach { case (k, v) => conf.set(k, v) } val logName = compressionCodec.map("test-" + _).getOrElse("test") val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) - val listenerBus = new LiveListenerBus + val listenerBus = new LiveListenerBus(sc) val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None, 125L, "Mickey", None) val applicationEnd = SparkListenerApplicationEnd(1000L) // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite eventLogger.start() - listenerBus.start(sc) + listenerBus.start() listenerBus.addListener(eventLogger) listenerBus.postToAll(applicationStart) listenerBus.postToAll(applicationEnd) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 5ba67afc0cd62..ba6b123708cc1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -39,11 +39,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match test("don't call sc.stop in listener") { sc = new SparkContext("local", "SparkListenerSuite") val listener = new SparkContextStoppingListener(sc) - val bus = new LiveListenerBus + val bus = new LiveListenerBus(sc) bus.addListener(listener) // Starting listener bus should flush all buffered events - bus.start(sc) + bus.start() bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) @@ -53,7 +53,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match test("basic creation and shutdown of LiveListenerBus") { val counter = new BasicJobCounter - val bus = new LiveListenerBus + val bus = new LiveListenerBus(sc) bus.addListener(counter) // Listener bus hasn't started yet, so posting events should not increment counter @@ -61,7 +61,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match assert(counter.count === 0) // Starting listener bus should flush all buffered events - bus.start(sc) + bus.start() bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(counter.count === 5) @@ -72,14 +72,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match // Listener bus must not be started twice intercept[IllegalStateException] { - val bus = new LiveListenerBus - bus.start(sc) - bus.start(sc) + val bus = new LiveListenerBus(sc) + bus.start() + bus.start() } // ... or stopped before starting intercept[IllegalStateException] { - val bus = new LiveListenerBus + val bus = new LiveListenerBus(sc) bus.stop() } } @@ -107,11 +107,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } } - val bus = new LiveListenerBus + val bus = new LiveListenerBus(sc) val blockingListener = new BlockingListener bus.addListener(blockingListener) - bus.start(sc) + bus.start() bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) listenerStarted.acquire() @@ -353,13 +353,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val badListener = new BadListener val jobCounter1 = new BasicJobCounter val jobCounter2 = new BasicJobCounter - val bus = new LiveListenerBus + val bus = new LiveListenerBus(sc) // Propagate events to bad listener first bus.addListener(badListener) bus.addListener(jobCounter1) bus.addListener(jobCounter2) - bus.start(sc) + bus.start() // Post events to all listeners, and wait until the queue is drained (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 31687e6147314..03fba291cef98 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -92,7 +92,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo conf.set("spark.storage.cachedPeersTtl", "10") master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", - new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true) + new BlockManagerMasterEndpoint(rpcEnv, true, conf, + new LiveListenerBus(new SparkContext(conf)))), conf, true) allStores.clear() } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6821582254f5b..36bf79273117a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -108,7 +108,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE conf.set("spark.driver.port", rpcEnv.address.port.toString) master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", - new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true) + new BlockManagerMasterEndpoint(rpcEnv, true, conf, + new LiveListenerBus(new SparkContext(conf)))), conf, true) val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 411a0ddebeb77..0bd036b829441 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.ui.storage import org.scalatest.BeforeAndAfter -import org.apache.spark.{SparkConf, SparkFunSuite, Success} +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, Success} import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ import org.apache.spark.storage._ @@ -43,8 +43,9 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { private val bm1 = BlockManagerId("big", "dog", 1) before { - bus = new LiveListenerBus - storageStatusListener = new StorageStatusListener(new SparkConf()) + val conf = new SparkConf() + bus = new LiveListenerBus(new SparkContext(conf)) + storageStatusListener = new StorageStatusListener(conf) storageListener = new StorageListener(storageStatusListener) bus.addListener(storageStatusListener) bus.addListener(storageListener) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index e97427991bf92..09cc9bfaf8802 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -78,7 +78,8 @@ class ReceivedBlockHandlerSuite conf.set("spark.driver.port", rpcEnv.address.port.toString) blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", - new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true) + new BlockManagerMasterEndpoint(rpcEnv, true, conf, + new LiveListenerBus(new SparkContext(conf)))), conf, true) storageLevel = StorageLevel.MEMORY_ONLY_SER blockManager = createBlockManager(blockManagerSize, conf) From 76d9af8a4e26a55b591dacab0785bfc25aa473fc Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Wed, 20 Jul 2016 10:46:37 -0500 Subject: [PATCH 2/6] [SPARK-15703] Make ListenerBus event queue size ConfigEntry and update tests --- .../org/apache/spark/internal/config/package.scala | 13 +++++++++++++ .../apache/spark/scheduler/LiveListenerBus.scala | 4 ++-- .../apache/spark/scheduler/SparkListenerSuite.scala | 2 +- .../storage/BlockManagerReplicationSuite.scala | 2 +- .../apache/spark/storage/BlockManagerSuite.scala | 2 +- .../apache/spark/ui/storage/StorageTabSuite.scala | 2 +- .../spark/streaming/ReceivedBlockHandlerSuite.scala | 2 +- 7 files changed, 20 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 05dd68300f891..1ae83b712672c 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -19,6 +19,7 @@ package org.apache.spark.internal import java.util.concurrent.TimeUnit +import org.apache.spark.SparkException import org.apache.spark.launcher.SparkLauncher import org.apache.spark.network.util.ByteUnit @@ -103,4 +104,16 @@ package object config { .stringConf .checkValues(Set("hive", "in-memory")) .createWithDefault("in-memory") + + private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE = + ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size") + .intConf + .transform((x: Int) => { + if (x <= 0) { + throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!") + } else { + x + } + }) + .createWithDefault(10000) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 9606abb67302c..656f07caa4d26 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.util.DynamicVariable +import org.apache.spark.internal.config._ import org.apache.spark.SparkContext import org.apache.spark.util.Utils @@ -40,8 +41,7 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa // Cap the capacity of the event queue so we get an explicit error (rather than // an OOM exception) if it's perpetually being added to more quickly than it's being drained. - private lazy val EVENT_QUEUE_CAPACITY = sparkContext.conf. - getInt("spark.scheduler.listenerbus.eventqueue.size", 10000) + private lazy val EVENT_QUEUE_CAPACITY = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) // Indicate if `start()` is called diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index ba6b123708cc1..5b3e714197e27 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -37,7 +37,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val jobCompletionTime = 1421191296660L test("don't call sc.stop in listener") { - sc = new SparkContext("local", "SparkListenerSuite") + sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) val listener = new SparkContextStoppingListener(sc) val bus = new LiveListenerBus(sc) bus.addListener(listener) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 03fba291cef98..a0209130d574c 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -93,7 +93,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(new SparkContext(conf)))), conf, true) + new LiveListenerBus(new SparkContext("local", "test", conf)))), conf, true) allStores.clear() } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 36bf79273117a..415f39c35700f 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -109,7 +109,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(new SparkContext(conf)))), conf, true) + new LiveListenerBus(new SparkContext("local", "test", conf)))), conf, true) val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 0bd036b829441..5e31e9edd3453 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -44,7 +44,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { before { val conf = new SparkConf() - bus = new LiveListenerBus(new SparkContext(conf)) + bus = new LiveListenerBus(new SparkContext("local", "test", conf)) storageStatusListener = new StorageStatusListener(conf) storageListener = new StorageListener(storageStatusListener) bus.addListener(storageStatusListener) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 09cc9bfaf8802..30ba8bf95d600 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -79,7 +79,7 @@ class ReceivedBlockHandlerSuite blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(new SparkContext(conf)))), conf, true) + new LiveListenerBus(new SparkContext("local", "test", conf)))), conf, true) storageLevel = StorageLevel.MEMORY_ONLY_SER blockManager = createBlockManager(blockManagerSize, conf) From 09e855ede8d4c7b01d48467b126368a048ea398d Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Wed, 20 Jul 2016 16:49:40 -0500 Subject: [PATCH 3/6] [SPARK-15703] Fixes test failures to ensure single sc --- .../org/apache/spark/internal/config/package.scala | 7 ------- .../apache/spark/scheduler/LiveListenerBus.scala | 14 +++++++++++--- .../spark/scheduler/SparkListenerSuite.scala | 4 +++- .../storage/BlockManagerReplicationSuite.scala | 8 ++++++-- .../apache/spark/storage/BlockManagerSuite.scala | 5 +++-- .../apache/spark/ui/storage/StorageTabSuite.scala | 8 ++++---- .../streaming/ReceivedBlockHandlerSuite.scala | 4 +++- 7 files changed, 30 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 1ae83b712672c..c92b1ba4709ee 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -108,12 +108,5 @@ package object config { private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE = ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size") .intConf - .transform((x: Int) => { - if (x <= 0) { - throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!") - } else { - x - } - }) .createWithDefault(10000) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 656f07caa4d26..0fbf52f5dc6de 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -21,9 +21,8 @@ import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean import scala.util.DynamicVariable - import org.apache.spark.internal.config._ -import org.apache.spark.SparkContext +import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.util.Utils /** @@ -41,9 +40,18 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa // Cap the capacity of the event queue so we get an explicit error (rather than // an OOM exception) if it's perpetually being added to more quickly than it's being drained. - private lazy val EVENT_QUEUE_CAPACITY = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) + private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize() private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) + + private def validateAndGetQueueSize(): Int = { + val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) + if (queueSize <= 0) { + throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!") + } + queueSize + } + // Indicate if `start()` is called private val started = new AtomicBoolean(false) // Indicate if `stop()` is called diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 5b3e714197e27..e8a88d4909a83 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -52,6 +52,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("basic creation and shutdown of LiveListenerBus") { + sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) val counter = new BasicJobCounter val bus = new LiveListenerBus(sc) bus.addListener(counter) @@ -106,7 +107,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match drained = true } } - + sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) val bus = new LiveListenerBus(sc) val blockingListener = new BlockingListener @@ -353,6 +354,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val badListener = new BadListener val jobCounter1 = new BasicJobCounter val jobCounter2 = new BasicJobCounter + sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) val bus = new LiveListenerBus(sc) // Propagate events to bad listener first diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index a0209130d574c..b9e3a364ee221 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -38,7 +38,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.storage.StorageLevel._ /** Testsuite that tests block replication in BlockManager */ -class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter { +class BlockManagerReplicationSuite extends SparkFunSuite + with Matchers + with BeforeAndAfter + with LocalSparkContext { private val conf = new SparkConf(false).set("spark.app.id", "test") private var rpcEnv: RpcEnv = null @@ -91,9 +94,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo // to make cached peers refresh frequently conf.set("spark.storage.cachedPeersTtl", "10") + sc = new SparkContext("local", "test", conf) master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(new SparkContext("local", "test", conf)))), conf, true) + new LiveListenerBus(sc))), conf, true) allStores.clear() } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 415f39c35700f..7ab23930d8f6b 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -49,7 +49,7 @@ import org.apache.spark.util._ import org.apache.spark.util.io.ChunkedByteBuffer class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach - with PrivateMethodTester with ResetSystemProperties { + with PrivateMethodTester with LocalSparkContext with ResetSystemProperties { import BlockManagerSuite._ @@ -107,9 +107,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) conf.set("spark.driver.port", rpcEnv.address.port.toString) + sc = new SparkContext("local", "test", conf) master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(new SparkContext("local", "test", conf)))), conf, true) + new LiveListenerBus(sc))), conf, true) val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 5e31e9edd3453..4cb3ea1942b01 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -18,8 +18,7 @@ package org.apache.spark.ui.storage import org.scalatest.BeforeAndAfter - -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, Success} +import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ import org.apache.spark.storage._ @@ -27,7 +26,7 @@ import org.apache.spark.storage._ /** * Test various functionality in the StorageListener that supports the StorageTab. */ -class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { +class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter { private var bus: LiveListenerBus = _ private var storageStatusListener: StorageStatusListener = _ private var storageListener: StorageListener = _ @@ -44,7 +43,8 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { before { val conf = new SparkConf() - bus = new LiveListenerBus(new SparkContext("local", "test", conf)) + sc = new SparkContext("local", "test", conf) + bus = new LiveListenerBus(sc) storageStatusListener = new StorageStatusListener(conf) storageListener = new StorageListener(storageStatusListener) bus.addListener(storageStatusListener) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 30ba8bf95d600..feb5c30c6aa14 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -47,6 +47,7 @@ class ReceivedBlockHandlerSuite extends SparkFunSuite with BeforeAndAfter with Matchers + with LocalSparkContext with Logging { import WriteAheadLogBasedBlockHandler._ @@ -77,9 +78,10 @@ class ReceivedBlockHandlerSuite rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) conf.set("spark.driver.port", rpcEnv.address.port.toString) + sc = new SparkContext("local", "test", conf) blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(new SparkContext("local", "test", conf)))), conf, true) + new LiveListenerBus(sc))), conf, true) storageLevel = StorageLevel.MEMORY_ONLY_SER blockManager = createBlockManager(blockManagerSize, conf) From 41dc57c264aeb0e5c9f25aa4889a6169331b79c6 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Wed, 20 Jul 2016 16:55:43 -0500 Subject: [PATCH 4/6] [SPARK-15703] Fixes import ordering --- .../scala/org/apache/spark/scheduler/LiveListenerBus.scala | 3 ++- .../scala/org/apache/spark/ui/storage/StorageTabSuite.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 0fbf52f5dc6de..41280032d8278 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -21,8 +21,9 @@ import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean import scala.util.DynamicVariable -import org.apache.spark.internal.config._ + import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.internal.config._ import org.apache.spark.util.Utils /** diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 4cb3ea1942b01..f6c8418ba3ac4 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -18,8 +18,8 @@ package org.apache.spark.ui.storage import org.scalatest.BeforeAndAfter + import org.apache.spark._ -import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ import org.apache.spark.storage._ From 889fe66b849ab789878fa3dbfe17c8b0fb4681eb Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Thu, 21 Jul 2016 10:59:35 -0500 Subject: [PATCH 5/6] Remove extra line --- .../main/scala/org/apache/spark/scheduler/LiveListenerBus.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 41280032d8278..bfa3c408f2284 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -44,7 +44,6 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize() private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) - private def validateAndGetQueueSize(): Int = { val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) if (queueSize <= 0) { From 82feec483e7e846b455d14bb80b5a679bea24a8c Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Mon, 25 Jul 2016 16:15:47 -0500 Subject: [PATCH 6/6] [SPARK-15703] Remove unused import --- .../main/scala/org/apache/spark/internal/config/package.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index c92b1ba4709ee..ebb21e9efd384 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -19,7 +19,6 @@ package org.apache.spark.internal import java.util.concurrent.TimeUnit -import org.apache.spark.SparkException import org.apache.spark.launcher.SparkLauncher import org.apache.spark.network.util.ByteUnit