diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 107fd7dde0f6e..7af614e534917 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -1209,7 +1209,15 @@ impl SMJStream { ) { // The reverse of the selection mask. For the rows not pass join filter above, // we need to join them (left or right) with null rows for outer joins. - let not_mask = compute::not(mask)?; + let not_mask = if mask.null_count() > 0 { + // If the mask contains nulls, we need to use `prep_null_mask_filter` to + // handle the nulls in the mask as false to produce rows where the mask + // was null itself. + compute::not(&compute::prep_null_mask_filter(mask))? + } else { + compute::not(mask)? + }; + let null_joined_batch = compute::filter_record_batch(&output_batch, ¬_mask)?; @@ -1254,6 +1262,19 @@ impl SMJStream { // For full join, we also need to output the null joined rows from the buffered side if matches!(self.join_type, JoinType::Full) { + // Handle not mask for buffered side further. + // For buffered side, we want to output the rows that are not null joined with + // the streamed side. i.e. the rows that are not null in the `buffered_indices`. + let not_mask = if let Some(nulls) = buffered_indices.nulls() { + let mask = not_mask.values() & nulls.inner(); + BooleanArray::new(mask, None) + } else { + not_mask + }; + + let null_joined_batch = + compute::filter_record_batch(&output_batch, ¬_mask)?; + let mut streamed_columns = self .streamed_schema .fields()