diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs
index 3f2860947177c..1dbfaf381a4b5 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -23,16 +23,13 @@ use async_trait::async_trait;
 use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    Constraints, DataFusionError, SchemaExt, Statistics, internal_datafusion_err,
-    plan_err, project_schema,
+    Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
 };
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_groups::FileGroup;
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
 use datafusion_datasource::file_sink_config::FileSinkConfig;
-use datafusion_datasource::schema_adapter::{
-    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
-};
+use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_datasource::{
     ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
 };
@@ -331,20 +328,6 @@ impl ListingTable {
         self.schema_adapter_factory.as_ref()
     }
 
-    /// Creates a schema adapter for mapping between file and table schemas
-    ///
-    /// Uses the configured schema adapter factory if available, otherwise falls back
-    /// to the default implementation.
-    fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
-        let table_schema = self.schema();
-        match &self.schema_adapter_factory {
-            Some(factory) => {
-                factory.create_with_projected_schema(Arc::clone(&table_schema))
-            }
-            None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)),
-        }
-    }
-
     /// Creates a file source and applies schema adapter factory if available
     fn create_file_source_with_schema_adapter(
         &self,
@@ -359,10 +342,8 @@ impl ListingTable {
         );
         let mut source = self.options.format.file_source(table_schema);
 
-        // Apply schema adapter to source if available
-        //
+        // Apply schema adapter to source if available.
         // The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
-        // Note: ListingTable also creates a SchemaAdapter in `scan()` but that is only used to adapt collected statistics.
         if let Some(factory) = &self.schema_adapter_factory {
             source = source.with_schema_adapter_factory(Arc::clone(factory))?;
         }
@@ -709,25 +690,17 @@ impl ListingTable {
             )
         };
 
-        let (mut file_groups, mut stats) = compute_all_files_statistics(
+        let (file_groups, stats) = compute_all_files_statistics(
            file_groups,
             self.schema(),
             self.options.collect_stat,
             inexact_stats,
         )?;
-        let schema_adapter = self.create_schema_adapter();
-        let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;
-
-        stats.column_statistics =
-            schema_mapper.map_column_statistics(&stats.column_statistics)?;
-        file_groups.iter_mut().try_for_each(|file_group| {
-            if let Some(stat) = file_group.statistics_mut() {
-                stat.column_statistics =
-                    schema_mapper.map_column_statistics(&stat.column_statistics)?;
-            }
-            Ok::<_, DataFusionError>(())
-        })?;
+        // Note: Statistics already include both file columns and partition columns.
+        // PartitionedFile::with_statistics automatically appends exact partition column
+        // statistics (min=max=partition_value, null_count=0, distinct_count=1) computed
+        // from partition_values.
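The note above relies on the per-file partition statistics that `PartitionedFile::with_statistics` now appends (see the `datafusion/datasource/src/mod.rs` hunk later in this diff). For reference, a minimal sketch of the shape of those appended statistics, using `datafusion_common` types as they appear in this diff; the `Date32` value is an illustrative placeholder:

```rust
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, ScalarValue};

/// Shape of the exact statistics appended for one partition column of one file:
/// every row of the file carries the same path-derived partition value.
fn partition_column_stats(partition_value: ScalarValue) -> ColumnStatistics {
    ColumnStatistics {
        null_count: Precision::Exact(0), // values parsed from the path are never null
        min_value: Precision::Exact(partition_value.clone()),
        max_value: Precision::Exact(partition_value), // min == max: a single value per file
        distinct_count: Precision::Exact(1),
        sum_value: Precision::Absent,
        byte_size: Precision::Absent, // refined from the type's byte width when it is known
    }
}

fn main() {
    let stats = partition_column_stats(ScalarValue::Date32(Some(20148))); // e.g. date=2025-03-01
    assert_eq!(stats.min_value, stats.max_value);
}
```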
         Ok(ListFilesResult {
             file_groups,
             statistics: stats,
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 020bdfd5d86e2..7db79485d1841 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -129,16 +129,13 @@ mod tests {
         ListingOptions, ListingTable, ListingTableConfig, SchemaSource,
     };
     use datafusion_common::{
-        assert_contains, plan_err,
+        assert_contains,
         stats::Precision,
         test_util::{batches_to_string, datafusion_test_data},
-        ColumnStatistics, DataFusionError, Result, ScalarValue,
+        DataFusionError, Result, ScalarValue,
     };
     use datafusion_datasource::file_compression_type::FileCompressionType;
     use datafusion_datasource::file_format::FileFormat;
-    use datafusion_datasource::schema_adapter::{
-        SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
-    };
     use datafusion_datasource::ListingTableUrl;
     use datafusion_expr::dml::InsertOp;
     use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
@@ -147,15 +144,12 @@ mod tests {
     use datafusion_physical_expr_common::sort_expr::LexOrdering;
     use datafusion_physical_plan::empty::EmptyExec;
     use datafusion_physical_plan::{collect, ExecutionPlanProperties};
-    use rstest::rstest;
     use std::collections::HashMap;
     use std::io::Write;
     use std::sync::Arc;
     use tempfile::TempDir;
     use url::Url;
 
-    const DUMMY_NULL_COUNT: Precision<usize> = Precision::Exact(42);
-
     /// Creates a test schema with standard field types used in tests
     fn create_test_schema() -> SchemaRef {
         Arc::new(Schema::new(vec![
@@ -1448,31 +1442,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_statistics_mapping_with_custom_factory() -> Result<()> {
-        let ctx = SessionContext::new();
-        let table = create_test_listing_table_with_json_and_adapter(
-            &ctx,
-            false,
-            // NullStatsAdapterFactory sets column_statistics null_count to DUMMY_NULL_COUNT
-            Arc::new(NullStatsAdapterFactory {}),
-        )?;
-
-        let result = table.list_files_for_scan(&ctx.state(), &[], None).await?;
-
-        assert_eq!(
-            result.statistics.column_statistics[0].null_count,
-            DUMMY_NULL_COUNT
-        );
-        for g in result.file_groups {
-            if let Some(s) = g.file_statistics(None) {
-                assert_eq!(s.column_statistics[0].null_count, DUMMY_NULL_COUNT);
-            }
-        }
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn test_statistics_mapping_with_default_factory() -> Result<()> {
         let ctx = SessionContext::new();
@@ -1513,199 +1482,4 @@ mod tests {
 
         Ok(())
     }
-
-    #[rstest]
-    #[case(MapSchemaError::TypeIncompatible, "Cannot map incompatible types")]
-    #[case(MapSchemaError::GeneralFailure, "Schema adapter mapping failed")]
-    #[case(
-        MapSchemaError::InvalidProjection,
-        "Invalid projection in schema mapping"
-    )]
-    #[tokio::test]
-    async fn test_schema_adapter_map_schema_errors(
-        #[case] error_type: MapSchemaError,
-        #[case] expected_error_msg: &str,
-    ) -> Result<()> {
-        let ctx = SessionContext::new();
-        let table = create_test_listing_table_with_json_and_adapter(
-            &ctx,
-            false,
-            Arc::new(FailingMapSchemaAdapterFactory { error_type }),
-        )?;
-
-        // The error should bubble up from the scan operation when schema mapping fails
-        let scan_result = table.scan(&ctx.state(), None, &[], None).await;
-
-        assert!(scan_result.is_err());
-        let error_msg = scan_result.unwrap_err().to_string();
-        assert!(
-            error_msg.contains(expected_error_msg),
-            "Expected error containing '{expected_error_msg}', got: {error_msg}"
-        );
-
-        Ok(())
-    }
-
-    // Test that errors during file listing also bubble up correctly
-    #[tokio::test]
-    async fn test_schema_adapter_error_during_file_listing() -> Result<()> {
-        let ctx = SessionContext::new();
-        let table = create_test_listing_table_with_json_and_adapter(
-            &ctx,
-            true,
-            Arc::new(FailingMapSchemaAdapterFactory {
-                error_type: MapSchemaError::TypeIncompatible,
-            }),
-        )?;
-
-        // The error should bubble up from list_files_for_scan when collecting statistics
-        let list_result = table.list_files_for_scan(&ctx.state(), &[], None).await;
-
-        assert!(list_result.is_err());
-        let error_msg = list_result.unwrap_err().to_string();
-        assert!(
-            error_msg.contains("Cannot map incompatible types"),
-            "Expected type incompatibility error during file listing, got: {error_msg}"
-        );
-
-        Ok(())
-    }
-
-    #[derive(Debug, Copy, Clone)]
-    enum MapSchemaError {
-        TypeIncompatible,
-        GeneralFailure,
-        InvalidProjection,
-    }
-
-    #[derive(Debug)]
-    struct FailingMapSchemaAdapterFactory {
-        error_type: MapSchemaError,
-    }
-
-    impl SchemaAdapterFactory for FailingMapSchemaAdapterFactory {
-        fn create(
-            &self,
-            projected_table_schema: SchemaRef,
-            _table_schema: SchemaRef,
-        ) -> Box<dyn SchemaAdapter> {
-            Box::new(FailingMapSchemaAdapter {
-                schema: projected_table_schema,
-                error_type: self.error_type,
-            })
-        }
-    }
-
-    #[derive(Debug)]
-    struct FailingMapSchemaAdapter {
-        schema: SchemaRef,
-        error_type: MapSchemaError,
-    }
-
-    impl SchemaAdapter for FailingMapSchemaAdapter {
-        fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-            let field = self.schema.field(index);
-            file_schema.fields.find(field.name()).map(|(i, _)| i)
-        }
-
-        fn map_schema(
-            &self,
-            _file_schema: &Schema,
-        ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-            // Always fail with different error types based on the configured error_type
-            match self.error_type {
-                MapSchemaError::TypeIncompatible => {
-                    plan_err!(
-                        "Cannot map incompatible types: Boolean cannot be cast to Utf8"
-                    )
-                }
-                MapSchemaError::GeneralFailure => {
-                    plan_err!("Schema adapter mapping failed due to internal error")
-                }
-                MapSchemaError::InvalidProjection => {
-                    plan_err!("Invalid projection in schema mapping: column index out of bounds")
-                }
-            }
-        }
-    }
-
-    #[derive(Debug)]
-    struct NullStatsAdapterFactory;
-
-    impl SchemaAdapterFactory for NullStatsAdapterFactory {
-        fn create(
-            &self,
-            projected_table_schema: SchemaRef,
-            _table_schema: SchemaRef,
-        ) -> Box<dyn SchemaAdapter> {
-            Box::new(NullStatsAdapter {
-                schema: projected_table_schema,
-            })
-        }
-    }
-
-    #[derive(Debug)]
-    struct NullStatsAdapter {
-        schema: SchemaRef,
-    }
-
-    impl SchemaAdapter for NullStatsAdapter {
-        fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-            let field = self.schema.field(index);
-            file_schema.fields.find(field.name()).map(|(i, _)| i)
-        }
-
-        fn map_schema(
-            &self,
-            file_schema: &Schema,
-        ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-            let projection = (0..file_schema.fields().len()).collect();
-            Ok((Arc::new(NullStatsMapper {}), projection))
-        }
-    }
-
-    #[derive(Debug)]
-    struct NullStatsMapper;
-
-    impl SchemaMapper for NullStatsMapper {
-        fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-            Ok(batch)
-        }
-
-        fn map_column_statistics(
-            &self,
-            stats: &[ColumnStatistics],
-        ) -> Result<Vec<ColumnStatistics>> {
-            Ok(stats
-                .iter()
-                .map(|s| {
-                    let mut s = s.clone();
-                    s.null_count = DUMMY_NULL_COUNT;
-                    s
-                })
-                .collect())
-        }
-    }
-
-    /// Helper function to create a test ListingTable with JSON format and custom schema adapter factory
-    fn create_test_listing_table_with_json_and_adapter(
-        ctx: &SessionContext,
-        collect_stat: bool,
-        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
-    ) -> Result<ListingTable> {
-        let path = "table/file.json";
-        register_test_store(ctx, &[(path, 10)]);
-
-        let format = JsonFormat::default();
-        let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(collect_stat);
-        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
-        let table_path = ListingTableUrl::parse("test:///table/")?;
-
-        let config = ListingTableConfig::new(table_path)
-            .with_listing_options(opt)
-            .with_schema(Arc::new(schema))
-            .with_schema_adapter_factory(schema_adapter_factory);
-
-        ListingTable::try_new(config)
-    }
 }
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index cbfcb718836b0..1846473e109a3 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -112,13 +112,26 @@ mod test {
             .unwrap()
     }
 
+    // Date32 values for test data (days since 1970-01-01):
+    // 2025-03-01 = 20148
+    // 2025-03-02 = 20149
+    // 2025-03-03 = 20150
+    // 2025-03-04 = 20151
+    const DATE_2025_03_01: i32 = 20148;
+    const DATE_2025_03_02: i32 = 20149;
+    const DATE_2025_03_03: i32 = 20150;
+    const DATE_2025_03_04: i32 = 20151;
+
     /// Helper function to create expected statistics for a partition with Int32 column
+    ///
+    /// If `date_range` is provided, includes exact statistics for the partition date column.
+    /// Partition column statistics are exact because they are derived from the files' exact partition values.
     fn create_partition_statistics(
         num_rows: usize,
         total_byte_size: usize,
         min_value: i32,
         max_value: i32,
-        include_date_column: bool,
+        date_range: Option<(i32, i32)>,
     ) -> Statistics {
         // Int32 is 4 bytes per row
         let int32_byte_size = num_rows * 4;
@@ -131,16 +144,19 @@ mod test {
             byte_size: Precision::Exact(int32_byte_size),
         }];
 
-        if include_date_column {
-            // The date column is a partition column (from the directory path),
-            // not stored in the parquet file, so byte_size is Absent
+        if let Some((min_date, max_date)) = date_range {
+            // Partition column stats are computed from partition values:
+            // - null_count = 0 (partition values from paths are never null)
+            // - min/max are the merged partition values across files in the group
+            // - byte_size = num_rows * 4 (Date32 is 4 bytes per row)
+            let date32_byte_size = num_rows * 4;
             column_stats.push(ColumnStatistics {
-                null_count: Precision::Absent,
-                max_value: Precision::Absent,
-                min_value: Precision::Absent,
+                null_count: Precision::Exact(0),
+                max_value: Precision::Exact(ScalarValue::Date32(Some(max_date))),
+                min_value: Precision::Exact(ScalarValue::Date32(Some(min_date))),
                 sum_value: Precision::Absent,
                 distinct_count: Precision::Absent,
-                byte_size: Precision::Absent,
+                byte_size: Precision::Exact(date32_byte_size),
             });
         }
 
@@ -220,10 +236,22 @@ mod test {
         let statistics = (0..scan.output_partitioning().partition_count())
             .map(|idx| scan.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
-        let expected_statistic_partition_1 =
-            create_partition_statistics(2, 16, 3, 4, true);
-        let expected_statistic_partition_2 =
-            create_partition_statistics(2, 16, 1, 2, true);
+        // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
+        let expected_statistic_partition_1 = create_partition_statistics(
+            2,
+            16,
+            3,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_02)),
+        );
+        // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
+        let expected_statistic_partition_2 = create_partition_statistics(
+            2,
+            16,
+            1,
+            2,
+            Some((DATE_2025_03_03, DATE_2025_03_04)),
+        );
         // Check the statistics of each partition
         assert_eq!(statistics.len(), 2);
         assert_eq!(statistics[0], expected_statistic_partition_1);
         assert_eq!(statistics[1], expected_statistic_partition_2);
@@ -252,10 +280,11 @@ mod test {
         let statistics = (0..projection.output_partitioning().partition_count())
             .map(|idx| projection.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
+        // Projection only includes id column, not the date partition column
         let expected_statistic_partition_1 =
-            create_partition_statistics(2, 8, 3, 4, false);
+            create_partition_statistics(2, 8, 3, 4, None);
         let expected_statistic_partition_2 =
-            create_partition_statistics(2, 8, 1, 2, false);
+            create_partition_statistics(2, 8, 1, 2, None);
         // Check the statistics of each partition
         assert_eq!(statistics.len(), 2);
         assert_eq!(statistics[0], expected_statistic_partition_1);
         assert_eq!(statistics[1], expected_statistic_partition_2);
@@ -283,7 +312,14 @@ mod test {
         let statistics = (0..sort_exec.output_partitioning().partition_count())
             .map(|idx| sort_exec.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
-        let expected_statistic_partition = create_partition_statistics(4, 32, 1, 4, true);
+        // All 4 files merged: ids [1-4], dates [2025-03-01, 2025-03-04]
+        let expected_statistic_partition = create_partition_statistics(
+            4,
+            32,
+            1,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_04)),
+        );
         assert_eq!(statistics.len(), 1);
         assert_eq!(statistics[0], expected_statistic_partition);
         // Check the statistics_by_partition with real results
@@ -296,10 +332,22 @@ mod test {
         let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(
             SortExec::new(ordering.into(), scan_2).with_preserve_partitioning(true),
         );
-        let expected_statistic_partition_1 =
-            create_partition_statistics(2, 16, 3, 4, true);
-        let expected_statistic_partition_2 =
-            create_partition_statistics(2, 16, 1, 2, true);
+        // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
+        let expected_statistic_partition_1 = create_partition_statistics(
+            2,
+            16,
+            3,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_02)),
+        );
+        // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
+        let expected_statistic_partition_2 = create_partition_statistics(
+            2,
+            16,
+            1,
+            2,
+            Some((DATE_2025_03_03, DATE_2025_03_04)),
+        );
         let statistics = (0..sort_exec.output_partitioning().partition_count())
             .map(|idx| sort_exec.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
@@ -349,7 +397,7 @@ mod test {
                     min_value: Precision::Exact(ScalarValue::Null),
                     sum_value: Precision::Exact(ScalarValue::Null),
                     distinct_count: Precision::Exact(0),
-                    byte_size: Precision::Absent,
+                    byte_size: Precision::Exact(16), // 4 rows * 4 bytes (Date32)
                 },
             ],
         };
@@ -378,7 +426,7 @@ mod test {
                     min_value: Precision::Exact(ScalarValue::Null),
                     sum_value: Precision::Exact(ScalarValue::Null),
                     distinct_count: Precision::Exact(0),
-                    byte_size: Precision::Absent,
+                    byte_size: Precision::Exact(8), // 2 rows * 4 bytes (Date32)
                 },
             ],
         };
@@ -397,10 +445,22 @@ mod test {
             .collect::<Result<Vec<_>>>()?;
         // Check that we have 4 partitions (2 from each scan)
         assert_eq!(statistics.len(), 4);
-        let expected_statistic_partition_1 =
-            create_partition_statistics(2, 16, 3, 4, true);
-        let expected_statistic_partition_2 =
-            create_partition_statistics(2, 16, 1, 2, true);
+        // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
+        let expected_statistic_partition_1 = create_partition_statistics(
+            2,
+            16,
+            3,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_02)),
+        );
+        // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
+        let expected_statistic_partition_2 = create_partition_statistics(
+            2,
+            16,
+            1,
+            2,
+            Some((DATE_2025_03_03, DATE_2025_03_04)),
+        );
         // Verify first partition (from first scan)
         assert_eq!(statistics[0], expected_statistic_partition_1);
         // Verify second partition (from first scan)
@@ -494,11 +554,13 @@ mod test {
             .collect::<Result<Vec<_>>>()?;
         // Check that we have 2 partitions
         assert_eq!(statistics.len(), 2);
+        // Cross join output schema: [left.id, left.date, right.id]
         // Cross join doesn't propagate Column's byte_size
         let expected_statistic_partition_1 = Statistics {
             num_rows: Precision::Exact(8),
             total_byte_size: Precision::Exact(512),
             column_statistics: vec![
+                // column 0: left.id (Int32, file column from t1)
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
                     max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
                     min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
                     sum_value: Precision::Absent,
                     distinct_count: Precision::Absent,
                     byte_size: Precision::Absent,
                 },
+                // column 1: left.date (Date32, partition column from t1)
+                // Partition column statistics are exact because they are derived from the files' exact partition values.
                 ColumnStatistics {
-                    null_count: Precision::Absent,
-                    max_value: Precision::Absent,
-                    min_value: Precision::Absent,
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Date32(Some(20151))),
+                    min_value: Precision::Exact(ScalarValue::Date32(Some(20148))),
                     sum_value: Precision::Absent,
                     distinct_count: Precision::Absent,
                     byte_size: Precision::Absent,
                 },
+                // column 2: right.id (Int32, file column from t2) - right partition 0: ids [3,4]
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
                     max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
@@ -529,6 +594,7 @@ mod test {
             num_rows: Precision::Exact(8),
             total_byte_size: Precision::Exact(512),
             column_statistics: vec![
+                // column 0: left.id (Int32, file column from t1)
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
                     max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
                     min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
                     sum_value: Precision::Absent,
                     distinct_count: Precision::Absent,
                     byte_size: Precision::Absent,
                 },
+                // column 1: left.date (Date32, partition column from t1)
+                // Partition column statistics are exact because they are derived from the files' exact partition values.
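The `Date32(Some(20148))` / `Date32(Some(20151))` bounds in these expectations are the day numbers for 2025-03-01 and 2025-03-04, matching the `DATE_2025_03_0x` constants introduced earlier in this file's diff. A minimal sketch that re-derives those day numbers, assuming the `chrono` crate (not something the test itself depends on):

```rust
use chrono::NaiveDate;

/// Days since the Unix epoch (1970-01-01), the encoding used by Arrow's Date32.
fn days_since_epoch(year: i32, month: u32, day: u32) -> i64 {
    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
    let date = NaiveDate::from_ymd_opt(year, month, day).unwrap();
    date.signed_duration_since(epoch).num_days()
}

fn main() {
    assert_eq!(days_since_epoch(2025, 3, 1), 20148); // DATE_2025_03_01
    assert_eq!(days_since_epoch(2025, 3, 4), 20151); // DATE_2025_03_04
}
```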
                 ColumnStatistics {
-                    null_count: Precision::Absent,
-                    max_value: Precision::Absent,
-                    min_value: Precision::Absent,
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Date32(Some(20151))),
+                    min_value: Precision::Exact(ScalarValue::Date32(Some(20148))),
                     sum_value: Precision::Absent,
                     distinct_count: Precision::Absent,
                     byte_size: Precision::Absent,
                 },
+                // column 2: right.id (Int32, file column from t2) - right partition 1: ids [1,2]
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
                     max_value: Precision::Exact(ScalarValue::Int32(Some(2))),
@@ -572,10 +641,22 @@ mod test {
         let scan = create_scan_exec_with_statistics(None, Some(2)).await;
         let coalesce_batches: Arc<dyn ExecutionPlan> =
             Arc::new(CoalesceBatchesExec::new(scan, 2));
-        let expected_statistic_partition_1 =
-            create_partition_statistics(2, 16, 3, 4, true);
-        let expected_statistic_partition_2 =
-            create_partition_statistics(2, 16, 1, 2, true);
+        // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
+        let expected_statistic_partition_1 = create_partition_statistics(
+            2,
+            16,
+            3,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_02)),
+        );
+        // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
+        let expected_statistic_partition_2 = create_partition_statistics(
+            2,
+            16,
+            1,
+            2,
+            Some((DATE_2025_03_03, DATE_2025_03_04)),
+        );
         let statistics = (0..coalesce_batches.output_partitioning().partition_count())
             .map(|idx| coalesce_batches.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
@@ -597,7 +678,14 @@ mod test {
         let scan = create_scan_exec_with_statistics(None, Some(2)).await;
         let coalesce_partitions: Arc<dyn ExecutionPlan> =
             Arc::new(CoalescePartitionsExec::new(scan));
-        let expected_statistic_partition = create_partition_statistics(4, 32, 1, 4, true);
+        // All files merged: ids [1-4], dates [2025-03-01, 2025-03-04]
+        let expected_statistic_partition = create_partition_statistics(
+            4,
+            32,
+            1,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_04)),
+        );
         let statistics = (0..coalesce_partitions.output_partitioning().partition_count())
             .map(|idx| coalesce_partitions.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
@@ -646,7 +734,14 @@ mod test {
             .map(|idx| global_limit.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
         assert_eq!(statistics.len(), 1);
-        let expected_statistic_partition = create_partition_statistics(2, 16, 3, 4, true);
+        // GlobalLimit takes from first partition: ids [3,4], dates [2025-03-01, 2025-03-02]
+        let expected_statistic_partition = create_partition_statistics(
+            2,
+            16,
+            3,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_02)),
+        );
         assert_eq!(statistics[0], expected_statistic_partition);
         Ok(())
     }
diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs
index 05cc723ef05fb..3ee4e37589c15 100644
--- a/datafusion/core/tests/sql/path_partition.rs
+++ b/datafusion/core/tests/sql/path_partition.rs
@@ -31,7 +31,6 @@ use datafusion::{
         listing::{ListingOptions, ListingTable, ListingTableConfig},
     },
     error::Result,
-    physical_plan::ColumnStatistics,
     prelude::SessionContext,
     test_util::{self, arrow_test_data, parquet_test_data},
 };
@@ -464,10 +463,19 @@ async fn parquet_statistics() -> Result<()> {
     assert_eq!(stat_cols.len(), 4);
     // stats for the first col are read from the parquet file
     assert_eq!(stat_cols[0].null_count, Precision::Exact(3));
-    // TODO assert partition column (1,2,3) stats once implemented (#1186)
-    assert_eq!(stat_cols[1], ColumnStatistics::new_unknown(),);
-    assert_eq!(stat_cols[2], ColumnStatistics::new_unknown(),);
-    assert_eq!(stat_cols[3], ColumnStatistics::new_unknown(),);
+    // Partition column statistics (year=2021 for all 3 rows)
+    assert_eq!(stat_cols[1].null_count, Precision::Exact(0));
+    assert_eq!(
+        stat_cols[1].min_value,
+        Precision::Exact(ScalarValue::Int32(Some(2021)))
+    );
+    assert_eq!(
+        stat_cols[1].max_value,
+        Precision::Exact(ScalarValue::Int32(Some(2021)))
+    );
+    // month and day are Utf8 partition columns with statistics
+    assert_eq!(stat_cols[2].null_count, Precision::Exact(0));
+    assert_eq!(stat_cols[3].null_count, Precision::Exact(0));
 
     //// WITH PROJECTION ////
     let dataframe = ctx.sql("SELECT mycol, day FROM t WHERE day='28'").await?;
@@ -479,8 +487,16 @@ async fn parquet_statistics() -> Result<()> {
     assert_eq!(stat_cols.len(), 2);
     // stats for the first col are read from the parquet file
     assert_eq!(stat_cols[0].null_count, Precision::Exact(1));
-    // TODO assert partition column stats once implemented (#1186)
-    assert_eq!(stat_cols[1], ColumnStatistics::new_unknown());
+    // Partition column statistics for day='28' (1 row)
+    assert_eq!(stat_cols[1].null_count, Precision::Exact(0));
+    assert_eq!(
+        stat_cols[1].min_value,
+        Precision::Exact(ScalarValue::Utf8(Some("28".to_string())))
+    );
+    assert_eq!(
+        stat_cols[1].max_value,
+        Precision::Exact(ScalarValue::Utf8(Some("28".to_string())))
+    );
 
     Ok(())
 }
diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs
index d5fde15ff9e5a..28a403ab92ad8 100644
--- a/datafusion/datasource/src/file_groups.rs
+++ b/datafusion/datasource/src/file_groups.rs
@@ -364,11 +364,27 @@ impl FileGroupPartitioner {
 /// Represents a group of partitioned files that'll be processed by a single thread.
 /// Maintains optional statistics across all files in the group.
+///
+/// # Statistics
+///
+/// The group-level statistics returned by [`FileGroup::file_statistics`] are merged from all files
+/// in the group and cover the **full table schema** (file columns + partition columns).
+///
+/// Partition column statistics are derived from the individual file partition values:
+/// - `min` = minimum partition value across all files in the group
+/// - `max` = maximum partition value across all files in the group
+/// - `null_count` = 0 (partition values are never null)
+///
+/// This allows query optimizers to prune entire file groups based on partition bounds.
 #[derive(Debug, Clone)]
 pub struct FileGroup {
     /// The files in this group
     files: Vec<PartitionedFile>,
-    /// Optional statistics for the data across all files in the group
+    /// Optional statistics for the data across all files in the group.
+    ///
+    /// These statistics cover the full table schema: file columns plus partition columns.
+    /// Partition column statistics are merged from the individual [`PartitionedFile::statistics`],
+    /// which contain exact values computed from [`PartitionedFile::partition_values`].
     statistics: Option<Arc<Statistics>>,
 }
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index c2415b3d597c9..ad89406014a0b 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -730,6 +730,7 @@ impl DataSource for FileScanConfig {
     fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
         if let Some(partition) = partition {
             // Get statistics for a specific partition
+            // Note: FileGroup statistics include partition columns (computed from partition_values)
             if let Some(file_group) = self.file_groups.get(partition)
                 && let Some(stat) = file_group.file_statistics(None)
             {
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index 1e71825b99e87..347e783c278d0 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -95,6 +95,19 @@ impl FileRange {
 #[derive(Debug, Clone)]
 /// A single file or part of a file that should be read, along with its schema, statistics
 /// and partition column values that need to be appended to each row.
+///
+/// # Statistics
+///
+/// The [`Self::statistics`] field contains statistics for the **full table schema**,
+/// which includes both file columns and partition columns. When statistics are set via
+/// [`Self::with_statistics`], exact statistics for partition columns are automatically
+/// computed from [`Self::partition_values`]:
+///
+/// - `min = max = partition_value` (all rows in a file share the same partition value)
+/// - `null_count = 0` (partition values extracted from paths are never null)
+/// - `distinct_count = 1` (single distinct value per file for each partition column)
+///
+/// This enables query optimizers to use partition column bounds for pruning and planning.
 pub struct PartitionedFile {
     /// Path for the file (e.g. URL, filesystem path, etc)
     pub object_meta: ObjectMeta,
@@ -115,6 +128,10 @@ pub struct PartitionedFile {
     ///
     /// DataFusion relies on these statistics for planning (in particular to sort file groups),
     /// so if they are incorrect, incorrect answers may result.
+    ///
+    /// These statistics cover the full table schema: file columns plus partition columns.
+    /// When set via [`Self::with_statistics`], partition column statistics are automatically
+    /// computed from [`Self::partition_values`] with exact min/max/null_count/distinct_count.
     pub statistics: Option<Arc<Statistics>>,
     /// An optional field for user defined per object metadata
     pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
@@ -214,9 +231,38 @@ impl PartitionedFile {
         self
     }
 
-    // Update the statistics for this file.
-    pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
-        self.statistics = Some(statistics);
+    /// Update the statistics for this file.
+    ///
+    /// The provided `statistics` should cover only the file schema columns.
+    /// This method will automatically append exact statistics for partition columns
+    /// based on `partition_values`:
+    /// - `min = max = partition_value` (all rows have the same value)
+    /// - `null_count = 0` (partition values from paths are never null)
+    /// - `distinct_count = 1` (all rows have the same partition value)
+    pub fn with_statistics(mut self, file_statistics: Arc<Statistics>) -> Self {
+        if self.partition_values.is_empty() {
+            // No partition columns, use stats as-is
+            self.statistics = Some(file_statistics);
+        } else {
+            // Extend stats with exact partition column statistics
+            let mut stats = Arc::unwrap_or_clone(file_statistics);
+            for partition_value in &self.partition_values {
+                let col_stats = ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(partition_value.clone()),
+                    min_value: Precision::Exact(partition_value.clone()),
+                    distinct_count: Precision::Exact(1),
+                    sum_value: Precision::Absent,
+                    byte_size: partition_value
+                        .data_type()
+                        .primitive_width()
+                        .map(|w| stats.num_rows.multiply(&Precision::Exact(w)))
+                        .unwrap_or_else(|| Precision::Absent),
+                };
+                stats.column_statistics.push(col_stats);
+            }
+            self.statistics = Some(Arc::new(stats));
+        }
         self
     }
 
@@ -561,6 +607,70 @@ mod tests {
         sut.get_store(url.as_ref()).unwrap();
     }
 
+    #[test]
+    fn test_with_statistics_appends_partition_column_stats() {
+        use crate::PartitionedFile;
+        use datafusion_common::stats::Precision;
+        use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
+
+        // Create a PartitionedFile with partition values
+        let mut pf = PartitionedFile::new(
+            "test.parquet",
+            100, // file size
+        );
+        pf.partition_values = vec![
+            ScalarValue::Date32(Some(20148)), // 2025-03-01
+        ];
+
+        // Create file-only statistics (1 column for 'id')
+        let file_stats = Arc::new(Statistics {
+            num_rows: Precision::Exact(2),
+            total_byte_size: Precision::Exact(16),
+            column_statistics: vec![ColumnStatistics {
+                null_count: Precision::Exact(0),
+                max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
+                min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
+                sum_value: Precision::Absent,
+                distinct_count: Precision::Absent,
+                byte_size: Precision::Absent,
+            }],
+        });
+
+        // Call with_statistics - should append partition column stats
+        let pf = pf.with_statistics(file_stats);
+
+        // Verify the statistics now have 2 columns
+        let stats = pf.statistics.unwrap();
+        assert_eq!(
+            stats.column_statistics.len(),
+            2,
+            "Expected 2 columns (id + date partition)"
+        );
+
+        // Verify partition column statistics
+        let partition_col_stats = &stats.column_statistics[1];
+        assert_eq!(
+            partition_col_stats.null_count,
+            Precision::Exact(0),
+            "Partition column null_count should be Exact(0)"
+        );
+        assert_eq!(
+            partition_col_stats.min_value,
+            Precision::Exact(ScalarValue::Date32(Some(20148))),
+            "Partition column min should match partition value"
+        );
+        assert_eq!(
+            partition_col_stats.max_value,
+            Precision::Exact(ScalarValue::Date32(Some(20148))),
+            "Partition column max should match partition value"
+        );
+        assert_eq!(
+            partition_col_stats.distinct_count,
+            Precision::Exact(1),
+            "Partition column distinct_count should be Exact(1)"
+        );
+    }
+
     #[test]
     fn test_url_contains() {
         let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
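The `byte_size` branch of the new `with_statistics` is what produces the `Precision::Exact(16)` / `Precision::Exact(8)` expectations for the Date32 partition column in `partition_statistics.rs`, while leaving the Utf8 `month`/`day` partition columns in `path_partition.rs` at `Precision::Absent`. A small sketch of that derivation, assuming `DataType::primitive_width` and DataFusion's `Precision` as used in the hunk above (the helper name is illustrative):

```rust
use arrow::datatypes::DataType;
use datafusion_common::stats::Precision;

/// Mirrors the byte_size derivation in `PartitionedFile::with_statistics`:
/// fixed-width types contribute num_rows * width, variable-width types stay Absent.
fn partition_byte_size(data_type: &DataType, num_rows: Precision<usize>) -> Precision<usize> {
    data_type
        .primitive_width() // Some(4) for Date32, None for Utf8
        .map(|w| num_rows.multiply(&Precision::Exact(w)))
        .unwrap_or(Precision::Absent)
}

fn main() {
    // A 4-row Date32 partition column -> 16 bytes, matching the updated test expectations
    assert_eq!(
        partition_byte_size(&DataType::Date32, Precision::Exact(4)),
        Precision::Exact(16)
    );
    // Variable-width partition columns (e.g. Utf8 day='28') have no primitive width
    assert_eq!(
        partition_byte_size(&DataType::Utf8, Precision::Exact(1)),
        Precision::Absent
    );
}
```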