
Commit 7ac2b54

ZiyaZa authored and cloud-fan committed
[SPARK-53535][SQL][FOLLOWUP] Fix findCheapestGroupField returning invalid Map
### What changes were proposed in this pull request?

This PR fixes a bug from #52557, where we read an additional field if all the requested fields of a struct are missing from the Parquet file. We used to always pick the cheapest leaf column of the struct. However, if this leaf was inside a Map column, we would generate an invalid Map type like the following:

```
optional group _1 (MAP) {
  repeated group key_value {
    required boolean key;
  }
}
```

Since there is no `value` field in this group, we would fail later when trying to convert this Parquet type to a Spark type. This PR changes the additional-field selection logic to enforce selecting a field from both the key and the value of the map, which now gives us a type like the following:

```
optional group _1 (MAP) {
  repeated group key_value {
    required boolean key;
    optional group value {
      optional int32 _2;
    }
  }
}
```

### Why are the changes needed?

To fix a critical bug where we would throw an exception when reading a Parquet file.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52758 from ZiyaZa/fix-missing-struct-with-map.

Authored-by: Ziya Mukhtarov <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
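To make the two schema shapes concrete, here is a minimal, hypothetical sketch (not part of this commit) that builds both of them with parquet-java's `MessageTypeParser`. Note that the parser itself accepts the one-sided map group; it is Spark's later Parquet-to-Catalyst schema conversion that rejects it:

```scala
import org.apache.parquet.schema.MessageTypeParser

object MapGroupShapes {
  // The shape the old logic could request: a MAP group whose key_value
  // carries only a key. Parsing succeeds; Spark's conversion to a Catalyst
  // type later fails because a map needs both a key and a value.
  val invalid = MessageTypeParser.parseMessageType(
    """message spark_schema {
      |  optional group _1 (MAP) {
      |    repeated group key_value {
      |      required boolean key;
      |    }
      |  }
      |}""".stripMargin)

  // The shape the fixed logic requests: one cheap field from the key side
  // and one from the value side, so the map group stays well-formed.
  val valid = MessageTypeParser.parseMessageType(
    """message spark_schema {
      |  optional group _1 (MAP) {
      |    repeated group key_value {
      |      required boolean key;
      |      optional group value {
      |        optional int32 _2;
      |      }
      |    }
      |  }
      |}""".stripMargin)

  def main(args: Array[String]): Unit = {
    println(invalid)
    println(valid)
  }
}
```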
1 parent 27180f0; commit 7ac2b54

File tree: 3 files changed (+375, -27 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala

Lines changed: 52 additions & 24 deletions
```diff
@@ -28,7 +28,7 @@ import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.io.api.RecordMaterializer
 import org.apache.parquet.schema._
-import org.apache.parquet.schema.LogicalTypeAnnotation.ListLogicalTypeAnnotation
+import org.apache.parquet.schema.LogicalTypeAnnotation.{ListLogicalTypeAnnotation, MapKeyValueTypeAnnotation, MapLogicalTypeAnnotation}
 import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
@@ -502,36 +502,64 @@ object ParquetReadSupport extends Logging {
   }
 
   /**
-   * Finds the leaf node under a given file schema node that is likely to be cheapest to fetch.
-   * Keeps this leaf node inside the same parent hierarchy. This is used when all struct fields in
-   * the requested schema are missing. Uses a very simple heuristic based on the parquet type.
+   * Finds the leaf node(s) under a given file schema node that is likely to be cheapest to fetch.
+   * Note that multiple leaves can be selected if a map type is deemed to be the cheapest to fetch,
+   * because for each map in the hierarchy, we need to fetch something from both the key and value
+   * types. This function keeps the leaf node(s) inside the same parent hierarchy. This is used when
+   * all struct fields in the requested schema are missing. Uses a very simple heuristic based on
+   * the parquet type.
    */
   private def findCheapestGroupField(parentGroupType: GroupType): Type = {
     def findCheapestGroupFieldRecurse(curType: Type, repLevel: Int = 0): (Type, Int, Int) = {
       curType match {
         case groupType: GroupType =>
-          var (bestType, bestRepLevel, bestCost) = (Option.empty[Type], 0, 0)
-          for (field <- groupType.getFields.asScala) {
-            val newRepLevel = repLevel + (if (field.isRepetition(Repetition.REPEATED)) 1 else 0)
-            // Never take a field at a deeper repetition level, since it's likely to have more data.
-            // Don't do safety checks because we should already have done them when traversing the
-            // schema for the first time.
-            if (bestType.isEmpty || newRepLevel <= bestRepLevel) {
-              val (childType, childRepLevel, childCost) =
-                findCheapestGroupFieldRecurse(field, newRepLevel)
-              // Always prefer elements with a lower repetition level, since more nesting of arrays
-              // is likely to result in more data. At the same repetition level, prefer the smaller
-              // type.
-              if (bestType.isEmpty || childRepLevel < bestRepLevel ||
-                  (childRepLevel == bestRepLevel && childCost < bestCost)) {
-                // This is the new best path.
-                bestType = Some(childType)
-                bestRepLevel = childRepLevel
-                bestCost = childCost
-              }
-            }
-          }
-          (groupType.withNewFields(bestType.get), bestRepLevel, bestCost)
+          groupType.getLogicalTypeAnnotation match {
+            case _: MapLogicalTypeAnnotation | _: MapKeyValueTypeAnnotation =>
+              // For maps, we need to ensure we read something from both the key and value types, as
+              // otherwise the GroupType we return from here would contain only one child (either
+              // the key or the value, but not both), which is invalid when the GroupType has a map
+              // logical annotation. This would later cause failures when converting the row we read
+              // to Spark type.
+              // Below code is adapted from ParquetSchemaConverter.convertGroupField
+              ParquetSchemaConverter.checkConversionRequirement(
+                groupType.getFieldCount == 1 && !groupType.getType(0).isPrimitive,
+                s"Invalid map type: $groupType")
+
+              val keyValueType = groupType.getFields.get(0).asGroupType()
+              ParquetSchemaConverter.checkConversionRequirement(
+                keyValueType.isRepetition(Repetition.REPEATED) && keyValueType.getFieldCount == 2,
+                s"Invalid map type: $groupType")
+
+              val keyResult = findCheapestGroupFieldRecurse(keyValueType.getType(0), repLevel + 1)
+              val valueResult = findCheapestGroupFieldRecurse(keyValueType.getType(1), repLevel + 1)
+              (
+                groupType.withNewFields(keyValueType.withNewFields(keyResult._1, valueResult._1)),
+                keyResult._2.max(valueResult._2), // Take max repetition level
+                keyResult._3 + valueResult._3 // Add up the costs of reading both fields
+              )
+            case _ =>
+              var (bestType, bestRepLevel, bestCost) = (Option.empty[Type], 0, 0)
+              for (field <- groupType.getFields.asScala) {
+                val newRepLevel = repLevel + (if (field.isRepetition(Repetition.REPEATED)) 1 else 0)
+                // Never take a field at a deeper repetition level, since it's likely to have more
+                // data.
+                if (bestType.isEmpty || newRepLevel <= bestRepLevel) {
+                  val (childType, childRepLevel, childCost) =
+                    findCheapestGroupFieldRecurse(field, newRepLevel)
+                  // Always prefer elements with a lower repetition level, since more nesting of
+                  // arrays is likely to result in more data. At the same repetition level, prefer
+                  // the smaller type.
+                  if (bestType.isEmpty || childRepLevel < bestRepLevel ||
+                      (childRepLevel == bestRepLevel && childCost < bestCost)) {
+                    // This is the new best path.
+                    bestType = Some(childType)
+                    bestRepLevel = childRepLevel
+                    bestCost = childCost
+                  }
+                }
+              }
+              (groupType.withNewFields(bestType.get), bestRepLevel, bestCost)
+          }
         case primitiveType: PrimitiveType =>
           val cost = primitiveType.getPrimitiveTypeName match {
             case PrimitiveType.PrimitiveTypeName.BOOLEAN => 1
```
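To illustrate the selection logic above, here is a simplified, self-contained Scala sketch over a toy schema model. The `Node` ADT and the per-leaf costs other than BOOLEAN = 1 are assumptions for illustration, not Spark's actual types or cost table. It mirrors the two rules of the real code: prefer the lowest repetition level, break ties by cost, and for maps keep a selection on both the key and the value side:

```scala
// Toy model of the findCheapestGroupField heuristic (illustrative only).
sealed trait Node { def repeated: Boolean }
case class Leaf(name: String, cost: Int, repeated: Boolean = false) extends Node
case class Group(name: String, fields: List[Node], repeated: Boolean = false) extends Node
case class MapNode(name: String, key: Node, value: Node, repeated: Boolean = false) extends Node

object CheapestField {
  /** Returns (pruned subtree keeping only the chosen leaves, repetition level, cost). */
  def cheapest(node: Node, repLevel: Int = 0): (Node, Int, Int) = {
    val level = repLevel + (if (node.repeated) 1 else 0)
    node match {
      case l: Leaf => (l, level, l.cost)
      case m: MapNode =>
        // A valid map must keep something on both the key and the value side;
        // the key_value group is repeated, hence the +1.
        val (k, kRep, kCost) = cheapest(m.key, level + 1)
        val (v, vRep, vCost) = cheapest(m.value, level + 1)
        (m.copy(key = k, value = v), kRep.max(vRep), kCost + vCost)
      case g: Group =>
        // Prefer the lowest repetition level, then the lowest cost. (The real
        // code additionally prunes branches that cannot beat the current best.)
        val (best, bRep, bCost) =
          g.fields.map(cheapest(_, level)).minBy { case (_, r, c) => (r, c) }
        (g.copy(fields = List(best)), bRep, bCost)
    }
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the second test below: a cheap boolean-keyed map beats a
    // repeated string field, because both end up at repetition level 1 but
    // the map's total cost (1 + 4, assumed byte sizes) is lower than the
    // string's (8, assumed).
    val schema = Group("root", List(
      MapNode("m", Leaf("key", 1), Group("value", List(Leaf("_2", 4)))),
      Leaf("arrayElem", 8, repeated = true)))
    println(cheapest(schema))
  }
}
```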

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

Lines changed: 95 additions & 0 deletions
```diff
@@ -833,6 +833,101 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  test("SPARK-53535: vectorized reader: missing all struct fields, struct with map field only") {
+    val data = Seq(
+      Row(Row(Map("key1" -> 1)), 100),
+      Row(Row(Map("key2" -> 2)), 100),
+      Row(null, 100)
+    )
+
+    val tableSchema = new StructType()
+      .add("_1", new StructType()
+        .add("_1", MapType(StringType, IntegerType, valueContainsNull = true)))
+      .add("_2", IntegerType)
+
+    val readSchema = new StructType()
+      .add("_1", new StructType()
+        .add("_101", IntegerType))
+
+    withTempPath { path =>
+      val file = path.getCanonicalPath
+      spark.createDataFrame(data.asJava, tableSchema).write.partitionBy("_2").parquet(file)
+
+      for {
+        offheapEnabled <- Seq(true, false)
+        returnNullStructIfAllFieldsMissing <- Seq(true, false)
+      } {
+        withSQLConf(
+          SQLConf.LEGACY_PARQUET_RETURN_NULL_STRUCT_IF_ALL_FIELDS_MISSING.key ->
+            returnNullStructIfAllFieldsMissing.toString,
+          SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
+          SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString
+        ) {
+          val expectedAnswer = if (!returnNullStructIfAllFieldsMissing) {
+            Row(Row(null), 100) :: Row(Row(null), 100) :: Row(null, 100) :: Nil
+          } else {
+            Row(null, 100) :: Row(null, 100) :: Row(null, 100) :: Nil
+          }
+
+          withAllParquetReaders {
+            checkAnswer(spark.read.schema(readSchema).parquet(file), expectedAnswer)
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-53535: vectorized reader: missing all struct fields, " +
+      "struct with cheap map and more expensive array field") {
+    val data = Seq(
+      Row(Row(Map(false -> Row("expensive", 1)), Seq("test1")), 100),
+      Row(Row(Map(true -> Row("expensive", 2)), Seq("test2")), 100),
+      Row(null, 100)
+    )
+
+    val tableSchema = new StructType()
+      .add("_1", new StructType()
+        .add("_1", MapType(
+          BooleanType,
+          new StructType()
+            .add("_1", StringType)
+            .add("_2", IntegerType),
+          valueContainsNull = true))
+        .add("_2", ArrayType(StringType, containsNull = true)))
+      .add("_2", IntegerType)
+
+    val readSchema = new StructType()
+      .add("_1", new StructType()
+        .add("_101", IntegerType))
+
+    withTempPath { path =>
+      val file = path.getCanonicalPath
+      spark.createDataFrame(data.asJava, tableSchema).write.partitionBy("_2").parquet(file)
+
+      for {
+        offheapEnabled <- Seq(true, false)
+        returnNullStructIfAllFieldsMissing <- Seq(true, false)
+      } {
+        withSQLConf(
+          SQLConf.LEGACY_PARQUET_RETURN_NULL_STRUCT_IF_ALL_FIELDS_MISSING.key ->
+            returnNullStructIfAllFieldsMissing.toString,
+          SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
+          SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString
+        ) {
+          val expectedAnswer = if (!returnNullStructIfAllFieldsMissing) {
+            Row(Row(null), 100) :: Row(Row(null), 100) :: Row(null, 100) :: Nil
+          } else {
+            Row(null, 100) :: Row(null, 100) :: Row(null, 100) :: Nil
+          }
+
+          withAllParquetReaders {
+            checkAnswer(spark.read.schema(readSchema).parquet(file), expectedAnswer)
+          }
+        }
+      }
+    }
+  }
+
   test("vectorized reader: missing some struct fields") {
     Seq(true, false).foreach { offheapEnabled =>
       withSQLConf(
```
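The failure these tests guard against can also be reproduced with public APIs alone. Below is a minimal standalone sketch adapted from the first test above; the object name and output path are illustrative. Before this fix, the read could throw while converting the invalid one-sided map group; with the fix it returns null structs:

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object MissingStructFieldRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // A struct whose only field is a map, plus a row where the struct is null.
    val tableSchema = new StructType()
      .add("_1", new StructType()
        .add("_1", MapType(StringType, IntegerType)))
    val data = Seq(Row(Row(Map("key1" -> 1))), Row(null))

    val path = "/tmp/spark-53535-repro" // illustrative location
    spark.createDataFrame(data.asJava, tableSchema)
      .write.mode("overwrite").parquet(path)

    // Request only a field (_101) that is missing from the struct: the reader
    // still fetches one cheap real column (to tell whether the struct itself
    // is null), and before this fix it could pick just the map's key side,
    // producing an invalid Parquet map type.
    val readSchema = new StructType()
      .add("_1", new StructType().add("_101", IntegerType))
    spark.read.schema(readSchema).parquet(path).show()

    spark.stop()
  }
}
```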
