From 05274e7ad4c74e6241b5a05a9365c475f0c3c0a3 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 13 Aug 2017 06:06:10 +0000 Subject: [PATCH 01/24] Decouple consume functions of physical operators in whole-stage codegen. --- .../sql/execution/WholeStageCodegenExec.scala | 62 ++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 34134db278ad8..7f6c235afd7e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -149,13 +149,73 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + + // Under certain conditions, we can put the logic to consume the rows of this operator into + // another function. So we can prevent a generated function too long to be optimized by JIT. + val consumeFunc = + if (row == null && outputVars.nonEmpty && parent.usedInputs.size == inputVars.size) { + constructDoConsumeFunction(ctx, inputVars) + } else { + parent.doConsume(ctx, inputVars, rowVar) + } s""" |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")} |$evaluated - |${parent.doConsume(ctx, inputVars, rowVar)} + |$consumeFunc """.stripMargin } + /** + * To prevent concatenated function growing too long to be optimized by JIT. We separate the + * consume function of each `CodegenSupport` operator into a function to call. + */ + protected def constructDoConsumeFunction( + ctx: CodegenContext, + inputVars: Seq[ExprCode]): String = { + val (callingParams, arguList, inputVarsInFunc) = + constructConsumeParameters(ctx, output, inputVars) + val rowVar = ExprCode("", "false", "unsafeRow") + val doConsume = ctx.freshName("doConsume") + val doConsumeFuncName = ctx.addNewFunction(doConsume, + s""" + | private void $doConsume($arguList) throws java.io.IOException { + | ${parent.doConsume(ctx, inputVarsInFunc, rowVar)} + | } + """.stripMargin) + + if (isShouldStopRequired) { + // Because the processing logic is enclosed in a function, `shouldStop` call in the function + // don't be affect outside loop, we need to check it and stop the loop. + s""" + | $doConsumeFuncName($callingParams); + | if (shouldStop()) return; + """.stripMargin + } else { + s"$doConsumeFuncName($callingParams);" + } + } + + /** + * Returns source code for calling consume function and the argument list of the consume function + * and also the `ExprCode` for the argument list. + */ + protected def constructConsumeParameters( + ctx: CodegenContext, + attributes: Seq[Attribute], + variables: Seq[ExprCode]): (String, String, Seq[ExprCode]) = { + val params = variables.zipWithIndex.map { case (ev, i) => + val callingParam = ev.value + ", " + ev.isNull + val arguName = ctx.freshName(s"expr_$i") + val arguIsNull = ctx.freshName(s"exprIsNull_$i") + (callingParam, + ctx.javaType(attributes(i).dataType) + " " + arguName + ", boolean " + arguIsNull, + ExprCode("", arguIsNull, arguName)) + }.unzip3 + (params._1.mkString(", "), + params._2.mkString(", "), + params._3) + } + /** * Returns source code to evaluate all the variables, and clear the code of them, to prevent * them to be evaluated twice. From e0e7a6ecc957b4659db9b0367ef32d09537b32fd Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 13 Aug 2017 07:43:17 +0000 Subject: [PATCH 02/24] shouldStop is called outside consume(). --- .../spark/sql/execution/WholeStageCodegenExec.scala | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 7f6c235afd7e2..d241f8e8c9d77 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -183,16 +183,7 @@ trait CodegenSupport extends SparkPlan { | } """.stripMargin) - if (isShouldStopRequired) { - // Because the processing logic is enclosed in a function, `shouldStop` call in the function - // don't be affect outside loop, we need to check it and stop the loop. - s""" - | $doConsumeFuncName($callingParams); - | if (shouldStop()) return; - """.stripMargin - } else { - s"$doConsumeFuncName($callingParams);" - } + s"$doConsumeFuncName($callingParams);" } /** From 413707dd0c31a15514f00aea9addca77fe1dd2ce Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 13 Aug 2017 10:52:28 +0000 Subject: [PATCH 03/24] Fix the condition and the case of using continue in consume. --- .../sql/execution/WholeStageCodegenExec.scala | 42 +++++++++++++++++-- .../execution/basicPhysicalOperators.scala | 4 +- .../joins/BroadcastHashJoinExec.scala | 16 +++---- 3 files changed, 49 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index d241f8e8c9d77..c7c79a8e0fc9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -152,8 +152,10 @@ trait CodegenSupport extends SparkPlan { // Under certain conditions, we can put the logic to consume the rows of this operator into // another function. So we can prevent a generated function too long to be optimized by JIT. + val requireAllOutput = output.forall(parent.usedInputs.contains(_)) val consumeFunc = - if (row == null && outputVars.nonEmpty && parent.usedInputs.size == inputVars.size) { + if (row == null && outputVars.nonEmpty && requireAllOutput) { + parentConsumeInSeparateFunc = true constructDoConsumeFunction(ctx, inputVars) } else { parent.doConsume(ctx, inputVars, rowVar) @@ -165,6 +167,34 @@ trait CodegenSupport extends SparkPlan { """.stripMargin } + /** + * To prevent concatenated function growing too long to be optimized by JIT. We decide to separate + * the consume function of each `CodegenSupport` operator into a function to call in runtime. When + * it is happened, we set this variable to `true`. + */ + private var parentConsumeInSeparateFunc: Boolean = false + + /** + * Returning true means we have at least one consume logic from child operator or this operator is + * separated in a function. If this is `true`, parent operator shouldn't use `continue` statement, + * because its generated codes aren't enclosed in main while-loop. + */ + protected def isConsumeInSeparateFunc: Boolean = { + val codegenChildren = children.map(_.asInstanceOf[CodegenSupport]) + codegenChildren.exists(_.parentConsumeInSeparateFunc) || + codegenChildren.exists(_.isConsumeInSeparateFunc) + } + + protected def effectiveContinueStatement: String = if (isConsumeInSeparateFunc) { + // When the separated consume logic in parent operators needs to do continue for outer loop, + // we consider if this plan's consume or any child's consume logic is separated in functions. + // If yes, we can't simply do continue. Instead, we return `true`. + "return true;"; + } else { + // In the end of this separated consume function chain, we can do continue as usual. + "continue;" + } + /** * To prevent concatenated function growing too long to be optimized by JIT. We separate the * consume function of each `CodegenSupport` operator into a function to call. @@ -178,12 +208,16 @@ trait CodegenSupport extends SparkPlan { val doConsume = ctx.freshName("doConsume") val doConsumeFuncName = ctx.addNewFunction(doConsume, s""" - | private void $doConsume($arguList) throws java.io.IOException { + | private boolean $doConsume($arguList) throws java.io.IOException { | ${parent.doConsume(ctx, inputVarsInFunc, rowVar)} + | return false; | } """.stripMargin) - s"$doConsumeFuncName($callingParams);" + s""" + | boolean continueForLoop = $doConsumeFuncName($callingParams); + | if (continueForLoop) $effectiveContinueStatement; + """.stripMargin } /** @@ -303,6 +337,8 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp child.execute() :: Nil } + override protected def isConsumeInSeparateFunc: Boolean = false + override def doProduce(ctx: CodegenContext): String = { val input = ctx.freshName("input") // Right now, InputAdapter is only used when there is one input RDD. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 2151c339b9b87..9e21cffafcf97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -149,7 +149,7 @@ case class FilterExec(condition: Expression, child: SparkPlan) s""" |$evaluated |${ev.code} - |if (${nullCheck}!${ev.value}) continue; + |if (${nullCheck}!${ev.value}) $effectiveContinueStatement """.stripMargin } @@ -317,7 +317,7 @@ case class SampleExec( """.stripMargin.trim) s""" - | if ($sampler.sample() == 0) continue; + | if ($sampler.sample() == 0) $effectiveContinueStatement | $numOutput.add(1); | ${consume(ctx, input)} """.stripMargin.trim diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index bfa1e9d49a545..fef025b05b9d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -206,10 +206,10 @@ case class BroadcastHashJoinExec( s""" |$eval |${ev.code} - |if ($skipRow) continue; + |if ($skipRow) $effectiveContinueStatement """.stripMargin } else if (anti) { - "continue;" + effectiveContinueStatement } else { "" } @@ -235,7 +235,7 @@ case class BroadcastHashJoinExec( |${keyEv.code} |// find matches from HashedRelation |UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyEv.value}); - |if ($matched == null) continue; + |if ($matched == null) $effectiveContinueStatement |$checkCondition |$numOutput.add(1); |${consume(ctx, resultVars)} @@ -250,7 +250,7 @@ case class BroadcastHashJoinExec( |${keyEv.code} |// find matches from HashRelation |$iteratorCls $matches = $anyNull ? null : ($iteratorCls)$relationTerm.get(${keyEv.value}); - |if ($matches == null) continue; + |if ($matches == null) $effectiveContinueStatement |while ($matches.hasNext()) { | UnsafeRow $matched = (UnsafeRow) $matches.next(); | $checkCondition @@ -351,7 +351,7 @@ case class BroadcastHashJoinExec( |${keyEv.code} |// find matches from HashedRelation |UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyEv.value}); - |if ($matched == null) continue; + |if ($matched == null) $effectiveContinueStatement |$checkCondition |$numOutput.add(1); |${consume(ctx, input)} @@ -365,14 +365,14 @@ case class BroadcastHashJoinExec( |${keyEv.code} |// find matches from HashRelation |$iteratorCls $matches = $anyNull ? null : ($iteratorCls)$relationTerm.get(${keyEv.value}); - |if ($matches == null) continue; + |if ($matches == null) $effectiveContinueStatement |boolean $found = false; |while (!$found && $matches.hasNext()) { | UnsafeRow $matched = (UnsafeRow) $matches.next(); | $checkCondition | $found = true; |} - |if (!$found) continue; + |if (!$found) $effectiveContinueStatement |$numOutput.add(1); |${consume(ctx, input)} """.stripMargin @@ -424,7 +424,7 @@ case class BroadcastHashJoinExec( | $checkCondition | $found = true; | } - | if ($found) continue; + | if ($found) $effectiveContinueStatement | } |} |$numOutput.add(1); From 0bb8c0ec70243e75f5593ca83788e830e9e4bc25 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 13 Aug 2017 10:57:45 +0000 Subject: [PATCH 04/24] More comment. --- .../apache/spark/sql/execution/WholeStageCodegenExec.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index c7c79a8e0fc9e..b231dc6ac7536 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -152,6 +152,12 @@ trait CodegenSupport extends SparkPlan { // Under certain conditions, we can put the logic to consume the rows of this operator into // another function. So we can prevent a generated function too long to be optimized by JIT. + // The conditions: + // 1. The parent uses all variables in output. we can't defer variable evaluation when consume + // in another function. + // 2. The output variables are not empty. If it's empty, we don't bother to do that. + // 3. We don't use row variable. The construction of row uses deferred variable evaluation. We + // can't do it. val requireAllOutput = output.forall(parent.usedInputs.contains(_)) val consumeFunc = if (row == null && outputVars.nonEmpty && requireAllOutput) { From 6d600d5eb4a275eb6bc72ccf353d2d1ded03635f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 13 Aug 2017 14:17:01 +0000 Subject: [PATCH 05/24] Fix aggregation. --- .../org/apache/spark/sql/execution/WholeStageCodegenExec.scala | 2 +- .../spark/sql/execution/aggregate/HashAggregateExec.scala | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index b231dc6ac7536..e4dd71ad434bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -222,7 +222,7 @@ trait CodegenSupport extends SparkPlan { s""" | boolean continueForLoop = $doConsumeFuncName($callingParams); - | if (continueForLoop) $effectiveContinueStatement; + | if (continueForLoop) $effectiveContinueStatement """.stripMargin } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 56f61c30c4a38..78b67b4a993e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -163,6 +163,8 @@ case class HashAggregateExec( // The variables used as aggregation buffer. Only used for aggregation without keys. private var bufVars: Seq[ExprCode] = _ + override protected def effectiveContinueStatement: String = "continue;" + private def doProduceWithoutKeys(ctx: CodegenContext): String = { val initAgg = ctx.freshName("initAgg") ctx.addMutableState("boolean", initAgg, s"$initAgg = false;") From 502139aca30db03d2ef52dc9e140b83668467122 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 13 Aug 2017 14:22:49 +0000 Subject: [PATCH 06/24] Also deal with sort case. --- .../main/scala/org/apache/spark/sql/execution/SortExec.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index ff71fd4dc7bb7..65c9258d7f3f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -177,6 +177,8 @@ case class SortExec( """.stripMargin.trim } + override protected def effectiveContinueStatement: String = "continue;" + protected override val shouldStopRequired = false override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { From 5fe3762a3dcf1893b3bfffb17832fdd5c3d1e364 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 14 Aug 2017 02:42:35 +0000 Subject: [PATCH 07/24] Fix broadcasthash join. --- .../joins/BroadcastHashJoinExec.scala | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index fef025b05b9d3..fe062432a7716 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -187,9 +187,15 @@ case class BroadcastHashJoinExec( private def getJoinCondition( ctx: CodegenContext, input: Seq[ExprCode], + uniqueKeyCodePath: Boolean, anti: Boolean = false): (String, String, Seq[ExprCode]) = { val matched = ctx.freshName("matched") val buildVars = genBuildSideVars(ctx, matched) + val continueStatement = if (uniqueKeyCodePath) { + effectiveContinueStatement + } else { + "continue;" + } val checkCondition = if (condition.isDefined) { val expr = condition.get // evaluate the variables from build side that used by condition @@ -206,10 +212,10 @@ case class BroadcastHashJoinExec( s""" |$eval |${ev.code} - |if ($skipRow) $effectiveContinueStatement + |if ($skipRow) $continueStatement """.stripMargin - } else if (anti) { - effectiveContinueStatement + } else if (anti && uniqueKeyCodePath) { + continueStatement } else { "" } @@ -221,15 +227,16 @@ case class BroadcastHashJoinExec( */ private def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = { val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) + val uniqueKeyCodePath = broadcastRelation.value.keyIsUnique val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) - val (matched, checkCondition, buildVars) = getJoinCondition(ctx, input) + val (matched, checkCondition, buildVars) = getJoinCondition(ctx, input, uniqueKeyCodePath) val numOutput = metricTerm(ctx, "numOutputRows") val resultVars = buildSide match { case BuildLeft => buildVars ++ input case BuildRight => input ++ buildVars } - if (broadcastRelation.value.keyIsUnique) { + if (uniqueKeyCodePath) { s""" |// generate join key for stream side |${keyEv.code} @@ -342,10 +349,11 @@ case class BroadcastHashJoinExec( */ private def codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String = { val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) + val uniqueKeyCodePath = broadcastRelation.value.keyIsUnique val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) - val (matched, checkCondition, _) = getJoinCondition(ctx, input) + val (matched, checkCondition, _) = getJoinCondition(ctx, input, uniqueKeyCodePath) val numOutput = metricTerm(ctx, "numOutputRows") - if (broadcastRelation.value.keyIsUnique) { + if (uniqueKeyCodePath) { s""" |// generate join key for stream side |${keyEv.code} @@ -386,7 +394,7 @@ case class BroadcastHashJoinExec( val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) val uniqueKeyCodePath = broadcastRelation.value.keyIsUnique val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) - val (matched, checkCondition, _) = getJoinCondition(ctx, input, uniqueKeyCodePath) + val (matched, checkCondition, _) = getJoinCondition(ctx, input, uniqueKeyCodePath, true) val numOutput = metricTerm(ctx, "numOutputRows") if (uniqueKeyCodePath) { From 4bef5677b7338818bd9c44389fd183a8bd775610 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 14 Aug 2017 03:09:31 +0000 Subject: [PATCH 08/24] Add more comments. --- .../sql/execution/WholeStageCodegenExec.scala | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index e4dd71ad434bf..e394a5261ddfe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -184,6 +184,21 @@ trait CodegenSupport extends SparkPlan { * Returning true means we have at least one consume logic from child operator or this operator is * separated in a function. If this is `true`, parent operator shouldn't use `continue` statement, * because its generated codes aren't enclosed in main while-loop. + * + * For example, we have generated codes for a query plan like: + * Op1Exec + * Op2Exec + * Op3Exec + * + * If we put the consume code of Op2Exec into a separated function, the generated codes are like: + * while (...) { + * ... // logic of Op3Exec. + * Op2Exec_doConsume(...); + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * } + * For now, `isConsumeInSeparateFunc` of Op2Exec will be `true`. */ protected def isConsumeInSeparateFunc: Boolean = { val codegenChildren = children.map(_.asInstanceOf[CodegenSupport]) @@ -191,6 +206,32 @@ trait CodegenSupport extends SparkPlan { codegenChildren.exists(_.isConsumeInSeparateFunc) } + /** + * Returning true means we have at least one consume logic from child operator or this operator is + * separated in a function. If this is `true`, parent operator shouldn't use `continue` statement, + * because its generated codes aren't enclosed in main while-loop. + * + * We use the same example in `isConsumeInSeparateFunc`'s comment: + * while (...) { + * ... // logic of Op3Exec. + * Op2Exec_doConsume(...); + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * continue; // Wrong. We can't use continue with the while-loop. + * } + * + * Instead, we do something like: + * while (...) { + * ... // logic of Op3Exec. + * boolean continueForLoop = Op2Exec_doConsume(...); + * if (continueForLoop) continue; + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * return true; // When we need to do continue, we return true. + * } + */ protected def effectiveContinueStatement: String = if (isConsumeInSeparateFunc) { // When the separated consume logic in parent operators needs to do continue for outer loop, // we consider if this plan's consume or any child's consume logic is separated in functions. From 1694c9b3429b0c1dba0616a0af7b04d1e34bb702 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 14 Aug 2017 06:35:27 +0000 Subject: [PATCH 09/24] Fix the cases where operators set up its produce framework. --- .../main/scala/org/apache/spark/sql/execution/SortExec.scala | 2 +- .../apache/spark/sql/execution/WholeStageCodegenExec.scala | 4 ++++ .../spark/sql/execution/aggregate/HashAggregateExec.scala | 2 +- .../apache/spark/sql/execution/joins/SortMergeJoinExec.scala | 2 ++ 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index 65c9258d7f3f7..42c3c832244b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -177,7 +177,7 @@ case class SortExec( """.stripMargin.trim } - override protected def effectiveContinueStatement: String = "continue;" + override protected def isConsumeInSeparateFunc: Boolean = false protected override val shouldStopRequired = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index e394a5261ddfe..3349d05845098 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -199,6 +199,10 @@ trait CodegenSupport extends SparkPlan { * ... // logic of Op2Exec to consume rows. * } * For now, `isConsumeInSeparateFunc` of Op2Exec will be `true`. + * + * Notice for some operators like `HashAggregateExec`, it doesn't chain previous consume functions + * but begins with its produce framework. We should override `isConsumeInSeparateFunc` to return + * `false`. */ protected def isConsumeInSeparateFunc: Boolean = { val codegenChildren = children.map(_.asInstanceOf[CodegenSupport]) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 78b67b4a993e4..3cf9c353288ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -163,7 +163,7 @@ case class HashAggregateExec( // The variables used as aggregation buffer. Only used for aggregation without keys. private var bufVars: Seq[ExprCode] = _ - override protected def effectiveContinueStatement: String = "continue;" + override protected def isConsumeInSeparateFunc: Boolean = false private def doProduceWithoutKeys(ctx: CodegenContext): String = { val initAgg = ctx.freshName("initAgg") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index f41fa14213df5..58e190e74457f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -547,6 +547,8 @@ case class SortMergeJoinExec( } } + override protected def isConsumeInSeparateFunc: Boolean = false + override def doProduce(ctx: CodegenContext): String = { ctx.copyResult = true val leftInput = ctx.freshName("leftInput") From 8f3b9841ab1a8885f774263b8886513cc174f019 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 14 Aug 2017 23:43:17 +0000 Subject: [PATCH 10/24] Fix Expand. --- .../main/scala/org/apache/spark/sql/execution/ExpandExec.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala index d5603b3b00914..a4218b8fffe71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala @@ -89,6 +89,8 @@ case class ExpandExec( child.asInstanceOf[CodegenSupport].inputRDDs() } + override protected def isConsumeInSeparateFunc: Boolean = false + protected override def doProduce(ctx: CodegenContext): String = { child.asInstanceOf[CodegenSupport].produce(ctx, this) } From c04da158a4221104717944f72be2c67ae23fdb63 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 15 Aug 2017 03:46:08 +0000 Subject: [PATCH 11/24] Fix BroadcastHashJoin. --- .../spark/sql/execution/joins/BroadcastHashJoinExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index fe062432a7716..9b3810ed086c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -204,7 +204,7 @@ case class BroadcastHashJoinExec( ctx.currentVars = input ++ buildVars val ev = BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).genCode(ctx) - val skipRow = if (!anti) { + val skipRow = if (!(anti && uniqueKeyCodePath)) { s"${ev.isNull} || !${ev.value}" } else { s"!${ev.isNull} && ${ev.value}" From 954019588b547046b844f7bfcb4a1326a71b23cd Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 17 Aug 2017 13:28:19 +0000 Subject: [PATCH 12/24] Rename variables. --- .../spark/sql/execution/ExpandExec.scala | 2 +- .../apache/spark/sql/execution/SortExec.scala | 2 +- .../sql/execution/WholeStageCodegenExec.scala | 44 ++++++++----------- .../aggregate/HashAggregateExec.scala | 2 +- .../execution/basicPhysicalOperators.scala | 4 +- .../joins/BroadcastHashJoinExec.scala | 14 +++--- .../execution/joins/SortMergeJoinExec.scala | 2 +- 7 files changed, 32 insertions(+), 38 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala index a4218b8fffe71..a3cd5f8c19d66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala @@ -89,7 +89,7 @@ case class ExpandExec( child.asInstanceOf[CodegenSupport].inputRDDs() } - override protected def isConsumeInSeparateFunc: Boolean = false + override protected def doConsumeInChainOfFunc: Boolean = false protected override def doProduce(ctx: CodegenContext): String = { child.asInstanceOf[CodegenSupport].produce(ctx, this) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index 42c3c832244b5..acdbfccd0fd5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -177,7 +177,7 @@ case class SortExec( """.stripMargin.trim } - override protected def isConsumeInSeparateFunc: Boolean = false + override protected def doConsumeInChainOfFunc: Boolean = false protected override val shouldStopRequired = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 3349d05845098..c0237f702c6c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -161,7 +161,6 @@ trait CodegenSupport extends SparkPlan { val requireAllOutput = output.forall(parent.usedInputs.contains(_)) val consumeFunc = if (row == null && outputVars.nonEmpty && requireAllOutput) { - parentConsumeInSeparateFunc = true constructDoConsumeFunction(ctx, inputVars) } else { parent.doConsume(ctx, inputVars, rowVar) @@ -174,16 +173,16 @@ trait CodegenSupport extends SparkPlan { } /** - * To prevent concatenated function growing too long to be optimized by JIT. We decide to separate - * the consume function of each `CodegenSupport` operator into a function to call in runtime. When - * it is happened, we set this variable to `true`. + * To prevent concatenated function growing too long to be optimized by JIT. Instead of inlining, + * we may put the consume logic of parent operator into a function and set this flag to `true`. + * The parent operator can know if its consume logic is inlined or in separated function. */ - private var parentConsumeInSeparateFunc: Boolean = false + private var doConsumeInFunc: Boolean = false /** * Returning true means we have at least one consume logic from child operator or this operator is - * separated in a function. If this is `true`, parent operator shouldn't use `continue` statement, - * because its generated codes aren't enclosed in main while-loop. + * separated in a function. If this is `true`, this operator shouldn't use `continue` statement to + * continue on next row, because its generated codes aren't enclosed in main while-loop. * * For example, we have generated codes for a query plan like: * Op1Exec @@ -198,24 +197,21 @@ trait CodegenSupport extends SparkPlan { * private boolean Op2Exec_doConsume(...) { * ... // logic of Op2Exec to consume rows. * } - * For now, `isConsumeInSeparateFunc` of Op2Exec will be `true`. + * For now, `doConsumeInChainOfFunc` of Op2Exec will be `true`. * * Notice for some operators like `HashAggregateExec`, it doesn't chain previous consume functions - * but begins with its produce framework. We should override `isConsumeInSeparateFunc` to return + * but begins with its produce framework. We should override `doConsumeInChainOfFunc` to return * `false`. */ - protected def isConsumeInSeparateFunc: Boolean = { + protected def doConsumeInChainOfFunc: Boolean = { val codegenChildren = children.map(_.asInstanceOf[CodegenSupport]) - codegenChildren.exists(_.parentConsumeInSeparateFunc) || - codegenChildren.exists(_.isConsumeInSeparateFunc) + doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc) } /** - * Returning true means we have at least one consume logic from child operator or this operator is - * separated in a function. If this is `true`, parent operator shouldn't use `continue` statement, - * because its generated codes aren't enclosed in main while-loop. + * The actual java statement this operator should use if there is a need to continue on next row + * in its `doConsume` codes. * - * We use the same example in `isConsumeInSeparateFunc`'s comment: * while (...) { * ... // logic of Op3Exec. * Op2Exec_doConsume(...); @@ -224,6 +220,7 @@ trait CodegenSupport extends SparkPlan { * ... // logic of Op2Exec to consume rows. * continue; // Wrong. We can't use continue with the while-loop. * } + * In above code, we can't use `continue` in `Op2Exec_doConsume`. * * Instead, we do something like: * while (...) { @@ -236,25 +233,22 @@ trait CodegenSupport extends SparkPlan { * return true; // When we need to do continue, we return true. * } */ - protected def effectiveContinueStatement: String = if (isConsumeInSeparateFunc) { - // When the separated consume logic in parent operators needs to do continue for outer loop, - // we consider if this plan's consume or any child's consume logic is separated in functions. - // If yes, we can't simply do continue. Instead, we return `true`. + protected def continueStatementInDoConsume: String = if (doConsumeInChainOfFunc) { "return true;"; } else { - // In the end of this separated consume function chain, we can do continue as usual. "continue;" } /** - * To prevent concatenated function growing too long to be optimized by JIT. We separate the - * consume function of each `CodegenSupport` operator into a function to call. + * To prevent concatenated function growing too long to be optimized by JIT. We can separate the + * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call. */ protected def constructDoConsumeFunction( ctx: CodegenContext, inputVars: Seq[ExprCode]): String = { val (callingParams, arguList, inputVarsInFunc) = constructConsumeParameters(ctx, output, inputVars) + parent.doConsumeInFunc = true val rowVar = ExprCode("", "false", "unsafeRow") val doConsume = ctx.freshName("doConsume") val doConsumeFuncName = ctx.addNewFunction(doConsume, @@ -267,7 +261,7 @@ trait CodegenSupport extends SparkPlan { s""" | boolean continueForLoop = $doConsumeFuncName($callingParams); - | if (continueForLoop) $effectiveContinueStatement + | if (continueForLoop) $continueStatementInDoConsume """.stripMargin } @@ -388,7 +382,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp child.execute() :: Nil } - override protected def isConsumeInSeparateFunc: Boolean = false + override protected def doConsumeInChainOfFunc: Boolean = false override def doProduce(ctx: CodegenContext): String = { val input = ctx.freshName("input") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 3cf9c353288ce..6e6d4be5d5af4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -163,7 +163,7 @@ case class HashAggregateExec( // The variables used as aggregation buffer. Only used for aggregation without keys. private var bufVars: Seq[ExprCode] = _ - override protected def isConsumeInSeparateFunc: Boolean = false + override protected def doConsumeInChainOfFunc: Boolean = false private def doProduceWithoutKeys(ctx: CodegenContext): String = { val initAgg = ctx.freshName("initAgg") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 9e21cffafcf97..951008d6e5e22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -149,7 +149,7 @@ case class FilterExec(condition: Expression, child: SparkPlan) s""" |$evaluated |${ev.code} - |if (${nullCheck}!${ev.value}) $effectiveContinueStatement + |if (${nullCheck}!${ev.value}) $continueStatementInDoConsume """.stripMargin } @@ -317,7 +317,7 @@ case class SampleExec( """.stripMargin.trim) s""" - | if ($sampler.sample() == 0) $effectiveContinueStatement + | if ($sampler.sample() == 0) $continueStatementInDoConsume | $numOutput.add(1); | ${consume(ctx, input)} """.stripMargin.trim diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index 9b3810ed086c6..49c2ed5be3724 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -192,7 +192,7 @@ case class BroadcastHashJoinExec( val matched = ctx.freshName("matched") val buildVars = genBuildSideVars(ctx, matched) val continueStatement = if (uniqueKeyCodePath) { - effectiveContinueStatement + continueStatementInDoConsume } else { "continue;" } @@ -242,7 +242,7 @@ case class BroadcastHashJoinExec( |${keyEv.code} |// find matches from HashedRelation |UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyEv.value}); - |if ($matched == null) $effectiveContinueStatement + |if ($matched == null) $continueStatementInDoConsume |$checkCondition |$numOutput.add(1); |${consume(ctx, resultVars)} @@ -257,7 +257,7 @@ case class BroadcastHashJoinExec( |${keyEv.code} |// find matches from HashRelation |$iteratorCls $matches = $anyNull ? null : ($iteratorCls)$relationTerm.get(${keyEv.value}); - |if ($matches == null) $effectiveContinueStatement + |if ($matches == null) $continueStatementInDoConsume |while ($matches.hasNext()) { | UnsafeRow $matched = (UnsafeRow) $matches.next(); | $checkCondition @@ -359,7 +359,7 @@ case class BroadcastHashJoinExec( |${keyEv.code} |// find matches from HashedRelation |UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyEv.value}); - |if ($matched == null) $effectiveContinueStatement + |if ($matched == null) $continueStatementInDoConsume |$checkCondition |$numOutput.add(1); |${consume(ctx, input)} @@ -373,14 +373,14 @@ case class BroadcastHashJoinExec( |${keyEv.code} |// find matches from HashRelation |$iteratorCls $matches = $anyNull ? null : ($iteratorCls)$relationTerm.get(${keyEv.value}); - |if ($matches == null) $effectiveContinueStatement + |if ($matches == null) $continueStatementInDoConsume |boolean $found = false; |while (!$found && $matches.hasNext()) { | UnsafeRow $matched = (UnsafeRow) $matches.next(); | $checkCondition | $found = true; |} - |if (!$found) $effectiveContinueStatement + |if (!$found) $continueStatementInDoConsume |$numOutput.add(1); |${consume(ctx, input)} """.stripMargin @@ -432,7 +432,7 @@ case class BroadcastHashJoinExec( | $checkCondition | $found = true; | } - | if ($found) $effectiveContinueStatement + | if ($found) $continueStatementInDoConsume | } |} |$numOutput.add(1); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 58e190e74457f..ac90e73eab3aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -547,7 +547,7 @@ case class SortMergeJoinExec( } } - override protected def isConsumeInSeparateFunc: Boolean = false + override protected def doConsumeInChainOfFunc: Boolean = false override def doProduce(ctx: CodegenContext): String = { ctx.copyResult = true From 1101b2c085d78e6416401c78221e32fef018851b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 1 Sep 2017 05:03:50 +0000 Subject: [PATCH 13/24] Don't create consume function if the number of arguments are more than maximum number. --- .../apache/spark/sql/execution/WholeStageCodegenExec.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index c0237f702c6c0..82e5416fe426b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -158,9 +158,11 @@ trait CodegenSupport extends SparkPlan { // 2. The output variables are not empty. If it's empty, we don't bother to do that. // 3. We don't use row variable. The construction of row uses deferred variable evaluation. We // can't do it. + // 4. The number of output variables must less than maximum number of parameters in Java method + // declaration. val requireAllOutput = output.forall(parent.usedInputs.contains(_)) val consumeFunc = - if (row == null && outputVars.nonEmpty && requireAllOutput) { + if (row == null && outputVars.nonEmpty && requireAllOutput && outputVars.length < 255) { constructDoConsumeFunction(ctx, inputVars) } else { parent.doConsume(ctx, inputVars, rowVar) From e36ec3c513c7da8500a703d7afa4860caa135e54 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 26 Sep 2017 00:26:30 +0000 Subject: [PATCH 14/24] Remove the part of "continue" processing. --- .../spark/sql/execution/ExpandExec.scala | 2 - .../apache/spark/sql/execution/SortExec.scala | 2 - .../sql/execution/WholeStageCodegenExec.scala | 82 ++----------------- .../aggregate/HashAggregateExec.scala | 2 - .../execution/joins/SortMergeJoinExec.scala | 2 - 5 files changed, 5 insertions(+), 85 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala index a3cd5f8c19d66..d5603b3b00914 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala @@ -89,8 +89,6 @@ case class ExpandExec( child.asInstanceOf[CodegenSupport].inputRDDs() } - override protected def doConsumeInChainOfFunc: Boolean = false - protected override def doProduce(ctx: CodegenContext): String = { child.asInstanceOf[CodegenSupport].produce(ctx, this) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index acdbfccd0fd5e..ff71fd4dc7bb7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -177,8 +177,6 @@ case class SortExec( """.stripMargin.trim } - override protected def doConsumeInChainOfFunc: Boolean = false - protected override val shouldStopRequired = false override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 9b63b03c06257..f51b249e6049e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -174,96 +174,26 @@ trait CodegenSupport extends SparkPlan { """.stripMargin } - /** - * To prevent concatenated function growing too long to be optimized by JIT. Instead of inlining, - * we may put the consume logic of parent operator into a function and set this flag to `true`. - * The parent operator can know if its consume logic is inlined or in separated function. - */ - private var doConsumeInFunc: Boolean = false - - /** - * Returning true means we have at least one consume logic from child operator or this operator is - * separated in a function. If this is `true`, this operator shouldn't use `continue` statement to - * continue on next row, because its generated codes aren't enclosed in main while-loop. - * - * For example, we have generated codes for a query plan like: - * Op1Exec - * Op2Exec - * Op3Exec - * - * If we put the consume code of Op2Exec into a separated function, the generated codes are like: - * while (...) { - * ... // logic of Op3Exec. - * Op2Exec_doConsume(...); - * } - * private boolean Op2Exec_doConsume(...) { - * ... // logic of Op2Exec to consume rows. - * } - * For now, `doConsumeInChainOfFunc` of Op2Exec will be `true`. - * - * Notice for some operators like `HashAggregateExec`, it doesn't chain previous consume functions - * but begins with its produce framework. We should override `doConsumeInChainOfFunc` to return - * `false`. - */ - protected def doConsumeInChainOfFunc: Boolean = { - val codegenChildren = children.map(_.asInstanceOf[CodegenSupport]) - doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc) - } - - /** - * The actual java statement this operator should use if there is a need to continue on next row - * in its `doConsume` codes. - * - * while (...) { - * ... // logic of Op3Exec. - * Op2Exec_doConsume(...); - * } - * private boolean Op2Exec_doConsume(...) { - * ... // logic of Op2Exec to consume rows. - * continue; // Wrong. We can't use continue with the while-loop. - * } - * In above code, we can't use `continue` in `Op2Exec_doConsume`. - * - * Instead, we do something like: - * while (...) { - * ... // logic of Op3Exec. - * boolean continueForLoop = Op2Exec_doConsume(...); - * if (continueForLoop) continue; - * } - * private boolean Op2Exec_doConsume(...) { - * ... // logic of Op2Exec to consume rows. - * return true; // When we need to do continue, we return true. - * } - */ - protected def continueStatementInDoConsume: String = if (doConsumeInChainOfFunc) { - "return true;"; - } else { - "continue;" - } - /** * To prevent concatenated function growing too long to be optimized by JIT. We can separate the * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call. */ - protected def constructDoConsumeFunction( + private def constructDoConsumeFunction( ctx: CodegenContext, inputVars: Seq[ExprCode]): String = { val (callingParams, arguList, inputVarsInFunc) = constructConsumeParameters(ctx, output, inputVars) - parent.doConsumeInFunc = true val rowVar = ExprCode("", "false", "unsafeRow") val doConsume = ctx.freshName("doConsume") val doConsumeFuncName = ctx.addNewFunction(doConsume, s""" - | private boolean $doConsume($arguList) throws java.io.IOException { + | private void $doConsume($arguList) throws java.io.IOException { | ${parent.doConsume(ctx, inputVarsInFunc, rowVar)} - | return false; | } """.stripMargin) s""" - | boolean continueForLoop = $doConsumeFuncName($callingParams); - | if (continueForLoop) $continueStatementInDoConsume + | $doConsumeFuncName($callingParams); """.stripMargin } @@ -271,7 +201,7 @@ trait CodegenSupport extends SparkPlan { * Returns source code for calling consume function and the argument list of the consume function * and also the `ExprCode` for the argument list. */ - protected def constructConsumeParameters( + private def constructConsumeParameters( ctx: CodegenContext, attributes: Seq[Attribute], variables: Seq[ExprCode]): (String, String, Seq[ExprCode]) = { @@ -280,7 +210,7 @@ trait CodegenSupport extends SparkPlan { val arguName = ctx.freshName(s"expr_$i") val arguIsNull = ctx.freshName(s"exprIsNull_$i") (callingParam, - ctx.javaType(attributes(i).dataType) + " " + arguName + ", boolean " + arguIsNull, + s"${ctx.javaType(attributes(i).dataType)} $arguName, boolean $arguIsNull", ExprCode("", arguIsNull, arguName)) }.unzip3 (params._1.mkString(", "), @@ -387,8 +317,6 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp child.execute() :: Nil } - override protected def doConsumeInChainOfFunc: Boolean = false - override def doProduce(ctx: CodegenContext): String = { val input = ctx.freshName("input") // Right now, InputAdapter is only used when there is one input RDD. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index a9342e83ce07c..abdf9530c6c7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -164,8 +164,6 @@ case class HashAggregateExec( // The variables used as aggregation buffer. Only used for aggregation without keys. private var bufVars: Seq[ExprCode] = _ - override protected def doConsumeInChainOfFunc: Boolean = false - private def doProduceWithoutKeys(ctx: CodegenContext): String = { val initAgg = ctx.freshName("initAgg") ctx.addMutableState("boolean", initAgg, s"$initAgg = false;") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index b72e28f870b49..14de2dc23e3c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -569,8 +569,6 @@ case class SortMergeJoinExec( } } - override protected def doConsumeInChainOfFunc: Boolean = false - override def doProduce(ctx: CodegenContext): String = { ctx.copyResult = true val leftInput = ctx.freshName("leftInput") From 601c2251c397b30f2ea9a42f6a23e3636129d5bc Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 7 Oct 2017 00:45:52 +0000 Subject: [PATCH 15/24] Fix test. --- .../org/apache/spark/sql/execution/WholeStageCodegenSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index aaa77b3ee6201..21418b69a6f3b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -181,7 +181,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { val codeWithShortFunctions = genGroupByCode(3) val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions) assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get) - val codeWithLongFunctions = genGroupByCode(20) + val codeWithLongFunctions = genGroupByCode(50) val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions) assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get) } From 476994f9bd284a650b148583f27da403adcf21f6 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 11 Oct 2017 03:05:53 +0000 Subject: [PATCH 16/24] More accurate calculation of valid method parameter length. --- .../sql/execution/WholeStageCodegenExec.scala | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index e883db4f834ad..e0b8afb3c7058 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -162,7 +162,7 @@ trait CodegenSupport extends SparkPlan { // declaration. val requireAllOutput = output.forall(parent.usedInputs.contains(_)) val consumeFunc = - if (row == null && outputVars.nonEmpty && requireAllOutput && outputVars.length < 255) { + if (row == null && outputVars.nonEmpty && requireAllOutput && isValidParamLength(ctx)) { constructDoConsumeFunction(ctx, inputVars) } else { parent.doConsume(ctx, inputVars, rowVar) @@ -174,6 +174,25 @@ trait CodegenSupport extends SparkPlan { """.stripMargin } + /** + * In Java, a method descriptor is valid only if it represents method parameters with a total + * length of 255 or less. `this` contributes one unit and a parameter of type long or double + * contributes two units. Besides, for nullable parameters, we also need to pass a boolean + * for the null status. + */ + private def isValidParamLength(ctx: CodegenContext): Boolean = { + var paramLength = 1 // for `this` parameter. + output.foreach { attr => + ctx.javaType(attr.dataType) match { + case (ctx.JAVA_LONG | ctx.JAVA_DOUBLE) if !attr.nullable => paramLength += 2 + case ctx.JAVA_LONG | ctx.JAVA_DOUBLE => paramLength += 3 + case _ if !attr.nullable => paramLength += 1 + case _ => paramLength += 2 + } + } + paramLength <= 255 + } + /** * To prevent concatenated function growing too long to be optimized by JIT. We can separate the * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call. @@ -206,12 +225,19 @@ trait CodegenSupport extends SparkPlan { attributes: Seq[Attribute], variables: Seq[ExprCode]): (String, String, Seq[ExprCode]) = { val params = variables.zipWithIndex.map { case (ev, i) => - val callingParam = ev.value + ", " + ev.isNull val arguName = ctx.freshName(s"expr_$i") - val arguIsNull = ctx.freshName(s"exprIsNull_$i") - (callingParam, - s"${ctx.javaType(attributes(i).dataType)} $arguName, boolean $arguIsNull", - ExprCode("", arguIsNull, arguName)) + val arguType = ctx.javaType(attributes(i).dataType) + + val (callingParam, funcParams, arguIsNull) = if (!attributes(i).nullable) { + // When the argument is not nullable, we don't need to pass in `isNull` param for it and + // simply give a `false`. + val arguIsNull = "false" + (ev.value, s"$arguType $arguName", arguIsNull) + } else { + val arguIsNull = ctx.freshName(s"exprIsNull_$i") + (ev.value + ", " + ev.isNull, s"$arguType $arguName, boolean $arguIsNull", arguIsNull) + } + (callingParam, funcParams, ExprCode("", arguIsNull, arguName)) }.unzip3 (params._1.mkString(", "), params._2.mkString(", "), From bdc1146d9b57c64c367cade89e83511a841f0492 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 12 Oct 2017 13:35:23 +0000 Subject: [PATCH 17/24] Address comment. --- .../sql/execution/WholeStageCodegenExec.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index e0b8afb3c7058..bb20d25fd9ce6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -181,16 +181,15 @@ trait CodegenSupport extends SparkPlan { * for the null status. */ private def isValidParamLength(ctx: CodegenContext): Boolean = { - var paramLength = 1 // for `this` parameter. - output.foreach { attr => + // Start value is 1 for `this`. + output.foldLeft(1) { case (curLength, attr) => ctx.javaType(attr.dataType) match { - case (ctx.JAVA_LONG | ctx.JAVA_DOUBLE) if !attr.nullable => paramLength += 2 - case ctx.JAVA_LONG | ctx.JAVA_DOUBLE => paramLength += 3 - case _ if !attr.nullable => paramLength += 1 - case _ => paramLength += 2 + case (ctx.JAVA_LONG | ctx.JAVA_DOUBLE) if !attr.nullable => curLength + 2 + case ctx.JAVA_LONG | ctx.JAVA_DOUBLE => curLength + 3 + case _ if !attr.nullable => curLength + 1 + case _ => curLength + 2 } - } - paramLength <= 255 + } <= 255 } /** From 58eaf0049e85aa8cbc7ffa875f331f65dba1dfa0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 24 Jan 2018 03:27:33 +0000 Subject: [PATCH 18/24] Address comments. --- .../expressions/codegen/CodeGenerator.scala | 20 +++++++++++ .../apache/spark/sql/internal/SQLConf.scala | 10 ++++++ .../sql/execution/WholeStageCodegenExec.scala | 34 +++++-------------- 3 files changed, 38 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index f9c5ef8439085..11541200451d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -942,6 +942,23 @@ class CodegenContext { "" } } + + /** + * In Java, a method descriptor is valid only if it represents method parameters with a total + * length of 255 or less. `this` contributes one unit and a parameter of type long or double + * contributes two units. Besides, for nullable parameters, we also need to pass a boolean + * for the null status. + */ + def isValidParamLength(params: Seq[Expression]): Boolean = { + def calculateParamLength(input: Expression): Int = { + (if (input.nullable) 1 else 0) + javaType(input.dataType) match { + case JAVA_LONG | JAVA_DOUBLE => 2 + case _ => 1 + } + } + // Initial value is 1 for `this`. + 1 + params.map(calculateParamLength(_)).sum <= CodeGenerator.MAX_JVM_METHOD_PARAMS_LENGTH + } } /** @@ -1010,6 +1027,9 @@ object CodeGenerator extends Logging { // This is the value of HugeMethodLimit in the OpenJDK JVM settings val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000 + // The max valid length of method parameters in JVM. + val MAX_JVM_METHOD_PARAMS_LENGTH = 255 + /** * Compile the Java source code into a Java class, using Janino. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 58323740b80cc..01227dade3fba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -586,6 +586,14 @@ object SQLConf { .intConf .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT) + val DECOUPLE_OPERATOR_CONSUME_FUNCTIONS = buildConf("spark.sql.codegen.decoupleOperatorConsume") + .internal() + .doc("When true, whole stage codegen would put the logic of consuming rows of each physical " + + "operator into individual methods, instead of a single big method. This can be used to " + + "avoid oversized function that can miss the opportunity of JIT optimization.") + .booleanConf + .createWithDefault(true) + val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes") .doc("The maximum number of bytes to pack into a single partition when reading files.") .longConf @@ -1061,6 +1069,8 @@ class SQLConf extends Serializable with Logging { def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT) + def decoupleOperatorConsumeFuncs: Boolean = getConf(DECOUPLE_OPERATOR_CONSUME_FUNCTIONS) + def tableRelationCacheSize: Int = getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index bb20d25fd9ce6..bef3eb20aa64d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -153,16 +153,18 @@ trait CodegenSupport extends SparkPlan { // Under certain conditions, we can put the logic to consume the rows of this operator into // another function. So we can prevent a generated function too long to be optimized by JIT. // The conditions: - // 1. The parent uses all variables in output. we can't defer variable evaluation when consume + // 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled. + // 2. The parent uses all variables in output. we can't defer variable evaluation when consume // in another function. - // 2. The output variables are not empty. If it's empty, we don't bother to do that. - // 3. We don't use row variable. The construction of row uses deferred variable evaluation. We + // 3. The output variables are not empty. If it's empty, we don't bother to do that. + // 4. We don't use row variable. The construction of row uses deferred variable evaluation. We // can't do it. - // 4. The number of output variables must less than maximum number of parameters in Java method + // 5. The number of output variables must less than maximum number of parameters in Java method // declaration. val requireAllOutput = output.forall(parent.usedInputs.contains(_)) val consumeFunc = - if (row == null && outputVars.nonEmpty && requireAllOutput && isValidParamLength(ctx)) { + if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && outputVars.nonEmpty && + requireAllOutput && ctx.isValidParamLength(output)) { constructDoConsumeFunction(ctx, inputVars) } else { parent.doConsume(ctx, inputVars, rowVar) @@ -174,24 +176,6 @@ trait CodegenSupport extends SparkPlan { """.stripMargin } - /** - * In Java, a method descriptor is valid only if it represents method parameters with a total - * length of 255 or less. `this` contributes one unit and a parameter of type long or double - * contributes two units. Besides, for nullable parameters, we also need to pass a boolean - * for the null status. - */ - private def isValidParamLength(ctx: CodegenContext): Boolean = { - // Start value is 1 for `this`. - output.foldLeft(1) { case (curLength, attr) => - ctx.javaType(attr.dataType) match { - case (ctx.JAVA_LONG | ctx.JAVA_DOUBLE) if !attr.nullable => curLength + 2 - case ctx.JAVA_LONG | ctx.JAVA_DOUBLE => curLength + 3 - case _ if !attr.nullable => curLength + 1 - case _ => curLength + 2 - } - } <= 255 - } - /** * To prevent concatenated function growing too long to be optimized by JIT. We can separate the * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call. @@ -238,9 +222,7 @@ trait CodegenSupport extends SparkPlan { } (callingParam, funcParams, ExprCode("", arguIsNull, arguName)) }.unzip3 - (params._1.mkString(", "), - params._2.mkString(", "), - params._3) + (params._1.mkString(", "), params._2.mkString(", "), params._3) } /** From 9f0d1da8e6f13e61411755d01e74e86cc399d2e8 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 24 Jan 2018 06:38:31 +0000 Subject: [PATCH 19/24] Copy variables used for creating unsaferow. --- .../spark/sql/execution/WholeStageCodegenExec.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 48436b9c09e67..3714b2889b4d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -197,10 +197,16 @@ trait CodegenSupport extends SparkPlan { val colExprs = output.zipWithIndex.map { case (attr, i) => BoundReference(i, attr.dataType, attr.nullable) } + val varsForRow = inputVarsInFunc.map(_.copy()) + val evaluateInputs = evaluateVariables(varsForRow) ctx.INPUT_ROW = null - ctx.currentVars = inputVarsInFunc + ctx.currentVars = varsForRow val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false) - val rowVar = ExprCode(ev.code.trim, "false", ev.value) + val code = s""" + |$evaluateInputs + |${ev.code.trim} + """.stripMargin.trim + val rowVar = ExprCode(code, "false", ev.value) val doConsume = ctx.freshName("doConsume") ctx.currentVars = inputVarsInFunc From 79d010614845a2663353c5e84ff46e2784e3f7b2 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 24 Jan 2018 06:44:13 +0000 Subject: [PATCH 20/24] Revert vairables copying. --- .../spark/sql/execution/WholeStageCodegenExec.scala | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 3714b2889b4d3..906350541c097 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -197,16 +197,11 @@ trait CodegenSupport extends SparkPlan { val colExprs = output.zipWithIndex.map { case (attr, i) => BoundReference(i, attr.dataType, attr.nullable) } - val varsForRow = inputVarsInFunc.map(_.copy()) - val evaluateInputs = evaluateVariables(varsForRow) + // Don't need to copy the variables because they're already evaluated before entering function. ctx.INPUT_ROW = null - ctx.currentVars = varsForRow + ctx.currentVars = inputVarsInFunc val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false) - val code = s""" - |$evaluateInputs - |${ev.code.trim} - """.stripMargin.trim - val rowVar = ExprCode(code, "false", ev.value) + val rowVar = ExprCode(ev.code.trim, "false", ev.value) val doConsume = ctx.freshName("doConsume") ctx.currentVars = inputVarsInFunc From 6384aec9adeb9a725a0add9c090686bdb380eb01 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 24 Jan 2018 08:18:13 +0000 Subject: [PATCH 21/24] Add final to constants. --- .../catalyst/expressions/codegen/CodeGenerator.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index d7946f4e69f7d..195ef5621013f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -1329,29 +1329,29 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin object CodeGenerator extends Logging { // This is the value of HugeMethodLimit in the OpenJDK JVM settings - val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000 + final val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000 // The max valid length of method parameters in JVM. - val MAX_JVM_METHOD_PARAMS_LENGTH = 255 + final val MAX_JVM_METHOD_PARAMS_LENGTH = 255 // This is the threshold over which the methods in an inner class are grouped in a single // method which is going to be called by the outer class instead of the many small ones - val MERGE_SPLIT_METHODS_THRESHOLD = 3 + final val MERGE_SPLIT_METHODS_THRESHOLD = 3 // The number of named constants that can exist in the class is limited by the Constant Pool // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a // threshold of 1000k bytes to determine when a function should be inlined to a private, inner // class. - val GENERATED_CLASS_SIZE_THRESHOLD = 1000000 + final val GENERATED_CLASS_SIZE_THRESHOLD = 1000000 // This is the threshold for the number of global variables, whose types are primitive type or // complex type (e.g. more than one-dimensional array), that will be placed at the outer class - val OUTER_CLASS_VARIABLES_THRESHOLD = 10000 + final val OUTER_CLASS_VARIABLES_THRESHOLD = 10000 // This is the maximum number of array elements to keep global variables in one Java array // 32767 is the maximum integer value that does not require a constant pool entry in a Java // bytecode instruction - val MUTABLESTATEARRAY_SIZE_LIMIT = 32768 + final val MUTABLESTATEARRAY_SIZE_LIMIT = 32768 /** * Compile the Java source code into a Java class, using Janino. From 0c4173e5fffaa2dead09f184b301355a40e6118f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 25 Jan 2018 01:22:26 +0000 Subject: [PATCH 22/24] Address comments. --- .../apache/spark/sql/internal/SQLConf.scala | 18 +-- .../sql/execution/WholeStageCodegenExec.scala | 136 +++++++++--------- 2 files changed, 82 insertions(+), 72 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2288741ca2483..470f88c213561 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -661,13 +661,14 @@ object SQLConf { .intConf .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT) - val DECOUPLE_OPERATOR_CONSUME_FUNCTIONS = buildConf("spark.sql.codegen.decoupleOperatorConsume") - .internal() - .doc("When true, whole stage codegen would put the logic of consuming rows of each physical " + - "operator into individual methods, instead of a single big method. This can be used to " + - "avoid oversized function that can miss the opportunity of JIT optimization.") - .booleanConf - .createWithDefault(true) + val WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR = + buildConf("spark.sql.codegen.splitConsumeFuncByOperator") + .internal() + .doc("When true, whole stage codegen would put the logic of consuming rows of each " + + "physical operator into individual methods, instead of a single big method. This can be " + + "used to avoid oversized function that can miss the opportunity of JIT optimization.") + .booleanConf + .createWithDefault(true) val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes") .doc("The maximum number of bytes to pack into a single partition when reading files.") @@ -1271,7 +1272,8 @@ class SQLConf extends Serializable with Logging { def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT) - def decoupleOperatorConsumeFuncs: Boolean = getConf(DECOUPLE_OPERATOR_CONSUME_FUNCTIONS) + def wholeStageSplitConsumeFuncByOperator: Boolean = + getConf(WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR) def tableRelationCacheSize: Int = getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 906350541c097..7aa17edc0fe36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution import java.util.Locale +import scala.collection.mutable + import org.apache.spark.broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -106,6 +108,31 @@ trait CodegenSupport extends SparkPlan { */ protected def doProduce(ctx: CodegenContext): String + private def prepareRowVar(ctx: CodegenContext, row: String, colVars: Seq[ExprCode]): ExprCode = { + if (row != null) { + ExprCode("", "false", row) + } else { + if (colVars.nonEmpty) { + val colExprs = output.zipWithIndex.map { case (attr, i) => + BoundReference(i, attr.dataType, attr.nullable) + } + val evaluateInputs = evaluateVariables(colVars) + // generate the code to create a UnsafeRow + ctx.INPUT_ROW = row + ctx.currentVars = colVars + val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false) + val code = s""" + |$evaluateInputs + |${ev.code.trim} + """.stripMargin.trim + ExprCode(code, "false", ev.value) + } else { + // There is no columns + ExprCode("", "false", "unsafeRow") + } + } + } + /** * Consume the generated columns or row from current SparkPlan, call its parent's `doConsume()`. * @@ -126,28 +153,7 @@ trait CodegenSupport extends SparkPlan { } } - val rowVar = if (row != null) { - ExprCode("", "false", row) - } else { - if (outputVars.nonEmpty) { - val colExprs = output.zipWithIndex.map { case (attr, i) => - BoundReference(i, attr.dataType, attr.nullable) - } - val evaluateInputs = evaluateVariables(outputVars) - // generate the code to create a UnsafeRow - ctx.INPUT_ROW = row - ctx.currentVars = outputVars - val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false) - val code = s""" - |$evaluateInputs - |${ev.code.trim} - """.stripMargin.trim - ExprCode(code, "false", ev.value) - } else { - // There is no columns - ExprCode("", "false", "unsafeRow") - } - } + val rowVar = prepareRowVar(ctx, row, outputVars) // Set up the `currentVars` in the codegen context, as we generate the code of `inputVars` // before calling `parent.doConsume`. We can't set up `INPUT_ROW`, because parent needs to @@ -160,19 +166,16 @@ trait CodegenSupport extends SparkPlan { // Under certain conditions, we can put the logic to consume the rows of this operator into // another function. So we can prevent a generated function too long to be optimized by JIT. // The conditions: - // 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled. - // 2. The parent uses all variables in output. we can't defer variable evaluation when consume - // in another function. - // 3. The output variables are not empty. If it's empty, we don't bother to do that. - // 4. We don't use row variable. The construction of row uses deferred variable evaluation. We - // can't do it. - // 5. The number of output variables must less than maximum number of parameters in Java method + // 1. The config "spark.sql.codegen.splitConsumeFuncByOperator" is enabled. + // 2. `inputVars` are all materialized. That is guaranteed to be true if the parent plan uses + // all variables in output (see `requireAllOutput`). + // 3. The number of output variables must less than maximum number of parameters in Java method // declaration. val requireAllOutput = output.forall(parent.usedInputs.contains(_)) val consumeFunc = - if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && outputVars.nonEmpty && - requireAllOutput && ctx.isValidParamLength(output)) { - constructDoConsumeFunction(ctx, inputVars) + if (SQLConf.get.wholeStageSplitConsumeFuncByOperator && requireAllOutput && + ctx.isValidParamLength(output)) { + constructDoConsumeFunction(ctx, inputVars, row) } else { parent.doConsume(ctx, inputVars, rowVar) } @@ -189,59 +192,64 @@ trait CodegenSupport extends SparkPlan { */ private def constructDoConsumeFunction( ctx: CodegenContext, - inputVars: Seq[ExprCode]): String = { - val (callingParams, arguList, inputVarsInFunc) = - constructConsumeParameters(ctx, output, inputVars) - - // Set up rowVar because parent plan can possibly consume UnsafeRow instead of variables. - val colExprs = output.zipWithIndex.map { case (attr, i) => - BoundReference(i, attr.dataType, attr.nullable) - } - // Don't need to copy the variables because they're already evaluated before entering function. - ctx.INPUT_ROW = null - ctx.currentVars = inputVarsInFunc - val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false) - val rowVar = ExprCode(ev.code.trim, "false", ev.value) + inputVars: Seq[ExprCode], + row: String): String = { + val (args, params, inputVarsInFunc) = constructConsumeParameters(ctx, output, inputVars, row) + val rowVar = prepareRowVar(ctx, row, inputVarsInFunc) val doConsume = ctx.freshName("doConsume") ctx.currentVars = inputVarsInFunc ctx.INPUT_ROW = null + val doConsumeFuncName = ctx.addNewFunction(doConsume, s""" - | private void $doConsume($arguList) throws java.io.IOException { + | private void $doConsume(${params.mkString(", ")}) throws java.io.IOException { | ${parent.doConsume(ctx, inputVarsInFunc, rowVar)} | } """.stripMargin) s""" - | $doConsumeFuncName($callingParams); + | $doConsumeFuncName(${args.mkString(", ")}); """.stripMargin } /** - * Returns source code for calling consume function and the argument list of the consume function - * and also the `ExprCode` for the argument list. + * Returns arguments for calling method and method definition parameters of the consume function. + * And also returns the list of `ExprCode` for the parameters. */ private def constructConsumeParameters( ctx: CodegenContext, attributes: Seq[Attribute], - variables: Seq[ExprCode]): (String, String, Seq[ExprCode]) = { - val params = variables.zipWithIndex.map { case (ev, i) => - val arguName = ctx.freshName(s"expr_$i") - val arguType = ctx.javaType(attributes(i).dataType) - - val (callingParam, funcParams, arguIsNull) = if (!attributes(i).nullable) { - // When the argument is not nullable, we don't need to pass in `isNull` param for it and - // simply give a `false`. - val arguIsNull = "false" - (ev.value, s"$arguType $arguName", arguIsNull) + variables: Seq[ExprCode], + row: String): (Seq[String], Seq[String], Seq[ExprCode]) = { + val arguments = mutable.ArrayBuffer[String]() + val parameters = mutable.ArrayBuffer[String]() + val paramVars = mutable.ArrayBuffer[ExprCode]() + + if (row != null) { + arguments += row + parameters += s"InternalRow $row" + } + + variables.zipWithIndex.foreach { case (ev, i) => + val paramName = ctx.freshName(s"expr_$i") + val paramType = ctx.javaType(attributes(i).dataType) + + arguments += ev.value + parameters += s"$paramType $paramName" + val paramIsNull = if (!attributes(i).nullable) { + // Use constant `false` without passing `isNull` for non-nullable variable. + "false" } else { - val arguIsNull = ctx.freshName(s"exprIsNull_$i") - (ev.value + ", " + ev.isNull, s"$arguType $arguName, boolean $arguIsNull", arguIsNull) + val isNull = ctx.freshName(s"exprIsNull_$i") + arguments += ev.isNull + parameters += s"boolean $isNull" + isNull } - (callingParam, funcParams, ExprCode("", arguIsNull, arguName)) - }.unzip3 - (params._1.mkString(", "), params._2.mkString(", "), params._3) + + paramVars += ExprCode("", paramIsNull, paramName) + } + (arguments, parameters, paramVars) } /** From c859d53dd909cce87056e3fee9fe42b2d4d5acdb Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 25 Jan 2018 03:36:15 +0000 Subject: [PATCH 23/24] Add tests. --- .../execution/WholeStageCodegenSuite.scala | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 37e9b4ca12b45..242bb48c22942 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -228,4 +228,49 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { } } } + + test("Control splitting consume function by operators with config") { + import testImplicits._ + val df = spark.range(10).select(Seq.tabulate(2) {i => ('id + i).as(s"c$i")} : _*) + + Seq(true, false).foreach { config => + withSQLConf(SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> s"$config") { + val plan = df.queryExecution.executedPlan + val wholeStageCodeGenExec = plan.find(p => p match { + case wp: WholeStageCodegenExec => true + case _ => false + }) + assert(wholeStageCodeGenExec.isDefined) + val code = wholeStageCodeGenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen()._2 + assert(code.body.contains("project_doConsume") == config) + } + } + } + + test("Skip splitting consume function when parameter number exceeds JVM limit") { + import testImplicits._ + + Seq((255, false), (254, true)).foreach { case (columnNum, hasSplit) => + withTempPath { dir => + val path = dir.getCanonicalPath + spark.range(10).select(Seq.tabulate(columnNum) {i => ('id + i).as(s"c$i")} : _*) + .write.mode(SaveMode.Overwrite).parquet(path) + + withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "255", + SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true") { + val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as newC$i") + val df = spark.read.parquet(path).selectExpr(projection: _*) + + val plan = df.queryExecution.executedPlan + val wholeStageCodeGenExec = plan.find(p => p match { + case wp: WholeStageCodegenExec => true + case _ => false + }) + assert(wholeStageCodeGenExec.isDefined) + val code = wholeStageCodeGenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen()._2 + assert(code.body.contains("project_doConsume") == hasSplit) + } + } + } + } } From 11946e7a62928304560c0602d71b3064789086d6 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 25 Jan 2018 05:29:50 +0000 Subject: [PATCH 24/24] Refactor isValidParamLength a bit. --- .../expressions/codegen/CodeGenerator.scala | 21 ++++++++++++------- .../sql/execution/WholeStageCodegenExec.scala | 14 ++++++------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 195ef5621013f..4dcbb702893da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -1247,13 +1247,12 @@ class CodegenContext { } /** - * In Java, a method descriptor is valid only if it represents method parameters with a total - * length of 255 or less. `this` contributes one unit and a parameter of type long or double - * contributes two units. Besides, for nullable parameters, we also need to pass a boolean - * for the null status. + * Returns the length of parameters for a Java method descriptor. `this` contributes one unit + * and a parameter of type long or double contributes two units. Besides, for nullable parameter, + * we also need to pass a boolean parameter for the null status. */ - def isValidParamLength(params: Seq[Expression]): Boolean = { - def calculateParamLength(input: Expression): Int = { + def calculateParamLength(params: Seq[Expression]): Int = { + def paramLengthForExpr(input: Expression): Int = { // For a nullable expression, we need to pass in an extra boolean parameter. (if (input.nullable) 1 else 0) + javaType(input.dataType) match { case JAVA_LONG | JAVA_DOUBLE => 2 @@ -1261,7 +1260,15 @@ class CodegenContext { } } // Initial value is 1 for `this`. - 1 + params.map(calculateParamLength(_)).sum <= CodeGenerator.MAX_JVM_METHOD_PARAMS_LENGTH + 1 + params.map(paramLengthForExpr(_)).sum + } + + /** + * In Java, a method descriptor is valid only if it represents method parameters with a total + * length less than a pre-defined constant. + */ + def isValidParamLength(paramLength: Int): Boolean = { + paramLength <= CodeGenerator.MAX_JVM_METHOD_PARAMS_LENGTH } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 7aa17edc0fe36..8ea9e81b2e53b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -171,14 +171,14 @@ trait CodegenSupport extends SparkPlan { // all variables in output (see `requireAllOutput`). // 3. The number of output variables must less than maximum number of parameters in Java method // declaration. + val confEnabled = SQLConf.get.wholeStageSplitConsumeFuncByOperator val requireAllOutput = output.forall(parent.usedInputs.contains(_)) - val consumeFunc = - if (SQLConf.get.wholeStageSplitConsumeFuncByOperator && requireAllOutput && - ctx.isValidParamLength(output)) { - constructDoConsumeFunction(ctx, inputVars, row) - } else { - parent.doConsume(ctx, inputVars, rowVar) - } + val paramLength = ctx.calculateParamLength(output) + (if (row != null) 1 else 0) + val consumeFunc = if (confEnabled && requireAllOutput && ctx.isValidParamLength(paramLength)) { + constructDoConsumeFunction(ctx, inputVars, row) + } else { + parent.doConsume(ctx, inputVars, rowVar) + } s""" |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")} |$evaluated