Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,11 @@ config_namespace! {
/// tables (e.g. `/table/year=2021/month=01/data.parquet`).
pub listing_table_ignore_subdirectory: bool, default = true

/// Should a `ListingTable` created through the `ListingTableFactory` infer table
/// partitions from Hive compliant directories. Defaults to true (partition columns are
/// inferred and will be represented in the table schema).
pub listing_table_factory_infer_partitions: bool, default = true

/// Should DataFusion support recursive CTEs
pub enable_recursive_ctes: bool, default = true

Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,9 @@ impl ListingOptions {
.rev()
.skip(1) // get parents only; skip the file itself
.rev()
// Partitions are expected to follow the format "column_name=value", so we
// should ignore any path part that cannot be parsed into the expected format
.filter(|s| s.contains('='))
Comment on lines +805 to +807
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is necessary to prevent regressions exposed by existing sqllogic tests. Here's the failures from existing tests for this PR if this fix is not included

Completed 345 test files in 17 seconds                                                                                                                                      External error: 6 errors in file /home/blake/open_source_src/datafusion-BlakeOrth/datafusion/sqllogictest/test_files/parquet.slt

1. statement failed: DataFusion error: Error during planning: Found mixed partition values on disk [[], ["subdir"]]
[SQL] CREATE EXTERNAL TABLE listing_table_folder_0
STORED AS PARQUET
LOCATION 'test_files/scratch/parquet/test_table/';
at /home/blake/open_source_src/datafusion-BlakeOrth/datafusion/sqllogictest/test_files/parquet.slt:306


2. query failed: DataFusion error: Error during planning: table 'datafusion.public.listing_table_folder_0' not found
[SQL] select count(*) from listing_table_folder_0;
at /home/blake/open_source_src/datafusion-BlakeOrth/datafusion/sqllogictest/test_files/parquet.slt:315


3. query failed: DataFusion error: Error during planning: table 'datafusion.public.listing_table_folder_0' not found
[SQL] select count(*) from listing_table_folder_0;
at /home/blake/open_source_src/datafusion-BlakeOrth/datafusion/sqllogictest/test_files/parquet.slt:324


4. statement failed: DataFusion error: Error during planning: Found mixed partition values on disk [[], ["subdir"]]
[SQL] CREATE EXTERNAL TABLE listing_table_folder_1
STORED AS PARQUET
LOCATION 'test_files/scratch/parquet/test_table';
at /home/blake/open_source_src/datafusion-BlakeOrth/datafusion/sqllogictest/test_files/parquet.slt:330


5. query failed: DataFusion error: Error during planning: table 'datafusion.public.listing_table_folder_1' not found
[SQL] select count(*) from listing_table_folder_1;
at /home/blake/open_source_src/datafusion-BlakeOrth/datafusion/sqllogictest/test_files/parquet.slt:339


6. query failed: DataFusion error: Error during planning: table 'datafusion.public.listing_table_folder_1' not found
[SQL] select count(*) from listing_table_folder_1;
at /home/blake/open_source_src/datafusion-BlakeOrth/datafusion/sqllogictest/test_files/parquet.slt:348

.map(|s| s.split('=').take(1).collect())
.collect_vec()
})
Expand Down
127 changes: 110 additions & 17 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,39 @@ impl TableProviderFactory for ListingTableFactory {
))?
.create(session_state, &cmd.options)?;

let mut table_path = ListingTableUrl::parse(&cmd.location)?;
let file_extension = match table_path.is_collection() {
// Setting the extension to be empty instead of allowing the default extension seems
// odd, but was done to ensure existing behavior isn't modified. It seems like this
// could be refactored to either use the default extension or set the fully expected
// extension when compression is included (e.g. ".csv.gz")
true => "",
false => &get_extension(cmd.location.as_str()),
};
let mut options = ListingOptions::new(file_format)
.with_session_config_options(session_state.config())
.with_file_extension(file_extension);

let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
let infer_parts = session_state
.config_options()
.execution
.listing_table_factory_infer_partitions;
let part_cols = if cmd.table_partition_cols.is_empty() && infer_parts {
options
.infer_partitions(session_state, &table_path)
.await?
.into_iter()
} else {
cmd.table_partition_cols.clone().into_iter()
};

(
None,
cmd.table_partition_cols
.iter()
.map(|x| {
part_cols
.map(|p| {
(
x.clone(),
p,
DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Utf8),
Expand Down Expand Up @@ -106,19 +131,7 @@ impl TableProviderFactory for ListingTableFactory {
(Some(schema), table_partition_cols)
};

let mut table_path = ListingTableUrl::parse(&cmd.location)?;
let file_extension = match table_path.is_collection() {
// Setting the extension to be empty instead of allowing the default extension seems
// odd, but was done to ensure existing behavior isn't modified. It seems like this
// could be refactored to either use the default extension or set the fully expected
// extension when compression is included (e.g. ".csv.gz")
true => "",
false => &get_extension(cmd.location.as_str()),
};
let options = ListingOptions::new(file_format)
.with_file_extension(file_extension)
.with_session_config_options(session_state.config())
.with_table_partition_cols(table_partition_cols);
options = options.with_table_partition_cols(table_partition_cols);

options
.validate_partitions(session_state, &table_path)
Expand Down Expand Up @@ -192,6 +205,7 @@ fn get_extension(path: &str) -> String {

#[cfg(test)]
mod tests {
use datafusion_execution::config::SessionConfig;
use glob::Pattern;
use std::collections::HashMap;
use std::fs;
Expand Down Expand Up @@ -419,4 +433,83 @@ mod tests {
let listing_options = listing_table.options();
assert_eq!("", listing_options.file_extension);
}

#[tokio::test]
async fn test_create_with_hive_partitions() {
let dir = tempfile::tempdir().unwrap();
let mut path = PathBuf::from(dir.path());
path.extend(["key1=value1", "key2=value2"]);
fs::create_dir_all(&path).unwrap();
path.push("data.parquet");
fs::File::create_new(&path).unwrap();

let factory = ListingTableFactory::new();
let context = SessionContext::new();
let state = context.state();
let name = TableReference::bare("foo");

let cmd = CreateExternalTable {
name,
location: dir.path().to_str().unwrap().to_string(),
file_type: "parquet".to_string(),
schema: Arc::new(DFSchema::empty()),
table_partition_cols: vec![],
if_not_exists: false,
temporary: false,
definition: None,
order_exprs: vec![],
unbounded: false,
options: HashMap::new(),
constraints: Constraints::default(),
column_defaults: HashMap::new(),
};
let table_provider = factory.create(&state, &cmd).await.unwrap();
let listing_table = table_provider
.as_any()
.downcast_ref::<ListingTable>()
.unwrap();

let listing_options = listing_table.options();
let dtype =
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
let expected_cols = vec![
(String::from("key1"), dtype.clone()),
(String::from("key2"), dtype.clone()),
];
assert_eq!(expected_cols, listing_options.table_partition_cols);

// Ensure partition detection can be disabled via config
let factory = ListingTableFactory::new();
let mut cfg = SessionConfig::new();
cfg.options_mut()
.execution
.listing_table_factory_infer_partitions = false;
let context = SessionContext::new_with_config(cfg);
let state = context.state();
let name = TableReference::bare("foo");

let cmd = CreateExternalTable {
name,
location: dir.path().to_str().unwrap().to_string(),
file_type: "parquet".to_string(),
schema: Arc::new(DFSchema::empty()),
table_partition_cols: vec![],
if_not_exists: false,
temporary: false,
definition: None,
order_exprs: vec![],
unbounded: false,
options: HashMap::new(),
constraints: Constraints::default(),
column_defaults: HashMap::new(),
};
let table_provider = factory.create(&state, &cmd).await.unwrap();
let listing_table = table_provider
.as_any()
.downcast_ref::<ListingTable>()
.unwrap();

let listing_options = listing_table.options();
assert!(listing_options.table_partition_cols.is_empty());
}
}
1 change: 0 additions & 1 deletion datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7444,4 +7444,3 @@ NULL NULL

statement ok
drop table distinct_avg;

2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ datafusion.execution.collect_statistics true
datafusion.execution.enable_recursive_ctes true
datafusion.execution.enforce_batch_size_in_joins false
datafusion.execution.keep_partition_by_columns false
datafusion.execution.listing_table_factory_infer_partitions true
datafusion.execution.listing_table_ignore_subdirectory true
datafusion.execution.max_buffered_batches_per_output_file 2
datafusion.execution.meta_fetch_concurrency 32
Expand Down Expand Up @@ -334,6 +335,7 @@ datafusion.execution.collect_statistics true Should DataFusion collect statistic
datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower.
datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches
datafusion.execution.listing_table_factory_infer_partitions true Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema).
datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
Expand Down
28 changes: 28 additions & 0 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,34 @@ select * from partitioned_insert_test order by a,b,c
1 20 200
2 20 200

statement count 0
CREATE EXTERNAL TABLE
partitioned_insert_test_readback
STORED AS csv
LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/';

query TTT
describe partitioned_insert_test_readback;
----
c Int64 YES
a Dictionary(UInt16, Utf8) NO
b Dictionary(UInt16, Utf8) NO

query ITT
select * from partitioned_insert_test_readback order by a,b,c;
----
1 10 100
1 10 200
1 20 100
2 20 100
1 20 200
2 20 200

query I
select count(*) from partitioned_insert_test_readback where b=100;
----
3

statement ok
CREATE EXTERNAL TABLE
partitioned_insert_test_verify(c bigint)
Expand Down
75 changes: 75 additions & 0 deletions datafusion/sqllogictest/test_files/listing_table_partitions.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

query I
copy (values('foo'), ('bar'))
to 'test_files/scratch/listing_table_partitions/single_part/a=1/file1.parquet';
----
2

query I
copy (values('baz'))
to 'test_files/scratch/listing_table_partitions/single_part/a=1/file2.parquet';
----
1

statement count 0
create external table single_part
stored as parquet location 'test_files/scratch/listing_table_partitions/single_part/';

query TT
select * from single_part order by (column1);
----
bar 1
baz 1
foo 1

query I
copy (values('foo'), ('bar')) to 'test_files/scratch/listing_table_partitions/multi_part/a=1/b=100/file1.parquet';
----
2

query I
copy (values('baz')) to 'test_files/scratch/listing_table_partitions/multi_part/a=1/b=200/file1.parquet';
----
1

statement count 0
create external table multi_part
stored as parquet location 'test_files/scratch/listing_table_partitions/multi_part/';

query TTT
select * from multi_part where b=200;
----
baz 1 200

statement count 0
set datafusion.execution.listing_table_factory_infer_partitions = false;

statement count 0
create external table infer_disabled
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

stored as parquet location 'test_files/scratch/listing_table_partitions/multi_part/';

query T
select * from infer_disabled order by (column1);
----
bar
baz
foo

statement count 0
set datafusion.execution.listing_table_factory_infer_partitions = true;
11 changes: 11 additions & 0 deletions docs/source/library-user-guide/upgrading.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@
**Note:** DataFusion `50.0.0` has not been released yet. The information provided in this section pertains to features and changes that have already been merged to the main branch and are awaiting release in this version.
You can see the current [status of the `50.0.0` release here](https://github.com/apache/datafusion/issues/16799)

### ListingTable automatically detects Hive Partitioned tables

DataFusion 50.0.0 automatically infers Hive partitions when using the `ListingTableFactory` and `CREATE EXTERNAL TABLE`.
Previously, when a `ListingTable` was created for a dataset that uses Hive partitioning (e.g.
`/table_root/column1=value1/column2=value2/data.parquet`), the Hive partition columns were not reflected in
the table's schema or data. The previous behavior can be
restored by setting the `datafusion.execution.listing_table_factory_infer_partitions` configuration option to `false`.
See [issue #17049] for more details.

[issue #17049]: https://github.com/apache/datafusion/issues/17049

### `MSRV` updated to 1.86.0

The Minimum Supported Rust Version (MSRV) has been updated to [`1.86.0`].
Expand Down
24 changes: 24 additions & 0 deletions docs/source/user-guide/cli/datasources.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,30 @@ STORED AS PARQUET
LOCATION 'gs://bucket/my_table/';
```

When a directory path with a Hive-compliant partition structure is specified, the DataFusion CLI will, by default,
automatically parse the Hive partition columns and incorporate their values into the table's schema and data. Given the
following remote object paths:

```console
gs://bucket/my_table/a=1/b=100/file1.parquet
gs://bucket/my_table/a=2/b=200/file2.parquet
```

`my_table` can be queried and filtered on the Hive columns:

```sql
CREATE EXTERNAL TABLE my_table
STORED AS PARQUET
LOCATION 'gs://bucket/my_table/';

SELECT count(*) FROM my_table WHERE b=200;
+----------+
| count(*) |
+----------+
| 1 |
+----------+
```

# Formats

## Parquet
Expand Down
Loading