@@ -87,68 +87,75 @@ impl OptimizerRule for SubqueryFilterToJoin {
8787
8888 // Add subquery joins to new_input
8989 // optimized_input value should retain for possible optimization rollback
90- let mut new_input = optimized_input. clone ( ) ;
91- let opt_result = subquery_filters. iter ( ) . try_for_each ( |& e| match e {
92- Expr :: InSubquery {
93- expr,
94- subquery,
95- negated,
96- } => {
97- let right_input =
98- self . optimize ( & * subquery. subquery , execution_props) ?;
99- let right_schema = right_input. schema ( ) ;
100- if right_schema. fields ( ) . len ( ) != 1 {
101- return Err ( DataFusionError :: Plan (
102- "Only single column allowed in InSubquery" . to_string ( ) ,
103- ) ) ;
104- } ;
105-
106- let right_key = right_schema. field ( 0 ) . qualified_column ( ) ;
107- let left_key = match * expr. clone ( ) {
108- Expr :: Column ( col) => col,
109- _ => return Err ( DataFusionError :: NotImplemented (
110- "Filtering by expression not implemented for InSubquery"
111- . to_string ( ) ,
112- ) ) ,
113- } ;
114-
115- let join_type = match negated {
116- true => JoinType :: Anti ,
117- false => JoinType :: Semi ,
118- } ;
119-
120- let schema = build_join_schema (
121- new_input. schema ( ) ,
122- right_schema,
123- & join_type,
124- ) ?;
125-
126- new_input = LogicalPlan :: Join ( Join {
127- left : Arc :: new ( new_input. clone ( ) ) ,
128- right : Arc :: new ( right_input) ,
129- on : vec ! [ ( left_key, right_key) ] ,
130- join_type,
131- join_constraint : JoinConstraint :: On ,
132- schema : Arc :: new ( schema) ,
133- null_equals_null : false ,
134- } ) ;
135-
136- Ok ( ( ) )
90+ let opt_result = subquery_filters. iter ( ) . try_fold (
91+ optimized_input. clone ( ) ,
92+ |input, & e| match e {
93+ Expr :: InSubquery {
94+ expr,
95+ subquery,
96+ negated,
97+ } => {
98+ let right_input = self . optimize (
99+ & * subquery. subquery ,
100+ execution_props
101+ ) ?;
102+ let right_schema = right_input. schema ( ) ;
103+ if right_schema. fields ( ) . len ( ) != 1 {
104+ return Err ( DataFusionError :: Plan (
105+ "Only single column allowed in InSubquery"
106+ . to_string ( ) ,
107+ ) ) ;
108+ } ;
109+
110+ let right_key = right_schema. field ( 0 ) . qualified_column ( ) ;
111+ let left_key = match * expr. clone ( ) {
112+ Expr :: Column ( col) => col,
113+ _ => return Err ( DataFusionError :: NotImplemented (
114+ "Filtering by expression not implemented for InSubquery"
115+ . to_string ( ) ,
116+ ) ) ,
117+ } ;
118+
119+ let join_type = if * negated {
120+ JoinType :: Anti
121+ } else {
122+ JoinType :: Semi
123+ } ;
124+
125+ let schema = build_join_schema (
126+ optimized_input. schema ( ) ,
127+ right_schema,
128+ & join_type,
129+ ) ?;
130+
131+ Ok ( LogicalPlan :: Join ( Join {
132+ left : Arc :: new ( input) ,
133+ right : Arc :: new ( right_input) ,
134+ on : vec ! [ ( left_key, right_key) ] ,
135+ join_type,
136+ join_constraint : JoinConstraint :: On ,
137+ schema : Arc :: new ( schema) ,
138+ null_equals_null : false ,
139+ } ) )
140+ }
141+ _ => Err ( DataFusionError :: Plan (
142+ "Unknown expression while rewriting subquery to joins"
143+ . to_string ( ) ,
144+ ) ) ,
137145 }
138- _ => Err ( DataFusionError :: Plan (
139- "Unknown expression while rewriting subquery to joins"
140- . to_string ( ) ,
141- ) ) ,
142- } ) ;
146+ ) ;
143147
144148 // In case of expressions which could not be rewritten
145149 // return original filter with optimized input
146- if opt_result. is_err ( ) {
147- return Ok ( LogicalPlan :: Filter ( Filter {
148- predicate : predicate. clone ( ) ,
149- input : Arc :: new ( optimized_input) ,
150- } ) ) ;
151- }
150+ let new_input = match opt_result {
151+ Ok ( plan) => plan,
152+ Err ( _) => {
153+ return Ok ( LogicalPlan :: Filter ( Filter {
154+ predicate : predicate. clone ( ) ,
155+ input : Arc :: new ( optimized_input) ,
156+ } ) )
157+ }
158+ } ;
152159
153160 // Apply regular filters to join output if some or just return join
154161 if regular_filters. is_empty ( ) {
0 commit comments