diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1713377f8d4de..10199db1a1de3 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -621,7 +621,10 @@ config_namespace! { /// bytes of the parquet file optimistically. If not specified, two reads are required: /// One read to fetch the 8-byte parquet footer and /// another to fetch the metadata length encoded in the footer - pub metadata_size_hint: Option<usize>, default = None + /// Default setting to 512 KiB, which should be sufficient for most parquet files, + /// it can reduce one I/O operation per parquet file. If the metadata is larger than + /// the hint, two reads will still be performed. + pub metadata_size_hint: Option<usize>, default = Some(512 * 1024) /// (reading) If true, filter expressions are be applied during the parquet decoding operation to /// reduce the number of rows decoded. This optimization is sometimes called "late materialization". diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 8c1bb02ef0737..e78c5f09553cc 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -269,6 +269,8 @@ pub struct ParquetReadOptions<'a> { pub file_sort_order: Vec<Vec<SortExpr>>, /// Properties for decryption of Parquet files that use modular encryption pub file_decryption_properties: Option<ConfigFileDecryptionProperties>, + /// Metadata size hint for Parquet files reading (in bytes) + pub metadata_size_hint: Option<usize>, } impl Default for ParquetReadOptions<'_> { @@ -281,6 +283,7 @@ impl Default for ParquetReadOptions<'_> { schema: None, file_sort_order: vec![], file_decryption_properties: None, + metadata_size_hint: None, } } } @@ -340,6 +343,12 @@ impl<'a> ParquetReadOptions<'a> { self.file_decryption_properties = Some(file_decryption_properties); self } + + /// Configure metadata size hint for Parquet files reading (in bytes) + pub fn 
metadata_size_hint(mut self, size_hint: Option<usize>) -> Self { + self.metadata_size_hint = size_hint; + self + } } /// Options that control the reading of ARROW files. @@ -606,6 +615,11 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { if let Some(file_decryption_properties) = &self.file_decryption_properties { options.crypto.file_decryption = Some(file_decryption_properties.clone()); } + // This can be overridden per-read in ParquetReadOptions, if set. + if let Some(metadata_size_hint) = self.metadata_size_hint { + options.global.metadata_size_hint = Some(metadata_size_hint); + } + let mut file_format = ParquetFormat::new().with_options(options); if let Some(parquet_pruning) = self.parquet_pruning { diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 1781ea569d905..52c5393e10319 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -546,7 +546,8 @@ mod tests { let (files, _file_names) = store_parquet(vec![batch1], false).await?; let state = SessionContext::new().state(); - let format = ParquetFormat::default(); + // Make metadata size hint None to keep original behavior + let format = ParquetFormat::default().with_metadata_size_hint(None); let _schema = format.infer_schema(&state, &store.upcast(), &files).await?; assert_eq!(store.request_count(), 3); // No increase, cache being used. 
@@ -620,7 +621,9 @@ mod tests { let mut state = SessionContext::new().state(); state = set_view_state(state, force_views); - let format = ParquetFormat::default().with_force_view_types(force_views); + let format = ParquetFormat::default() + .with_force_view_types(force_views) + .with_metadata_size_hint(None); let schema = format.infer_schema(&state, &store.upcast(), &files).await?; assert_eq!(store.request_count(), 6); diff --git a/datafusion/core/tests/datasource/object_store_access.rs b/datafusion/core/tests/datasource/object_store_access.rs index 6b9585f408a17..d1592c21472d3 100644 --- a/datafusion/core/tests/datasource/object_store_access.rs +++ b/datafusion/core/tests/datasource/object_store_access.rs @@ -27,7 +27,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch}; use async_trait::async_trait; use bytes::Bytes; -use datafusion::prelude::{CsvReadOptions, SessionContext}; +use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext}; use futures::stream::BoxStream; use insta::assert_snapshot; use object_store::memory::InMemory; @@ -45,8 +45,9 @@ use url::Url; #[tokio::test] async fn create_single_csv_file() { + let test = Test::new().with_single_file_csv().await; assert_snapshot!( - single_file_csv_test().await.requests(), + test.requests(), @r" RequestCountingObjectStore() Total Requests: 2 @@ -58,8 +59,9 @@ async fn create_single_csv_file() { #[tokio::test] async fn query_single_csv_file() { + let test = Test::new().with_single_file_csv().await; assert_snapshot!( - single_file_csv_test().await.query("select * from csv_table").await, + test.query("select * from csv_table").await, @r" ------- Query Output (2 rows) ------- +---------+-------+-------+ @@ -79,8 +81,9 @@ async fn query_single_csv_file() { #[tokio::test] async fn create_multi_file_csv_file() { + let test = Test::new().with_multi_file_csv().await; assert_snapshot!( - multi_file_csv_test().await.requests(), + test.requests(), @r" RequestCountingObjectStore() Total Requests: 4 
@@ -94,8 +97,9 @@ async fn create_multi_file_csv_file() { #[tokio::test] async fn query_multi_csv_file() { + let test = Test::new().with_multi_file_csv().await; assert_snapshot!( - multi_file_csv_test().await.query("select * from csv_table").await, + test.query("select * from csv_table").await, @r" ------- Query Output (6 rows) ------- +---------+-------+-------+ @@ -120,24 +124,132 @@ async fn query_multi_csv_file() { } #[tokio::test] -async fn create_single_parquet_file() { +async fn create_single_parquet_file_default() { + // The default metadata size hint is 512KB + // which is enough to fetch the entire footer metadata and PageIndex + // in a single GET request. + let test = Test::new().with_single_file_parquet().await; + // expect 1 get request which reads the footer metadata and page index assert_snapshot!( - single_file_parquet_test().await.requests(), + test.requests(), + @r" + RequestCountingObjectStore() + Total Requests: 2 + - HEAD path=parquet_table.parquet + - GET (range) range=0-2994 path=parquet_table.parquet + " + ); +} + +#[tokio::test] +async fn create_single_parquet_file_prefetch() { + // Explicitly specify a prefetch hint that is adequate for the footer and page index + let test = Test::new() + .with_parquet_metadata_size_hint(Some(1000)) + .with_single_file_parquet() + .await; + // expect 1 1000 byte request which reads the footer metadata and page index + assert_snapshot!( + test.requests(), + @r" + RequestCountingObjectStore() + Total Requests: 2 + - HEAD path=parquet_table.parquet + - GET (range) range=1994-2994 path=parquet_table.parquet + " + ); +} + +#[tokio::test] +async fn create_single_parquet_file_too_small_prefetch() { + // configure a prefetch size that is too small to fetch the footer + // metadata + // + // Using the ranges from the test below (with no_prefetch), + // pick a number less than 730: + // -------- + // 2286-2294: (8 bytes) footer + length + // 2264-2986: (722 bytes) footer metadata + let test = Test::new() + 
.with_parquet_metadata_size_hint(Some(500)) + .with_single_file_parquet() + .await; + // expect three get requests: + // 1. read the footer (500 bytes per hint, not enough for the footer metadata) + // 2. Read the footer metadata + // 3. reads the PageIndex + assert_snapshot!( + test.requests(), @r" RequestCountingObjectStore() Total Requests: 4 - HEAD path=parquet_table.parquet - - GET (range) range=2986-2994 path=parquet_table.parquet + - GET (range) range=2494-2994 path=parquet_table.parquet - GET (range) range=2264-2986 path=parquet_table.parquet - GET (range) range=2124-2264 path=parquet_table.parquet " ); } +#[tokio::test] +async fn create_single_parquet_file_small_prefetch() { + // configure a prefetch size that is large enough for the footer + // metadata but **not** the PageIndex + // + // Using the ranges from the test below (with no_prefetch), + // the 730 is determined as follows; + // -------- + // 2286-2294: (8 bytes) footer + length + // 2264-2986: (722 bytes) footer metadata + let test = Test::new() + // 740 is enough to get both the footer + length (8 bytes) + // but not the entire PageIndex + .with_parquet_metadata_size_hint(Some(740)) + .with_single_file_parquet() + .await; + // expect two get requests: + // 1. read the footer metadata + // 2. reads the PageIndex + assert_snapshot!( + test.requests(), + @r" + RequestCountingObjectStore() + Total Requests: 3 + - HEAD path=parquet_table.parquet + - GET (range) range=2254-2994 path=parquet_table.parquet + - GET (range) range=2124-2264 path=parquet_table.parquet + " + ); +} + +#[tokio::test] +async fn create_single_parquet_file_no_prefetch() { + let test = Test::new() + // force no prefetch by setting size hint to None + .with_parquet_metadata_size_hint(None) + .with_single_file_parquet() + .await; + // Without a metadata size hint, the parquet reader + // does *three* range requests to read the footer metadata: + // 1. The footer length (last 8 bytes) + // 2. The footer metadata + // 3. 
The PageIndex metadata + assert_snapshot!( + test.requests(), + @r" + RequestCountingObjectStore() + Total Requests: 2 + - HEAD path=parquet_table.parquet + - GET (range) range=0-2994 path=parquet_table.parquet + " + ); +} + #[tokio::test] async fn query_single_parquet_file() { + let test = Test::new().with_single_file_parquet().await; assert_snapshot!( - single_file_parquet_test().await.query("select count(distinct a), count(b) from parquet_table").await, + test.query("select count(distinct a), count(b) from parquet_table").await, @r" ------- Query Output (1 rows) ------- +---------------------------------+------------------------+ @@ -157,10 +269,11 @@ async fn query_single_parquet_file() { #[tokio::test] async fn query_single_parquet_file_with_single_predicate() { + let test = Test::new().with_single_file_parquet().await; // Note that evaluating predicates requires additional object store requests // (to evaluate predicates) assert_snapshot!( - single_file_parquet_test().await.query("select min(a), max(b) from parquet_table WHERE a > 150").await, + test.query("select min(a), max(b) from parquet_table WHERE a > 150").await, @r" ------- Query Output (1 rows) ------- +----------------------+----------------------+ @@ -179,10 +292,12 @@ async fn query_single_parquet_file_with_single_predicate() { #[tokio::test] async fn query_single_parquet_file_multi_row_groups_multiple_predicates() { + let test = Test::new().with_single_file_parquet().await; + // Note that evaluating predicates requires additional object store requests // (to evaluate predicates) assert_snapshot!( - single_file_parquet_test().await.query("select min(a), max(b) from parquet_table WHERE a > 50 AND b < 1150").await, + test.query("select min(a), max(b) from parquet_table WHERE a > 50 AND b < 1150").await, @r" ------- Query Output (1 rows) ------- +----------------------+----------------------+ @@ -200,75 +315,16 @@ async fn query_single_parquet_file_multi_row_groups_multiple_predicates() { ); } -/// 
Create a test with a single CSV file with three columns and two rows -async fn single_file_csv_test() -> Test { - // upload CSV data to object store - let csv_data = r#"c1,c2,c3 -0.00001,5e-12,true -0.00002,4e-12,false -"#; - - Test::new() - .with_bytes("/csv_table.csv", csv_data) - .await - .register_csv("csv_table", "/csv_table.csv") - .await -} - -/// Create a test with three CSV files in a directory -async fn multi_file_csv_test() -> Test { - let mut test = Test::new(); - // upload CSV data to object store - for i in 0..3 { - let csv_data1 = format!( - r#"c1,c2,c3 -0.0000{i},{i}e-12,true -0.00003,5e-12,false -"# - ); - test = test - .with_bytes(&format!("/data/file_{i}.csv"), csv_data1) - .await; - } - // register table - test.register_csv("csv_table", "/data/").await -} - -/// Create a test with a single parquet file that has two -/// columns and two row groups -/// -/// Column "a": Int32 with values 0-100] in row group 1 -/// and [101-200] in row group 2 -/// -/// Column "b": Int32 with values 1000-1100] in row group 1 -/// and [1101-1200] in row group 2 -async fn single_file_parquet_test() -> Test { - // Create parquet bytes - let a: ArrayRef = Arc::new(Int32Array::from_iter_values(0..200)); - let b: ArrayRef = Arc::new(Int32Array::from_iter_values(1000..1200)); - let batch = RecordBatch::try_from_iter([("a", a), ("b", b)]).unwrap(); - - let mut buffer = vec![]; - let props = parquet::file::properties::WriterProperties::builder() - .set_max_row_group_size(100) - .build(); - let mut writer = - parquet::arrow::ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)) - .unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); - - Test::new() - .with_bytes("/parquet_table.parquet", buffer) - .await - .register_parquet("parquet_table", "/parquet_table.parquet") - .await -} - /// Runs tests with a request counting object store struct Test { object_store: Arc, session_context: SessionContext, + /// metadata size hint to use when registering 
parquet files + /// + /// * `None`: uses the default (does not set a size_hint) + /// * `Some(None)`: set prefetch hint to None (no prefetching) + /// * `Some(Some(size))`: set prefetch hint to size + parquet_metadata_size_hint: Option<Option<usize>>, } impl Test { @@ -281,9 +337,16 @@ impl Test { Self { object_store, session_context, + parquet_metadata_size_hint: None, } } + /// Specify the metadata size hint to use when registering parquet files + fn with_parquet_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self { + self.parquet_metadata_size_hint = Some(size_hint); + self + } + /// Returns a string representation of all recorded requests thus far fn requests(&self) -> String { format!("{}", self.object_store) @@ -312,16 +375,88 @@ impl Test { self } - /// Register a CSV file at the given path relative to the [`datafusion_test_data`] directory + /// Register a Parquet file at the given path relative to the + /// [`datafusion_test_data`] directory async fn register_parquet(self, table_name: &str, path: &str) -> Self { let path = format!("mem://{path}"); + let mut options: ParquetReadOptions<'_> = ParquetReadOptions::new(); + + // If a metadata size hint was specified, apply it + if let Some(parquet_metadata_size_hint) = self.parquet_metadata_size_hint { + options = options.metadata_size_hint(parquet_metadata_size_hint); + } + self.session_context - .register_parquet(table_name, path, Default::default()) + .register_parquet(table_name, path, options) .await .unwrap(); self } + /// Register a single CSV file with three columns and two rows named + /// `csv_table` + async fn with_single_file_csv(self) -> Test { + // upload CSV data to object store + let csv_data = r#"c1,c2,c3 +0.00001,5e-12,true +0.00002,4e-12,false +"#; + self.with_bytes("/csv_table.csv", csv_data) + .await + .register_csv("csv_table", "/csv_table.csv") + .await + } + + /// Register three CSV files in a directory, called `csv_table` + async fn with_multi_file_csv(mut self) -> Test { + // upload CSV data to 
object store + for i in 0..3 { + let csv_data1 = format!( + r#"c1,c2,c3 +0.0000{i},{i}e-12,true +0.00003,5e-12,false +"# + ); + self = self + .with_bytes(&format!("/data/file_{i}.csv"), csv_data1) + .await; + } + // register table + self.register_csv("csv_table", "/data/").await + } + + /// Add a single parquet file that has two columns and two row groups named `parquet_table` + /// + /// Column "a": Int32 with values 0-100] in row group 1 + /// and [101-200] in row group 2 + /// + /// Column "b": Int32 with values 1000-1100] in row group 1 + /// and [1101-1200] in row group 2 + async fn with_single_file_parquet(self) -> Test { + // Create parquet bytes + let a: ArrayRef = Arc::new(Int32Array::from_iter_values(0..200)); + let b: ArrayRef = Arc::new(Int32Array::from_iter_values(1000..1200)); + let batch = RecordBatch::try_from_iter([("a", a), ("b", b)]).unwrap(); + + let mut buffer = vec![]; + let props = parquet::file::properties::WriterProperties::builder() + .set_max_row_group_size(100) + .build(); + let mut writer = parquet::arrow::ArrowWriter::try_new( + &mut buffer, + batch.schema(), + Some(props), + ) + .unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + self.with_bytes("/parquet_table.parquet", buffer) + .await + .register_parquet("parquet_table", "/parquet_table.parquet") + .await + } + /// Runs the specified query and returns a string representation of the results /// suitable for comparison with insta snapshots /// diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b15ec026372d5..f1cc4c7a0cc98 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -246,7 +246,7 @@ datafusion.execution.parquet.max_predicate_cache_size NULL datafusion.execution.parquet.max_row_group_size 1048576 datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 
datafusion.execution.parquet.maximum_parallel_row_group_writers 1 -datafusion.execution.parquet.metadata_size_hint NULL +datafusion.execution.parquet.metadata_size_hint 524288 datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false @@ -366,7 +366,7 @@ datafusion.execution.parquet.max_predicate_cache_size NULL (reading) The maximum datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. datafusion.execution.parquet.maximum_parallel_row_group_writers 1 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. -datafusion.execution.parquet.metadata_size_hint NULL (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. 
If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer +datafusion.execution.parquet.metadata_size_hint 524288 (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index fbf55a56057b6..7ca5eb8f7be45 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -81,7 +81,7 @@ The following configuration settings are available: | datafusion.execution.parquet.enable_page_index | true | (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. 
| | datafusion.execution.parquet.pruning | true | (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | | datafusion.execution.parquet.skip_metadata | true | (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | -| datafusion.execution.parquet.metadata_size_hint | NULL | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | +| datafusion.execution.parquet.metadata_size_hint | 524288 | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. | | datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. 
If false, the filters are applied in the same order as written in the query | | datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. |