From 002a5e57dee52273cf2459f2c6d699afc6e9ead2 Mon Sep 17 00:00:00 2001 From: Jian Tang Date: Thu, 23 Apr 2020 20:35:48 +0100 Subject: [PATCH 1/5] Benchmark Parquet Nested Field Predicate Pushdown --- ...edicatePushDownBenchmark-jdk11-results.txt | 20 +++ ...quetNestedPredicatePushDownBenchmark.scala | 121 ++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala diff --git a/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt new file mode 100644 index 0000000000000..47bf07b857467 --- /dev/null +++ b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt @@ -0,0 +1,20 @@ +OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6 +Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz +LoadNoRowGroupsWhenPredicatePushedDown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +NestedFieldsPredicatePushDownDisabled 35852 36874 793 2.9 341.9 1.0X +NestedFieldsPredicatePushDownEnabled 85 101 12 1238.9 0.8 423.6X + +OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6 +Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz +LoadSomeRowGroupsWhenPredicatePushedDown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +NestedFieldsPredicatePushDownDisabled 36277 36626 314 2.9 346.0 1.0X +NestedFieldsPredicatePushDownEnabled 3714 4513 NaN 28.2 35.4 9.8X + +OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6 +Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz 
+LoadAllRowGroupsWhenPredicatePushedDown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +NestedFieldsPredicatePushDownDisabled 44274 46513 586 2.4 422.2 1.0X +NestedFieldsPredicatePushDownEnabled 45424 46303 NaN 2.3 433.2 1.0X diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala new file mode 100644 index 0000000000000..dbe6a5abf8ec4 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.SparkConf +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import org.apache.spark.sql.internal.SQLConf + +/** + * Synthetic benchmark for nested fields predicate push down performance for Parquet datasource. 
+ * To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar> + * 2. build/sbt "sql/test:runMain <this class>" + * 3. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>" + * Results will be written to "benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt". + * }}} + */ +object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark { + + private val N = 100 * 1024 * 1024 + private val NUMBER_OF_ITER = 10 + + override def getSparkSession: SparkSession = { + val conf = new SparkConf() + .setAppName(this.getClass.getSimpleName) + // Since `spark.master` always exists, overrides this value + .set("spark.master", "local[1]") + + SparkSession.builder().config(conf).getOrCreate() + } + + private val df: DataFrame = spark.range(1, N, 1, 4).toDF("id") + .selectExpr("id", "STRUCT(id x, STRUCT(CAST(id AS STRING) z) y) nested") + .sort("id") + + private def addCase(benchmark: Benchmark, inputPath: String, + enableNestedPD: Boolean, + name: String, filterFn: DataFrame => DataFrame): Unit = { + val loadDF = spark.read.parquet(inputPath) + benchmark.addCase(name) { + _ => + withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_ENABLED.key, + enableNestedPD.toString)) { + filterFn(loadDF).noop() + } + } + } + + private def createAndRunBenchmark(name: String, filterFn: DataFrame => DataFrame): Unit = { + withTempPath { + tempDir => + val outputPath = tempDir.getCanonicalPath + df.write.mode(SaveMode.Overwrite).parquet(tempDir.getCanonicalPath) + val benchmark = new Benchmark(name, N, NUMBER_OF_ITER, output = output) + addCase(benchmark, outputPath, enableNestedPD = false, + "NestedFieldsPredicatePushDownDisabled", filterFn) + addCase(benchmark, outputPath, enableNestedPD = true, + "NestedFieldsPredicatePushDownEnabled", filterFn) + benchmark.run() + } + } + /** + * Benchmark for sorted data with a filter which allows to filter out all the row groups + * when nested fields predicate push down enabled + */ + def 
runLoadNoRowGroupWhenPredicatePushedDown(): Unit = { + // no row group will be loaded when predicate pushed down + val filterFn: DataFrame => DataFrame = df => df.filter("nested.x < 0") + createAndRunBenchmark("LoadNoRowGroupsWhenPredicatePushedDown", filterFn) + } + + /** + * Benchmark with a filter which allows to load only some row groups + * when nested fields predicate push down enabled + */ + def runLoadSomeRowGroupWhenPredicatePushedDown(): Unit = { + // only a row group will be loaded when predicate pushed down + val filterFn: DataFrame => DataFrame = df => df.filter("nested.x = 100") + createAndRunBenchmark("LoadSomeRowGroupsWhenPredicatePushedDown", filterFn) + } + + /** + * Benchmark with a filter which still requires to + * load all the row groups on sorted data to see if we introduce too much + * overhead or not if enable nested predicate push down. + */ + def runLoadAllRowGroupsWhenPredicatePushedDown(): Unit = { + + // all row groups will be loaded with a whole range filter + val filterFn: DataFrame => DataFrame = { df => + df.filter("nested.x >= 0").filter(s"nested.x <= $N") + } + createAndRunBenchmark("LoadAllRowGroupsWhenPredicatePushedDown", filterFn) + } + + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + runLoadNoRowGroupWhenPredicatePushedDown() + runLoadSomeRowGroupWhenPredicatePushedDown() + runLoadAllRowGroupsWhenPredicatePushedDown() + } +} From 7a6df2c5e4f3847a566c9cfb5adfccdbd3da81ef Mon Sep 17 00:00:00 2001 From: Jian Tang Date: Thu, 23 Apr 2020 22:21:59 +0100 Subject: [PATCH 2/5] SPARK-31364 Benchmark Parquet Predicate Pushdown Add jdk8 benchmark result --- ...stedPredicatePushDownBenchmark-results.txt | 21 +++++++++++++++++++ ...quetNestedPredicatePushDownBenchmark.scala | 3 +-- 2 files changed, 22 insertions(+), 2 deletions(-) create mode 100644 sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt diff --git a/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt 
b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt new file mode 100644 index 0000000000000..81f44df865281 --- /dev/null +++ b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt @@ -0,0 +1,21 @@ +OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6 +Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz +LoadNoRowGroupsWhenPredicatePushedDown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +NestedFieldsPredicatePushDownDisabled 33189 34705 443 3.2 316.5 1.0X +NestedFieldsPredicatePushDownEnabled 81 93 8 1291.5 0.8 408.8X + +OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6 +Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz +LoadSomeRowGroupsWhenPredicatePushedDown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +NestedFieldsPredicatePushDownDisabled 33122 34397 901 3.2 315.9 1.0X +NestedFieldsPredicatePushDownEnabled 3393 3449 54 30.9 32.4 9.8X + +OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6 +Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz +LoadAllRowGroupsWhenPredicatePushedDown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +NestedFieldsPredicatePushDownDisabled 35266 35849 572 3.0 336.3 1.0X +NestedFieldsPredicatePushDownEnabled 34682 36049 NaN 3.0 330.8 1.0X + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala index dbe6a5abf8ec4..af7b86861c63b 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala @@ -78,6 +78,7 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark { benchmark.run() } } + /** * Benchmark for sorted data with a filter which allows to filter out all the row groups * when nested fields predicate push down enabled @@ -104,7 +105,6 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark { * overhead or not if enable nested predicate push down. */ def runLoadAllRowGroupsWhenPredicatePushedDown(): Unit = { - // all row groups will be loaded with a whole range filter val filterFn: DataFrame => DataFrame = { df => df.filter("nested.x >= 0").filter(s"nested.x <= $N") @@ -112,7 +112,6 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark { createAndRunBenchmark("LoadAllRowGroupsWhenPredicatePushedDown", filterFn) } - override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { runLoadNoRowGroupWhenPredicatePushedDown() runLoadSomeRowGroupWhenPredicatePushedDown() From c72d0cef190416aa87be68af522d603e42cd12b0 Mon Sep 17 00:00:00 2001 From: Jian Tang Date: Fri, 24 Apr 2020 15:39:52 +0100 Subject: [PATCH 3/5] SPARK-31364 Benchmark Parquet Predicate Pushdown Add jdk8 benchmark result --- ...edicatePushDownBenchmark-jdk11-results.txt | 19 ++++----- ...stedPredicatePushDownBenchmark-results.txt | 18 ++++----- ...quetNestedPredicatePushDownBenchmark.scala | 39 ++++++++----------- 3 files changed, 35 insertions(+), 41 deletions(-) diff --git a/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt index 47bf07b857467..ffaf17a3bdee9 100644 --- a/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt +++ 
b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt @@ -1,20 +1,21 @@ OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6 Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz -LoadNoRowGroupsWhenPredicatePushedDown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +CanSkipAllRowGroupsWithNestedPredicatePushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -NestedFieldsPredicatePushDownDisabled 35852 36874 793 2.9 341.9 1.0X -NestedFieldsPredicatePushDownEnabled 85 101 12 1238.9 0.8 423.6X +Without nested predicate Pushdown 36379 37296 781 2.9 346.9 1.0X +With nested predicate Pushdown 89 100 9 1183.5 0.8 410.6X OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6 Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz -LoadSomeRowGroupsWhenPredicatePushedDown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +CanSkipSomeRowGroupsWithNestedPredicatePushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -NestedFieldsPredicatePushDownDisabled 36277 36626 314 2.9 346.0 1.0X -NestedFieldsPredicatePushDownEnabled 3714 4513 NaN 28.2 35.4 9.8X +Without nested predicate Pushdown 36174 37605 NaN 2.9 345.0 1.0X +With nested predicate Pushdown 3653 3684 22 28.7 34.8 9.9X OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6 Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz -LoadAllRowGroupsWhenPredicatePushedDown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +NoRowGroupSkippedWithNestedPredicatePushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -NestedFieldsPredicatePushDownDisabled 44274 
46513 586 2.4 422.2 1.0X -NestedFieldsPredicatePushDownEnabled 45424 46303 NaN 2.3 433.2 1.0X +Without nested predicate Pushdown 39290 40747 NaN 2.7 374.7 1.0X +With nested predicate Pushdown 38936 39447 587 2.7 371.3 1.0X + diff --git a/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt index 81f44df865281..90c537e4e7fb0 100644 --- a/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt +++ b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt @@ -1,21 +1,21 @@ OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6 Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz -LoadNoRowGroupsWhenPredicatePushedDown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +CanSkipAllRowGroupsWithNestedPredicatePushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -NestedFieldsPredicatePushDownDisabled 33189 34705 443 3.2 316.5 1.0X -NestedFieldsPredicatePushDownEnabled 81 93 8 1291.5 0.8 408.8X +Without nested predicate Pushdown 30955 33020 372 3.4 295.2 1.0X +With nested predicate Pushdown 82 92 8 1280.6 0.8 378.0X OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6 Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz -LoadSomeRowGroupsWhenPredicatePushedDown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +CanSkipSomeRowGroupsWithNestedPredicatePushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -NestedFieldsPredicatePushDownDisabled 33122 34397 901 3.2 315.9 1.0X -NestedFieldsPredicatePushDownEnabled 3393 3449 54 30.9 32.4 9.8X +Without nested predicate Pushdown 30796 32251 918 3.4 293.7 1.0X +With nested predicate Pushdown 3218 
3287 55 32.6 30.7 9.6X OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6 Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz -LoadAllRowGroupsWhenPredicatePushedDown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +NoRowGroupSkippedWithNestedPredicatePushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -NestedFieldsPredicatePushDownDisabled 35266 35849 572 3.0 336.3 1.0X -NestedFieldsPredicatePushDownEnabled 34682 36049 NaN 3.0 330.8 1.0X +Without nested predicate Pushdown 35003 36026 989 3.0 333.8 1.0X +With nested predicate Pushdown 34945 36033 415 3.0 333.3 1.0X diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala index af7b86861c63b..de78631666730 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala @@ -52,29 +52,27 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark { .selectExpr("id", "STRUCT(id x, STRUCT(CAST(id AS STRING) z) y) nested") .sort("id") - private def addCase(benchmark: Benchmark, inputPath: String, - enableNestedPD: Boolean, - name: String, filterFn: DataFrame => DataFrame): Unit = { + private def addCase(benchmark: Benchmark, inputPath: String, enableNestedPD: Boolean, + name: String, withFilter: DataFrame => DataFrame): Unit = { val loadDF = spark.read.parquet(inputPath) - benchmark.addCase(name) { - _ => - withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_ENABLED.key, - enableNestedPD.toString)) { - filterFn(loadDF).noop() - } + benchmark.addCase(name) { _ => + 
withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_ENABLED.key, + enableNestedPD.toString)) { + withFilter(loadDF).noop() + } } } - private def createAndRunBenchmark(name: String, filterFn: DataFrame => DataFrame): Unit = { + private def createAndRunBenchmark(name: String, withFilter: DataFrame => DataFrame): Unit = { withTempPath { tempDir => val outputPath = tempDir.getCanonicalPath df.write.mode(SaveMode.Overwrite).parquet(tempDir.getCanonicalPath) val benchmark = new Benchmark(name, N, NUMBER_OF_ITER, output = output) addCase(benchmark, outputPath, enableNestedPD = false, - "NestedFieldsPredicatePushDownDisabled", filterFn) + "Without nested predicate Pushdown", withFilter) addCase(benchmark, outputPath, enableNestedPD = true, - "NestedFieldsPredicatePushDownEnabled", filterFn) + "With nested predicate Pushdown", withFilter) benchmark.run() } } @@ -84,9 +82,8 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark { * when nested fields predicate push down enabled */ def runLoadNoRowGroupWhenPredicatePushedDown(): Unit = { - // no row group will be loaded when predicate pushed down - val filterFn: DataFrame => DataFrame = df => df.filter("nested.x < 0") - createAndRunBenchmark("LoadNoRowGroupsWhenPredicatePushedDown", filterFn) + createAndRunBenchmark("CanSkipAllRowGroupsWithNestedPredicatePushdown", + _.filter("nested.x < 0")) } /** @@ -94,9 +91,8 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark { * when nested fields predicate push down enabled */ def runLoadSomeRowGroupWhenPredicatePushedDown(): Unit = { - // only a row group will be loaded when predicate pushed down - val filterFn: DataFrame => DataFrame = df => df.filter("nested.x = 100") - createAndRunBenchmark("LoadSomeRowGroupsWhenPredicatePushedDown", filterFn) + createAndRunBenchmark("CanSkipSomeRowGroupsWithNestedPredicatePushdown", + _.filter("nested.x = 100")) } /** @@ -105,11 +101,8 @@ object ParquetNestedPredicatePushDownBenchmark extends 
SqlBasedBenchmark { * overhead or not if enable nested predicate push down. */ def runLoadAllRowGroupsWhenPredicatePushedDown(): Unit = { - // all row groups will be loaded with a whole range filter - val filterFn: DataFrame => DataFrame = { df => - df.filter("nested.x >= 0").filter(s"nested.x <= $N") - } - createAndRunBenchmark("LoadAllRowGroupsWhenPredicatePushedDown", filterFn) + createAndRunBenchmark("NoRowGroupSkippedWithNestedPredicatePushdown", + _.filter(s"nested.x >= 0 and nested.x <= $N")) } override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { From e0703a550a75fba8413a48f40356bdafcd41497d Mon Sep 17 00:00:00 2001 From: Jian Tang Date: Fri, 24 Apr 2020 21:48:11 +0100 Subject: [PATCH 4/5] SPARK-31364 Benchmark Parquet Predicate Pushdown Rename benchmark and apply scalafmt --- ...edicatePushDownBenchmark-jdk11-results.txt | 18 +++---- ...stedPredicatePushDownBenchmark-results.txt | 18 +++---- ...quetNestedPredicatePushDownBenchmark.scala | 51 +++++++++++-------- 3 files changed, 48 insertions(+), 39 deletions(-) diff --git a/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt index ffaf17a3bdee9..c33f8a372d82b 100644 --- a/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt +++ b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt @@ -1,21 +1,21 @@ OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6 Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz -CanSkipAllRowGroupsWithNestedPredicatePushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +Can skip all row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Without nested predicate Pushdown 36379 37296 781 2.9 346.9 1.0X -With nested predicate Pushdown 89 100 9 1183.5 0.8 410.6X 
+Without nested predicate Pushdown 34214 35752 NaN 3.1 326.3 1.0X +With nested predicate Pushdown 86 102 11 1216.2 0.8 396.8X OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6 Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz -CanSkipSomeRowGroupsWithNestedPredicatePushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +Can skip some row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Without nested predicate Pushdown 36174 37605 NaN 2.9 345.0 1.0X -With nested predicate Pushdown 3653 3684 22 28.7 34.8 9.9X +Without nested predicate Pushdown 34211 35162 843 3.1 326.3 1.0X +With nested predicate Pushdown 3470 3514 36 30.2 33.1 9.9X OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6 Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz -NoRowGroupSkippedWithNestedPredicatePushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +Can skip no row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Without nested predicate Pushdown 39290 40747 NaN 2.7 374.7 1.0X -With nested predicate Pushdown 38936 39447 587 2.7 371.3 1.0X +Without nested predicate Pushdown 37533 37919 329 2.8 357.9 1.0X +With nested predicate Pushdown 37876 39132 536 2.8 361.2 1.0X diff --git a/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt index 90c537e4e7fb0..35dd4f0a5e9cb 100644 --- a/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt +++ b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt @@ -1,21 +1,21 @@ OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6 Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz 
-CanSkipAllRowGroupsWithNestedPredicatePushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +Can skip all row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Without nested predicate Pushdown 30955 33020 372 3.4 295.2 1.0X -With nested predicate Pushdown 82 92 8 1280.6 0.8 378.0X +Without nested predicate Pushdown 30687 31552 NaN 3.4 292.7 1.0X +With nested predicate Pushdown 105 150 61 999.3 1.0 292.5X OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6 Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz -CanSkipSomeRowGroupsWithNestedPredicatePushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +Can skip some row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Without nested predicate Pushdown 30796 32251 918 3.4 293.7 1.0X -With nested predicate Pushdown 3218 3287 55 32.6 30.7 9.6X +Without nested predicate Pushdown 30505 31828 NaN 3.4 290.9 1.0X +With nested predicate Pushdown 3156 3215 77 33.2 30.1 9.7X OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6 Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz -NoRowGroupSkippedWithNestedPredicatePushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +Can skip no row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Without nested predicate Pushdown 35003 36026 989 3.0 333.8 1.0X -With nested predicate Pushdown 34945 36033 415 3.0 333.3 1.0X +Without nested predicate Pushdown 34475 35302 NaN 3.0 328.8 1.0X +With nested predicate Pushdown 34003 34596 567 3.1 324.3 1.0X diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala index de78631666730..005efd6dcfeae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala @@ -48,32 +48,44 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark { SparkSession.builder().config(conf).getOrCreate() } - private val df: DataFrame = spark.range(1, N, 1, 4).toDF("id") + private val df: DataFrame = spark + .range(1, N, 1, 4) + .toDF("id") .selectExpr("id", "STRUCT(id x, STRUCT(CAST(id AS STRING) z) y) nested") .sort("id") - private def addCase(benchmark: Benchmark, inputPath: String, enableNestedPD: Boolean, - name: String, withFilter: DataFrame => DataFrame): Unit = { + private def addCase( + benchmark: Benchmark, + inputPath: String, + enableNestedPD: Boolean, + name: String, + withFilter: DataFrame => DataFrame): Unit = { val loadDF = spark.read.parquet(inputPath) benchmark.addCase(name) { _ => - withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_ENABLED.key, - enableNestedPD.toString)) { + withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_ENABLED.key, enableNestedPD.toString)) { withFilter(loadDF).noop() } } } private def createAndRunBenchmark(name: String, withFilter: DataFrame => DataFrame): Unit = { - withTempPath { - tempDir => - val outputPath = tempDir.getCanonicalPath - df.write.mode(SaveMode.Overwrite).parquet(tempDir.getCanonicalPath) - val benchmark = new Benchmark(name, N, NUMBER_OF_ITER, output = output) - addCase(benchmark, outputPath, enableNestedPD = false, - "Without nested predicate Pushdown", withFilter) - addCase(benchmark, outputPath, enableNestedPD = true, - "With nested predicate Pushdown", withFilter) - 
benchmark.run() + withTempPath { tempDir => + val outputPath = tempDir.getCanonicalPath + df.write.mode(SaveMode.Overwrite).parquet(outputPath) + val benchmark = new Benchmark(name, N, NUMBER_OF_ITER, output = output) + addCase( + benchmark, + outputPath, + enableNestedPD = false, + "Without nested predicate Pushdown", + withFilter) + addCase( + benchmark, + outputPath, + enableNestedPD = true, + "With nested predicate Pushdown", + withFilter) + benchmark.run() } } @@ -82,8 +94,7 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark { * when nested fields predicate push down enabled */ def runLoadNoRowGroupWhenPredicatePushedDown(): Unit = { - createAndRunBenchmark("CanSkipAllRowGroupsWithNestedPredicatePushdown", - _.filter("nested.x < 0")) + createAndRunBenchmark("Can skip all row groups", _.filter("nested.x < 0")) } /** @@ -91,8 +102,7 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark { * when nested fields predicate push down enabled */ def runLoadSomeRowGroupWhenPredicatePushedDown(): Unit = { - createAndRunBenchmark("CanSkipSomeRowGroupsWithNestedPredicatePushdown", - _.filter("nested.x = 100")) + createAndRunBenchmark("Can skip some row groups", _.filter("nested.x = 100")) } /** @@ -101,8 +111,7 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark { * overhead or not if enable nested predicate push down. 
*/ def runLoadAllRowGroupsWhenPredicatePushedDown(): Unit = { - createAndRunBenchmark("NoRowGroupSkippedWithNestedPredicatePushdown", - _.filter(s"nested.x >= 0 and nested.x <= $N")) + createAndRunBenchmark("Can skip no row groups", _.filter(s"nested.x >= 0 and nested.x <= $N")) } override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { From f275021e9170f35a65b569c1188945092ba8ae43 Mon Sep 17 00:00:00 2001 From: Jian Tang Date: Fri, 24 Apr 2020 22:57:38 +0100 Subject: [PATCH 5/5] SPARK-31364 Benchmark Parquet Predicate Pushdown remove override of spark session creation --- .../ParquetNestedPredicatePushDownBenchmark.scala | 9 --------- 1 file changed, 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala index 005efd6dcfeae..11bc91a4b1551 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala @@ -39,15 +39,6 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark { private val N = 100 * 1024 * 1024 private val NUMBER_OF_ITER = 10 - override def getSparkSession: SparkSession = { - val conf = new SparkConf() - .setAppName(this.getClass.getSimpleName) - // Since `spark.master` always exists, overrides this value - .set("spark.master", "local[1]") - - SparkSession.builder().config(conf).getOrCreate() - } - private val df: DataFrame = spark .range(1, N, 1, 4) .toDF("id")