From afe083ff45313ed07cb95ded4c089ead7d80ecce Mon Sep 17 00:00:00 2001 From: Taaffy <32072374+Taaffy@users.noreply.github.com> Date: Mon, 18 Sep 2017 18:56:51 +0200 Subject: [PATCH] Incorrect Metric reported in MetricsReporter.scala Current implementation for processingRate-total uses wrong metric: mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond --- .../apache/spark/sql/execution/streaming/MetricsReporter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala index 5551d12fa8ad2..b84e6ce64c611 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala @@ -40,7 +40,7 @@ class MetricsReporter( // Metric names should not have . in them, so that all the metrics of a query are identified // together in Ganglia as a single metric group registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond) - registerGauge("processingRate-total", () => stream.lastProgress.inputRowsPerSecond) + registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond) registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue()) private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {