diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index cdd8e72a06cc9..230aff2fd7609 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -455,6 +455,11 @@ config_namespace! { /// tables (e.g. `/table/year=2021/month=01/data.parquet`). pub listing_table_ignore_subdirectory: bool, default = true + /// Should a `ListingTable` created through the `ListingTableFactory` infer table + /// partitions from Hive compliant directories. Defaults to true (partition columns are + /// inferred and will be represented in the table schema). + pub listing_table_factory_infer_partitions: bool, default = true + /// Should DataFusion support recursive CTEs pub enable_recursive_ctes: bool, default = true diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 690ce31d0dc76..9858b109125fb 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -802,6 +802,9 @@ impl ListingOptions { .rev() .skip(1) // get parents only; skip the file itself .rev() + // Partitions are expected to follow the format "column_name=value", so we + // should ignore any path part that cannot be parsed into the expected format + .filter(|s| s.contains('=')) .map(|s| s.split('=').take(1).collect()) .collect_vec() }) diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index d4d9db785639e..218a1fedbb379 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -63,14 +63,39 @@ impl TableProviderFactory for ListingTableFactory { ))? 
.create(session_state, &cmd.options)?; + let mut table_path = ListingTableUrl::parse(&cmd.location)?; + let file_extension = match table_path.is_collection() { + // Setting the extension to be empty instead of allowing the default extension seems + // odd, but was done to ensure existing behavior isn't modified. It seems like this + // could be refactored to either use the default extension or set the fully expected + // extension when compression is included (e.g. ".csv.gz") + true => "", + false => &get_extension(cmd.location.as_str()), + }; + let mut options = ListingOptions::new(file_format) + .with_session_config_options(session_state.config()) + .with_file_extension(file_extension); + let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() { + let infer_parts = session_state + .config_options() + .execution + .listing_table_factory_infer_partitions; + let part_cols = if cmd.table_partition_cols.is_empty() && infer_parts { + options + .infer_partitions(session_state, &table_path) + .await? + .into_iter() + } else { + cmd.table_partition_cols.clone().into_iter() + }; + ( None, - cmd.table_partition_cols - .iter() - .map(|x| { + part_cols + .map(|p| { ( - x.clone(), + p, DataType::Dictionary( Box::new(DataType::UInt16), Box::new(DataType::Utf8), @@ -106,19 +131,7 @@ impl TableProviderFactory for ListingTableFactory { (Some(schema), table_partition_cols) }; - let mut table_path = ListingTableUrl::parse(&cmd.location)?; - let file_extension = match table_path.is_collection() { - // Setting the extension to be empty instead of allowing the default extension seems - // odd, but was done to ensure existing behavior isn't modified. It seems like this - // could be refactored to either use the default extension or set the fully expected - // extension when compression is included (e.g. 
".csv.gz") - true => "", - false => &get_extension(cmd.location.as_str()), - }; - let options = ListingOptions::new(file_format) - .with_file_extension(file_extension) - .with_session_config_options(session_state.config()) - .with_table_partition_cols(table_partition_cols); + options = options.with_table_partition_cols(table_partition_cols); options .validate_partitions(session_state, &table_path) @@ -192,6 +205,7 @@ fn get_extension(path: &str) -> String { #[cfg(test)] mod tests { + use datafusion_execution::config::SessionConfig; use glob::Pattern; use std::collections::HashMap; use std::fs; @@ -419,4 +433,83 @@ mod tests { let listing_options = listing_table.options(); assert_eq!("", listing_options.file_extension); } + + #[tokio::test] + async fn test_create_with_hive_partitions() { + let dir = tempfile::tempdir().unwrap(); + let mut path = PathBuf::from(dir.path()); + path.extend(["key1=value1", "key2=value2"]); + fs::create_dir_all(&path).unwrap(); + path.push("data.parquet"); + fs::File::create_new(&path).unwrap(); + + let factory = ListingTableFactory::new(); + let context = SessionContext::new(); + let state = context.state(); + let name = TableReference::bare("foo"); + + let cmd = CreateExternalTable { + name, + location: dir.path().to_str().unwrap().to_string(), + file_type: "parquet".to_string(), + schema: Arc::new(DFSchema::empty()), + table_partition_cols: vec![], + if_not_exists: false, + temporary: false, + definition: None, + order_exprs: vec![], + unbounded: false, + options: HashMap::new(), + constraints: Constraints::default(), + column_defaults: HashMap::new(), + }; + let table_provider = factory.create(&state, &cmd).await.unwrap(); + let listing_table = table_provider + .as_any() + .downcast_ref::<ListingTable>() + .unwrap(); + + let listing_options = listing_table.options(); + let dtype = + DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)); + let expected_cols = vec![ + (String::from("key1"), dtype.clone()), + 
(String::from("key2"), dtype.clone()), + ]; + assert_eq!(expected_cols, listing_options.table_partition_cols); + + // Ensure partition detection can be disabled via config + let factory = ListingTableFactory::new(); + let mut cfg = SessionConfig::new(); + cfg.options_mut() + .execution + .listing_table_factory_infer_partitions = false; + let context = SessionContext::new_with_config(cfg); + let state = context.state(); + let name = TableReference::bare("foo"); + + let cmd = CreateExternalTable { + name, + location: dir.path().to_str().unwrap().to_string(), + file_type: "parquet".to_string(), + schema: Arc::new(DFSchema::empty()), + table_partition_cols: vec![], + if_not_exists: false, + temporary: false, + definition: None, + order_exprs: vec![], + unbounded: false, + options: HashMap::new(), + constraints: Constraints::default(), + column_defaults: HashMap::new(), + }; + let table_provider = factory.create(&state, &cmd).await.unwrap(); + let listing_table = table_provider + .as_any() + .downcast_ref::<ListingTable>() + .unwrap(); + + let listing_options = listing_table.options(); + assert!(listing_options.table_partition_cols.is_empty()); + } } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 35b2a6c03b399..1be2549ace71b 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -7444,4 +7444,3 @@ NULL NULL statement ok drop table distinct_avg; - diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index fb2c89020112d..361bc97a17d9c 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -220,6 +220,7 @@ datafusion.execution.collect_statistics true datafusion.execution.enable_recursive_ctes true datafusion.execution.enforce_batch_size_in_joins false 
datafusion.execution.keep_partition_by_columns false +datafusion.execution.listing_table_factory_infer_partitions true datafusion.execution.listing_table_ignore_subdirectory true datafusion.execution.max_buffered_batches_per_output_file 2 datafusion.execution.meta_fetch_concurrency 32 @@ -334,6 +335,7 @@ datafusion.execution.collect_statistics true Should DataFusion collect statistic datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches +datafusion.execution.listing_table_factory_infer_partitions true Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema). datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. 
Higher values can potentially give faster write performance at the cost of higher peak memory consumption datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 273ba31cb47d1..075256ae4b92d 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -175,6 +175,34 @@ select * from partitioned_insert_test order by a,b,c 1 20 200 2 20 200 +statement count 0 +CREATE EXTERNAL TABLE +partitioned_insert_test_readback +STORED AS csv +LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/'; + +query TTT +describe partitioned_insert_test_readback; +---- +c Int64 YES +a Dictionary(UInt16, Utf8) NO +b Dictionary(UInt16, Utf8) NO + +query ITT +select * from partitioned_insert_test_readback order by a,b,c; +---- +1 10 100 +1 10 200 +1 20 100 +2 20 100 +1 20 200 +2 20 200 + +query I +select count(*) from partitioned_insert_test_readback where b=100; +---- +3 + statement ok CREATE EXTERNAL TABLE partitioned_insert_test_verify(c bigint) diff --git a/datafusion/sqllogictest/test_files/listing_table_partitions.slt b/datafusion/sqllogictest/test_files/listing_table_partitions.slt new file mode 100644 index 0000000000000..52433429cfe80 --- /dev/null +++ b/datafusion/sqllogictest/test_files/listing_table_partitions.slt @@ -0,0 +1,75 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +query I +copy (values('foo'), ('bar')) +to 'test_files/scratch/listing_table_partitions/single_part/a=1/file1.parquet'; +---- +2 + +query I +copy (values('baz')) +to 'test_files/scratch/listing_table_partitions/single_part/a=1/file2.parquet'; +---- +1 + +statement count 0 +create external table single_part +stored as parquet location 'test_files/scratch/listing_table_partitions/single_part/'; + +query TT +select * from single_part order by (column1); +---- +bar 1 +baz 1 +foo 1 + +query I +copy (values('foo'), ('bar')) to 'test_files/scratch/listing_table_partitions/multi_part/a=1/b=100/file1.parquet'; +---- +2 + +query I +copy (values('baz')) to 'test_files/scratch/listing_table_partitions/multi_part/a=1/b=200/file1.parquet'; +---- +1 + +statement count 0 +create external table multi_part +stored as parquet location 'test_files/scratch/listing_table_partitions/multi_part/'; + +query TTT +select * from multi_part where b=200; +---- +baz 1 200 + +statement count 0 +set datafusion.execution.listing_table_factory_infer_partitions = false; + +statement count 0 +create external table infer_disabled +stored as parquet location 'test_files/scratch/listing_table_partitions/multi_part/'; + +query T +select * from infer_disabled order by (column1); +---- +bar +baz +foo + +statement count 0 +set datafusion.execution.listing_table_factory_infer_partitions = true; diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index de96dc368d176..f260119c7a580 100644 --- a/docs/source/library-user-guide/upgrading.md +++ 
b/docs/source/library-user-guide/upgrading.md @@ -24,6 +24,17 @@ **Note:** DataFusion `50.0.0` has not been released yet. The information provided in this section pertains to features and changes that have already been merged to the main branch and are awaiting release in this version. You can see the current [status of the `50.0.0 `release here](https://github.com/apache/datafusion/issues/16799) +### ListingTable automatically detects Hive Partitioned tables + +DataFusion 50.0.0 automatically infers Hive partitions when using the `ListingTableFactory` and `CREATE EXTERNAL TABLE`. Previously, +when creating a `ListingTable`, datasets that use Hive partitioning (e.g. +`/table_root/column1=value1/column2=value2/data.parquet`) would not have the Hive columns reflected in +the table's schema or data. The previous behavior can be +restored by setting the `datafusion.execution.listing_table_factory_infer_partitions` configuration option to `false`. +See [issue #17049] for more details. + +[issue #17049]: https://github.com/apache/datafusion/issues/17049 + ### `MSRV` updated to 1.86.0 The Minimum Supported Rust Version (MSRV) has been updated to [`1.86.0`]. diff --git a/docs/source/user-guide/cli/datasources.md b/docs/source/user-guide/cli/datasources.md index c15b8a5e46c99..6b1a4887a8a0f 100644 --- a/docs/source/user-guide/cli/datasources.md +++ b/docs/source/user-guide/cli/datasources.md @@ -162,6 +162,30 @@ STORED AS PARQUET LOCATION 'gs://bucket/my_table/'; ``` +When specifying a directory path that has a Hive compliant partition structure, by default, DataFusion CLI will +automatically parse and incorporate the Hive columns and their values into the table's schema and data. 
Given the +following remote object paths: + +```console +gs://bucket/my_table/a=1/b=100/file1.parquet +gs://bucket/my_table/a=2/b=200/file2.parquet +``` + +`my_table` can be queried and filtered on the Hive columns: + +```sql +CREATE EXTERNAL TABLE my_table +STORED AS PARQUET +LOCATION 'gs://bucket/my_table/'; + +SELECT count(*) FROM my_table WHERE b=200; ++----------+ +| count(*) | ++----------+ +| 1 | ++----------+ +``` + # Formats ## Parquet diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 5060bc3805fd1..aac468b8fd567 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -118,6 +118,7 @@ The following configuration settings are available: | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | | datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption | | datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). | +| datafusion.execution.listing_table_factory_infer_partitions | true | Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema). 
| | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs | | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental | | datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | diff --git a/docs/source/user-guide/sql/ddl.md b/docs/source/user-guide/sql/ddl.md index b5028cca49eff..bd41f691bf90b 100644 --- a/docs/source/user-guide/sql/ddl.md +++ b/docs/source/user-guide/sql/ddl.md @@ -169,6 +169,35 @@ LOCATION '/path/to/directory/of/files' OPTIONS ('has_header' 'true'); ``` +Tables that are partitioned using a Hive compliant partitioning scheme will have their columns and values automatically +detected and incorporated into the table's schema and data. Given the following example directory structure: + +```console +hive_partitioned/ +├── a=1 +│   └── b=200 +│   └── file1.parquet +└── a=2 + └── b=100 + └── file2.parquet +``` + +Users can specify the top level `hive_partitioned` directory as an `EXTERNAL TABLE` and leverage the Hive partitions to query +and filter data. + +```sql +CREATE EXTERNAL TABLE hive_partitioned +STORED AS PARQUET +LOCATION '/path/to/hive_partitioned/'; + +SELECT count(*) FROM hive_partitioned WHERE b=100; ++------------------+ +| count(*) | ++------------------+ +| 1 | ++------------------+ +``` + ### Example: Unbounded Data Sources We can create unbounded data sources using the `CREATE UNBOUNDED EXTERNAL TABLE` SQL statement.