From 739ccecc1df23afc8402c92395ade1aafa037287 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 7 Feb 2024 23:45:03 -0800 Subject: [PATCH 1/3] Use prep_null_mask_filter to handle nulls in selection mask --- .../src/joins/sort_merge_join.rs | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 107fd7dde0f6e..e477f95f132c9 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -1209,7 +1209,14 @@ impl SMJStream { ) { // The reverse of the selection mask. For the rows not pass join filter above, // we need to join them (left or right) with null rows for outer joins. - let not_mask = compute::not(mask)?; + let not_mask = if mask.null_count() > 0 { + // If the mask contains nulls, we need to use `prep_null_mask_filter` to + // handle the nulls in the mask as false. + compute::not(&compute::prep_null_mask_filter(mask))? + } else { + compute::not(mask)? + }; + let null_joined_batch = compute::filter_record_batch(&output_batch, &not_mask)?; @@ -1254,6 +1261,20 @@ impl SMJStream { // For full join, we also need to output the null joined rows from the buffered side if matches!(self.join_type, JoinType::Full) { + // Handle not mask for buffered side further. + // For buffered side, we want to output the rows that are not null joined with + // the streamed side. i.e. the rows that are not null in the `buffered_indices`. 
+ let not_mask = if buffered_indices.null_count() > 0 { + let nulls = buffered_indices.nulls().unwrap(); + let mask = not_mask.values() & nulls.inner(); + BooleanArray::new(mask, None) + } else { + not_mask + }; + + let null_joined_batch = + compute::filter_record_batch(&output_batch, &not_mask)?; + let mut streamed_columns = self .streamed_schema .fields() From 93b32c4e6d9f8e676079bc1b747b7d42ebbc7b5a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 9 Feb 2024 13:06:28 -0800 Subject: [PATCH 2/3] Update datafusion/physical-plan/src/joins/sort_merge_join.rs Co-authored-by: Andrew Lamb --- datafusion/physical-plan/src/joins/sort_merge_join.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index e477f95f132c9..86734b9fc82e6 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -1211,7 +1211,8 @@ impl SMJStream { // we need to join them (left or right) with null rows for outer joins. let not_mask = if mask.null_count() > 0 { // If the mask contains nulls, we need to use `prep_null_mask_filter` to - // handle the nulls in the mask as false. + // handle the nulls in the mask as false to produce rows where the mask + // was null itself. compute::not(&compute::prep_null_mask_filter(mask))? } else { compute::not(mask)? 
From 120a5b1bf38389ffec672c53c64db58279775fe7 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 9 Feb 2024 13:09:56 -0800 Subject: [PATCH 3/3] Avoid unwrap --- datafusion/physical-plan/src/joins/sort_merge_join.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 86734b9fc82e6..7af614e534917 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -1265,8 +1265,7 @@ impl SMJStream { // Handle not mask for buffered side further. // For buffered side, we want to output the rows that are not null joined with // the streamed side. i.e. the rows that are not null in the `buffered_indices`. - let not_mask = if buffered_indices.null_count() > 0 { - let nulls = buffered_indices.nulls().unwrap(); + let not_mask = if let Some(nulls) = buffered_indices.nulls() { let mask = not_mask.values() & nulls.inner(); BooleanArray::new(mask, None) } else {