From ccba8360f2815d1445816e75e33a9c2f41f1072e Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Sun, 12 Jul 2020 20:48:39 +0800 Subject: [PATCH 1/3] avoid pushing down too many predicates in partition pruning --- .../sql/catalyst/expressions/predicates.scala | 11 ++------ .../PruneFileSourcePartitions.scala | 15 ++++++----- .../execution/PruneHiveTablePartitions.scala | 13 +++++++--- .../execution/PrunePartitionSuiteBase.scala | 25 +++++++++++++++++++ 4 files changed, 46 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 527618b8e2c5a..05abd1c6a3d76 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -267,20 +267,13 @@ trait PredicateHelper extends Logging { /** * Convert an expression to conjunctive normal form for predicate pushdown and partition pruning. - * When expanding predicates, this method groups expressions by their references for reducing - * the size of pushed down predicates and corresponding codegen. In partition pruning strategies, - * we split filters by [[splitConjunctivePredicates]] and partition filters by judging if it's - * references is subset of partCols, if we combine expressions group by reference when expand - * predicate of [[Or]], it won't impact final predicate pruning result since - * [[splitConjunctivePredicates]] won't split [[Or]] expression. * * @param condition condition need to be converted * @return the CNF result as sequence of disjunctive expressions. If the number of expressions * exceeds threshold on converting `Or`, `Seq.empty` is returned. 
*/ - def CNFWithGroupExpressionsByReference(condition: Expression): Seq[Expression] = { - conjunctiveNormalForm(condition, (expressions: Seq[Expression]) => - expressions.groupBy(e => AttributeSet(e.references)).map(_._2.reduceLeft(And)).toSeq) + def CNFConversion(condition: Expression): Seq[Expression] = { + conjunctiveNormalForm(condition, (expressions: Seq[Expression]) => expressions) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 576a826faf894..d82eb8b5635a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -53,11 +53,17 @@ private[sql] object PruneFileSourcePartitions val partitionColumns = relation.resolve(partitionSchema, sparkSession.sessionState.analyzer.resolver) val partitionSet = AttributeSet(partitionColumns) - val (partitionFilters, dataFilters) = normalizedFilters.partition(f => + val (partitionFilters, remainingFilters) = normalizedFilters.partition(f => f.references.subsetOf(partitionSet) ) - (ExpressionSet(partitionFilters), dataFilters) + // Try extracting more convertible partition filters from the remaining filters by converting + // them into CNF. 
+ val remainingFilterInCnf = remainingFilters.flatMap(CNFConversion) + val extraPartitionFilters = + remainingFilterInCnf.filter(f => f.references.subsetOf(partitionSet)) + + (ExpressionSet(partitionFilters ++ extraPartitionFilters), remainingFilters) } private def rebuildPhysicalOperation( @@ -88,12 +94,9 @@ private[sql] object PruneFileSourcePartitions _, _)) if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => - val predicates = CNFWithGroupExpressionsByReference(filters.reduceLeft(And)) - val finalPredicates = if (predicates.nonEmpty) predicates else filters val (partitionKeyFilters, _) = getPartitionKeyFiltersAndDataFilters( - fsRelation.sparkSession, logicalRelation, partitionSchema, finalPredicates, + fsRelation.sparkSession, logicalRelation, partitionSchema, filters, logicalRelation.output) - if (partitionKeyFilters.nonEmpty) { val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) val prunedFsRelation = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index c4885f2842597..2cfb14bfedf78 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions.CNFConversion import org.apache.spark.sql.internal.SQLConf /** @@ -54,9 +55,15 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) val normalizedFilters = DataSourceStrategy.normalizeExprs( filters.filter(f => 
f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) val partitionColumnSet = AttributeSet(relation.partitionCols) - ExpressionSet(normalizedFilters.filter { f => + val (partitionFilters, remainingFilters) = normalizedFilters.partition { f => !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) - }) + } + // Try extracting more convertible partition filters from the remaining filters by converting + // them into CNF. + val remainingFilterInCnf = remainingFilters.flatMap(CNFConversion) + val extraPartitionFilters = remainingFilterInCnf.filter(f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)) + ExpressionSet(partitionFilters ++ extraPartitionFilters) } /** @@ -103,7 +110,7 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => - val predicates = CNFWithGroupExpressionsByReference(filters.reduceLeft(And)) + val predicates = CNFConversion(filters.reduceLeft(And)) val finalPredicates = if (predicates.nonEmpty) predicates else filters val partitionKeyFilters = getPartitionKeyFilters(finalPredicates, relation) if (partitionKeyFilters.nonEmpty) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala index d088061cdc6e5..6aa20f1dc83d4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala @@ -67,6 +67,31 @@ abstract class PrunePartitionSuiteBase extends QueryTest with SQLTestUtils with } } + test("SPARK-32284: Avoid pushing down too many predicates in partition pruning") { + 
withTempView("temp") { + withTable("t") { + sql( + s""" + |CREATE TABLE t(i INT, p0 INT, p1 INT) + |USING $format + |PARTITIONED BY (p0, p1)""".stripMargin) + + spark.range(0, 10, 1).selectExpr("id as col") + .createOrReplaceTempView("temp") + + for (part <- (0 to 25)) { + sql( + s""" + |INSERT OVERWRITE TABLE t PARTITION (p0='$part', p1='$part') + |SELECT col FROM temp""".stripMargin) + } + val scale = 20 + val predicate = (1 to scale).map(i => s"(p0 = '$i' AND p1 = '$i')").mkString(" OR ") + assertPrunedPartitions(s"SELECT * FROM t WHERE $predicate", scale) + } + } + } + protected def assertPrunedPartitions(query: String, expected: Long): Unit = { val plan = sql(query).queryExecution.sparkPlan assert(getScanExecPartitionSize(plan) == expected) From df083906f2d9938d77b4b41878271af1826b5220 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 13 Jul 2020 11:17:39 +0800 Subject: [PATCH 2/3] address comments --- .../execution/datasources/PruneFileSourcePartitions.scala | 3 +++ .../spark/sql/hive/execution/PruneHiveTablePartitions.scala | 5 +---- .../spark/sql/hive/execution/PrunePartitionSuiteBase.scala | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index d82eb8b5635a7..57edab6063630 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -63,6 +63,9 @@ private[sql] object PruneFileSourcePartitions val extraPartitionFilters = remainingFilterInCnf.filter(f => f.references.subsetOf(partitionSet)) + // For the filters that can't be used for partition pruning, we simply use `remainingFilters` + // instead of using the non-convertible part from `remainingFilterInCnf`. 
Otherwise, the + // result filters can be very long. (ExpressionSet(partitionFilters ++ extraPartitionFilters), remainingFilters) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index 2cfb14bfedf78..54e8e7d53c643 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.DataSourceStrategy -import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions.CNFConversion import org.apache.spark.sql.internal.SQLConf /** @@ -110,9 +109,7 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => - val predicates = CNFConversion(filters.reduceLeft(And)) - val finalPredicates = if (predicates.nonEmpty) predicates else filters - val partitionKeyFilters = getPartitionKeyFilters(finalPredicates, relation) + val partitionKeyFilters = getPartitionKeyFilters(filters, relation) if (partitionKeyFilters.nonEmpty) { val newPartitions = prunePartitions(relation, partitionKeyFilters) val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala index 6aa20f1dc83d4..539c405e22d5f 
100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala @@ -67,7 +67,7 @@ abstract class PrunePartitionSuiteBase extends QueryTest with SQLTestUtils with } } - test("SPARK-32284: Avoid pushing down too many predicates in partition pruning") { + test("SPARK-32284: Avoid expanding too many CNF predicates in partition pruning") { withTempView("temp") { withTable("t") { sql( From 6fe106ce84e83641d66d572c45050b25761bbf3d Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 13 Jul 2020 13:41:41 +0800 Subject: [PATCH 3/3] revise method names and comments --- .../sql/catalyst/expressions/predicates.scala | 25 ++++++------------- .../PruneFileSourcePartitions.scala | 2 +- .../execution/PruneHiveTablePartitions.scala | 2 +- 3 files changed, 10 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 05abd1c6a3d76..e81df911852bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -207,13 +207,15 @@ trait PredicateHelper extends Logging { * CNF can explode exponentially in the size of the input expression when converting [[Or]] * clauses. Use a configuration [[SQLConf.MAX_CNF_NODE_COUNT]] to prevent such cases. * - * @param condition to be converted into CNF. + * @param condition Condition to be converted into CNF. + * @param groupExpsFunc A method for grouping intermediate results so that the final result can be + * shorter. * @return the CNF result as sequence of disjunctive expressions. If the number of expressions * exceeds threshold on converting `Or`, `Seq.empty` is returned. 
*/ - protected def conjunctiveNormalForm( + protected def CNFConversion( condition: Expression, - groupExpsFunc: Seq[Expression] => Seq[Expression]): Seq[Expression] = { + groupExpsFunc: Seq[Expression] => Seq[Expression] = identity): Seq[Expression] = { val postOrderNodes = postOrderTraversal(condition) val resultStack = new mutable.Stack[Seq[Expression]] val maxCnfNodeCount = SQLConf.get.maxCnfNodeCount @@ -256,26 +258,15 @@ trait PredicateHelper extends Logging { * when expand predicates, we can group by the qualifier avoiding generate unnecessary * expression to control the length of final result since there are multiple tables. * - * @param condition condition need to be converted + * @param condition Condition to be converted into CNF. * @return the CNF result as sequence of disjunctive expressions. If the number of expressions * exceeds threshold on converting `Or`, `Seq.empty` is returned. */ def CNFWithGroupExpressionsByQualifier(condition: Expression): Seq[Expression] = { - conjunctiveNormalForm(condition, (expressions: Seq[Expression]) => + CNFConversion(condition, (expressions: Seq[Expression]) => expressions.groupBy(_.references.map(_.qualifier)).map(_._2.reduceLeft(And)).toSeq) } - /** - * Convert an expression to conjunctive normal form for predicate pushdown and partition pruning. - * - * @param condition condition need to be converted - * @return the CNF result as sequence of disjunctive expressions. If the number of expressions - * exceeds threshold on converting `Or`, `Seq.empty` is returned. - */ - def CNFConversion(condition: Expression): Seq[Expression] = { - conjunctiveNormalForm(condition, (expressions: Seq[Expression]) => expressions) - } - /** * Iterative post order traversal over a binary tree built by And/Or clauses with two stacks. 
* For example, a condition `(a And b) Or c`, the postorder traversal is @@ -287,7 +278,7 @@ trait PredicateHelper extends Logging { * 2.1 Pop a node from first stack and push it to second stack * 2.2 Push the children of the popped node to first stack * - * @param condition to be traversed as binary tree + * @param condition Condition to be traversed as binary tree * @return sub-expressions in post order traversal as a stack. * The first element of result stack is the leftmost node. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 57edab6063630..580a0a773a8b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -59,7 +59,7 @@ private[sql] object PruneFileSourcePartitions // Try extracting more convertible partition filters from the remaining filters by converting // them into CNF. 
- val remainingFilterInCnf = remainingFilters.flatMap(CNFConversion) + val remainingFilterInCnf = remainingFilters.flatMap(CNFConversion(_)) val extraPartitionFilters = remainingFilterInCnf.filter(f => f.references.subsetOf(partitionSet)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index 54e8e7d53c643..de9bd58ba829a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -59,7 +59,7 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) } // Try extracting more convertible partition filters from the remaining filters by converting // them into CNF. - val remainingFilterInCnf = remainingFilters.flatMap(CNFConversion) + val remainingFilterInCnf = remainingFilters.flatMap(CNFConversion(_)) val extraPartitionFilters = remainingFilterInCnf.filter(f => !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)) ExpressionSet(partitionFilters ++ extraPartitionFilters)