From 10d91be6684778472a5e99871e0c722a0c1843d7 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Sat, 19 Jun 2021 14:18:09 +0530 Subject: [PATCH 1/8] SPARK-35756: unionByName support struct having same col names but different sequence --- .../spark/sql/catalyst/analysis/ResolveUnion.scala | 9 +++++++++ .../spark/sql/DataFrameSetOperationsSuite.scala | 12 ++++++++++++ 2 files changed, 21 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala index 2574bf7ab485b..d3f60f3c21437 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala @@ -107,6 +107,15 @@ object ResolveUnion extends Rule[LogicalPlan] { // in `foundAttr`. aliased += foundAttr Alias(addFields(foundAttr, target), foundAttr.name)() + case (source: StructType, target: StructType) + if !allowMissingCol && !source.sameType(target) && + target.toAttributes.map(x => x.name).sorted + == source.toAttributes.map(x => x.name).sorted => + // Having an output with same name, but different struct type. + // We will sort columns in the struct expression to make sure two sides of + // union have consistent schema. + aliased += foundAttr + Alias(addFields(foundAttr, target), foundAttr.name)() case _ => // We don't need/try to add missing fields if: // 1. The attributes of left and right side are the same struct type diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index e622528afc69d..e9d2f5b81c345 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -922,6 +922,16 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { } } } + + test("SPARK-35756: unionByName support struct having same col names but different sequence") { + val df1 = Seq((1, Struct1(1, 2))).toDF("a", "b") + val df2 = Seq((1, Struct2(1, 2))).toDF("a", "b") + val unionDF = df1.unionByName(df2) + val expected = Row(1, Row(1, 2)) :: Row(1, Row(2, 1)) :: Nil + val schema = "`a` INT,`b` STRUCT<`c1`: INT, `c2`: INT>" + assert(unionDF.schema.toDDL === schema) + checkAnswer(unionDF, expected) + } } case class UnionClass1a(a: Int, b: Long, nested: UnionClass2) @@ -931,3 +941,5 @@ case class UnionClass1c(a: Int, b: Long, nested: UnionClass4) case class UnionClass2(a: Int, c: String) case class UnionClass3(a: Int, b: Long) case class UnionClass4(A: Int, b: Long) +case class Struct1(c1: Int, c2: Int) +case class Struct2(c2: Int, c1: Int) From cc9dd34ec70a955ae9771e266f5b166d9ee05e07 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Sun, 20 Jun 2021 00:26:57 +0530 Subject: [PATCH 2/8] build the logical plan at left side in case right hand side is build --- .../spark/sql/catalyst/analysis/ResolveUnion.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala index d3f60f3c21437..f13c5f75d832a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala @@ -109,7 +109,7 @@ object ResolveUnion extends Rule[LogicalPlan] { Alias(addFields(foundAttr, target), foundAttr.name)() case (source: StructType, target: StructType) if !allowMissingCol && !source.sameType(target) && - target.toAttributes.map(x => x.name).sorted + target.toAttributes.map(attr => attr.name).sorted == source.toAttributes.map(x => x.name).sorted => // Having an output with same name, but different struct type. // We will sort columns in the struct expression to make sure two sides of @@ -150,8 +150,12 @@ object ResolveUnion extends Rule[LogicalPlan] { val rightChild = Project(rightProjectList ++ notFoundAttrs, right) // Builds a project for `logicalPlan` based on `right` output names, if allowing - // missing columns. - val leftChild = if (allowMissingCol) { + // missing columns is true and also in case of allowing missing columns is false and + // if in the case for struct present at the right output and sequence + // of right hand side is changed + val leftChild = if (allowMissingCol || + (rightChild != right && + rightChild.output.exists(attr => attr.dataType.isInstanceOf[StructType]))) { // Add missing (nested) fields to left plan. val (leftProjectList, _) = compareAndAddFields(rightChild, left, allowMissingCol) if (leftProjectList.map(_.toAttribute) != left.output) { From 907efc218964e4f3c9cebd3b898e486a6977ff48 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Sun, 20 Jun 2021 20:29:19 +0530 Subject: [PATCH 3/8] add the code change to used sortStructFields in case of allowMissingCol as false for nested structs --- .../sql/catalyst/analysis/ResolveUnion.scala | 2 +- .../sql/DataFrameSetOperationsSuite.scala | 19 +++++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala index f13c5f75d832a..541a61e770791 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala @@ -115,7 +115,7 @@ object ResolveUnion extends Rule[LogicalPlan] { // We will sort columns in the struct expression to make sure two sides of // union have consistent schema. aliased += foundAttr - Alias(addFields(foundAttr, target), foundAttr.name)() + Alias(sortStructFields(foundAttr), foundAttr.name)() case _ => // We don't need/try to add missing fields if: // 1. The attributes of left and right side are the same struct type diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index e9d2f5b81c345..99105e0370ccf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -924,22 +924,37 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { } test("SPARK-35756: unionByName support struct having same col names but different sequence") { - val df1 = Seq((1, Struct1(1, 2))).toDF("a", "b") - val df2 = Seq((1, Struct2(1, 2))).toDF("a", "b") + var df1 = Seq((1, Struct1(1, 2))).toDF("a", "b") + var df2 = Seq((1, Struct2(1, 2))).toDF("a", "b") val unionDF = df1.unionByName(df2) val expected = Row(1, Row(1, 2)) :: Row(1, Row(2, 1)) :: Nil val schema = "`a` INT,`b` STRUCT<`c1`: INT, `c2`: INT>" assert(unionDF.schema.toDDL === schema) checkAnswer(unionDF, expected) + + // nested struct, inner struct having different col name + df1 = Seq((1, 2, UnionClass1d(1, 2, Struct3(1)))).toDF("a", "b", "c") + df2 = Seq((1, 2, UnionClass1e(1, 2, Struct4(1)))).toDF("a", "b", "c") + val errMsg = intercept[AnalysisException] { + df1.unionByName(df2) + }.getMessage + assert(errMsg.contains( + "Union can only be performed on tables with the compatible column types." + + " struct> <> struct>" + + " at the third column of the second table")) } } case class UnionClass1a(a: Int, b: Long, nested: UnionClass2) case class UnionClass1b(a: Int, b: Long, nested: UnionClass3) case class UnionClass1c(a: Int, b: Long, nested: UnionClass4) +case class UnionClass1d(c1: Int, c2: Int, c3: Struct3) +case class UnionClass1e(c2: Int, c1: Int, c3: Struct4) case class UnionClass2(a: Int, c: String) case class UnionClass3(a: Int, b: Long) case class UnionClass4(A: Int, b: Long) case class Struct1(c1: Int, c2: Int) case class Struct2(c2: Int, c1: Int) +case class Struct3(c3: Int) +case class Struct4(c4: Int) From 2b8966625ab47ae68f76318bf8b9adfc36a4d270 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Wed, 23 Jun 2021 13:02:33 +0530 Subject: [PATCH 4/8] remove the column name comparison in the validation size sortStruct sort recursively both left and right side of the union --- .../sql/catalyst/analysis/ResolveUnion.scala | 4 +--- .../spark/sql/DataFrameSetOperationsSuite.scala | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala index 541a61e770791..d95a8ed72edcf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala @@ -108,9 +108,7 @@ object ResolveUnion extends Rule[LogicalPlan] { aliased += foundAttr Alias(addFields(foundAttr, target), foundAttr.name)() case (source: StructType, target: StructType) - if !allowMissingCol && !source.sameType(target) && - target.toAttributes.map(attr => attr.name).sorted - == source.toAttributes.map(x => x.name).sorted => + if !allowMissingCol && !source.sameType(target) => // Having an output with same name, but different struct type. // We will sort columns in the struct expression to make sure two sides of // union have consistent schema. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index 99105e0370ccf..b07ab20e601b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -926,8 +926,8 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { test("SPARK-35756: unionByName support struct having same col names but different sequence") { var df1 = Seq((1, Struct1(1, 2))).toDF("a", "b") var df2 = Seq((1, Struct2(1, 2))).toDF("a", "b") - val unionDF = df1.unionByName(df2) - val expected = Row(1, Row(1, 2)) :: Row(1, Row(2, 1)) :: Nil + var unionDF = df1.unionByName(df2) + var expected = Row(1, Row(1, 2)) :: Row(1, Row(2, 1)) :: Nil val schema = "`a` INT,`b` STRUCT<`c1`: INT, `c2`: INT>" assert(unionDF.schema.toDDL === schema) checkAnswer(unionDF, expected) @@ -942,6 +942,15 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { "Union can only be performed on tables with the compatible column types." + " struct> <> struct>" + " at the third column of the second table")) + + // diff Case sensitive attributes names and diff sequence scenario for unionByName + df1 = Seq((1, 2, UnionClass1d(1, 2, Struct3(1)))).toDF("a", "b", "c") + df2 = Seq((1, 2, UnionClass1f(1, 2, Struct3a(1)))).toDF("a", "b", "c") + expected = + Row(1, 2, Row(1, 2, Row(1, 5))) :: Row(1, 2, Row(2, 1, Row(1, 5))) :: Nil + + unionDF = df1.unionByName(df2) + checkAnswer(unionDF, expected) } } @@ -950,6 +959,7 @@ case class UnionClass1b(a: Int, b: Long, nested: UnionClass3) case class UnionClass1c(a: Int, b: Long, nested: UnionClass4) case class UnionClass1d(c1: Int, c2: Int, c3: Struct3) case class UnionClass1e(c2: Int, c1: Int, c3: Struct4) +case class UnionClass1f(c2: Int, c1: Int, c3: Struct3a) case class UnionClass2(a: Int, c: String) case class UnionClass3(a: Int, b: Long) @@ -957,4 +967,5 @@ case class UnionClass4(A: Int, b: Long) case class Struct1(c1: Int, c2: Int) case class Struct2(c2: Int, c1: Int) case class Struct3(c3: Int) +case class Struct3a(C3: Int) case class Struct4(c4: Int) From c4756ef3696f430894c69456da2c91b9e46cf9ac Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Wed, 23 Jun 2021 20:13:02 +0530 Subject: [PATCH 5/8] add the code change for fixing the combination of uppercase and lower case attributes name --- .../sql/catalyst/analysis/ResolveUnion.scala | 45 ++++++++----------- .../sql/DataFrameSetOperationsSuite.scala | 13 ++++++ 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala index d95a8ed72edcf..5e8852fc4b61d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.analysis +import java.util.Locale + import scala.collection.mutable import org.apache.spark.sql.catalyst.expressions._ @@ -40,7 +42,8 @@ object ResolveUnion extends Rule[LogicalPlan] { * already contain them. Currently we don't support merging structs nested inside of arrays * or maps. */ - private def addFields(col: Expression, targetType: StructType): Expression = { + private def addFields(col: Expression, + targetType: StructType, allowMissing: Boolean): Expression = { assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") val resolver = conf.resolver @@ -54,20 +57,22 @@ object ResolveUnion extends Rule[LogicalPlan] { val newExpression = (currentField, expectedField.dataType) match { case (Some(cf), expectedType: StructType) if cf.dataType.isInstanceOf[StructType] => val extractedValue = ExtractValue(col, Literal(cf.name), resolver) - addFields(extractedValue, expectedType) + addFields(extractedValue, expectedType, allowMissing) case (Some(cf), _) => ExtractValue(col, Literal(cf.name), resolver) - case (None, expectedType) => + case (None, expectedType) if allowMissing => Literal(null, expectedType) } newStructFields ++= Literal(expectedField.name) :: newExpression :: Nil } - colType.fields - .filter(f => targetType.fields.find(tf => resolver(f.name, tf.name)).isEmpty) - .foreach { f => - newStructFields ++= Literal(f.name) :: ExtractValue(col, Literal(f.name), resolver) :: Nil - } + if (allowMissing) { + colType.fields + .filter(f => targetType.fields.find(tf => resolver(f.name, tf.name)).isEmpty) + .foreach { f => + newStructFields ++= Literal(f.name) :: ExtractValue(col, Literal(f.name), resolver) :: Nil + } + } val newStruct = CreateNamedStruct(newStructFields.toSeq) if (col.nullable) { @@ -77,7 +82,6 @@ object ResolveUnion extends Rule[LogicalPlan] { } } - /** * This method will compare right to left plan's outputs. If there is one struct attribute * at right side has same name with left side struct attribute, but two structs are not the @@ -101,19 +105,12 @@ object ResolveUnion extends Rule[LogicalPlan] { val foundDt = foundAttr.dataType (foundDt, lattr.dataType) match { case (source: StructType, target: StructType) - if allowMissingCol && !source.sameType(target) => + if !source.sameType(target) => // We have two structs with different types, so make sure the two structs have their - // fields in the same order by using `target`'s fields and then inluding any remaining - // in `foundAttr`. - aliased += foundAttr - Alias(addFields(foundAttr, target), foundAttr.name)() - case (source: StructType, target: StructType) - if !allowMissingCol && !source.sameType(target) => - // Having an output with same name, but different struct type. - // We will sort columns in the struct expression to make sure two sides of - // union have consistent schema. + // fields in the same order by using `target`'s fields and then including any remaining + // in `foundAttr` in case of allowMissingCol is true. aliased += foundAttr - Alias(sortStructFields(foundAttr), foundAttr.name)() + Alias(addFields(foundAttr, target, allowMissingCol), foundAttr.name)() case _ => // We don't need/try to add missing fields if: // 1. The attributes of left and right side are the same struct type @@ -148,12 +145,8 @@ object ResolveUnion extends Rule[LogicalPlan] { val rightChild = Project(rightProjectList ++ notFoundAttrs, right) // Builds a project for `logicalPlan` based on `right` output names, if allowing - // missing columns is true and also in case of allowing missing columns is false and - // if in the case for struct present at the right output and sequence - // of right hand side is changed - val leftChild = if (allowMissingCol || - (rightChild != right && - rightChild.output.exists(attr => attr.dataType.isInstanceOf[StructType]))) { + // missing columns. + val leftChild = if (allowMissingCol) { // Add missing (nested) fields to left plan. val (leftProjectList, _) = compareAndAddFields(rightChild, left, allowMissingCol) if (leftProjectList.map(_.toAttribute) != left.output) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index b07ab20e601b3..30aabbec1bbc8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -924,6 +924,7 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { } test("SPARK-35756: unionByName support struct having same col names but different sequence") { + // struct having same col names but different sequence var df1 = Seq((1, Struct1(1, 2))).toDF("a", "b") var df2 = Seq((1, Struct2(1, 2))).toDF("a", "b") var unionDF = df1.unionByName(df2) @@ -951,6 +952,17 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { unionDF = df1.unionByName(df2) checkAnswer(unionDF, expected) + + // sorting of sequence based on the lower case + df1 = Seq((1, Struct1(1, 2))).toDF("a", "b") + df2 = Seq((1, Struct2a(1, 2))).toDF("a", "b") + expected = Row(1, Row(1, 2, 5)) :: Row(1, Row(2, 1, 5)) :: Nil + + unionDF = df1.unionByName(df2) + checkAnswer(unionDF, expected) + + unionDF = df1.unionByName(df2, true) + checkAnswer(unionDF, expected) } } @@ -966,6 +978,7 @@ case class UnionClass3(a: Int, b: Long) case class UnionClass4(A: Int, b: Long) case class Struct1(c1: Int, c2: Int) case class Struct2(c2: Int, c1: Int) +case class Struct2a(C2: Int, c1: Int) case class Struct3(c3: Int) case class Struct3a(C3: Int) case class Struct4(c4: Int) From a82dd58bdda7f665281487e251e906d6a5528acf Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Tue, 29 Jun 2021 19:29:20 +0530 Subject: [PATCH 6/8] resolving the conflict as per the new change in resolveunion --- .../sql/catalyst/analysis/ResolveUnion.scala | 19 ++++++------ .../sql/DataFrameSetOperationsSuite.scala | 30 +++++++++++-------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala index 5e8852fc4b61d..44b8a6babbaf7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.analysis -import java.util.Locale - import scala.collection.mutable import org.apache.spark.sql.catalyst.expressions._ @@ -60,19 +58,22 @@ object ResolveUnion extends Rule[LogicalPlan] { addFields(extractedValue, expectedType, allowMissing) case (Some(cf), _) => ExtractValue(col, Literal(cf.name), resolver) + // for allowMissingCol allow the null values case (None, expectedType) if allowMissing => Literal(null, expectedType) + // for allowMissingCol as false throw exception for missing col + case (_, _) if !allowMissing => + throw QueryCompilationErrors.noSuchStructFieldInGivenFieldsError( + expectedField.name, colType.fields) } newStructFields ++= Literal(expectedField.name) :: newExpression :: Nil } - if (allowMissing) { - colType.fields - .filter(f => targetType.fields.find(tf => resolver(f.name, tf.name)).isEmpty) - .foreach { f => - newStructFields ++= Literal(f.name) :: ExtractValue(col, Literal(f.name), resolver) :: Nil - } - } + colType.fields + .filter(f => targetType.fields.find(tf => resolver(f.name, tf.name)).isEmpty) + .foreach { f => + newStructFields ++= Literal(f.name) :: ExtractValue(col, Literal(f.name), resolver) :: Nil + } val newStruct = CreateNamedStruct(newStructFields.toSeq) if (col.nullable) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index 30aabbec1bbc8..4ae864c6e21f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -934,35 +934,39 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { checkAnswer(unionDF, expected) // nested struct, inner struct having different col name + df1 = Seq((0, UnionClass1a(0, 1L, UnionClass2(1, "2")))).toDF("id", "a") + df2 = Seq((1, UnionClass1b(1, 2L, UnionClass3(2, 3L)))).toDF("id", "a") + var errMsg = intercept[AnalysisException] { + df1.unionByName(df2) + }.getMessage + assert(errMsg.contains("No such struct field c in a, b")) + + // If right side of the nested struct has extra col. df1 = Seq((1, 2, UnionClass1d(1, 2, Struct3(1)))).toDF("a", "b", "c") - df2 = Seq((1, 2, UnionClass1e(1, 2, Struct4(1)))).toDF("a", "b", "c") - val errMsg = intercept[AnalysisException] { + df2 = Seq((1, 2, UnionClass1e(1, 2, Struct4(1, 5)))).toDF("a", "b", "c") + errMsg = intercept[AnalysisException] { df1.unionByName(df2) }.getMessage - assert(errMsg.contains( - "Union can only be performed on tables with the compatible column types." + - " struct> <> struct>" + - " at the third column of the second table")) + assert(errMsg.contains("Union can only be performed on tables with" + + " the compatible column types." + + " struct> <> struct>" + + " at the third column of the second table")) // diff Case sensitive attributes names and diff sequence scenario for unionByName df1 = Seq((1, 2, UnionClass1d(1, 2, Struct3(1)))).toDF("a", "b", "c") df2 = Seq((1, 2, UnionClass1f(1, 2, Struct3a(1)))).toDF("a", "b", "c") expected = - Row(1, 2, Row(1, 2, Row(1, 5))) :: Row(1, 2, Row(2, 1, Row(1, 5))) :: Nil + Row(1, 2, Row(1, 2, Row(1))) :: Row(1, 2, Row(2, 1, Row(1))) :: Nil unionDF = df1.unionByName(df2) checkAnswer(unionDF, expected) - // sorting of sequence based on the lower case df1 = Seq((1, Struct1(1, 2))).toDF("a", "b") df2 = Seq((1, Struct2a(1, 2))).toDF("a", "b") - expected = Row(1, Row(1, 2, 5)) :: Row(1, Row(2, 1, 5)) :: Nil + expected = Row(1, Row(1, 2)) :: Row(1, Row(2, 1)) :: Nil unionDF = df1.unionByName(df2) checkAnswer(unionDF, expected) - - unionDF = df1.unionByName(df2, true) - checkAnswer(unionDF, expected) } } @@ -981,4 +985,4 @@ case class Struct2(c2: Int, c1: Int) case class Struct2a(C2: Int, c1: Int) case class Struct3(c3: Int) case class Struct3a(C3: Int) -case class Struct4(c4: Int) +case class Struct4(c3: Int, c5: Int) From b3b775521e7af5b7994c2deb453c4f3e7eb628c4 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Wed, 30 Jun 2021 13:29:34 +0530 Subject: [PATCH 7/8] Code refactoring for match case --- .../sql/catalyst/analysis/ResolveUnion.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala index 44b8a6babbaf7..98c529f57898e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala @@ -58,13 +58,15 @@ object ResolveUnion extends Rule[LogicalPlan] { addFields(extractedValue, expectedType, allowMissing) case (Some(cf), _) => ExtractValue(col, Literal(cf.name), resolver) - // for allowMissingCol allow the null values - case (None, expectedType) if allowMissing => - Literal(null, expectedType) - // for allowMissingCol as false throw exception for missing col - case (_, _) if !allowMissing => - throw QueryCompilationErrors.noSuchStructFieldInGivenFieldsError( - expectedField.name, colType.fields) + case (None, expectedType) => + if (allowMissing) { + // for allowMissingCol allow the null values + Literal(null, expectedType) + } else { + // for allowMissingCol as false throw exception for missing col + throw QueryCompilationErrors.noSuchStructFieldInGivenFieldsError( + expectedField.name, colType.fields) + } } newStructFields ++= Literal(expectedField.name) :: newExpression :: Nil } From a29df922d029988250d829dd49c28ff02f2adac3 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Wed, 30 Jun 2021 16:46:34 +0530 Subject: [PATCH 8/8] add struct for comparison instead of String schema --- .../spark/sql/DataFrameSetOperationsSuite.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index 4ae864c6e21f7..fcd3e8315b3ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -925,12 +925,15 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { test("SPARK-35756: unionByName support struct having same col names but different sequence") { // struct having same col names but different sequence - var df1 = Seq((1, Struct1(1, 2))).toDF("a", "b") - var df2 = Seq((1, Struct2(1, 2))).toDF("a", "b") + var df1 = Seq(("d1", Struct1(1, 2))).toDF("a", "b") + var df2 = Seq(("d2", Struct2(1, 2))).toDF("a", "b") var unionDF = df1.unionByName(df2) - var expected = Row(1, Row(1, 2)) :: Row(1, Row(2, 1)) :: Nil - val schema = "`a` INT,`b` STRUCT<`c1`: INT, `c2`: INT>" - assert(unionDF.schema.toDDL === schema) + var expected = Row("d1", Row(1, 2)) :: Row("d2", Row(2, 1)) :: Nil + val schema = StructType(Seq(StructField("a", StringType), + StructField("b", StructType(Seq(StructField("c1", IntegerType), + StructField("c2", IntegerType)))))) + + assert(unionDF.schema === schema) checkAnswer(unionDF, expected) // nested struct, inner struct having different col name