From 9170ceb69fda3ae6a064b1941cd380ee7a2a13ed Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 16 Aug 2017 04:53:49 +0000 Subject: [PATCH 1/4] Check for structural integrity of the plan in Optimzer in test mode. --- .../sql/catalyst/optimizer/Optimizer.scala | 6 ++ .../sql/catalyst/rules/RuleExecutor.scala | 14 +++++ .../optimizer/OptimizerSICheckerSuite.scala | 60 +++++++++++++++++++ .../catalyst/trees/RuleExecutorSuite.scala | 21 +++++++ 4 files changed, 101 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSICheckerSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index a51b385399d88..213085da6a6ad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -37,6 +37,12 @@ import org.apache.spark.sql.types._ abstract class Optimizer(sessionCatalog: SessionCatalog) extends RuleExecutor[LogicalPlan] { + // Check for structural integrity of the plan in test mode. Currently we only check if a plan is + // still resolved after the execution of each rule. + override protected def planChecker: Option[LogicalPlan => Boolean] = Some( + (plan: LogicalPlan) => plan.resolved + ) + protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) def batches: Seq[Batch] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 85b368c862630..e5dc2a3459038 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -63,6 +63,13 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { /** Defines a sequence of rule batches, to be overridden by the implementation. */ protected def batches: Seq[Batch] + /** + * Defines a check function which checks for structural integrity of the plan in test mode after + * the execution of each rule. For example, we can check whether a plan is still resolved after + * each rule in `Optimizer`, so we can catch rules that return invalid plans. The check function + * returns `false` if the given plan doesn't pass the structural integrity check. + */ + protected def planChecker: Option[TreeType => Boolean] = None /** * Executes the batches of rules defined by the subclass. The batches are executed serially @@ -94,6 +101,13 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { """.stripMargin) } + // In test mode, run the structural integrity checker against the plan after each rule. + if (Utils.isTesting && !planChecker.map(_.apply(result)).getOrElse(true)) { + val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " + + "the structural integrity of the plan is broken." + throw new TreeNodeException(result, message, null) + } + result } iteration += 1 diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSICheckerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSICheckerSuite.scala new file mode 100644 index 0000000000000..23d6c153b0cb0 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSICheckerSuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project} +import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.internal.SQLConf + + +class OptimizerSICheckerkSuite extends PlanTest { + + object OptimizeRuleBreakSI extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case Project(projectList, child) => + val newAttr = UnresolvedAttribute("unresolvedAttr") + Project(projectList ++ Seq(newAttr), child) + } + } + + object Optimize extends Optimizer( + new SessionCatalog( + new InMemoryCatalog, + EmptyFunctionRegistry, + new SQLConf())) { + val newBatch = Batch("OptimizeRuleBreakSI", Once, OptimizeRuleBreakSI) + override def batches: Seq[Batch] = Seq(newBatch) ++ super.batches + } + + test("check for invalid plan after execution of rule") { + val analyzed = Project(Alias(Literal(10), "attr")() :: Nil, OneRowRelation()).analyze + assert(analyzed.resolved) + val message = intercept[TreeNodeException[LogicalPlan]] { + Optimize.execute(analyzed) + }.getMessage + val ruleName = OptimizeRuleBreakSI.ruleName + assert(message.contains(s"After applying rule $ruleName in batch OptimizeRuleBreakSI")) + assert(message.contains("the structural integrity of the plan is broken")) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala index c9d36910b0998..c29c0a1acaac7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala @@ -56,4 +56,25 @@ class RuleExecutorSuite extends SparkFunSuite { }.getMessage assert(message.contains("Max iterations (10) reached for batch fixedPoint")) } + + test("structural integrity checker") { + object WithSIChecker extends RuleExecutor[Expression] { + override protected def planChecker: Option[Expression => Boolean] = Some( + (expr: Expression) => { + expr match { + case IntegerLiteral(_) => true + case _ => false + } + } + ) + val batches = Batch("once", Once, DecrementLiterals) :: Nil + } + + assert(WithSIChecker.execute(Literal(10)) === Literal(9)) + + val message = intercept[TreeNodeException[LogicalPlan]] { + WithSIChecker.execute(Literal(10.1)) + }.getMessage + assert(message.contains("the structural integrity of the plan is broken")) + } } From c99011ddbf60ae104cb91c578d56c971e6b87c86 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 16 Aug 2017 05:28:09 +0000 Subject: [PATCH 2/4] Address comment. --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 7 ++++--- .../spark/sql/catalyst/rules/RuleExecutor.scala | 14 +++++++------- .../sql/catalyst/trees/RuleExecutorSuite.scala | 12 ++++-------- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 213085da6a6ad..e927d9c608cc1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils /** * Abstract class all optimizers should inherit of, contains the standard batches (extending @@ -39,9 +40,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // Check for structural integrity of the plan in test mode. Currently we only check if a plan is // still resolved after the execution of each rule. - override protected def planChecker: Option[LogicalPlan => Boolean] = Some( - (plan: LogicalPlan) => plan.resolved - ) + override protected def planChecker(plan: LogicalPlan): Boolean = { + Utils.isTesting && plan.resolved + } protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index e5dc2a3459038..9fd035431f53f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -64,12 +64,12 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { protected def batches: Seq[Batch] /** - * Defines a check function which checks for structural integrity of the plan in test mode after - * the execution of each rule. For example, we can check whether a plan is still resolved after - * each rule in `Optimizer`, so we can catch rules that return invalid plans. The check function - * returns `false` if the given plan doesn't pass the structural integrity check. + * Defines a check function which checks for structural integrity of the plan after the execution + * of each rule. For example, we can check whether a plan is still resolved after each rule in + * `Optimizer`, so we can catch rules that return invalid plans. The check function will returns + * `false` if the given plan doesn't pass the structural integrity check. */ - protected def planChecker: Option[TreeType => Boolean] = None + protected def planChecker(plan: TreeType): Boolean = true /** * Executes the batches of rules defined by the subclass. The batches are executed serially @@ -101,8 +101,8 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { """.stripMargin) } - // In test mode, run the structural integrity checker against the plan after each rule. - if (Utils.isTesting && !planChecker.map(_.apply(result)).getOrElse(true)) { + // Run the structural integrity checker against the plan after each rule. + if (!planChecker(result)) { val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " + "the structural integrity of the plan is broken." throw new TreeNodeException(result, message, null) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala index c29c0a1acaac7..5af5b4bd60458 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala @@ -59,14 +59,10 @@ class RuleExecutorSuite extends SparkFunSuite { test("structural integrity checker") { object WithSIChecker extends RuleExecutor[Expression] { - override protected def planChecker: Option[Expression => Boolean] = Some( - (expr: Expression) => { - expr match { - case IntegerLiteral(_) => true - case _ => false - } - } - ) + override protected def planChecker(expr: Expression): Boolean = expr match { + case IntegerLiteral(_) => true + case _ => false + } val batches = Batch("once", Once, DecrementLiterals) :: Nil } From 959e31539f7e26d919617bb9969edec6e4daf2fd Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 7 Sep 2017 14:42:17 +0000 Subject: [PATCH 3/4] We should analyze the plan for evaluating expression before optimization. --- .../sql/catalyst/expressions/ExpressionEvalHelper.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 0496d611ec3c7..b4c8eab19c5cc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -25,7 +25,7 @@ import org.scalatest.prop.GeneratorDrivenPropertyChecks import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.serializer.JavaSerializer import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} -import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone +import org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, SimpleAnalyzer} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project} @@ -188,7 +188,9 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { expected: Any, inputRow: InternalRow = EmptyRow): Unit = { val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation()) - val optimizedPlan = SimpleTestOptimizer.execute(plan) + // We should analyze the plan first, otherwise we possibly optimize an unresolved plan. + val analyzedPlan = SimpleAnalyzer.execute(plan) + val optimizedPlan = SimpleTestOptimizer.execute(analyzedPlan) checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow) } From ecdfb7db34d0d01e357bff0d32b62137ef0ae735 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 7 Sep 2017 23:20:49 +0000 Subject: [PATCH 4/4] Address comments. --- .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../apache/spark/sql/catalyst/rules/RuleExecutor.scala | 8 ++++---- ...ala => OptimizerStructuralIntegrityCheckerSuite.scala} | 2 +- .../spark/sql/catalyst/trees/RuleExecutorSuite.scala | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/{OptimizerSICheckerSuite.scala => OptimizerStructuralIntegrityCheckerSuite.scala} (97%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 80c18027c8c5c..2426a8b4a9062 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -40,7 +40,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // Check for structural integrity of the plan in test mode. Currently we only check if a plan is // still resolved after the execution of each rule. - override protected def planChecker(plan: LogicalPlan): Boolean = { + override protected def isPlanIntegral(plan: LogicalPlan): Boolean = { Utils.isTesting && plan.resolved } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 9fd035431f53f..7e4b784033bfc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -64,12 +64,12 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { protected def batches: Seq[Batch] /** - * Defines a check function which checks for structural integrity of the plan after the execution + * Defines a check function that checks for structural integrity of the plan after the execution * of each rule. For example, we can check whether a plan is still resolved after each rule in - * `Optimizer`, so we can catch rules that return invalid plans. The check function will returns + * `Optimizer`, so we can catch rules that return invalid plans. The check function returns * `false` if the given plan doesn't pass the structural integrity check. */ - protected def planChecker(plan: TreeType): Boolean = true + protected def isPlanIntegral(plan: TreeType): Boolean = true /** * Executes the batches of rules defined by the subclass. The batches are executed serially @@ -102,7 +102,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { } // Run the structural integrity checker against the plan after each rule. - if (!planChecker(result)) { + if (!isPlanIntegral(result)) { val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " + "the structural integrity of the plan is broken." throw new TreeNodeException(result, message, null) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSICheckerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala similarity index 97% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSICheckerSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala index 23d6c153b0cb0..6e183d81b7265 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSICheckerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.internal.SQLConf -class OptimizerSICheckerkSuite extends PlanTest { +class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { object OptimizeRuleBreakSI extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala index 5af5b4bd60458..a67f54b263cc9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala @@ -59,7 +59,7 @@ class RuleExecutorSuite extends SparkFunSuite { test("structural integrity checker") { object WithSIChecker extends RuleExecutor[Expression] { - override protected def planChecker(expr: Expression): Boolean = expr match { + override protected def isPlanIntegral(expr: Expression): Boolean = expr match { case IntegerLiteral(_) => true case _ => false }