diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index d9641cd7f72dc..74b6118a9eb92 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -311,7 +311,13 @@ abstract class InMemoryBaseTable( private var _pushedFilters: Array[Filter] = Array.empty override def build: Scan = { - val scan = InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema) + val scan = if (InMemoryBaseTable.this.ordering.nonEmpty) { /* the table declares an ordering: build the scan variant that also implements SupportsReportOrdering */ + new InMemoryBatchScanWithOrdering( + data.map(_.asInstanceOf[InputPartition]), schema, tableSchema) + } else { /* no ordering declared: build the plain batch scan, as before this change */ + InMemoryBatchScan( + data.map(_.asInstanceOf[InputPartition]), schema, tableSchema) + } if (evaluableFilters.nonEmpty) { scan.filter(evaluableFilters) } @@ -471,6 +477,14 @@ abstract class InMemoryBaseTable( } } + /** [[InMemoryBatchScan]] variant that additionally implements SupportsReportOrdering; its outputOrdering() returns the `ordering` of the enclosing InMemoryBaseTable instance unchanged. */ private class InMemoryBatchScanWithOrdering( + data: Seq[InputPartition], + readSchema: StructType, + tableSchema: StructType) + extends InMemoryBatchScan(data, readSchema, tableSchema) with SupportsReportOrdering { + override def outputOrdering(): Array[SortOrder] = InMemoryBaseTable.this.ordering + } + abstract class InMemoryWriterBuilder() extends SupportsTruncate with SupportsDynamicOverwrite with SupportsStreamingUpdateAsAppend { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala index b7470ab5059cc..00aacaa93bb60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala @@ -66,7 
+66,8 @@ object V2ScanPartitioningAndOrdering extends Rule[LogicalPlan] with SQLConfHelpe private def ordering(plan: LogicalPlan) = plan.transformDown { case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportOrdering, _, _, _) => - val ordering = V2ExpressionUtils.toCatalystOrdering(scan.outputOrdering(), relation) + val ordering = /* converts the scan-reported ordering, now also passing relation.funCatalog as a third argument; NOTE(review): presumably this lets function-based sort orders (e.g. bucket transforms) be resolved - confirm against V2ExpressionUtils.toCatalystOrdering */ + V2ExpressionUtils.toCatalystOrdering(scan.outputOrdering(), relation, relation.funCatalog) d.copy(ordering = Some(ordering)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala index 12007cd94cd54..3aa896386b308 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala @@ -22,7 +22,7 @@ import java.sql.Date import java.util.Collections import org.apache.spark.sql.{catalyst, AnalysisException, DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression, Cast, Literal} +import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression, Cast, Literal, TransformExpression} import org.apache.spark.sql.catalyst.expressions.objects.Invoke import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.{CoalescedBoundary, CoalescedHashPartitioning, HashPartitioning, RangePartitioning, UnknownPartitioning} @@ -30,9 +30,10 @@ import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.functions._ import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} import org.apache.spark.sql.connector.expressions._ -import org.apache.spark.sql.connector.expressions.LogicalExpressions._ +import org.apache.spark.sql.connector.expressions.Expressions._ import 
org.apache.spark.sql.execution.{QueryExecution, SortExec, SparkPlan} import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike import org.apache.spark.sql.execution.streaming.MemoryStream @@ -1489,4 +1490,22 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase Seq(None) } } + + test("SPARK-56321: Scan with SupportsReportOrdering and function-based sort order") { /* creates a table whose declared ordering is a bucket transform on "id", inserts one row, then inspects the ordering reported by the BatchScanExec in the executed plan */ + val bucketById = bucket(4, "id") + val tableOrdering = Array(sort(bucketById, SortDirection.ASCENDING, NullOrdering.NULLS_FIRST)) /* the sort key is the bucket transform itself, not a plain column */ + catalog.createTable(ident, schema, Array(bucketById), emptyProps, + Distributions.unspecified(), tableOrdering, None, None) + + sql(s"INSERT INTO testcat.ns1.test_table VALUES (1, 'a', date '2021-01-01')") + + val df = sql("SELECT id, data FROM testcat.ns1.test_table") + val scans = collect(df.queryExecution.executedPlan) { case s: BatchScanExec => s } /* every BatchScanExec node of the executed plan */ + assert(scans.size === 1) + val ordering = scans.head.outputOrdering /* .head is safe: exactly one scan was asserted on the previous line */ + assert(ordering.nonEmpty, + "scan should report non-empty outputOrdering via SupportsReportOrdering") + assert(ordering.head.child.isInstanceOf[TransformExpression], + "bucket-based sort order should resolve to a TransformExpression") + } }