From 59c1f5c75896b11d2b1d426be53e3867ec90e457 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 2 Nov 2023 14:21:36 -0400 Subject: [PATCH 1/7] Minor: Encapsulate EquivalenceClass --- datafusion/physical-expr/src/equivalence.rs | 229 +++++++++++++----- datafusion/physical-expr/src/physical_expr.rs | 35 +-- 2 files changed, 174 insertions(+), 90 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 04b0f2eedcdb2..e1fc8574ef5b1 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -20,11 +20,11 @@ use std::hash::Hash; use std::sync::Arc; use crate::expressions::Column; -use crate::physical_expr::{deduplicate_physical_exprs, have_common_entries}; use crate::sort_properties::{ExprOrdering, SortProperties}; use crate::{ - physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement, - LexRequirementRef, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, + physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal, LexOrdering, + LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalSortExpr, + PhysicalSortRequirement, }; use arrow::datatypes::SchemaRef; @@ -32,14 +32,102 @@ use arrow_schema::SortOptions; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{JoinSide, JoinType, Result}; +use crate::physical_expr::deduplicate_physical_exprs; use indexmap::map::Entry; use indexmap::IndexMap; /// An `EquivalenceClass` is a set of [`Arc`]s that are known /// to have the same value for all tuples in a relation. These are generated by -/// equality predicates, typically equi-join conditions and equality conditions -/// in filters. -pub type EquivalenceClass = Vec>; +/// equality predicates (e.g. `a = b`), typically equi-join conditions and +/// equality conditions in filters. +#[derive(Debug, Clone)] +pub struct EquivalenceClass { + inner: Vec>, +} + +impl PartialEq for EquivalenceClass { + fn eq(&self, other: &Self) -> bool { + physical_exprs_equal(&self.inner, &other.inner) + } +} + +impl EquivalenceClass { + /// Create a new empty equivalence class + pub fn new_empty() -> Self { + Self { inner: vec![] } + } + + // Create a new equivalence class from a pre-existing `Vec` + pub fn new_from_vec(inner: Vec>) -> Self { + Self { inner } + } + + /// Return the "canonical" expression for this class (the first element) + /// if any + fn canonical_expr(&self) -> Option> { + self.inner.first().cloned() + } + + /// Insert the expression into this class, meaning it is known to be equal to + /// all other expressions in this class + pub fn push(&mut self, expr: Arc) { + self.inner.push(expr); + } + + /// Inserts all the expressions from other into this class + pub fn extend(&mut self, other: Self) { + self.inner.extend(other.inner); + } + + /// Returns true if this equivalence class contains t expression + pub fn contains(&self, expr: &Arc) -> bool { + physical_exprs_contains(&self.inner, expr) + } + + /// Returns true if this equivalence class has any entries in common with other + pub fn contains_any(&self, other: &Self) -> bool { + self.inner.iter().any(|e| other.contains(e)) + } + + /// return the number of items in this class + pub fn len(&self) -> usize { + self.inner.len() + } + + /// return true if this class is empty + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Removes all duplicated exprs in this class + // TODO should we deduplicate on insert? + fn deduplicate(&mut self) { + deduplicate_physical_exprs(&mut self.inner); + } + + /// Iterate over all elements in this class + pub fn iter(&self) -> impl Iterator> { + self.inner.iter() + } + + /// Return a new equivalence class that have the specified offset added to + /// each expression (used when schemas are appended such as in joins) + pub fn with_offset(&self, offset: usize) -> Self { + let new_inner = self + .inner + .iter() + .cloned() + .map(|e| add_offset_to_expr(e, offset)) + .collect(); + Self { inner: new_inner } + } + + /// Returns true if other is equal in the sense + /// of bags (multi-sets), disregarding their orderings. + pub fn eq_bag(&self, other: &Self) -> bool { + physical_exprs_bag_equal(&self.inner, &other.inner) + } +} /// Stores the mapping between source expressions and target expressions for a /// projection. Indices in the vector corresponds to the indices after projection. @@ -91,10 +179,10 @@ impl EquivalenceGroup { let mut first_class = None; let mut second_class = None; for (idx, cls) in self.classes.iter().enumerate() { - if physical_exprs_contains(cls, left) { + if cls.contains(left) { first_class = Some(idx); } - if physical_exprs_contains(cls, right) { + if cls.contains(right) { second_class = Some(idx); } } @@ -124,7 +212,10 @@ impl EquivalenceGroup { (None, None) => { // None of the expressions is among existing classes. // Create a new equivalence class and extend the group. - self.classes.push(vec![left.clone(), right.clone()]); + self.classes.push(EquivalenceClass::new_from_vec(vec![ + left.clone(), + right.clone(), + ])); } } } @@ -135,7 +226,7 @@ impl EquivalenceGroup { self.classes.retain_mut(|cls| { // Keep groups that have at least two entries as singleton class is // meaningless (i.e. it contains no non-trivial information): - deduplicate_physical_exprs(cls); + cls.deduplicate(); cls.len() > 1 }); // Unify/bridge groups that have common expressions: @@ -152,7 +243,7 @@ impl EquivalenceGroup { let mut next_idx = idx + 1; let start_size = self.classes[idx].len(); while next_idx < self.classes.len() { - if have_common_entries(&self.classes[idx], &self.classes[next_idx]) { + if self.classes[idx].contains_any(&self.classes[next_idx]) { let extension = self.classes.swap_remove(next_idx); self.classes[idx].extend(extension); } else { @@ -160,7 +251,7 @@ impl EquivalenceGroup { } } if self.classes[idx].len() > start_size { - deduplicate_physical_exprs(&mut self.classes[idx]); + self.classes[idx].deduplicate(); if self.classes[idx].len() > start_size { continue; } @@ -182,8 +273,8 @@ impl EquivalenceGroup { expr.clone() .transform(&|expr| { for cls in self.iter() { - if physical_exprs_contains(cls, &expr) { - return Ok(Transformed::Yes(cls[0].clone())); + if cls.contains(&expr) { + return Ok(Transformed::Yes(cls.canonical_expr().unwrap())); } } Ok(Transformed::No(expr)) @@ -273,7 +364,7 @@ impl EquivalenceGroup { if source.eq(expr) || self .get_equivalence_class(source) - .map_or(false, |group| physical_exprs_contains(group, expr)) + .map_or(false, |group| group.contains(expr)) { return Some(target.clone()); } @@ -323,7 +414,7 @@ impl EquivalenceGroup { .iter() .filter_map(|expr| self.project_expr(mapping, expr)) .collect::>(); - (new_class.len() > 1).then_some(new_class) + (new_class.len() > 1).then_some(EquivalenceClass::new_from_vec(new_class)) }); // TODO: Convert the algorithm below to a version that uses `HashMap`. // once `Arc` can be stored in `HashMap`. @@ -345,7 +436,9 @@ impl EquivalenceGroup { // equivalence classes are meaningless. let new_classes = new_classes .into_iter() - .filter_map(|(_, values)| (values.len() > 1).then_some(values)); + .filter_map(|(_, values)| (values.len() > 1).then_some(values)) + .map(EquivalenceClass::new_from_vec); + let classes = projected_classes.chain(new_classes).collect(); Self::new(classes) } @@ -355,10 +448,8 @@ impl EquivalenceGroup { fn get_equivalence_class( &self, expr: &Arc, - ) -> Option<&[Arc]> { - self.iter() - .map(|cls| cls.as_slice()) - .find(|cls| physical_exprs_contains(cls, expr)) + ) -> Option<&EquivalenceClass> { + self.iter().find(|cls| cls.contains(expr)) } /// Combine equivalence groups of the given join children. @@ -374,12 +465,11 @@ impl EquivalenceGroup { let mut result = Self::new( self.iter() .cloned() - .chain(right_equivalences.iter().map(|item| { - item.iter() - .cloned() - .map(|expr| add_offset_to_expr(expr, left_size)) - .collect() - })) + .chain( + right_equivalences + .iter() + .map(|cls| cls.with_offset(left_size)), + ) .collect(), ); // In we have an inner join, expressions in the "on" condition @@ -1188,14 +1278,13 @@ mod tests { use std::sync::Arc; use super::*; - use crate::expressions::{col, lit, BinaryExpr, Column}; - use crate::physical_expr::{physical_exprs_bag_equal, physical_exprs_equal}; + use crate::expressions::{col, lit, BinaryExpr, Column, Literal}; use arrow::compute::{lexsort_to_indices, SortColumn}; use arrow::datatypes::{DataType, Field, Schema}; use arrow_array::{ArrayRef, RecordBatch, UInt32Array, UInt64Array}; use arrow_schema::{Fields, SortOptions}; - use datafusion_common::Result; + use datafusion_common::{Result, ScalarValue}; use datafusion_expr::Operator; use itertools::{izip, Itertools}; @@ -1382,8 +1471,8 @@ mod tests { assert_eq!(eq_properties.eq_group().len(), 1); let eq_groups = &eq_properties.eq_group().classes[0]; assert_eq!(eq_groups.len(), 2); - assert!(physical_exprs_contains(eq_groups, &col_a_expr)); - assert!(physical_exprs_contains(eq_groups, &col_b_expr)); + assert!(eq_groups.contains(&col_a_expr)); + assert!(eq_groups.contains(&col_b_expr)); // b and c are aliases. Exising equivalence class should expand, // however there shouldn't be any new equivalence class @@ -1391,9 +1480,9 @@ mod tests { assert_eq!(eq_properties.eq_group().len(), 1); let eq_groups = &eq_properties.eq_group().classes[0]; assert_eq!(eq_groups.len(), 3); - assert!(physical_exprs_contains(eq_groups, &col_a_expr)); - assert!(physical_exprs_contains(eq_groups, &col_b_expr)); - assert!(physical_exprs_contains(eq_groups, &col_c_expr)); + assert!(eq_groups.contains(&col_a_expr)); + assert!(eq_groups.contains(&col_b_expr)); + assert!(eq_groups.contains(&col_c_expr)); // This is a new set of equality. Hence equivalent class count should be 2. eq_properties.add_equal_conditions(&col_x_expr, &col_y_expr); @@ -1405,11 +1494,11 @@ mod tests { assert_eq!(eq_properties.eq_group().len(), 1); let eq_groups = &eq_properties.eq_group().classes[0]; assert_eq!(eq_groups.len(), 5); - assert!(physical_exprs_contains(eq_groups, &col_a_expr)); - assert!(physical_exprs_contains(eq_groups, &col_b_expr)); - assert!(physical_exprs_contains(eq_groups, &col_c_expr)); - assert!(physical_exprs_contains(eq_groups, &col_x_expr)); - assert!(physical_exprs_contains(eq_groups, &col_y_expr)); + assert!(eq_groups.contains(&col_a_expr)); + assert!(eq_groups.contains(&col_b_expr)); + assert!(eq_groups.contains(&col_c_expr)); + assert!(eq_groups.contains(&col_x_expr)); + assert!(eq_groups.contains(&col_y_expr)); Ok(()) } @@ -1449,10 +1538,10 @@ mod tests { assert_eq!(out_properties.eq_group().len(), 1); let eq_class = &out_properties.eq_group().classes[0]; assert_eq!(eq_class.len(), 4); - assert!(physical_exprs_contains(eq_class, col_a1)); - assert!(physical_exprs_contains(eq_class, col_a2)); - assert!(physical_exprs_contains(eq_class, col_a3)); - assert!(physical_exprs_contains(eq_class, col_a4)); + assert!(eq_class.contains(col_a1)); + assert!(eq_class.contains(col_a2)); + assert!(eq_class.contains(col_a3)); + assert!(eq_class.contains(col_a4)); Ok(()) } @@ -1792,10 +1881,12 @@ mod tests { let entries = entries .into_iter() .map(|entry| entry.into_iter().map(lit).collect::>()) + .map(EquivalenceClass::new_from_vec) .collect::>(); let expected = expected .into_iter() .map(|entry| entry.into_iter().map(lit).collect::>()) + .map(EquivalenceClass::new_from_vec) .collect::>(); let mut eq_groups = EquivalenceGroup::new(entries.clone()); eq_groups.bridge_classes(); @@ -1806,11 +1897,7 @@ mod tests { ); assert_eq!(eq_groups.len(), expected.len(), "{}", err_msg); for idx in 0..eq_groups.len() { - assert!( - physical_exprs_bag_equal(&eq_groups[idx], &expected[idx]), - "{}", - err_msg - ); + assert!(eq_groups[idx].eq_bag(&expected[idx]), "{}", err_msg); } } Ok(()) @@ -1819,14 +1906,17 @@ mod tests { #[test] fn test_remove_redundant_entries_eq_group() -> Result<()> { let entries = vec![ - vec![lit(1), lit(1), lit(2)], + EquivalenceClass::new_from_vec(vec![lit(1), lit(1), lit(2)]), // This group is meaningless should be removed - vec![lit(3), lit(3)], - vec![lit(4), lit(5), lit(6)], + EquivalenceClass::new_from_vec(vec![lit(3), lit(3)]), + EquivalenceClass::new_from_vec(vec![lit(4), lit(5), lit(6)]), ]; // Given equivalences classes are not in succinct form. // Expected form is the most plain representation that is functionally same. - let expected = vec![vec![lit(1), lit(2)], vec![lit(4), lit(5), lit(6)]]; + let expected = vec![ + EquivalenceClass::new_from_vec(vec![lit(1), lit(2)]), + EquivalenceClass::new_from_vec(vec![lit(4), lit(5), lit(6)]), + ]; let mut eq_groups = EquivalenceGroup::new(entries); eq_groups.remove_redundant_entries(); @@ -1834,8 +1924,8 @@ mod tests { assert_eq!(eq_groups.len(), expected.len()); assert_eq!(eq_groups.len(), 2); - assert!(physical_exprs_equal(&eq_groups[0], &expected[0])); - assert!(physical_exprs_equal(&eq_groups[1], &expected[1])); + assert_eq!(eq_groups[0], expected[0]); + assert_eq!(eq_groups[1], expected[1]); Ok(()) } @@ -2091,7 +2181,7 @@ mod tests { // expressions in the equivalence classes. For other expressions in the same // equivalence class use same result. This util gets already calculated result, when available. fn get_representative_arr( - eq_group: &[Arc], + eq_group: &EquivalenceClass, existing_vec: &[Option], schema: SchemaRef, ) -> Option { @@ -2164,7 +2254,7 @@ mod tests { get_representative_arr(eq_group, &schema_vec, schema.clone()) .unwrap_or_else(|| generate_random_array(n_elem, n_distinct)); - for expr in eq_group { + for expr in eq_group.iter() { let col = expr.as_any().downcast_ref::().unwrap(); let (idx, _field) = schema.column_with_name(col.name()).unwrap(); schema_vec[idx] = Some(representative_array.clone()); @@ -2565,4 +2655,29 @@ mod tests { Ok(()) } + + #[test] + fn test_contains_any() { + let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))) + as Arc; + let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false)))) + as Arc; + let lit2 = + Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc; + let lit1 = + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc; + let col_b_expr = Arc::new(Column::new("b", 1)) as Arc; + + let cls1 = + EquivalenceClass::new_from_vec(vec![lit_true.clone(), lit_false.clone()]); + let cls2 = + EquivalenceClass::new_from_vec(vec![lit_true.clone(), col_b_expr.clone()]); + let cls3 = EquivalenceClass::new_from_vec(vec![lit2.clone(), lit1.clone()]); + + // lit_true is common + assert!(cls1.contains_any(&cls2)); + // there is no common entry + assert!(!cls1.contains_any(&cls3)); + assert!(!cls2.contains_any(&cls3)); + } } diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 79cbe6828b64b..455ca84a792f5 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -228,14 +228,6 @@ pub fn physical_exprs_contains( .any(|physical_expr| physical_expr.eq(expr)) } -/// Checks whether the given slices have any common entries. -pub fn have_common_entries( - lhs: &[Arc], - rhs: &[Arc], -) -> bool { - lhs.iter().any(|expr| physical_exprs_contains(rhs, expr)) -} - /// Checks whether the given physical expression slices are equal. pub fn physical_exprs_equal( lhs: &[Arc], @@ -293,8 +285,8 @@ mod tests { use crate::expressions::{Column, Literal}; use crate::physical_expr::{ - deduplicate_physical_exprs, have_common_entries, physical_exprs_bag_equal, - physical_exprs_contains, physical_exprs_equal, PhysicalExpr, + deduplicate_physical_exprs, physical_exprs_bag_equal, physical_exprs_contains, + physical_exprs_equal, PhysicalExpr, }; use datafusion_common::ScalarValue; @@ -334,29 +326,6 @@ mod tests { assert!(!physical_exprs_contains(&physical_exprs, &lit1)); } - #[test] - fn test_have_common_entries() { - let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))) - as Arc; - let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false)))) - as Arc; - let lit2 = - Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc; - let lit1 = - Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc; - let col_b_expr = Arc::new(Column::new("b", 1)) as Arc; - - let vec1 = vec![lit_true.clone(), lit_false.clone()]; - let vec2 = vec![lit_true.clone(), col_b_expr.clone()]; - let vec3 = vec![lit2.clone(), lit1.clone()]; - - // lit_true is common - assert!(have_common_entries(&vec1, &vec2)); - // there is no common entry - assert!(!have_common_entries(&vec1, &vec3)); - assert!(!have_common_entries(&vec2, &vec3)); - } - #[test] fn test_physical_exprs_equal() { let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))) From be683000e4df1f5351bfef9fdd5bdc2775eaaa47 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 13 Nov 2023 12:36:12 -0500 Subject: [PATCH 2/7] Rename inner to exprs --- datafusion/physical-expr/src/equivalence.rs | 36 ++++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 4752e7f82379b..405ea9374085a 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -42,90 +42,94 @@ use indexmap::IndexMap; /// equality conditions in filters. #[derive(Debug, Clone)] pub struct EquivalenceClass { - inner: Vec>, + exprs: Vec>, } impl PartialEq for EquivalenceClass { fn eq(&self, other: &Self) -> bool { - physical_exprs_equal(&self.inner, &other.inner) + physical_exprs_equal(&self.exprs, &other.exprs) } } impl EquivalenceClass { /// Create a new empty equivalence class pub fn new_empty() -> Self { - Self { inner: vec![] } + Self { exprs: vec![] } } // Create a new equivalence class from a pre-existing `Vec` pub fn new_from_vec(inner: Vec>) -> Self { - Self { inner } + Self { exprs: inner } } + /// Return the inner vector of expressions + pub fn into_inner(self) -> Vec> { + self.exprs + } /// Return the "canonical" expression for this class (the first element) /// if any fn canonical_expr(&self) -> Option> { - self.inner.first().cloned() + self.exprs.first().cloned() } /// Insert the expression into this class, meaning it is known to be equal to /// all other expressions in this class pub fn push(&mut self, expr: Arc) { - self.inner.push(expr); + self.exprs.push(expr); } /// Inserts all the expressions from other into this class pub fn extend(&mut self, other: Self) { - self.inner.extend(other.inner); + self.exprs.extend(other.exprs); } /// Returns true if this equivalence class contains t expression pub fn contains(&self, expr: &Arc) -> bool { - physical_exprs_contains(&self.inner, expr) + physical_exprs_contains(&self.exprs, expr) } /// Returns true if this equivalence class has any entries in common with other pub fn contains_any(&self, other: &Self) -> bool { - self.inner.iter().any(|e| other.contains(e)) + self.exprs.iter().any(|e| other.contains(e)) } /// return the number of items in this class pub fn len(&self) -> usize { - self.inner.len() + self.exprs.len() } /// return true if this class is empty pub fn is_empty(&self) -> bool { - self.inner.is_empty() + self.exprs.is_empty() } /// Removes all duplicated exprs in this class // TODO should we deduplicate on insert? fn deduplicate(&mut self) { - deduplicate_physical_exprs(&mut self.inner); + deduplicate_physical_exprs(&mut self.exprs); } /// Iterate over all elements in this class pub fn iter(&self) -> impl Iterator> { - self.inner.iter() + self.exprs.iter() } /// Return a new equivalence class that have the specified offset added to /// each expression (used when schemas are appended such as in joins) pub fn with_offset(&self, offset: usize) -> Self { let new_inner = self - .inner + .exprs .iter() .cloned() .map(|e| add_offset_to_expr(e, offset)) .collect(); - Self { inner: new_inner } + Self { exprs: new_inner } } /// Returns true if other is equal in the sense /// of bags (multi-sets), disregarding their orderings. pub fn eq_bag(&self, other: &Self) -> bool { - physical_exprs_bag_equal(&self.inner, &other.inner) + physical_exprs_bag_equal(&self.exprs, &other.exprs) } } From 077b2fa8440cc66f85c5d32efd75adc74fcdd8e3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 13 Nov 2023 12:37:04 -0500 Subject: [PATCH 3/7] Rename new_from_vec to new --- datafusion/physical-expr/src/equivalence.rs | 34 +++++++++------------ 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 405ea9374085a..c35a84d182353 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -58,7 +58,7 @@ impl EquivalenceClass { } // Create a new equivalence class from a pre-existing `Vec` - pub fn new_from_vec(inner: Vec>) -> Self { + pub fn new(inner: Vec>) -> Self { Self { exprs: inner } } @@ -273,10 +273,8 @@ impl EquivalenceGroup { (None, None) => { // None of the expressions is among existing classes. // Create a new equivalence class and extend the group. - self.classes.push(EquivalenceClass::new_from_vec(vec![ - left.clone(), - right.clone(), - ])); + self.classes + .push(EquivalenceClass::new(vec![left.clone(), right.clone()])); } } } @@ -475,7 +473,7 @@ impl EquivalenceGroup { .iter() .filter_map(|expr| self.project_expr(mapping, expr)) .collect::>(); - (new_class.len() > 1).then_some(EquivalenceClass::new_from_vec(new_class)) + (new_class.len() > 1).then_some(EquivalenceClass::new(new_class)) }); // TODO: Convert the algorithm below to a version that uses `HashMap`. // once `Arc` can be stored in `HashMap`. @@ -498,7 +496,7 @@ impl EquivalenceGroup { let new_classes = new_classes .into_iter() .filter_map(|(_, values)| (values.len() > 1).then_some(values)) - .map(EquivalenceClass::new_from_vec); + .map(EquivalenceClass::new); let classes = projected_classes.chain(new_classes).collect(); Self::new(classes) @@ -1945,12 +1943,12 @@ mod tests { let entries = entries .into_iter() .map(|entry| entry.into_iter().map(lit).collect::>()) - .map(EquivalenceClass::new_from_vec) + .map(EquivalenceClass::new) .collect::>(); let expected = expected .into_iter() .map(|entry| entry.into_iter().map(lit).collect::>()) - .map(EquivalenceClass::new_from_vec) + .map(EquivalenceClass::new) .collect::>(); let mut eq_groups = EquivalenceGroup::new(entries.clone()); eq_groups.bridge_classes(); @@ -1970,16 +1968,16 @@ mod tests { #[test] fn test_remove_redundant_entries_eq_group() -> Result<()> { let entries = vec![ - EquivalenceClass::new_from_vec(vec![lit(1), lit(1), lit(2)]), + EquivalenceClass::new(vec![lit(1), lit(1), lit(2)]), // This group is meaningless should be removed - EquivalenceClass::new_from_vec(vec![lit(3), lit(3)]), - EquivalenceClass::new_from_vec(vec![lit(4), lit(5), lit(6)]), + EquivalenceClass::new(vec![lit(3), lit(3)]), + EquivalenceClass::new(vec![lit(4), lit(5), lit(6)]), ]; // Given equivalences classes are not in succinct form. // Expected form is the most plain representation that is functionally same. let expected = vec![ - EquivalenceClass::new_from_vec(vec![lit(1), lit(2)]), - EquivalenceClass::new_from_vec(vec![lit(4), lit(5), lit(6)]), + EquivalenceClass::new(vec![lit(1), lit(2)]), + EquivalenceClass::new(vec![lit(4), lit(5), lit(6)]), ]; let mut eq_groups = EquivalenceGroup::new(entries); eq_groups.remove_redundant_entries(); @@ -2732,11 +2730,9 @@ mod tests { Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc; let col_b_expr = Arc::new(Column::new("b", 1)) as Arc; - let cls1 = - EquivalenceClass::new_from_vec(vec![lit_true.clone(), lit_false.clone()]); - let cls2 = - EquivalenceClass::new_from_vec(vec![lit_true.clone(), col_b_expr.clone()]); - let cls3 = EquivalenceClass::new_from_vec(vec![lit2.clone(), lit1.clone()]); + let cls1 = EquivalenceClass::new(vec![lit_true.clone(), lit_false.clone()]); + let cls2 = EquivalenceClass::new(vec![lit_true.clone(), col_b_expr.clone()]); + let cls3 = EquivalenceClass::new(vec![lit2.clone(), lit1.clone()]); // lit_true is common assert!(cls1.contains_any(&cls2)); From ce13ee11dfa61652da9d928f845ac1675324ca7c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 13 Nov 2023 12:38:01 -0500 Subject: [PATCH 4/7] Apply suggestions from code review Co-authored-by: Mehmet Ozan Kabak --- datafusion/physical-expr/src/equivalence.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 4752e7f82379b..34b4a4334f8c2 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -84,7 +84,7 @@ impl EquivalenceClass { physical_exprs_contains(&self.inner, expr) } - /// Returns true if this equivalence class has any entries in common with other + /// Returns true if this equivalence class has any entries in common with `other` pub fn contains_any(&self, other: &Self) -> bool { self.inner.iter().any(|e| other.contains(e)) } From 66c16d245ad0c4838510cdb07402cf26bbd89ccc Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 13 Nov 2023 13:51:14 -0500 Subject: [PATCH 5/7] treat as set rather than vec --- datafusion/physical-expr/src/equivalence.rs | 52 +++++++++++---------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 210b232ad9e67..fb794f8215090 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use crate::expressions::Column; use crate::sort_properties::{ExprOrdering, SortProperties}; use crate::{ - physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal, LexOrdering, + physical_exprs_bag_equal, physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, }; @@ -40,14 +40,23 @@ use indexmap::IndexMap; /// to have the same value for all tuples in a relation. These are generated by /// equality predicates (e.g. `a = b`), typically equi-join conditions and /// equality conditions in filters. +/// +/// Two `EquivalenceClass`es are equal if they contains the same expressions in +/// without any ordering. #[derive(Debug, Clone)] pub struct EquivalenceClass { + /// The expressions in this equivalence class. The order doesn't + /// matter for equivalence purposes + /// + /// TODO: use a HashSet for this instead of a Vec exprs: Vec>, } impl PartialEq for EquivalenceClass { + /// Returns true if other is equal in the sense + /// of bags (multi-sets), disregarding their orderings. fn eq(&self, other: &Self) -> bool { - physical_exprs_equal(&self.exprs, &other.exprs) + physical_exprs_bag_equal(&self.exprs, &other.exprs) } } @@ -58,14 +67,16 @@ impl EquivalenceClass { } // Create a new equivalence class from a pre-existing `Vec` - pub fn new(inner: Vec>) -> Self { - Self { exprs: inner } + pub fn new(mut exprs: Vec>) -> Self { + deduplicate_physical_exprs(&mut exprs); + Self { exprs } } /// Return the inner vector of expressions - pub fn into_inner(self) -> Vec> { + pub fn into_vec(self) -> Vec> { self.exprs } + /// Return the "canonical" expression for this class (the first element) /// if any fn canonical_expr(&self) -> Option> { @@ -75,12 +86,17 @@ impl EquivalenceClass { /// Insert the expression into this class, meaning it is known to be equal to /// all other expressions in this class pub fn push(&mut self, expr: Arc) { - self.exprs.push(expr); + if !self.contains(&expr) { + self.exprs.push(expr); + } } /// Inserts all the expressions from other into this class pub fn extend(&mut self, other: Self) { - self.exprs.extend(other.exprs); + for expr in other.exprs { + // use push so entries are deduplicated + self.push(expr); + } } /// Returns true if this equivalence class contains t expression @@ -103,13 +119,7 @@ impl EquivalenceClass { self.exprs.is_empty() } - /// Removes all duplicated exprs in this class - // TODO should we deduplicate on insert? - fn deduplicate(&mut self) { - deduplicate_physical_exprs(&mut self.exprs); - } - - /// Iterate over all elements in this class + /// Iterate over all elements in this class, in some arbitrary order pub fn iter(&self) -> impl Iterator> { self.exprs.iter() } @@ -117,19 +127,13 @@ impl EquivalenceClass { /// Return a new equivalence class that have the specified offset added to /// each expression (used when schemas are appended such as in joins) pub fn with_offset(&self, offset: usize) -> Self { - let new_inner = self + let new_exprs = self .exprs .iter() .cloned() .map(|e| add_offset_to_expr(e, offset)) .collect(); - Self { exprs: new_inner } - } - - /// Returns true if other is equal in the sense - /// of bags (multi-sets), disregarding their orderings. - pub fn eq_bag(&self, other: &Self) -> bool { - physical_exprs_bag_equal(&self.exprs, &other.exprs) + Self::new(new_exprs) } } @@ -285,7 +289,6 @@ impl EquivalenceGroup { self.classes.retain_mut(|cls| { // Keep groups that have at least two entries as singleton class is // meaningless (i.e. it contains no non-trivial information): - cls.deduplicate(); cls.len() > 1 }); // Unify/bridge groups that have common expressions: @@ -310,7 +313,6 @@ impl EquivalenceGroup { } } if self.classes[idx].len() > start_size { - self.classes[idx].deduplicate(); if self.classes[idx].len() > start_size { continue; } @@ -1959,7 +1961,7 @@ mod tests { ); assert_eq!(eq_groups.len(), expected.len(), "{}", err_msg); for idx in 0..eq_groups.len() { - assert!(eq_groups[idx].eq_bag(&expected[idx]), "{}", err_msg); + assert_eq!(&eq_groups[idx], &expected[idx], "{}", err_msg); } } Ok(()) From 36c1269ae2af4ed31144fcd9a61572458f4abd23 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 13 Nov 2023 14:17:56 -0500 Subject: [PATCH 6/7] fmt --- datafusion/physical-expr/src/equivalence.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index fb794f8215090..8156cfa2034b5 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -22,8 +22,8 @@ use std::sync::Arc; use crate::expressions::Column; use crate::sort_properties::{ExprOrdering, SortProperties}; use crate::{ - physical_exprs_bag_equal, physical_exprs_contains, LexOrdering, - LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalSortExpr, + physical_exprs_bag_equal, physical_exprs_contains, LexOrdering, LexOrderingRef, + LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, }; From 97d126c4e3bf1ce0c66c979932e811921676d83f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 13 Nov 2023 14:45:40 -0500 Subject: [PATCH 7/7] clippy --- datafusion/physical-expr/src/equivalence.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 8156cfa2034b5..84291653fb4f9 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -313,9 +313,7 @@ impl EquivalenceGroup { } } if self.classes[idx].len() > start_size { - if self.classes[idx].len() > start_size { - continue; - } + continue; } idx += 1; }