Skip to content
Closed
5 changes: 0 additions & 5 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -8979,11 +8979,6 @@
"Cannot delete from table <table> where <filters>."
]
},
"_LEGACY_ERROR_TEMP_1111" : {
"message" : [
"DESCRIBE does not support partition for v2 tables."
]
},
"_LEGACY_ERROR_TEMP_1114" : {
"message" : [
"The streaming sources in a query do not have a common supported execution mode.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,6 @@ case class SetNamespaceLocation(
*/
case class DescribeRelation(
relation: LogicalPlan,
partitionSpec: TablePartitionSpec,
isExtended: Boolean,
override val output: Seq[Attribute] = DescribeRelation.getOutputAttrs) extends UnaryCommand {
override def child: LogicalPlan = relation
Expand All @@ -922,6 +921,19 @@ object DescribeRelation {
def getOutputAttrs: Seq[Attribute] = DescribeCommandSchema.describeTableAttributes()
}

/**
* The logical plan of the DESCRIBE relation_name PARTITION command.
*/
case class DescribeTablePartition(
table: LogicalPlan,
partitionSpec: PartitionSpec,
isExtended: Boolean,
override val output: Seq[Attribute] = DescribeRelation.getOutputAttrs)
extends V2PartitionCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): DescribeTablePartition =
copy(table = newChild)
}

/**
* The logical plan of the DESCRIBE relation_name col_name command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1568,12 +1568,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
"filters" -> filters.mkString("[", ", ", "]")))
}

def describeDoesNotSupportPartitionForV2TablesError(): Throwable = {
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1111",
messageParameters = Map.empty)
}

def cannotReplaceMissingTableError(
tableIdentifier: Identifier): Throwable = {
new CannotReplaceMissingTableException(tableIdentifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.connect.proto.{ExecutePlanResponse, PipelineCommandResul
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.{Command, CreateNamespace, CreateTable, CreateTableAsSelect, CreateView, DescribeRelation, DropView, InsertIntoStatement, LogicalPlan, RenameTable, ShowColumns, ShowCreateTable, ShowFunctions, ShowTableProperties, ShowTables, ShowViews}
import org.apache.spark.sql.catalyst.plans.logical.{Command, CreateNamespace, CreateTable, CreateTableAsSelect, CreateView, DescribeRelation, DescribeTablePartition, DropView, InsertIntoStatement, LogicalPlan, RenameTable, ShowColumns, ShowCreateTable, ShowFunctions, ShowTableProperties, ShowTables, ShowViews}
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.connect.service.SessionHolder
import org.apache.spark.sql.execution.command.{ShowCatalogsCommand, ShowNamespacesCommand}
Expand Down Expand Up @@ -140,6 +140,7 @@ private[connect] object PipelinesHandler extends Logging {
def blockUnsupportedSqlCommand(queryPlan: LogicalPlan): Unit = {
val allowlistedCommands = Set(
classOf[DescribeRelation],
classOf[DescribeTablePartition],
classOf[ShowTables],
classOf[ShowTableProperties],
classOf[ShowNamespacesCommand],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// Use v1 command to describe (temp) view, as v2 catalog doesn't support view yet.
case DescribeRelation(
resolvedChild @ ResolvedV1TableOrViewIdentifier(ident),
partitionSpec,
isExtended,
output) =>
DescribeTableCommand(resolvedChild, ident, partitionSpec, isExtended, output)
DescribeTableCommand(resolvedChild, ident, Map.empty, isExtended, output)

case DescribeTablePartition(
resolvedChild @ ResolvedV1TableOrViewIdentifier(ident),
UnresolvedPartitionSpec(spec, _),
isExtended,
output) =>
DescribeTableCommand(resolvedChild, ident, spec, isExtended, output)

case DescribeColumn(
ResolvedViewIdentifier(ident), column: UnresolvedAttribute, isExtended, output) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace,
GlobalTempView, LocalTempView, PersistedView,
PlanWithUnresolvedIdentifier, SchemaEvolution, SchemaTypeEvolution, UnresolvedAttribute,
UnresolvedIdentifier, UnresolvedNamespace, UnresolvedProcedure}
UnresolvedIdentifier, UnresolvedNamespace, UnresolvedPartitionSpec, UnresolvedProcedure}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.parser._
Expand Down Expand Up @@ -1440,8 +1440,8 @@ class SparkSqlAstBuilder extends AstBuilder {
}

/**
* Create a [[DescribeColumn]] or [[DescribeRelation]] or [[DescribeRelationAsJsonCommand]]
* command.
* Create a [[DescribeColumn]], [[DescribeRelation]], [[DescribeTablePartition]], or
* [[DescribeRelationJsonCommand]] command.
*/
override def visitDescribeRelation(ctx: DescribeRelationContext): LogicalPlan = withOrigin(ctx) {
val isExtended = ctx.EXTENDED != null || ctx.FORMATTED != null
Expand All @@ -1463,7 +1463,7 @@ class SparkSqlAstBuilder extends AstBuilder {
isExtended)
}
} else {
val partitionSpec = if (ctx.partitionSpec != null) {
val rawSpec = if (ctx.partitionSpec != null) {
// According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`.
visitPartitionSpec(ctx.partitionSpec).map {
case (key, Some(value)) => key -> value
Expand All @@ -1474,9 +1474,12 @@ class SparkSqlAstBuilder extends AstBuilder {
Map.empty[String, String]
}
if (asJson) {
DescribeRelationJsonCommand(relation, partitionSpec, isExtended)
// DescribeRelationJsonCommand uses the raw Map directly (V1 path only).
DescribeRelationJsonCommand(relation, rawSpec, isExtended)
} else if (rawSpec.nonEmpty) {
DescribeTablePartition(relation, UnresolvedPartitionSpec(rawSpec), isExtended)
} else {
DescribeRelation(relation, partitionSpec, isExtended)
DescribeRelation(relation, isExtended)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,12 +425,13 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case DescribeNamespace(ResolvedNamespace(catalog, ns, _), extended, output) =>
DescribeNamespaceExec(output, catalog.asNamespaceCatalog, ns, extended) :: Nil

case DescribeRelation(r: ResolvedTable, partitionSpec, isExtended, output) =>
if (partitionSpec.nonEmpty) {
throw QueryCompilationErrors.describeDoesNotSupportPartitionForV2TablesError()
}
case DescribeRelation(r: ResolvedTable, isExtended, output) =>
DescribeTableExec(output, r.table, isExtended) :: Nil

case DescribeTablePartition(r: ResolvedTable, part, isExtended, output) =>
DescribeTablePartitionExec(output, r.table.asPartitionable, r.identifier,
Seq(part).asResolvedPartitionSpecs.head, isExtended) :: Nil

case DescribeColumn(r: ResolvedTable, column, isExtended, output) =>
column match {
case c: Attribute =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ case class DescribeTableExec(
isExtended: Boolean) extends LeafV2CommandExec {
override protected def run(): Seq[InternalRow] = {
val rows = new ArrayBuffer[InternalRow]()
addSchema(rows)
addPartitioning(rows)
addClustering(rows)
addBaseDescription(rows)

if (isExtended) {
addMetadataColumns(rows)
Expand All @@ -50,6 +48,13 @@ case class DescribeTableExec(
rows.toSeq
}

/** Schema + partitioning + clustering rows, shared with DescribeTablePartitionExec. */
private[v2] def addBaseDescription(rows: ArrayBuffer[InternalRow]): Unit = {
addSchema(rows)
addPartitioning(rows)
addClustering(rows)
}

private def addTableDetails(rows: ArrayBuffer[InternalRow]): Unit = {
rows += emptyRow()
rows += toCatalystRow("# Detailed Table Information", "", "")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal, ToPrettyString}
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitionManagement}
import org.apache.spark.sql.errors.QueryCompilationErrors

case class DescribeTablePartitionExec(
output: Seq[Attribute],
table: SupportsPartitionManagement,
tableIdent: Identifier,
partSpec: ResolvedPartitionSpec,
isExtended: Boolean) extends LeafV2CommandExec {

override protected def run(): Seq[InternalRow] = {
val partitionRow = validateAndGetPartition()

// Delegate schema + partitioning + clustering to DescribeTableExec.
val rows = new ArrayBuffer[InternalRow]()
DescribeTableExec(output, table, isExtended = false).addBaseDescription(rows)

if (isExtended) {
addPartitionDetails(rows, partitionRow)
}
rows.toSeq
}

private def validateAndGetPartition(): InternalRow = {
val partitionSchema = table.partitionSchema()
val (names, ident) = (partSpec.names, partSpec.ident)
val partitionIdentifiers = table.listPartitionIdentifiers(names.toArray, ident)
if (partitionIdentifiers.isEmpty) {
throw QueryCompilationErrors.notExistPartitionError(tableIdent, ident, partitionSchema)
}
assert(partitionIdentifiers.length == 1)
partitionIdentifiers.head
}

private def addPartitionDetails(
rows: ArrayBuffer[InternalRow],
partitionRow: InternalRow): Unit = {
val partitionSchema = table.partitionSchema()

// Render partition values using ToPrettyString + escapePathName,
// consistent with ShowTablePartitionExec.
val len = partitionSchema.length
val partitions = new Array[String](len)
val timeZoneId = conf.sessionLocalTimeZone
for (i <- 0 until len) {
val dataType = partitionSchema(i).dataType
val partValueUTF8String = ToPrettyString(Literal(partitionRow.get(i, dataType), dataType),
Some(timeZoneId)).eval(null)
val partValueStr = if (partValueUTF8String == null) "null" else partValueUTF8String.toString
partitions(i) = escapePathName(partitionSchema(i).name) + "=" + escapePathName(partValueStr)
}
val partitionValues = partitions.mkString("[", ", ", "]")

rows += toCatalystRow("", "", "")
rows += toCatalystRow("# Detailed Partition Information", "", "")
rows += toCatalystRow("Partition Values", partitionValues, "")

val metadata = table.loadPartitionMetadata(partSpec.ident)
metadata.asScala.toSeq.sortBy(_._1).foreach { case (k, v) =>
rows += toCatalystRow(k, v, "")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ ExplainCommand 'DescribeColumn 'b, false, [info_name#x, info_value#x], SimpleMod
-- !query
EXPLAIN DESCRIBE t PARTITION (c='Us', d=2)
-- !query analysis
ExplainCommand 'DescribeRelation [c=Us, d=2], false, [col_name#x, data_type#x, comment#x], SimpleMode
ExplainCommand 'DescribeTablePartition unresolvedpartitionspec((c,Us), (d,2), None), false, [col_name#x, data_type#x, comment#x], SimpleMode


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ trait SQLQueryTestHelper extends SQLConfHelper with Logging {
case _: DescribeCommandBase
| _: DescribeColumnCommand
| _: DescribeRelation
| _: DescribeTablePartition
| _: DescribeColumn => true
case PhysicalOperation(_, _, Sort(_, true, _, _)) => true
case _ => plan.children.iterator.exists(isSemanticallySorted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.spark.sql.execution.command

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.plans.logical.{DescribeColumn, DescribeRelation}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute,
UnresolvedPartitionSpec, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.plans.logical.{DescribeColumn, DescribeRelation,
DescribeTablePartition}
import org.apache.spark.sql.test.SharedSparkSession

class DescribeTableParserSuite extends SharedSparkSession with AnalysisTest {
Expand All @@ -28,16 +30,24 @@ class DescribeTableParserSuite extends SharedSparkSession with AnalysisTest {
test("SPARK-17328: Fix NPE with EXPLAIN DESCRIBE TABLE") {
comparePlans(parsePlan("describe t"),
DescribeRelation(
UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), Map.empty, isExtended = false))
UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), isExtended = false))
comparePlans(parsePlan("describe table t"),
DescribeRelation(
UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), Map.empty, isExtended = false))
UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), isExtended = false))
comparePlans(parsePlan("describe table extended t"),
DescribeRelation(
UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), Map.empty, isExtended = true))
UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), isExtended = true))
comparePlans(parsePlan("describe table formatted t"),
DescribeRelation(
UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), Map.empty, isExtended = true))
UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), isExtended = true))
}

test("describe table with partition spec") {
comparePlans(parsePlan("DESCRIBE TABLE t PARTITION (ds='2024-01-01')"),
DescribeTablePartition(
UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true),
UnresolvedPartitionSpec(Map("ds" -> "2024-01-01")),
isExtended = false))
}

test("describe table column") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import org.mockito.invocation.InvocationOnMock
import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, AsOfVersion, EmptyFunctionRegistry, NoSuchTableException, RelationResolution, ResolvedFieldName, ResolvedFieldPosition, ResolvedIdentifier, ResolvedTable, ResolveSessionCatalog, TimeTravelSpec, UnresolvedAttribute, UnresolvedFieldPosition, UnresolvedInlineTable, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, AsOfVersion, EmptyFunctionRegistry, NoSuchTableException, RelationResolution, ResolvedFieldName, ResolvedFieldPosition, ResolvedIdentifier, ResolvedTable, ResolveSessionCatalog, TimeTravelSpec, UnresolvedAttribute, UnresolvedFieldPosition, UnresolvedInlineTable, UnresolvedPartitionSpec, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog, TempVariableManager}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{AlterColumns, AlterColumnSpec, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DefaultValueExpression, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, InsertIntoStatement, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, OverwriteByExpression, OverwritePartitionsDynamic, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.plans.logical.{AlterColumns, AlterColumnSpec, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DefaultValueExpression, DeleteAction, DeleteFromTable, DescribeRelation, DescribeTablePartition, DropTable, InsertAction, InsertIntoStatement, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, OverwriteByExpression, OverwritePartitionsDynamic, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
import org.apache.spark.sql.connector.FakeV2Provider
Expand Down Expand Up @@ -943,13 +943,13 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
comparePlans(parsed2, expected2)
} else {
parsed1 match {
case DescribeRelation(_: ResolvedTable, _, isExtended, _) =>
case DescribeRelation(_: ResolvedTable, isExtended, _) =>
assert(!isExtended)
case _ => fail("Expect DescribeTable, but got:\n" + parsed1.treeString)
}

parsed2 match {
case DescribeRelation(_: ResolvedTable, _, isExtended, _) =>
case DescribeRelation(_: ResolvedTable, isExtended, _) =>
assert(isExtended)
case _ => fail("Expect DescribeTable, but got:\n" + parsed2.treeString)
}
Expand All @@ -965,10 +965,10 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
comparePlans(parsed3, expected3)
} else {
parsed3 match {
case DescribeRelation(_: ResolvedTable, partitionSpec, isExtended, _) =>
case DescribeTablePartition(_: ResolvedTable, partitionSpec, isExtended, _) =>
assert(!isExtended)
assert(partitionSpec == Map("a" -> "1"))
case _ => fail("Expect DescribeTable, but got:\n" + parsed2.treeString)
assert(partitionSpec == UnresolvedPartitionSpec(Map("a" -> "1")))
case _ => fail("Expect DescribeTablePartition, but got:\n" + parsed3.treeString)
}
}
}
Expand Down Expand Up @@ -1662,7 +1662,7 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
case AppendData(r: DataSourceV2Relation, _, _, _, _, _, _) =>
assert(r.catalog.contains(catalog))
assert(r.identifier.exists(_.name() == tableIdent))
case DescribeRelation(r: ResolvedTable, _, _, _) =>
case DescribeRelation(r: ResolvedTable, _, _) =>
assert(r.catalog == catalog)
assert(r.identifier.name() == tableIdent)
case ShowTableProperties(r: ResolvedTable, _, _) =>
Expand Down
Loading