From ba322045de94df8ae41d2a25e8f6fa34f5b5c089 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 17 Mar 2016 13:02:54 -0700 Subject: [PATCH 1/9] Add ClassTags to BlockInfo. --- .../spark/storage/BlockInfoManager.scala | 7 +- .../apache/spark/storage/BlockManager.scala | 72 ++++++++++--------- .../spark/storage/memory/MemoryStore.scala | 26 +++---- .../spark/storage/BlockInfoManagerSuite.scala | 3 +- .../spark/storage/BlockManagerSuite.scala | 38 +++++----- 5 files changed, 83 insertions(+), 63 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 46fab7a899633..1508ddb24d58a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -21,6 +21,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.reflect.ClassTag import com.google.common.collect.ConcurrentHashMultiset @@ -37,10 +38,14 @@ import org.apache.spark.internal.Logging * @param level the block's storage level. This is the requested persistence level, not the * effective storage level of the block (i.e. if this is MEMORY_AND_DISK, then this * does not imply that the block is actually resident in memory). + * @param classTag the block's [[ClassTag]], used to select the serializer * @param tellMaster whether state changes for this block should be reported to the master. This * is true for most blocks, but is false for broadcast blocks. */ -private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) { +private[storage] class BlockInfo( + val level: StorageLevel, + val classTag: ClassTag[_ <: Any], + val tellMaster: Boolean) { /** * The size of the block (in bytes) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 3bbdf48104c91..3c4037c6a4b93 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -23,6 +23,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer} import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ +import scala.reflect.ClassTag import scala.util.Random import scala.util.control.NonFatal @@ -633,12 +634,13 @@ private[spark] class BlockManager( * @return either a BlockResult if the block was successfully cached, or an iterator if the block * could not be cached. */ - def getOrElseUpdate( + def getOrElseUpdate[T: ClassTag]( blockId: BlockId, level: StorageLevel, - makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = { + makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { // Initially we hold no locks on this block. - doPutIterator(blockId, makeIterator, level, keepReadLock = true) match { + val classTag = implicitly[ClassTag[T]] + doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match { case None => // doPut() didn't hand work back to us, so the block already existed or was successfully // stored. Therefore, we now hold a read lock on the block. @@ -664,13 +666,13 @@ private[spark] class BlockManager( /** * @return true if the block was stored or false if an error occurred. */ - def putIterator( + def putIterator[T: ClassTag]( // TODO(josh): is this one of the places where implicit CT is ok? blockId: BlockId, - values: Iterator[Any], + values: Iterator[T], level: StorageLevel, tellMaster: Boolean = true): Boolean = { require(values != null, "Values is null") - doPutIterator(blockId, () => values, level, tellMaster) match { + doPutIterator(blockId, () => values, level, implicitly[ClassTag[T]], tellMaster) match { case None => true case Some(iter) => @@ -703,13 +705,13 @@ private[spark] class BlockManager( * * @return true if the block was stored or false if an error occurred. */ - def putBytes( + def putBytes[T: ClassTag]( // TODO(josh) blockId: BlockId, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true): Boolean = { require(bytes != null, "Bytes is null") - doPutBytes(blockId, bytes, level, tellMaster) + doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster) } /** @@ -723,13 +725,14 @@ private[spark] class BlockManager( * returns. * @return true if the block was already present or if the put succeeded, false otherwise. */ - private def doPutBytes( + private def doPutBytes[T]( blockId: BlockId, bytes: ByteBuffer, level: StorageLevel, + classTag: ClassTag[T], tellMaster: Boolean = true, keepReadLock: Boolean = false): Boolean = { - doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo => + doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info => val startTimeMs = System.currentTimeMillis // Since we're storing bytes, initiate the replication before storing them locally. // This is faster as data is already serialized and ready to send. @@ -753,7 +756,7 @@ private[spark] class BlockManager( // We will drop it to disk later if the memory store can't hold it. val putSucceeded = if (level.deserialized) { val values = dataDeserialize(blockId, bytes.duplicate()) - memoryStore.putIterator(blockId, values, level) match { + memoryStore.putIterator(blockId, values, level, classTag) match { case Right(_) => true case Left(iter) => // If putting deserialized values in memory failed, we will put the bytes directly to @@ -772,14 +775,14 @@ private[spark] class BlockManager( diskStore.putBytes(blockId, bytes) } - val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) + val putBlockStatus = getCurrentBlockStatus(blockId, info) val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid if (blockWasSuccessfullyStored) { // Now that the block is in either the memory, externalBlockStore, or disk store, // tell the master about it. - putBlockInfo.size = size + info.size = size if (tellMaster) { - reportBlockStatus(blockId, putBlockInfo, putBlockStatus) + reportBlockStatus(blockId, info, putBlockStatus) } Option(TaskContext.get()).foreach { c => c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus))) @@ -807,6 +810,7 @@ private[spark] class BlockManager( private def doPut[T]( blockId: BlockId, level: StorageLevel, + classTag: ClassTag[_], tellMaster: Boolean, keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = { @@ -814,7 +818,7 @@ private[spark] class BlockManager( require(level != null && level.isValid, "StorageLevel is null or invalid") val putBlockInfo = { - val newInfo = new BlockInfo(level, tellMaster) + val newInfo = new BlockInfo(level, classTag, tellMaster) if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) { newInfo } else { @@ -867,21 +871,22 @@ private[spark] class BlockManager( * @return None if the block was already present or if the put succeeded, or Some(iterator) * if the put failed. */ - private def doPutIterator( + private def doPutIterator[T]( blockId: BlockId, - iterator: () => Iterator[Any], + iterator: () => Iterator[T], level: StorageLevel, + classTag: ClassTag[T], tellMaster: Boolean = true, - keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator] = { - doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo => + keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = { + doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info => val startTimeMs = System.currentTimeMillis - var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator] = None + var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None // Size of the block in bytes var size = 0L if (level.useMemory) { // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. - memoryStore.putIterator(blockId, iterator(), level) match { + memoryStore.putIterator(blockId, iterator(), level, classTag) match { case Right(s) => size = s case Left(iter) => @@ -903,14 +908,14 @@ private[spark] class BlockManager( size = diskStore.getSize(blockId) } - val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) + val putBlockStatus = getCurrentBlockStatus(blockId, info) val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid if (blockWasSuccessfullyStored) { // Now that the block is in either the memory, externalBlockStore, or disk store, // tell the master about it. - putBlockInfo.size = size + info.size = size if (tellMaster) { - reportBlockStatus(blockId, putBlockInfo, putBlockStatus) + reportBlockStatus(blockId, info, putBlockStatus) } Option(TaskContext.get()).foreach { c => c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus))) @@ -918,7 +923,7 @@ private[spark] class BlockManager( logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs))) if (level.replication > 1) { val remoteStartTime = System.currentTimeMillis - val bytesToReplicate = doGetLocalBytes(blockId, putBlockInfo) + val bytesToReplicate = doGetLocalBytes(blockId, info) try { replicate(blockId, bytesToReplicate, level) } finally { @@ -983,12 +988,13 @@ private[spark] class BlockManager( * @return a copy of the iterator. The original iterator passed this method should no longer * be used after this method returns. */ - private def maybeCacheDiskValuesInMemory( + private def maybeCacheDiskValuesInMemory[T]( blockInfo: BlockInfo, blockId: BlockId, level: StorageLevel, - diskIterator: Iterator[Any]): Iterator[Any] = { + diskIterator: Iterator[T]): Iterator[T] = { require(level.deserialized) + val classTag = blockInfo.classTag.asInstanceOf[ClassTag[T]] if (level.useMemory) { // Synchronize on blockInfo to guard against a race condition where two readers both try to // put values read from disk into the MemoryStore. @@ -997,7 +1003,7 @@ private[spark] class BlockManager( // Note: if we had a means to discard the disk iterator, we would do that here. memoryStore.getValues(blockId).get } else { - memoryStore.putIterator(blockId, diskIterator, level) match { + memoryStore.putIterator(blockId, diskIterator, level, classTag) match { case Left(iter) => // The memory store put() failed, so it returned the iterator back to us: iter @@ -1006,7 +1012,7 @@ private[spark] class BlockManager( memoryStore.getValues(blockId).get } } - } + }.asInstanceOf[Iterator[T]] } else { diskIterator } @@ -1292,7 +1298,7 @@ private[spark] class BlockManager( * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of * the iterator is reached. */ - def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = { + def dataDeserialize[T: ClassTag](blockId: BlockId, bytes: ByteBuffer): Iterator[T] = { bytes.rewind() dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true)) } @@ -1301,12 +1307,14 @@ private[spark] class BlockManager( * Deserializes a InputStream into an iterator of values and disposes of it when the end of * the iterator is reached. */ - def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = { + def dataDeserializeStream[T: ClassTag]( + blockId: BlockId, + inputStream: InputStream): Iterator[T] = { val stream = new BufferedInputStream(inputStream) defaultSerializer .newInstance() .deserializeStream(wrapForCompression(blockId, stream)) - .asIterator + .asIterator.asInstanceOf[Iterator[T]] } def stop(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index a7c1854a41ff7..e43542f3f2ab9 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -22,6 +22,7 @@ import java.util.LinkedHashMap import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.internal.Logging @@ -126,10 +127,11 @@ private[spark] class MemoryStore( * iterator or call `close()` on it in order to free the storage memory consumed by the * partially-unrolled block. */ - private[storage] def putIterator( + private[storage] def putIterator[T]( blockId: BlockId, - values: Iterator[Any], - level: StorageLevel): Either[PartiallyUnrolledIterator, Long] = { + values: Iterator[T], + level: StorageLevel, + classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") @@ -148,7 +150,7 @@ private[spark] class MemoryStore( // Keep track of unroll memory used by this particular block / putIterator() operation var unrollMemoryUsedByThisBlock = 0L // Underlying vector for unrolling the block - var vector = new SizeTrackingVector[Any] + var vector = new SizeTrackingVector[T]()(classTag) // Request enough memory to begin unrolling keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) @@ -261,7 +263,7 @@ private[spark] class MemoryStore( None } else { require(entry.deserialized, "should only call getValues on deserialized blocks") - Some(entry.value.asInstanceOf[Array[Any]].iterator) + Some(entry.value.asInstanceOf[Array[_]].iterator) } } @@ -472,16 +474,16 @@ private[spark] class MemoryStore( * @param unrolled an iterator for the partially-unrolled values. * @param rest the rest of the original iterator passed to [[MemoryStore.putIterator()]]. */ -private[storage] class PartiallyUnrolledIterator( +private[storage] class PartiallyUnrolledIterator[T]( memoryStore: MemoryStore, unrollMemory: Long, - unrolled: Iterator[Any], - rest: Iterator[Any]) - extends Iterator[Any] { + unrolled: Iterator[T], + rest: Iterator[T]) + extends Iterator[T] { private[this] var unrolledIteratorIsConsumed: Boolean = false - private[this] var iter: Iterator[Any] = { - val completionIterator = CompletionIterator[Any, Iterator[Any]](unrolled, { + private[this] var iter: Iterator[T] = { + val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, { unrolledIteratorIsConsumed = true memoryStore.releaseUnrollMemoryForThisTask(unrollMemory) }) @@ -489,7 +491,7 @@ private[storage] class PartiallyUnrolledIterator( } override def hasNext: Boolean = iter.hasNext - override def next(): Any = iter.next() + override def next(): T = iter.next() /** * Called to dispose of this iterator and free its memory. diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala index fe83fc722a8e8..7ee76aa4c6f9d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.storage import scala.concurrent.{Await, ExecutionContext, Future} import scala.language.implicitConversions +import scala.reflect.ClassTag import org.scalatest.BeforeAndAfterEach import org.scalatest.time.SpanSugar._ @@ -52,7 +53,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach { } private def newBlockInfo(): BlockInfo = { - new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false) + new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false) } private def withTaskId[T](taskAttemptId: Long)(block: => T): T = { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 2e0c0596a75bb..a32714d8849b1 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -24,6 +24,7 @@ import scala.concurrent.duration._ import scala.concurrent.Future import scala.language.implicitConversions import scala.language.postfixOps +import scala.reflect.ClassTag import org.mockito.{Matchers => mc} import org.mockito.Mockito.{mock, times, verify, when} @@ -1073,7 +1074,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll with all the space in the world. This should succeed. - var putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY) + var putResult = + memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any) assert(putResult.isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) => @@ -1084,7 +1086,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll with not enough space. This should succeed after kicking out someBlock1. assert(store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)) assert(store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)) - putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY) + putResult = + memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any) assert(putResult.isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) assert(memoryStore.contains("someBlock2")) @@ -1098,7 +1101,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. // In the mean time, however, we kicked out someBlock2 before giving up. assert(store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)) - putResult = memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY) + putResult = + memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator assert(!memoryStore.contains("someBlock2")) assert(putResult.isLeft) @@ -1120,8 +1124,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll with plenty of space. This should succeed and cache both blocks. - val result1 = memoryStore.putIterator("b1", smallIterator, memOnly) - val result2 = memoryStore.putIterator("b2", smallIterator, memOnly) + val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, ClassTag.Any) + val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, ClassTag.Any) assert(memoryStore.contains("b1")) assert(memoryStore.contains("b2")) assert(result1.isRight) // unroll was successful @@ -1136,7 +1140,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store.putIterator("b2", smallIterator, memOnly) // Unroll with not enough space. This should succeed but kick out b1 in the process. - val result3 = memoryStore.putIterator("b3", smallIterator, memOnly) + val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, ClassTag.Any) assert(result3.isRight) assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) @@ -1146,7 +1150,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store.putIterator("b3", smallIterator, memOnly) // Unroll huge block with not enough space. This should fail and kick out b2 in the process. - val result4 = memoryStore.putIterator("b4", bigIterator, memOnly) + val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, ClassTag.Any) assert(result4.isLeft) // unroll was unsuccessful assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) @@ -1174,7 +1178,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll with not enough space. This should succeed but kick out b1 in the process. // Memory store should contain b2 and b3, while disk store should contain only b1 - val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk) + val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, ClassTag.Any) assert(result3.isRight) assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) @@ -1190,7 +1194,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // the block may be stored to disk. During the unrolling process, block "b2" should be kicked // out, so the memory store should contain only b3, while the disk store should contain // b1, b2 and b4. - val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk) + val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, ClassTag.Any) assert(result4.isLeft) assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) @@ -1210,28 +1214,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) // All unroll memory used is released because putIterator did not return an iterator - assert(memoryStore.putIterator("b1", smallIterator, memOnly).isRight) + assert(memoryStore.putIterator("b1", smallIterator, memOnly, ClassTag.Any).isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - assert(memoryStore.putIterator("b2", smallIterator, memOnly).isRight) + assert(memoryStore.putIterator("b2", smallIterator, memOnly, ClassTag.Any).isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll memory is not released because putIterator returned an iterator // that still depends on the underlying vector used in the process - assert(memoryStore.putIterator("b3", smallIterator, memOnly).isLeft) + assert(memoryStore.putIterator("b3", smallIterator, memOnly, ClassTag.Any).isLeft) val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB3 > 0) // The unroll memory owned by this thread builds on top of its value after the previous unrolls - assert(memoryStore.putIterator("b4", smallIterator, memOnly).isLeft) + assert(memoryStore.putIterator("b4", smallIterator, memOnly, ClassTag.Any).isLeft) val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB4 > unrollMemoryAfterB3) // ... but only to a certain extent (until we run out of free space to grant new unroll memory) - assert(memoryStore.putIterator("b5", smallIterator, memOnly).isLeft) + assert(memoryStore.putIterator("b5", smallIterator, memOnly, ClassTag.Any).isLeft) val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask - assert(memoryStore.putIterator("b6", smallIterator, memOnly).isLeft) + assert(memoryStore.putIterator("b6", smallIterator, memOnly, ClassTag.Any).isLeft) val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask - assert(memoryStore.putIterator("b7", smallIterator, memOnly).isLeft) + assert(memoryStore.putIterator("b7", smallIterator, memOnly, ClassTag.Any).isLeft) val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB5 === unrollMemoryAfterB4) assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) @@ -1243,7 +1247,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val memoryStore = store.memoryStore val blockId = BlockId("rdd_3_10") store.blockInfoManager.lockNewBlockForWriting( - blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false)) + blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false)) memoryStore.putBytes(blockId, 13000, () => { fail("A big ByteBuffer that cannot be put into MemoryStore should not be created") }) From f22f8ee16f7212178c83ad2c7a22c767dee4fa63 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 17 Mar 2016 14:09:11 -0700 Subject: [PATCH 2/9] Construct BlockManager with a SerializerManager --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 2 +- .../scala/org/apache/spark/storage/BlockManager.scala | 8 ++++---- .../spark/storage/BlockManagerReplicationSuite.scala | 8 +++++--- .../org/apache/spark/storage/BlockManagerSuite.scala | 8 +++++--- .../spark/streaming/ReceivedBlockHandlerSuite.scala | 5 +++-- 5 files changed, 18 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 459fab88ce1de..e2c47ceda2e6f 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -331,7 +331,7 @@ object SparkEnv extends Logging { // NB: blockManager is not valid until initialize() is called later. val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster, - serializer, conf, memoryManager, mapOutputTracker, shuffleManager, + serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores) val broadcastManager = new BroadcastManager(isDriver, conf, securityManager) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 3c4037c6a4b93..2fc102c1f355a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -40,7 +40,7 @@ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.ExternalShuffleClient import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo import org.apache.spark.rpc.RpcEnv -import org.apache.spark.serializer.{Serializer, SerializerInstance} +import org.apache.spark.serializer.{Serializer, SerializerInstance, SerializerManager} import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.storage.memory._ import org.apache.spark.util._ @@ -61,7 +61,7 @@ private[spark] class BlockManager( executorId: String, rpcEnv: RpcEnv, val master: BlockManagerMaster, - defaultSerializer: Serializer, + serializerManager: SerializerManager, val conf: SparkConf, memoryManager: MemoryManager, mapOutputTracker: MapOutputTracker, @@ -1283,7 +1283,7 @@ private[spark] class BlockManager( outputStream: OutputStream, values: Iterator[Any]): Unit = { val byteStream = new BufferedOutputStream(outputStream) - val ser = defaultSerializer.newInstance() + val ser = serializerManager.getSerializer(ClassTag.Any).newInstance() ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() } @@ -1311,7 +1311,7 @@ private[spark] class BlockManager( blockId: BlockId, inputStream: InputStream): Iterator[T] = { val stream = new BufferedInputStream(inputStream) - defaultSerializer + serializerManager.getSerializer(ClassTag.Any) .newInstance() .deserializeStream(wrapForCompression(blockId, stream)) .asIterator.asInstanceOf[Iterator[T]] diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index b78a3648cd8bc..98e8450fa1453 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.{KryoSerializer, SerializerManager} import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage.StorageLevel._ @@ -62,7 +62,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) - val store = new BlockManager(name, rpcEnv, master, serializer, conf, + val serializerManager = new SerializerManager(serializer, conf) + val store = new BlockManager(name, rpcEnv, master, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) memManager.setMemoryStore(store.memoryStore) store.initialize("app-id") @@ -262,7 +263,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo when(failableTransfer.hostName).thenReturn("some-hostname") when(failableTransfer.port).thenReturn(1000) val memManager = new StaticMemoryManager(conf, Long.MaxValue, 10000, numCores = 1) - val failableStore = new BlockManager("failable-store", rpcEnv, master, serializer, conf, + val serializerManager = new SerializerManager(serializer, conf) + val failableStore = new BlockManager("failable-store", rpcEnv, master, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0) memManager.setMemoryStore(failableStore.memoryStore) failableStore.initialize("app-id") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index a32714d8849b1..915afc87ecb26 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -41,7 +41,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.network.shuffle.BlockFetchingListener import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus -import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager} import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat import org.apache.spark.util._ @@ -77,7 +77,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val transfer = transferService .getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1)) val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) - val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf, + val serializerManager = new SerializerManager(serializer, conf) + val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) memManager.setMemoryStore(blockManager.memoryStore) blockManager.initialize("app-id") @@ -821,8 +822,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE maxOnHeapExecutionMemory = Long.MaxValue, maxStorageMemory = 1200, numCores = 1) + val serializerManager = new SerializerManager(new JavaSerializer(conf), conf) store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, - new JavaSerializer(conf), conf, memoryManager, mapOutputTracker, + serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) memoryManager.setMemoryStore(store.memoryStore) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 2d509af85ae33..dec65533293e4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.memory.StaticMemoryManager import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.{KryoSerializer, SerializerManager} import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage._ import org.apache.spark.streaming.receiver._ @@ -264,7 +264,8 @@ class ReceivedBlockHandlerSuite name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) - val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, conf, + val serializerManager = new SerializerManager(serializer, conf) + val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) memManager.setMemoryStore(blockManager.memoryStore) blockManager.initialize("app-id") From c30c6ee4905e3b836ce231e6a506ba11faad1f12 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 17 Mar 2016 14:59:28 -0700 Subject: [PATCH 3/9] Propagate ClassTags in a bunch more places. --- .../main/scala/org/apache/spark/rdd/RDD.scala | 2 +- .../spark/storage/BlockInfoManager.scala | 2 +- .../apache/spark/storage/BlockManager.scala | 43 +++++++++++-------- .../spark/storage/memory/MemoryStore.scala | 2 +- 4 files changed, 27 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 8a577c83e10db..f96551c793a14 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -326,7 +326,7 @@ abstract class RDD[T: ClassTag]( val blockId = RDDBlockId(id, partition.index) var readCachedBlock = true // This method is called on executors, so we need call SparkEnv.get instead of sc.env. - SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, () => { + SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => { readCachedBlock = false computeOrReadCheckpoint(partition, context) }) match { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 1508ddb24d58a..94d11c5be5a49 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -44,7 +44,7 @@ import org.apache.spark.internal.Logging */ private[storage] class BlockInfo( val level: StorageLevel, - val classTag: ClassTag[_ <: Any], + val classTag: ClassTag[_], val tellMaster: Boolean) { /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2fc102c1f355a..959caef3b0724 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -419,7 +419,7 @@ private[spark] class BlockManager( val iter: Iterator[Any] = if (level.deserialized) { memoryStore.getValues(blockId).get } else { - dataDeserialize(blockId, memoryStore.getBytes(blockId).get) + dataDeserialize(blockId, memoryStore.getBytes(blockId).get)(info.classTag) } val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId)) Some(new BlockResult(ci, DataReadMethod.Memory, info.size)) @@ -427,10 +427,11 @@ private[spark] class BlockManager( val iterToReturn: Iterator[Any] = { val diskBytes = diskStore.getBytes(blockId) if (level.deserialized) { - val diskValues = dataDeserialize(blockId, diskBytes) + val diskValues = dataDeserialize(blockId, diskBytes)(info.classTag) maybeCacheDiskValuesInMemory(info, blockId, level, diskValues) } else { - dataDeserialize(blockId, maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)) + val bytes = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes) + dataDeserialize(blockId, bytes)(info.classTag) } } val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId)) @@ -505,6 +506,7 @@ private[spark] class BlockManager( */ def getRemoteValues(blockId: BlockId): Option[BlockResult] = { getRemoteBytes(blockId).map { data => + // TODO(josh): read class tag? new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit()) } } @@ -634,12 +636,12 @@ private[spark] class BlockManager( * @return either a BlockResult if the block was successfully cached, or an iterator if the block * could not be cached. */ - def getOrElseUpdate[T: ClassTag]( + def getOrElseUpdate[T]( blockId: BlockId, level: StorageLevel, + classTag: ClassTag[T], makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { // Initially we hold no locks on this block. - val classTag = implicitly[ClassTag[T]] doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match { case None => // doPut() didn't hand work back to us, so the block already existed or was successfully @@ -666,7 +668,7 @@ private[spark] class BlockManager( /** * @return true if the block was stored or false if an error occurred. */ - def putIterator[T: ClassTag]( // TODO(josh): is this one of the places where implicit CT is ok? + def putIterator[T: ClassTag]( blockId: BlockId, values: Iterator[T], level: StorageLevel, @@ -755,7 +757,7 @@ private[spark] class BlockManager( // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. val putSucceeded = if (level.deserialized) { - val values = dataDeserialize(blockId, bytes.duplicate()) + val values = dataDeserialize(blockId, bytes.duplicate())(classTag) memoryStore.putIterator(blockId, values, level, classTag) match { case Right(_) => true case Left(iter) => @@ -894,7 +896,7 @@ private[spark] class BlockManager( if (level.useDisk) { logWarning(s"Persisting block $blockId to disk instead.") diskStore.put(blockId) { fileOutputStream => - dataSerializeStream(blockId, fileOutputStream, iter) + dataSerializeStream(blockId, fileOutputStream, iter)(classTag) } size = diskStore.getSize(blockId) } else { @@ -903,7 +905,7 @@ private[spark] class BlockManager( } } else if (level.useDisk) { diskStore.put(blockId) { fileOutputStream => - dataSerializeStream(blockId, fileOutputStream, iterator()) + dataSerializeStream(blockId, fileOutputStream, iterator())(classTag) } size = diskStore.getSize(blockId) } @@ -1139,9 +1141,9 @@ private[spark] class BlockManager( * @return true if the block was stored or false if the block was already stored or an * error occurred. */ - def putSingle( + def putSingle[T: ClassTag]( blockId: BlockId, - value: Any, + value: T, level: StorageLevel, tellMaster: Boolean = true): Boolean = { putIterator(blockId, Iterator(value), level, tellMaster) @@ -1158,9 +1160,9 @@ private[spark] class BlockManager( * * @return the block's new effective StorageLevel. */ - def dropFromMemory( + private[storage] def dropFromMemory[T]( blockId: BlockId, - data: () => Either[Array[Any], ByteBuffer]): StorageLevel = { + data: () => Either[Array[T], ByteBuffer]): StorageLevel = { logInfo(s"Dropping block $blockId from memory") val info = blockInfoManager.assertBlockIsLockedForWriting(blockId) var blockIsUpdated = false @@ -1172,7 +1174,10 @@ private[spark] class BlockManager( data() match { case Left(elements) => diskStore.put(blockId) { fileOutputStream => - dataSerializeStream(blockId, fileOutputStream, elements.toIterator) + dataSerializeStream( + blockId, + fileOutputStream, + elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]]) } case Right(bytes) => diskStore.putBytes(blockId, bytes) @@ -1278,17 +1283,17 @@ private[spark] class BlockManager( } /** Serializes into a stream. */ - def dataSerializeStream( + def dataSerializeStream[T: ClassTag]( blockId: BlockId, outputStream: OutputStream, - values: Iterator[Any]): Unit = { + values: Iterator[T]): Unit = { val byteStream = new BufferedOutputStream(outputStream) - val ser = serializerManager.getSerializer(ClassTag.Any).newInstance() + val ser = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance() ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() } /** Serializes into a byte buffer. */ - def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = { + def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ByteBuffer = { val byteStream = new ByteBufferOutputStream(4096) dataSerializeStream(blockId, byteStream, values) byteStream.toByteBuffer @@ -1311,7 +1316,7 @@ private[spark] class BlockManager( blockId: BlockId, inputStream: InputStream): Iterator[T] = { val stream = new BufferedInputStream(inputStream) - serializerManager.getSerializer(ClassTag.Any) + serializerManager.getSerializer(implicitly[ClassTag[T]]) .newInstance() .deserializeStream(wrapForCompression(blockId, stream)) .asIterator.asInstanceOf[Iterator[T]] diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index e43542f3f2ab9..9491b2b547218 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -188,7 +188,7 @@ private[spark] class MemoryStore( val entry = if (level.deserialized) { new MemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues), deserialized = true) } else { - val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) + val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)(classTag) new MemoryEntry(bytes, bytes.limit, deserialized = false) } val size = entry.size From 359fb7efea5ce06c3a43e67013f585067dc9cf4b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 17 Mar 2016 15:39:30 -0700 Subject: [PATCH 4/9] Propagate class tags during block replication. --- .../spark/network/BlockDataManager.scala | 8 ++++- .../spark/network/BlockTransferService.scala | 10 +++++-- .../network/netty/NettyBlockRpcServer.scala | 14 ++++++--- .../netty/NettyBlockTransferService.scala | 12 ++++---- .../apache/spark/storage/BlockManager.scala | 29 ++++++++++++++----- .../spark/storage/BlockManagerSuite.scala | 3 +- 6 files changed, 54 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala index cc5e851c29b32..5321285bdd22a 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala @@ -17,6 +17,8 @@ package org.apache.spark.network +import scala.reflect.ClassTag + import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.storage.{BlockId, StorageLevel} @@ -35,7 +37,11 @@ trait BlockDataManager { * Returns true if the block was stored and false if the put operation failed or the block * already existed. */ - def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean + def putBlockData( + blockId: BlockId, + data: ManagedBuffer, + level: StorageLevel, + classTag: ClassTag[_]): Boolean /** * Release locks acquired by [[putBlockData()]] and [[getBlockData()]]. diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala index 2de0f2033f2ed..e43e3a2de2566 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala @@ -22,6 +22,7 @@ import java.nio.ByteBuffer import scala.concurrent.{Await, Future, Promise} import scala.concurrent.duration.Duration +import scala.reflect.ClassTag import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} @@ -76,7 +77,8 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo execId: String, blockId: BlockId, blockData: ManagedBuffer, - level: StorageLevel): Future[Unit] + level: StorageLevel, + classTag: ClassTag[_]): Future[Unit] /** * A special case of [[fetchBlocks]], as it fetches only one block and is blocking. @@ -114,7 +116,9 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo execId: String, blockId: BlockId, blockData: ManagedBuffer, - level: StorageLevel): Unit = { - Await.result(uploadBlock(hostname, port, execId, blockId, blockData, level), Duration.Inf) + level: StorageLevel, + classTag: ClassTag[_]): Unit = { + val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag) + Await.result(future, Duration.Inf) } } diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala index c1dbca5db2007..2ed8a00df7023 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala @@ -20,6 +20,8 @@ package org.apache.spark.network.netty import java.nio.ByteBuffer import scala.collection.JavaConverters._ +import scala.language.existentials +import scala.reflect.ClassTag import org.apache.spark.internal.Logging import org.apache.spark.network.BlockDataManager @@ -61,12 +63,16 @@ class NettyBlockRpcServer( responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer) case uploadBlock: UploadBlock => - // StorageLevel is serialized as bytes using our JavaSerializer. - val level: StorageLevel = - serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata)) + // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer. + val (level: StorageLevel, classTag: ClassTag[_]) = { + serializer + .newInstance() + .deserialize(ByteBuffer.wrap(uploadBlock.metadata)) + .asInstanceOf[(StorageLevel, ClassTag[_])] + } val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData)) val blockId = BlockId(uploadBlock.blockId) - blockManager.putBlockData(blockId, data, level) + blockManager.putBlockData(blockId, data, level, classTag) responseContext.onSuccess(ByteBuffer.allocate(0)) } } diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index f588a28eed28d..5f3d4532dd866 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -21,6 +21,7 @@ import java.nio.ByteBuffer import scala.collection.JavaConverters._ import scala.concurrent.{Future, Promise} +import scala.reflect.ClassTag import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.network._ @@ -118,18 +119,19 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage execId: String, blockId: BlockId, blockData: ManagedBuffer, - level: StorageLevel): Future[Unit] = { + level: StorageLevel, + classTag: ClassTag[_]): Future[Unit] = { val result = Promise[Unit]() val client = clientFactory.createClient(hostname, port) - // StorageLevel is serialized as bytes using our JavaSerializer. Everything else is encoded - // using our binary protocol. - val levelBytes = JavaUtils.bufferToArray(serializer.newInstance().serialize(level)) + // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer. + // Everything else is encoded using our binary protocol. + val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag))) // Convert or copy nio buffer into array in order to serialize it. val array = JavaUtils.bufferToArray(blockData.nioByteBuffer()) - client.sendRpc(new UploadBlock(appId, execId, blockId.toString, levelBytes, array).toByteBuffer, + client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer, new RpcResponseCallback { override def onSuccess(response: ByteBuffer): Unit = { logTrace(s"Successfully uploaded block $blockId") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 959caef3b0724..84c66ae146a80 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -296,8 +296,12 @@ private[spark] class BlockManager( /** * Put the block locally, using the given storage level. */ - override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = { - putBytes(blockId, data.nioByteBuffer(), level) + override def putBlockData( + blockId: BlockId, + data: ManagedBuffer, + level: StorageLevel, + classTag: ClassTag[_]): Boolean = { + putBytes(blockId, data.nioByteBuffer(), level)(classTag) } /** @@ -504,9 +508,8 @@ private[spark] class BlockManager( * * This does not acquire a lock on this block in this JVM. */ - def getRemoteValues(blockId: BlockId): Option[BlockResult] = { + private def getRemoteValues(blockId: BlockId): Option[BlockResult] = { getRemoteBytes(blockId).map { data => - // TODO(josh): read class tag? new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit()) } } @@ -744,7 +747,7 @@ private[spark] class BlockManager( Future { // This is a blocking action and should run in futureExecutionContext which is a cached // thread pool - replicate(blockId, bufferView, level) + replicate(blockId, bufferView, level, classTag) }(futureExecutionContext) } else { null @@ -927,7 +930,7 @@ private[spark] class BlockManager( val remoteStartTime = System.currentTimeMillis val bytesToReplicate = doGetLocalBytes(blockId, info) try { - replicate(blockId, bytesToReplicate, level) + replicate(blockId, bytesToReplicate, level, classTag) } finally { BlockManager.dispose(bytesToReplicate) } @@ -1040,7 +1043,11 @@ private[spark] class BlockManager( * Replicate block to another node. Not that this is a blocking call that returns after * the block has been replicated. */ - private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { + private def replicate( + blockId: BlockId, + data: ByteBuffer, + level: StorageLevel, + classTag: ClassTag[_]): Unit = { val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) val numPeersToReplicateTo = level.replication - 1 val peersForReplication = new ArrayBuffer[BlockManagerId] @@ -1096,7 +1103,13 @@ private[spark] class BlockManager( data.rewind() logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer") blockTransferService.uploadBlockSync( - peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel) + peer.host, + peer.port, + peer.executorId, + blockId, + new NioManagedBuffer(data), + tLevel, + classTag) logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms" .format(System.currentTimeMillis - onePeerStartTime)) peersReplicatedTo += peer diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 915afc87ecb26..30f4f89707047 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1345,7 +1345,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE port: Int, execId: String, blockId: BlockId, blockData: ManagedBuffer, - level: StorageLevel): Future[Unit] = { + level: StorageLevel, + classTag: ClassTag[_]): Future[Unit] = { import scala.concurrent.ExecutionContext.Implicits.global Future {} } From 43d81610a407dcc04fd2192a8c80ce47a850969a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 17 Mar 2016 17:06:15 -0700 Subject: [PATCH 5/9] Add MiMa exclude. --- project/MimaExcludes.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index b38eec34a08b5..03344bc86c2ef 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -567,6 +567,9 @@ object MimaExcludes { if missing.map(_.fullName).sameElements(Seq("org.apache.spark.Logging")) => false case _ => true } + ) ++ Seq( + // [SPARK-13990] Automatically pick serializer when caching RDDs + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockTransferService.uploadBlock") ) case v if v.startsWith("1.6") => Seq( From ba3ca52ff8828fcced7d89cbf265e385c83f8044 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 17 Mar 2016 19:38:28 -0700 Subject: [PATCH 6/9] Fix DistributedSuite tests. --- core/src/test/scala/org/apache/spark/DistributedSuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 1c3f2bc315ddc..91425cb3ae432 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -196,8 +196,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex blockManager.master.getLocations(blockId).foreach { cmId => val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId, blockId.toString) - val deserialized = blockManager.dataDeserialize(blockId, bytes.nioByteBuffer()) - .asInstanceOf[Iterator[Int]].toList + val deserialized = blockManager.dataDeserialize[Int](blockId, bytes.nioByteBuffer()).toList assert(deserialized === (1 to 100).toList) } } @@ -222,7 +221,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex val numPartitions = 10 val conf = new SparkConf() .set("spark.storage.unrollMemoryThreshold", "1024") - .set("spark.testing.memory", (size * numPartitions).toString) + .set("spark.testing.memory", size.toString) sc = new SparkContext(clusterUrl, "test", conf) val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY) assert(data.count() === size) From a6c3d534d7bb1c758dc7b24c640aabbc4509728d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 21 Mar 2016 00:22:24 -0700 Subject: [PATCH 7/9] Fix FlatMapIterator test. --- .../apache/spark/storage/BlockManager.scala | 2 +- .../spark/storage/memory/MemoryStore.scala | 71 ++++++++++++------- 2 files changed, 45 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 7c8eeac6f9eb8..22d6aa2cf653b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1167,7 +1167,7 @@ private[spark] class BlockManager( * * @return the block's new effective StorageLevel. */ - private[storage] def dropFromMemory[T]( + private[storage] def dropFromMemory[T: ClassTag]( blockId: BlockId, data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = { logInfo(s"Dropping block $blockId from memory") diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 4421c11d2e996..d370ee912ab31 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -31,11 +31,18 @@ import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector import org.apache.spark.util.io.ChunkedByteBuffer -private sealed trait MemoryEntry { - val size: Long +private sealed trait MemoryEntry[T] { + def size: Long + def classTag: ClassTag[T] } -private case class DeserializedMemoryEntry[T](value: Array[T], size: Long) extends MemoryEntry -private case class SerializedMemoryEntry(buffer: ChunkedByteBuffer, size: Long) extends MemoryEntry +private case class DeserializedMemoryEntry[T]( + value: Array[T], + size: Long, + classTag: ClassTag[T]) extends MemoryEntry[T] +private case class SerializedMemoryEntry[T]( + buffer: ChunkedByteBuffer, + size: Long, + classTag: ClassTag[T]) extends MemoryEntry[T] /** * Stores blocks in memory, either as Arrays of deserialized Java objects or as @@ -50,7 +57,7 @@ private[spark] class MemoryStore( // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and // acquiring or releasing unroll memory, must be synchronized on `memoryManager`! - private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true) + private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true) // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes) // All accesses of this map are assumed to have manually synchronized on `memoryManager` @@ -96,13 +103,16 @@ private[spark] class MemoryStore( * * @return true if the put() succeeded, false otherwise. */ - def putBytes(blockId: BlockId, size: Long, _bytes: () => ChunkedByteBuffer): Boolean = { + def putBytes[T: ClassTag]( + blockId: BlockId, + size: Long, + _bytes: () => ChunkedByteBuffer): Boolean = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") if (memoryManager.acquireStorageMemory(blockId, size)) { // We acquired enough memory for the block, so go ahead and put it val bytes = _bytes() assert(bytes.size == size) - val entry = new SerializedMemoryEntry(bytes, size) + val entry = new SerializedMemoryEntry[T](bytes, size, implicitly[ClassTag[T]]) entries.synchronized { entries.put(blockId, entry) } @@ -189,10 +199,10 @@ private[spark] class MemoryStore( val arrayValues = vector.toArray vector = null val entry = if (level.deserialized) { - new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues)) + new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)(classTag) - new SerializedMemoryEntry(bytes, bytes.size) + new SerializedMemoryEntry[T](bytes, bytes.size, classTag) } val size = entry.size def transferUnrollToStorage(amount: Long): Unit = { @@ -252,17 +262,19 @@ private[spark] class MemoryStore( case null => None case e: DeserializedMemoryEntry[_] => throw new IllegalArgumentException("should only call getBytes on serialized blocks") - case SerializedMemoryEntry(bytes, _) => Some(bytes) + case SerializedMemoryEntry(bytes, _, _) => Some(bytes) } } - def getValues(blockId: BlockId): Option[Iterator[Any]] = { + def getValues(blockId: BlockId): Option[Iterator[_]] = { val entry = entries.synchronized { entries.get(blockId) } entry match { case null => None - case e: SerializedMemoryEntry => + case e: SerializedMemoryEntry[_] => throw new IllegalArgumentException("should only call getValues on deserialized blocks") - case DeserializedMemoryEntry(values, _) => Some(values.iterator) + case DeserializedMemoryEntry(values, _, _) => + val x = Some(values) + x.map(_.iterator) } } @@ -335,6 +347,24 @@ private[spark] class MemoryStore( } } + def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = { + val data = entry match { + case DeserializedMemoryEntry(values, _, _) => Left(values) + case SerializedMemoryEntry(buffer, _, _) => Right(buffer) + } + val newEffectiveStorageLevel = + blockManager.dropFromMemory(blockId, () => data)(entry.classTag) + if (newEffectiveStorageLevel.isValid) { + // The block is still present in at least one store, so release the lock + // but don't delete the block info + blockManager.releaseLock(blockId) + } else { + // The block isn't present in any store, so delete the block info so that the + // block can be stored again + blockManager.blockInfoManager.removeBlock(blockId) + } + } + if (freedMemory >= space) { logInfo(s"${selectedBlocks.size} blocks selected for dropping") for (blockId <- selectedBlocks) { @@ -343,20 +373,7 @@ private[spark] class MemoryStore( // blocks and removing entries. However the check is still here for // future safety. if (entry != null) { - val data = entry match { - case DeserializedMemoryEntry(values, _) => Left(values) - case SerializedMemoryEntry(buffer, _) => Right(buffer) - } - val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data) - if (newEffectiveStorageLevel.isValid) { - // The block is still present in at least one store, so release the lock - // but don't delete the block info - blockManager.releaseLock(blockId) - } else { - // The block isn't present in any store, so delete the block info so that the - // block can be stored again - blockManager.blockInfoManager.removeBlock(blockId) - } + dropBlock(blockId, entry) } } freedMemory From bd98021e1f4c0b65c90a50b1c54fbd2abe4b303d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 21 Mar 2016 11:16:35 -0700 Subject: [PATCH 8/9] Fix test compilation. --- .../org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 531c4454485c0..122ca0627f720 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -40,6 +40,7 @@ import org.apache.spark.storage._ import org.apache.spark.streaming.receiver._ import org.apache.spark.streaming.util._ import org.apache.spark.util.{ManualClock, Utils} +import org.apache.spark.util.io.ChunkedByteBuffer class ReceivedBlockHandlerSuite extends SparkFunSuite @@ -155,7 +156,7 @@ class ReceivedBlockHandlerSuite val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf) val bytes = reader.read(fileSegment) reader.close() - blockManager.dataDeserialize(generateBlockId(), bytes).toList + blockManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList } loggedData shouldEqual data } From 8793a1dbf546d1db8ceeda2e91edc1a084046f33 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 21 Mar 2016 17:16:59 -0700 Subject: [PATCH 9/9] Two typo fixes. --- .../main/scala/org/apache/spark/network/BlockDataManager.scala | 2 +- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala index 5321285bdd22a..8f83668d79029 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala @@ -41,7 +41,7 @@ trait BlockDataManager { blockId: BlockId, data: ManagedBuffer, level: StorageLevel, - classTag: ClassTag[_]): Boolean + classTag: ClassTag[_]): Boolean /** * Release locks acquired by [[putBlockData()]] and [[getBlockData()]]. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 22d6aa2cf653b..83f8c5c37d136 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -710,7 +710,7 @@ private[spark] class BlockManager( * * @return true if the block was stored or false if an error occurred. */ - def putBytes[T: ClassTag]( // TODO(josh) + def putBytes[T: ClassTag]( blockId: BlockId, bytes: ChunkedByteBuffer, level: StorageLevel,