From 69411a3eca312f039f36e76ed3a0ef36ce54b4f7 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Thu, 18 Jul 2024 09:30:02 +0800 Subject: [PATCH] map-with-vec-exprs Signed-off-by: jayzhan211 --- datafusion/functions/src/core/map.rs | 46 ++++++++++++---------------- datafusion/sql/src/expr/function.rs | 19 ++++++++++-- 2 files changed, 37 insertions(+), 28 deletions(-) diff --git a/datafusion/functions/src/core/map.rs b/datafusion/functions/src/core/map.rs index 1834c7ac6060f..77e3c9965c17a 100644 --- a/datafusion/functions/src/core/map.rs +++ b/datafusion/functions/src/core/map.rs @@ -75,33 +75,31 @@ fn make_map(args: &[ColumnarValue]) -> Result { make_map_batch_internal(key, value, can_evaluate_to_const) } +fn get_scalar_from_col(c: &ColumnarValue) -> ScalarValue { + match c { + ColumnarValue::Scalar(s) => s.clone(), + _ => todo!(""), + } +} + fn make_map_batch(args: &[ColumnarValue]) -> Result { - if args.len() != 2 { + if args.len() % 2 != 0 { return exec_err!( "make_map requires exactly 2 arguments, got {} instead", args.len() ); } - let can_evaluate_to_const = can_evaluate_to_const(args); + let len = args.len() / 2; + let key_iter = args[0..len].into_iter().map(get_scalar_from_col); + let key = ScalarValue::iter_to_array(key_iter)?; + let val_iter = args[len..].into_iter().map(get_scalar_from_col); + let value = ScalarValue::iter_to_array(val_iter)?; - let key = get_first_array_ref(&args[0])?; - let value = get_first_array_ref(&args[1])?; + let can_evaluate_to_const = can_evaluate_to_const(args); make_map_batch_internal(key, value, can_evaluate_to_const) } -fn get_first_array_ref(columnar_value: &ColumnarValue) -> Result { - match columnar_value { - ColumnarValue::Scalar(value) => match value { - ScalarValue::List(array) => Ok(array.value(0)), - ScalarValue::LargeList(array) => Ok(array.value(0)), - ScalarValue::FixedSizeList(array) => Ok(array.value(0)), - _ => exec_err!("Expected array, got {:?}", value), - }, - ColumnarValue::Array(array) => exec_err!("Expected scalar, got {:?}", array), - } -} - fn make_map_batch_internal( keys: ArrayRef, values: ArrayRef, @@ -302,17 +300,13 @@ impl ScalarUDFImpl for MapFunc { arg_types.len() ); } + + let key_type = &arg_types[0]; + let val_type = arg_types.last().unwrap(); + let mut builder = SchemaBuilder::new(); - builder.push(Field::new( - "key", - get_element_type(&arg_types[0])?.clone(), - false, - )); - builder.push(Field::new( - "value", - get_element_type(&arg_types[1])?.clone(), - true, - )); + builder.push(Field::new("key", key_type.clone(), false)); + builder.push(Field::new("value", val_type.clone(), true)); let fields = builder.finish().fields; Ok(DataType::Map( Arc::new(Field::new("entries", DataType::Struct(fields), false)), diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index dab328cc49080..293ec1f8e717c 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -229,8 +229,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // user-defined function (UDF) should have precedence if let Some(fm) = self.context_provider.get_function_meta(&name) { - let args = self.function_args_to_expr(args, schema, planner_context)?; - return Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fm, args))); + let argss = + self.function_args_to_expr(args.clone(), schema, planner_context)?; + + // TODO: ExprPlanner + if name == "map" { + let mut final_args = vec![]; + for args in argss { + if let Expr::ScalarFunction(fun) = args { + final_args.extend_from_slice(&fun.args) + } + } + return Ok(Expr::ScalarFunction(ScalarFunction::new_udf( + fm, final_args, + ))); + } + + return Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fm, argss))); } // Build Unnest expression