From 125e79c107db599b15f700c3b4023d9516b28fd1 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 30 Dec 2016 04:38:07 -0800 Subject: [PATCH 1/4] [SPARK-18857][SQL] Don't use `Iterator.duplicate` for `incrementalCollect` in Thrift Server. Since Scala `Iterator.duplicate` uses a queue to buffer all items between both iterators, this causes GC and hangs. We should not use this, especially for `spark.sql.thriftServer.incrementalCollect`. https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/Iterator.scala#L1262-L1300 --- .../SparkExecuteStatementOperation.scala | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index aeabd6a15881d..59e7cafc816db 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -50,8 +50,8 @@ private[hive] class SparkExecuteStatementOperation( with Logging { private var result: DataFrame = _ + private var resultList: Option[Array[org.apache.spark.sql.Row]] = _ private var iter: Iterator[SparkRow] = _ - private var iterHeader: Iterator[SparkRow] = _ private var dataTypes: Array[DataType] = _ private var statementId: String = _ @@ -103,6 +103,10 @@ private[hive] class SparkExecuteStatementOperation( } } + private def useIncrementalCollect: Boolean = { + sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean + } + def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = { validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) @@ -111,9 +115,15 @@ private[hive] class SparkExecuteStatementOperation( // Reset iter to header when fetching start from first row if (order.equals(FetchOrientation.FETCH_FIRST)) { - val (ita, itb) = iterHeader.duplicate - iter = ita - iterHeader = itb + iter = if (useIncrementalCollect) { + resultList = None + result.toLocalIterator.asScala + } else { + if (resultList.isEmpty) { + resultList = Some(result.collect()) + } + resultList.get.iterator + } } if (!iter.hasNext) { @@ -227,17 +237,14 @@ private[hive] class SparkExecuteStatementOperation( } HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString()) iter = { - val useIncrementalCollect = - sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean if (useIncrementalCollect) { + resultList = None result.toLocalIterator.asScala } else { - result.collect().iterator + resultList = Some(result.collect()) + resultList.get.iterator } } - val (itra, itrb) = iter.duplicate - iterHeader = itra - iter = itrb dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray } catch { case e: HiveSQLException => From 00bb52dadc3ba7d93eb5d49cb866829b624366dd Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 31 Dec 2016 23:07:02 -0800 Subject: [PATCH 2/4] Use SparkRow instead of Row. --- .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 59e7cafc816db..15e6c92ec6cde 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -50,7 +50,7 @@ private[hive] class SparkExecuteStatementOperation( with Logging { private var result: DataFrame = _ - private var resultList: Option[Array[org.apache.spark.sql.Row]] = _ + private var resultList: Option[Array[SparkRow]] = _ private var iter: Iterator[SparkRow] = _ private var dataTypes: Array[DataType] = _ private var statementId: String = _ From fb4ee89ceb99bb3cdd9852028b97bc6d407890ac Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 5 Jan 2017 12:18:49 -0800 Subject: [PATCH 3/4] Create SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT and add docs. --- .../org/apache/spark/sql/internal/SQLConf.scala | 6 ++++++ .../SparkExecuteStatementOperation.scala | 13 +++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 304dcb691b323..246a704383022 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -309,6 +309,12 @@ object SQLConf { .stringConf .createOptional + val THRIFTSERVER_INCREMENTAL_COLLECT = + SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect") + .doc("When true, enable incremental collection for execution in Thrift Server.") + .booleanConf + .createWithDefault(false) + val THRIFTSERVER_UI_STATEMENT_LIMIT = SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements") .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.") diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 15e6c92ec6cde..517b01f183926 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -50,7 +50,12 @@ private[hive] class SparkExecuteStatementOperation( with Logging { private var result: DataFrame = _ + + // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST. + // This is only used when `spark.sql.thriftServer.incrementalCollect` is set to `false`. + // In case of `true`, this will be `None` and FETCH_FIRST will trigger re-execution. private var resultList: Option[Array[SparkRow]] = _ + private var iter: Iterator[SparkRow] = _ private var dataTypes: Array[DataType] = _ private var statementId: String = _ @@ -103,10 +108,6 @@ private[hive] class SparkExecuteStatementOperation( } } - private def useIncrementalCollect: Boolean = { - sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean - } - def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = { validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) @@ -115,7 +116,7 @@ private[hive] class SparkExecuteStatementOperation( // Reset iter to header when fetching start from first row if (order.equals(FetchOrientation.FETCH_FIRST)) { - iter = if (useIncrementalCollect) { + iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) { resultList = None result.toLocalIterator.asScala } else { @@ -237,7 +238,7 @@ private[hive] class SparkExecuteStatementOperation( } HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString()) iter = { - if (useIncrementalCollect) { + if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) { resultList = None result.toLocalIterator.asScala } else { From e66b165da40aae4690a1d38c0733c55aaeb578c2 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 6 Jan 2017 10:04:52 -0800 Subject: [PATCH 4/4] Make the config internal. --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 246a704383022..c9210f327eb0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -311,6 +311,7 @@ object SQLConf { val THRIFTSERVER_INCREMENTAL_COLLECT = SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect") + .internal() .doc("When true, enable incremental collection for execution in Thrift Server.") .booleanConf .createWithDefault(false)