From 038b643b04047e39bb117469b7ef43ab59676c8c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 13 Feb 2023 14:37:28 -0500 Subject: [PATCH 1/4] Make the page index tests clearer about what they are doing --- .../src/datasource/file_format/parquet.rs | 77 +++++++++++-------- .../src/physical_plan/file_format/parquet.rs | 26 +++++-- 2 files changed, 63 insertions(+), 40 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 809be3a7d9680..9799a5ccddb9a 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -550,50 +550,63 @@ pub(crate) mod test_util { use parquet::file::properties::WriterProperties; use tempfile::NamedTempFile; + /// How many rows per page should be written + const ROWS_PER_PAGE: usize = 2; + /// Writes `batches` to a temporary parquet file /// - /// If multi_page is set to `true`, all batches are written into - /// one temporary parquet file and the parquet file is written + /// If multi_page is set to `true`, the parquet file(s) are written /// with 2 rows per data page (used to test page filtering and /// boundaries). pub async fn store_parquet( batches: Vec, multi_page: bool, ) -> Result<(Vec, Vec)> { - if multi_page { - // All batches write in to one file, each batch must have same schema. - let mut output = NamedTempFile::new().expect("creating temp file"); - let mut builder = WriterProperties::builder(); - builder = builder.set_data_page_row_count_limit(2); - let proper = builder.build(); - let mut writer = - ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper)) - .expect("creating writer"); - for b in batches { - writer.write(&b).expect("Writing batch"); - } - writer.close().unwrap(); - Ok((vec![local_unpartitioned_file(&output)], vec![output])) - } else { - // Each batch writes to their own file - let files: Vec<_> = batches - .into_iter() - .map(|batch| { - let mut output = NamedTempFile::new().expect("creating temp file"); + // Each batch writes to their own file + let files: Vec<_> = batches + .into_iter() + .map(|batch| { + let mut output = NamedTempFile::new().expect("creating temp file"); + + let builder = WriterProperties::builder(); + let props = if multi_page { + builder.set_data_page_row_count_limit(2) + } else { + builder + } + .build(); - let props = WriterProperties::builder().build(); - let mut writer = - ArrowWriter::try_new(&mut output, batch.schema(), Some(props)) - .expect("creating writer"); + let mut writer = + ArrowWriter::try_new(&mut output, batch.schema(), Some(props)) + .expect("creating writer"); + if multi_page { + // write in smaller batches as the parquet writer + // only checks datapage size limits on the boundaries of each batch + write_in_chunks(&mut writer, &batch, ROWS_PER_PAGE); + } else { writer.write(&batch).expect("Writing batch"); - writer.close().unwrap(); - output - }) - .collect(); + }; + writer.close().unwrap(); + output + }) + .collect(); - let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect(); - Ok((meta, files)) + let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect(); + Ok((meta, files)) + } + + //// write batches chunk_size rows at a time + fn write_in_chunks( + writer: &mut ArrowWriter, + batch: &RecordBatch, + chunk_size: usize, + ) { + let mut i = 0; + while i < batch.num_rows() { + let num = chunk_size.min(batch.num_rows() - i); + writer.write(&batch.slice(i, num)).unwrap(); + i += num; } } } diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 5784fd9e1484f..83bcc9bc0c091 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -836,8 +836,7 @@ mod tests { /// round-trip record batches by writing each individual RecordBatch to /// a parquet file and then reading that parquet file with the specified - /// options. If page_index_predicate is set to `true`, all RecordBatches - /// are written into a parquet file instead. + /// options. #[derive(Debug, Default)] struct RoundTrip { projection: Option>, @@ -1635,27 +1634,38 @@ mod tests { #[tokio::test] async fn parquet_page_index_exec_metrics() { - let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)])); - let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)])); + let c1: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + None, + Some(2), + Some(3), + Some(4), + Some(5), + ])); let batch1 = create_batch(vec![("int", c1.clone())]); - let batch2 = create_batch(vec![("int", c2.clone())]); let filter = col("int").eq(lit(4_i32)); let rt = RoundTrip::new() .with_predicate(filter) .with_page_index_predicate() - .round_trip(vec![batch1, batch2]) + .round_trip(vec![batch1]) .await; let metrics = rt.parquet_exec.metrics().unwrap(); // assert the batches and some metrics + #[rustfmt::skip] let expected = vec![ - "+-----+", "| int |", "+-----+", "| 3 |", "| 4 |", "| 5 |", "+-----+", + "+-----+", + "| int |", + "+-----+", + "| 4 |", + "| 5 |", + "+-----+", ]; assert_batches_sorted_eq!(expected, &rt.batches.unwrap()); - assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 3); + assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 4); assert!( get_value(&metrics, "page_index_eval_time") > 0, "no eval time in metrics: {metrics:#?}" From 0ae899e5eefbc95bdd8320ad2955575b9fc4e9d6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 13 Feb 2023 15:35:06 -0500 Subject: [PATCH 2/4] Support page skipping / page_index pushdown for evolved schemas --- .../core/src/physical_optimizer/pruning.rs | 25 ++---- .../src/physical_plan/file_format/parquet.rs | 81 +++++++++++++++++ .../file_format/parquet/page_filter.rs | 89 +++++++++++++++---- 3 files changed, 163 insertions(+), 32 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 1931105cd252f..f3d36301792e7 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -29,7 +29,7 @@ //! other source (e.g. a catalog) use std::convert::TryFrom; -use std::{collections::HashSet, sync::Arc}; +use std::sync::Arc; use crate::execution::context::ExecutionProps; use crate::prelude::lit; @@ -233,25 +233,18 @@ impl PruningPredicate { .unwrap_or_default() } - /// Returns all need column indexes to evaluate this pruning predicate - pub(crate) fn need_input_columns_ids(&self) -> HashSet { - let mut set = HashSet::new(); - self.required_columns.columns.iter().for_each(|x| { - match self.schema().column_with_name(x.0.name.as_str()) { - None => {} - Some(y) => { - set.insert(y.0); - } - } - }); - set + pub(crate) fn required_columns(&self) -> &RequiredStatColumns { + &self.required_columns } } +/// Records for which columns statistics are necessary to evaluate a +/// pruning predicate. +/// /// Handles creating references to the min/max statistics /// for columns as well as recording which statistics are needed #[derive(Debug, Default, Clone)] -struct RequiredStatColumns { +pub(crate) struct RequiredStatColumns { /// The statistics required to evaluate this predicate: /// * The unqualified column in the input schema /// * Statistics type (e.g. Min or Max or Null_Count) @@ -267,7 +260,7 @@ impl RequiredStatColumns { /// Returns an iterator over items in columns (see doc on /// `self.columns` for details) - fn iter(&self) -> impl Iterator { + pub(crate) fn iter(&self) -> impl Iterator { self.columns.iter() } @@ -852,7 +845,7 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result< } #[derive(Debug, Copy, Clone, PartialEq, Eq)] -enum StatisticsType { +pub(crate) enum StatisticsType { Min, Max, NullCount, diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 83bcc9bc0c091..2a5adbd12b0b4 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -1330,6 +1330,55 @@ mod tests { assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5); } + #[tokio::test] + async fn evolved_schema_disjoint_schema_with_page_index_pushdown() { + let c1: ArrayRef = Arc::new(StringArray::from(vec![ + // Page 1 (can't prune as c2 is null) + Some("Foo"), + ])); + + let c2: ArrayRef = Arc::new(Int64Array::from(vec![ + // Page 1: + Some(1), + Some(2), + // Page 2: (pruned) + Some(3), + Some(4), + // Page 3: (pruned) + Some(5), + None, + ])); + + // batch1: c1(string) + let batch1 = create_batch(vec![("c1", c1)]); + + // batch2: c2(int64) + let batch2 = create_batch(vec![("c2", c2)]); + + let filter = col("c2").eq(lit(1_i64)); + + // read/write them files: + let rt = RoundTrip::new() + .with_predicate(filter) + .with_page_index_predicate() + .round_trip(vec![batch1, batch2]) + .await; + + let expected = vec![ + "+-----+----+", + "| c1 | c2 |", + "+-----+----+", + "| | 1 |", + "| | 2 |", + "| Foo | |", + "+-----+----+", + ]; + assert_batches_sorted_eq!(expected, &rt.batches.unwrap()); + let metrics = rt.parquet_exec.metrics().unwrap(); + // Note there are were 4 rows pruned in total (across two pages) + assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 4); + } + #[tokio::test] async fn multi_column_predicate_pushdown() { let c1: ArrayRef = @@ -1361,6 +1410,38 @@ mod tests { assert_batches_sorted_eq!(expected, &read); } + #[tokio::test] + async fn multi_column_predicate_pushdown_page_index_pushdown() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + + let batch1 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]); + + // Columns in different order to schema + let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar"))); + + // read/write them files: + let read = RoundTrip::new() + .with_predicate(filter) + .with_page_index_predicate() + .round_trip_to_batches(vec![batch1]) + .await + .unwrap(); + + let expected = vec![ + "+-----+----+", + "| c1 | c2 |", + "+-----+----+", + "| | 2 |", + "| Foo | 1 |", + "| bar | |", + "+-----+----+", + ]; + assert_batches_sorted_eq!(expected, &read); + } + #[tokio::test] async fn evolved_schema_incompatible_types() { let c1: ArrayRef = diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs index 3aaad0078b495..ebe59db9e7133 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs @@ -139,6 +139,10 @@ impl PagePruningPredicate { let page_index_predicates = &self.predicates; let groups = file_metadata.row_groups(); + if groups.is_empty() { + return Ok(None); + } + let file_offset_indexes = file_metadata.offset_indexes(); let file_page_indexes = file_metadata.page_indexes(); let (file_offset_indexes, file_page_indexes) = @@ -155,30 +159,25 @@ impl PagePruningPredicate { let mut row_selections = Vec::with_capacity(page_index_predicates.len()); for predicate in page_index_predicates { - // `extract_page_index_push_down_predicates` only return predicate with one col. - // when building `PruningPredicate`, some single column filter like `abs(i) = 1` - // will be rewrite to `lit(true)`, so may have an empty required_columns. - let col_id = - if let Some(&col_id) = predicate.need_input_columns_ids().iter().next() { - col_id - } else { - continue; - }; + // find column index by looking in the row group metadata. + let col_idx = find_column_index(predicate, &groups[0]); let mut selectors = Vec::with_capacity(row_groups.len()); for r in row_groups.iter() { + let row_group_metadata = &groups[*r]; + let rg_offset_indexes = file_offset_indexes.get(*r); let rg_page_indexes = file_page_indexes.get(*r); - if let (Some(rg_page_indexes), Some(rg_offset_indexes)) = - (rg_page_indexes, rg_offset_indexes) + if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) = + (rg_page_indexes, rg_offset_indexes, col_idx) { selectors.extend( prune_pages_in_one_row_group( - &groups[*r], + row_group_metadata, predicate, - rg_offset_indexes.get(col_id), - rg_page_indexes.get(col_id), - groups[*r].column(col_id).column_descr(), + rg_offset_indexes.get(col_idx), + rg_page_indexes.get(col_idx), + groups[*r].column(col_idx).column_descr(), file_metrics, ) .map_err(|e| { @@ -190,7 +189,7 @@ impl PagePruningPredicate { } else { trace!( "Did not have enough metadata to prune with page indexes, \ - falling back, falling back to all rows", + falling back to all rows", ); // fallback select all rows let all_selected = @@ -223,6 +222,64 @@ impl PagePruningPredicate { } } +/// Returns the column index in the row group metadata for the single +/// column of a single column pruning predicate. +/// +/// For example, give the predicate `y > 5` +/// +/// And columns in the RowGroupMetadata like `['x', 'y', 'z']` will +/// return 1. +/// +/// Returns `None` if the column is not found, or if there are no +/// required columns, which is the case for predicate like `abs(i) = +/// 1` which are rewritten to `lit(true)` +/// +/// Panics: +/// +/// If the predicate contains more than one column reference (assumes +/// that `extract_page_index_push_down_predicates` only return +/// predicate with one col) +/// +fn find_column_index( + predicate: &PruningPredicate, + row_group_metadata: &RowGroupMetaData, +) -> Option { + let mut found_required_column: Option<&Column> = None; + + for required_column_details in predicate.required_columns().iter() { + let column = &required_column_details.0; + if let Some(found_required_column) = found_required_column.as_ref() { + // make sure it is the same name we have seen previously + assert_eq!( + column.name, found_required_column.name, + "Unexpected multi column predicate" + ); + } else { + found_required_column = Some(column); + } + } + + let column = if let Some(found_required_column) = found_required_column.as_ref() { + found_required_column + } else { + trace!("No column references in pruning predicate"); + return None; + }; + + let col_idx = row_group_metadata + .columns() + .iter() + .enumerate() + .find(|(_idx, c)| c.column_descr().name() == column.name) + .map(|(idx, _c)| idx); + + if col_idx.is_none() { + trace!("Can not find column {} in row group meta", column.name); + } + + col_idx +} + /// Intersects the [`RowSelector`]s /// /// For exampe, given: From e3abcb56310ad3ac3b77f271f8d4317dfd674a53 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 13 Feb 2023 16:11:59 -0500 Subject: [PATCH 3/4] upate test --- .../src/physical_plan/file_format/parquet.rs | 51 ++++++++++++++----- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 2a5adbd12b0b4..5bb03b4f42bba 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -1333,8 +1333,15 @@ mod tests { #[tokio::test] async fn evolved_schema_disjoint_schema_with_page_index_pushdown() { let c1: ArrayRef = Arc::new(StringArray::from(vec![ - // Page 1 (can't prune as c2 is null) + // Page 1 Some("Foo"), + Some("Bar"), + // Page 2 + Some("Foo2"), + Some("Bar2"), + // Page 3 + Some("Foo3"), + Some("Bar3"), ])); let c2: ArrayRef = Arc::new(Int64Array::from(vec![ @@ -1350,10 +1357,16 @@ mod tests { ])); // batch1: c1(string) - let batch1 = create_batch(vec![("c1", c1)]); + let batch1 = create_batch(vec![("c1", c1.clone())]); // batch2: c2(int64) - let batch2 = create_batch(vec![("c2", c2)]); + let batch2 = create_batch(vec![("c2", c2.clone())]); + + // batch3 (has c2, c1) -- both columns, should still prune + let batch3 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]); + + // batch4 (has c2, c1) -- different column order, should still prune + let batch4 = create_batch(vec![("c2", c2), ("c1", c1)]); let filter = col("c2").eq(lit(1_i64)); @@ -1361,22 +1374,34 @@ mod tests { let rt = RoundTrip::new() .with_predicate(filter) .with_page_index_predicate() - .round_trip(vec![batch1, batch2]) + .round_trip(vec![batch1, batch2, batch3, batch4]) .await; let expected = vec![ - "+-----+----+", - "| c1 | c2 |", - "+-----+----+", - "| | 1 |", - "| | 2 |", - "| Foo | |", - "+-----+----+", + "+------+----+", + "| c1 | c2 |", + "+------+----+", + "| | 1 |", + "| | 2 |", + "| Bar | |", + "| Bar | 2 |", + "| Bar | 2 |", + "| Bar2 | |", + "| Bar3 | |", + "| Foo | |", + "| Foo | 1 |", + "| Foo | 1 |", + "| Foo2 | |", + "| Foo3 | |", + "+------+----+", ]; assert_batches_sorted_eq!(expected, &rt.batches.unwrap()); let metrics = rt.parquet_exec.metrics().unwrap(); - // Note there are were 4 rows pruned in total (across two pages) - assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 4); + + // There are 4 rows pruned in each of batch2, batch3, and + // batch4 for a total of 12. batch1 had no pruning as c2 was + // filled in as null + assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 12); } #[tokio::test] From d154b9e318f2b5d9ca7500c0708e11fd30faeb0b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 14 Feb 2023 08:49:17 -0500 Subject: [PATCH 4/4] Update datafusion/core/src/datasource/file_format/parquet.rs Co-authored-by: Yang Jiang --- datafusion/core/src/datasource/file_format/parquet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 9799a5ccddb9a..0a7a7cadc90a4 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -570,7 +570,7 @@ pub(crate) mod test_util { let builder = WriterProperties::builder(); let props = if multi_page { - builder.set_data_page_row_count_limit(2) + builder.set_data_page_row_count_limit(ROWS_PER_PAGE) } else { builder }