From 9d5bd99e317d5d0f73b645f8e29276c5d99ed39a Mon Sep 17 00:00:00 2001 From: siknezev Date: Sat, 1 Aug 2020 10:31:39 -0700 Subject: [PATCH] Sort-merge join operator spilling performance improvements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit What changes were proposed in this pull request? The following list of changes will improve SQL execution performance when data is spilled on the disk: 1) Implement lazy initialization of UnsafeSorterSpillReader - iterator on top of spilled rows: ... During SortMergeJoin (Left Semi Join) execution, the iterator on the spill data is created but no iteration over the data is done. ... Having lazy initialization of UnsafeSorterSpillReader will enable efficient processing of SortMergeJoin even if data is spilled onto disk. Unnecessary I/O will be avoided. 2) Decrease initial memory read buffer size in UnsafeSorterSpillReader from 1MB to 1KB: ... UnsafeSorterSpillReader constructor takes lot of time due to size of default 1MB memory read buffer. ... The code already has logic to increase the memory read buffer if it cannot fit the data, so decreasing the size to 1K is safe and has positive performance impact. 3) Improve memory utilization when spilling is enabled in ExternalAppendOnlyUnsafeRowArrey ... In the current implementation, when spilling is enabled, UnsafeExternalSorter object is created and then data moved from ExternalAppendOnlyUnsafeRowArrey object into UnsafeExternalSorter and then ExternalAppendOnlyUnsafeRowArrey object is emptied. Just before ExternalAppendOnlyUnsafeRowArrey object is emptied there are both objects in the memory with the same data. That require double memory and there is duplication of data. This can be avoided. ... In the proposed solution, when spark.sql.sortMergeJoinExec.buffer.in.memory.threshold is reached adding new rows into ExternalAppendOnlyUnsafeRowArray object stops. UnsafeExternalSorter object is created and new rows are added into this object. ExternalAppendOnlyUnsafeRowArray object retains all rows already added into this object. This approach will enable better memory utilization and avoid unnecessary movement of data from one object into another. Why are the changes needed? Testing with TPC-DS 100 TB benchmark data set showed that some of SQLs (example query 14) are not able to run even with extremely large Spark executor memory. Spark spilling feature has to be enabled, in order to be able to process these SQLs. Processing of SQLs becomes extremely slow when spilling is enabled. The test of this solution with query 14 and enabled spilling on the disk, showed 500X performance improvements and it didn’t degrade performance of the other SQLs from TPC-DS benchmark. Does this PR introduce any user-facing change? No How was this patch tested? By running TPC-DS SQLs with different data sets 10 TB and 100 TB By running all Spark tests. --- .../spark/unsafe/map/BytesToBytesMap.java | 6 +- .../unsafe/sort/UnsafeExternalSorter.java | 35 +++++--- .../unsafe/sort/UnsafeSorterIterator.java | 4 +- .../unsafe/sort/UnsafeSorterSpillMerger.java | 2 +- .../unsafe/sort/UnsafeSorterSpillReader.java | 82 ++++++++++++------- .../sort/UnsafeInMemorySorterSuite.java | 3 +- ...yUnsafeRowArrayBenchmark-jdk11-results.txt | 6 +- ...endOnlyUnsafeRowArrayBenchmark-results.txt | 6 +- .../ExternalAppendOnlyUnsafeRowArray.scala | 56 ++++++++----- .../sql/DataFrameWindowFunctionsSuite.scala | 4 +- ...nalAppendOnlyUnsafeRowArrayBenchmark.scala | 42 ++++++++++ ...xternalAppendOnlyUnsafeRowArraySuite.scala | 6 +- 12 files changed, 175 insertions(+), 77 deletions(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 6e028886f2318..ee17895528712 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -322,10 +322,10 @@ public Location next() { return loc; } else { assert(reader != null); - if (!reader.hasNext()) { - advanceToNextPage(); - } try { + if (!reader.hasNext()) { + advanceToNextPage(); + } reader.loadNext(); } catch (IOException e) { try { diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 55e4e609c3c7b..d79a02c76d86c 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -506,7 +506,7 @@ class SpillableIterator extends UnsafeSorterIterator { private boolean loaded = false; private int numRecords = 0; - SpillableIterator(UnsafeSorterIterator inMemIterator) { + SpillableIterator(UnsafeSorterIterator inMemIterator) throws IOException { this.upstream = inMemIterator; this.numRecords = inMemIterator.getNumRecords(); } @@ -681,31 +681,24 @@ static class ChainedIterator extends UnsafeSorterIterator { ChainedIterator(Queue iterators) { assert iterators.size() > 0; this.numRecords = 0; - for (UnsafeSorterIterator iter: iterators) { - this.numRecords += iter.getNumRecords(); - } this.iterators = iterators; - this.current = iterators.remove(); } @Override - public int getNumRecords() { + public int getNumRecords() throws IOException { + initializeNumRecords(); return numRecords; } @Override - public boolean hasNext() { - while (!current.hasNext() && !iterators.isEmpty()) { - current = iterators.remove(); - } + public boolean hasNext() throws IOException { + nextIterator(); return current.hasNext(); } @Override public void loadNext() throws IOException { - while (!current.hasNext() && !iterators.isEmpty()) { - current = iterators.remove(); - } + nextIterator(); current.loadNext(); } @@ -720,5 +713,21 @@ public void loadNext() throws IOException { @Override public long getKeyPrefix() { return current.getKeyPrefix(); } + + private void initializeNumRecords() throws IOException { + if (numRecords == 0) { + for (UnsafeSorterIterator iter: iterators) { + numRecords += iter.getNumRecords(); + } + this.current = iterators.remove(); + } + } + + private void nextIterator() throws IOException { + initializeNumRecords(); + while (!current.hasNext() && !iterators.isEmpty()) { + current = iterators.remove(); + } + } } } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java index 1b3167fcc250c..c9b15ba3270e4 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java @@ -21,7 +21,7 @@ public abstract class UnsafeSorterIterator { - public abstract boolean hasNext(); + public abstract boolean hasNext() throws IOException; public abstract void loadNext() throws IOException; @@ -33,5 +33,5 @@ public abstract class UnsafeSorterIterator { public abstract long getKeyPrefix(); - public abstract int getNumRecords(); + public abstract int getNumRecords() throws IOException; } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java index ab800288dcb43..20bb963c60846 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java @@ -71,7 +71,7 @@ public int getNumRecords() { } @Override - public boolean hasNext() { + public boolean hasNext() throws IOException { return !priorityQueue.isEmpty() || (spillReader != null && spillReader.hasNext()); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java index a524c4790407d..03db1a74e111e 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java @@ -47,55 +47,49 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen private int numRecords; private int numRecordsRemaining; - private byte[] arr = new byte[1024 * 1024]; + private byte[] arr = new byte[1024]; private Object baseObject = arr; private final TaskContext taskContext = TaskContext.get(); + private final SerializerManager serManager; + private final File dataFile; + private final BlockId blkId; + private boolean initialized; public UnsafeSorterSpillReader( SerializerManager serializerManager, File file, BlockId blockId) throws IOException { assert (file.length() > 0); - final ConfigEntry bufferSizeConfigEntry = - package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE(); - // This value must be less than or equal to MAX_BUFFER_SIZE_BYTES. Cast to int is always safe. - final int DEFAULT_BUFFER_SIZE_BYTES = - ((Long) bufferSizeConfigEntry.defaultValue().get()).intValue(); - int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES : - ((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue(); - - final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get( - package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED()); - - final InputStream bs = - new NioBufferedFileInputStream(file, bufferSizeBytes); - try { - if (readAheadEnabled) { - this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs), - bufferSizeBytes); - } else { - this.in = serializerManager.wrapStream(blockId, bs); - } - this.din = new DataInputStream(this.in); - numRecords = numRecordsRemaining = din.readInt(); - } catch (IOException e) { - Closeables.close(bs, /* swallowIOException = */ true); - throw e; - } + serManager = serializerManager; + dataFile = file; + blkId = blockId; + initialized = false; } @Override - public int getNumRecords() { + public int getNumRecords() throws IOException { + if (!initialized) { + readSpilledFile(); + initialized = true; + } return numRecords; } @Override - public boolean hasNext() { + public boolean hasNext() throws IOException { + if (!initialized) { + readSpilledFile(); + initialized = true; + } return (numRecordsRemaining > 0); } @Override public void loadNext() throws IOException { + if (!initialized) { + readSpilledFile(); + initialized = true; + } // Kill the task in case it has been marked as killed. This logic is from // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order // to avoid performance overhead. This check is added here in `loadNext()` instead of in @@ -148,4 +142,34 @@ public void close() throws IOException { } } } + + private void readSpilledFile() throws IOException { + assert (dataFile.length() > 0); + final ConfigEntry bufferSizeConfigEntry = + package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE(); + // This value must be less than or equal to MAX_BUFFER_SIZE_BYTES. Cast to int is always safe. + final int DEFAULT_BUFFER_SIZE_BYTES = + ((Long) bufferSizeConfigEntry.defaultValue().get()).intValue(); + int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES : + ((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue(); + + final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get( + package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED()); + + final InputStream bs = + new NioBufferedFileInputStream(dataFile, bufferSizeBytes); + try { + if (readAheadEnabled) { + this.in = new ReadAheadInputStream(serManager.wrapStream(blkId, bs), + bufferSizeBytes); + } else { + this.in = serManager.wrapStream(blkId, bs); + } + this.din = new DataInputStream(this.in); + numRecords = numRecordsRemaining = din.readInt(); + } catch (IOException e) { + Closeables.close(bs, /* swallowIOException = */ true); + throw e; + } + } } diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java index 2b8a0602730e1..e7945f99a9267 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java @@ -17,6 +17,7 @@ package org.apache.spark.util.collection.unsafe.sort; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -51,7 +52,7 @@ private static String getStringFromDataPage(Object baseObject, long baseOffset, } @Test - public void testSortingEmptyInput() { + public void testSortingEmptyInput() throws IOException { final TaskMemoryManager memoryManager = new TaskMemoryManager( new TestMemoryManager( new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0); diff --git a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-jdk11-results.txt b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-jdk11-results.txt index 4f50a894e5c07..881b3e4e73f2d 100644 --- a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-jdk11-results.txt +++ b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-jdk11-results.txt @@ -42,4 +42,8 @@ Spilling with 10000 rows: Best Time(ms) Avg Time(ms) Stdev(m UnsafeExternalSorter 12 12 0 13.8 72.7 1.0X ExternalAppendOnlyUnsafeRowArray 8 8 0 19.8 50.6 1.4X - +Java HotSpot(TM) 64-Bit Server VM 11.0.7+8-LTS on Linux 4.4.0-178-generic +Intel(R) Xeon(R) CPU E5-2687W v3 @ 3.10GHz +Spilling SpillReader with 16000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +UnsafeSorterSpillReader_bufferSize1024 231 342 82 1.1 901.6 1.0X \ No newline at end of file diff --git a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt index c4be80af1334b..989380214e490 100644 --- a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt +++ b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt @@ -42,4 +42,8 @@ Spilling with 10000 rows: Best Time(ms) Avg Time(ms) Stdev(m UnsafeExternalSorter 11 11 1 14.7 68.0 1.0X ExternalAppendOnlyUnsafeRowArray 9 10 1 17.1 58.5 1.2X - +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~16.04-b09 on Linux 4.4.0-178-generic +Intel(R) Xeon(R) CPU E5-2687W v3 @ 3.10GHz +Spilling SpillReader with 16000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +UnsafeSorterSpillReader_bufferSize1024 411 426 13 0.6 1607.2 1.0X diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala index ac282ea2e94f5..eb63e5f444407 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala @@ -95,7 +95,8 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( // inside `UnsafeExternalSorter` spillableArray.cleanupResources() spillableArray = null - } else if (inMemoryBuffer != null) { + } + if (inMemoryBuffer != null) { inMemoryBuffer.clear() } numFieldsPerRow = 0 @@ -124,18 +125,6 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( numRowsSpillThreshold, false) - // populate with existing in-memory buffered rows - if (inMemoryBuffer != null) { - inMemoryBuffer.foreach(existingUnsafeRow => - spillableArray.insertRecord( - existingUnsafeRow.getBaseObject, - existingUnsafeRow.getBaseOffset, - existingUnsafeRow.getSizeInBytes, - 0, - false) - ) - inMemoryBuffer.clear() - } numFieldsPerRow = unsafeRow.numFields() } @@ -168,7 +157,15 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( if (spillableArray == null) { new InMemoryBufferIterator(startIndex) } else { - new SpillableArrayIterator(spillableArray.getIterator(startIndex), numFieldsPerRow) + val offsetIndex = if (inMemoryBuffer != null && startIndex > inMemoryBuffer.length) { + startIndex - inMemoryBuffer.length + } else { + 0 + } + new SpilledArrayMergeIterator( + spillableArray.getIterator(offsetIndex), + numFieldsPerRow, + startIndex) } } @@ -204,20 +201,37 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( } } - private[this] class SpillableArrayIterator( + private[this] class SpilledArrayMergeIterator( iterator: UnsafeSorterIterator, - numFieldPerRow: Int) + numFieldPerRow: Int, + startIndex: Int) extends ExternalAppendOnlyUnsafeRowArrayIterator { - private val currentRow = new UnsafeRow(numFieldPerRow) + private var currentIndex = startIndex - override def hasNext(): Boolean = !isModified() && iterator.hasNext + private val currentSorterRow = new UnsafeRow(numFieldPerRow) + + override def hasNext(): Boolean = { + if (inMemoryBuffer != null && currentIndex < inMemoryBuffer.length) { + !isModified() + } else { + !isModified() && iterator.hasNext + } + } override def next(): UnsafeRow = { throwExceptionIfModified() - iterator.loadNext() - currentRow.pointTo(iterator.getBaseObject, iterator.getBaseOffset, iterator.getRecordLength) - currentRow + if (inMemoryBuffer != null && currentIndex < inMemoryBuffer.length) { + val result = inMemoryBuffer(currentIndex) + currentIndex += 1 + result + } else { + iterator.loadNext() + currentSorterRow.pointTo(iterator.getBaseObject, + iterator.getBaseOffset, + iterator.getRecordLength) + currentSorterRow + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index bc6adfb857b02..2d27c4340c926 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -614,7 +614,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest } test("Window spill with more than the inMemoryThreshold and spillThreshold") { - val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value") + val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4"), (1, "5"), (2, "6")).toDF("key", "value") val window = Window.partitionBy($"key").orderBy($"value") withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1", @@ -628,7 +628,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest test("SPARK-21258: complex object in combination with spilling") { // Make sure we trigger the spilling path. withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1", - SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "17") { + SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "16") { val sampleSchema = new StructType(). add("f0", StringType). add("f1", LongType). diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala index 0869e25674e69..be24dd3ca32c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala @@ -182,6 +182,47 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase { } } + def testAgainstUnsafeSorterSpillReader( + numSpillThreshold: Int, + numRows: Int, + numIterators: Int, + iterations: Int): Unit = { + val rows = testRows(numRows) + val benchmark = new Benchmark(s"Spilling SpillReader with $numRows rows", iterations * numRows, + output = output) + + benchmark.addCase("UnsafeSorterSpillReader_bufferSize1024") { _: Int => + val array = UnsafeExternalSorter.create( + TaskContext.get().taskMemoryManager(), + SparkEnv.get.blockManager, + SparkEnv.get.serializerManager, + TaskContext.get(), + null, + null, + 1024, + SparkEnv.get.memoryManager.pageSizeBytes, + numSpillThreshold, + false) + + rows.foreach(x => + array.insertRecord( + x.getBaseObject, + x.getBaseOffset, + x.getSizeInBytes, + 0, + false)) + + for (_ <- 0L until numIterators) { + array.getIterator(0) + } + array.cleanupResources() + } + + withFakeTaskContext { + benchmark.run() + } + } + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { runBenchmark("WITHOUT SPILL") { val spillThreshold = 100 * 1000 @@ -194,6 +235,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase { testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18) testAgainstRawUnsafeExternalSorter( config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4) + testAgainstUnsafeSorterSpillReader(5 * 1000, 16 * 1000, 100 * 1000, 1 << 4) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala index 98aba3ba25f17..735368533f1ad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala @@ -169,8 +169,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar // Add more rows to trigger switch to [[UnsafeExternalSorter]] and cause a spill to happen. // Verify that spill has happened - populateRows(array, 2, expectedValues) - assert(array.length == inMemoryThreshold + 1) + populateRows(array, 12, expectedValues) + assert(array.length == inMemoryThreshold + 11) assertSpill() val iterator2 = validateData(array, expectedValues) @@ -202,7 +202,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar } test("generate iterator with start index exceeding array's size (without spill)") { - val (inMemoryThreshold, spillThreshold) = (20, 100) + val (inMemoryThreshold, spillThreshold) = (60, 100) withExternalArray(inMemoryThreshold, spillThreshold) { array => populateRows(array, spillThreshold / 2)