From 00bcf8a893c021fa4a949c5ac077a34881870ace Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Apr 2015 11:55:11 -0700 Subject: [PATCH 01/23] Avoid IO operations on empty files in BlockObjectWriter. --- .../apache/spark/storage/BlockObjectWriter.scala | 13 ++++++------- .../spark/util/collection/ExternalSorter.scala | 8 +++++--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index a33f22ef52687..3a353d2eae886 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -184,13 +184,12 @@ private[spark] class DiskBlockObjectWriter( objOut.flush() bs.flush() close() - } - - val truncateStream = new FileOutputStream(file, true) - try { - truncateStream.getChannel.truncate(initialPosition) - } finally { - truncateStream.close() + val truncateStream = new FileOutputStream(file, true) + try { + truncateStream.getChannel.truncate(initialPosition) + } finally { + truncateStream.close() + } } } catch { case e: Exception => diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 3b9d14f9372b6..1caf0734c2558 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -359,9 +359,11 @@ private[spark] class ExternalSorter[K, V, C]( // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use // createTempShuffleBlock here; see SPARK-3426 for more context. val (blockId, file) = diskBlockManager.createTempShuffleBlock() - val writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, - curWriteMetrics) - writer.open() + // We purposely don't call open() on the disk writer in order to avoid writing compression + // headers into empty files, but we still need to create the file because the read code + // expects it to exist: + file.createNewFile() + blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, curWriteMetrics) } // Creating the file to write to and creating a disk writer both involve interacting with // the disk, and can take a long time in aggregate when we open many files, so should be From 8fd89b47efbef6325e0bc45bad0b74bf8ead4a6d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Apr 2015 12:10:46 -0700 Subject: [PATCH 02/23] Do not create empty files at all. --- .../util/collection/ExternalSorter.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 1caf0734c2558..ba589ed26e17e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -359,10 +359,6 @@ private[spark] class ExternalSorter[K, V, C]( // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use // createTempShuffleBlock here; see SPARK-3426 for more context. val (blockId, file) = diskBlockManager.createTempShuffleBlock() - // We purposely don't call open() on the disk writer in order to avoid writing compression - // headers into empty files, but we still need to create the file because the read code - // expects it to exist: - file.createNewFile() blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, curWriteMetrics) } // Creating the file to write to and creating a disk writer both involve interacting with @@ -735,11 +731,16 @@ private[spark] class ExternalSorter[K, V, C]( val writeStartTime = System.nanoTime util.Utils.tryWithSafeFinally { for (i <- 0 until numPartitions) { - val in = new FileInputStream(partitionWriters(i).fileSegment().file) - util.Utils.tryWithSafeFinally { - lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled) - } { - in.close() + val file = partitionWriters(i).fileSegment().file + if (!file.exists()) { + lengths(i) = 0 + } else { + val in = new FileInputStream(file) + util.Utils.tryWithSafeFinally { + lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled) + } { + in.close() + } } } } { From 0db87c341686e7b24e760583bcc9fe9054d3095a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Apr 2015 13:30:00 -0700 Subject: [PATCH 03/23] Reduce scope of FileOutputStream in ExternalSorter --- .../org/apache/spark/util/collection/ExternalSorter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index ba589ed26e17e..2603adf01c4d5 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -727,7 +727,6 @@ private[spark] class ExternalSorter[K, V, C]( // this simple we spill out the current in-memory collection so that everything is in files. spillToPartitionFiles(if (aggregator.isDefined) map else buffer) partitionWriters.foreach(_.commitAndClose()) - val out = new FileOutputStream(outputFile, true) val writeStartTime = System.nanoTime util.Utils.tryWithSafeFinally { for (i <- 0 until numPartitions) { @@ -735,16 +734,17 @@ private[spark] class ExternalSorter[K, V, C]( if (!file.exists()) { lengths(i) = 0 } else { + val out = new FileOutputStream(outputFile, true) val in = new FileInputStream(file) util.Utils.tryWithSafeFinally { lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled) } { in.close() + out.close() } } } } { - out.close() context.taskMetrics.shuffleWriteMetrics.foreach( _.incShuffleWriteTime(System.nanoTime - writeStartTime)) } From 7e2340d05721d6374e78069baa5870e87cd0cfb1 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Apr 2015 17:45:45 -0700 Subject: [PATCH 04/23] Revert "Reduce scope of FileOutputStream in ExternalSorter" This reverts commit 3c9c9447d4d4e8ddeb036167390073e3b67fb621. --- .../org/apache/spark/util/collection/ExternalSorter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 2603adf01c4d5..ba589ed26e17e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -727,6 +727,7 @@ private[spark] class ExternalSorter[K, V, C]( // this simple we spill out the current in-memory collection so that everything is in files. spillToPartitionFiles(if (aggregator.isDefined) map else buffer) partitionWriters.foreach(_.commitAndClose()) + val out = new FileOutputStream(outputFile, true) val writeStartTime = System.nanoTime util.Utils.tryWithSafeFinally { for (i <- 0 until numPartitions) { @@ -734,17 +735,16 @@ private[spark] class ExternalSorter[K, V, C]( if (!file.exists()) { lengths(i) = 0 } else { - val out = new FileOutputStream(outputFile, true) val in = new FileInputStream(file) util.Utils.tryWithSafeFinally { lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled) } { in.close() - out.close() } } } } { + out.close() context.taskMetrics.shuffleWriteMetrics.foreach( _.incShuffleWriteTime(System.nanoTime - writeStartTime)) } From 8113eac2ea7206cb24e6fe52252caaf70ff642a5 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 4 Jun 2015 19:06:00 -0700 Subject: [PATCH 05/23] Hacky WIP towards speculatively running w/o reset(), then retrying with it. --- .../hive/execution/HiveComparisonTest.scala | 40 ++++++++++++++++--- .../hive/execution/HiveQueryFileTest.scala | 2 +- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index c9dd4c0935a72..d06787fe127ff 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -207,7 +207,11 @@ abstract class HiveComparisonTest } val installHooksCommand = "(?i)SET.*hooks".r - def createQueryTest(testCaseName: String, sql: String, reset: Boolean = true) { + def createQueryTest( + testCaseName: String, + sql: String, + reset: Boolean = true, + tryWithoutResettingFirst: Boolean = false) { // testCaseName must not contain ':', which is not allowed to appear in a filename of Windows assert(!testCaseName.contains(":")) @@ -238,9 +242,6 @@ abstract class HiveComparisonTest test(testCaseName) { logDebug(s"=== HIVE TEST: $testCaseName ===") - // Clear old output for this testcase. - outputDirectories.map(new File(_, testCaseName)).filter(_.exists()).foreach(_.delete()) - val sqlWithoutComment = sql.split("\n").filterNot(l => l.matches("--.*(?<=[^\\\\]);")).mkString("\n") val allQueries = @@ -267,7 +268,10 @@ abstract class HiveComparisonTest }.mkString("\n== Console version of this test ==\n", "\n", "\n") } - try { + def doTest(reset: Boolean, isSpeculative: Boolean = false): Unit = { + // Clear old output for this testcase. + outputDirectories.map(new File(_, testCaseName)).filter(_.exists()).foreach(_.delete()) + if (reset) { TestHive.reset() } @@ -390,12 +394,36 @@ abstract class HiveComparisonTest """.stripMargin stringToFile(new File(wrongDirectory, testCaseName), errorMessage + consoleTestCase) - fail(errorMessage) + if (isSpeculative && !reset) { + // TODO: log this at a very low level that won't appear in the console appender + // then throw a custom exception + fail("Failed on first run; retrying") + } else { + fail(errorMessage) + } } } // Touch passed file. new FileOutputStream(new File(passedDirectory, testCaseName)).close() + } + + try { + try { + if (tryWithoutResettingFirst) { + doTest(reset = false, isSpeculative = true) + } else { + doTest(reset) + } + } catch { + case tf: org.scalatest.exceptions.TestFailedException => + if (tryWithoutResettingFirst) { + logWarning("Test failed without reset(); retrying with reset()") + doTest(reset = true) + } else { + throw tf + } + } } catch { case tf: org.scalatest.exceptions.TestFailedException => throw tf case originalException: Exception => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala index f7b37dae0a5f3..f96c989c4614f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala @@ -59,7 +59,7 @@ abstract class HiveQueryFileTest extends HiveComparisonTest { runAll) { // Build a test case and submit it to scala test framework... val queriesString = fileToString(testCaseFile) - createQueryTest(testCaseName, queriesString) + createQueryTest(testCaseName, queriesString, reset = true, tryWithoutResettingFirst = true) } else { // Only output warnings for the built in whitelist as this clutters the output when the user // trying to execute a single test from the commandline. From 417f50e1945c026666eeda3480fe6ae0154c040b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 4 Jun 2015 19:06:19 -0700 Subject: [PATCH 06/23] Hackily comment out most of dev/run-tests to speed up Jenkins iteration. --- dev/run-tests | 108 +------------------------------------------------- 1 file changed, 2 insertions(+), 106 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index d178e2a4601ea..777565ff7bc5b 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -77,69 +77,10 @@ export SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Pkinesis-asl" fi } -# Only run Hive tests if there are SQL changes. -# Partial solution for SPARK-1455. -if [ -n "$AMPLAB_JENKINS" ]; then - target_branch="$ghprbTargetBranch" - git fetch origin "$target_branch":"$target_branch" - - # AMP_JENKINS_PRB indicates if the current build is a pull request build. - if [ -n "$AMP_JENKINS_PRB" ]; then - # It is a pull request build. - sql_diffs=$( - git diff --name-only "$target_branch" \ - | grep -e "^sql/" -e "^bin/spark-sql" -e "^sbin/start-thriftserver.sh" - ) - - non_sql_diffs=$( - git diff --name-only "$target_branch" \ - | grep -v -e "^sql/" -e "^bin/spark-sql" -e "^sbin/start-thriftserver.sh" - ) - - if [ -n "$sql_diffs" ]; then - echo "[info] Detected changes in SQL. Will run Hive test suite." - _RUN_SQL_TESTS=true - - if [ -z "$non_sql_diffs" ]; then - echo "[info] Detected no changes except in SQL. Will only run SQL tests." - _SQL_TESTS_ONLY=true - fi - fi - else - # It is a regular build. We should run SQL tests. - _RUN_SQL_TESTS=true - fi -fi set -o pipefail trap 'handle_error $LINENO' ERR -echo "" -echo "=========================================================================" -echo "Running Apache RAT checks" -echo "=========================================================================" - -CURRENT_BLOCK=$BLOCK_RAT - -./dev/check-license - -echo "" -echo "=========================================================================" -echo "Running Scala style checks" -echo "=========================================================================" - -CURRENT_BLOCK=$BLOCK_SCALA_STYLE - -./dev/lint-scala - -echo "" -echo "=========================================================================" -echo "Running Python style checks" -echo "=========================================================================" - -CURRENT_BLOCK=$BLOCK_PYTHON_STYLE - -./dev/lint-python echo "" echo "=========================================================================" @@ -163,15 +104,6 @@ CURRENT_BLOCK=$BLOCK_BUILD fi } -echo "" -echo "=========================================================================" -echo "Detecting binary incompatibilities with MiMa" -echo "=========================================================================" - -CURRENT_BLOCK=$BLOCK_MIMA - -./dev/mima - echo "" echo "=========================================================================" echo "Running Spark unit tests" @@ -180,19 +112,8 @@ echo "=========================================================================" CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS { - # If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled. - # This must be a single argument, as it is. - if [ -n "$_RUN_SQL_TESTS" ]; then - SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver" - fi - - if [ -n "$_SQL_TESTS_ONLY" ]; then - # This must be an array of individual arguments. Otherwise, having one long string - # will be interpreted as a single test, which doesn't work. - SBT_MAVEN_TEST_ARGS=("catalyst/test" "sql/test" "hive/test" "hive-thriftserver/test" "mllib/test") - else - SBT_MAVEN_TEST_ARGS=("test") - fi + SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver" + SBT_MAVEN_TEST_ARGS=("hive/test") echo "[info] Running Spark tests with these arguments: $SBT_MAVEN_PROFILES_ARGS ${SBT_MAVEN_TEST_ARGS[@]}" @@ -213,28 +134,3 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS fi } -echo "" -echo "=========================================================================" -echo "Running PySpark tests" -echo "=========================================================================" - -CURRENT_BLOCK=$BLOCK_PYSPARK_UNIT_TESTS - -# add path for python 3 in jenkins -export PATH="${PATH}:/home/anaconda/envs/py3k/bin" -./python/run-tests - -echo "" -echo "=========================================================================" -echo "Running SparkR tests" -echo "=========================================================================" - -CURRENT_BLOCK=$BLOCK_SPARKR_UNIT_TESTS - -if [ $(command -v R) ]; then - ./R/install-dev.sh - ./R/run-tests.sh -else - echo "Ignoring SparkR tests as R was not found in PATH" -fi - From f90dc9456549e4a0b052f7ad291caf954ca451a7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 4 Jun 2015 23:03:26 -0700 Subject: [PATCH 07/23] Don't pass configuration to ObjectWritable in SerializableWritable --- core/src/main/scala/org/apache/spark/SerializableWritable.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala index cb2cae185256a..73867a35f7355 100644 --- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala +++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala @@ -19,7 +19,6 @@ package org.apache.spark import java.io._ -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.ObjectWritable import org.apache.hadoop.io.Writable @@ -41,7 +40,6 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() val ow = new ObjectWritable() - ow.setConf(new Configuration()) ow.readFields(in) t = ow.get().asInstanceOf[T] } From 480d20a7d3390703a8a5ead64c3029fc32f90124 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Jun 2015 01:07:58 -0700 Subject: [PATCH 08/23] Broadcast configuration in hiveWriterContainers (WIP hack) --- .../apache/spark/SerializableWritable.scala | 2 +- .../hive/execution/InsertIntoHiveTable.scala | 6 +- .../spark/sql/hive/hiveWriterContainers.scala | 74 +++++++++++-------- 3 files changed, 49 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala index 73867a35f7355..aea97ab4225f4 100644 --- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala +++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala @@ -43,4 +43,4 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa ow.readFields(in) t = ow.get().asInstanceOf[T] } -} +} \ No newline at end of file diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 8613332186f28..ea930f85250a6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -174,12 +174,14 @@ case class InsertIntoHiveTable( val jobConf = new JobConf(sc.hiveconf) val jobConfSer = new SerializableWritable(jobConf) + val broadcastedConf = sc.sparkContext.broadcast(new SerializableWritable[JobConf](jobConf)) val writerContainer = if (numDynamicPartitions > 0) { val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions) - new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames) + new SparkHiveDynamicPartitionWriterContainer( + broadcastedConf, fileSinkConf, dynamicPartColNames) } else { - new SparkHiveWriterContainer(jobConf, fileSinkConf) + new SparkHiveWriterContainer(broadcastedConf, fileSinkConf) } saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index 2bb526b14be34..df0af92c999ab 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -31,9 +31,10 @@ import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred._ import org.apache.hadoop.hive.common.FileUtils +import org.apache.spark.broadcast.Broadcast import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.sql.Row -import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter} +import org.apache.spark.{SerializableWritable, Logging, SparkHadoopWriter} import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.HiveShim._ @@ -44,7 +45,7 @@ import org.apache.spark.sql.types._ * It is based on [[SparkHadoopWriter]]. */ private[hive] class SparkHiveWriterContainer( - @transient jobConf: JobConf, + jobConf: Broadcast[SerializableWritable[JobConf]], fileSinkConf: FileSinkDesc) extends Logging with SparkHadoopMapRedUtil @@ -56,22 +57,28 @@ private[hive] class SparkHiveWriterContainer( // handler settings can be set to jobConf if (tableDesc != null) { PlanUtils.configureOutputJobPropertiesForStorageHandler(tableDesc) - Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf) + Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf.value.value) } - protected val conf = new SerializableWritable(jobConf) + @transient var conf: JobConf = jobConf.value.value private var jobID = 0 private var splitID = 0 private var attemptID = 0 - private var jID: SerializableWritable[JobID] = null - private var taID: SerializableWritable[TaskAttemptID] = null + + @transient private var jID: JobID = null + @transient private var taID: TaskAttemptID = null + private var jIDString: String = null + private var taskIDString: String = null + private var taskAttemptIDString: String = null @transient private var writer: FileSinkOperator.RecordWriter = null - @transient protected lazy val committer = conf.value.getOutputCommitter - @transient protected lazy val jobContext = newJobContext(conf.value, jID.value) - @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value) + @transient protected lazy val committer = conf.getOutputCommitter + /** Only used on driver side **/ + @transient protected lazy val jobContext = newJobContext(conf, jID) + /** Only used on executor side */ + @transient private lazy val taskContext = newTaskAttemptContext(conf, taID) @transient private lazy val outputFormat = - conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]] + conf.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]] def driverSideSetup() { setIDs(0, 0, 0) @@ -80,6 +87,7 @@ private[hive] class SparkHiveWriterContainer( } def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) { + conf = new JobConf(jobConf.value.value) setIDs(jobId, splitId, attemptId) setConfParams() committer.setupTask(taskContext) @@ -90,7 +98,7 @@ private[hive] class SparkHiveWriterContainer( val numberFormat = NumberFormat.getInstance() numberFormat.setMinimumIntegerDigits(5) numberFormat.setGroupingUsed(false) - val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat) + val extension = Utilities.getFileExtension(conf, fileSinkConf.getCompressed, outputFormat) "part-" + numberFormat.format(splitID) + extension } @@ -110,11 +118,11 @@ private[hive] class SparkHiveWriterContainer( // NOTE this method is executed at the executor side. // For Hive tables without partitions or with only static partitions, only 1 writer is needed. writer = HiveFileFormatUtils.getHiveRecordWriter( - conf.value, + conf, fileSinkConf.getTableInfo, - conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], + conf.getOutputValueClass.asInstanceOf[Class[Writable]], fileSinkConf, - FileOutputFormat.getTaskOutputPath(conf.value, getOutputName), + FileOutputFormat.getTaskOutputPath(conf, getOutputName), Reporter.NULL) } @@ -127,17 +135,23 @@ private[hive] class SparkHiveWriterContainer( splitID = splitId attemptID = attemptId - jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId)) - taID = new SerializableWritable[TaskAttemptID]( - new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID)) + // note: sparkHadoopwriter.createjobid may be locale-dependent because it doesn't pass a locale + // to date format; we should fix this so that its results is location-independent in case + // different cluster nodes have different locales (e.g. driver and executor may be different + // types of machines with different configurations). + jID = SparkHadoopWriter.createJobID(now, jobId) + taID = new TaskAttemptID(new TaskID(jID, true, splitID), attemptID) + jIDString = jID.toString + taskAttemptIDString = taID.toString + taskIDString = taID.getTaskID.toString } private def setConfParams() { - conf.value.set("mapred.job.id", jID.value.toString) - conf.value.set("mapred.tip.id", taID.value.getTaskID.toString) - conf.value.set("mapred.task.id", taID.value.toString) - conf.value.setBoolean("mapred.task.is.map", true) - conf.value.setInt("mapred.task.partition", splitID) + conf.set("mapred.job.id", jIDString) + conf.set("mapred.tip.id", taskIDString) + conf.set("mapred.task.id", taskAttemptIDString) + conf.setBoolean("mapred.task.is.map", true) + conf.setInt("mapred.task.partition", splitID) } } @@ -160,14 +174,14 @@ private[spark] object SparkHiveDynamicPartitionWriterContainer { } private[spark] class SparkHiveDynamicPartitionWriterContainer( - @transient jobConf: JobConf, + jobConf: Broadcast[SerializableWritable[JobConf]], fileSinkConf: FileSinkDesc, dynamicPartColNames: Array[String]) extends SparkHiveWriterContainer(jobConf, fileSinkConf) { import SparkHiveDynamicPartitionWriterContainer._ - private val defaultPartName = jobConf.get( + private val defaultPartName = jobConf.value.value.get( ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal) @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _ @@ -191,10 +205,10 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does: // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then // load it with loadDynamicPartitions/loadPartition/loadTable. - val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true) - jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false) + val oldMarker = jobConf.value.value.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true) + jobConf.value.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false) super.commitJob() - jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker) + jobConf.value.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker) } override def getLocalFileWriter(row: Row, schema: StructType): FileSinkOperator.RecordWriter = { @@ -229,16 +243,16 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( newFileSinkDesc.setCompressType(fileSinkConf.getCompressType) val path = { - val outputPath = FileOutputFormat.getOutputPath(conf.value) + val outputPath = FileOutputFormat.getOutputPath(conf) assert(outputPath != null, "Undefined job output-path") val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) new Path(workPath, getOutputName) } HiveFileFormatUtils.getHiveRecordWriter( - conf.value, + conf, fileSinkConf.getTableInfo, - conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], + conf.getOutputValueClass.asInstanceOf[Class[Writable]], newFileSinkDesc, path, Reporter.NULL) From 55041d2177a3174f633bee1c7be2e4e2266c9b69 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Jun 2015 02:09:48 -0700 Subject: [PATCH 09/23] Use local[*] instead of local[2] --- .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 7c7afc824d7a6..a25e092cbbf9a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -49,7 +49,7 @@ import scala.collection.JavaConversions._ object TestHive extends TestHiveContext( new SparkContext( - "local[2]", + "local[*]", "TestSQLContext", new SparkConf() .set("spark.sql.test", "") From 57c2cb444070a5dc9cd01f7f9c0004e8c24e00dc Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Jun 2015 02:22:57 -0700 Subject: [PATCH 10/23] try in-memory Derby --- .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index fbf2c7d8cbc06..0eb4cefe7d897 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -536,7 +536,7 @@ private[hive] object HiveContext { } } propMap.put("javax.jdo.option.ConnectionURL", - s"jdbc:derby:;databaseName=$localMetastore;create=true") + s"jdbc:derby:memory:myDB;create=true") propMap.put("datanucleus.rdbms.datastoreAdapterClassName", "org.datanucleus.store.rdbms.adapter.DerbyAdapter") propMap.toMap From 9db0abc04e782067913e0d62d864b496f5b95494 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Jun 2015 02:42:23 -0700 Subject: [PATCH 11/23] Avoid writing empty files in BypassMergeSortShuffleWriter --- .../spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index d3d6280284beb..e55d9e10a701b 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -108,7 +108,7 @@ public void insertAll(Iterator> records) throws IOException { final File file = tempShuffleBlockIdPlusFile._2(); final BlockId blockId = tempShuffleBlockIdPlusFile._1(); partitionWriters[i] = - blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open(); + blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); } // Creating the file to write to and creating a disk writer both involve interacting with // the disk, and can take a long time in aggregate when we open many files, so should be @@ -143,6 +143,9 @@ public long[] writePartitionedFile( boolean threwException = true; try { for (int i = 0; i < numPartitions; i++) { + if (partitionWriters[i].fileSegment().length() == 0) { + continue; + } final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file()); boolean copyThrewException = true; try { From 9e116d119ac8d332b89c3a653dc09a9e810fe439 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Jun 2015 14:35:53 -0700 Subject: [PATCH 12/23] Rework SPARK-7041 for BypassMergeSort split --- .../sort/BypassMergeSortShuffleWriter.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index e55d9e10a701b..ae46ba96ca295 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -107,6 +107,14 @@ public void insertAll(Iterator> records) throws IOException { blockManager.diskBlockManager().createTempShuffleBlock(); final File file = tempShuffleBlockIdPlusFile._2(); final BlockId blockId = tempShuffleBlockIdPlusFile._1(); + // Note that we purposely do not call open() on the disk writers here; DiskBlockObjectWriter + // will automatically open() itself if necessary. This is an optimization to avoid file + // creation and truncation for empty partitions; this optimization probably doesn't make sense + // for most realistic production workloads, but it can make a large difference when playing + // around with Spark SQL queries in spark-shell on toy datasets: if you performed a query over + // an extremely small number of records then Spark SQL's default parallelism of 200 would + // result in slower out-of-the-box performance due to these constant-factor overheads. This + // optimization speeds up local microbenchmarking and SQL unit tests. partitionWriters[i] = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); } @@ -144,6 +152,10 @@ public long[] writePartitionedFile( try { for (int i = 0; i < numPartitions; i++) { if (partitionWriters[i].fileSegment().length() == 0) { + // In insertAll(), we didn't create empty files for empty reduce partitions; this branch + // handles that case. Since we'll be skipping deletion of these files, verify that they + // don't exist: + assert(!partitionWriters[i].fileSegment().file().exists()); continue; } final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file()); @@ -175,7 +187,8 @@ public void stop() throws IOException { for (BlockObjectWriter writer : partitionWriters) { // This method explicitly does _not_ throw exceptions: writer.revertPartialWritesAndClose(); - if (!diskBlockManager.getFile(writer.blockId()).delete()) { + final File file = diskBlockManager.getFile(writer.blockId()); + if (file.exists() && !file.delete()) { logger.error("Error while deleting file for block {}", writer.blockId()); } } From 3fe16e89cb527fe63bff3795c4f08ba2f98ad127 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Jun 2015 14:36:13 -0700 Subject: [PATCH 13/23] Revert "Broadcast configuration in hiveWriterContainers (WIP hack)" This reverts commit 480d20a7d3390703a8a5ead64c3029fc32f90124. --- .../apache/spark/SerializableWritable.scala | 2 +- .../hive/execution/InsertIntoHiveTable.scala | 6 +- .../spark/sql/hive/hiveWriterContainers.scala | 74 ++++++++----------- 3 files changed, 33 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala index aea97ab4225f4..73867a35f7355 100644 --- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala +++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala @@ -43,4 +43,4 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa ow.readFields(in) t = ow.get().asInstanceOf[T] } -} \ No newline at end of file +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index ea930f85250a6..8613332186f28 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -174,14 +174,12 @@ case class InsertIntoHiveTable( val jobConf = new JobConf(sc.hiveconf) val jobConfSer = new SerializableWritable(jobConf) - val broadcastedConf = sc.sparkContext.broadcast(new SerializableWritable[JobConf](jobConf)) val writerContainer = if (numDynamicPartitions > 0) { val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions) - new SparkHiveDynamicPartitionWriterContainer( - broadcastedConf, fileSinkConf, dynamicPartColNames) + new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames) } else { - new SparkHiveWriterContainer(broadcastedConf, fileSinkConf) + new SparkHiveWriterContainer(jobConf, fileSinkConf) } saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index df0af92c999ab..2bb526b14be34 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -31,10 +31,9 @@ import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred._ import org.apache.hadoop.hive.common.FileUtils -import org.apache.spark.broadcast.Broadcast import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.sql.Row -import org.apache.spark.{SerializableWritable, Logging, SparkHadoopWriter} +import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter} import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.HiveShim._ @@ -45,7 +44,7 @@ import org.apache.spark.sql.types._ * It is based on [[SparkHadoopWriter]]. */ private[hive] class SparkHiveWriterContainer( - jobConf: Broadcast[SerializableWritable[JobConf]], + @transient jobConf: JobConf, fileSinkConf: FileSinkDesc) extends Logging with SparkHadoopMapRedUtil @@ -57,28 +56,22 @@ private[hive] class SparkHiveWriterContainer( // handler settings can be set to jobConf if (tableDesc != null) { PlanUtils.configureOutputJobPropertiesForStorageHandler(tableDesc) - Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf.value.value) + Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf) } - @transient var conf: JobConf = jobConf.value.value + protected val conf = new SerializableWritable(jobConf) private var jobID = 0 private var splitID = 0 private var attemptID = 0 - - @transient private var jID: JobID = null - @transient private var taID: TaskAttemptID = null - private var jIDString: String = null - private var taskIDString: String = null - private var taskAttemptIDString: String = null + private var jID: SerializableWritable[JobID] = null + private var taID: SerializableWritable[TaskAttemptID] = null @transient private var writer: FileSinkOperator.RecordWriter = null - @transient protected lazy val committer = conf.getOutputCommitter - /** Only used on driver side **/ - @transient protected lazy val jobContext = newJobContext(conf, jID) - /** Only used on executor side */ - @transient private lazy val taskContext = newTaskAttemptContext(conf, taID) + @transient protected lazy val committer = conf.value.getOutputCommitter + @transient protected lazy val jobContext = newJobContext(conf.value, jID.value) + @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value) @transient private lazy val outputFormat = - conf.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]] + conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]] def driverSideSetup() { setIDs(0, 0, 0) @@ -87,7 +80,6 @@ private[hive] class SparkHiveWriterContainer( } def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) { - conf = new JobConf(jobConf.value.value) setIDs(jobId, splitId, attemptId) setConfParams() committer.setupTask(taskContext) @@ -98,7 +90,7 @@ private[hive] class SparkHiveWriterContainer( val numberFormat = NumberFormat.getInstance() numberFormat.setMinimumIntegerDigits(5) numberFormat.setGroupingUsed(false) - val extension = Utilities.getFileExtension(conf, fileSinkConf.getCompressed, outputFormat) + val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat) "part-" + numberFormat.format(splitID) + extension } @@ -118,11 +110,11 @@ private[hive] class SparkHiveWriterContainer( // NOTE this method is executed at the executor side. // For Hive tables without partitions or with only static partitions, only 1 writer is needed. writer = HiveFileFormatUtils.getHiveRecordWriter( - conf, + conf.value, fileSinkConf.getTableInfo, - conf.getOutputValueClass.asInstanceOf[Class[Writable]], + conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], fileSinkConf, - FileOutputFormat.getTaskOutputPath(conf, getOutputName), + FileOutputFormat.getTaskOutputPath(conf.value, getOutputName), Reporter.NULL) } @@ -135,23 +127,17 @@ private[hive] class SparkHiveWriterContainer( splitID = splitId attemptID = attemptId - // note: sparkHadoopwriter.createjobid may be locale-dependent because it doesn't pass a locale - // to date format; we should fix this so that its results is location-independent in case - // different cluster nodes have different locales (e.g. driver and executor may be different - // types of machines with different configurations). - jID = SparkHadoopWriter.createJobID(now, jobId) - taID = new TaskAttemptID(new TaskID(jID, true, splitID), attemptID) - jIDString = jID.toString - taskAttemptIDString = taID.toString - taskIDString = taID.getTaskID.toString + jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId)) + taID = new SerializableWritable[TaskAttemptID]( + new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID)) } private def setConfParams() { - conf.set("mapred.job.id", jIDString) - conf.set("mapred.tip.id", taskIDString) - conf.set("mapred.task.id", taskAttemptIDString) - conf.setBoolean("mapred.task.is.map", true) - conf.setInt("mapred.task.partition", splitID) + conf.value.set("mapred.job.id", jID.value.toString) + conf.value.set("mapred.tip.id", taID.value.getTaskID.toString) + conf.value.set("mapred.task.id", taID.value.toString) + conf.value.setBoolean("mapred.task.is.map", true) + conf.value.setInt("mapred.task.partition", splitID) } } @@ -174,14 +160,14 @@ private[spark] object SparkHiveDynamicPartitionWriterContainer { } private[spark] class SparkHiveDynamicPartitionWriterContainer( - jobConf: Broadcast[SerializableWritable[JobConf]], + @transient jobConf: JobConf, fileSinkConf: FileSinkDesc, dynamicPartColNames: Array[String]) extends SparkHiveWriterContainer(jobConf, fileSinkConf) { import SparkHiveDynamicPartitionWriterContainer._ - private val defaultPartName = jobConf.value.value.get( + private val defaultPartName = jobConf.get( ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal) @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _ @@ -205,10 +191,10 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does: // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then // load it with loadDynamicPartitions/loadPartition/loadTable. - val oldMarker = jobConf.value.value.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true) - jobConf.value.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false) + val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true) + jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false) super.commitJob() - jobConf.value.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker) + jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker) } override def getLocalFileWriter(row: Row, schema: StructType): FileSinkOperator.RecordWriter = { @@ -243,16 +229,16 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( newFileSinkDesc.setCompressType(fileSinkConf.getCompressType) val path = { - val outputPath = FileOutputFormat.getOutputPath(conf) + val outputPath = FileOutputFormat.getOutputPath(conf.value) assert(outputPath != null, "Undefined job output-path") val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) new Path(workPath, getOutputName) } HiveFileFormatUtils.getHiveRecordWriter( - conf, + conf.value, fileSinkConf.getTableInfo, - conf.getOutputValueClass.asInstanceOf[Class[Writable]], + conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], newFileSinkDesc, path, Reporter.NULL) From 5c777cf40ee1f70092639a8abfe8b9598d6d3636 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Jun 2015 14:35:53 -0700 Subject: [PATCH 14/23] Rework SPARK-7041 for BypassMergeSort split --- .../sort/BypassMergeSortShuffleWriter.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index d3d6280284beb..a921ce64ad75a 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -107,6 +107,14 @@ public void insertAll(Iterator> records) throws IOException { blockManager.diskBlockManager().createTempShuffleBlock(); final File file = tempShuffleBlockIdPlusFile._2(); final BlockId blockId = tempShuffleBlockIdPlusFile._1(); + // Note that we purposely do not call open() on the disk writers here; DiskBlockObjectWriter + // will automatically open() itself if necessary. This is an optimization to avoid file + // creation and truncation for empty partitions; this optimization probably doesn't make sense + // for most realistic production workloads, but it can make a large difference when playing + // around with Spark SQL queries in spark-shell on toy datasets: if you performed a query over + // an extremely small number of records then Spark SQL's default parallelism of 200 would + // result in slower out-of-the-box performance due to these constant-factor overheads. This + // optimization speeds up local microbenchmarking and SQL unit tests. partitionWriters[i] = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open(); } @@ -143,6 +151,13 @@ public long[] writePartitionedFile( boolean threwException = true; try { for (int i = 0; i < numPartitions; i++) { + if (partitionWriters[i].fileSegment().length() == 0) { + // In insertAll(), we didn't create empty files for empty reduce partitions; this branch + // handles that case. Since we'll be skipping deletion of these files, verify that they + // don't exist: + assert(!partitionWriters[i].fileSegment().file().exists()); + continue; + } final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file()); boolean copyThrewException = true; try { @@ -172,7 +187,8 @@ public void stop() throws IOException { for (BlockObjectWriter writer : partitionWriters) { // This method explicitly does _not_ throw exceptions: writer.revertPartialWritesAndClose(); - if (!diskBlockManager.getFile(writer.blockId()).delete()) { + final File file = diskBlockManager.getFile(writer.blockId()); + if (file.exists() && !file.delete()) { logger.error("Error while deleting file for block {}", writer.blockId()); } } From 5ac11d150da51385a988e4982a1082ff2df0f0ac Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Jun 2015 15:38:05 -0700 Subject: [PATCH 15/23] Revert "Use local[*] instead of local[2]" This reverts commit 55041d2177a3174f633bee1c7be2e4e2266c9b69. --- .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index a25e092cbbf9a..7c7afc824d7a6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -49,7 +49,7 @@ import scala.collection.JavaConversions._ object TestHive extends TestHiveContext( new SparkContext( - "local[*]", + "local[2]", "TestSQLContext", new SparkConf() .set("spark.sql.test", "") From 895de59937968921595a6e56edbdaa5358fda5f5 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Jun 2015 15:38:10 -0700 Subject: [PATCH 16/23] Revert "try in-memory Derby" This reverts commit 57c2cb444070a5dc9cd01f7f9c0004e8c24e00dc. --- .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 0eb4cefe7d897..fbf2c7d8cbc06 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -536,7 +536,7 @@ private[hive] object HiveContext { } } propMap.put("javax.jdo.option.ConnectionURL", - s"jdbc:derby:memory:myDB;create=true") + s"jdbc:derby:;databaseName=$localMetastore;create=true") propMap.put("datanucleus.rdbms.datastoreAdapterClassName", "org.datanucleus.store.rdbms.adapter.DerbyAdapter") propMap.toMap From bf30fee5980139b68949bbe6f0a0d77a0e0fac30 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Jun 2015 15:40:18 -0700 Subject: [PATCH 17/23] Revert "Don't pass configuration to ObjectWritable in SerializableWritable" This reverts commit f90dc9456549e4a0b052f7ad291caf954ca451a7. --- core/src/main/scala/org/apache/spark/SerializableWritable.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala index 73867a35f7355..cb2cae185256a 100644 --- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala +++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala @@ -19,6 +19,7 @@ package org.apache.spark import java.io._ +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.ObjectWritable import org.apache.hadoop.io.Writable @@ -40,6 +41,7 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() val ow = new ObjectWritable() + ow.setConf(new Configuration()) ow.readFields(in) t = ow.get().asInstanceOf[T] } From b1e3f82f2e16afe3316ff73bd8baea1b0e750a16 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Jun 2015 15:40:24 -0700 Subject: [PATCH 18/23] Revert "Hacky WIP towards speculatively running w/o reset(), then retrying with it." This reverts commit 8113eac2ea7206cb24e6fe52252caaf70ff642a5. --- .../hive/execution/HiveComparisonTest.scala | 40 +++---------------- .../hive/execution/HiveQueryFileTest.scala | 2 +- 2 files changed, 7 insertions(+), 35 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index d06787fe127ff..c9dd4c0935a72 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -207,11 +207,7 @@ abstract class HiveComparisonTest } val installHooksCommand = "(?i)SET.*hooks".r - def createQueryTest( - testCaseName: String, - sql: String, - reset: Boolean = true, - tryWithoutResettingFirst: Boolean = false) { + def createQueryTest(testCaseName: String, sql: String, reset: Boolean = true) { // testCaseName must not contain ':', which is not allowed to appear in a filename of Windows assert(!testCaseName.contains(":")) @@ -242,6 +238,9 @@ abstract class HiveComparisonTest test(testCaseName) { logDebug(s"=== HIVE TEST: $testCaseName ===") + // Clear old output for this testcase. + outputDirectories.map(new File(_, testCaseName)).filter(_.exists()).foreach(_.delete()) + val sqlWithoutComment = sql.split("\n").filterNot(l => l.matches("--.*(?<=[^\\\\]);")).mkString("\n") val allQueries = @@ -268,10 +267,7 @@ abstract class HiveComparisonTest }.mkString("\n== Console version of this test ==\n", "\n", "\n") } - def doTest(reset: Boolean, isSpeculative: Boolean = false): Unit = { - // Clear old output for this testcase. - outputDirectories.map(new File(_, testCaseName)).filter(_.exists()).foreach(_.delete()) - + try { if (reset) { TestHive.reset() } @@ -394,36 +390,12 @@ abstract class HiveComparisonTest """.stripMargin stringToFile(new File(wrongDirectory, testCaseName), errorMessage + consoleTestCase) - if (isSpeculative && !reset) { - // TODO: log this at a very low level that won't appear in the console appender - // then throw a custom exception - fail("Failed on first run; retrying") - } else { - fail(errorMessage) - } + fail(errorMessage) } } // Touch passed file. new FileOutputStream(new File(passedDirectory, testCaseName)).close() - } - - try { - try { - if (tryWithoutResettingFirst) { - doTest(reset = false, isSpeculative = true) - } else { - doTest(reset) - } - } catch { - case tf: org.scalatest.exceptions.TestFailedException => - if (tryWithoutResettingFirst) { - logWarning("Test failed without reset(); retrying with reset()") - doTest(reset = true) - } else { - throw tf - } - } } catch { case tf: org.scalatest.exceptions.TestFailedException => throw tf case originalException: Exception => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala index f96c989c4614f..f7b37dae0a5f3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala @@ -59,7 +59,7 @@ abstract class HiveQueryFileTest extends HiveComparisonTest { runAll) { // Build a test case and submit it to scala test framework... val queriesString = fileToString(testCaseFile) - createQueryTest(testCaseName, queriesString, reset = true, tryWithoutResettingFirst = true) + createQueryTest(testCaseName, queriesString) } else { // Only output warnings for the built in whitelist as this clutters the output when the user // trying to execute a single test from the commandline. From 51133707318b45b920db7f2163c7dedd6132340e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Jun 2015 16:25:59 -0700 Subject: [PATCH 19/23] Revert "Rework SPARK-7041 for BypassMergeSort split" This reverts commit 9e116d119ac8d332b89c3a653dc09a9e810fe439. --- .../sort/BypassMergeSortShuffleWriter.java | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index ae46ba96ca295..e55d9e10a701b 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -107,14 +107,6 @@ public void insertAll(Iterator> records) throws IOException { blockManager.diskBlockManager().createTempShuffleBlock(); final File file = tempShuffleBlockIdPlusFile._2(); final BlockId blockId = tempShuffleBlockIdPlusFile._1(); - // Note that we purposely do not call open() on the disk writers here; DiskBlockObjectWriter - // will automatically open() itself if necessary. This is an optimization to avoid file - // creation and truncation for empty partitions; this optimization probably doesn't make sense - // for most realistic production workloads, but it can make a large difference when playing - // around with Spark SQL queries in spark-shell on toy datasets: if you performed a query over - // an extremely small number of records then Spark SQL's default parallelism of 200 would - // result in slower out-of-the-box performance due to these constant-factor overheads. This - // optimization speeds up local microbenchmarking and SQL unit tests. partitionWriters[i] = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); } @@ -152,10 +144,6 @@ public long[] writePartitionedFile( try { for (int i = 0; i < numPartitions; i++) { if (partitionWriters[i].fileSegment().length() == 0) { - // In insertAll(), we didn't create empty files for empty reduce partitions; this branch - // handles that case. Since we'll be skipping deletion of these files, verify that they - // don't exist: - assert(!partitionWriters[i].fileSegment().file().exists()); continue; } final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file()); @@ -187,8 +175,7 @@ public void stop() throws IOException { for (BlockObjectWriter writer : partitionWriters) { // This method explicitly does _not_ throw exceptions: writer.revertPartialWritesAndClose(); - final File file = diskBlockManager.getFile(writer.blockId()); - if (file.exists() && !file.delete()) { + if (!diskBlockManager.getFile(writer.blockId()).delete()) { logger.error("Error while deleting file for block {}", writer.blockId()); } } From fac08d5287ecc684686e95a4f2e7dc88dd22e474 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Fri, 5 Jun 2015 16:28:45 -0700 Subject: [PATCH 20/23] SPARK-8135. In SerializableWritable, don't load defaults when instantiating Configuration --- core/src/main/scala/org/apache/spark/SerializableWritable.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala index cb2cae185256a..beb2e27254725 100644 --- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala +++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala @@ -41,7 +41,7 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() val ow = new ObjectWritable() - ow.setConf(new Configuration()) + ow.setConf(new Configuration(false)) ow.readFields(in) t = ow.get().asInstanceOf[T] } From 2b500b981a5c31394bca43be513f5622c2e90c7b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 12 Jun 2015 13:17:19 -0700 Subject: [PATCH 21/23] Only run unsafe tests (testing a jenkins job) --- dev/run-tests | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/run-tests b/dev/run-tests index 777565ff7bc5b..a76631d71cfbd 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -113,7 +113,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS { SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver" - SBT_MAVEN_TEST_ARGS=("hive/test") + SBT_MAVEN_TEST_ARGS=("unsafe/test") echo "[info] Running Spark tests with these arguments: $SBT_MAVEN_PROFILES_ARGS ${SBT_MAVEN_TEST_ARGS[@]}" From fbd3d037ad31331e5a7cb40dd6ee3c823ca25293 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 12 Jun 2015 13:33:13 -0700 Subject: [PATCH 22/23] Add log4j test properties to unsafe project. --- unsafe/src/test/resources/log4j.properties | 28 ++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 unsafe/src/test/resources/log4j.properties diff --git a/unsafe/src/test/resources/log4j.properties b/unsafe/src/test/resources/log4j.properties new file mode 100644 index 0000000000000..eb3b1999eb996 --- /dev/null +++ b/unsafe/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark-project.jetty=WARN +org.spark-project.jetty.LEVEL=WARN From 46dd005b1515f33ae79db0403744fc8fd94a4f5a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 12 Jun 2015 13:47:32 -0700 Subject: [PATCH 23/23] Try only testing bagel instead (since I don't think Maven logs to unit-tests.log?) --- dev/run-tests | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/run-tests b/dev/run-tests index a76631d71cfbd..9da13bf34d79f 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -113,7 +113,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS { SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver" - SBT_MAVEN_TEST_ARGS=("unsafe/test") + SBT_MAVEN_TEST_ARGS=("bagel/test") echo "[info] Running Spark tests with these arguments: $SBT_MAVEN_PROFILES_ARGS ${SBT_MAVEN_TEST_ARGS[@]}"