From 3799d18a948e853fdd0570fd125ac6ffe676aee9 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Sat, 15 Apr 2017 16:42:46 -0700 Subject: [PATCH 1/8] [SPARK-17729] [SQL] Enable creating hive bucketed tables --- .../spark/sql/execution/SparkSqlParser.scala | 6 +- .../spark/sql/execution/command/tables.scala | 11 +++- .../spark/sql/hive/HiveMetastoreCatalog.scala | 7 +-- .../sql/hive/client/HiveClientImpl.scala | 61 ++++++++++++++++--- .../hive/execution/InsertIntoHiveTable.scala | 21 +++++++ .../spark/sql/hive/HiveDDLCommandSuite.scala | 33 +++++++--- .../sql/hive/InsertIntoHiveTableSuite.scala | 47 ++++++++++++++ .../spark/sql/hive/ShowCreateTableSuite.scala | 11 +--- .../sql/hive/execution/HiveDDLSuite.scala | 19 ++++++ 9 files changed, 181 insertions(+), 35 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index c2c52894860b5..3c58c6e1b6780 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1072,13 +1072,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { if (ctx.skewSpec != null) { operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx) } - if (ctx.bucketSpec != null) { - operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx) - } + val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil) val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil) val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty) val selectQuery = Option(ctx.query).map(plan) + val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec) // Note: Hive requires partition columns to be distinct from the schema, so we need // to include the partition columns here explicitly @@ -1119,6 +1118,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { tableType = tableType, storage = storage, schema = schema, + bucketSpec = bucketSpec, provider = Some(DDLUtils.HIVE_PROVIDER), partitionColumnNames = partitionCols.map(_.name), properties = properties, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index ebf03e1bf8869..46fb17706f3d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -902,9 +902,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n") } - if (metadata.bucketSpec.isDefined) { - throw new UnsupportedOperationException( - "Creating Hive table with bucket spec is not supported yet.") + if (metadata.bucketSpec.nonEmpty) { + val bucketSpec = metadata.bucketSpec.get + builder ++= s"CLUSTERED BY (${bucketSpec.bucketColumnNames.mkString(",")})\n" + + if (bucketSpec.sortColumnNames.nonEmpty) { + builder ++= s"SORTED BY (${bucketSpec.sortColumnNames.map(_ + " ASC").mkString(", ")})\n" + } + builder ++= s"INTO ${bucketSpec.numBuckets} BUCKETS\n" } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 6b98066cb76c8..a193cd9807fd9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -119,6 +119,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val metastoreSchema = relation.tableMeta.schema val tableIdentifier = QualifiedTableName(relation.tableMeta.database, relation.tableMeta.identifier.table) + val bucketSpec = relation.tableMeta.bucketSpec val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions val tablePath = new Path(relation.tableMeta.location) @@ -171,8 +172,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log location = fileIndex, partitionSchema = partitionSchema, dataSchema = dataSchema, - // We don't support hive bucketed tables, only ones we write out. - bucketSpec = None, + bucketSpec = bucketSpec, fileFormat = fileFormat, options = options)(sparkSession = sparkSession) val created = LogicalRelation(fsRelation, updatedTable) @@ -199,8 +199,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log sparkSession = sparkSession, paths = rootPath.toString :: Nil, userSpecifiedSchema = Option(dataSchema), - // We don't support hive bucketed tables, only ones we write out. - bucketSpec = None, + bucketSpec = bucketSpec, options = options, className = fileType).resolveRelation(), table = updatedTable) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 74e15a5777916..b91983a671609 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -27,10 +27,11 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} -import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema} +import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order} import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor} import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable} +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.security.UserGroupInformation @@ -373,10 +374,39 @@ private[hive] class HiveClientImpl( Option(client.getTable(dbName, tableName, false)).map { h => // Note: Hive separates partition columns and the schema, but for us the // partition columns are part of the schema + val cols = h.getCols.asScala.map(fromHiveColumn) val partCols = h.getPartCols.asScala.map(fromHiveColumn) - val schema = StructType(h.getCols.asScala.map(fromHiveColumn) ++ partCols) + val schema = StructType(cols ++ partCols) + + val bucketSpec = if (h.getNumBuckets > 0) { + val sortColumnOrders = h.getSortCols.asScala + // Currently Spark only supports columns to be sorted in ascending order + // but Hive can support both ascending and descending order. If all the columns + // are sorted in ascending order, only then propagate the sortedness information + // to downstream processing / optimizations in Spark + // TODO: In future we can have Spark support columns sorted in descending order + val allAscendingSorted = sortColumnOrders.forall(_.getOrder == HIVE_COLUMN_ORDER_ASC) + + val sortColumnNames = if (allAscendingSorted) { + sortColumnOrders.map { sortOrder => + val columnName = sortOrder.getCol + + if (!cols.exists(_.name.equalsIgnoreCase(columnName))) { + throw new AnalysisException(s"No match found for sort column name = $columnName " + + s"in table $dbName.$tableName. " + + s"Known table columns are ${cols.mkString("[", ", ", "]")}") + } + columnName + } + } else { + Seq() + } + Option(BucketSpec(h.getNumBuckets, h.getBucketCols.asScala, sortColumnNames)) + } else { + None + } - // Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet) + // Skew spec and storage handler can't be mapped to CatalogTable (yet) val unsupportedFeatures = ArrayBuffer.empty[String] if (!h.getSkewedColNames.isEmpty) { @@ -387,10 +417,6 @@ private[hive] class HiveClientImpl( unsupportedFeatures += "storage handler" } - if (!h.getBucketCols.isEmpty) { - unsupportedFeatures += "bucketing" - } - if (h.getTableType == HiveTableType.VIRTUAL_VIEW && partCols.nonEmpty) { unsupportedFeatures += "partitioned view" } @@ -408,9 +434,7 @@ private[hive] class HiveClientImpl( }, schema = schema, partitionColumnNames = partCols.map(_.name), - // We can not populate bucketing information for Hive tables as Spark SQL has a different - // implementation of hash function from Hive. - bucketSpec = None, + bucketSpec = bucketSpec, owner = h.getOwner, createTime = h.getTTable.getCreateTime.toLong * 1000, lastAccessTime = h.getLastAccessTime.toLong * 1000, @@ -870,6 +894,23 @@ private[hive] object HiveClientImpl { hiveTable.setViewOriginalText(t) hiveTable.setViewExpandedText(t) } + + table.bucketSpec match { + case Some(bucketSpec) => + hiveTable.setNumBuckets(bucketSpec.numBuckets) + hiveTable.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava) + + if (bucketSpec.sortColumnNames.nonEmpty) { + hiveTable.setSortCols( + bucketSpec.sortColumnNames + .map(col => new Order(col, HIVE_COLUMN_ORDER_ASC)) + .toList + .asJava + ) + } + case _ => + } + hiveTable } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 3facf9f67be9f..10e17c5f73433 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -307,6 +307,27 @@ case class InsertIntoHiveTable( } } + table.bucketSpec match { + case Some(bucketSpec) => + // Writes to bucketed hive tables are allowed only if user does not care about maintaining + // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are + // set to false + val enforceBucketingConfig = "hive.enforce.bucketing" + val enforceSortingConfig = "hive.enforce.sorting" + + val message = s"Output Hive table ${table.identifier} is bucketed but Spark" + + "currently does NOT populate bucketed output which is compatible with Hive." + + if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean || + hadoopConf.get(enforceSortingConfig, "true").toBoolean) { + throw new AnalysisException(message) + } else { + logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and " + + s"$enforceSortingConfig are set to false.") + } + case _ => // do nothing since table has no bucketing + } + val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, jobId = java.util.UUID.randomUUID().toString, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 59cc6605a1243..7584f1741c62b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -367,13 +367,32 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle } test("create table - clustered by") { - val baseQuery = "CREATE TABLE my_table (id int, name string) CLUSTERED BY(id)" - val query1 = s"$baseQuery INTO 10 BUCKETS" - val query2 = s"$baseQuery SORTED BY(id) INTO 10 BUCKETS" - val e1 = intercept[ParseException] { parser.parsePlan(query1) } - val e2 = intercept[ParseException] { parser.parsePlan(query2) } - assert(e1.getMessage.contains("Operation not allowed")) - assert(e2.getMessage.contains("Operation not allowed")) + val numBuckets = 10 + val bucketedColumn = "id" + val sortColumn = "id" + val baseQuery = + s""" + CREATE TABLE my_table ( + $bucketedColumn int, + name string) + CLUSTERED BY($bucketedColumn) + """ + + val query1 = s"$baseQuery INTO $numBuckets BUCKETS" + val (desc1, _) = extractTableDesc(query1) + assert(desc1.bucketSpec.isDefined) + val bucketSpec1 = desc1.bucketSpec.get + assert(bucketSpec1.numBuckets == numBuckets) + assert(bucketSpec1.bucketColumnNames.head.equals(bucketedColumn)) + assert(bucketSpec1.sortColumnNames.isEmpty) + + val query2 = s"$baseQuery SORTED BY($sortColumn) INTO $numBuckets BUCKETS" + val (desc2, _) = extractTableDesc(query2) + assert(desc2.bucketSpec.isDefined) + val bucketSpec2 = desc2.bucketSpec.get + assert(bucketSpec2.numBuckets == numBuckets) + assert(bucketSpec2.bucketColumnNames.head.equals(bucketedColumn)) + assert(bucketSpec2.sortColumnNames.head.equals(sortColumn)) } test("create table - skewed by") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 2c724f8388693..7bd3973550043 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -495,6 +495,53 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef } } + private def testBucketedTable(testName: String)(f: String => Unit): Unit = { + test(s"Hive SerDe table - $testName") { + val hiveTable = "hive_table" + + withTable(hiveTable) { + withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + sql( + s""" + |CREATE TABLE $hiveTable (a INT, d INT) + |PARTITIONED BY (b INT, c INT) + |CLUSTERED BY(a) + |SORTED BY(a, d) INTO 256 BUCKETS + |STORED AS TEXTFILE + """.stripMargin) + f(hiveTable) + } + } + } + } + + testBucketedTable("INSERT should NOT fail if strict bucketing is NOT enforced") { + tableName => + withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "false") { + sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b") + checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4)) + } + } + + testBucketedTable("INSERT should fail if strict bucketing / sorting is enforced") { + tableName => + withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "false") { + intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4") + } + } + withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "true") { + intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4") + } + } + withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "true") { + intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4") + } + } + } + test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") { // Set hive.exec.stagingdir under the table directory without start with ".". withSQLConf("hive.exec.stagingdir" -> "./test") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala index 4bfab0f9cfbfb..081153df8e732 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala @@ -247,21 +247,16 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing } } - test("hive bucketing is not supported") { + test("hive bucketing is supported") { withTable("t1") { - createRawHiveTable( + sql( s"""CREATE TABLE t1 (a INT, b STRING) |CLUSTERED BY (a) |SORTED BY (b) |INTO 2 BUCKETS """.stripMargin ) - - val cause = intercept[AnalysisException] { - sql("SHOW CREATE TABLE t1") - } - - assert(cause.getMessage.contains(" - bucketing")) + checkCreateTable("t1") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index c3d734e5a0366..7678996627c6a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -777,6 +777,25 @@ class HiveDDLSuite } } + test("desc table for Hive table - bucketed + sorted table") { + withTable("tbl") { + sql(s""" + CREATE TABLE tbl (id int, name string) + PARTITIONED BY (ds string) + CLUSTERED BY(id) + SORTED BY(id, name) INTO 1024 BUCKETS + """) + + assert(sql("DESC FORMATTED tbl").collect().containsSlice( + Seq( + Row("Num Buckets:", "1024", ""), + Row("Bucket Columns:", "[id]", ""), + Row("Sort Columns:", "[id, name]", "") + ) + )) + } + } + test("desc table for data source table using Hive Metastore") { assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive") val tabName = "tab1" From 6315ddaed5d7586195f3136fea44d9959fc515fe Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Sun, 16 Apr 2017 09:48:00 -0700 Subject: [PATCH 2/8] fixing failed test cases : HiveDDLSuite and HiveExternalCatalogSuite --- .../spark/sql/hive/HiveExternalCatalog.scala | 44 ++++++++++++++++++- .../sql/hive/execution/HiveDDLSuite.scala | 9 ++-- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index ba48facff2933..1a98bac73f3b5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -632,9 +632,51 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val rawTable = getRawTable(db, table) val withNewSchema = rawTable.copy(schema = schema) verifyColumnNames(withNewSchema) + + val newProperties = collection.mutable.Map[String, String]() + for ((property, value) <- withNewSchema.properties) { + newProperties.put(property, value) + } + for ((property, value) <- tableMetaToTableProps(withNewSchema)) { + newProperties.put(property, value) + } + + // Since the table schema will be changed, check if the bucketing and sort columns are still + // present in the new schema. If not, then discard the bucketing properties + val bucketSpec: Option[BucketSpec] = if (rawTable.bucketSpec.isDefined) { + val bucketSpec = rawTable.bucketSpec.get + val bucketColumnNames = bucketSpec.bucketColumnNames + val sortColumnNames = bucketSpec.sortColumnNames + + val bucketColumnsRetained = + bucketColumnNames.forall(col => schema.getFieldIndex(col).isDefined) + val sortColumnsRetained = sortColumnNames.forall(col => schema.getFieldIndex(col).isDefined) + + if (!bucketColumnsRetained || !sortColumnsRetained) { + newProperties.remove(DATASOURCE_SCHEMA_NUMBUCKETS) + newProperties.remove(DATASOURCE_SCHEMA_NUMBUCKETCOLS) + for (i <- bucketColumnNames.indices) { + newProperties.remove(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$i") + } + + newProperties.remove(DATASOURCE_SCHEMA_NUMSORTCOLS) + for (i <- sortColumnNames.indices) { + newProperties.remove(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$i") + } + None + } else { + rawTable.bucketSpec + } + } else { + None + } + // Add table metadata such as table schema, partition columns, etc. to table properties. val updatedTable = withNewSchema.copy( - properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema)) + properties = newProperties.toMap, + bucketSpec = bucketSpec + ) + try { client.alterTable(updatedTable) } catch { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 7678996627c6a..13f5c5dd8e80d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -786,11 +786,12 @@ class HiveDDLSuite SORTED BY(id, name) INTO 1024 BUCKETS """) - assert(sql("DESC FORMATTED tbl").collect().containsSlice( + val x = sql("DESC FORMATTED tbl").collect() + assert(x.containsSlice( Seq( - Row("Num Buckets:", "1024", ""), - Row("Bucket Columns:", "[id]", ""), - Row("Sort Columns:", "[id, name]", "") + Row("Num Buckets", "1024", ""), + Row("Bucket Columns", "[`id`]", ""), + Row("Sort Columns", "[`id`, `name`]", "") ) )) } From 303f442cc78c4980a5d14bc8527266aa9a1eda85 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Thu, 4 May 2017 11:10:42 -0700 Subject: [PATCH 3/8] review comment #1 --- .../catalyst/catalog/ExternalCatalog.scala | 27 ++++++++++++ .../catalyst/catalog/InMemoryCatalog.scala | 5 +++ .../sql/catalyst/catalog/SessionCatalog.scala | 4 ++ .../catalog/ExternalCatalogSuite.scala | 16 ++++++- .../spark/sql/hive/HiveExternalCatalog.scala | 42 +------------------ .../sql/hive/client/HiveClientImpl.scala | 11 +---- 6 files changed, 53 insertions(+), 52 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 974ef900e2eed..586cf5d8a2af1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.catalog +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.types.StructType @@ -335,6 +336,32 @@ abstract class ExternalCatalog postToAll(RenameFunctionEvent(db, oldName, newName)) } + final def checkSchemaContainsBucketingColumns( + bucketSpec: BucketSpec, + schema: StructType): Unit = { + val nonExistentBucketColumns = + bucketSpec.bucketColumnNames.filterNot(col => schema.map(_.name).contains(col)) + + if (nonExistentBucketColumns.nonEmpty) { + throw new AnalysisException( + s""" + |Some existing bucketing columns are not present in the new schema : + |(${nonExistentBucketColumns.mkString("[", ",", "]")}) + """.stripMargin) + } + + val nonExistentSortColumns = + bucketSpec.sortColumnNames.filterNot(col => schema.map(_.name).contains(col)) + + if (nonExistentSortColumns.nonEmpty) { + throw new AnalysisException( + s""" + |Some existing sort columns are not present in the new schema : + |(${nonExistentSortColumns.mkString("[", ",", "]")}) + """.stripMargin) + } + } + protected def doRenameFunction(db: String, oldName: String, newName: String): Unit def getFunction(db: String, funcName: String): CatalogFunction diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 8a5319bebe54e..aae1b0dd3c8ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -309,6 +309,11 @@ class InMemoryCatalog( schema: StructType): Unit = synchronized { requireTableExists(db, table) val origTable = catalog(db).tables(table).table + + if (origTable.bucketSpec.isDefined) { + checkSchemaContainsBucketingColumns(origTable.bucketSpec.get, schema) + } + catalog(db).tables(table).table = origTable.copy(schema = schema) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index f6653d384fe1d..f2da5cabfa211 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -345,6 +345,10 @@ class SessionCatalog( """.stripMargin) } + if (catalogTable.bucketSpec.isDefined) { + externalCatalog.checkSchemaContainsBucketingColumns(catalogTable.bucketSpec.get, newSchema) + } + // assuming the newSchema has all partition columns at the end as required externalCatalog.alterTableSchema(db, table, newSchema) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 42db4398e5072..dbd8c1627d335 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -247,7 +247,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val catalog = newBasicCatalog() val tbl1 = catalog.getTable("db2", "tbl1") val newSchema = StructType(Seq( - StructField("new_field_1", IntegerType), + StructField("col1", IntegerType), StructField("new_field_2", StringType), StructField("a", IntegerType), StructField("b", StringType))) @@ -256,6 +256,20 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac assert(newTbl1.schema == newSchema) } + test("alter table schema : removing bucketing column is not allowed") { + val catalog = newBasicCatalog() + val tbl1 = catalog.getTable("db2", "tbl1") + val newSchema = StructType(Seq( + StructField("new_field_1", IntegerType), + StructField("new_field_2", StringType), + StructField("a", IntegerType), + StructField("b", StringType))) + + intercept[AnalysisException] { + catalog.alterTableSchema("db2", "tbl1", newSchema) + } + } + test("get table") { assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1") } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 1a98bac73f3b5..82c52758401c6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -633,49 +633,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val withNewSchema = rawTable.copy(schema = schema) verifyColumnNames(withNewSchema) - val newProperties = collection.mutable.Map[String, String]() - for ((property, value) <- withNewSchema.properties) { - newProperties.put(property, value) - } - for ((property, value) <- tableMetaToTableProps(withNewSchema)) { - newProperties.put(property, value) - } - - // Since the table schema will be changed, check if the bucketing and sort columns are still - // present in the new schema. If not, then discard the bucketing properties - val bucketSpec: Option[BucketSpec] = if (rawTable.bucketSpec.isDefined) { - val bucketSpec = rawTable.bucketSpec.get - val bucketColumnNames = bucketSpec.bucketColumnNames - val sortColumnNames = bucketSpec.sortColumnNames - - val bucketColumnsRetained = - bucketColumnNames.forall(col => schema.getFieldIndex(col).isDefined) - val sortColumnsRetained = sortColumnNames.forall(col => schema.getFieldIndex(col).isDefined) - - if (!bucketColumnsRetained || !sortColumnsRetained) { - newProperties.remove(DATASOURCE_SCHEMA_NUMBUCKETS) - newProperties.remove(DATASOURCE_SCHEMA_NUMBUCKETCOLS) - for (i <- bucketColumnNames.indices) { - newProperties.remove(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$i") - } - - newProperties.remove(DATASOURCE_SCHEMA_NUMSORTCOLS) - for (i <- sortColumnNames.indices) { - newProperties.remove(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$i") - } - None - } else { - rawTable.bucketSpec - } - } else { - None - } - // Add table metadata such as table schema, partition columns, etc. to table properties. val updatedTable = withNewSchema.copy( - properties = newProperties.toMap, - bucketSpec = bucketSpec - ) + properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema)) try { client.alterTable(updatedTable) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index b91983a671609..9528185c820a1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -388,16 +388,7 @@ private[hive] class HiveClientImpl( val allAscendingSorted = sortColumnOrders.forall(_.getOrder == HIVE_COLUMN_ORDER_ASC) val sortColumnNames = if (allAscendingSorted) { - sortColumnOrders.map { sortOrder => - val columnName = sortOrder.getCol - - if (!cols.exists(_.name.equalsIgnoreCase(columnName))) { - throw new AnalysisException(s"No match found for sort column name = $columnName " + - s"in table $dbName.$tableName. " + - s"Known table columns are ${cols.mkString("[", ", ", "]")}") - } - columnName - } + sortColumnOrders.map(_.getCol) } else { Seq() } From df3fd7ada3ed9aa79a5937e59df6d195690db91e Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Thu, 11 May 2017 07:55:26 -0700 Subject: [PATCH 4/8] nonEmpty => isDefined --- .../scala/org/apache/spark/sql/execution/command/tables.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 46fb17706f3d1..9ccd6792e5da4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -902,7 +902,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n") } - if (metadata.bucketSpec.nonEmpty) { + if (metadata.bucketSpec.isDefined) { val bucketSpec = metadata.bucketSpec.get builder ++= s"CLUSTERED BY (${bucketSpec.bucketColumnNames.mkString(",")})\n" From b2784ba0f4822704ef82fa27bf9e5408d8b98422 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Thu, 11 May 2017 23:07:06 -0700 Subject: [PATCH 5/8] check explicitly for hive tables --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 9528185c820a1..c6e1e75c3f850 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -887,7 +887,7 @@ private[hive] object HiveClientImpl { } table.bucketSpec match { - case Some(bucketSpec) => + case Some(bucketSpec) if DDLUtils.isHiveTable(table) => hiveTable.setNumBuckets(bucketSpec.numBuckets) hiveTable.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava) From 0aa853940b9a79db059ae128c3f9f7d989312d60 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Fri, 12 May 2017 13:25:15 -0700 Subject: [PATCH 6/8] remove check as `alterTableSchema` ensures that dropping existing columns is not allowed --- .../catalyst/catalog/ExternalCatalog.scala | 26 ------------------- .../catalyst/catalog/InMemoryCatalog.scala | 4 --- .../sql/catalyst/catalog/SessionCatalog.scala | 5 ---- .../catalog/ExternalCatalogSuite.scala | 14 ---------- .../spark/sql/hive/HiveMetastoreCatalog.scala | 5 ++-- 5 files changed, 2 insertions(+), 52 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 586cf5d8a2af1..718a5c505ac02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -336,32 +336,6 @@ abstract class ExternalCatalog postToAll(RenameFunctionEvent(db, oldName, newName)) } - final def checkSchemaContainsBucketingColumns( - bucketSpec: BucketSpec, - schema: StructType): Unit = { - val nonExistentBucketColumns = - bucketSpec.bucketColumnNames.filterNot(col => schema.map(_.name).contains(col)) - - if (nonExistentBucketColumns.nonEmpty) { - throw new AnalysisException( - s""" - |Some existing bucketing columns are not present in the new schema : - |(${nonExistentBucketColumns.mkString("[", ",", "]")}) - """.stripMargin) - } - - val nonExistentSortColumns = - bucketSpec.sortColumnNames.filterNot(col => schema.map(_.name).contains(col)) - - if (nonExistentSortColumns.nonEmpty) { - throw new AnalysisException( - s""" - |Some existing sort columns are not present in the new schema : - |(${nonExistentSortColumns.mkString("[", ",", "]")}) - """.stripMargin) - } - } - protected def doRenameFunction(db: String, oldName: String, newName: String): Unit def getFunction(db: String, funcName: String): CatalogFunction diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index aae1b0dd3c8ce..3b724a05de500 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -310,10 +310,6 @@ class InMemoryCatalog( requireTableExists(db, table) val origTable = catalog(db).tables(table).table - if (origTable.bucketSpec.isDefined) { - checkSchemaContainsBucketingColumns(origTable.bucketSpec.get, schema) - } - catalog(db).tables(table).table = origTable.copy(schema = schema) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index f2da5cabfa211..b314563e35580 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -344,11 +344,6 @@ class SessionCatalog( |not present in the new schema. We don't support dropping columns yet. """.stripMargin) } - - if (catalogTable.bucketSpec.isDefined) { - externalCatalog.checkSchemaContainsBucketingColumns(catalogTable.bucketSpec.get, newSchema) - } - // assuming the newSchema has all partition columns at the end as required externalCatalog.alterTableSchema(db, table, newSchema) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index dbd8c1627d335..1759ac04c0033 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -256,20 +256,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac assert(newTbl1.schema == newSchema) } - test("alter table schema : removing bucketing column is not allowed") { - val catalog = newBasicCatalog() - val tbl1 = catalog.getTable("db2", "tbl1") - val newSchema = StructType(Seq( - StructField("new_field_1", IntegerType), - StructField("new_field_2", StringType), - StructField("a", IntegerType), - StructField("b", StringType))) - - intercept[AnalysisException] { - catalog.alterTableSchema("db2", "tbl1", newSchema) - } - } - test("get table") { assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1") } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index a193cd9807fd9..9dd8279efc1f4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -119,7 +119,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val metastoreSchema = relation.tableMeta.schema val tableIdentifier = QualifiedTableName(relation.tableMeta.database, relation.tableMeta.identifier.table) - val bucketSpec = relation.tableMeta.bucketSpec val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions val tablePath = new Path(relation.tableMeta.location) @@ -172,7 +171,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log location = fileIndex, partitionSchema = partitionSchema, dataSchema = dataSchema, - bucketSpec = bucketSpec, + bucketSpec = None, fileFormat = fileFormat, options = options)(sparkSession = sparkSession) val created = LogicalRelation(fsRelation, updatedTable) @@ -199,7 +198,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log sparkSession = sparkSession, paths = rootPath.toString :: Nil, userSpecifiedSchema = Option(dataSchema), - bucketSpec = bucketSpec, + bucketSpec = None, options = options, className = fileType).resolveRelation(), table = updatedTable) From bf306da2ea7d87887205bdc875ce849e897bb233 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Sun, 14 May 2017 08:55:03 -0700 Subject: [PATCH 7/8] review comment --- .../org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala | 1 - .../org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 1 - .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 1 + .../scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 2 -- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 ++ 5 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 718a5c505ac02..974ef900e2eed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.catalog -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.types.StructType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 3b724a05de500..8a5319bebe54e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -309,7 +309,6 @@ class InMemoryCatalog( schema: StructType): Unit = synchronized { requireTableExists(db, table) val origTable = catalog(db).tables(table).table - catalog(db).tables(table).table = origTable.copy(schema = schema) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index b314563e35580..f6653d384fe1d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -344,6 +344,7 @@ class SessionCatalog( |not present in the new schema. We don't support dropping columns yet. """.stripMargin) } + // assuming the newSchema has all partition columns at the end as required externalCatalog.alterTableSchema(db, table, newSchema) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 82c52758401c6..ba48facff2933 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -632,11 +632,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val rawTable = getRawTable(db, table) val withNewSchema = rawTable.copy(schema = schema) verifyColumnNames(withNewSchema) - // Add table metadata such as table schema, partition columns, etc. to table properties. val updatedTable = withNewSchema.copy( properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema)) - try { client.alterTable(updatedTable) } catch { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index c6e1e75c3f850..7a03860aebf68 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -425,6 +425,8 @@ private[hive] class HiveClientImpl( }, schema = schema, partitionColumnNames = partCols.map(_.name), + // For data source tables, we will always overwrite the bucket spec in + // HiveExternalCatalog with the bucketing information in table properties. bucketSpec = bucketSpec, owner = h.getOwner, createTime = h.getTTable.getCreateTime.toLong * 1000, From 865711f87259245292c78098b3ce5381a99ffc55 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Mon, 15 May 2017 07:30:19 -0700 Subject: [PATCH 8/8] fix documentation --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 7a03860aebf68..04f2751e79a51 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -425,8 +425,10 @@ private[hive] class HiveClientImpl( }, schema = schema, partitionColumnNames = partCols.map(_.name), - // For data source tables, we will always overwrite the bucket spec in - // HiveExternalCatalog with the bucketing information in table properties. + // If the table is written by Spark, we will put bucketing information in table properties, + // and will always overwrite the bucket spec in hive metastore by the bucketing information + // in table properties. This means, if we have bucket spec in both hive metastore and + // table properties, we will trust the one in table properties. bucketSpec = bucketSpec, owner = h.getOwner, createTime = h.getTTable.getCreateTime.toLong * 1000,