diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index d6a0add9b2537..3ec6befb8ea57 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -91,6 +91,10 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool { | Expr::GroupingSet(_) | Expr::Case { .. } => VisitRecursion::Continue, + Expr::Unnest { .. } => { + is_applicable = false; + VisitRecursion::Stop + } Expr::ScalarFunction(scalar_function) => { match scalar_function.fun.volatility() { Volatility::Immutable => VisitRecursion::Continue, diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 35119f374fa37..b2e1b35fdac35 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -30,8 +30,8 @@ use crate::datasource::source_as_provider; use crate::execution::context::{ExecutionProps, SessionState}; use crate::logical_expr::utils::generate_sort_key; use crate::logical_expr::{ - Aggregate, EmptyRelation, Join, Projection, Sort, SubqueryAlias, TableScan, Unnest, - Window, + Aggregate, EmptyRelation, Join, Projection, Sort, SubqueryAlias, TableScan, + Unnest as UnnestPlan, Window, }; use crate::logical_expr::{ CrossJoin, Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType, @@ -83,8 +83,9 @@ use datafusion_common::{ use datafusion_expr::expr::{ self, AggregateFunction, AggregateUDF, Alias, Between, BinaryExpr, Cast, GetFieldAccess, GetIndexedField, GroupingSet, InList, Like, ScalarUDF, TryCast, - WindowFunction, + Unnest, WindowFunction, }; + use datafusion_expr::expr_rewriter::{unalias, unnormalize_cols}; use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary; use datafusion_expr::{DescribeTable, DmlStatement, StringifiedPlan, WriteOp}; @@ -216,6 +217,9 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result { Ok(name) } + 
Expr::Unnest(Unnest { array_exprs, .. }) => { + create_function_physical_name("unnest", false, array_exprs) + } Expr::ScalarFunction(func) => { create_function_physical_name(&func.fun.to_string(), false, &func.args) } @@ -1226,7 +1230,7 @@ impl DefaultPhysicalPlanner { Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch))) } - LogicalPlan::Unnest(Unnest { input, column, schema, options }) => { + LogicalPlan::Unnest(UnnestPlan { input, column, schema, options }) => { let input = self.create_initial_plan(input, session_state).await?; let column_exec = schema.index_of_column(column) .map(|idx| Column::new(&column.name, idx))?; diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 3e4e3068977ca..d384637b7d301 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -28,6 +28,8 @@ use crate::window_function; use crate::Operator; use arrow::datatypes::DataType; use datafusion_common::internal_err; +use datafusion_common::UnnestOptions; +use datafusion_common::not_impl_err; use datafusion_common::{plan_err, Column, DataFusionError, Result, ScalarValue}; use std::collections::HashSet; use std::fmt; @@ -147,6 +149,8 @@ pub enum Expr { TryCast(TryCast), /// A sort expression, that can be used to sort values. Sort(Sort), + /// Unnest expression + Unnest(Unnest), /// Represents the call of a built-in scalar function with a set of arguments. ScalarFunction(ScalarFunction), /// Represents the call of a user-defined scalar function with arguments. 
@@ -328,6 +332,24 @@ impl Between { } } +/// Unnest expression +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct Unnest { + /// Arrays to unnest + pub array_exprs: Vec, + pub options: UnnestOptions, +} + +impl Unnest { + /// Create a new Unnest expression + pub fn new(array_exprs: Vec, options: UnnestOptions) -> Self { + Self { + array_exprs, + options, + } + } +} + /// ScalarFunction expression #[derive(Clone, PartialEq, Eq, Hash, Debug)] pub struct ScalarFunction { @@ -728,6 +750,7 @@ impl Expr { Expr::TryCast { .. } => "TryCast", Expr::WindowFunction { .. } => "WindowFunction", Expr::Wildcard => "Wildcard", + Expr::Unnest(..) => "Unnest", } } @@ -1030,6 +1053,47 @@ impl Expr { pub fn contains_outer(&self) -> bool { !find_out_reference_exprs(self).is_empty() } + + /// Flatten the nested array expressions until the base array is reached. + /// For example: + /// [[1, 2, 3], [4, 5, 6]] -> [1, 2, 3, 4, 5, 6] + /// [[[1, 2], [3, 4]], [[5, 6], [7, 8]]] -> [1, 2, 3, 4, 5, 6, 7, 8] + /// Panics if the expression is not an unnest expression. + pub fn flatten(&self) -> Self { + self.try_flatten().unwrap() + } + + /// Flatten the nested array expressions until the base array is reached. + /// For example: + /// [[1, 2, 3], [4, 5, 6]] => [1, 2, 3, 4, 5, 6] + /// [[[1, 2], [3, 4]], [[5, 6], [7, 8]]] => [1, 2, 3, 4, 5, 6, 7, 8] + /// Returns an error if the expression cannot be flattened. 
+ pub fn try_flatten(&self) -> Result { + match self { + Self::ScalarFunction(ScalarFunction { + fun: built_in_function::BuiltinScalarFunction::MakeArray, + args, + }) => { + let flatten_args: Vec = + args.iter().flat_map(Self::flatten_internal).collect(); + Ok(Self::ScalarFunction(ScalarFunction { + fun: built_in_function::BuiltinScalarFunction::MakeArray, + args: flatten_args, + })) + } + _ => not_impl_err!("flatten() is not implemented for {self}"), + } + } + + fn flatten_internal(&self) -> Vec { + match self { + Self::ScalarFunction(ScalarFunction { + fun: built_in_function::BuiltinScalarFunction::MakeArray, + args, + }) => args.iter().flat_map(Self::flatten_internal).collect(), + _ => vec![self.clone()], + } + } } #[macro_export] @@ -1118,6 +1182,9 @@ impl fmt::Display for Expr { write!(f, " NULLS LAST") } } + Expr::Unnest(Unnest { array_exprs, .. }) => { + fmt_function(f, "unnest", false, array_exprs, false) + } Expr::ScalarFunction(func) => { fmt_function(f, &func.fun.to_string(), false, &func.args, true) } @@ -1286,7 +1353,6 @@ fn fmt_function( false => args.iter().map(|arg| format!("{arg:?}")).collect(), }; - // let args: Vec = args.iter().map(|arg| format!("{:?}", arg)).collect(); let distinct_str = match distinct { true => "DISTINCT ", false => "", @@ -1452,6 +1518,9 @@ fn create_name(e: &Expr) -> Result { } } } + Expr::Unnest(Unnest { array_exprs, .. 
}) => { + create_function_name("unnest", false, array_exprs) + } Expr::ScalarFunction(func) => { create_function_name(&func.fun.to_string(), false, &func.args) } @@ -1583,6 +1652,69 @@ mod test { use datafusion_common::Column; use datafusion_common::{Result, ScalarValue}; + use super::ScalarFunction; + + fn create_make_array_expr(args: &[Expr]) -> Expr { + Expr::ScalarFunction(ScalarFunction::new( + crate::BuiltinScalarFunction::MakeArray, + args.to_vec(), + )) + } + + #[test] + fn test_flatten() { + let i64_none = ScalarValue::try_from(&DataType::Int64).unwrap(); + + let arr = create_make_array_expr(&[ + create_make_array_expr(&[lit(10i64), lit(20i64), lit(30i64)]), + create_make_array_expr(&[lit(1i64), lit(i64_none.clone()), lit(10i64)]), + create_make_array_expr(&[lit(4i64), lit(5i64), lit(6i64)]), + ]); + + let flattened = arr.flatten(); + assert_eq!( + flattened, + create_make_array_expr(&[ + lit(10i64), + lit(20i64), + lit(30i64), + lit(1i64), + lit(i64_none), + lit(10i64), + lit(4i64), + lit(5i64), + lit(6i64), + ]) + ); + + // [[[1, 2], [3, 4]], [[5, 6], [7, 8]]] -> [1, 2, 3, 4, 5, 6, 7, 8] + let arr = create_make_array_expr(&[ + create_make_array_expr(&[ + create_make_array_expr(&[lit(1i64), lit(2i64)]), + create_make_array_expr(&[lit(3i64), lit(4i64)]), + ]), + create_make_array_expr(&[ + create_make_array_expr(&[lit(5i64), lit(6i64)]), + create_make_array_expr(&[lit(7i64), lit(8i64)]), + ]), + ]); + + let flattened = arr.flatten(); + assert_eq!( + flattened, + create_make_array_expr(&[ + lit(1i64), + lit(2i64), + lit(3i64), + lit(4i64), + lit(5i64), + lit(6i64), + lit(7i64), + lit(8i64), + ]) + ); + } + #[test] fn format_case_when() -> Result<()> { let expr = case(col("a")) diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 9651b377c5bda..98c809ef08602 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -19,7 +19,7 @@ use super::{Between, Expr, Like}; use crate::expr::{ 
AggregateFunction, AggregateUDF, Alias, BinaryExpr, Cast, GetFieldAccess, GetIndexedField, InList, InSubquery, Placeholder, ScalarFunction, ScalarUDF, Sort, - TryCast, WindowFunction, + TryCast, Unnest, WindowFunction, }; use crate::field_util::GetFieldAccessSchema; use crate::type_coercion::binary::get_result_type; @@ -28,9 +28,9 @@ use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Field}; use datafusion_common::{ internal_err, plan_err, Column, DFField, DFSchema, DataFusionError, ExprSchema, - Result, + Result, not_impl_err, }; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; /// trait to allow expr to typable with respect to a schema @@ -88,6 +88,28 @@ impl ExprSchemable for Expr { .collect::>>()?; Ok((fun.return_type)(&data_types)?.as_ref().clone()) } + Expr::Unnest(Unnest { array_exprs, .. }) => { + let data_types = array_exprs + .iter() + .map(|e| e.get_type(schema)) + .collect::>>()?; + + if data_types.is_empty() { + return internal_err!("Empty expression is not allowed") + } + + // Use a HashSet to efficiently check for unique data types + let unique_data_types: HashSet<_> = data_types.iter().collect(); + + // If there is more than one unique data type, return an error + if unique_data_types.len() > 1 { + return not_impl_err!("Unnest does not support inconsistent data types: {data_types:?}"); + } + + // Extract the common data type since there is only one unique data type + let return_type = data_types[0].to_owned(); + Ok(return_type) + } Expr::ScalarFunction(ScalarFunction { fun, args }) => { let data_types = args .iter() @@ -129,7 +151,9 @@ impl ExprSchemable for Expr { | Expr::IsUnknown(_) | Expr::IsNotTrue(_) | Expr::IsNotFalse(_) - | Expr::IsNotUnknown(_) => Ok(DataType::Boolean), + | Expr::IsNotUnknown(_) + | Expr::Like { .. } + | Expr::SimilarTo { .. 
} => Ok(DataType::Boolean), Expr::ScalarSubquery(subquery) => { Ok(subquery.subquery.schema().field(0).data_type().clone()) } @@ -138,7 +162,6 @@ impl ExprSchemable for Expr { ref right, ref op, }) => get_result_type(&left.get_type(schema)?, op, &right.get_type(schema)?), - Expr::Like { .. } | Expr::SimilarTo { .. } => Ok(DataType::Boolean), Expr::Placeholder(Placeholder { data_type, .. }) => { data_type.clone().ok_or_else(|| { DataFusionError::Plan( @@ -146,20 +169,17 @@ impl ExprSchemable for Expr { ) }) } - Expr::Wildcard => { - // Wildcard do not really have a type and do not appear in projections - Ok(DataType::Null) - } - Expr::QualifiedWildcard { .. } => internal_err!( - "QualifiedWildcard expressions are not valid in a logical query plan" - ), - Expr::GroupingSet(_) => { - // grouping sets do not really have a type and do not appear in projections - Ok(DataType::Null) - } Expr::GetIndexedField(GetIndexedField { expr, field }) => { field_for_index(expr, field, schema).map(|x| x.data_type().clone()) } + Expr::Wildcard | Expr::GroupingSet(_) => { + // They do not really have a type and do not appear in projections + Ok(DataType::Null) + } + Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( + "QualifiedWildcard expressions are not valid in a logical query plan" + .to_owned(), + )), } } @@ -231,6 +251,7 @@ impl ExprSchemable for Expr { Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema), Expr::ScalarVariable(_, _) | Expr::TryCast { .. } + | Expr::Unnest(..) | Expr::ScalarFunction(..) | Expr::ScalarUDF(..) | Expr::WindowFunction { .. } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 6171d43b37f50..bb2169dad6a17 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -18,7 +18,7 @@ //! 
This module provides a builder for creating LogicalPlans use crate::dml::{CopyOptions, CopyTo}; -use crate::expr::Alias; +use crate::expr::{Alias, ScalarFunction}; use crate::expr_rewriter::{ coerce_plan_expr_for_schema, normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols, @@ -27,8 +27,10 @@ use crate::expr_rewriter::{ use crate::type_coercion::binary::comparison_coercion; use crate::utils::{columnize_expr, compare_sort_expr}; use crate::{ - and, binary_expr, DmlStatement, Operator, TableProviderFilterPushDown, WriteOp, + and, binary_expr, expr, BuiltInWindowFunction, DmlStatement, Operator, + TableProviderFilterPushDown, WindowFrame, WriteOp, }; +use crate::{col, ident, WindowFunction}; use crate::{ logical_plan::{ Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join, @@ -1063,6 +1065,124 @@ impl LogicalPlanBuilder { options, )?)) } + + /// Join the unnested plans. + /// + /// We apply `RowNumber` to each unnested plan (column) and full join based on the row number. + /// Each plan is ensured to have the same number of rows. + /// + /// For example: + /// + /// Given the first plan with name col1 (1, 2, 3) and the second plan with name col2 (4, 5, null), + /// After unnesting, we have two plans with row numbers: + /// Plan 1: (1, 1), (2, 2), (3, 3) + /// Plan 2: (4, 1), (5, 2), (null, 3) + /// + /// Then we can join these two plans based on the row number. + /// The result is: + /// col1 col2 + /// 1 4 + /// 2 5 + /// 3 null + // TODO: Fails to convert fold to try_fold. 
+ #[allow(clippy::manual_try_fold)] + pub fn join_unnest_plans( + unnest_plans: Vec, + column_names: Vec, + ) -> Result { + // Add row_number for each unnested array + let window_func_expr = Expr::WindowFunction(expr::WindowFunction::new( + WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::RowNumber), + vec![], + vec![], + vec![], + WindowFrame::new(false), + )); + let window_func_exprs = vec![window_func_expr.clone()]; + + let window_plans = unnest_plans + .into_iter() + .map(|plan| LogicalPlanBuilder::window_plan(plan, window_func_exprs.clone())) + .collect::>>()?; + + // Create alias for row number + let row_numbers_name: Vec = (0..column_names.len()) + .map(|idx| format!("rn{idx}")) + .collect(); + + let project_exprs: Vec> = column_names + .iter() + .zip(row_numbers_name.iter()) + .map(|(col_name, row_number_name)| { + vec![ + ident(col_name), + window_func_expr.clone().alias(row_number_name), + ] + }) + .collect(); + let project_plans = window_plans + .iter() + .zip(project_exprs.into_iter()) + .map(|(plan, expr)| { + LogicalPlanBuilder::from(plan.clone()) + .project(expr)? + .build() + }) + .collect::>>()?; + + let columns_to_join_on = project_plans + .iter() + .zip(row_numbers_name.iter()) + .map(|(_, rn)| col(rn)) + .collect::>(); + + let subqueries_alias_plan = project_plans; + + let (join_plan, _) = subqueries_alias_plan + .iter() + .zip(columns_to_join_on.iter()) + .skip(1) + .fold( + Ok(( + subqueries_alias_plan[0].clone(), + columns_to_join_on[0].clone(), + )), + |result: Result<(LogicalPlan, Expr)>, (right_plan, right_column)| { + result.and_then(|(left_plan, left_column)| { + let plan = LogicalPlanBuilder::from(left_plan) + .join( + right_plan.clone(), + JoinType::Full, + (Vec::::new(), Vec::::new()), + Some(left_column.eq(right_column.clone())), + )? 
+ .build()?; + Ok((plan, right_column.clone())) + }) + }, + )?; + + let selected_exprs: Vec = column_names.into_iter().map(col).collect(); + + LogicalPlanBuilder::from(join_plan).project(selected_exprs) + } + + /// Unnest the given array expressions. + /// Expands an array into a set of rows. + /// First step is to create LogicalPlan::Unnest for each array expression (column). + /// Then join these plans side by side. If the row number is not equal, null is filled. + /// For example: + /// Unnest(\[1,2,3\], \[4,5\]), generating 3 rows ((1,4), (2,5), (3,null)). + pub fn unnest_arrays( + self, + array_exprs: Vec, + array_options: Vec, + ) -> Result { + let (unnest_plans, column_names) = + build_unnest_plans(self.plan.clone(), array_exprs.clone(), array_options)?; + + Self::join_unnest_plans(unnest_plans, column_names) + } } /// Creates a schema for a join operation. @@ -1382,6 +1502,49 @@ pub fn wrap_projection_for_join_if_necessary( Ok((plan, join_on, need_project)) } +// TODO: Move this to array utils module. +// Align arrays with nulls to have the same row size. +fn align_arrays_with_nulls(array_exprs: &[Expr]) -> Result> { + let array_exprs: Vec = array_exprs.iter().map(|e| e.flatten()).collect(); + + // Append array with null to the same size, so we can join them easily. + // For example: + // unnest([1, 2], [3, 4, 5]) => unnest([1, 2, null], [3, 4, 5]) + + // Calculate the maximum array size + let max_array_size = array_exprs + .iter() + .filter_map(|e| match e { + Expr::ScalarFunction(ScalarFunction { args, .. 
}) => Some(args.len()), + Expr::Literal(ScalarValue::List(Some(scalar_value), _)) => { + Some(scalar_value.len()) + } + _ => None, + }) + .max() + .ok_or_else(|| { + DataFusionError::NotImplemented("UNNEST only supports list type".to_string()) + })?; + + // Extend arrays with null values to match the maximum size + let array_exprs: Vec = array_exprs + .into_iter() + .map(|e| { + if let Expr::ScalarFunction(ScalarFunction { fun, mut args }) = e { + args.extend( + (args.len()..max_array_size) + .map(|_| Expr::Literal(ScalarValue::Null)), + ); + Expr::ScalarFunction(ScalarFunction { fun, args }) + } else { + e + } + }) + .collect(); + + Ok(array_exprs) +} + /// Basic TableSource implementation intended for use in tests and documentation. It is expected /// that users will provide their own TableSource implementations or use DataFusion's /// DefaultTableSource. @@ -1470,6 +1633,38 @@ pub fn unnest_with_options( })) } +/// Create unnest plan from arrays. +fn build_unnest_plans( + input: LogicalPlan, + array_exprs: Vec, + array_options: Vec, +) -> Result<(Vec, Vec)> { + // Prepare array expressions + // 1. Fill nulls to the same size + // 2. Create projection for each array expression + let array_expression = align_arrays_with_nulls(&array_exprs)?; + + let project_plan_builder = + LogicalPlanBuilder::from(input.clone()).project(array_expression.clone())?; + + let mut unnest_plans = vec![]; + let mut columns_name = vec![]; + + // Build unnest plan for each array expression + for (expr, options) in array_expression.iter().zip(array_options.into_iter()) { + let column = expr.display_name()?; + columns_name.push(column.clone()); + + let unnest_plan = project_plan_builder + .clone() + .unnest_column_with_options(column, options)? 
+ .build()?; + unnest_plans.push(unnest_plan); + } + + Ok((unnest_plans, columns_name)) +} + #[cfg(test)] mod tests { use crate::logical_plan::StringifiedPlan; diff --git a/datafusion/expr/src/tree_node/expr.rs b/datafusion/expr/src/tree_node/expr.rs index f74cc164a7a5a..d9cc2a6b556ed 100644 --- a/datafusion/expr/src/tree_node/expr.rs +++ b/datafusion/expr/src/tree_node/expr.rs @@ -20,7 +20,7 @@ use crate::expr::{ AggregateFunction, AggregateUDF, Alias, Between, BinaryExpr, Case, Cast, GetIndexedField, GroupingSet, InList, InSubquery, Like, Placeholder, ScalarFunction, - ScalarUDF, Sort, TryCast, WindowFunction, + ScalarUDF, Sort, TryCast, Unnest, WindowFunction, }; use crate::Expr; use datafusion_common::tree_node::VisitRecursion; @@ -55,6 +55,7 @@ impl TreeNode for Expr { Expr::ScalarFunction (ScalarFunction{ args, .. } )| Expr::ScalarUDF(ScalarUDF { args, .. }) => { args.clone() } + Expr::Unnest(Unnest {array_exprs, ..}) => array_exprs.clone(), Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => { lists_of_exprs.clone().into_iter().flatten().collect() } @@ -263,6 +264,13 @@ impl TreeNode for Expr { asc, nulls_first, )), + Expr::Unnest(Unnest { + array_exprs, + options, + }) => Expr::Unnest(Unnest::new( + transform_vec(array_exprs, &mut transform)?, + options, + )), Expr::ScalarFunction(ScalarFunction { args, fun }) => Expr::ScalarFunction( ScalarFunction::new(fun, transform_vec(args, &mut transform)?), ), diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 54a1ce348bf90..bd3c637b719ba 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -282,6 +282,7 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet) -> Result<()> { | Expr::Cast { .. } | Expr::TryCast { .. } | Expr::Sort { .. } + | Expr::Unnest(..) | Expr::ScalarFunction(..) | Expr::ScalarUDF(..) | Expr::WindowFunction { .. 
} diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index ee7b37979dc4b..638d6c4713a6f 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -178,6 +178,7 @@ fn can_evaluate_as_join_condition(predicate: &Expr) -> Result { | Expr::Case(_) | Expr::Cast(_) | Expr::TryCast(_) + | Expr::Unnest(_) | Expr::ScalarFunction(..) | Expr::InList { .. } => Ok(VisitRecursion::Continue), Expr::Sort(_) diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index f5a6860299abb..c7127abd27db3 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -342,6 +342,7 @@ impl<'a> ConstEvaluator<'a> { | Expr::GroupingSet(_) | Expr::Wildcard | Expr::QualifiedWildcard { .. } + | Expr::Unnest(_) | Expr::Placeholder(_) => false, Expr::ScalarFunction(ScalarFunction { fun, .. }) => { Self::volatility_ok(fun.volatility()) diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 410ea97887e0c..82cd5d91354df 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -283,7 +283,7 @@ fn build_batch( .unwrap(); build_batch_fixedsize_list(batch, schema, column.index(), list_array, options) } - _ => exec_err!("Invalid unnest column {column}"), + data_type => exec_err!("Invalid unnest column {column}, got {data_type}"), } } diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 8a8550d05d133..ff002f2cfd4f8 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -751,6 +751,11 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode { .to_string(), )) } + Expr::Unnest(..) 
=> { + return Err(Error::NotImplemented( + "try_from() for Unnest is not implemented".to_string(), + )) + } Expr::ScalarFunction(ScalarFunction { fun, args }) => { let fun: protobuf::ScalarFunction = fun.try_into()?; let args: Vec = args diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 3861b4848d9ba..a2ea18cb5a3e6 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -16,8 +16,10 @@ // under the License. use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use datafusion_common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result}; -use datafusion_expr::expr::{ScalarFunction, ScalarUDF}; +use datafusion_common::{ + not_impl_err, plan_err, DFSchema, DataFusionError, Result, UnnestOptions, +}; +use datafusion_expr::expr::{ScalarFunction, ScalarUDF, Unnest}; use datafusion_expr::function::suggest_valid_function; use datafusion_expr::window_frame::regularize; use datafusion_expr::{ @@ -46,6 +48,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { crate::utils::normalize_ident(function.name.0[0].clone()) }; + // Unnest Expression + // TODO: Configure UnnestOptions, set default for now + let options: UnnestOptions = Default::default(); + if name == "unnest" { + let array_exprs = + self.function_args_to_expr(function.args, schema, planner_context)?; + return Ok(Expr::Unnest(Unnest::new(array_exprs, options))); + } + // user-defined function (UDF) should have precedence in case it has the same name as a scalar built-in function if let Some(fm) = self.schema_provider.get_function_meta(&name) { let args = diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index a01a9a2fb8db3..dd80d1804e416 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -16,11 +16,11 @@ // under the License. 
use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use datafusion_common::{not_impl_err, DataFusionError, Result}; +use datafusion_common::{not_impl_err, DataFusionError, Result, UnnestOptions}; use datafusion_expr::{LogicalPlan, LogicalPlanBuilder}; use sqlparser::ast::TableFactor; - mod join; +mod unnest; impl<'a, S: ContextProvider> SqlToRel<'a, S> { /// Create a `LogicalPlan` that scans the named relation @@ -62,6 +62,36 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { self.plan_table_with_joins(*table_with_joins, planner_context)?, alias, ), + + // TODO: Support UnnestOptions + TableFactor::UNNEST { + alias, + array_exprs, + with_offset: _, + with_offset_alias: _, + } => { + // TODO: Support UnnestOptions + let array_options: Vec = + vec![UnnestOptions::default(); array_exprs.len()]; + + // If column aliases are not supplied, then for a function returning a base data type, + // the column name is also the same as the function name. + if let Some(mut alias) = alias { + if alias.columns.is_empty() { + alias.columns = vec![alias.name.clone()]; + } + ( + self.plan_unnest(array_exprs, planner_context, array_options)?, + Some(alias), + ) + } else { + ( + self.plan_unnest(array_exprs, planner_context, array_options)?, + None, + ) + } + } + // @todo Support TableFactory::TableFunction? _ => { return not_impl_err!( @@ -69,6 +99,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ); } }; + if let Some(alias) = alias { self.apply_table_alias(plan, alias) } else { diff --git a/datafusion/sql/src/relation/unnest.rs b/datafusion/sql/src/relation/unnest.rs new file mode 100644 index 0000000000000..2645880941b70 --- /dev/null +++ b/datafusion/sql/src/relation/unnest.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; + +use datafusion_common::{DFSchema, Result, UnnestOptions}; +use datafusion_expr::expr::Expr; +use datafusion_expr::{LogicalPlan, LogicalPlanBuilder}; +use sqlparser::ast::Expr as SQLExpr; + +impl<'a, S: ContextProvider> SqlToRel<'a, S> { + pub(crate) fn plan_unnest( + &self, + array_exprs: Vec, + planner_context: &mut PlannerContext, + array_options: Vec, + ) -> Result { + // No pre-defined schema for Unnest + let schema = DFSchema::empty(); + + let exprs: Vec = array_exprs + .into_iter() + .map(|sql| self.sql_expr_to_logical_expr(sql, &schema, planner_context)) + .collect::>>()?; + + let plan = LogicalPlanBuilder::empty(true).build()?; + + LogicalPlanBuilder::from(plan) + .unnest_arrays(exprs, array_options)? 
+ .build() + } +} diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 2062afabfc1a4..eae61a99f7752 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -17,6 +17,7 @@ use std::collections::HashSet; use std::sync::Arc; +use std::vec; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use crate::utils::{ @@ -24,12 +25,11 @@ use crate::utils::{ resolve_columns, resolve_positions_to_exprs, }; -use datafusion_common::Column; use datafusion_common::{ - get_target_functional_dependencies, not_impl_err, plan_err, DFSchemaRef, + get_target_functional_dependencies, not_impl_err, plan_err, Column, DFSchemaRef, DataFusionError, Result, }; -use datafusion_expr::expr::Alias; +use datafusion_expr::expr::{Alias, Unnest}; use datafusion_expr::expr_rewriter::{ normalize_col, normalize_col_with_schemas_and_ambiguity_check, }; @@ -92,6 +92,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // having and group by clause may reference aliases defined in select projection let projected_plan = self.project(plan.clone(), select_exprs.clone())?; + let mut combined_schema = (**projected_plan.schema()).clone(); combined_schema.merge(plan.schema()); @@ -226,7 +227,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; // final projection - let plan = project(plan, select_exprs_post_aggr)?; + let plan = project(plan, select_exprs_post_aggr.clone())?; + + // Process unnest expressions, convert to LogicalPlan::Unnest + let plan = process_unnest_expr(plan, &select_exprs_post_aggr)?; // process distinct clause let distinct = select @@ -700,3 +704,35 @@ fn get_updated_group_by_exprs( Ok(new_group_by_exprs) } + +// Convert Expr::Unnest to LogicalPlan::Unnest +fn process_unnest_expr(input: LogicalPlan, select_exprs: &[Expr]) -> Result { + let mut array_options = vec![]; + let mut array_exprs_to_unnest = vec![]; + for expr in select_exprs.iter() { + if let Expr::Unnest(Unnest { + array_exprs, + options, + }) = expr + { + 
array_exprs_to_unnest.push(array_exprs[0].clone()); + array_options.push(options.clone()); + } else if let Expr::Alias(Alias { expr, .. }) = expr { + if let Expr::Unnest(Unnest { + array_exprs, + options, + }) = expr.as_ref() + { + array_exprs_to_unnest.push(array_exprs[0].clone()); + array_options.push(options.clone()); + } + } + } + if array_exprs_to_unnest.is_empty() { + Ok(input) + } else { + LogicalPlanBuilder::from(input) + .unnest_arrays(array_exprs_to_unnest, array_options)? + .build() + } +} diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index f11bc5206eb43..7d6b9ae99f0de 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -209,6 +209,14 @@ AS VALUES (make_array([28, 29, 30], [31, 32, 33], [34, 35, 36], [28, 29, 30], [31, 32, 33], [34, 35, 36], [28, 29, 30], [31, 32, 33], [34, 35, 36], [28, 29, 30]), [28, 29, 30], [37, 38, 39], 10) ; +statement ok +CREATE TABLE arrays_unnest +AS VALUES + (make_array(1, 2), 'A', make_array(1,2), 3), + (NULL, 'B', make_array(4), 5), + (make_array(3), 'C', make_array(6,7,8), 9) +; + # arrays table query ??? select column1, column2, column3 from arrays; @@ -2415,15 +2423,160 @@ select make_array(f0) from fixed_size_list_array [[1, 2]] [[3, 4]] -query ? -select array_concat(column1, [7]) from arrays_values_v2; +## Unnest + +# Set target partitions to 1 for deterministic results +# Row sort is not used since the order is crucial. 
+statement ok +set datafusion.execution.target_partitions = 1; + +query II +select unnest(make_array(1,2,3)), + unnest(make_array(4,5)) +; +---- +1 4 +2 5 +3 NULL + +query III +select unnest(make_array(1,2,3)), + unnest(make_array(4,5)), + unnest(make_array(6,7,8,9)) +; +---- +1 4 6 +2 5 7 +3 NULL 8 +NULL NULL 9 + +query III +select unnest(make_array(1,2,3,4,5)), + unnest(make_array(6,7)), + unnest(make_array(8,9,10,11,22,33)) +; +---- +1 6 8 +2 7 9 +3 NULL 10 +4 NULL 11 +5 NULL 22 +NULL NULL 33 + +# Select From + +query IIIII +select * from unnest( + make_array(1), + make_array(2,3), + make_array(4,5,6), + make_array(7,8), + make_array(9) +); +---- +1 2 4 7 9 +NULL 3 5 8 NULL +NULL NULL 6 NULL NULL + +query I +select * from unnest(make_array(1,2,3)) as data +---- +1 +2 +3 + +query II +select * from unnest(make_array(1,2,3),make_array(7,6,5,4)) as data(a,b) order by b +---- +NULL 4 +3 5 +2 6 +1 7 + +query ?T?I +select * from arrays_unnest; +---- +[1, 2] A [1, 2] 3 +NULL B [4] 5 +[3] C [6, 7, 8] 9 + +# TODO: Unnest columns fails +query error DataFusion error: SQL error: TokenizerError\("Unterminated string literal at Line: 2, Column 95"\) +caused by +Internal error: UNNEST only supports list type\. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker +select unnest(column1), column2 from arrays_unnest; + +query error DataFusion error: SQL error: TokenizerError\("Unterminated string literal at Line: 2, Column 95"\) +caused by +Internal error: UNNEST only supports list type\. 
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker +select unnest(column3), column4 from arrays_unnest; + +query I +select unnest(make_array(1,2,3)); ---- -[, 2, 3, 7] -[7] -[9, , 10, 7] -[, 1, 7] -[11, 12, 7] -[7] +1 +2 +3 + +query I +select * from unnest(make_array(1,2,3)); +---- +1 +2 +3 + +query I +SELECT sum(a) FROM unnest(make_array(1, 2, 3)) as a; +---- +6 + +query I +SELECT sum(a) AS total FROM unnest(make_array(1, 2, 3)) as a; +---- +6 + +# TODO: alias fails +query error DataFusion error: Schema error: No field named a\. Valid fields are "make_array\(Int64\(1\),Int64\(2\),Int64\(3\)\)"\. +SELECT a FROM (select unnest(make_array(1, 2, 3)) as a); + +query II +SELECT data.a, data.b FROM unnest(make_array(1, 2, 3), make_array(4, 5, 6, 7)) as data(a, b); +---- +1 4 +2 5 +3 6 +NULL 7 + +query II +SELECT sum(data.a), sum(data.b) FROM unnest(make_array(1, 2, 3), make_array(4, 5, 6, 7)) as data(a, b); +---- +6 22 + +# Test unnest with multi-dimensional arrays +query I +select unnest(make_array(make_array(10,20,30),make_array(1,NULL,10),make_array(4,5,6))) +---- +10 +20 +30 +1 +NULL +10 +4 +5 +6 + +# TODO: sum for unnest as subquery not working yet +# query error DataFusion error: Error during planning: No function matches the given name and argument types 'SUM\(List\(Field \{ name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}\)\)'\. You might need to add explicit type casts\.\n Candidate functions:\n SUM\(Int8/Int16/Int32/Int64/UInt8/UInt16/UInt32/UInt64/Float32/Float64\) +# SELECT sum(a) FROM (select unnest(make_array(1, 2, 3)) as a); + +# TODO: select from part of the columns is not allowed. 
+# query error DataFusion error: External error: Arrow error: Invalid argument error: must either specify a row count or at least one column +# SELECT data.a FROM unnest(make_array(1, 2, 3), make_array(4, 5, 6, 7)) as data(a, b); + +# TODO: select from part of the columns is not allowed. +# query error DataFusion error: External error: External error: Arrow error: Invalid argument error: must either specify a row count or at least one column +# SELECT sum(data.b) FROM unnest(make_array(1, 2, 3), make_array(4, 5, 6, 9)) as data(a, b); # flatten query ??? @@ -2578,9 +2731,6 @@ drop table array_has_table_2D_float; statement ok drop table array_has_table_3D; -statement ok -drop table arrays_values_without_nulls; - statement ok drop table arrays_with_repeating_elements; @@ -2589,3 +2739,9 @@ drop table nested_arrays_with_repeating_elements; statement ok drop table flatten_table; + +statement ok +drop table arrays_values_without_nulls; + +statement ok +drop table arrays_unnest;