diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 36c34251c317..8440ba685c3a 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -267,6 +267,14 @@ public boolean preserveDataGrouping() {
.parse();
}
+ public boolean preserveDataOrdering() {
+ return confParser
+ .booleanConf()
+ .sessionConf(SparkSQLProperties.PRESERVE_DATA_ORDERING)
+ .defaultValue(SparkSQLProperties.PRESERVE_DATA_ORDERING_DEFAULT)
+ .parse();
+ }
+
public boolean aggregatePushDownEnabled() {
return confParser
.booleanConf()
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 161f09d53e2c..ea17f2c8db21 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -40,6 +40,13 @@ private SparkSQLProperties() {}
"spark.sql.iceberg.planning.preserve-data-grouping";
public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false;
+ // Controls whether to report sort order to Spark Requires
+ // spark.sql.iceberg.planning.preserve-data-grouping=true to have any effect. Ordering
+ // is only reported when KeyGroupedPartitioning is active (i.e. grouping is enabled).
+ public static final String PRESERVE_DATA_ORDERING =
+ "spark.sql.iceberg.planning.preserve-data-ordering";
+ public static final boolean PRESERVE_DATA_ORDERING_DEFAULT = false;
+
// Controls whether to push down aggregate (MAX/MIN/COUNT) to Iceberg
public static final String AGGREGATE_PUSH_DOWN_ENABLED =
"spark.sql.iceberg.aggregate-push-down.enabled";
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java
new file mode 100644
index 000000000000..005c62bf20b6
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.iceberg.BaseScanTaskGroup;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
+import org.apache.iceberg.spark.source.metrics.TaskNumSplits;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SortedMerge;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.ProjectingInternalRow;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * A {@link PartitionReader} that reads multiple sorted files and merges them into a single sorted
+ * stream using a k-way heap merge ({@link SortedMerge}).
+ *
+ *
This reader is used when {@code preserve-data-ordering} is enabled and the task group contains
+ * multiple files that all have the same sort order.
+ *
+ *
Sort key columns absent from the requested projection are temporarily added to the read schema
+ * so that {@link SortOrderComparators} can access them during the merge. The extra columns are
+ * stripped from each row before it is returned to Spark.
+ */
+class MergingSortedRowDataReader implements PartitionReader {
+ private static final Logger LOG = LoggerFactory.getLogger(MergingSortedRowDataReader.class);
+
+ private final CloseableGroup resources;
+ private final CloseableIterator mergedIterator;
+ private final List fileReaders;
+ // non-null only when sort key columns were added to the read schema beyond what Spark projected
+ private final ProjectingInternalRow projectingRow;
+ private InternalRow current;
+
+ MergingSortedRowDataReader(SparkInputPartition partition) {
+ Table table = partition.table();
+ ScanTaskGroup taskGroup = partition.taskGroup();
+ Schema projection = partition.projection();
+ SortOrder sortOrder = table.sortOrder();
+
+ int numFiles = taskGroup.tasks().size();
+
+ Preconditions.checkState(
+ sortOrder.isSorted(), "Cannot create merging reader for unsorted table %s", table.name());
+ Preconditions.checkState(
+ numFiles > 1, "Merging reader requires multiple files, got %s", numFiles);
+
+ LOG.info(
+ "Creating merging reader for {} files with sort order {} in table {}",
+ numFiles,
+ sortOrder.orderId(),
+ table.name());
+
+ // Augment the projected schema with any sort key columns Spark did not request so that
+ // SortOrderComparators can access every sort key field during the merge.
+ Schema mergeReadSchema = mergeReadSchema(projection, sortOrder, table);
+ this.projectingRow = buildProjectingRow(projection, mergeReadSchema);
+
+ this.resources = new CloseableGroup();
+ this.fileReaders =
+ taskGroup.tasks().stream()
+ .map(
+ task ->
+ new RowDataReader(
+ table,
+ partition.io(),
+ new BaseScanTaskGroup<>(Collections.singletonList(task)),
+ mergeReadSchema,
+ partition.isCaseSensitive(),
+ partition.cacheDeleteFilesOnExecutors()))
+ .toList();
+ // Wrap each reader as a CloseableIterable and feed into SortedMerge.
+ List> fileIterables =
+ fileReaders.stream().map(this::readerToIterable).toList();
+ SortedMerge sortedMerge =
+ new SortedMerge<>(buildComparator(mergeReadSchema, sortOrder), fileIterables);
+ resources.addCloseable(sortedMerge);
+ this.mergedIterator = sortedMerge.iterator();
+ }
+
+ /**
+ * Adapts a {@link RowDataReader} to a {@link CloseableIterable} for use with {@link SortedMerge}.
+ * Each row is copied before it enters the priority queue because Spark's Parquet/ORC readers
+ * reuse {@link InternalRow} instances for performance.
+ */
+ private CloseableIterable readerToIterable(RowDataReader reader) {
+ return CloseableIterable.withNoopClose(
+ () ->
+ new CloseableIterator<>() {
+ private boolean advanced = false;
+ private boolean hasNext = false;
+
+ @Override
+ public boolean hasNext() {
+ if (!advanced) {
+ try {
+ hasNext = reader.next();
+ advanced = true;
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to advance reader", e);
+ }
+ }
+ return hasNext;
+ }
+
+ @Override
+ public InternalRow next() {
+ if (!advanced) {
+ hasNext();
+ }
+ advanced = false;
+ return reader.get().copy();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+ });
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ if (!mergedIterator.hasNext()) {
+ return false;
+ }
+
+ InternalRow merged = mergedIterator.next();
+ if (projectingRow == null) {
+ this.current = merged;
+ } else {
+ projectingRow.project(merged);
+ this.current = projectingRow;
+ }
+
+ return true;
+ }
+
+ @Override
+ public InternalRow get() {
+ return current;
+ }
+
+ @Override
+ public void close() throws IOException {
+ resources.close();
+ }
+
+ @Override
+ public CustomTaskMetric[] currentMetricsValues() {
+ long totalDeletes =
+ fileReaders.stream()
+ .flatMap(reader -> Arrays.stream(reader.currentMetricsValues()))
+ .filter(metric -> metric instanceof TaskNumDeletes)
+ .mapToLong(CustomTaskMetric::value)
+ .sum();
+ return new CustomTaskMetric[] {
+ new TaskNumSplits(fileReaders.size()), new TaskNumDeletes(totalDeletes)
+ };
+ }
+
+ /**
+ * Builds a comparator for merging {@link InternalRow}s by the given sort order. Uses {@link
+ * SortOrderComparators} which handles all transform types (identity, bucket, truncate), ASC/DESC
+ * directions, and null ordering. The two {@link InternalRowWrapper} instances are allocated once
+ * and reused — {@code wrap()} just updates an internal reference.
+ */
+ private static Comparator buildComparator(
+ Schema mergeReadSchema, SortOrder sortOrder) {
+ StructType sparkSchema = SparkSchemaUtil.convert(mergeReadSchema);
+ Comparator keyComparator =
+ SortOrderComparators.forSchema(mergeReadSchema, sortOrder);
+ InternalRowWrapper left = new InternalRowWrapper(sparkSchema, mergeReadSchema.asStruct());
+ InternalRowWrapper right = new InternalRowWrapper(sparkSchema, mergeReadSchema.asStruct());
+ return (r1, r2) -> keyComparator.compare(left.wrap(r1), right.wrap(r2));
+ }
+
+ /**
+ * Returns a {@link ProjectingInternalRow} that remaps columns from the wider merge schema back to
+ * the requested projection, or {@code null} if no extra columns were added.
+ */
+ private static ProjectingInternalRow buildProjectingRow(Schema projection, Schema mergeSchema) {
+ if (projection.columns().size() == mergeSchema.columns().size()) {
+ return null;
+ }
+
+ List mergeColumns = mergeSchema.columns();
+ List positions = Lists.newArrayListWithCapacity(projection.columns().size());
+
+ for (int i = 0; i < projection.columns().size(); i++) {
+ int fieldId = projection.columns().get(i).fieldId();
+ boolean found = false;
+ for (int j = 0; j < mergeColumns.size(); j++) {
+ if (mergeColumns.get(j).fieldId() == fieldId) {
+ positions.add(j);
+ found = true;
+ break;
+ }
+ }
+ Preconditions.checkState(
+ found, "Projection field id=%s not found in merge read schema — this is a bug", fieldId);
+ }
+
+ StructType sparkSchema = SparkSchemaUtil.convert(projection);
+ return new ProjectingInternalRow(sparkSchema, JavaConverters.asScala(positions).toIndexedSeq());
+ }
+
+ /**
+ * Returns the schema to use when reading each file. This is the requested {@code projection}
+ * augmented with any sort key columns that are not already present, so the merge comparator can
+ * access every sort key field regardless of what Spark projected.
+ */
+ private static Schema mergeReadSchema(Schema projection, SortOrder sortOrder, Table table) {
+ Schema tableSchema = table.schema();
+ List missingFields = Lists.newArrayList();
+
+ for (SortField sortField : sortOrder.fields()) {
+ int fieldId = sortField.sourceId();
+ if (projection.findField(fieldId) == null) {
+ Types.NestedField tableField = tableSchema.findField(fieldId);
+ if (tableField != null) {
+ missingFields.add(tableField);
+ }
+ }
+ }
+
+ if (missingFields.isEmpty()) {
+ return projection;
+ }
+
+ return TypeUtil.join(projection, new Schema(missingFields));
+ }
+}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SortOrderAnalyzer.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SortOrderAnalyzer.java
new file mode 100644
index 000000000000..e3cca0020701
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SortOrderAnalyzer.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Analyzes whether sort ordering can be reported for a table's task groups.
+ *
+ * For sort ordering to be reported, ALL of these conditions must hold:
+ *
+ *
+ * The table has a defined sort order (non-null and {@code sortOrder.isSorted() == true})
+ * The sort order only references top-level columns.
+ * Each partition key maps to exactly ONE task group (Spark drops the ordering guarantee when
+ * multiple {@code InputPartition}s share the same partition key)
+ * Every {@link FileScanTask} in every task group carries the current sort order ID
+ *
+ */
+class SortOrderAnalyzer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SortOrderAnalyzer.class);
+
+ private SortOrderAnalyzer() {}
+
+ /**
+ * Returns {@code true} only when sort ordering can be safely reported to Spark for the given
+ * table and task groups.
+ */
+ static boolean canReportOrdering(
+ Table table, List extends ScanTaskGroup>> taskGroups, Types.StructType groupingKeyType) {
+
+ SortOrder sortOrder = table.sortOrder();
+
+ if (sortOrder == null || sortOrder.isUnsorted()) {
+ LOG.debug("Cannot report ordering: table {} has no sort order defined", table.name());
+ return false;
+ }
+
+ if (taskGroups == null || taskGroups.isEmpty()) {
+ LOG.debug("Cannot report ordering: no task groups for table {}", table.name());
+ return false;
+ }
+
+ if (hasNestedSortFields(sortOrder, table.schema())) {
+ LOG.debug("Cannot report ordering: table {} has sort order with nested fields", table.name());
+ return false;
+ }
+
+ if (!hasUniquePartitionKeys(taskGroups, groupingKeyType)) {
+ LOG.debug(
+ "Cannot report ordering: table {} has multiple task groups sharing the same partition"
+ + " key.",
+ table.name());
+ return false;
+ }
+
+ for (ScanTaskGroup> taskGroup : taskGroups) {
+ if (!allFilesHaveSortOrder(taskGroup, sortOrder.orderId())) {
+ LOG.debug(
+ "Cannot report ordering: table {} has files whose sort order ID does not match the"
+ + " current table sort order {}",
+ table.name(),
+ sortOrder.orderId());
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Checks that each partition key appears in at most one task group.
+ *
+ * When multiple {@code InputPartition}s share the same partition key, Spark's {@code
+ * EnsureRequirements} coalesces them into a single task at join/aggregate time. Because this
+ * coalescing simply concatenates the partitions rather than merge-sorting them, it destroys the
+ * within-partition ordering guarantee. Reporting ordering in this situation would cause incorrect
+ * query results.
+ */
+ private static boolean hasUniquePartitionKeys(
+ List extends ScanTaskGroup>> taskGroups, Types.StructType groupingKeyType) {
+
+ if (groupingKeyType == null || groupingKeyType.fields().isEmpty()) {
+ return true;
+ }
+
+ StructLikeSet seenKeys = StructLikeSet.create(groupingKeyType);
+ for (ScanTaskGroup> taskGroup : taskGroups) {
+ StructLike key = taskGroup.groupingKey();
+ if (key != null && !seenKeys.add(key)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Checks that every {@link FileScanTask} in the task group carries a sort order ID that matches
+ * the table's current sort order.
+ *
+ *
Non-{@code FileScanTask} entries (e.g. changelog tasks) are skipped.
+ */
+ private static boolean allFilesHaveSortOrder(
+ ScanTaskGroup> taskGroup, int expectedSortOrderId) {
+ for (ScanTask task : taskGroup.tasks()) {
+ if (!(task instanceof FileScanTask fileTask)) {
+ continue;
+ }
+
+ Integer fileSortOrderId = fileTask.file().sortOrderId();
+
+ if (fileSortOrderId == null || fileSortOrderId != expectedSortOrderId) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private static boolean hasNestedSortFields(SortOrder sortOrder, Schema schema) {
+ for (SortField field : sortOrder.fields()) {
+ if (!TypeUtil.ancestorFields(schema, field.sourceId()).isEmpty()) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 22a4b171b331..6fa99c5db7a3 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -57,6 +57,8 @@ class SparkBatch implements Batch {
private final boolean executorCacheLocalityEnabled;
private final int scanHashCode;
private final boolean cacheDeleteFilesOnExecutors;
+ private final boolean orderingEnabled;
+ private Boolean lazyAnyGroupNeedsMerging;
SparkBatch(
JavaSparkContext sparkContext,
@@ -66,7 +68,8 @@ class SparkBatch implements Batch {
Types.StructType groupingKeyType,
List extends ScanTaskGroup>> taskGroups,
Schema projection,
- int scanHashCode) {
+ int scanHashCode,
+ boolean orderingEnabled) {
this.sparkContext = sparkContext;
this.table = table;
this.fileIO = fileIO;
@@ -79,6 +82,7 @@ class SparkBatch implements Batch {
this.executorCacheLocalityEnabled = readConf.executorCacheLocalityEnabled();
this.scanHashCode = scanHashCode;
this.cacheDeleteFilesOnExecutors = readConf.cacheDeleteFilesOnExecutors();
+ this.orderingEnabled = orderingEnabled;
}
@Override
@@ -94,21 +98,35 @@ public InputPartition[] planInputPartitions() {
InputPartition[] partitions = new InputPartition[taskGroups.size()];
for (int index = 0; index < taskGroups.size(); index++) {
+ ScanTaskGroup> taskGroup = taskGroups.get(index);
+
partitions[index] =
new SparkInputPartition(
groupingKeyType,
- taskGroups.get(index),
+ taskGroup,
tableBroadcast,
fileIOBroadcast,
projectionString,
caseSensitive,
locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE,
- cacheDeleteFilesOnExecutors);
+ cacheDeleteFilesOnExecutors,
+ shouldUseMergingSortedReader(taskGroup));
}
return partitions;
}
+ /**
+ * Returns true if this task group should use a k-way merging reader. This requires ordering to be
+ * enabled at the table level, multiple files in the group, and all tasks being {@link
+ * FileScanTask}s.
+ */
+ private boolean shouldUseMergingSortedReader(ScanTaskGroup> taskGroup) {
+ return orderingEnabled
+ && taskGroup.tasks().size() > 1
+ && taskGroup.tasks().stream().allMatch(task -> task instanceof FileScanTask);
+ }
+
private String[][] computePreferredLocations() {
if (localityEnabled) {
return SparkPlanningUtil.fetchBlockLocations(fileIO.get(), taskGroups);
@@ -144,12 +162,28 @@ private OrcBatchReadConf orcBatchReadConf() {
return ImmutableOrcBatchReadConf.builder().batchSize(readConf.orcBatchSize()).build();
}
+ private boolean anyGroupNeedsMergingReader() {
+ if (lazyAnyGroupNeedsMerging == null) {
+ lazyAnyGroupNeedsMerging =
+ orderingEnabled
+ && taskGroups.stream()
+ .anyMatch(
+ group ->
+ group.tasks().size() > 1
+ && group.tasks().stream()
+ .allMatch(task -> task instanceof FileScanTask));
+ }
+ return lazyAnyGroupNeedsMerging;
+ }
+
// conditions for using Parquet batch reads:
// - Parquet vectorization is enabled
+ // - no task group requires a merging sorted reader (incompatible with vectorized reads)
// - only primitives or metadata columns are projected
// - all tasks are of FileScanTask type and read only Parquet files
private boolean useParquetBatchReads() {
return readConf.parquetVectorizationEnabled()
+ && !anyGroupNeedsMergingReader()
&& projection.columns().stream().allMatch(this::supportsParquetBatchReads)
&& taskGroups.stream().allMatch(this::supportsParquetBatchReads);
}
@@ -174,9 +208,11 @@ private boolean supportsParquetBatchReads(Types.NestedField field) {
// conditions for using ORC batch reads:
// - ORC vectorization is enabled
+ // - no task group requires a merging sorted reader (incompatible with vectorized reads)
// - all tasks are of type FileScanTask and read only ORC files with no delete files
private boolean useOrcBatchReads() {
return readConf.orcVectorizationEnabled()
+ && !anyGroupNeedsMergingReader()
&& taskGroups.stream().allMatch(this::supportsOrcBatchReads);
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 57ccf92b9651..47d8c9db89a7 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -110,7 +110,8 @@ public Batch toBatch() {
EMPTY_GROUPING_KEY_TYPE,
taskGroups(),
projection,
- hashCode());
+ hashCode(),
+ false);
}
private List> taskGroups() {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
index a3d78b43a919..997ca1bd7526 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
@@ -40,6 +40,7 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab
private final boolean caseSensitive;
private final transient String[] preferredLocations;
private final boolean cacheDeleteFilesOnExecutors;
+ private final boolean useMergingSortedReader;
private transient Schema projection = null;
@@ -51,7 +52,8 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab
String projectionString,
boolean caseSensitive,
String[] preferredLocations,
- boolean cacheDeleteFilesOnExecutors) {
+ boolean cacheDeleteFilesOnExecutors,
+ boolean useMergingSortedReader) {
this.groupingKeyType = groupingKeyType;
this.taskGroup = taskGroup;
this.tableBroadcast = tableBroadcast;
@@ -60,6 +62,7 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab
this.caseSensitive = caseSensitive;
this.preferredLocations = preferredLocations;
this.cacheDeleteFilesOnExecutors = cacheDeleteFilesOnExecutors;
+ this.useMergingSortedReader = useMergingSortedReader;
}
@Override
@@ -104,4 +107,8 @@ public Schema projection() {
return projection;
}
+
+ public boolean useMergingSortedReader() {
+ return useMergingSortedReader;
+ }
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 7adf3c633cd0..5f12154bfece 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -168,7 +168,8 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) {
projection,
caseSensitive,
locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE,
- cacheDeleteFilesOnExecutors);
+ cacheDeleteFilesOnExecutors,
+ false);
}
return partitions;
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index fe5eeee8fb10..78d33da991c4 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -43,12 +43,15 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.SupportsReportOrdering;
import org.apache.spark.sql.connector.read.SupportsReportPartitioning;
import org.apache.spark.sql.connector.read.partitioning.KeyGroupedPartitioning;
import org.apache.spark.sql.connector.read.partitioning.Partitioning;
@@ -57,18 +60,20 @@
import org.slf4j.LoggerFactory;
abstract class SparkPartitioningAwareScan extends SparkScan
- implements SupportsReportPartitioning {
+ implements SupportsReportPartitioning, SupportsReportOrdering {
private static final Logger LOG = LoggerFactory.getLogger(SparkPartitioningAwareScan.class);
private final Scan, ? extends ScanTask, ? extends ScanTaskGroup>> scan;
private final boolean preserveDataGrouping;
+ private final boolean preserveDataOrdering;
private Set specs = null; // lazy cache of scanned specs
private List tasks = null; // lazy cache of uncombined tasks
private List> taskGroups = null; // lazy cache of task groups
private StructType groupingKeyType = null; // lazy cache of the grouping key type
private Transform[] groupingKeyTransforms = null; // lazy cache of grouping key transforms
+ private Boolean orderingEnabled = null; // lazy cache of ordering decision
SparkPartitioningAwareScan(
SparkSession spark,
@@ -91,6 +96,13 @@ abstract class SparkPartitioningAwareScan extends S
this.scan = scan;
this.preserveDataGrouping = readConf.preserveDataGrouping();
+ this.preserveDataOrdering = readConf.preserveDataOrdering();
+
+ if (preserveDataOrdering && !preserveDataGrouping) {
+ throw new ValidationException(
+ "Cannot preserve data ordering without data grouping. Set %s to true or disable %s.",
+ SparkSQLProperties.PRESERVE_DATA_GROUPING, SparkSQLProperties.PRESERVE_DATA_ORDERING);
+ }
if (scan == null) {
this.specs = Collections.emptySet();
@@ -123,6 +135,34 @@ public Partitioning outputPartitioning() {
}
}
+ @Override
+ public SortOrder[] outputOrdering() {
+ if (!isOrderingEnabled()) {
+ return new SortOrder[0];
+ }
+
+ org.apache.iceberg.SortOrder sortOrder = table().sortOrder();
+ SortOrder[] ordering = Spark3Util.toOrdering(sortOrder);
+ LOG.info(
+ "Reporting sort order {} for table {}: {}", sortOrder.orderId(), table().name(), ordering);
+
+ return ordering;
+ }
+
+ @Override
+ protected boolean isOrderingEnabled() {
+ if (orderingEnabled == null) {
+ orderingEnabled =
+ !groupingKeyType().fields().isEmpty()
+ && preserveDataOrdering
+ && SortOrderAnalyzer.canReportOrdering(table(), taskGroups(), groupingKeyType());
+ if (!orderingEnabled) {
+ LOG.info("Not reporting ordering for table {}", table().name());
+ }
+ }
+ return orderingEnabled;
+ }
+
@Override
protected StructType groupingKeyType() {
if (groupingKeyType == null) {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java
index 23699aeb167c..b5db9a847d11 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java
@@ -42,7 +42,11 @@ public PartitionReader createReader(InputPartition inputPartition)
SparkInputPartition partition = (SparkInputPartition) inputPartition;
if (partition.allTasksOfType(FileScanTask.class)) {
- return new RowDataReader(partition);
+ if (partition.useMergingSortedReader()) {
+ return new MergingSortedRowDataReader(partition);
+ } else {
+ return new RowDataReader(partition);
+ }
} else if (partition.allTasksOfType(ChangelogScanTask.class)) {
return new ChangelogRowReader(partition);
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 6b80199a255c..90d7011305c6 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -180,7 +180,8 @@ public Batch toBatch() {
groupingKeyType(),
taskGroups(),
projection,
- hashCode());
+ hashCode(),
+ isOrderingEnabled());
}
@Override
@@ -366,7 +367,20 @@ public CustomMetric[] supportedCustomMetrics() {
};
}
+ protected boolean isOrderingEnabled() {
+ return false;
+ }
+
protected long adjustSplitSize(List extends ScanTask> tasks, long splitSize) {
+ // Disable splitting tasks into multiple groups when we need to preserve ordering.
+ // This prevents multiple InputPartitions with the same partitionKey, which would
+ // cause Spark to suppress outputOrdering.
+ if (readConf.preserveDataOrdering()
+ && readConf.preserveDataGrouping()
+ && table.sortOrder().isSorted()) {
+ return Long.MAX_VALUE;
+ }
+
if (readConf.splitSizeOption() == null && readConf.adaptiveSplitSizeEnabled()) {
long scanSize = tasks.stream().mapToLong(ScanTask::sizeBytes).sum();
int parallelism = readConf.parallelism();
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
index 507d7b313b42..bbf2a36a10a0 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
@@ -53,13 +53,18 @@
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec;
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.QueryExecutionListener;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import scala.PartialFunction;
+import scala.collection.JavaConverters;
public abstract class TestBase extends SparkTestHelperBase {
+ private static final AdaptiveSparkPlanHelper SPARK_HELPER = new AdaptiveSparkPlanHelper() {};
+
protected static TestHiveMetastore metastore = null;
protected static HiveConf hiveConf = null;
protected static SparkSession spark = null;
@@ -291,6 +296,25 @@ public void onFailure(String funcName, QueryExecution qe, Exception exception) {
}
}
+ /** Collect all nodes of a specific plan type from the plan tree. */
+ protected List collectPlans(SparkPlan plan, Class planClass) {
+ scala.collection.Seq seq =
+ SPARK_HELPER.collect(
+ plan,
+ new PartialFunction() {
+ @Override
+ public T apply(SparkPlan p) {
+ return planClass.cast(p);
+ }
+
+ @Override
+ public boolean isDefinedAt(SparkPlan p) {
+ return planClass.isInstance(p);
+ }
+ });
+ return JavaConverters.seqAsJavaListConverter(seq).asJava();
+ }
+
@FunctionalInterface
protected interface Action {
void invoke();
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSortOrderAnalyzer.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSortOrderAnalyzer.java
new file mode 100644
index 000000000000..5d0d97de3c84
--- /dev/null
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSortOrderAnalyzer.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+class TestSortOrderAnalyzer {
+
+ private static final Schema SCHEMA =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()));
+
+ private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build();
+
+ // Represents the grouping key type for a single-column string partition
+ private static final Types.StructType KEY_TYPE =
+ Types.StructType.of(Types.NestedField.required(3, "partition", Types.StringType.get()));
+
+ @Test
+ void testUnsortedTableReturnsFalse() {
+ Table table = mockTable(SortOrder.unsorted());
+ List> groups = ImmutableList.of(taskGroupWithKey(SORT_ORDER.orderId(), "P1"));
+
+ assertThat(SortOrderAnalyzer.canReportOrdering(table, groups, KEY_TYPE)).isFalse();
+ }
+
+ @Test
+ void testNullTaskGroupsReturnsFalse() {
+ Table table = mockTable(SORT_ORDER);
+
+ assertThat(SortOrderAnalyzer.canReportOrdering(table, null, KEY_TYPE)).isFalse();
+ }
+
+ @Test
+ void testEmptyTaskGroupsReturnsFalse() {
+ Table table = mockTable(SORT_ORDER);
+
+ assertThat(SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(), KEY_TYPE)).isFalse();
+ }
+
+ @Test
+ void testFileWithMismatchedSortOrderIdReturnsFalse() {
+ Table table = mockTable(SORT_ORDER);
+ // File was written with sort order ID 999, which doesn't match the table's current order
+ ScanTaskGroup> group = taskGroupWithKey(999, "P1");
+
+ assertThat(SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group), KEY_TYPE))
+ .isFalse();
+ }
+
+ @Test
+ void testFileWithNullSortOrderIdReturnsFalse() {
+ Table table = mockTable(SORT_ORDER);
+ // File has no sort order recorded (null) — written before sort order was set
+ ScanTaskGroup> group = taskGroupWithNullSortOrderId("P1");
+
+ assertThat(SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group), KEY_TYPE))
+ .isFalse();
+ }
+
+ @Test
+ void testOnlyOneFileWithWrongSortOrderInMultiFileGroupReturnsFalse() {
+ Table table = mockTable(SORT_ORDER);
+ // Two files: one correct, one from old sort order — the group must fail
+ FileScanTask goodTask = fileTask(SORT_ORDER.orderId());
+ FileScanTask badTask = fileTask(999);
+ ScanTaskGroup> group = taskGroupWithTasks("P1", ImmutableList.of(goodTask, badTask));
+
+ assertThat(SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group), KEY_TYPE))
+ .isFalse();
+ }
+
+ @Test
+ void testDuplicatePartitionKeyReturnsFalse() {
+ Table table = mockTable(SORT_ORDER);
+ // Two task groups that share the same partition key value — Spark coalesces them
+ // without merge-sorting, destroying the ordering guarantee.
+ ScanTaskGroup> group1 = taskGroupWithKey(SORT_ORDER.orderId(), "same-partition");
+ ScanTaskGroup> group2 = taskGroupWithKey(SORT_ORDER.orderId(), "same-partition");
+
+ assertThat(
+ SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group1, group2), KEY_TYPE))
+ .isFalse();
+ }
+
+ @Test
+ void testUniquePartitionKeysReturnsTrue() {
+ Table table = mockTable(SORT_ORDER);
+ ScanTaskGroup> group1 = taskGroupWithKey(SORT_ORDER.orderId(), "partition-A");
+ ScanTaskGroup> group2 = taskGroupWithKey(SORT_ORDER.orderId(), "partition-B");
+
+ assertThat(
+ SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group1, group2), KEY_TYPE))
+ .isTrue();
+ }
+
+ @Test
+ void testUnpartitionedTableSkipsUniquenessCheck() {
+ Table table = mockTable(SORT_ORDER);
+ // Both groups have null grouping key — acceptable for unpartitioned tables
+ ScanTaskGroup> group1 = taskGroupWithNullKey(SORT_ORDER.orderId());
+ ScanTaskGroup> group2 = taskGroupWithNullKey(SORT_ORDER.orderId());
+
+ // Empty struct type signals unpartitioned; uniqueness check is skipped
+ assertThat(
+ SortOrderAnalyzer.canReportOrdering(
+ table, ImmutableList.of(group1, group2), Types.StructType.of()))
+ .isTrue();
+ }
+
+ @Test
+ void testSingleGroupSingleFileReturnsTrue() {
+ Table table = mockTable(SORT_ORDER);
+ ScanTaskGroup> group = taskGroupWithKey(SORT_ORDER.orderId(), "P1");
+
+ assertThat(SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group), KEY_TYPE))
+ .isTrue();
+ }
+
+ @Test
+ void testSingleGroupMultipleFilesAllMatchingReturnsTrue() {
+ Table table = mockTable(SORT_ORDER);
+ FileScanTask task1 = fileTask(SORT_ORDER.orderId());
+ FileScanTask task2 = fileTask(SORT_ORDER.orderId());
+ ScanTaskGroup> group = taskGroupWithTasks("P1", ImmutableList.of(task1, task2));
+
+ assertThat(SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group), KEY_TYPE))
+ .isTrue();
+ }
+
+ @Test
+ void testMultipleGroupsAllValidReturnsTrue() {
+ Table table = mockTable(SORT_ORDER);
+ ScanTaskGroup> group1 = taskGroupWithKey(SORT_ORDER.orderId(), "P1");
+ ScanTaskGroup> group2 = taskGroupWithKey(SORT_ORDER.orderId(), "P2");
+ ScanTaskGroup> group3 = taskGroupWithKey(SORT_ORDER.orderId(), "P3");
+
+ assertThat(
+ SortOrderAnalyzer.canReportOrdering(
+ table, ImmutableList.of(group1, group2, group3), KEY_TYPE))
+ .isTrue();
+ }
+
+ private static Table mockTable(SortOrder sortOrder) {
+ Table table = mock(Table.class);
+ when(table.sortOrder()).thenReturn(sortOrder);
+ when(table.schema()).thenReturn(SCHEMA);
+ when(table.name()).thenReturn("test_table");
+ return table;
+ }
+
+ private static ScanTaskGroup> taskGroupWithKey(int sortOrderId, String partitionValue) {
+ return taskGroupWithTasks(partitionValue, ImmutableList.of(fileTask(sortOrderId)));
+ }
+
+ private static ScanTaskGroup> taskGroupWithNullSortOrderId(String partitionValue) {
+ FileScanTask task = mock(FileScanTask.class);
+ DataFile file = mock(DataFile.class);
+ when(file.sortOrderId()).thenReturn(null);
+ when(task.file()).thenReturn(file);
+
+ ScanTaskGroup> group = mock(ScanTaskGroup.class);
+ doReturn(ImmutableList.of(task)).when(group).tasks();
+ when(group.groupingKey()).thenReturn(makeKey(partitionValue));
+ return group;
+ }
+
+ private static ScanTaskGroup> taskGroupWithNullKey(int sortOrderId) {
+ ScanTaskGroup> group = mock(ScanTaskGroup.class);
+ doReturn(ImmutableList.of(fileTask(sortOrderId))).when(group).tasks();
+ when(group.groupingKey()).thenReturn(null);
+ return group;
+ }
+
+ private static ScanTaskGroup> taskGroupWithTasks(
+ String partitionValue, List tasks) {
+ ScanTaskGroup> group = mock(ScanTaskGroup.class);
+ doReturn(tasks).when(group).tasks();
+ when(group.groupingKey()).thenReturn(makeKey(partitionValue));
+ return group;
+ }
+
+ private static FileScanTask fileTask(int sortOrderId) {
+ FileScanTask task = mock(FileScanTask.class);
+ DataFile file = mock(DataFile.class);
+ when(file.sortOrderId()).thenReturn(sortOrderId);
+ when(task.file()).thenReturn(file);
+ return task;
+ }
+
+ private static StructLike makeKey(String value) {
+ GenericRecord record = GenericRecord.create(KEY_TYPE);
+ record.setField("partition", value);
+ return record;
+ }
+
+ @Test
+ void testNestedSortFieldReturnsFalse() {
+ Schema nestedSchema =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(
+ 2,
+ "data",
+ Types.StructType.of(
+ Types.NestedField.required(3, "sort_key", Types.IntegerType.get()),
+ Types.NestedField.optional(4, "value", Types.StringType.get()))),
+ Types.NestedField.required(5, "part", Types.StringType.get()));
+ SortOrder nestedSortOrder = SortOrder.builderFor(nestedSchema).asc("data.sort_key").build();
+ Table table = mock(Table.class);
+ when(table.sortOrder()).thenReturn(nestedSortOrder);
+ when(table.schema()).thenReturn(nestedSchema);
+ when(table.name()).thenReturn("test_table");
+
+ ScanTaskGroup> group = taskGroupWithKey(nestedSortOrder.orderId(), "P1");
+
+ assertThat(SortOrderAnalyzer.canReportOrdering(table, ImmutableList.of(group), KEY_TYPE))
+ .isFalse();
+ }
+}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSupportsReportOrdering.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSupportsReportOrdering.java
new file mode 100644
index 000000000000..ce6ac66b56e9
--- /dev/null
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSupportsReportOrdering.java
@@ -0,0 +1,1003 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.ReplaceSortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkSQLProperties;
+import org.apache.iceberg.spark.TestBaseWithCatalog;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.SupportsReportOrdering;
+import org.apache.spark.sql.execution.SortExec;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+class TestSupportsReportOrdering extends TestBaseWithCatalog {
+
+ private static final Map ENABLED_ORDERING_SQL_CONF = orderingConfig(true);
+ private static final Map DISABLED_ORDERING_SQL_CONF = orderingConfig(false);
+
+ private static Map orderingConfig(boolean preserveOrdering) {
+ return ImmutableMap.builder()
+ .put(SparkSQLProperties.PRESERVE_DATA_ORDERING, String.valueOf(preserveOrdering))
+ .put(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true")
+ .put("spark.sql.autoBroadcastJoinThreshold", "-1")
+ .put("spark.sql.adaptive.enabled", "false")
+ .put("spark.sql.sources.v2.bucketing.enabled", "true")
+ .put("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
+ .put("spark.sql.requireAllClusterKeysForCoPartition", "false")
+ .buildOrThrow();
+ }
+
+ @BeforeEach
+ void useCatalog() {
+ sql("USE %s", catalogName);
+ }
+
+ @AfterEach
+ void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ sql("DROP TABLE IF EXISTS %s", tableName("table_source"));
+ spark.conf().unset(SparkSQLProperties.PRESERVE_DATA_ORDERING);
+ spark.conf().unset(SparkSQLProperties.PRESERVE_DATA_GROUPING);
+ }
+
+ @TestTemplate
+ void testMergingMultipleSortedFiles() throws NoSuchTableException {
+ Table table = createSimpleTable(tableName);
+ setSortOrder(table, "id");
+
+ writeBatches(
+ tableName,
+ SimpleRecord.class,
+ ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")),
+ ImmutableList.of(new SimpleRecord(3, "c"), new SimpleRecord(4, "d")),
+ ImmutableList.of(new SimpleRecord(5, "e"), new SimpleRecord(6, "f")),
+ ImmutableList.of(new SimpleRecord(7, "g"), new SimpleRecord(8, "h")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");
+
+ Dataset result =
+ spark.sql(String.format("SELECT id, data FROM %s ORDER BY id", tableName));
+ List rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows)
+ .hasSize(8)
+ .containsExactly(
+ row(1, "a"),
+ row(2, "b"),
+ row(3, "c"),
+ row(4, "d"),
+ row(5, "e"),
+ row(6, "f"),
+ row(7, "g"),
+ row(8, "h"));
+ }
+
+ @TestTemplate
+ void testMergingWithDuplicateSortKeyValues() throws NoSuchTableException {
+ Table table = createSimpleTable(tableName);
+ setSortOrder(table, "id");
+
+ // The same id values appear across multiple files — the k-way merge must correctly
+ // interleave rows with equal keys rather than dropping or mis-ordering them.
+ writeBatches(
+ tableName,
+ SimpleRecord.class,
+ ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")),
+ ImmutableList.of(new SimpleRecord(1, "c"), new SimpleRecord(2, "d")),
+ ImmutableList.of(new SimpleRecord(1, "e"), new SimpleRecord(3, "f")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");
+
+ Dataset result =
+ spark.sql(String.format("SELECT id, data FROM %s ORDER BY id, data", tableName));
+ List rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows)
+ .hasSize(6)
+ .containsExactly(
+ row(1, "a"), row(1, "c"), row(1, "e"), row(2, "b"), row(2, "d"), row(3, "f"));
+ }
+
+ @TestTemplate
+ void testMergingWithNullsInSortKeyColumn() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (c3)",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ setSortOrder(table, "c1"); // ASC NULLS FIRST (Iceberg default)
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");
+
+ sql("INSERT INTO %s VALUES (null, 'x', 'P1'), (3, 'c', 'P1')", tableName);
+ sql("INSERT INTO %s VALUES (null, 'y', 'P1'), (1, 'a', 'P1'), (2, 'b', 'P1')", tableName);
+
+ Dataset result =
+ spark.sql(
+ String.format(
+ "SELECT c1, c2 FROM %s WHERE c3 = 'P1' ORDER BY c1 ASC NULLS FIRST, c2",
+ tableName));
+ List rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows)
+ .hasSize(5)
+ .containsExactly(row(null, "x"), row(null, "y"), row(1, "a"), row(2, "b"), row(3, "c"));
+ }
+
+ @TestTemplate
+ void testMergingWithNullsInDescendingSortKeyColumn() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (c3)",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ table.replaceSortOrder().desc("c1").commit(); // DESC NULLS LAST (Iceberg default for DESC)
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");
+
+ sql("INSERT INTO %s VALUES (null, 'x', 'P1'), (1, 'a', 'P1')", tableName);
+ sql("INSERT INTO %s VALUES (null, 'y', 'P1'), (3, 'c', 'P1'), (2, 'b', 'P1')", tableName);
+
+ Dataset result =
+ spark.sql(
+ String.format(
+ "SELECT c1, c2 FROM %s WHERE c3 = 'P1' ORDER BY c1 DESC NULLS LAST, c2",
+ tableName));
+ List rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows)
+ .hasSize(5)
+ .containsExactly(row(3, "c"), row(2, "b"), row(1, "a"), row(null, "x"), row(null, "y"));
+ }
+
+ @TestTemplate
+ void testDescendingSortOrder() throws NoSuchTableException {
+ Table table = createSimpleTable(tableName);
+ table.replaceSortOrder().desc("id").commit();
+
+ writeBatches(
+ tableName,
+ SimpleRecord.class,
+ ImmutableList.of(new SimpleRecord(10, "j"), new SimpleRecord(9, "i")),
+ ImmutableList.of(new SimpleRecord(8, "h"), new SimpleRecord(7, "g")),
+ ImmutableList.of(new SimpleRecord(6, "f"), new SimpleRecord(4, "d")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");
+
+ Dataset result = spark.sql(String.format("SELECT id FROM %s ORDER BY id DESC", tableName));
+ List rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows).hasSize(6).containsExactly(row(10), row(9), row(8), row(7), row(6), row(4));
+ }
+
+ @TestTemplate
+ void testMultiColumnSortOrder() throws NoSuchTableException {
+ Table table = createThreeColumnTable(tableName);
+ setSortOrder(table, "c3", "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "A"), new ThreeColumnRecord(3, "c", "A")),
+ ImmutableList.of(new ThreeColumnRecord(2, "b", "A"), new ThreeColumnRecord(1, "a", "B")),
+ ImmutableList.of(new ThreeColumnRecord(2, "b", "B"), new ThreeColumnRecord(3, "c", "B")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");
+
+ Dataset result =
+ spark.sql(String.format("SELECT c3, c1, c2 FROM %s ORDER BY c3, c1", tableName));
+ List rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows)
+ .hasSize(6)
+ .containsExactly(
+ row("A", 1, "a"),
+ row("A", 2, "b"),
+ row("A", 3, "c"),
+ row("B", 1, "a"),
+ row("B", 2, "b"),
+ row("B", 3, "c"));
+ }
+
+ @TestTemplate
+ void testSingleFileDoesNotRequireMerging() throws NoSuchTableException {
+ Table table = createSimpleTable(tableName);
+ setSortOrder(table, "id");
+
+ List batch = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+ spark.createDataFrame(batch, SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");
+
+ Dataset result = spark.sql(String.format("SELECT * FROM %s", tableName));
+ List rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows).hasSize(2);
+ }
+
+ @TestTemplate
+ void testPartitionedTableWithMultipleFilesPerPartition() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (c3)",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ setSortOrder(table, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "P1"), new ThreeColumnRecord(3, "c", "P1")),
+ ImmutableList.of(new ThreeColumnRecord(2, "b", "P1"), new ThreeColumnRecord(4, "d", "P1")),
+ ImmutableList.of(new ThreeColumnRecord(5, "e", "P2"), new ThreeColumnRecord(7, "g", "P2")),
+ ImmutableList.of(new ThreeColumnRecord(6, "f", "P2"), new ThreeColumnRecord(8, "h", "P2")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");
+
+ Dataset p1Result =
+ spark.sql(String.format("SELECT c1, c2 FROM %s WHERE c3 = 'P1' ORDER BY c1", tableName));
+ List p1Rows = rowsToJava(p1Result.collectAsList());
+
+ assertThat(p1Rows)
+ .hasSize(4)
+ .containsExactly(row(1, "a"), row(2, "b"), row(3, "c"), row(4, "d"));
+
+ Dataset p2Result =
+ spark.sql(String.format("SELECT c1, c2 FROM %s WHERE c3 = 'P2' ORDER BY c1", tableName));
+ List p2Rows = rowsToJava(p2Result.collectAsList());
+
+ assertThat(p2Rows)
+ .hasSize(4)
+ .containsExactly(row(5, "e"), row(6, "f"), row(7, "g"), row(8, "h"));
+ }
+
+ @TestTemplate
+ void testOrderingNotReportedWhenDisabled() throws NoSuchTableException {
+ Table table = createSimpleTable(tableName);
+ setSortOrder(table, "id");
+
+ List batch = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+ spark.createDataFrame(batch, SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+
+ spark.conf().unset(SparkSQLProperties.PRESERVE_DATA_ORDERING);
+
+ Dataset result = spark.sql(String.format("SELECT * FROM %s", tableName));
+ List rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows).hasSize(2);
+ }
+
+ @TestTemplate
+ void testOrderingFailsWhenGroupingDisabled() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (c3)",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ setSortOrder(table, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "P1"), new ThreeColumnRecord(3, "c", "P1")),
+ ImmutableList.of(new ThreeColumnRecord(2, "b", "P1"), new ThreeColumnRecord(4, "d", "P1")));
+
+ withSQLConf(
+ ImmutableMap.of(
+ SparkSQLProperties.PRESERVE_DATA_ORDERING,
+ "true",
+ SparkSQLProperties.PRESERVE_DATA_GROUPING,
+ "false",
+ "spark.sql.autoBroadcastJoinThreshold",
+ "-1",
+ "spark.sql.adaptive.enabled",
+ "false"),
+ () ->
+ assertThatThrownBy(
+ () ->
+ spark
+ .sql(String.format("SELECT c1, c2 FROM %s ORDER BY c1", tableName))
+ .collectAsList())
+ .hasMessageContaining("Cannot preserve data ordering without data grouping"));
+ }
+
+ @TestTemplate
+ void testOrderingNotReportedForUnsortedTable() throws NoSuchTableException {
+ createSimpleTable(tableName);
+
+ List batch = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+ spark.createDataFrame(batch, SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");
+
+ Dataset result = spark.sql(String.format("SELECT * FROM %s", tableName));
+ List rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows).hasSize(2);
+ }
+
+ @TestTemplate
+ void testNoMergeReaderForUnpartitionedSortedTable() throws NoSuchTableException {
+ Table table = createSimpleTable(tableName); // unpartitioned
+ setSortOrder(table, "id");
+
+ // Multiple files so that a merge reader *would* be created if the bug were present
+ writeBatches(
+ tableName,
+ SimpleRecord.class,
+ ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(3, "c")),
+ ImmutableList.of(new SimpleRecord(2, "b"), new SimpleRecord(4, "d")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");
+
+ SparkPlan plan =
+ executeAndKeepPlan(String.format("SELECT id, data FROM %s ORDER BY id", tableName));
+ List sorts = collectPlans(plan, SortExec.class);
+
+ assertThat(sorts).isNotEmpty();
+
+ Dataset result = spark.sql(String.format("SELECT id FROM %s ORDER BY id", tableName));
+ List rows = rowsToJava(result.collectAsList());
+ assertThat(rows).containsExactly(row(1), row(2), row(3), row(4));
+ }
+
+ @TestTemplate
+ void testSortRequiredWhenOrderingNotReported() throws NoSuchTableException {
+ Table table = createSimpleTable(tableName);
+ setSortOrder(table, "id");
+
+ writeBatches(
+ tableName,
+ SimpleRecord.class,
+ ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")),
+ ImmutableList.of(new SimpleRecord(3, "c"), new SimpleRecord(4, "d")));
+
+ spark.conf().unset(SparkSQLProperties.PRESERVE_DATA_ORDERING);
+
+ SparkPlan plan =
+ executeAndKeepPlan(String.format("SELECT id, data FROM %s ORDER BY id", tableName));
+
+ List sorts = collectPlans(plan, SortExec.class);
+
+ assertThat(sorts).isNotEmpty();
+ }
+
+ @TestTemplate
+ void testSortMergeJoinWithSortedTables() throws NoSuchTableException {
+ createBucketedTable(tableName, "c1");
+ createBucketedTable(tableName("table_source"), "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "X"), new ThreeColumnRecord(2, "b", "X")),
+ ImmutableList.of(new ThreeColumnRecord(3, "c", "X"), new ThreeColumnRecord(4, "d", "X")));
+
+ writeBatches(
+ tableName("table_source"),
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "A", "Y"), new ThreeColumnRecord(2, "B", "Y")),
+ ImmutableList.of(new ThreeColumnRecord(3, "C", "Y"), new ThreeColumnRecord(4, "D", "Y")));
+
+ assertPlanWithoutSort(
+ 0,
+ 2,
+ null,
+ "SELECT t1.c1, t1.c2, t2.c2 FROM %s t1 JOIN %s t2 ON t1.c1 = t2.c1",
+ tableName,
+ tableName("table_source"));
+ }
+
+ @TestTemplate
+ void testMergeWithSortedBucketedTables() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('write.merge.mode' = 'merge-on-read', '%s' = '%d')",
+ tableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table targetTable = validationCatalog.loadTable(tableIdent);
+ setSortOrder(targetTable, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(1, "old1", "data1"), new ThreeColumnRecord(2, "old2", "data2")),
+ ImmutableList.of(
+ new ThreeColumnRecord(3, "old3", "data3"), new ThreeColumnRecord(4, "old4", "data4")));
+
+ String sourceTableName = tableName("table_source");
+ TableIdentifier sourceTableIdent = TableIdentifier.of(Namespace.of("default"), "table_source");
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ sourceTableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table sourceTable = validationCatalog.loadTable(sourceTableIdent);
+ setSortOrder(sourceTable, "c1");
+
+ writeBatches(
+ sourceTableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(2, "new2", "data2"), new ThreeColumnRecord(3, "new3", "data3")),
+ ImmutableList.of(
+ new ThreeColumnRecord(5, "new5", "data5"), new ThreeColumnRecord(6, "new6", "data6")));
+
+ refreshTables(tableName, sourceTableName);
+
+ validationCatalog.loadTable(tableIdent).refresh();
+ validationCatalog.loadTable(sourceTableIdent).refresh();
+
+ assertPlanWithoutSort(
+ 1,
+ 3,
+ this::verifyMergeResults,
+ "MERGE INTO %s t USING %s s ON t.c1 = s.c1 "
+ + "WHEN MATCHED THEN UPDATE SET t.c2 = s.c2, t.c3 = s.c3 "
+ + "WHEN NOT MATCHED THEN INSERT *",
+ tableName,
+ sourceTableName);
+ }
+
+ @TestTemplate
+ void testHistoricalSortOrderInJoin() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ tableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table table1 = validationCatalog.loadTable(tableIdent);
+ setSortOrder(table1, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "X"), new ThreeColumnRecord(2, "b", "X")),
+ ImmutableList.of(new ThreeColumnRecord(3, "c", "X"), new ThreeColumnRecord(4, "d", "X")));
+
+ table1.replaceSortOrder().asc("c2").asc("c1").commit();
+
+ String table2Name = tableName("table_source");
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ table2Name, TableProperties.SPLIT_SIZE, 1024);
+
+ TableIdentifier table2Ident = TableIdentifier.of(Namespace.of("default"), "table_source");
+ Table table2 = validationCatalog.loadTable(table2Ident);
+ setSortOrder(table2, "c1");
+
+ writeBatches(
+ table2Name,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "A", "Y"), new ThreeColumnRecord(2, "B", "Y")),
+ ImmutableList.of(new ThreeColumnRecord(3, "C", "Y"), new ThreeColumnRecord(4, "D", "Y")));
+
+ table2.replaceSortOrder().asc("c2").asc("c1").commit();
+
+ // Both tables have files with historical sort order [c1 ASC]
+ // but current table sort order is [c2 ASC, c1 ASC].
+ // Verify neither scan reports ordering — SortOrderAnalyzer must decline due to mismatched IDs.
+ TableIdentifier table2Ident2 = TableIdentifier.of(Namespace.of("default"), "table_source");
+ withSQLConf(
+ ENABLED_ORDERING_SQL_CONF,
+ () -> {
+ assertScanReportsNoOrdering(tableIdent);
+ assertScanReportsNoOrdering(table2Ident2);
+ });
+ assertPlanWithoutSort(
+ 2,
+ 2,
+ null,
+ "SELECT t1.c1, t1.c2, t2.c2 FROM %s t1 JOIN %s t2 ON t1.c1 = t2.c1",
+ tableName,
+ table2Name);
+ }
+
+ @TestTemplate
+ void testMixedSortOrdersNoReporting() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ tableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table table1 = validationCatalog.loadTable(tableIdent);
+ setSortOrder(table1, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "X"), new ThreeColumnRecord(2, "b", "X")));
+
+ table1.replaceSortOrder().asc("c2").asc("c1").commit();
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(3, "c", "X"), new ThreeColumnRecord(4, "d", "X")));
+
+ String table2Name = tableName("table_source");
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ table2Name, TableProperties.SPLIT_SIZE, 1024);
+
+ TableIdentifier table2Ident = TableIdentifier.of(Namespace.of("default"), "table_source");
+ Table table2 = validationCatalog.loadTable(table2Ident);
+ setSortOrder(table2, "c1");
+
+ writeBatches(
+ table2Name,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "A", "Y"), new ThreeColumnRecord(2, "B", "Y")));
+
+ table2.replaceSortOrder().asc("c2").asc("c1").commit();
+
+ writeBatches(
+ table2Name,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(3, "C", "Y"), new ThreeColumnRecord(4, "D", "Y")));
+
+ assertPlanWithoutSort(
+ 2,
+ 2,
+ null,
+ "SELECT t1.c1, t1.c2, t2.c2 FROM %s t1 JOIN %s t2 ON t1.c1 = t2.c1",
+ tableName,
+ table2Name);
+ }
+
+ @TestTemplate
+ void testSPJWithDifferentPartitionAndSortKeys() throws NoSuchTableException {
+ createBucketedTable(tableName, "c3", "c1");
+ createBucketedTable(tableName("table_source"), "c3", "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(1, "a", "2024-01-01"),
+ new ThreeColumnRecord(2, "b", "2024-01-02")),
+ ImmutableList.of(
+ new ThreeColumnRecord(1, "c", "2024-01-03"),
+ new ThreeColumnRecord(2, "d", "2024-01-04")));
+
+ writeBatches(
+ tableName("table_source"),
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(1, "A", "2024-01-01"),
+ new ThreeColumnRecord(2, "B", "2024-01-02")),
+ ImmutableList.of(
+ new ThreeColumnRecord(1, "C", "2024-01-03"),
+ new ThreeColumnRecord(2, "D", "2024-01-04")));
+
+ refreshTables(tableName, tableName("table_source"));
+
+ assertPlanWithoutSort(
+ 0,
+ 2,
+ null,
+ "SELECT t1.c1, t1.c2, t2.c2 FROM %s t1 JOIN %s t2 ON t1.c3 = t2.c3 AND t1.c1 = t2.c1",
+ tableName,
+ tableName("table_source"));
+ }
+
+ @TestTemplate
+ void testHistoricalSortOrderInMerge() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('write.merge.mode' = 'merge-on-read', '%s' = '%d')",
+ tableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table targetTable = validationCatalog.loadTable(tableIdent);
+ setSortOrder(targetTable, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(1, "old1", "data1"), new ThreeColumnRecord(2, "old2", "data2")),
+ ImmutableList.of(
+ new ThreeColumnRecord(3, "old3", "data3"), new ThreeColumnRecord(4, "old4", "data4")));
+
+ targetTable.replaceSortOrder().asc("c2").asc("c1").commit();
+
+ String sourceTableName = tableName("table_source");
+ TableIdentifier sourceTableIdent = TableIdentifier.of(Namespace.of("default"), "table_source");
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ sourceTableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table sourceTable = validationCatalog.loadTable(sourceTableIdent);
+ setSortOrder(sourceTable, "c1");
+
+ writeBatches(
+ sourceTableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(2, "new2", "data2"), new ThreeColumnRecord(3, "new3", "data3")),
+ ImmutableList.of(
+ new ThreeColumnRecord(5, "new5", "data5"), new ThreeColumnRecord(6, "new6", "data6")));
+
+ sourceTable.replaceSortOrder().asc("c2").asc("c1").commit();
+
+ refreshTables(tableName, sourceTableName);
+
+ validationCatalog.loadTable(tableIdent).refresh();
+ validationCatalog.loadTable(sourceTableIdent).refresh();
+
+ // Files have historical sort order [c1 ASC] but tables have [c2 ASC, c1 ASC].
+ // Verify neither scan reports ordering — SortOrderAnalyzer must decline due to mismatched IDs.
+ withSQLConf(
+ ENABLED_ORDERING_SQL_CONF,
+ () -> {
+ assertScanReportsNoOrdering(tableIdent);
+ assertScanReportsNoOrdering(sourceTableIdent);
+ });
+ assertPlanWithoutSort(
+ 3,
+ 3,
+ this::verifyMergeResults,
+ "MERGE INTO %s t USING %s s ON t.c1 = s.c1 "
+ + "WHEN MATCHED THEN UPDATE SET t.c2 = s.c2, t.c3 = s.c3 "
+ + "WHEN NOT MATCHED THEN INSERT *",
+ tableName,
+ sourceTableName);
+ }
+
+ @TestTemplate
+ void testProjectionPushdownSortKeyNotProjected() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (c3)",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ setSortOrder(table, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "P1"), new ThreeColumnRecord(3, "c", "P1")),
+ ImmutableList.of(new ThreeColumnRecord(2, "b", "P1"), new ThreeColumnRecord(4, "d", "P1")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");
+
+ // c1 is the sort key but is NOT in the SELECT list.
+ Dataset result = spark.sql(String.format("SELECT c2 FROM %s WHERE c3 = 'P1'", tableName));
+ List rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows).hasSize(4).containsExactly(row("a"), row("b"), row("c"), row("d"));
+ }
+
+ @TestTemplate
+ void testNestedStructSortOrderNoReporting() {
+ sql(
+ "CREATE TABLE %s (id INT, data STRUCT, part STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (part)",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ table.replaceSortOrder().asc("data.sort_key").commit();
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(1, named_struct('sort_key', 1, 'value', 'a'), 'P1'), "
+ + "(2, named_struct('sort_key', 3, 'value', 'c'), 'P1')",
+ tableName);
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(3, named_struct('sort_key', 2, 'value', 'b'), 'P1'), "
+ + "(4, named_struct('sort_key', 4, 'value', 'd'), 'P1')",
+ tableName);
+
+ // Nested sort fields are not supported by the merging reader, so ordering must not be reported
+ withSQLConf(ENABLED_ORDERING_SQL_CONF, () -> assertScanReportsNoOrdering(tableIdent));
+
+ // Verify all data is still readable
+ Dataset result =
+ spark.sql(String.format("SELECT id, data.value FROM %s WHERE part = 'P1'", tableName));
+ List rows = rowsToJava(result.collectAsList());
+ assertThat(rows)
+ .hasSize(4)
+ .containsExactlyInAnyOrder(row(1, "a"), row(2, "c"), row(3, "b"), row(4, "d"));
+ }
+
+ @TestTemplate
+ void testMergeOnReadWithDeleteFiles() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES("
+ + " 'write.delete.mode' = 'merge-on-read', "
+ + " 'write.merge.mode' = 'merge-on-read', "
+ + " 'format-version' = '2', "
+ + " '%s' = '%d'"
+ + ")",
+ tableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table targetTable = validationCatalog.loadTable(tableIdent);
+ setSortOrder(targetTable, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(1, "old1", "data1"), new ThreeColumnRecord(2, "old2", "data2")),
+ ImmutableList.of(
+ new ThreeColumnRecord(3, "old3", "data3"), new ThreeColumnRecord(4, "old4", "data4")));
+
+ sql("DELETE FROM %s WHERE c1 = 2", tableName);
+
+ targetTable.refresh();
+ long deleteFileCount =
+ Iterables.size(targetTable.currentSnapshot().addedDeleteFiles(targetTable.io()));
+ assertThat(deleteFileCount).isGreaterThan(0);
+
+ String sourceTableName = tableName("table_source");
+ TableIdentifier sourceTableIdent = TableIdentifier.of(Namespace.of("default"), "table_source");
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ sourceTableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table sourceTable = validationCatalog.loadTable(sourceTableIdent);
+ setSortOrder(sourceTable, "c1");
+
+ writeBatches(
+ sourceTableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(2, "new2", "data2"), new ThreeColumnRecord(3, "new3", "data3")),
+ ImmutableList.of(
+ new ThreeColumnRecord(5, "new5", "data5"), new ThreeColumnRecord(6, "new6", "data6")));
+
+ refreshTables(tableName, sourceTableName);
+
+ validationCatalog.loadTable(tableIdent).refresh();
+ validationCatalog.loadTable(sourceTableIdent).refresh();
+
+ assertPlanWithoutSort(
+ 1,
+ 3,
+ this::verifyMergeResults,
+ "MERGE INTO %s t USING %s s ON t.c1 = s.c1 "
+ + "WHEN MATCHED THEN UPDATE SET t.c2 = s.c2, t.c3 = s.c3 "
+ + "WHEN NOT MATCHED THEN INSERT *",
+ tableName,
+ sourceTableName);
+ }
+
+ private void assertScanReportsNoOrdering(TableIdentifier ident) {
+ Table table = validationCatalog.loadTable(ident);
+ SparkTable sparkTable = SparkTable.create(table, (String) null);
+ Scan scan = sparkTable.newScanBuilder(CaseInsensitiveStringMap.empty()).build();
+ assertThat(((SupportsReportOrdering) scan).outputOrdering())
+ .as("Scan must not report ordering when files have a historical sort order")
+ .isEmpty();
+ }
+
+ private Table createSimpleTable(String name) {
+ sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", name);
+ return validationCatalog.loadTable(tableIdent);
+ }
+
+ private Table createThreeColumnTable(String name) {
+ sql("CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) USING iceberg", name);
+ return validationCatalog.loadTable(tableIdent);
+ }
+
+ private void createBucketedTable(String name, String... sortCols) {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ name, TableProperties.SPLIT_SIZE, 1024);
+
+ TableIdentifier ident =
+ name.equals(tableName)
+ ? tableIdent
+ : TableIdentifier.of(Namespace.of("default"), "table_source");
+ Table table = validationCatalog.loadTable(ident);
+
+ if (sortCols.length > 0) {
+ ReplaceSortOrder sortOrder = table.replaceSortOrder();
+ for (String col : sortCols) {
+ sortOrder = sortOrder.asc(col);
+ }
+ sortOrder.commit();
+ }
+ }
+
+ @SafeVarargs
+ private void writeBatches(String tableName, Class recordClass, List... batches)
+ throws NoSuchTableException {
+ for (List batch : batches) {
+ spark.createDataFrame(batch, recordClass).coalesce(1).writeTo(tableName).append();
+ }
+ }
+
+ private void setSortOrder(Table table, String... columns) {
+ ReplaceSortOrder sortOrder = table.replaceSortOrder();
+ for (String col : columns) {
+ sortOrder = sortOrder.asc(col);
+ }
+ sortOrder.commit();
+ }
+
+ private void refreshTable(String table) {
+ sql("REFRESH TABLE %s", table);
+ }
+
+ private void refreshTables(String... tables) {
+ for (String table : tables) {
+ refreshTable(table);
+ }
+ }
+
+ private void verifyMergeResults(String targetTableName) {
+ Dataset result =
+ spark.sql(String.format("SELECT c1, c2, c3 FROM %s ORDER BY c1", targetTableName));
+ List rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows)
+ .hasSize(6)
+ .containsExactly(
+ row(1, "old1", "data1"), // unchanged
+ row(2, "new2", "data2"), // updated from source
+ row(3, "new3", "data3"), // updated from source
+ row(4, "old4", "data4"), // unchanged
+ row(5, "new5", "data5"), // inserted from source
+ row(6, "new6", "data6")); // inserted from source
+ }
+
+ private void assertPlanWithoutSort(
+ int expectedNumSortsWithOrdering,
+ int expectedNumSortsWithoutOrdering,
+ Consumer dataVerification,
+ String query,
+ Object... args) {
+
+ AtomicReference> rowsWithOrdering = new AtomicReference<>();
+ AtomicReference> rowsWithoutOrdering = new AtomicReference<>();
+
+ Table targetTable = validationCatalog.loadTable(tableIdent);
+ long snapshotBeforeExecution = targetTable.currentSnapshot().snapshotId();
+
+ withSQLConf(
+ ENABLED_ORDERING_SQL_CONF,
+ () -> {
+ String plan = executeAndKeepPlan(query, args).toString();
+ int actualNumSorts = StringUtils.countMatches(plan, "Sort [");
+ assertThat(actualNumSorts)
+ .as("Number of sorts with enabled ordering must match")
+ .isEqualTo(expectedNumSortsWithOrdering);
+
+ sql("REFRESH TABLE %s", tableName);
+ validationCatalog.loadTable(tableIdent).refresh();
+
+ if (dataVerification != null) {
+ dataVerification.accept(tableName);
+ } else {
+ rowsWithOrdering.set(sql(query, args));
+ }
+ });
+
+ sql(
+ "CALL %s.system.rollback_to_snapshot('%s', %dL)",
+ catalogName, tableName, snapshotBeforeExecution);
+
+ sql("REFRESH TABLE %s", tableName);
+ validationCatalog.loadTable(tableIdent).refresh();
+
+ withSQLConf(
+ DISABLED_ORDERING_SQL_CONF,
+ () -> {
+ String plan = executeAndKeepPlan(query, args).toString();
+ int actualNumSorts = StringUtils.countMatches(plan, "Sort [");
+ assertThat(actualNumSorts)
+ .as("Number of sorts with disabled ordering must match")
+ .isEqualTo(expectedNumSortsWithoutOrdering);
+
+ sql("REFRESH TABLE %s", tableName);
+ validationCatalog.loadTable(tableIdent).refresh();
+ if (dataVerification != null) {
+ dataVerification.accept(tableName);
+ } else {
+ rowsWithoutOrdering.set(sql(query, args));
+ }
+ });
+
+ if (dataVerification == null) {
+ assertEquals(
+ "Sort elimination should not change query output",
+ rowsWithoutOrdering.get(),
+ rowsWithOrdering.get());
+ }
+ }
+}