Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,13 @@ abstract class InMemoryBaseTable(
private var _pushedFilters: Array[Filter] = Array.empty

override def build: Scan = {
val scan = InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
val scan = if (InMemoryBaseTable.this.ordering.nonEmpty) {
new InMemoryBatchScanWithOrdering(
data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
} else {
InMemoryBatchScan(
data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
}
if (evaluableFilters.nonEmpty) {
scan.filter(evaluableFilters)
}
Expand Down Expand Up @@ -471,6 +477,14 @@ abstract class InMemoryBaseTable(
}
}

/**
 * An [[InMemoryBatchScan]] that also implements [[SupportsReportOrdering]].
 *
 * The reported ordering is not computed here: it is read from the `ordering` member of the
 * enclosing [[InMemoryBaseTable]] instance each time `outputOrdering()` is called.
 *
 * @param data        input partitions handed unchanged to the parent scan
 * @param readSchema  schema handed unchanged to the parent scan as its read schema
 * @param tableSchema schema handed unchanged to the parent scan as its table schema
 */
private class InMemoryBatchScanWithOrdering(
    data: Seq[InputPartition],
    readSchema: StructType,
    tableSchema: StructType)
  extends InMemoryBatchScan(data, readSchema, tableSchema)
  with SupportsReportOrdering {

  /** Returns the sort orders declared on the enclosing table. */
  override def outputOrdering(): Array[SortOrder] = {
    InMemoryBaseTable.this.ordering
  }
}

abstract class InMemoryWriterBuilder() extends SupportsTruncate with SupportsDynamicOverwrite
with SupportsStreamingUpdateAsAppend {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ object V2ScanPartitioningAndOrdering extends Rule[LogicalPlan] with SQLConfHelpe

/**
 * Attaches the ordering reported by a scan to its V2 scan relation.
 *
 * Walks the plan top-down. For every [[DataSourceV2ScanRelation]] whose scan implements
 * [[SupportsReportOrdering]], the scan's `outputOrdering()` is converted with
 * `V2ExpressionUtils.toCatalystOrdering` and stored in the relation's `ordering` field.
 * Nodes that do not match are left untouched.
 *
 * @param plan the logical plan to rewrite
 * @return the plan with `ordering` populated on matching scan relations
 */
private def ordering(plan: LogicalPlan) = plan.transformDown {
  case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportOrdering, _, _, _) =>
    // The relation's function catalog is passed along with the relation itself.
    // NOTE(review): presumably this is what lets function-based sort orders (e.g. a bucket
    // transform) resolve instead of failing -- confirm against V2ExpressionUtils.
    // The local is deliberately not named `ordering`, to avoid shadowing this method.
    val catalystOrdering =
      V2ExpressionUtils.toCatalystOrdering(scan.outputOrdering(), relation, relation.funCatalog)
    d.copy(ordering = Some(catalystOrdering))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ import java.sql.Date
import java.util.Collections

import org.apache.spark.sql.{catalyst, AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression, Cast, Literal}
import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression, Cast, Literal, TransformExpression}
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.{CoalescedBoundary, CoalescedHashPartitioning, HashPartitioning, RangePartitioning, UnknownPartitioning}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.functions._
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.expressions.LogicalExpressions._
import org.apache.spark.sql.connector.expressions.Expressions._
import org.apache.spark.sql.execution.{QueryExecution, SortExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.execution.streaming.MemoryStream
Expand Down Expand Up @@ -1489,4 +1490,22 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase
Seq(None)
}
}

test("SPARK-56321: Scan with SupportsReportOrdering and function-based sort order") {
  // The same bucket(4, id) transform is passed to createTable twice: once on its own and once
  // wrapped in an ascending, nulls-first sort that serves as the table ordering argument.
  val idBucket = bucket(4, "id")
  val declaredOrdering =
    Array(sort(idBucket, SortDirection.ASCENDING, NullOrdering.NULLS_FIRST))
  catalog.createTable(
    ident, schema, Array(idBucket), emptyProps,
    Distributions.unspecified(), declaredOrdering, None, None)

  sql(s"INSERT INTO testcat.ns1.test_table VALUES (1, 'a', date '2021-01-01')")

  // Plan a plain projection and pull every batch scan node out of the executed plan.
  val query = sql("SELECT id, data FROM testcat.ns1.test_table")
  val batchScans = collect(query.queryExecution.executedPlan) {
    case scanExec: BatchScanExec => scanExec
  }
  assert(batchScans.size === 1)

  // The single scan must expose an ordering whose first key is a TransformExpression.
  val reportedOrdering = batchScans.head.outputOrdering
  assert(reportedOrdering.nonEmpty,
    "scan should report non-empty outputOrdering via SupportsReportOrdering")
  assert(reportedOrdering.head.child.isInstanceOf[TransformExpression],
    "bucket-based sort order should resolve to a TransformExpression")
}
}