diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 6058b7974e923..c0efe55fc376a 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -1046,6 +1046,17 @@ async fn collect_left_input(
         })
         .await?;

+    if batches.is_empty() {
+        return Ok(JoinLeftData::new(
+            Box::new(JoinHashMapU32::with_capacity(0)),
+            RecordBatch::new_empty(schema),
+            Vec::new(),
+            Mutex::new(BooleanBufferBuilder::new(0)),
+            AtomicUsize::new(probe_threads_count),
+            reservation,
+        ));
+    };
+
     // Estimation of memory size, required for hashtable, prior to allocation.
     // Final result can be verified using `RawTable.allocation_info()`
     let fixed_size_u32 = size_of::<JoinHashMapU32>();
@@ -1068,49 +1079,42 @@ async fn collect_left_input(
     };

     let mut hashes_buffer = Vec::new();
-    let mut offset = 0;
-
-    // Updating hashmap starting from the last batch
-    let batches_iter = batches.iter().rev();
-    for batch in batches_iter.clone() {
-        hashes_buffer.clear();
-        hashes_buffer.resize(batch.num_rows(), 0);
-        update_hash(
-            &on_left,
-            batch,
-            &mut *hashmap,
-            offset,
-            &random_state,
-            &mut hashes_buffer,
-            0,
-            true,
-        )?;
-        offset += batch.num_rows();
-    }
-    // Merge all batches into a single batch, so we can directly index into the arrays
+
+    let batches_iter = batches.iter();
     let single_batch = concat_batches(&schema, batches_iter)?;
+    let left_values = on_left
+        .iter()
+        .map(|c| {
+            c.evaluate(&single_batch)?
+                .into_array(single_batch.num_rows())
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    hashes_buffer.clear();
+    hashes_buffer.resize(single_batch.num_rows(), 0);
+    update_hash(
+        &left_values,
+        &mut *hashmap,
+        0, // we pass in 0 offset since it is a single batch
+        &random_state,
+        &mut hashes_buffer,
+        0,
+        true,
+    )?;

     // Reserve additional memory for visited indices bitmap and create shared builder
     let visited_indices_bitmap = if with_visited_indices_bitmap {
-        let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
+        let bitmap_size = bit_util::ceil(num_rows, 8);
         reservation.try_grow(bitmap_size)?;
         metrics.build_mem_used.add(bitmap_size);

-        let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
+        let mut bitmap_buffer = BooleanBufferBuilder::new(num_rows);
         bitmap_buffer.append_n(num_rows, false);
         bitmap_buffer
     } else {
         BooleanBufferBuilder::new(0)
     };

-    let left_values = on_left
-        .iter()
-        .map(|c| {
-            c.evaluate(&single_batch)?
-                .into_array(single_batch.num_rows())
-        })
-        .collect::<Result<Vec<_>>>()?;
-
     let data = JoinLeftData::new(
         hashmap,
         single_batch,
@@ -1131,8 +1135,7 @@ async fn collect_left_input(
 /// as a chain head for rows with equal hash values.
 #[allow(clippy::too_many_arguments)]
 pub fn update_hash(
-    on: &[PhysicalExprRef],
-    batch: &RecordBatch,
+    key_values: &Vec<Arc<dyn Array>>,
     hash_map: &mut dyn JoinHashMapType,
     offset: usize,
     random_state: &RandomState,
@@ -1140,17 +1143,11 @@ pub fn update_hash(
     deleted_offset: usize,
     fifo_hashmap: bool,
 ) -> Result<()> {
-    // evaluate the keys
-    let keys_values = on
-        .iter()
-        .map(|c| c.evaluate(batch)?.into_array(batch.num_rows()))
-        .collect::<Result<Vec<_>>>()?;
-
-    // calculate the hash values
-    let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
+    let hash_values = create_hashes(key_values, random_state, hashes_buffer)?;

     // For usual JoinHashmap, the implementation is void.
-    hash_map.extend_zero(batch.num_rows());
+    hash_map.extend_zero(key_values[0].len());

     // Updating JoinHashMap from hash values iterator
     let hash_values_iter = hash_values
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 9a8d4cbb66050..ceb250e366eb3 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -34,7 +34,7 @@ use std::vec;

 use crate::common::SharedMemoryReservation;
 use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
-use crate::joins::hash_join::{equal_rows_arr, update_hash};
+use crate::joins::hash_join::equal_rows_arr;
 use crate::joins::stream_join_utils::{
     calculate_filter_expr_intervals, combine_two_batches,
     convert_sort_expr_with_filter_schema, get_pruning_anti_indices,
@@ -1280,6 +1280,44 @@ impl OneSideHashJoiner {
     }
 }

+#[allow(clippy::too_many_arguments)]
+pub fn update_hash(
+    on: &[PhysicalExprRef],
+    batch: &RecordBatch,
+    hash_map: &mut dyn JoinHashMapType,
+    offset: usize,
+    random_state: &RandomState,
+    hashes_buffer: &mut Vec<u64>,
+    deleted_offset: usize,
+    fifo_hashmap: bool,
+) -> Result<()> {
+    // evaluate the keys
+    let keys_values = on
+        .iter()
+        .map(|c| c.evaluate(batch)?.into_array(batch.num_rows()))
+        .collect::<Result<Vec<_>>>()?;
+
+    // calculate the hash values
+    let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
+
+    // For usual JoinHashmap, the implementation is void.
+    hash_map.extend_zero(batch.num_rows());
+
+    // Updating JoinHashMap from hash values iterator
+    let hash_values_iter = hash_values
+        .iter()
+        .enumerate()
+        .map(|(i, val)| (i + offset, val));
+
+    if fifo_hashmap {
+        hash_map.update_from_iter(Box::new(hash_values_iter.rev()), deleted_offset);
+    } else {
+        hash_map.update_from_iter(Box::new(hash_values_iter), deleted_offset);
+    }
+
+    Ok(())
+}
+
 /// `SymmetricHashJoinStream` manages incremental join operations between two
 /// streams. Unlike traditional join approaches that need to scan one side of
 /// the join fully before proceeding, `SymmetricHashJoinStream` facilitates
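For readers skimming the patch, the net effect of the hash_join.rs hunks is: the build side is concatenated into one batch, the join keys are evaluated and hashed once over that single batch, every row is inserted with a zero offset (in reverse iteration order when the FIFO hashmap is requested), and an empty build side now short-circuits to an empty JoinLeftData. Below is a minimal, dependency-free Rust sketch of that flow using a plain std HashMap; the names hash_key and build_hash_table and the Vec<i64> "batches" are illustrative stand-ins, not DataFusion or Arrow APIs.

use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Stand-in for `create_hashes`: hash one join-key value.
fn hash_key<T: Hash>(key: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

// Build the table over the already-concatenated build side in one pass.
// Rows are inserted in reverse, mirroring `hash_values_iter.rev()` for the
// FIFO hashmap, and all indices are global (offset 0) into the single batch.
fn build_hash_table(keys: &[i64]) -> HashMap<u64, Vec<usize>> {
    let mut table: HashMap<u64, Vec<usize>> = HashMap::new();
    for (row, key) in keys.iter().enumerate().rev() {
        table.entry(hash_key(key)).or_default().push(row);
    }
    table
}

fn main() {
    // Two build-side "batches" concatenated up front, like `concat_batches`
    // before the single `update_hash` call in the patch.
    let batch_a = vec![1_i64, 2, 3];
    let batch_b = vec![2_i64, 4];
    let single_batch: Vec<i64> = batch_a.into_iter().chain(batch_b).collect();

    let table = build_hash_table(&single_batch);

    // Probing key 2 finds global row indices 1 and 3 in the single batch.
    let mut matches = table.get(&hash_key(&2_i64)).cloned().unwrap_or_default();
    matches.sort_unstable();
    assert_eq!(matches, vec![1, 3]);

    // An empty build side yields an empty table, analogous to the new
    // `batches.is_empty()` early return in `collect_left_input`.
    assert!(build_hash_table(&[]).is_empty());

    println!("rows matching key 2: {matches:?}");
}

The real JoinHashMapType implementations chain row indices through a next array rather than storing Vec buckets, but the offset and insertion-order behaviour exercised above is the part this diff changes.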