diff --git a/datafusion/src/physical_plan/metrics/aggregated.rs b/datafusion/src/physical_plan/metrics/aggregated.rs
index cfcdcb78b00de..c55cc1601768c 100644
--- a/datafusion/src/physical_plan/metrics/aggregated.rs
+++ b/datafusion/src/physical_plan/metrics/aggregated.rs
@@ -35,9 +35,15 @@ pub struct AggregatedMetricsSet {
     final_: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
 }
 
+impl Default for AggregatedMetricsSet {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl AggregatedMetricsSet {
     /// Create a new aggregated set
-    pub(crate) fn new() -> Self {
+    pub fn new() -> Self {
         Self {
             intermediate: Arc::new(std::sync::Mutex::new(vec![])),
             final_: Arc::new(std::sync::Mutex::new(vec![])),
@@ -45,7 +51,7 @@ impl AggregatedMetricsSet {
     }
 
     /// create a new intermediate baseline
-    pub(crate) fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics {
+    pub fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics {
         let ms = ExecutionPlanMetricsSet::new();
         let result = BaselineMetrics::new(&ms, partition);
         self.intermediate.lock().unwrap().push(ms);
@@ -53,7 +59,7 @@ impl AggregatedMetricsSet {
     }
 
     /// create a new final baseline
-    pub(crate) fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
+    pub fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
         let ms = ExecutionPlanMetricsSet::new();
         let result = BaselineMetrics::new(&ms, partition);
         self.final_.lock().unwrap().push(ms);
@@ -137,7 +143,7 @@ impl AggregatedMetricsSet {
     }
 
     /// Aggregate all metrics into one
-    pub(crate) fn aggregate_all(&self) -> MetricsSet {
+    pub fn aggregate_all(&self) -> MetricsSet {
         let metrics = ExecutionPlanMetricsSet::new();
         let baseline = BaselineMetrics::new(&metrics, 0);
         self.merge_compute_time(baseline.elapsed_compute());
diff --git a/datafusion/src/physical_plan/metrics/baseline.rs b/datafusion/src/physical_plan/metrics/baseline.rs
index 4c3ab6f7df114..50c49ece141b9 100644
--- a/datafusion/src/physical_plan/metrics/baseline.rs
+++ b/datafusion/src/physical_plan/metrics/baseline.rs
@@ -21,7 +21,7 @@ use std::task::Poll;
 
 use arrow::{error::ArrowError, record_batch::RecordBatch};
 
-use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
+use super::{Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time, Timestamp};
 
 /// Helper for creating and tracking common "baseline" metrics for
 /// each operator
@@ -56,6 +56,9 @@ pub struct BaselineMetrics {
     /// total spilled bytes during the execution of the operator
     spilled_bytes: Count,
 
+    /// current memory usage for the operator
+    mem_used: Gauge,
+
     /// output rows: the total output rows
     output_rows: Count,
 }
@@ -71,6 +74,7 @@ impl BaselineMetrics {
             elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
             spill_count: MetricBuilder::new(metrics).spill_count(partition),
             spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
+            mem_used: MetricBuilder::new(metrics).mem_used(partition),
             output_rows: MetricBuilder::new(metrics).output_rows(partition),
         }
     }
@@ -90,6 +94,11 @@ impl BaselineMetrics {
         &self.spilled_bytes
     }
 
+    /// return the metric for current memory usage
+    pub fn mem_used(&self) -> &Gauge {
+        &self.mem_used
+    }
+
     /// Record a spill of `spilled_bytes` size.
     pub fn record_spill(&self, spilled_bytes: usize) {
         self.spill_count.add(1);
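To make the new `mem_used` gauge concrete, here is a minimal usage sketch against the `BaselineMetrics` API added above (it assumes the re-exports from `datafusion::physical_plan::metrics` shown in the `mod.rs` hunk further down; the 4 KiB batch size is made up):

```rust
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

fn track_operator_memory() {
    // One metrics set per operator; baseline metrics are registered per partition.
    let metrics = ExecutionPlanMetricsSet::new();
    let baseline = BaselineMetrics::new(&metrics, /* partition */ 0);

    // Grow the gauge by the size of a (hypothetical) buffered batch.
    baseline.mem_used().add(4096);
    assert_eq!(baseline.mem_used().value(), 4096);

    // Release the buffered data: reset the gauge and read back what was held.
    let released = baseline.mem_used().set(0);
    assert_eq!(released, 4096);
    assert_eq!(baseline.mem_used().value(), 0);
}
```

Unlike `spilled_bytes`, which only ever grows, `mem_used` is expected to fall back toward zero as buffered data is released.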
diff --git a/datafusion/src/physical_plan/metrics/builder.rs b/datafusion/src/physical_plan/metrics/builder.rs
index 13ffedee63822..30e9764c64460 100644
--- a/datafusion/src/physical_plan/metrics/builder.rs
+++ b/datafusion/src/physical_plan/metrics/builder.rs
@@ -20,7 +20,7 @@
 use std::{borrow::Cow, sync::Arc};
 
 use super::{
-    Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time, Timestamp,
+    Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
 };
 
 /// Structure for constructing metrics, counters, timers, etc.
@@ -123,6 +123,14 @@ impl<'a> MetricBuilder<'a> {
         count
     }
 
+    /// Consume self and create a new gauge for reporting current memory usage
+    pub fn mem_used(self, partition: usize) -> Gauge {
+        let gauge = Gauge::new();
+        self.with_partition(partition)
+            .build(MetricValue::CurrentMemoryUsage(gauge.clone()));
+        gauge
+    }
+
     /// Consumes self and creates a new [`Count`] for recording some
     /// arbitrary metric of an operator.
     pub fn counter(
@@ -133,6 +141,16 @@ impl<'a> MetricBuilder<'a> {
         self.with_partition(partition).global_counter(counter_name)
     }
 
+    /// Consumes self and creates a new [`Gauge`] for reporting some
+    /// arbitrary metric of an operator.
+    pub fn gauge(
+        self,
+        gauge_name: impl Into<Cow<'static, str>>,
+        partition: usize,
+    ) -> Gauge {
+        self.with_partition(partition).global_gauge(gauge_name)
+    }
+
     /// Consumes self and creates a new [`Count`] for recording a
     /// metric of an overall operator (not per partition)
     pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) -> Count {
@@ -144,6 +162,17 @@ impl<'a> MetricBuilder<'a> {
         count
     }
 
+    /// Consumes self and creates a new [`Gauge`] for reporting a
+    /// metric of an overall operator (not per partition)
+    pub fn global_gauge(self, gauge_name: impl Into<Cow<'static, str>>) -> Gauge {
+        let gauge = Gauge::new();
+        self.build(MetricValue::Gauge {
+            name: gauge_name.into(),
+            gauge: gauge.clone(),
+        });
+        gauge
+    }
+
     /// Consume self and create a new Timer for recording the elapsed
     /// CPU time spent by an operator
     pub fn elapsed_compute(self, partition: usize) -> Time {
diff --git a/datafusion/src/physical_plan/metrics/mod.rs b/datafusion/src/physical_plan/metrics/mod.rs
index 9174fa3a3d9d9..d48959974e8de 100644
--- a/datafusion/src/physical_plan/metrics/mod.rs
+++ b/datafusion/src/physical_plan/metrics/mod.rs
@@ -34,7 +34,7 @@ use hashbrown::HashMap;
 pub use aggregated::AggregatedMetricsSet;
 pub use baseline::{BaselineMetrics, RecordOutput};
 pub use builder::MetricBuilder;
-pub use value::{Count, MetricValue, ScopedTimerGuard, Time, Timestamp};
+pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
 
 /// Something that tracks a value of interest (metric) of a DataFusion
 /// [`ExecutionPlan`] execution.
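The builder now exposes three entry points for gauges: the well-known per-partition `mem_used`, an operator-defined per-partition `gauge`, and an operator-wide `global_gauge`. A hedged sketch of how an operator might register them (the names `in_flight_bytes` and `hash_table_bytes` are purely illustrative):

```rust
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, Gauge, MetricBuilder};

fn register_gauges(
    metrics: &ExecutionPlanMetricsSet,
    partition: usize,
) -> (Gauge, Gauge, Gauge) {
    // The well-known per-partition "mem_used" gauge consumed by BaselineMetrics.
    let mem_used = MetricBuilder::new(metrics).mem_used(partition);

    // An operator-defined gauge registered against a specific partition.
    let in_flight = MetricBuilder::new(metrics).gauge("in_flight_bytes", partition);

    // An operator-wide gauge that is not tied to any partition.
    let hash_table = MetricBuilder::new(metrics).global_gauge("hash_table_bytes");

    (mem_used, in_flight, hash_table)
}
```

Note that `gauge` mirrors the existing `counter`: it sets the partition on the builder and then delegates to the "global" constructor, so the partition label is still attached to the resulting metric.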
diff --git a/datafusion/src/physical_plan/metrics/value.rs b/datafusion/src/physical_plan/metrics/value.rs
index 8c1097f7e4bfa..6ac282a496ee3 100644
--- a/datafusion/src/physical_plan/metrics/value.rs
+++ b/datafusion/src/physical_plan/metrics/value.rs
@@ -77,6 +77,62 @@ impl Count {
     }
 }
 
+/// A gauge is the simplest metric type. It just reports a current value.
+/// For example, you can easily expose current memory consumption with a gauge.
+///
+/// Note: `clone`ing a gauge updates the same underlying metric
+#[derive(Debug, Clone)]
+pub struct Gauge {
+    /// value of the metric gauge
+    value: std::sync::Arc<AtomicUsize>,
+}
+
+impl PartialEq for Gauge {
+    fn eq(&self, other: &Self) -> bool {
+        self.value().eq(&other.value())
+    }
+}
+
+impl Display for Gauge {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.value())
+    }
+}
+
+impl Default for Gauge {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Gauge {
+    /// create a new gauge
+    pub fn new() -> Self {
+        Self {
+            value: Arc::new(AtomicUsize::new(0)),
+        }
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Set the metric's value to `n` and return the previous value
+    pub fn set(&self, n: usize) -> usize {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.swap(n, Ordering::Relaxed)
+    }
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+}
+
 /// Measure a potentially non contiguous duration of time
 #[derive(Debug, Clone)]
 pub struct Time {
@@ -287,6 +343,8 @@ pub enum MetricValue {
     SpillCount(Count),
     /// Total size of spilled bytes produced: "spilled_bytes" metric
     SpilledBytes(Count),
+    /// Current memory used
+    CurrentMemoryUsage(Gauge),
     /// Operator defined count.
     Count {
         /// The provided name of this metric
@@ -294,6 +352,13 @@ pub enum MetricValue {
         /// The value of the metric
         count: Count,
     },
+    /// Operator defined gauge.
+    Gauge {
+        /// The provided name of this metric
+        name: Cow<'static, str>,
+        /// The value of the metric
+        gauge: Gauge,
+    },
     /// Operator defined time
     Time {
         /// The provided name of this metric
@@ -314,8 +379,10 @@ impl MetricValue {
             Self::OutputRows(_) => "output_rows",
             Self::SpillCount(_) => "spill_count",
             Self::SpilledBytes(_) => "spilled_bytes",
+            Self::CurrentMemoryUsage(_) => "mem_used",
             Self::ElapsedCompute(_) => "elapsed_compute",
             Self::Count { name, .. } => name.borrow(),
+            Self::Gauge { name, .. } => name.borrow(),
             Self::Time { name, .. } => name.borrow(),
             Self::StartTimestamp(_) => "start_timestamp",
             Self::EndTimestamp(_) => "end_timestamp",
@@ -328,8 +395,10 @@ impl MetricValue {
             Self::OutputRows(count) => count.value(),
             Self::SpillCount(count) => count.value(),
             Self::SpilledBytes(bytes) => bytes.value(),
+            Self::CurrentMemoryUsage(used) => used.value(),
             Self::ElapsedCompute(time) => time.value(),
             Self::Count { count, .. } => count.value(),
+            Self::Gauge { gauge, .. } => gauge.value(),
             Self::Time { time, .. } => time.value(),
             Self::StartTimestamp(timestamp) => timestamp
                 .value()
@@ -349,11 +418,16 @@ impl MetricValue {
             Self::OutputRows(_) => Self::OutputRows(Count::new()),
             Self::SpillCount(_) => Self::SpillCount(Count::new()),
             Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
+            Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
             Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
             Self::Count { name, .. } => Self::Count {
                 name: name.clone(),
                 count: Count::new(),
             },
+            Self::Gauge { name, .. } => Self::Gauge {
+                name: name.clone(),
+                gauge: Gauge::new(),
+            },
             Self::Time { name, .. } => Self::Time {
                 name: name.clone(),
                 time: Time::new(),
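Before the aggregation hunks that follow, a small sketch of the `Gauge` semantics the doc comment describes: clones share one underlying atomic, and `set` returns the previous value so callers can hand tracked memory off elsewhere (this is exactly how the sort changes further down use it):

```rust
use datafusion::physical_plan::metrics::Gauge;

fn gauge_semantics() {
    let gauge = Gauge::new();

    // Clones share the same underlying atomic, so an update through either
    // handle is visible through both.
    let shared = gauge.clone();
    shared.add(512);
    assert_eq!(gauge.value(), 512);

    // `set` swaps in a new value and hands back the previous one, which lets
    // callers reset the gauge and report the released amount in one step.
    let previous = gauge.set(0);
    assert_eq!(previous, 512);
    assert_eq!(gauge.value(), 0);
}
```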
@@ -383,6 +457,13 @@ impl MetricValue {
                     count: other_count, ..
                 },
             ) => count.add(other_count.value()),
+            (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
+            | (
+                Self::Gauge { gauge, .. },
+                Self::Gauge {
+                    gauge: other_gauge, ..
+                },
+            ) => gauge.add(other_gauge.value()),
             (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
             | (
                 Self::Time { time, .. },
@@ -415,10 +496,12 @@ impl MetricValue {
             Self::ElapsedCompute(_) => 1, // show second
             Self::SpillCount(_) => 2,
             Self::SpilledBytes(_) => 3,
-            Self::Count { .. } => 4,
-            Self::Time { .. } => 5,
-            Self::StartTimestamp(_) => 6, // show timestamps last
-            Self::EndTimestamp(_) => 7,
+            Self::CurrentMemoryUsage(_) => 4,
+            Self::Count { .. } => 5,
+            Self::Gauge { .. } => 6,
+            Self::Time { .. } => 7,
+            Self::StartTimestamp(_) => 8, // show timestamps last
+            Self::EndTimestamp(_) => 9,
         }
     }
 
@@ -438,6 +521,9 @@ impl std::fmt::Display for MetricValue {
             | Self::Count { count, .. } => {
                 write!(f, "{}", count)
             }
+            Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => {
+                write!(f, "{}", gauge)
+            }
             Self::ElapsedCompute(time) | Self::Time { time, .. } => {
                 // distinguish between no time recorded and very small
                 // amount of time recorded
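When per-partition metrics are merged, the new arms above simply add gauges together, so an aggregated `mem_used` is the sum of the partitions' current usage. A sketch of that behaviour, assuming the enclosing `MetricValue::aggregate` and `as_usize` methods keep their existing signatures (the method names are not visible in this excerpt):

```rust
use datafusion::physical_plan::metrics::{Gauge, MetricValue};

fn aggregate_mem_used() {
    // Two partitions, each reporting its current memory usage.
    let p0 = Gauge::new();
    p0.add(1024);
    let p1 = Gauge::new();
    p1.add(2048);

    // Fold the per-partition values into one operator-wide metric value.
    let mut total = MetricValue::CurrentMemoryUsage(Gauge::new());
    total.aggregate(&MetricValue::CurrentMemoryUsage(p0));
    total.aggregate(&MetricValue::CurrentMemoryUsage(p1));

    // Summing the per-partition gauges yields the operator-wide usage.
    assert_eq!(total.as_usize(), 3072);
}
```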
diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs
index 0f5c3bdc07f8c..a2df6453ee829 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -51,10 +51,9 @@ use std::fmt::{Debug, Formatter};
 use std::fs::File;
 use std::io::BufReader;
 use std::path::{Path, PathBuf};
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use tempfile::NamedTempFile;
-use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::task;
 
 /// Sort arbitrary size of data to get a total order (may spill several times during sorting based on free memory available).
@@ -76,7 +75,6 @@ struct ExternalSorter {
     runtime: Arc<RuntimeEnv>,
     metrics: AggregatedMetricsSet,
     inner_metrics: BaselineMetrics,
-    used: AtomicUsize,
 }
 
 impl ExternalSorter {
@@ -97,7 +95,6 @@ impl ExternalSorter {
             runtime,
             metrics,
             inner_metrics,
-            used: AtomicUsize::new(0),
         }
     }
 
@@ -105,7 +102,7 @@ impl ExternalSorter {
         if input.num_rows() > 0 {
             let size = batch_byte_size(&input);
             self.try_grow(size).await?;
-            self.used.fetch_add(size, Ordering::SeqCst);
+            self.inner_metrics.mem_used().add(size);
             let mut in_mem_batches = self.in_mem_batches.lock().await;
             in_mem_batches.push(input);
         }
@@ -132,7 +129,8 @@ impl ExternalSorter {
                 &self.expr,
                 baseline_metrics,
             )?;
-            streams.push(SortedStream::new(in_mem_stream, self.used()));
+            let prev_used = self.inner_metrics.mem_used().set(0);
+            streams.push(SortedStream::new(in_mem_stream, prev_used));
         }
 
         let mut spills = self.spills.lock().await;
@@ -152,19 +150,22 @@ impl ExternalSorter {
             )))
         } else if in_mem_batches.len() > 0 {
             let baseline_metrics = self.metrics.new_final_baseline(partition);
-            in_mem_partial_sort(
+            let result = in_mem_partial_sort(
                 &mut *in_mem_batches,
                 self.schema.clone(),
                 &self.expr,
                 baseline_metrics,
-            )
+            );
+            self.inner_metrics.mem_used().set(0);
+            // TODO: the result size is not tracked
+            result
         } else {
             Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
         }
     }
 
     fn used(&self) -> usize {
-        self.used.load(Ordering::SeqCst)
+        self.inner_metrics.mem_used().value()
     }
 
     fn spilled_bytes(&self) -> usize {
@@ -231,22 +232,17 @@ impl MemoryConsumer for ExternalSorter {
             baseline_metrics,
         );
 
-        let total_size = spill_partial_sorted_stream(
-            &mut stream?,
-            spillfile.path(),
-            self.schema.clone(),
-        )
-        .await?;
-
+        spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone())
+            .await?;
         let mut spills = self.spills.lock().await;
-        let used = self.used.swap(0, Ordering::SeqCst);
-        self.inner_metrics.record_spill(total_size);
+        let used = self.inner_metrics.mem_used().set(0);
+        self.inner_metrics.record_spill(used);
         spills.push(spillfile);
         Ok(used)
     }
 
     fn mem_used(&self) -> usize {
-        self.used.load(Ordering::SeqCst)
+        self.inner_metrics.mem_used().value()
     }
 }
 
@@ -288,7 +284,7 @@ async fn spill_partial_sorted_stream(
     in_mem_stream: &mut SendableRecordBatchStream,
     path: &Path,
     schema: SchemaRef,
-) -> Result<usize> {
+) -> Result<()> {
     let (sender, receiver) = tokio::sync::mpsc::channel(2);
     let path: PathBuf = path.into();
     let handle = task::spawn_blocking(move || write_sorted(receiver, path, schema));
@@ -310,8 +306,8 @@ fn read_spill_as_stream(
     schema: SchemaRef,
 ) -> Result<SendableRecordBatchStream> {
     let (sender, receiver): (
-        TKSender<ArrowResult<RecordBatch>>,
-        TKReceiver<ArrowResult<RecordBatch>>,
+        Sender<ArrowResult<RecordBatch>>,
+        Receiver<ArrowResult<RecordBatch>>,
     ) = tokio::sync::mpsc::channel(2);
     let join_handle = task::spawn_blocking(move || {
         if let Err(e) = read_spill(sender, path.path()) {
@@ -326,10 +322,10 @@ fn read_spill_as_stream(
 }
 
 fn write_sorted(
-    mut receiver: TKReceiver<ArrowResult<RecordBatch>>,
+    mut receiver: Receiver<ArrowResult<RecordBatch>>,
     path: PathBuf,
     schema: SchemaRef,
-) -> Result<usize> {
+) -> Result<()> {
     let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
     while let Some(batch) = receiver.blocking_recv() {
         writer.write(&batch?)?;
@@ -339,10 +335,10 @@ fn write_sorted(
         "Spilled {} batches of total {} rows to disk, memory released {}",
         writer.num_batches, writer.num_rows, writer.num_bytes
     );
-    Ok(writer.num_bytes as usize)
+    Ok(())
 }
 
-fn read_spill(sender: TKSender<ArrowResult<RecordBatch>>, path: &Path) -> Result<()> {
+fn read_spill(sender: Sender<ArrowResult<RecordBatch>>, path: &Path) -> Result<()> {
     let file = BufReader::new(File::open(&path)?);
     let reader = FileReader::try_new(file)?;
     for batch in reader {
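The sort changes above drop the operator's private `AtomicUsize` and route all accounting through the baseline `mem_used` gauge, so the value shows up in the operator's metrics and the spill path can reset-and-report in one step. A sketch of the same pattern in a hypothetical operator (`BufferingOperator` is illustrative only, not part of DataFusion):

```rust
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

/// Hypothetical operator used only to illustrate the accounting pattern the
/// external sort now follows: grow `mem_used` as batches are buffered, then
/// drain it (via `set(0)`) when the buffered data is spilled or handed off.
struct BufferingOperator {
    inner_metrics: BaselineMetrics,
    buffered: Vec<Vec<u8>>, // stand-in for buffered RecordBatches
}

impl BufferingOperator {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            inner_metrics: BaselineMetrics::new(metrics, partition),
            buffered: vec![],
        }
    }

    fn buffer(&mut self, bytes: Vec<u8>) {
        // Account for the newly buffered data.
        self.inner_metrics.mem_used().add(bytes.len());
        self.buffered.push(bytes);
    }

    fn spill(&mut self) -> usize {
        // Pretend the buffered data was written to disk, then drop it.
        self.buffered.clear();
        // Reset the gauge and report how much memory the spill released,
        // mirroring `ExternalSorter::spill` above.
        let released = self.inner_metrics.mem_used().set(0);
        self.inner_metrics.record_spill(released);
        released
    }
}
```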