From 3e02ad602423b0ff94209f4971d2956050c3c9be Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 15 Jun 2020 10:13:16 +0900 Subject: [PATCH 1/3] Fix --- .../scala/org/apache/spark/sql/Dataset.scala | 16 +++++++++++++++- .../org/apache/spark/sql/DataFrameSuite.scala | 17 ++++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index ae8d33d8558ba..7b3eaf7e37516 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream} import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag @@ -2541,7 +2542,20 @@ class Dataset[T] private[sql]( def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan { val resolver = sparkSession.sessionState.analyzer.resolver val allColumns = queryExecution.analyzed.output - val groupCols = colNames.distinct.flatMap { (colName: String) => + // SPARK-31990: We must preserve the input order of `colNames` because of the compatibility + // issue (the Streaming's state store depends on the `groupCols` order). + val orderPreservingDistinctColNames = { + val nameSeen = mutable.Set[String]() + colNames.flatMap { colName => + if (nameSeen.contains(colName)) { + None + } else { + nameSeen += colName + Some(colName) + } + } + } + val groupCols = orderPreservingDistinctColNames.flatMap { (colName: String) => // It is possibly there are more than one columns with the same name, // so we call filter instead of find. 
val cols = allColumns.filter(col => resolver(col.name, colName)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 8359dff674a87..6aa8cc9d3c947 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.expressions.Uuid import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, OneRowRelation} +import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, LocalRelation, OneRowRelation} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.{FilterExec, QueryExecution, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -2548,6 +2548,21 @@ class DataFrameSuite extends QueryTest assert(df.schema === new StructType().add(StructField("d", DecimalType(38, 0)))) } } + + test("SPARK-31990: preserves the input order of colNames in dropDuplicates") { + val df = Seq((1, 2, 3, 4, 5), (1, 2, 3, 4, 5)).toDF("c", "e", "d", "a", "b") + val inputColNames = Seq("c", "b", "c", "d", "b", "c", "b") + val distinctDf = df.dropDuplicates(inputColNames) + val duplicatePlan = distinctDf.queryExecution.analyzed.collectFirst { + case p: Deduplicate => p + }.get + val keysInDuplicate = duplicatePlan.keys.map(_.name) + (0 until keysInDuplicate.length - 1).foreach { i => + val v1 = keysInDuplicate(i) + val v2 = keysInDuplicate(i + 1) + assert(inputColNames.indexOf(v1) < inputColNames.indexOf(v2)) + } + } } case class GroupByKey(a: Int, b: Int) From 2bcb5c5df410d7eaeafc7d3cfbec835eb1059830 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 15 Jun 2020 
12:17:48 +0900 Subject: [PATCH 2/3] Revert "Fix" This reverts commit 3e02ad602423b0ff94209f4971d2956050c3c9be. --- .../scala/org/apache/spark/sql/Dataset.scala | 16 +--------------- .../org/apache/spark/sql/DataFrameSuite.scala | 17 +---------------- 2 files changed, 2 insertions(+), 31 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 7b3eaf7e37516..ae8d33d8558ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream} import scala.collection.JavaConverters._ -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag @@ -2542,20 +2541,7 @@ class Dataset[T] private[sql]( def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan { val resolver = sparkSession.sessionState.analyzer.resolver val allColumns = queryExecution.analyzed.output - // SPARK-31990: We must preserve the input order of `colNames` because of the compatibility - // issue (the Streaming's state store depends on the `groupCols` order). - val orderPreservingDistinctColNames = { - val nameSeen = mutable.Set[String]() - colNames.flatMap { colName => - if (nameSeen.contains(colName)) { - None - } else { - nameSeen += colName - Some(colName) - } - } - } - val groupCols = orderPreservingDistinctColNames.flatMap { (colName: String) => + val groupCols = colNames.distinct.flatMap { (colName: String) => // It is possibly there are more than one columns with the same name, // so we call filter instead of find. 
val cols = allColumns.filter(col => resolver(col.name, colName)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 6aa8cc9d3c947..8359dff674a87 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.expressions.Uuid import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation -import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, LocalRelation, OneRowRelation} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, OneRowRelation} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.{FilterExec, QueryExecution, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -2548,21 +2548,6 @@ class DataFrameSuite extends QueryTest assert(df.schema === new StructType().add(StructField("d", DecimalType(38, 0)))) } } - - test("SPARK-31990: preserves the input order of colNames in dropDuplicates") { - val df = Seq((1, 2, 3, 4, 5), (1, 2, 3, 4, 5)).toDF("c", "e", "d", "a", "b") - val inputColNames = Seq("c", "b", "c", "d", "b", "c", "b") - val distinctDf = df.dropDuplicates(inputColNames) - val duplicatePlan = distinctDf.queryExecution.analyzed.collectFirst { - case p: Deduplicate => p - }.get - val keysInDuplicate = duplicatePlan.keys.map(_.name) - (0 until keysInDuplicate.length - 1).foreach { i => - val v1 = keysInDuplicate(i) - val v2 = keysInDuplicate(i + 1) - assert(inputColNames.indexOf(v1) < inputColNames.indexOf(v2)) - } - } } case class GroupByKey(a: Int, b: Int) From 7546ba4eebeee480d9a2ff8b948e900cd6023dfc Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 15 Jun 2020 
12:20:40 +0900 Subject: [PATCH 3/3] Revert --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index ae8d33d8558ba..524e231eb7eb9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2541,7 +2541,9 @@ class Dataset[T] private[sql]( def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan { val resolver = sparkSession.sessionState.analyzer.resolver val allColumns = queryExecution.analyzed.output - val groupCols = colNames.distinct.flatMap { (colName: String) => + // SPARK-31990: We must keep `toSet.toSeq` here for backward compatibility + // (the streaming state store depends on the `groupCols` order). + val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) => // It is possibly there are more than one columns with the same name, // so we call filter instead of find. val cols = allColumns.filter(col => resolver(col.name, colName))