Skip to content

[SUPPORT] parquet bloom filters not supported by hudi #7117

@parisni

Description

@parisni

hudi 0.12.1
spark 3.2.1

hudi has its own bloom implementation used mainly for fast lookup on the hudi key. Up to 0.11 hudi allow to store them in the metadata table, and also add bloom on several columns. So far those bloom are not use at read time.

Spark leverage parquet bloom filters at write time. They are then used at read time to skip files. Those bloom can improve queries a lot. But so far, hudi don't support them :

Basic parquet table => bloom works as expected: the accumulator is not used when filtered values are outside df values, but incremented when values matches

spark.sql("set parquet.filter.bloom.enabled=true")
spark.sql("set parquet.filter.columnindex.enabled=false")
spark.sql("set parquet.filter.stats.enabled=false")

import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
import org.apache.spark.sql.Row
{spark.sql("select '0' as a, '1' as b, '2' c, 4 as d union select '1', '2', '3', 4").write.mode("overwrite").option("parquet.bloom.filter.enabled#b", "true").parquet("/tmp/bloom")}

class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
  private var _sum = 0
  override def isZero: Boolean = _sum == 0
  override def copy(): AccumulatorV2[Integer, Integer] = {
    val acc = new NumRowGroupsAcc()
    acc._sum = _sum
    acc
  }
  override def reset(): Unit = _sum = 0
  override def add(v: Integer): Unit = _sum += v
  override def merge(other: AccumulatorV2[Integer, Integer]): Unit = other match {
    case a: NumRowGroupsAcc => _sum += a._sum
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }
  override def value: Integer = _sum
}
val accu = new NumRowGroupsAcc
sc.register(accu)
spark.read.format("parquet").load("/tmp/bloom").filter("b = '3'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
println(accu)
spark.read.format("parquet").load("/tmp/bloom").filter("b = '2'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
println(accu)

/**
NumRowGroupsAcc(id: 5611, name: None, value: 0)
NumRowGroupsAcc(id: 5611, name: None, value: 1)
/**

Hudi parquet table => bloom works NOT as expected: the accumulator is incremented when match and no match

import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
import org.apache.spark.sql.Row
spark.sql("set parquet.filter.bloom.enabled=true")
spark.sql("set parquet.filter.columnindex.enabled=false")
spark.sql("set parquet.filter.stats.enabled=false")
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}

val basePath = "/tmp/bloom_hudi/"
val hiveOptions = Map[String, String](
  "hoodie.table.name"-> "hudi_test",
"hoodie.datasource.write.recordkey.field"-> "a",
"hoodie.datasource.write.partitionpath.field"-> "d",
"hoodie.datasource.write.precombine.field"-> "c",
"hoodie.datasource.write.table.name"-> "hudi_test",
"hoodie.datasource.write.operation" -> "bulk_insert",
"parquet.bloom.filter.enabled#b" -> "true"
)
spark.sql("select '0' as a, '1' as b, '2' c, 4 as d union select '1', '2', '3', 4").write.format("hudi").
options(hiveOptions).
mode("overwrite").
save(basePath)

class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
  private var _sum = 0
  override def isZero: Boolean = _sum == 0
  override def copy(): AccumulatorV2[Integer, Integer] = {
    val acc = new NumRowGroupsAcc()
    acc._sum = _sum
    acc
  }
  override def reset(): Unit = _sum = 0
  override def add(v: Integer): Unit = _sum += v
  override def merge(other: AccumulatorV2[Integer, Integer]): Unit = other match {
    case a: NumRowGroupsAcc => _sum += a._sum
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }
  override def value: Integer = _sum
}
val accu = new NumRowGroupsAcc
sc.register(accu)
spark.read.format("hudi").load(basePath).filter("b = '3'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
println(accu)
spark.read.format("hudi").load(basePath).filter("b = '2'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
println(accu)
/**
accu: NumRowGroupsAcc = NumRowGroupsAcc(id: 635, name: None, value: 1)
accu: NumRowGroupsAcc = NumRowGroupsAcc(id: 635, name: None, value: 2)

/*

Metadata

Metadata

Assignees

Labels

area:performancePerformance optimizationsarea:sqlSQL interfacespriority:highSignificant impact; potential bugstype:featureNew features and enhancements

Type

No type
No fields configured for issues without a type.

Projects

Status
✅ Done

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions