From c5da77767347600e550a8a0ee9b5f02be708b90c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 3 May 2021 16:39:38 -0400 Subject: [PATCH 1/4] Implement count distinct for dictionary arrays --- datafusion/src/execution/context.rs | 23 ++++++++ .../src/physical_plan/distinct_expressions.rs | 26 ++++++-- .../src/physical_plan/hash_aggregate.rs | 3 + datafusion/src/physical_plan/mod.rs | 4 ++ datafusion/src/scalar.rs | 59 +++++++++++++++++-- 5 files changed, 107 insertions(+), 8 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index dee253f44ac33..a61da1c32a912 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1766,6 +1766,24 @@ mod tests { "+-----+-------------+", ]; assert_batches_sorted_eq!(expected, &results); + + // Now, use dict as an aggregate + let results = + plan_and_collect(&mut ctx, "SELECT val, count(distinct dict) FROM t GROUP BY val") + .await + .expect("ran plan correctly"); + + let expected = vec![ + "+-----+----------------------+", + "| val | COUNT(DISTINCT dict) |", + "+-----+----------------------+", + "| 1 | 2 |", + "| 2 | 2 |", + "| 4 | 1 |", + "+-----+----------------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + } run_test_case::().await; @@ -1778,6 +1796,9 @@ mod tests { run_test_case::().await; } + + + async fn run_count_distinct_integers_aggregated_scenario( partitions: Vec>, ) -> Result> { @@ -1904,6 +1925,8 @@ mod tests { Ok(()) } + + #[test] fn aggregate_with_alias() -> Result<()> { let tmp_dir = TempDir::new()?; diff --git a/datafusion/src/physical_plan/distinct_expressions.rs b/datafusion/src/physical_plan/distinct_expressions.rs index 1c93b5a104d09..59bb1b549a815 100644 --- a/datafusion/src/physical_plan/distinct_expressions.rs +++ b/datafusion/src/physical_plan/distinct_expressions.rs @@ -61,6 +61,9 @@ impl DistinctCount { name: String, data_type: DataType, ) -> Self { + println!("creating distinct counts for {:#?}", input_data_types); + println!(" name {}, data_type: {:?}", name, data_type); + Self { input_data_types, exprs, @@ -70,6 +73,16 @@ impl DistinctCount { } } +// return the type to use to accumulate state for the specified input type +fn state_type(data_type: DataType) -> DataType { + match data_type { + // when aggregating dictionary values, use the underlying value type + DataType::Dictionary(_key_type, value_type) => *value_type, + t @ _ => t + } +} + + impl AggregateExpr for DistinctCount { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -84,10 +97,12 @@ impl AggregateExpr for DistinctCount { Ok(self .input_data_types .iter() - .map(|data_type| { + .map(|input_data_type| { + let state_type = state_type(input_data_type.clone()); + println!("AAL using state type of {:?}", state_type); Field::new( &format_state_name(&self.name, "count distinct"), - DataType::List(Box::new(Field::new("item", data_type.clone(), true))), + DataType::List(Box::new(Field::new("item", state_type, true))), false, ) }) @@ -121,7 +136,10 @@ impl Accumulator for DistinctCountAccumulator { self.values.insert(DistinctScalarValues( values .iter() - .map(GroupByScalar::try_from) + .map(|v| { + println!("trying to update from {:?}", v); + GroupByScalar::try_from(v) + }) .collect::>>()?, )); } @@ -158,7 +176,7 @@ impl Accumulator for DistinctCountAccumulator { let mut cols_out = self .data_types .iter() - .map(|data_type| ScalarValue::List(Some(Vec::new()), data_type.clone())) + .map(|data_type| ScalarValue::List(Some(Vec::new()), state_type(data_type.clone()))) .collect::>(); let mut cols_vec = cols_out diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index fad4fa585034b..5ca5778682e17 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -1070,6 +1070,8 @@ fn dictionary_create_group_by_value( /// Extract the value in `col[row]` as a GroupByScalar fn create_group_by_value(col: &ArrayRef, row: usize) -> Result { + println!("AAL creating group by value for row {} of type {:?}", row, col); + match col.data_type() { DataType::Float32 => { let array = col.as_any().downcast_ref::().unwrap(); @@ -1176,6 +1178,7 @@ pub(crate) fn create_group_by_values( row: usize, vec: &mut Box<[GroupByScalar]>, ) -> Result<()> { + println!("AAL creating group by values for row {}", row); for (i, col) in group_by_keys.iter().enumerate() { vec[i] = create_group_by_value(col, row)? } diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 11f0946c91ff6..e4602ff3120ab 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -274,6 +274,7 @@ pub trait AggregateExpr: Send + Sync + Debug { /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be /// downcast to a specific implementation. fn as_any(&self) -> &dyn Any; + /// the field of the final result of this aggregation. fn field(&self) -> Result; @@ -311,6 +312,9 @@ pub trait Accumulator: Send + Sync + Debug { if values.is_empty() { return Ok(()); }; + + println!("Accumulator::update_batch for values: {:#?}", values); + (0..values[0].len()).try_for_each(|index| { let v = values .iter() diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index 6f03194f45423..1d86f3ba29214 100644 --- a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -19,10 +19,12 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc}; -use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit}; +use arrow::datatypes::{ArrowDictionaryKeyType, DataType, Field, IntervalUnit, TimeUnit}; use arrow::{ array::*, - datatypes::{ArrowNativeType, Float32Type, TimestampNanosecondType}, + datatypes::{ArrowNativeType, Float32Type, TimestampNanosecondType, + Int16Type, Int32Type, Int64Type, + Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type}, }; use arrow::{ array::{ @@ -358,7 +360,10 @@ impl ScalarValue { DataType::LargeUtf8 => { build_list!(LargeStringBuilder, LargeUtf8, values, size) } - _ => panic!("Unexpected DataType for list"), + _ => { + println!("list data type was: {}", data_type); + panic!("Unexpected DataType for list") + } }), ScalarValue::Date32(e) => match e { Some(value) => Arc::new(Date32Array::from_value(*value, size)), @@ -443,15 +448,61 @@ impl ScalarValue { } DataType::Timestamp(TimeUnit::Nanosecond, _) => { typed_cast!(array, index, TimestampNanosecondArray, TimestampNanosecond) + }, + DataType::Dictionary(index_type, _) => match **index_type { + DataType::Int8 => { + Self::try_from_dict_array::(array, index)? + } + DataType::Int16 => { + Self::try_from_dict_array::(array, index)? + } + DataType::Int32 => { + Self::try_from_dict_array::(array, index)? + } + DataType::Int64 => { + Self::try_from_dict_array::(array, index)? + } + DataType::UInt8 => { + Self::try_from_dict_array::(array, index)? + } + DataType::UInt16 => { + Self::try_from_dict_array::(array, index)? + } + DataType::UInt32 => { + Self::try_from_dict_array::(array, index)? + } + DataType::UInt64 => { + Self::try_from_dict_array::(array, index)? + } + _ => return Err(DataFusionError::Internal(format!( + "Index type not supported while creating scalar from dictionary: {}", + array.data_type(), + ))), } other => { return Err(DataFusionError::NotImplemented(format!( - "Can't create a scalar of array of type \"{:?}\"", + "Can't create a scalar from array of type \"{:?}\"", other ))) } }) } + + fn try_from_dict_array(array: &ArrayRef, index: usize) -> Result { + let dict_array = array.as_any().downcast_ref::>().unwrap(); + + // look up the index in the values dictionary + let keys_col = dict_array.keys_array(); + let values_index = keys_col.value(index).to_usize().ok_or_else(|| { + DataFusionError::Internal(format!( + "Can not convert index to usize in dictionary of type creating group by value {:?}", + keys_col.data_type() + )) + })?; + Self::try_from_array(&dict_array.values(), values_index) + } + + } impl From for ScalarValue { From c99ee5df5c2c382f737a90b1b78d4ea992abeae2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 4 May 2021 08:54:30 -0400 Subject: [PATCH 2/4] cleanup --- datafusion/src/execution/context.rs | 5 ----- datafusion/src/physical_plan/distinct_expressions.rs | 9 +-------- datafusion/src/physical_plan/hash_aggregate.rs | 3 --- datafusion/src/physical_plan/mod.rs | 3 --- datafusion/src/scalar.rs | 5 +---- 5 files changed, 2 insertions(+), 23 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index a61da1c32a912..7462ddfa8d324 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1796,9 +1796,6 @@ mod tests { run_test_case::().await; } - - - async fn run_count_distinct_integers_aggregated_scenario( partitions: Vec>, ) -> Result> { @@ -1925,8 +1922,6 @@ mod tests { Ok(()) } - - #[test] fn aggregate_with_alias() -> Result<()> { let tmp_dir = TempDir::new()?; diff --git a/datafusion/src/physical_plan/distinct_expressions.rs b/datafusion/src/physical_plan/distinct_expressions.rs index 59bb1b549a815..9bd1437d916ee 100644 --- a/datafusion/src/physical_plan/distinct_expressions.rs +++ b/datafusion/src/physical_plan/distinct_expressions.rs @@ -61,9 +61,6 @@ impl DistinctCount { name: String, data_type: DataType, ) -> Self { - println!("creating distinct counts for {:#?}", input_data_types); - println!(" name {}, data_type: {:?}", name, data_type); - Self { input_data_types, exprs, @@ -99,7 +96,6 @@ impl AggregateExpr for DistinctCount { .iter() .map(|input_data_type| { let state_type = state_type(input_data_type.clone()); - println!("AAL using state type of {:?}", state_type); Field::new( &format_state_name(&self.name, "count distinct"), DataType::List(Box::new(Field::new("item", state_type, true))), @@ -136,10 +132,7 @@ impl Accumulator for DistinctCountAccumulator { self.values.insert(DistinctScalarValues( values .iter() - .map(|v| { - println!("trying to update from {:?}", v); - GroupByScalar::try_from(v) - }) + .map(GroupByScalar::try_from) .collect::>>()?, )); } diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index 5ca5778682e17..fad4fa585034b 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -1070,8 +1070,6 @@ fn dictionary_create_group_by_value( /// Extract the value in `col[row]` as a GroupByScalar fn create_group_by_value(col: &ArrayRef, row: usize) -> Result { - println!("AAL creating group by value for row {} of type {:?}", row, col); - match col.data_type() { DataType::Float32 => { let array = col.as_any().downcast_ref::().unwrap(); @@ -1178,7 +1176,6 @@ pub(crate) fn create_group_by_values( row: usize, vec: &mut Box<[GroupByScalar]>, ) -> Result<()> { - println!("AAL creating group by values for row {}", row); for (i, col) in group_by_keys.iter().enumerate() { vec[i] = create_group_by_value(col, row)? } diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index e4602ff3120ab..a8f6f0c35f00e 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -312,9 +312,6 @@ pub trait Accumulator: Send + Sync + Debug { if values.is_empty() { return Ok(()); }; - - println!("Accumulator::update_batch for values: {:#?}", values); - (0..values[0].len()).try_for_each(|index| { let v = values .iter() diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index 1d86f3ba29214..fbb5cc61c12a7 100644 --- a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -360,10 +360,7 @@ impl ScalarValue { DataType::LargeUtf8 => { build_list!(LargeStringBuilder, LargeUtf8, values, size) } - _ => { - println!("list data type was: {}", data_type); - panic!("Unexpected DataType for list") - } + _ => panic!("Unexpected DataType for list"), }), ScalarValue::Date32(e) => match e { Some(value) => Arc::new(Date32Array::from_value(*value, size)), From 850598df78b32b5938ab0f6cdafa178c17e81b6d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 4 May 2021 09:56:31 -0400 Subject: [PATCH 3/4] cleanup --- datafusion/src/execution/context.rs | 11 ++--- .../src/physical_plan/distinct_expressions.rs | 34 ++++++++------ datafusion/src/scalar.rs | 44 ++++++++----------- 3 files changed, 45 insertions(+), 44 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 7462ddfa8d324..b53f7c15e3aac 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1768,10 +1768,12 @@ mod tests { assert_batches_sorted_eq!(expected, &results); // Now, use dict as an aggregate - let results = - plan_and_collect(&mut ctx, "SELECT val, count(distinct dict) FROM t GROUP BY val") - .await - .expect("ran plan correctly"); + let results = plan_and_collect( + &mut ctx, + "SELECT val, count(distinct dict) FROM t GROUP BY val", + ) + .await + .expect("ran plan correctly"); let expected = vec![ "+-----+----------------------+", @@ -1783,7 +1785,6 @@ mod tests { "+-----+----------------------+", ]; assert_batches_sorted_eq!(expected, &results); - } run_test_case::().await; diff --git a/datafusion/src/physical_plan/distinct_expressions.rs b/datafusion/src/physical_plan/distinct_expressions.rs index 9bd1437d916ee..f397dd11febe2 100644 --- a/datafusion/src/physical_plan/distinct_expressions.rs +++ b/datafusion/src/physical_plan/distinct_expressions.rs @@ -47,8 +47,8 @@ pub struct DistinctCount { name: String, /// The DataType for the final count data_type: DataType, - /// The DataType for each input argument - input_data_types: Vec, + /// The DataType used to hold the state for each input + state_data_types: Vec, /// The input arguments exprs: Vec>, } @@ -61,8 +61,10 @@ impl DistinctCount { name: String, data_type: DataType, ) -> Self { + let state_data_types = input_data_types.into_iter().map(state_type).collect(); + Self { - input_data_types, + state_data_types, exprs, name, data_type, @@ -70,16 +72,15 @@ impl DistinctCount { } } -// return the type to use to accumulate state for the specified input type +/// return the type to use to accumulate state for the specified input type fn state_type(data_type: DataType) -> DataType { match data_type { // when aggregating dictionary values, use the underlying value type DataType::Dictionary(_key_type, value_type) => *value_type, - t @ _ => t + t @ _ => t, } } - impl AggregateExpr for DistinctCount { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -92,13 +93,16 @@ impl AggregateExpr for DistinctCount { fn state_fields(&self) -> Result> { Ok(self - .input_data_types + .state_data_types .iter() - .map(|input_data_type| { - let state_type = state_type(input_data_type.clone()); + .map(|state_data_type| { Field::new( &format_state_name(&self.name, "count distinct"), - DataType::List(Box::new(Field::new("item", state_type, true))), + DataType::List(Box::new(Field::new( + "item", + state_data_type.clone(), + true, + ))), false, ) }) @@ -112,7 +116,7 @@ impl AggregateExpr for DistinctCount { fn create_accumulator(&self) -> Result> { Ok(Box::new(DistinctCountAccumulator { values: HashSet::default(), - data_types: self.input_data_types.clone(), + state_data_types: self.state_data_types.clone(), count_data_type: self.data_type.clone(), })) } @@ -121,7 +125,7 @@ impl AggregateExpr for DistinctCount { #[derive(Debug)] struct DistinctCountAccumulator { values: HashSet, - data_types: Vec, + state_data_types: Vec, count_data_type: DataType, } @@ -167,9 +171,11 @@ impl Accumulator for DistinctCountAccumulator { fn state(&self) -> Result> { let mut cols_out = self - .data_types + .state_data_types .iter() - .map(|data_type| ScalarValue::List(Some(Vec::new()), state_type(data_type.clone()))) + .map(|state_data_type| { + ScalarValue::List(Some(Vec::new()), state_data_type.clone()) + }) .collect::>(); let mut cols_vec = cols_out diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index fbb5cc61c12a7..eb9b3095cd626 100644 --- a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -22,9 +22,10 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc}; use arrow::datatypes::{ArrowDictionaryKeyType, DataType, Field, IntervalUnit, TimeUnit}; use arrow::{ array::*, - datatypes::{ArrowNativeType, Float32Type, TimestampNanosecondType, - Int16Type, Int32Type, Int64Type, - Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type}, + datatypes::{ + ArrowNativeType, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type, + TimestampNanosecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + }, }; use arrow::{ array::{ @@ -445,23 +446,13 @@ impl ScalarValue { } DataType::Timestamp(TimeUnit::Nanosecond, _) => { typed_cast!(array, index, TimestampNanosecondArray, TimestampNanosecond) - }, + } DataType::Dictionary(index_type, _) => match **index_type { - DataType::Int8 => { - Self::try_from_dict_array::(array, index)? - } - DataType::Int16 => { - Self::try_from_dict_array::(array, index)? - } - DataType::Int32 => { - Self::try_from_dict_array::(array, index)? - } - DataType::Int64 => { - Self::try_from_dict_array::(array, index)? - } - DataType::UInt8 => { - Self::try_from_dict_array::(array, index)? - } + DataType::Int8 => Self::try_from_dict_array::(array, index)?, + DataType::Int16 => Self::try_from_dict_array::(array, index)?, + DataType::Int32 => Self::try_from_dict_array::(array, index)?, + DataType::Int64 => Self::try_from_dict_array::(array, index)?, + DataType::UInt8 => Self::try_from_dict_array::(array, index)?, DataType::UInt16 => { Self::try_from_dict_array::(array, index)? } @@ -471,11 +462,13 @@ impl ScalarValue { DataType::UInt64 => { Self::try_from_dict_array::(array, index)? } - _ => return Err(DataFusionError::Internal(format!( + _ => { + return Err(DataFusionError::Internal(format!( "Index type not supported while creating scalar from dictionary: {}", array.data_type(), - ))), - } + ))) + } + }, other => { return Err(DataFusionError::NotImplemented(format!( "Can't create a scalar from array of type \"{:?}\"", @@ -485,7 +478,10 @@ impl ScalarValue { }) } - fn try_from_dict_array(array: &ArrayRef, index: usize) -> Result { + fn try_from_dict_array( + array: &ArrayRef, + index: usize, + ) -> Result { let dict_array = array.as_any().downcast_ref::>().unwrap(); // look up the index in the values dictionary @@ -498,8 +494,6 @@ impl ScalarValue { })?; Self::try_from_array(&dict_array.values(), values_index) } - - } impl From for ScalarValue { From cf1eab48f6cdb6073b88bece588e7b3543cbcce5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 4 May 2021 10:48:33 -0400 Subject: [PATCH 4/4] fix: clippy --- datafusion/src/physical_plan/distinct_expressions.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/distinct_expressions.rs b/datafusion/src/physical_plan/distinct_expressions.rs index f397dd11febe2..8167541c3e1a5 100644 --- a/datafusion/src/physical_plan/distinct_expressions.rs +++ b/datafusion/src/physical_plan/distinct_expressions.rs @@ -77,7 +77,7 @@ fn state_type(data_type: DataType) -> DataType { match data_type { // when aggregating dictionary values, use the underlying value type DataType::Dictionary(_key_type, value_type) => *value_type, - t @ _ => t, + t => t, } }