From a4288a25e7323f56726adc1e38ed1f1bd1edd93d Mon Sep 17 00:00:00 2001 From: Renan GEHAN Date: Wed, 29 Oct 2025 10:19:41 +0100 Subject: [PATCH 01/12] Add reproducer tests --- .../physical_optimizer/enforce_sorting.rs | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 6202598218710..7cb39b2b0aab0 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -359,6 +359,79 @@ async fn test_union_inputs_different_sorted2() -> Result<()> { Ok(()) } +#[tokio::test] +async fn reproducer_with_repartition_sorts_true() -> Result<()> { + reproducer_impl(true).await?; // ✅ Passes + Ok(()) +} + +#[tokio::test] +async fn reproducer_with_repartition_sorts_false() -> Result<()> { + reproducer_impl(false).await?; + + // 💥 Doesn't pass, and generates this plan: + // + // OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition + // SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] + // CoalescePartitionsExec + // UnionExec + // DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet + // DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet + + Ok(()) +} + +async fn reproducer_impl(repartition_sorts: bool) -> Result<()> { + let schema = create_test_schema()?; + + // Source 1, will be sorted explicitly (on `nullable_col`) + let source1 = parquet_exec(schema.clone()); + let ordering1 = [sort_expr("nullable_col", &schema)].into(); + let sort1 = sort_exec(ordering1, source1.clone()); + + // Source 2, pre-sorted (on `nullable_col`) + let parquet_ordering: LexOrdering = [sort_expr("nullable_col", &schema)].into(); + let source2 = parquet_exec_with_sort(schema.clone(), vec![parquet_ordering.clone()]); + + let union = union_exec(vec![sort1, source2]); + + let coalesced = coalesce_partitions_exec(union); + + // Required sorted / single partitioned output + let requirement = [PhysicalSortRequirement::new( + col("nullable_col", &schema)?, + Some(SortOptions::new(false, true)), + )] + .into(); + let physical_plan = Arc::new(OutputRequirementExec::new( + coalesced, + Some(OrderingRequirements::new(requirement)), + Distribution::SinglePartition, + None, + )); + + let test = EnforceSortingTest::new(physical_plan).with_repartition_sorts(repartition_sorts); + + assert_snapshot!(test.run(), @r" + Input Plan: + OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition + CoalescePartitionsExec + UnionExec + SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet + + Optimized Plan: + OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition + SortPreservingMergeExec: [nullable_col@0 ASC] + UnionExec + SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet + "); + Ok(()) +} + #[tokio::test] async fn test_union_inputs_different_sorted3() -> Result<()> { let schema = create_test_schema()?; From fcd4d9f519af4e3c78fdc97fe0e24631bc7a019b Mon Sep 17 00:00:00 2001 From: Renan GEHAN Date: Wed, 29 Oct 2025 11:25:52 +0100 Subject: [PATCH 02/12] Add e2e reproducer test --- datafusion/core/tests/dataframe/mod.rs | 114 +++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 043f42b18c9f4..3797474e300b7 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -45,6 +45,7 @@ use insta::assert_snapshot; use object_store::local::LocalFileSystem; use std::collections::HashMap; use std::fs; +use std::path::Path; use std::sync::Arc; use tempfile::TempDir; use url::Url; @@ -2996,6 +2997,119 @@ async fn test_count_wildcard_on_window() -> Result<()> { Ok(()) } +#[tokio::test] +async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> { + reproducer_e2e_impl(false).await?; + + // 💥 Doesn't pass, and generates this plan: + // + // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted + // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] + // CoalescePartitionsExec + // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] + // UnionExec + // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet + + Ok(()) +} + +#[tokio::test] +async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> { + reproducer_e2e_impl(true).await?; + + // 💥 Doesn't pass, and generates this plan: + // + // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted + // SortPreservingMergeExec: [id@0 ASC NULLS LAST] + // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] + // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] + // UnionExec + // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet + + Ok(()) +} + +async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result<()> { + let config = SessionConfig::default() + .with_target_partitions(1) + .with_repartition_sorts(repartition_sorts); + let ctx = SessionContext::new_with_config(config); + + let testdata = parquet_test_data(); + + // Register "sorted" table, that is sorted + ctx.register_parquet( + "sorted", + &format!("{testdata}/alltypes_tiny_pages.parquet"), + ParquetReadOptions::default() + .file_sort_order(vec![vec![col("id").sort(true, false)]]), + ) + .await?; + + // Register "unsorted" table + ctx.register_parquet( + "unsorted", + &format!("{testdata}/alltypes_tiny_pages.parquet"), + ParquetReadOptions::default() + ) + .await?; + + let source_sorted = ctx + .table("sorted") + .await + .unwrap() + .select(vec![col("id")]) + .unwrap(); + + let source_unsorted = ctx + .table("unsorted") + .await + .unwrap() + .select(vec![col("id")]) + .unwrap(); + + let source_unsorted_resorted = source_unsorted + .sort(vec![col("id").sort(true, false)])?; + + let union = source_sorted.union(source_unsorted_resorted)?; + + let agg = union.aggregate(vec![col("id")], vec![])?; + + let df = agg; + + // To be able to remove user specific paths from the plan, for stable assertions + let testdata_clean = Path::new(&testdata).canonicalize()?.display().to_string(); + let testdata_clean = testdata_clean.strip_prefix("/").unwrap_or(&testdata_clean); + + let plan = df.explain(false, false)?.collect().await?; + assert_snapshot!( + pretty_format_batches(&plan)?.to_string().replace(&testdata_clean, "{testdata}"), + @r#" + +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | logical_plan | Aggregate: groupBy=[[id]], aggr=[[]] | + | | Union | + | | TableScan: sorted projection=[id] | + | | Sort: unsorted.id ASC NULLS LAST | + | | TableScan: unsorted projection=[id] | + | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] | + | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + | | UnionExec | + | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | + | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | + | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | + | | | + +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + "# + ); + + Ok(()) +} + #[tokio::test] async fn test_count_wildcard_on_aggregate() -> Result<()> { let ctx = create_join_context()?; From b44cf083dac0029068024953f8d3ce11135f34dd Mon Sep 17 00:00:00 2001 From: Renan GEHAN Date: Thu, 30 Oct 2025 10:16:34 +0100 Subject: [PATCH 03/12] review: make assertions cleaner / fix expectations --- datafusion/core/tests/dataframe/mod.rs | 82 ++++++++++--------- .../physical_optimizer/enforce_sorting.rs | 70 +++++++++------- 2 files changed, 83 insertions(+), 69 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 3797474e300b7..bb298f2ff50ab 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2999,24 +2999,53 @@ async fn test_count_wildcard_on_window() -> Result<()> { #[tokio::test] async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> { - reproducer_e2e_impl(false).await?; - - // 💥 Doesn't pass, and generates this plan: - // - // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted - // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] - // CoalescePartitionsExec - // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] - // UnionExec - // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet - // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet - + assert_snapshot!( + reproducer_e2e_impl(false).await?, + @r#" + +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | logical_plan | Aggregate: groupBy=[[id]], aggr=[[]] | + | | Union | + | | TableScan: sorted projection=[id] | + | | Sort: unsorted.id ASC NULLS LAST | + | | TableScan: unsorted projection=[id] | + | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | + | | CoalescePartitionsExec | + | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] | + | | UnionExec | + | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | + | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | + | | | + +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + "#); Ok(()) } #[tokio::test] async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> { - reproducer_e2e_impl(true).await?; + assert_snapshot!( + reproducer_e2e_impl(true).await?, + @r#" + +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | logical_plan | Aggregate: groupBy=[[id]], aggr=[[]] | + | | Union | + | | TableScan: sorted projection=[id] | + | | Sort: unsorted.id ASC NULLS LAST | + | | TableScan: unsorted projection=[id] | + | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] | + | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + | | UnionExec | + | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | + | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | + | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | + | | | + +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + "#); // 💥 Doesn't pass, and generates this plan: // @@ -3031,7 +3060,7 @@ async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> { Ok(()) } -async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result<()> { +async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result { let config = SessionConfig::default() .with_target_partitions(1) .with_repartition_sorts(repartition_sorts); @@ -3084,30 +3113,7 @@ async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result<()> { let testdata_clean = testdata_clean.strip_prefix("/").unwrap_or(&testdata_clean); let plan = df.explain(false, false)?.collect().await?; - assert_snapshot!( - pretty_format_batches(&plan)?.to_string().replace(&testdata_clean, "{testdata}"), - @r#" - +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - | logical_plan | Aggregate: groupBy=[[id]], aggr=[[]] | - | | Union | - | | TableScan: sorted projection=[id] | - | | Sort: unsorted.id ASC NULLS LAST | - | | TableScan: unsorted projection=[id] | - | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] | - | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - | | UnionExec | - | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | - | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | - | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | - | | | - +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - "# - ); - - Ok(()) + Ok(pretty_format_batches(&plan)?.to_string().replace(&testdata_clean, "{testdata}")) } #[tokio::test] diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 7cb39b2b0aab0..1d089c0dd657b 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -361,27 +361,53 @@ async fn test_union_inputs_different_sorted2() -> Result<()> { #[tokio::test] async fn reproducer_with_repartition_sorts_true() -> Result<()> { - reproducer_impl(true).await?; // ✅ Passes + assert_snapshot!( + reproducer_impl(true).await?, + @r" + Input Plan: + OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition + CoalescePartitionsExec + UnionExec + SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet + + Optimized Plan: + OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition + SortPreservingMergeExec: [nullable_col@0 ASC] + UnionExec + SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet + "); Ok(()) } #[tokio::test] async fn reproducer_with_repartition_sorts_false() -> Result<()> { - reproducer_impl(false).await?; - - // 💥 Doesn't pass, and generates this plan: - // - // OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition - // SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] - // CoalescePartitionsExec - // UnionExec - // DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet - // DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet + assert_snapshot!( + reproducer_impl(false).await?, + @r" + Input Plan: + OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition + CoalescePartitionsExec + UnionExec + SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet + Optimized Plan: + OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition + SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + UnionExec + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet + "); Ok(()) } -async fn reproducer_impl(repartition_sorts: bool) -> Result<()> { +async fn reproducer_impl(repartition_sorts: bool) -> Result { let schema = create_test_schema()?; // Source 1, will be sorted explicitly (on `nullable_col`) @@ -411,25 +437,7 @@ async fn reproducer_impl(repartition_sorts: bool) -> Result<()> { )); let test = EnforceSortingTest::new(physical_plan).with_repartition_sorts(repartition_sorts); - - assert_snapshot!(test.run(), @r" - Input Plan: - OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition - CoalescePartitionsExec - UnionExec - SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet - DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet - - Optimized Plan: - OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition - SortPreservingMergeExec: [nullable_col@0 ASC] - UnionExec - SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet - DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet - "); - Ok(()) + Ok(test.run()) } #[tokio::test] From 6c3194632fda2861c7d07d00da054476aff546a8 Mon Sep 17 00:00:00 2001 From: Renan GEHAN Date: Thu, 30 Oct 2025 12:49:05 +0100 Subject: [PATCH 04/12] review: add verbose explain to comment --- datafusion/core/tests/dataframe/mod.rs | 30 ++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index bb298f2ff50ab..b671ae5e37522 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -3056,8 +3056,34 @@ async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> { // UnionExec // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet - - Ok(()) + // + // + // === Excerpt from the verbose explain === + // + // Physical_plan after EnforceDistribution: + // + // OutputRequirementExec: order_by=[], dist_by=Unspecified + // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted + // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] + // CoalescePartitionsExec + // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted + // UnionExec + // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] + // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet + // + // Physical_plan after EnforceSorting: + // + // OutputRequirementExec: order_by=[], dist_by=Unspecified + // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted + // SortPreservingMergeExec: [id@0 ASC NULLS LAST] + // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] + // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] + // UnionExec + // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet + + Ok(()) } async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result { From 566da90679fde78ab7094c3365f467ae2478406d Mon Sep 17 00:00:00 2001 From: Renan GEHAN Date: Thu, 30 Oct 2025 12:49:57 +0100 Subject: [PATCH 05/12] review: ignore test, add reference to issue --- datafusion/core/tests/dataframe/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index b671ae5e37522..2a92b4de2bc4f 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -3023,6 +3023,7 @@ async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> { Ok(()) } +#[ignore] // See https://github.com/apache/datafusion/issues/18380 #[tokio::test] async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> { assert_snapshot!( From b09a26e198c734b3712a4d26c17797d620ec37e1 Mon Sep 17 00:00:00 2001 From: Renan GEHAN Date: Thu, 30 Oct 2025 12:51:32 +0100 Subject: [PATCH 06/12] extra: rename tests in prevision of merging --- datafusion/core/tests/dataframe/mod.rs | 10 +++++----- .../core/tests/physical_optimizer/enforce_sorting.rs | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 2a92b4de2bc4f..c426270d7a1e9 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2998,9 +2998,9 @@ async fn test_count_wildcard_on_window() -> Result<()> { } #[tokio::test] -async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> { +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false() -> Result<()> { assert_snapshot!( - reproducer_e2e_impl(false).await?, + union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?, @r#" +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | @@ -3025,9 +3025,9 @@ async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> { #[ignore] // See https://github.com/apache/datafusion/issues/18380 #[tokio::test] -async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> { +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true() -> Result<()> { assert_snapshot!( - reproducer_e2e_impl(true).await?, + union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?, @r#" +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | @@ -3087,7 +3087,7 @@ async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> { Ok(()) } -async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result { +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(repartition_sorts: bool) -> Result { let config = SessionConfig::default() .with_target_partitions(1) .with_repartition_sorts(repartition_sorts); diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 1d089c0dd657b..293eff9e8cb9f 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -360,9 +360,9 @@ async fn test_union_inputs_different_sorted2() -> Result<()> { } #[tokio::test] -async fn reproducer_with_repartition_sorts_true() -> Result<()> { +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true() -> Result<()> { assert_snapshot!( - reproducer_impl(true).await?, + union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?, @r" Input Plan: OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition @@ -384,9 +384,9 @@ async fn reproducer_with_repartition_sorts_true() -> Result<()> { } #[tokio::test] -async fn reproducer_with_repartition_sorts_false() -> Result<()> { +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false() -> Result<()> { assert_snapshot!( - reproducer_impl(false).await?, + union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?, @r" Input Plan: OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition @@ -407,7 +407,7 @@ async fn reproducer_with_repartition_sorts_false() -> Result<()> { Ok(()) } -async fn reproducer_impl(repartition_sorts: bool) -> Result { +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(repartition_sorts: bool) -> Result { let schema = create_test_schema()?; // Source 1, will be sorted explicitly (on `nullable_col`) From 602ac89e17435f833a47f7940438b636896e985e Mon Sep 17 00:00:00 2001 From: Renan GEHAN Date: Thu, 30 Oct 2025 16:17:17 +0100 Subject: [PATCH 07/12] review: add suggested comments Co-authored-by: Nga Tran --- datafusion/core/tests/dataframe/mod.rs | 2 ++ datafusion/core/tests/physical_optimizer/enforce_sorting.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index c426270d7a1e9..2b13d35975118 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2998,6 +2998,7 @@ async fn test_count_wildcard_on_window() -> Result<()> { } #[tokio::test] +// Test with `repartition_sorts` disabled, causing a full resort of the data async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false() -> Result<()> { assert_snapshot!( union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?, @@ -3025,6 +3026,7 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti #[ignore] // See https://github.com/apache/datafusion/issues/18380 #[tokio::test] +// Test with `repartition_sorts` enabled to preserve pre-sorted partitions and avoid resorting async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true() -> Result<()> { assert_snapshot!( union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?, diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 293eff9e8cb9f..88eb22a843275 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -360,6 +360,7 @@ async fn test_union_inputs_different_sorted2() -> Result<()> { } #[tokio::test] +// Test with `repartition_sorts` enabled to preserve pre-sorted partitions and avoid resorting async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true() -> Result<()> { assert_snapshot!( union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?, @@ -384,6 +385,7 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti } #[tokio::test] +// Test with `repartition_sorts` disabled, causing a full resort of the data async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false() -> Result<()> { assert_snapshot!( union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?, From 7f153b9909a6db9da732dda0bb923c3cb5b639ee Mon Sep 17 00:00:00 2001 From: Renan GEHAN Date: Thu, 30 Oct 2025 16:19:16 +0100 Subject: [PATCH 08/12] review: full explain in comment --- datafusion/core/tests/dataframe/mod.rs | 187 ++++++++++++++++++++++--- 1 file changed, 164 insertions(+), 23 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 2b13d35975118..64310bf227e42 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -3061,30 +3061,171 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet // // - // === Excerpt from the verbose explain === + // === Full verbose explain === // - // Physical_plan after EnforceDistribution: - // - // OutputRequirementExec: order_by=[], dist_by=Unspecified - // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted - // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] - // CoalescePartitionsExec - // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted - // UnionExec - // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet - // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] - // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet - // - // Physical_plan after EnforceSorting: - // - // OutputRequirementExec: order_by=[], dist_by=Unspecified - // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted - // SortPreservingMergeExec: [id@0 ASC NULLS LAST] - // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] - // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] - // UnionExec - // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet - // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet + // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + // | plan_type | plan | + // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + // | initial_logical_plan | Aggregate: groupBy=[[id]], aggr=[[]] | + // | | Union | + // | | Projection: sorted.id | + // | | TableScan: sorted | + // | | Sort: unsorted.id ASC NULLS LAST | + // | | Projection: unsorted.id | + // | | TableScan: unsorted | + // | logical_plan after resolve_grouping_function | SAME TEXT AS ABOVE | + // | logical_plan after type_coercion | SAME TEXT AS ABOVE | + // | analyzed_logical_plan | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_nested_union | SAME TEXT AS ABOVE | + // | logical_plan after simplify_expressions | SAME TEXT AS ABOVE | + // | logical_plan after replace_distinct_aggregate | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_join | SAME TEXT AS ABOVE | + // | logical_plan after decorrelate_predicate_subquery | SAME TEXT AS ABOVE | + // | logical_plan after scalar_subquery_to_join | SAME TEXT AS ABOVE | + // | logical_plan after decorrelate_lateral_join | SAME TEXT AS ABOVE | + // | logical_plan after extract_equijoin_predicate | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_duplicated_expr | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_filter | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_cross_join | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_limit | SAME TEXT AS ABOVE | + // | logical_plan after propagate_empty_relation | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_one_union | SAME TEXT AS ABOVE | + // | logical_plan after filter_null_join_keys | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_outer_join | SAME TEXT AS ABOVE | + // | logical_plan after push_down_limit | SAME TEXT AS ABOVE | + // | logical_plan after push_down_filter | SAME TEXT AS ABOVE | + // | logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_group_by_constant | SAME TEXT AS ABOVE | + // | logical_plan after common_sub_expression_eliminate | SAME TEXT AS ABOVE | + // | logical_plan after optimize_projections | Aggregate: groupBy=[[id]], aggr=[[]] | + // | | Union | + // | | TableScan: sorted projection=[id] | + // | | Sort: unsorted.id ASC NULLS LAST | + // | | TableScan: unsorted projection=[id] | + // | logical_plan after eliminate_nested_union | SAME TEXT AS ABOVE | + // | logical_plan after simplify_expressions | SAME TEXT AS ABOVE | + // | logical_plan after replace_distinct_aggregate | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_join | SAME TEXT AS ABOVE | + // | logical_plan after decorrelate_predicate_subquery | SAME TEXT AS ABOVE | + // | logical_plan after scalar_subquery_to_join | SAME TEXT AS ABOVE | + // | logical_plan after decorrelate_lateral_join | SAME TEXT AS ABOVE | + // | logical_plan after extract_equijoin_predicate | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_duplicated_expr | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_filter | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_cross_join | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_limit | SAME TEXT AS ABOVE | + // | logical_plan after propagate_empty_relation | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_one_union | SAME TEXT AS ABOVE | + // | logical_plan after filter_null_join_keys | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_outer_join | SAME TEXT AS ABOVE | + // | logical_plan after push_down_limit | SAME TEXT AS ABOVE | + // | logical_plan after push_down_filter | SAME TEXT AS ABOVE | + // | logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE | + // | logical_plan after eliminate_group_by_constant | SAME TEXT AS ABOVE | + // | logical_plan after common_sub_expression_eliminate | SAME TEXT AS ABOVE | + // | logical_plan after optimize_projections | SAME TEXT AS ABOVE | + // | logical_plan | Aggregate: groupBy=[[id]], aggr=[[]] | + // | | Union | + // | | TableScan: sorted projection=[id] | + // | | Sort: unsorted.id ASC NULLS LAST | + // | | TableScan: unsorted projection=[id] | + // | initial_physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | UnionExec | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | + // | | | + // | initial_physical_plan_with_stats | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted, statistics=[Rows=Inexact(14600), Bytes=Absent, [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)))]] | + // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted, statistics=[Rows=Inexact(14600), Bytes=Absent, [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)))]] | + // | | UnionExec, statistics=[Rows=Exact(14600), Bytes=Exact(647158), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, statistics=[Rows=Exact(7300), Bytes=Exact(323579), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false], statistics=[Rows=Exact(7300), Bytes=Exact(323579), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet, statistics=[Rows=Exact(7300), Bytes=Exact(323579), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | + // | | | + // | initial_physical_plan_with_schema | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted, schema=[id:Int32;N] | + // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted, schema=[id:Int32;N] | + // | | UnionExec, schema=[id:Int32;N] | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, schema=[id:Int32;N] | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false], schema=[id:Int32;N] | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet, schema=[id:Int32;N] | + // | | | + // | physical_plan after OutputRequirements | OutputRequirementExec: order_by=[], dist_by=Unspecified | + // | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | UnionExec | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | + // | | | + // | physical_plan after aggregate_statistics | SAME TEXT AS ABOVE | + // | physical_plan after join_selection | SAME TEXT AS ABOVE | + // | physical_plan after LimitedDistinctAggregation | SAME TEXT AS ABOVE | + // | physical_plan after FilterPushdown | SAME TEXT AS ABOVE | + // | physical_plan after EnforceDistribution | OutputRequirementExec: order_by=[], dist_by=Unspecified | + // | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | + // | | CoalescePartitionsExec | + // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | UnionExec | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | + // | | | + // | physical_plan after CombinePartialFinalAggregate | SAME TEXT AS ABOVE | + // | physical_plan after EnforceSorting | OutputRequirementExec: order_by=[], dist_by=Unspecified | + // | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] | + // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] | + // | | UnionExec | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | + // | | | + // | physical_plan after OptimizeAggregateOrder | SAME TEXT AS ABOVE | + // | physical_plan after ProjectionPushdown | SAME TEXT AS ABOVE | + // | physical_plan after coalesce_batches | SAME TEXT AS ABOVE | + // | physical_plan after coalesce_async_exec_input | SAME TEXT AS ABOVE | + // | physical_plan after OutputRequirements | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] | + // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] | + // | | UnionExec | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | + // | | | + // | physical_plan after LimitAggregation | SAME TEXT AS ABOVE | + // | physical_plan after LimitPushPastWindows | SAME TEXT AS ABOVE | + // | physical_plan after LimitPushdown | SAME TEXT AS ABOVE | + // | physical_plan after ProjectionPushdown | SAME TEXT AS ABOVE | + // | physical_plan after EnsureCooperative | SAME TEXT AS ABOVE | + // | physical_plan after FilterPushdown(Post) | SAME TEXT AS ABOVE | + // | physical_plan after SanityCheckPlan | SAME TEXT AS ABOVE | + // | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] | + // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] | + // | | UnionExec | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | + // | | | + // | physical_plan_with_stats | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted, statistics=[Rows=Inexact(14600), Bytes=Absent, [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)))]] | + // | | SortPreservingMergeExec: [id@0 ASC NULLS LAST], statistics=[Rows=Inexact(14600), Bytes=Absent, [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)))]] | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true], statistics=[Rows=Inexact(14600), Bytes=Absent, [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)))]] | + // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], statistics=[Rows=Inexact(14600), Bytes=Absent, [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)))]] | + // | | UnionExec, statistics=[Rows=Exact(14600), Bytes=Exact(647158), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, statistics=[Rows=Exact(7300), Bytes=Exact(323579), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet, statistics=[Rows=Exact(7300), Bytes=Exact(323579), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | + // | | | + // | physical_plan_with_schema | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted, schema=[id:Int32;N] | + // | | SortPreservingMergeExec: [id@0 ASC NULLS LAST], schema=[id:Int32;N] | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true], schema=[id:Int32;N] | + // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], schema=[id:Int32;N] | + // | | UnionExec, schema=[id:Int32;N] | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, schema=[id:Int32;N] | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet, schema=[id:Int32;N] | + // | | | + // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ Ok(()) } From 83e8d15ae652dbae47f6b0cb4103d5a099779471 Mon Sep 17 00:00:00 2001 From: Renan GEHAN Date: Thu, 30 Oct 2025 17:22:39 +0100 Subject: [PATCH 09/12] review: back to the excerpt --- datafusion/core/tests/dataframe/mod.rs | 141 +------------------------ 1 file changed, 5 insertions(+), 136 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 64310bf227e42..05e7f7667c5b9 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -3061,107 +3061,18 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet // // - // === Full verbose explain === + // === Excerpt from the verbose explain === // // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ // | plan_type | plan | // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - // | initial_logical_plan | Aggregate: groupBy=[[id]], aggr=[[]] | - // | | Union | - // | | Projection: sorted.id | - // | | TableScan: sorted | - // | | Sort: unsorted.id ASC NULLS LAST | - // | | Projection: unsorted.id | - // | | TableScan: unsorted | - // | logical_plan after resolve_grouping_function | SAME TEXT AS ABOVE | - // | logical_plan after type_coercion | SAME TEXT AS ABOVE | - // | analyzed_logical_plan | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_nested_union | SAME TEXT AS ABOVE | - // | logical_plan after simplify_expressions | SAME TEXT AS ABOVE | - // | logical_plan after replace_distinct_aggregate | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_join | SAME TEXT AS ABOVE | - // | logical_plan after decorrelate_predicate_subquery | SAME TEXT AS ABOVE | - // | logical_plan after scalar_subquery_to_join | SAME TEXT AS ABOVE | - // | logical_plan after decorrelate_lateral_join | SAME TEXT AS ABOVE | - // | logical_plan after extract_equijoin_predicate | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_duplicated_expr | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_filter | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_cross_join | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_limit | SAME TEXT AS ABOVE | - // | logical_plan after propagate_empty_relation | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_one_union | SAME TEXT AS ABOVE | - // | logical_plan after filter_null_join_keys | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_outer_join | SAME TEXT AS ABOVE | - // | logical_plan after push_down_limit | SAME TEXT AS ABOVE | - // | logical_plan after push_down_filter | SAME TEXT AS ABOVE | - // | logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_group_by_constant | SAME TEXT AS ABOVE | - // | logical_plan after common_sub_expression_eliminate | SAME TEXT AS ABOVE | - // | logical_plan after optimize_projections | Aggregate: groupBy=[[id]], aggr=[[]] | - // | | Union | - // | | TableScan: sorted projection=[id] | - // | | Sort: unsorted.id ASC NULLS LAST | - // | | TableScan: unsorted projection=[id] | - // | logical_plan after eliminate_nested_union | SAME TEXT AS ABOVE | - // | logical_plan after simplify_expressions | SAME TEXT AS ABOVE | - // | logical_plan after replace_distinct_aggregate | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_join | SAME TEXT AS ABOVE | - // | logical_plan after decorrelate_predicate_subquery | SAME TEXT AS ABOVE | - // | logical_plan after scalar_subquery_to_join | SAME TEXT AS ABOVE | - // | logical_plan after decorrelate_lateral_join | SAME TEXT AS ABOVE | - // | logical_plan after extract_equijoin_predicate | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_duplicated_expr | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_filter | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_cross_join | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_limit | SAME TEXT AS ABOVE | - // | logical_plan after propagate_empty_relation | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_one_union | SAME TEXT AS ABOVE | - // | logical_plan after filter_null_join_keys | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_outer_join | SAME TEXT AS ABOVE | - // | logical_plan after push_down_limit | SAME TEXT AS ABOVE | - // | logical_plan after push_down_filter | SAME TEXT AS ABOVE | - // | logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE | - // | logical_plan after eliminate_group_by_constant | SAME TEXT AS ABOVE | - // | logical_plan after common_sub_expression_eliminate | SAME TEXT AS ABOVE | - // | logical_plan after optimize_projections | SAME TEXT AS ABOVE | - // | logical_plan | Aggregate: groupBy=[[id]], aggr=[[]] | - // | | Union | - // | | TableScan: sorted projection=[id] | - // | | Sort: unsorted.id ASC NULLS LAST | - // | | TableScan: unsorted projection=[id] | // | initial_physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | // | | UnionExec | // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | - // | | | - // | initial_physical_plan_with_stats | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted, statistics=[Rows=Inexact(14600), Bytes=Absent, [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)))]] | - // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted, statistics=[Rows=Inexact(14600), Bytes=Absent, [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)))]] | - // | | UnionExec, statistics=[Rows=Exact(14600), Bytes=Exact(647158), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, statistics=[Rows=Exact(7300), Bytes=Exact(323579), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | - // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false], statistics=[Rows=Exact(7300), Bytes=Exact(323579), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet, statistics=[Rows=Exact(7300), Bytes=Exact(323579), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | - // | | | - // | initial_physical_plan_with_schema | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted, schema=[id:Int32;N] | - // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted, schema=[id:Int32;N] | - // | | UnionExec, schema=[id:Int32;N] | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, schema=[id:Int32;N] | - // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false], schema=[id:Int32;N] | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet, schema=[id:Int32;N] | - // | | | - // | physical_plan after OutputRequirements | OutputRequirementExec: order_by=[], dist_by=Unspecified | - // | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - // | | UnionExec | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | - // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | - // | | | - // | physical_plan after aggregate_statistics | SAME TEXT AS ABOVE | - // | physical_plan after join_selection | SAME TEXT AS ABOVE | - // | physical_plan after LimitedDistinctAggregation | SAME TEXT AS ABOVE | - // | physical_plan after FilterPushdown | SAME TEXT AS ABOVE | + // ... // | physical_plan after EnforceDistribution | OutputRequirementExec: order_by=[], dist_by=Unspecified | // | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | @@ -3172,7 +3083,8 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | // | | | - // | physical_plan after CombinePartialFinalAggregate | SAME TEXT AS ABOVE | + // | physical_plan after CombinePartialFinalAggregate | SAME TEXT AS ABOVE + // | | | // | physical_plan after EnforceSorting | OutputRequirementExec: order_by=[], dist_by=Unspecified | // | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | // | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] | @@ -3181,50 +3093,7 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti // | | UnionExec | // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | - // | | | - // | physical_plan after OptimizeAggregateOrder | SAME TEXT AS ABOVE | - // | physical_plan after ProjectionPushdown | SAME TEXT AS ABOVE | - // | physical_plan after coalesce_batches | SAME TEXT AS ABOVE | - // | physical_plan after coalesce_async_exec_input | SAME TEXT AS ABOVE | - // | physical_plan after OutputRequirements | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - // | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] | - // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] | - // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] | - // | | UnionExec | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | - // | | | - // | physical_plan after LimitAggregation | SAME TEXT AS ABOVE | - // | physical_plan after LimitPushPastWindows | SAME TEXT AS ABOVE | - // | physical_plan after LimitPushdown | SAME TEXT AS ABOVE | - // | physical_plan after ProjectionPushdown | SAME TEXT AS ABOVE | - // | physical_plan after EnsureCooperative | SAME TEXT AS ABOVE | - // | physical_plan after FilterPushdown(Post) | SAME TEXT AS ABOVE | - // | physical_plan after SanityCheckPlan | SAME TEXT AS ABOVE | - // | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - // | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] | - // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] | - // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] | - // | | UnionExec | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | - // | | | - // | physical_plan_with_stats | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted, statistics=[Rows=Inexact(14600), Bytes=Absent, [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)))]] | - // | | SortPreservingMergeExec: [id@0 ASC NULLS LAST], statistics=[Rows=Inexact(14600), Bytes=Absent, [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)))]] | - // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true], statistics=[Rows=Inexact(14600), Bytes=Absent, [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)))]] | - // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], statistics=[Rows=Inexact(14600), Bytes=Absent, [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)))]] | - // | | UnionExec, statistics=[Rows=Exact(14600), Bytes=Exact(647158), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, statistics=[Rows=Exact(7300), Bytes=Exact(323579), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet, statistics=[Rows=Exact(7300), Bytes=Exact(323579), [(Col[0]: Min=Exact(Int32(0)) Max=Exact(Int32(7299)) Null=Exact(0))]] | - // | | | - // | physical_plan_with_schema | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted, schema=[id:Int32;N] | - // | | SortPreservingMergeExec: [id@0 ASC NULLS LAST], schema=[id:Int32;N] | - // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true], schema=[id:Int32;N] | - // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], schema=[id:Int32;N] | - // | | UnionExec, schema=[id:Int32;N] | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, schema=[id:Int32;N] | - // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet, schema=[id:Int32;N] | - // | | | + // ... // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ Ok(()) From f98d6aea2cf507d09dabae1560dc22c92a83c493 Mon Sep 17 00:00:00 2001 From: Renan GEHAN Date: Sat, 1 Nov 2025 12:15:30 +0100 Subject: [PATCH 10/12] cargo fmt --- datafusion/core/tests/dataframe/mod.rs | 26 ++++++++++++------- .../physical_optimizer/enforce_sorting.rs | 15 +++++++---- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 05e7f7667c5b9..101cdd7bbc651 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2999,7 +2999,8 @@ async fn test_count_wildcard_on_window() -> Result<()> { #[tokio::test] // Test with `repartition_sorts` disabled, causing a full resort of the data -async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false() -> Result<()> { +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false( +) -> Result<()> { assert_snapshot!( union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?, @r#" @@ -3027,7 +3028,8 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti #[ignore] // See https://github.com/apache/datafusion/issues/18380 #[tokio::test] // Test with `repartition_sorts` enabled to preserve pre-sorted partitions and avoid resorting -async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true() -> Result<()> { +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true( +) -> Result<()> { assert_snapshot!( union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?, @r#" @@ -3096,10 +3098,12 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti // ... // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - Ok(()) + Ok(()) } -async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(repartition_sorts: bool) -> Result { +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl( + repartition_sorts: bool, +) -> Result { let config = SessionConfig::default() .with_target_partitions(1) .with_repartition_sorts(repartition_sorts); @@ -3114,15 +3118,15 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(reparti ParquetReadOptions::default() .file_sort_order(vec![vec![col("id").sort(true, false)]]), ) - .await?; + .await?; // Register "unsorted" table ctx.register_parquet( "unsorted", &format!("{testdata}/alltypes_tiny_pages.parquet"), - ParquetReadOptions::default() + ParquetReadOptions::default(), ) - .await?; + .await?; let source_sorted = ctx .table("sorted") @@ -3138,8 +3142,8 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(reparti .select(vec![col("id")]) .unwrap(); - let source_unsorted_resorted = source_unsorted - .sort(vec![col("id").sort(true, false)])?; + let source_unsorted_resorted = + source_unsorted.sort(vec![col("id").sort(true, false)])?; let union = source_sorted.union(source_unsorted_resorted)?; @@ -3152,7 +3156,9 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(reparti let testdata_clean = testdata_clean.strip_prefix("/").unwrap_or(&testdata_clean); let plan = df.explain(false, false)?.collect().await?; - Ok(pretty_format_batches(&plan)?.to_string().replace(&testdata_clean, "{testdata}")) + Ok(pretty_format_batches(&plan)? + .to_string() + .replace(&testdata_clean, "{testdata}")) } #[tokio::test] diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 88eb22a843275..e3a0eb7e1aa6f 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -361,7 +361,8 @@ async fn test_union_inputs_different_sorted2() -> Result<()> { #[tokio::test] // Test with `repartition_sorts` enabled to preserve pre-sorted partitions and avoid resorting -async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true() -> Result<()> { +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true( +) -> Result<()> { assert_snapshot!( union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?, @r" @@ -386,7 +387,8 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti #[tokio::test] // Test with `repartition_sorts` disabled, causing a full resort of the data -async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false() -> Result<()> { +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false( +) -> Result<()> { assert_snapshot!( union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?, @r" @@ -409,7 +411,9 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti Ok(()) } -async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(repartition_sorts: bool) -> Result { +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl( + repartition_sorts: bool, +) -> Result { let schema = create_test_schema()?; // Source 1, will be sorted explicitly (on `nullable_col`) @@ -430,7 +434,7 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(reparti col("nullable_col", &schema)?, Some(SortOptions::new(false, true)), )] - .into(); + .into(); let physical_plan = Arc::new(OutputRequirementExec::new( coalesced, Some(OrderingRequirements::new(requirement)), @@ -438,7 +442,8 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(reparti None, )); - let test = EnforceSortingTest::new(physical_plan).with_repartition_sorts(repartition_sorts); + let test = + EnforceSortingTest::new(physical_plan).with_repartition_sorts(repartition_sorts); Ok(test.run()) } From fa079379b50187f3f0c44cd6b30b3dc2dfe49f50 Mon Sep 17 00:00:00 2001 From: Renan GEHAN Date: Sat, 1 Nov 2025 12:19:08 +0100 Subject: [PATCH 11/12] fix clippy issue --- datafusion/core/tests/dataframe/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 101cdd7bbc651..7af42c83318ab 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -3158,7 +3158,7 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl( let plan = df.explain(false, false)?.collect().await?; Ok(pretty_format_batches(&plan)? .to_string() - .replace(&testdata_clean, "{testdata}")) + .replace(testdata_clean, "{testdata}")) } #[tokio::test] From 1fb22197ae300d12e6842a46f3b5d67f65e9cdf4 Mon Sep 17 00:00:00 2001 From: Renan GEHAN Date: Sat, 1 Nov 2025 12:49:00 +0100 Subject: [PATCH 12/12] simplify snapshot formatting to avoid mismatches --- datafusion/core/tests/dataframe/mod.rs | 60 ++++++++++---------------- 1 file changed, 23 insertions(+), 37 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 7af42c83318ab..a7393c72c9b3b 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -3004,23 +3004,13 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti assert_snapshot!( union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?, @r#" - +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - | logical_plan | Aggregate: groupBy=[[id]], aggr=[[]] | - | | Union | - | | TableScan: sorted projection=[id] | - | | Sort: unsorted.id ASC NULLS LAST | - | | TableScan: unsorted projection=[id] | - | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | - | | CoalescePartitionsExec | - | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] | - | | UnionExec | - | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | - | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | - | | | - +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted + SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] + CoalescePartitionsExec + AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] + UnionExec + DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet "#); Ok(()) } @@ -3033,23 +3023,13 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti assert_snapshot!( union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?, @r#" - +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - | logical_plan | Aggregate: groupBy=[[id]], aggr=[[]] | - | | Union | - | | TableScan: sorted projection=[id] | - | | Sort: unsorted.id ASC NULLS LAST | - | | TableScan: unsorted projection=[id] | - | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] | - | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | - | | UnionExec | - | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | - | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | - | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | - | | | - +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted + SortPreservingMergeExec: [id@0 ASC NULLS LAST] + AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted + UnionExec + DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet "#); // 💥 Doesn't pass, and generates this plan: @@ -3155,10 +3135,16 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl( let testdata_clean = Path::new(&testdata).canonicalize()?.display().to_string(); let testdata_clean = testdata_clean.strip_prefix("/").unwrap_or(&testdata_clean); - let plan = df.explain(false, false)?.collect().await?; - Ok(pretty_format_batches(&plan)? + // Use displayable() rather than explain().collect() to avoid table formatting issues. We need + // to replace machine-specific paths with variable lengths, which breaks table alignment and + // causes snapshot mismatches. + let physical_plan = df.create_physical_plan().await?; + let displayable_plan = displayable(physical_plan.as_ref()) + .indent(true) .to_string() - .replace(testdata_clean, "{testdata}")) + .replace(testdata_clean, "{testdata}"); + + Ok(displayable_plan) } #[tokio::test]