Skip to content

Commit 01698cb

Browse files
authored
chore: refactor BuildProbeJoinMetrics to use BaselineMetrics (#16500)
* chore: refactor `BuildProbeJoinMetrics` to use `BaselineMetrics`

  Closes #16495

  Here's an example of an `explain analyze` of a hash join showing these metrics:

  ```
  [(WatchID@0, WatchID@0)], metrics=[output_rows=100, elapsed_compute=2.313624ms, build_input_batches=1, build_input_rows=100, input_batches=1, input_rows=100, output_batches=1, build_mem_used=3688, build_time=865.832µs, join_time=1.369875ms]
  ```

  Notice `output_rows=100, elapsed_compute=2.313624ms` in the above.

* test: add checks for join metrics in tests

* fix: add record_poll to ExhaustedProbeSide for nested_loop_join

  This was needed because the `ExhaustedProbeSide` state can also return output rows in certain types of joins. Without this, the `output_rows` metric for nested loop join was wrong!
1 parent 12c40ca commit 01698cb

5 files changed

Lines changed: 179 additions & 57 deletions

File tree

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,8 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
559559
handle_state!(ready!(self.fetch_probe_batch(cx)))
560560
}
561561
CrossJoinStreamState::BuildBatches(_) => {
562-
handle_state!(self.build_batches())
562+
let poll = handle_state!(self.build_batches());
563+
self.join_metrics.baseline.record_poll(poll)
563564
}
564565
};
565566
}
@@ -632,7 +633,6 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
632633
}
633634

634635
self.join_metrics.output_batches.add(1);
635-
self.join_metrics.output_rows.add(batch.num_rows());
636636
return Ok(StatefulStreamResult::Ready(Some(batch)));
637637
}
638638
}
@@ -647,7 +647,7 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
647647
mod tests {
648648
use super::*;
649649
use crate::common;
650-
use crate::test::build_table_scan_i32;
650+
use crate::test::{assert_join_metrics, build_table_scan_i32};
651651

652652
use datafusion_common::{assert_contains, test_util::batches_to_sort_string};
653653
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
@@ -657,14 +657,15 @@ mod tests {
657657
left: Arc<dyn ExecutionPlan>,
658658
right: Arc<dyn ExecutionPlan>,
659659
context: Arc<TaskContext>,
660-
) -> Result<(Vec<String>, Vec<RecordBatch>)> {
660+
) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
661661
let join = CrossJoinExec::new(left, right);
662662
let columns_header = columns(&join.schema());
663663

664664
let stream = join.execute(0, context)?;
665665
let batches = common::collect(stream).await?;
666+
let metrics = join.metrics().unwrap();
666667

667-
Ok((columns_header, batches))
668+
Ok((columns_header, batches, metrics))
668669
}
669670

670671
#[tokio::test]
@@ -831,7 +832,7 @@ mod tests {
831832
("c2", &vec![14, 15]),
832833
);
833834

834-
let (columns, batches) = join_collect(left, right, task_ctx).await?;
835+
let (columns, batches, metrics) = join_collect(left, right, task_ctx).await?;
835836

836837
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
837838

@@ -848,6 +849,8 @@ mod tests {
848849
+----+----+----+----+----+----+
849850
"#);
850851

852+
assert_join_metrics!(metrics, 6);
853+
851854
Ok(())
852855
}
853856

0 commit comments

Comments
 (0)