From 6de753de6584ced4d44e3146dece3f684e803212 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sun, 31 May 2020 14:08:49 +0900 Subject: [PATCH 1/5] Fix --- .../expressions/objects/objects.scala | 19 +++++++++++++++---- .../expressions/ObjectExpressionsSuite.scala | 15 +++++++++++++++ .../apache/spark/sql/execution/objects.scala | 4 ++-- .../org/apache/spark/sql/DatasetSuite.scala | 11 +++++++++++ 4 files changed, 43 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index d5de95c65e49e..2018a41114397 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -383,15 +383,26 @@ case class Invoke( """ } + val evalFunc = if (propagateNull) { + s""" + ${ev.isNull} = $resultIsNull; + if (!${ev.isNull}) { + $evaluate + } + """ + } else { + s""" + ${ev.isNull} = false; + $evaluate + """ + } + val code = obj.code + code""" boolean ${ev.isNull} = true; $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; if (!${obj.isNull}) { $argCode - ${ev.isNull} = $resultIsNull; - if (!${ev.isNull}) { - $evaluate - } + $evalFunc } """ ev.copy(code = code) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index c40149368b055..cb7d0726133ed 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -44,6 +44,7 @@ class InvokeTargetClass extends Serializable { def filterInt(e: Any): Any = e.asInstanceOf[Int] > 0 def filterPrimitiveInt(e: Int): Boolean = e > 0 def binOp(e1: Int, e2: Double): Double = e1 + e2 + def mapFunc(e: Any): Tuple2[Any, Any] = (e, e) } class InvokeTargetSubClass extends InvokeTargetClass { @@ -202,6 +203,20 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { Invoke(funcSubObj, "binOp", DoubleType, inputSum), 0.75, InternalRow.apply(1, 0.25)) } + test("SPARK-31854 Invoke in MapElementsExec should respect propagateNull") { + val targetObject = new InvokeTargetClass + val funcClass = classOf[InvokeTargetClass] + val funcObj = Literal.create(targetObject, ObjectType(funcClass)) + val inputInt = Seq(BoundReference(0, ObjectType(classOf[Any]), true)) + val outputType = ObjectType(classOf[(Any, Any)]) + val inputRow = InternalRow.fromSeq(Seq(null.asInstanceOf[java.lang.Integer])) + val createExpr = (propagateNull: Boolean) => { + Invoke(funcObj, "mapFunc", outputType, inputInt, propagateNull) + } + checkObjectExprEvaluation(createExpr(true), null, inputRow) + checkObjectExprEvaluation(createExpr(false), (null, null), inputRow) + } + test("SPARK-23593: InitializeJavaBean should support interpreted execution") { val list = new java.util.LinkedList[Int]() list.add(1) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index d05113431df41..4b2d4195ee906 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -276,12 +276,12 @@ case class MapElementsExec( } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { - val (funcClass, methodName) = func match { + val (funcClass, funcName) = func match { case m: MapFunction[_, _] => classOf[MapFunction[_, _]] -> "call" case _ => FunctionUtils.getFunctionOneName(outputObjectType, child.output(0).dataType) } val funcObj = Literal.create(func, ObjectType(funcClass)) - val callFunc = Invoke(funcObj, methodName, outputObjectType, child.output) + val callFunc = Invoke(funcObj, funcName, outputObjectType, child.output, propagateNull = false) val result = BindReferences.bindReference(callFunc, child.output).genCode(ctx) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index af65957691b37..55404c85c7c28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1916,6 +1916,17 @@ class DatasetSuite extends QueryTest assert(df1.semanticHash !== df3.semanticHash) assert(df3.semanticHash === df4.semanticHash) } + + test("SPARK-31854 Invoke in MapElementsExec should respect propagateNull") { + spark.conf.set("spark.sql.codegen.wholeStage", false) + Seq("true", "false").foreach { wholeStage => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStage) { + val ds = Seq(1.asInstanceOf[Integer], null.asInstanceOf[Integer]).toDS() + val expectedAnswer = Seq[(Integer, Integer)]((1, 1), (null, null)) + checkDataset(ds.map(v => (v, v)), expectedAnswer: _*) + } + } + } } object AssertExecutionId { From 8e88d7345e93f1d6fdaa628c14e9473a217bfcd1 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sun, 31 May 2020 18:13:21 +0900 Subject: [PATCH 2/5] Review --- .../expressions/objects/objects.scala | 25 +++++++------------ .../expressions/ObjectExpressionsSuite.scala | 2 +- .../org/apache/spark/sql/DatasetSuite.scala | 2 +- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 2018a41114397..5b06046fa8ad2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -366,11 +366,15 @@ case class Invoke( // If the function can return null, we do an extra check to make sure our null bit is still // set correctly. val assignResult = if (!returnNullable) { - s"${ev.value} = (${CodeGenerator.boxedType(javaType)}) $funcResult;" + s""" + ${ev.value} = (${CodeGenerator.boxedType(javaType)}) $funcResult; + ${ev.isNull} = false; + """ } else { s""" if ($funcResult != null) { ${ev.value} = (${CodeGenerator.boxedType(javaType)}) $funcResult; + ${ev.isNull} = false; } else { ${ev.isNull} = true; } @@ -383,26 +387,15 @@ case class Invoke( """ } - val evalFunc = if (propagateNull) { - s""" - ${ev.isNull} = $resultIsNull; - if (!${ev.isNull}) { - $evaluate - } - """ - } else { - s""" - ${ev.isNull} = false; - $evaluate - """ - } - val code = obj.code + code""" boolean ${ev.isNull} = true; $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; if (!${obj.isNull}) { $argCode - $evalFunc + ${ev.isNull} = $resultIsNull; + if (!${ev.isNull}) { + $evaluate + } } """ ev.copy(code = code) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index cb7d0726133ed..8b12e41a831ca 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -203,7 +203,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { Invoke(funcSubObj, "binOp", DoubleType, inputSum), 0.75, InternalRow.apply(1, 0.25)) } - test("SPARK-31854 Invoke in MapElementsExec should respect propagateNull") { + test("SPARK-31854 Invoke in MapElementsExec should not propagate null") { val targetObject = new InvokeTargetClass val funcClass = classOf[InvokeTargetClass] val funcObj = Literal.create(targetObject, ObjectType(funcClass)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 55404c85c7c28..f0bf5b007285c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1917,7 +1917,7 @@ class DatasetSuite extends QueryTest assert(df3.semanticHash === df4.semanticHash) } - test("SPARK-31854 Invoke in MapElementsExec should respect propagateNull") { + test("SPARK-31854 Invoke in MapElementsExec should not propagate null") { spark.conf.set("spark.sql.codegen.wholeStage", false) Seq("true", "false").foreach { wholeStage => withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStage) { From 89eee820db9d75bbe95ab6a5ac192fdad873cecd Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sun, 31 May 2020 18:48:22 +0900 Subject: [PATCH 3/5] Review --- .../spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala | 2 +- sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index 8b12e41a831ca..10fcaf620dfe9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -203,7 +203,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { Invoke(funcSubObj, "binOp", DoubleType, inputSum), 0.75, InternalRow.apply(1, 0.25)) } - test("SPARK-31854 Invoke in MapElementsExec should not propagate null") { + test("SPARK-31854: Invoke in MapElementsExec should not propagate null") { val targetObject = new InvokeTargetClass val funcClass = classOf[InvokeTargetClass] val funcObj = Literal.create(targetObject, ObjectType(funcClass)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index f0bf5b007285c..bd09f46fcf819 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1917,7 +1917,7 @@ class DatasetSuite extends QueryTest assert(df3.semanticHash === df4.semanticHash) } - test("SPARK-31854 Invoke in MapElementsExec should not propagate null") { + test("SPARK-31854: Invoke in MapElementsExec should not propagate null") { spark.conf.set("spark.sql.codegen.wholeStage", false) Seq("true", "false").foreach { wholeStage => withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStage) { From 6008bc2369ecbc82d8333e62a61f3be0b9fdf431 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 1 Jun 2020 07:07:18 +0900 Subject: [PATCH 4/5] Fix --- .../spark/sql/catalyst/expressions/objects/objects.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 5b06046fa8ad2..d5de95c65e49e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -366,15 +366,11 @@ case class Invoke( // If the function can return null, we do an extra check to make sure our null bit is still // set correctly. val assignResult = if (!returnNullable) { - s""" - ${ev.value} = (${CodeGenerator.boxedType(javaType)}) $funcResult; - ${ev.isNull} = false; - """ + s"${ev.value} = (${CodeGenerator.boxedType(javaType)}) $funcResult;" } else { s""" if ($funcResult != null) { ${ev.value} = (${CodeGenerator.boxedType(javaType)}) $funcResult; - ${ev.isNull} = false; } else { ${ev.isNull} = true; } From fe943f6877c89d6a7e1750dc01636c9978e6a16a Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 1 Jun 2020 07:52:41 +0900 Subject: [PATCH 5/5] Fix --- .../expressions/ObjectExpressionsSuite.scala | 15 --------------- .../scala/org/apache/spark/sql/DatasetSuite.scala | 1 - 2 files changed, 16 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index 10fcaf620dfe9..c40149368b055 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -44,7 +44,6 @@ class InvokeTargetClass extends Serializable { def filterInt(e: Any): Any = e.asInstanceOf[Int] > 0 def filterPrimitiveInt(e: Int): Boolean = e > 0 def binOp(e1: Int, e2: Double): Double = e1 + e2 - def mapFunc(e: Any): Tuple2[Any, Any] = (e, e) } class InvokeTargetSubClass extends InvokeTargetClass { @@ -203,20 +202,6 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { Invoke(funcSubObj, "binOp", DoubleType, inputSum), 0.75, InternalRow.apply(1, 0.25)) } - test("SPARK-31854: Invoke in MapElementsExec should not propagate null") { - val targetObject = new InvokeTargetClass - val funcClass = classOf[InvokeTargetClass] - val funcObj = Literal.create(targetObject, ObjectType(funcClass)) - val inputInt = Seq(BoundReference(0, ObjectType(classOf[Any]), true)) - val outputType = ObjectType(classOf[(Any, Any)]) - val inputRow = InternalRow.fromSeq(Seq(null.asInstanceOf[java.lang.Integer])) - val createExpr = (propagateNull: Boolean) => { - Invoke(funcObj, "mapFunc", outputType, inputInt, propagateNull) - } - checkObjectExprEvaluation(createExpr(true), null, inputRow) - checkObjectExprEvaluation(createExpr(false), (null, null), inputRow) - } - test("SPARK-23593: InitializeJavaBean should support interpreted execution") { val list = new java.util.LinkedList[Int]() list.add(1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index bd09f46fcf819..06600c1e4b1d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1918,7 +1918,6 @@ class DatasetSuite extends QueryTest } test("SPARK-31854: Invoke in MapElementsExec should not propagate null") { - spark.conf.set("spark.sql.codegen.wholeStage", false) Seq("true", "false").foreach { wholeStage => withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStage) { val ds = Seq(1.asInstanceOf[Integer], null.asInstanceOf[Integer]).toDS()