Skip to content

Commit cb24fc0

Browse files
committed
move config to parquet-only
1 parent 5e434bc commit cb24fc0

17 files changed

Lines changed: 104 additions & 42 deletions

File tree

benchmarks/src/clickbench.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,11 @@ impl RunOpt {
117117
};
118118

119119
let mut config = self.common.config();
120-
config.options_mut().execution.schema_force_string_view = self.common.string_view;
120+
config
121+
.options_mut()
122+
.execution
123+
.parquet
124+
.schema_force_string_view = self.common.string_view;
121125

122126
let ctx = SessionContext::new_with_config(config);
123127
self.register_hits(&ctx).await?;

benchmarks/src/tpch/run.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ impl RunOpt {
120120
.config()
121121
.with_collect_statistics(!self.disable_statistics);
122122
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
123-
config.options_mut().execution.schema_force_string_view = self.common.string_view;
123+
config
124+
.options_mut()
125+
.execution
126+
.parquet
127+
.schema_force_string_view = self.common.string_view;
124128
let ctx = SessionContext::new_with_config(config);
125129

126130
// register tables

datafusion/common/src/config.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -311,10 +311,6 @@ config_namespace! {
311311

312312
/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
313313
pub keep_partition_by_columns: bool, default = false
314-
315-
/// If true, listing tables will read columns of `Utf8/Utf8Large` with `Utf8View`,
316-
/// and `Binary/BinaryLarge` with `BinaryView`.
317-
pub schema_force_string_view: bool, default = false
318314
}
319315
}
320316

@@ -455,6 +451,10 @@ config_namespace! {
455451
/// data frame.
456452
pub maximum_buffered_record_batches_per_stream: usize, default = 2
457453

454+
455+
/// If true, parquet reader will read columns of `Utf8/LargeUtf8` with `Utf8View`,
456+
/// and `Binary/LargeBinary` with `BinaryView`.
457+
pub schema_force_string_view: bool, default = false
458458
}
459459
}
460460

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
8181
maximum_parallel_row_group_writers: _,
8282
maximum_buffered_record_batches_per_stream: _,
8383
bloom_filter_on_read: _,
84+
schema_force_string_view: _,
8485
} = &parquet_options.global;
8586

8687
let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() {

datafusion/core/example.parquet

976 Bytes
Binary file not shown.

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use crate::error::Result;
4242
use crate::execution::context::SessionState;
4343
use crate::physical_plan::{ExecutionPlan, Statistics};
4444

45+
use arrow_schema::{DataType, Field, Schema};
4546
use datafusion_common::file_options::file_type::FileType;
4647
use datafusion_common::{internal_err, not_impl_err, GetExt};
4748
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
@@ -194,6 +195,28 @@ pub fn file_type_to_format(
194195
}
195196
}
196197

198+
/// Transform a schema to use view types for Utf8 and Binary
199+
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
200+
let transformed_fields: Vec<Arc<Field>> = schema
201+
.fields
202+
.iter()
203+
.map(|field| match field.data_type() {
204+
DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
205+
field.name(),
206+
DataType::Utf8View,
207+
field.is_nullable(),
208+
)),
209+
DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
210+
field.name(),
211+
DataType::BinaryView,
212+
field.is_nullable(),
213+
)),
214+
_ => field.clone(),
215+
})
216+
.collect();
217+
Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
218+
}
219+
197220
#[cfg(test)]
198221
pub(crate) mod test_util {
199222
use std::ops::Range;

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::sync::Arc;
2424

2525
use super::write::demux::start_demuxer_task;
2626
use super::write::{create_writer, SharedBuffer};
27-
use super::{FileFormat, FileFormatFactory, FileScanConfig};
27+
use super::{transform_schema_to_view, FileFormat, FileFormatFactory, FileScanConfig};
2828
use crate::arrow::array::RecordBatch;
2929
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
3030
use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -305,6 +305,17 @@ impl FileFormat for ParquetFormat {
305305
Schema::try_merge(schemas)
306306
}?;
307307

308+
let schema = if state
309+
.config_options()
310+
.execution
311+
.parquet
312+
.schema_force_string_view
313+
{
314+
transform_schema_to_view(&schema)
315+
} else {
316+
schema
317+
};
318+
308319
Ok(Arc::new(schema))
309320
}
310321

datafusion/core/src/datasource/listing/table.rs

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -410,31 +410,8 @@ impl ListingOptions {
410410
.try_collect()
411411
.await?;
412412

413-
let mut schema = self.format.infer_schema(state, &store, &files).await?;
414-
415-
if state.config_options().execution.schema_force_string_view {
416-
let transformed_fields: Vec<Arc<Field>> = schema
417-
.fields
418-
.iter()
419-
.map(|field| match field.data_type() {
420-
DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
421-
field.name(),
422-
DataType::Utf8View,
423-
field.is_nullable(),
424-
)),
425-
DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
426-
field.name(),
427-
DataType::BinaryView,
428-
field.is_nullable(),
429-
)),
430-
_ => field.clone(),
431-
})
432-
.collect();
433-
schema = Arc::new(Schema::new_with_metadata(
434-
transformed_fields,
435-
schema.metadata.clone(),
436-
));
437-
}
413+
let schema = self.format.infer_schema(state, &store, &files).await?;
414+
438415
Ok(schema)
439416
}
440417

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,10 @@ impl ExecutionPlan for ParquetExec {
720720
enable_page_index: self.enable_page_index(),
721721
enable_bloom_filter: self.bloom_filter_on_read(),
722722
schema_adapter_factory,
723+
schema_force_string_view: self
724+
.table_parquet_options
725+
.global
726+
.schema_force_string_view,
723727
};
724728

725729
let stream =

datafusion/core/src/datasource/physical_plan/parquet/opener.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! [`ParquetOpener`] for opening Parquet files
1919
20+
use crate::datasource::file_format::transform_schema_to_view;
2021
use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
2122
use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
2223
use crate::datasource::physical_plan::parquet::{
@@ -33,7 +34,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
3334
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
3435
use futures::{StreamExt, TryStreamExt};
3536
use log::debug;
36-
use parquet::arrow::arrow_reader::ArrowReaderOptions;
37+
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
3738
use parquet::arrow::async_reader::AsyncFileReader;
3839
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
3940
use std::sync::Arc;
@@ -56,6 +57,7 @@ pub(super) struct ParquetOpener {
5657
pub enable_page_index: bool,
5758
pub enable_bloom_filter: bool,
5859
pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
60+
pub schema_force_string_view: bool,
5961
}
6062

6163
impl FileOpener for ParquetOpener {
@@ -66,7 +68,7 @@ impl FileOpener for ParquetOpener {
6668
let file_metrics =
6769
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
6870

69-
let reader: Box<dyn AsyncFileReader> =
71+
let mut reader: Box<dyn AsyncFileReader> =
7072
self.parquet_file_reader_factory.create_reader(
7173
self.partition_index,
7274
file_meta,
@@ -90,14 +92,27 @@ impl FileOpener for ParquetOpener {
9092
);
9193
let enable_bloom_filter = self.enable_bloom_filter;
9294
let limit = self.limit;
95+
let schema_force_string_view = self.schema_force_string_view;
9396

9497
Ok(Box::pin(async move {
98+
let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
99+
100+
let metadata =
101+
ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
102+
let mut schema = metadata.schema().clone();
103+
104+
if schema_force_string_view {
105+
schema = Arc::new(transform_schema_to_view(&schema));
106+
}
107+
95108
let options = ArrowReaderOptions::new()
96109
.with_page_index(enable_page_index)
97-
.with_schema(table_schema.clone());
110+
.with_schema(schema.clone());
111+
let metadata =
112+
ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;
113+
98114
let mut builder =
99-
ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
100-
.await?;
115+
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);
101116

102117
let file_schema = builder.schema().clone();
103118

0 commit comments

Comments
 (0)