@@ -35,6 +35,7 @@ use crate::arrow::util::pretty;
3535use crate :: physical_plan:: {
3636 execute_stream, execute_stream_partitioned, ExecutionPlan , SendableRecordBatchStream ,
3737} ;
38+ use crate :: sql:: utils:: find_window_exprs;
3839use async_trait:: async_trait;
3940
4041/// Implementation of DataFrame API
@@ -75,10 +76,17 @@ impl DataFrame for DataFrameImpl {
7576
7677 /// Create a projection based on arbitrary expressions
7778 fn select ( & self , expr_list : Vec < Expr > ) -> Result < Arc < dyn DataFrame > > {
78- let plan = LogicalPlanBuilder :: from ( self . to_logical_plan ( ) )
79- . project ( expr_list) ?
80- . build ( ) ?;
81- Ok ( Arc :: new ( DataFrameImpl :: new ( self . ctx_state . clone ( ) , & plan) ) )
79+ let window_func_exprs = find_window_exprs ( & expr_list) ;
80+ let plan = if window_func_exprs. is_empty ( ) {
81+ self . to_logical_plan ( )
82+ } else {
83+ LogicalPlanBuilder :: window_plan ( self . to_logical_plan ( ) , window_func_exprs) ?
84+ } ;
85+ let project_plan = LogicalPlanBuilder :: from ( plan) . project ( expr_list) ?. build ( ) ?;
86+ Ok ( Arc :: new ( DataFrameImpl :: new (
87+ self . ctx_state . clone ( ) ,
88+ & project_plan,
89+ ) ) )
8290 }
8391
8492 /// Create a filter based on a predicate expression
@@ -233,7 +241,7 @@ mod tests {
233241 use crate :: execution:: options:: CsvReadOptions ;
234242 use crate :: logical_plan:: * ;
235243 use crate :: physical_plan:: functions:: Volatility ;
236- use crate :: physical_plan:: ColumnarValue ;
244+ use crate :: physical_plan:: { window_functions , ColumnarValue } ;
237245 use crate :: { assert_batches_sorted_eq, execution:: context:: ExecutionContext } ;
238246 use crate :: { physical_plan:: functions:: ScalarFunctionImplementation , test} ;
239247 use arrow:: datatypes:: DataType ;
@@ -270,6 +278,31 @@ mod tests {
270278 Ok ( ( ) )
271279 }
272280
281+ #[ tokio:: test]
282+ async fn select_with_window_exprs ( ) -> Result < ( ) > {
283+ // build plan using Table API
284+ let t = test_table ( ) . await ?;
285+ let first_row = Expr :: WindowFunction {
286+ fun : window_functions:: WindowFunction :: BuiltInWindowFunction (
287+ window_functions:: BuiltInWindowFunction :: FirstValue ,
288+ ) ,
289+ args : vec ! [ col( "aggregate_test_100.c1" ) ] ,
290+ partition_by : vec ! [ col( "aggregate_test_100.c2" ) ] ,
291+ order_by : vec ! [ ] ,
292+ window_frame : None ,
293+ } ;
294+ let t2 = t. select ( vec ! [ col( "c1" ) , first_row] ) ?;
295+ let plan = t2. to_logical_plan ( ) ;
296+
297+ let sql_plan = create_plan (
298+ "select c1, first_value(c1) over (partition by c2) from aggregate_test_100" ,
299+ )
300+ . await ?;
301+
302+ assert_same_plan ( & plan, & sql_plan) ;
303+ Ok ( ( ) )
304+ }
305+
273306 #[ tokio:: test]
274307 async fn aggregate ( ) -> Result < ( ) > {
275308 // build plan using DataFrame API
0 commit comments