Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions datafusion/core/benches/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema};
use bytes::{BufMut, BytesMut};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::config::ConfigOptions;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
Expand Down Expand Up @@ -88,7 +88,7 @@ async fn create_plan() -> Arc<dyn ExecutionPlan> {
#[derive(Clone)]
struct BenchmarkPlan {
plan: Arc<dyn ExecutionPlan>,
config: ConfigOptions,
config: SessionConfig,
}

impl std::fmt::Display for BenchmarkPlan {
Expand All @@ -102,8 +102,8 @@ fn bench_push_down_filter(c: &mut Criterion) {
let plan = tokio::runtime::Runtime::new()
.unwrap()
.block_on(create_plan());
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
let mut config = SessionConfig::default();
config.options_mut().execution.parquet.pushdown_filters = true;
let plan = BenchmarkPlan { plan, config };
let optimizer = FilterPushdown::new();

Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2118,7 +2118,7 @@ impl DefaultPhysicalPlanner {
for optimizer in optimizers {
let before_schema = new_plan.schema();
new_plan = optimizer
.optimize(new_plan, session_state.config_options())
.optimize(new_plan, session_state.config())
.map_err(|e| {
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
})?;
Expand Down Expand Up @@ -2441,7 +2441,6 @@ mod tests {
use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
use arrow::datatypes::{DataType, Field, Int32Type};
use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
assert_contains, DFSchemaRef, TableReference, ToDFSchema as _,
};
Expand Down Expand Up @@ -3545,7 +3544,7 @@ digraph {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
_config: &SessionConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(plan)
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/tests/execution/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,7 @@ async fn query_yields(
task_ctx: Arc<TaskContext>,
) -> Result<(), Box<dyn Error>> {
// Run plan through EnsureCooperative
let optimized =
EnsureCooperative::new().optimize(plan, task_ctx.session_config().options())?;
let optimized = EnsureCooperative::new().optimize(plan, task_ctx.session_config())?;

// Get the stream
let stream = physical_plan::execute_stream(optimized, task_ctx)?;
Expand Down
12 changes: 8 additions & 4 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::{col, lit, Expr};

use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::filter::FilterExec;
Expand All @@ -56,8 +55,13 @@ async fn check_stats_precision_with_filter_pushdown() {
let table = get_listing_table(&table_path, None, &opt).await;

let (_, _, state) = get_cache_runtime_state();
let mut options: ConfigOptions = state.config().options().as_ref().clone();
options.execution.parquet.pushdown_filters = true;
let mut session_config =
SessionConfig::from(state.config().options().as_ref().clone());
session_config
.options_mut()
.execution
.parquet
.pushdown_filters = true;

// Scan without filter, stats are exact
let exec = table.scan(&state, None, &[], None).await.unwrap();
Expand Down Expand Up @@ -85,7 +89,7 @@ async fn check_stats_precision_with_filter_pushdown() {
as Arc<dyn ExecutionPlan>;

let optimized_exec = FilterPushdown::new()
.optimize(filtered_exec, &options)
.optimize(filtered_exec, &session_config)
.unwrap();

assert!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use arrow::record_batch::RecordBatch;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion_common::cast::as_int64_array;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{self, cast};
Expand Down Expand Up @@ -67,7 +67,7 @@ async fn assert_count_optim_success(
let task_ctx = Arc::new(TaskContext::default());
let plan: Arc<dyn ExecutionPlan> = Arc::new(plan);

let config = ConfigOptions::new();
let config = SessionConfig::new();
let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &config)?;

// A ProjectionExec is a sign that the count optimization was applied
Expand Down Expand Up @@ -264,7 +264,7 @@ async fn test_count_inexact_stat() -> Result<()> {
Arc::clone(&schema),
)?;

let conf = ConfigOptions::new();
let conf = SessionConfig::new();
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;

// check that the original ExecutionPlan was not replaced
Expand Down Expand Up @@ -308,7 +308,7 @@ async fn test_count_with_nulls_inexact_stat() -> Result<()> {
Arc::clone(&schema),
)?;

let conf = ConfigOptions::new();
let conf = SessionConfig::new();
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;

// check that the original ExecutionPlan was not replaced
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::sync::Arc;
use crate::physical_optimizer::test_utils::parquet_exec;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_execution::config::SessionConfig;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
Expand All @@ -47,7 +47,7 @@ macro_rules! assert_optimized {
($PLAN: expr, @ $EXPECTED_LINES: literal $(,)?) => {
// run optimizer
let optimizer = CombinePartialFinalAggregate {};
let config = ConfigOptions::new();
let config = SessionConfig::new();
let optimized = optimizer.optimize($PLAN, &config)?;
// Now format correctly
let plan = displayable(optimized.as_ref()).indent(true).to_string();
Expand Down
87 changes: 62 additions & 25 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,41 +404,56 @@ const SORT_DISTRIB_DISTRIB: [Run; 3] =

#[derive(Clone)]
struct TestConfig {
config: ConfigOptions,
session_config: SessionConfig,
}

impl Default for TestConfig {
fn default() -> Self {
Self {
config: test_suite_default_config_options(),
}
let config = test_suite_default_config_options();
let session_config = SessionConfig::from(config.clone());
Self { session_config }
}
}

impl TestConfig {
/// If preferred, will not repartition / resort data if it is already sorted.
fn with_prefer_existing_sort(mut self) -> Self {
self.config.optimizer.prefer_existing_sort = true;
self.session_config
.options_mut()
.optimizer
.prefer_existing_sort = true;
self
}

/// If preferred, will not attempt to convert Union to Interleave.
fn with_prefer_existing_union(mut self) -> Self {
self.config.optimizer.prefer_existing_union = true;
self.session_config
.options_mut()
.optimizer
.prefer_existing_union = true;
self
}

/// If preferred, will repartition file scans.
/// Accepts a minimum file size to repartition.
fn with_prefer_repartition_file_scans(mut self, file_min_size: usize) -> Self {
self.config.optimizer.repartition_file_scans = true;
self.config.optimizer.repartition_file_min_size = file_min_size;
self.session_config
.options_mut()
.optimizer
.repartition_file_scans = true;
self.session_config
.options_mut()
.optimizer
.repartition_file_min_size = file_min_size;
self
}

/// Set the preferred target partitions for query execution concurrency.
fn with_query_execution_partitions(mut self, target_partitions: usize) -> Self {
self.config.execution.target_partitions = target_partitions;
self.session_config
.options_mut()
.execution
.target_partitions = target_partitions;
self
}

Expand All @@ -455,13 +470,18 @@ impl TestConfig {

// Add the ancillary output requirements operator at the start:
let optimizer = OutputRequirements::new_add_mode();
let mut optimized = optimizer.optimize(plan.clone(), &self.config)?;
let mut optimized = optimizer.optimize(plan.clone(), &self.session_config)?;

// This file has 2 rules that use tree node, apply these rules to original plan consecutively
// After these operations tree nodes should be in a consistent state.
// This code block makes sure that these rules doesn't violate tree node integrity.
{
let adjusted = if self.config.optimizer.top_down_join_key_reordering {
let adjusted = if self
.session_config
.options()
.optimizer
.top_down_join_key_reordering
{
// Run adjust_input_keys_ordering rule
let plan_requirements =
PlanWithKeyRequirements::new_default(plan.clone());
Expand All @@ -483,7 +503,10 @@ impl TestConfig {
// Then run ensure_distribution rule
DistributionContext::new_default(adjusted)
.transform_up(|distribution_context| {
ensure_distribution(distribution_context, &self.config)
ensure_distribution(
distribution_context,
self.session_config.options(),
)
})
.data()
.and_then(check_integrity)?;
Expand All @@ -494,18 +517,18 @@ impl TestConfig {
optimized = match run {
Run::Distribution => {
let optimizer = EnforceDistribution::new();
optimizer.optimize(optimized, &self.config)?
optimizer.optimize(optimized, &self.session_config)?
}
Run::Sorting => {
let optimizer = EnforceSorting::new();
optimizer.optimize(optimized, &self.config)?
optimizer.optimize(optimized, &self.session_config)?
}
};
}

// Remove the ancillary output requirements operator when done:
let optimizer = OutputRequirements::new_remove_mode();
let optimized = optimizer.optimize(optimized, &self.config)?;
let optimized = optimizer.optimize(optimized, &self.session_config)?;

// Now format correctly
let actual_lines = get_plan_string(&optimized);
Expand Down Expand Up @@ -3340,10 +3363,13 @@ fn do_not_put_sort_when_input_is_invalid() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
];

let mut config = ConfigOptions::new();
config.execution.target_partitions = 10;
config.optimizer.enable_round_robin_repartition = true;
config.optimizer.prefer_existing_sort = false;
let mut config = SessionConfig::new();
config.options_mut().execution.target_partitions = 10;
config
.options_mut()
.optimizer
.enable_round_robin_repartition = true;
config.options_mut().optimizer.prefer_existing_sort = false;
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
assert_plan_txt!(expected, dist_plan);

Expand Down Expand Up @@ -3378,10 +3404,13 @@ fn put_sort_when_input_is_valid() -> Result<()> {
" DataSourceExec: file_groups={10 groups: [[x:0..20], [y:0..20], [x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80], [x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];

let mut config = ConfigOptions::new();
config.execution.target_partitions = 10;
config.optimizer.enable_round_robin_repartition = true;
config.optimizer.prefer_existing_sort = false;
let mut config = SessionConfig::new();
config.options_mut().execution.target_partitions = 10;
config
.options_mut()
.optimizer
.enable_round_robin_repartition = true;
config.options_mut().optimizer.prefer_existing_sort = false;
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
assert_plan_txt!(expected, dist_plan);

Expand Down Expand Up @@ -3503,7 +3532,11 @@ async fn test_distribute_sort_parquet() -> Result<()> {
let test_config: TestConfig =
TestConfig::default().with_prefer_repartition_file_scans(1000);
assert!(
test_config.config.optimizer.repartition_file_scans,
test_config
.session_config
.options()
.optimizer
.repartition_file_scans,
"should enable scans to be repartitioned"
);

Expand Down Expand Up @@ -3542,7 +3575,11 @@ async fn test_distribute_sort_memtable() -> Result<()> {
let test_config: TestConfig =
TestConfig::default().with_prefer_repartition_file_scans(1000);
assert!(
test_config.config.optimizer.repartition_file_scans,
test_config
.session_config
.options()
.optimizer
.repartition_file_scans,
"should enable scans to be repartitioned"
);

Expand Down
13 changes: 7 additions & 6 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use crate::physical_optimizer::test_utils::{

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{TreeNode, TransformedResult};
use datafusion_common::{Result, TableReference};
use datafusion_execution::config::SessionConfig;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_expr_common::operator::Operator;
Expand Down Expand Up @@ -110,8 +110,9 @@ impl EnforceSortingTest {
/// Runs the enforce sorting test and returns a string with the input and
/// optimized plan as strings for snapshot comparison using insta
pub(crate) fn run(&self) -> String {
let mut config = ConfigOptions::new();
config.optimizer.repartition_sorts = self.repartition_sorts;
let mut session_config = SessionConfig::new();
session_config.options_mut().optimizer.repartition_sorts = self.repartition_sorts;
let config = session_config.options();

// This file has 4 rules that use tree node, apply these rules as in the
// EnforceSorting::optimize implementation
Expand Down Expand Up @@ -149,7 +150,7 @@ impl EnforceSortingTest {
plan_with_pipeline_fixer,
false,
true,
&config,
config,
)
})
.data()
Expand All @@ -171,7 +172,7 @@ impl EnforceSortingTest {

// Run the actual optimizer
let optimized_physical_plan = EnforceSorting::new()
.optimize(Arc::clone(&self.plan), &config)
.optimize(Arc::clone(&self.plan), &session_config)
.expect("enforce_sorting failed");

// Get string representation of the plan
Expand Down Expand Up @@ -2277,7 +2278,7 @@ async fn test_commutativity() -> Result<()> {
DataSourceExec: partitions=1, partition_sizes=[0]
"#);

let config = ConfigOptions::new();
let config = SessionConfig::new();
let rules = vec![
Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
Expand Down
Loading