From ab128a66db341143a38586c74aa5ba2a5105f1f7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 10 Apr 2024 18:11:31 -0400 Subject: [PATCH 1/2] Avoid copies in `InlineTableScan` via TreeNode API --- .../src/analyzer/inline_table_scan.rs | 92 ++++++------------- 1 file changed, 30 insertions(+), 62 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs index 88202ffd21f1c..993dc24f39e52 100644 --- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs +++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs @@ -17,17 +17,13 @@ //! Analyzed rule to replace TableScan references //! such as DataFrames and Views and inlines the LogicalPlan. -use std::sync::Arc; use crate::analyzer::AnalyzerRule; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Column, Result}; -use datafusion_expr::expr::{Exists, InSubquery}; -use datafusion_expr::{ - logical_plan::LogicalPlan, Expr, Filter, LogicalPlanBuilder, TableScan, -}; +use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder, TableScan}; /// Analyzed rule that inlines TableScan that provide a [`LogicalPlan`] /// (DataFrame / ViewTable) @@ -51,65 +47,37 @@ impl AnalyzerRule for InlineTableScan { } fn analyze_internal(plan: LogicalPlan) -> Result> { - match plan { - // Match only on scans without filter / projection / fetch - // Views and DataFrames won't have those added - // during the early stage of planning - LogicalPlan::TableScan(TableScan { - table_name, - source, - projection, - filters, - .. - }) if filters.is_empty() && source.get_logical_plan().is_some() => { - let sub_plan = source.get_logical_plan().unwrap(); - let projection_exprs = generate_projection_expr(&projection, sub_plan)?; - LogicalPlanBuilder::from(sub_plan.clone()) - .project(projection_exprs)? - // Ensures that the reference to the inlined table remains the - // same, meaning we don't have to change any of the parent nodes - // that reference this table. - .alias(table_name)? - .build() - .map(Transformed::yes) - } - LogicalPlan::Filter(filter) => { - let new_expr = filter.predicate.transform(&rewrite_subquery).data()?; - Filter::try_new(new_expr, filter.input) - .map(|e| Transformed::yes(LogicalPlan::Filter(e))) + // rewrite any subqueries in the plan first + let result = plan.map_subqueries(|plan| plan.transform_up(&analyze_internal))?; + + let result = result.transform_data(|plan| { + match plan { + // Match only on scans without filter / projection / fetch + // Views and DataFrames won't have those added + // during the early stage of planning + LogicalPlan::TableScan(TableScan { + table_name, + source, + projection, + filters, + .. + }) if filters.is_empty() && source.get_logical_plan().is_some() => { + let sub_plan = source.get_logical_plan().unwrap(); + let projection_exprs = generate_projection_expr(&projection, sub_plan)?; + LogicalPlanBuilder::from(sub_plan.clone()) + .project(projection_exprs)? + // Ensures that the reference to the inlined table remains the + // same, meaning we don't have to change any of the parent nodes + // that reference this table. + .alias(table_name)? + .build() + .map(Transformed::yes) + } + _ => Ok(Transformed::no(plan)), } - _ => Ok(Transformed::no(plan)), - } -} + })?; -fn rewrite_subquery(expr: Expr) -> Result> { - match expr { - Expr::Exists(Exists { subquery, negated }) => { - let plan = subquery.subquery.as_ref().clone(); - let new_plan = plan.transform_up(&analyze_internal).data()?; - let subquery = subquery.with_plan(Arc::new(new_plan)); - Ok(Transformed::yes(Expr::Exists(Exists { subquery, negated }))) - } - Expr::InSubquery(InSubquery { - expr, - subquery, - negated, - }) => { - let plan = subquery.subquery.as_ref().clone(); - let new_plan = plan.transform_up(&analyze_internal).data()?; - let subquery = subquery.with_plan(Arc::new(new_plan)); - Ok(Transformed::yes(Expr::InSubquery(InSubquery::new( - expr, subquery, negated, - )))) - } - Expr::ScalarSubquery(subquery) => { - let plan = subquery.subquery.as_ref().clone(); - let new_plan = plan.transform_up(&analyze_internal).data()?; - let subquery = subquery.with_plan(Arc::new(new_plan)); - Ok(Transformed::yes(Expr::ScalarSubquery(subquery))) - } - _ => Ok(Transformed::no(expr)), - } + Ok(result) } fn generate_projection_expr( From 0e0ac8ee522c12c3656e5101285dda2d0d6a0dde Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 11 Apr 2024 14:57:55 -0400 Subject: [PATCH 2/2] Improve variable name --- datafusion/optimizer/src/analyzer/inline_table_scan.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs index 993dc24f39e52..cc5f870a9c732 100644 --- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs +++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs @@ -48,9 +48,10 @@ impl AnalyzerRule for InlineTableScan { fn analyze_internal(plan: LogicalPlan) -> Result> { // rewrite any subqueries in the plan first - let result = plan.map_subqueries(|plan| plan.transform_up(&analyze_internal))?; + let transformed_plan = + plan.map_subqueries(|plan| plan.transform_up(&analyze_internal))?; - let result = result.transform_data(|plan| { + let transformed_plan = transformed_plan.transform_data(|plan| { match plan { // Match only on scans without filter / projection / fetch // Views and DataFrames won't have those added @@ -77,7 +78,7 @@ fn analyze_internal(plan: LogicalPlan) -> Result> { } })?; - Ok(result) + Ok(transformed_plan) } fn generate_projection_expr(