support page skipping when using vectorized Parquet reader #15211

Open
lurnagao-dahua wants to merge 2 commits into apache:main from lurnagao-dahua:page-skipping-vectorized-read

Conversation

@lurnagao-dahua (Contributor) commented Feb 1, 2026

Parquet Column Index is a feature introduced in Parquet 1.11 that allows very efficient filtering at the page level (some benchmark numbers can be found here), especially when the data is sorted. The feature is largely implemented in parquet-mr (via classes such as ColumnIndex and ColumnIndexFilter).
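
For context, this is roughly how parquet-mr exposes column-index filtering to a reader. A minimal standalone sketch follows; the file path and predicate are made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.lt;

public class ColumnIndexFilterExample {
  public static void main(String[] args) throws Exception {
    HadoopInputFile file =
        HadoopInputFile.fromPath(new Path("/tmp/data.parquet"), new Configuration());
    ParquetReadOptions options =
        ParquetReadOptions.builder()
            .withRecordFilter(FilterCompat.get(lt(longColumn("longCol"), 100L)))
            .useColumnIndexFilter(true) // filter whole pages using the column index
            .build();
    try (ParquetFileReader reader = new ParquetFileReader(file, options)) {
      PageReadStore rowGroup;
      while ((rowGroup = reader.readNextFilteredRowGroup()) != null) {
        // Only pages whose min/max statistics can match the predicate are
        // returned; getRowCount() reflects the filtered row count.
        System.out.println("rows after page skipping: " + rowGroup.getRowCount());
      }
    }
  }
}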

The implementation of this feature was discussed in #193.

The vectorized implementation is based on Spark's Parquet reader (see SPARK-32753), which shipped in Spark 3.2.
In addition, PositionVectorReader supports position deletes based on readOrderToRowGroupPosMap (ParquetReadState.java#L67).
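
To make the mechanism concrete, here is a simplified sketch of the read-state idea borrowed from Spark (not the actual PR code; all names are illustrative): the column-index filter yields the row ranges that survive filtering in a row group, and the batch reader consults them to decide whether the next values should be decoded into the batch or skipped.

// Simplified, illustrative version of the read/skip bookkeeping.
class RowRangeReadState {
  // sorted, non-overlapping [start, end] row ids that survived filtering
  private final long[][] ranges;
  private int current = 0; // index of the range we are in or approaching
  private long rowId = 0; // next row id to be consumed in the row group

  RowRangeReadState(long[][] ranges) {
    this.ranges = ranges;
  }

  /** Returns how many of the next {@code n} rows to read; a negative value means skip that many. */
  long nextStep(long n) {
    if (current >= ranges.length) {
      rowId += n;
      return -n; // past the last surviving range: skip everything that remains
    }
    long start = ranges[current][0];
    long end = ranges[current][1];
    if (rowId < start) {
      long toSkip = Math.min(n, start - rowId);
      rowId += toSkip;
      return -toSkip; // before the range: skip forward to its start
    }
    long toRead = Math.min(n, end - rowId + 1);
    rowId += toRead;
    if (rowId > end) {
      current++; // this range is exhausted; move on to the next one
    }
    return toRead; // inside the range: decode values into the batch
  }
}

The payoff is that values in skipped pages (and skipped stretches within pages) never have to be decoded into the vector batch.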

I look forward to anyone interested in reviewing this PR, and I welcome anyone willing to co-author it with me so we can improve it together.

@lurnagao-dahua (Contributor, Author) commented Feb 3, 2026

Hi team!
May I ask if you would be interested in reviewing it? @rdblue @nastra @RussellSpitzer @pvary @huaxingao

@manuzhang (Member)

@lurnagao-dahua Do you have any benchmark numbers?

@gaborkaszab (Contributor)

@wypoon I recall you also had a PR for this before. It didn't get merged then; would you mind sharing why, and maybe taking a look at this one?

@manuzhang (Member)

I believe it's #10399, which @lurnagao-dahua has also commented on.

@lurnagao-dahua (Contributor, Author) commented Feb 5, 2026

Benchmark and benchmark results:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.spark.source.parquet.vectorized;

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.source.IcebergSourceBenchmark;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.current_date;
import static org.apache.spark.sql.functions.date_add;
import static org.apache.spark.sql.functions.expr;

public class VectorizedPageSkippingParquetDataBenchmark extends IcebergSourceBenchmark {
  static final long NUM_ROWS = 10_000_000;

  @Setup
  public void setupBenchmark() {
    setupSpark();
    appendData();
  }

  @TearDown
  public void tearDownBenchmark() throws IOException {
    tearDownSpark();
    cleanupFiles();
  }

  @Override
  protected Configuration initHadoopConf() {
    return new Configuration();
  }

  @Override
  protected Table initTable() {
    // bigDecimalCol is big enough to be encoded as fix len binary (9 bytes),
    // decimalCol is small enough to be encoded as a 64-bit int
    Schema schema =
        new Schema(
            optional(1, "longCol", Types.LongType.get()),
            optional(2, "intCol", Types.IntegerType.get()),
            optional(3, "floatCol", Types.FloatType.get()),
            optional(4, "doubleCol", Types.DoubleType.get()),
            optional(5, "bigDecimalCol", Types.DecimalType.of(20, 5)),
            optional(6, "decimalCol", Types.DecimalType.of(18, 5)),
            optional(7, "dateCol", Types.DateType.get()),
            optional(8, "timestampCol", Types.TimestampType.withZone()),
            optional(9, "stringCol", Types.StringType.get()));
    PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
    HadoopTables tables = new HadoopTables(hadoopConf());
    Map<String, String> properties = parquetWriteProps();
    return tables.create(schema, partitionSpec, properties, newTableLocation());
  }

  Map<String, String> parquetWriteProps() {
    Map<String, String> properties = Maps.newHashMap();
    properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
    // a 1-byte dictionary threshold effectively disables dictionary encoding
    properties.put(TableProperties.PARQUET_DICT_SIZE_BYTES, "1");
    return properties;
  }

  void appendData() {
    Dataset<Row> df =
        spark()
            .range(NUM_ROWS)
            .withColumn("longCol", col("id"))
            .drop("id")
            .withColumn("intCol", expr("CAST(longCol AS INT)"))
            .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
            .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
            .withColumn("bigDecimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
            .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(18, 5))"))
            .withColumn("dateCol", date_add(current_date(), 1))
            .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
            .withColumn("stringCol", expr("CAST(longCol AS STRING)"));
    appendAsFile(df);
  }

  @Benchmark
  @Threads(1)
  public void readLong10PercentBenchmark() {
    runBenchmark("longCol < " + (NUM_ROWS * 10 / 100));
  }

  @Benchmark
  @Threads(1)
  public void readLong50PercentBenchmark() {
    runBenchmark("longCol < " + (NUM_ROWS * 50 / 100));
  }

  @Benchmark
  @Threads(1)
  public void readLong90PercentBenchmark() {
    runBenchmark("longCol < " + (NUM_ROWS * 90 / 100));
  }

  @Benchmark
  @Threads(1)
  public void readString10PercentBenchmark() {
    // Approximately 10% of matching conditions
    runBenchmark("stringCol < '2'");
  }

  @Benchmark
  @Threads(1)
  public void readString50PercentBenchmark() {
    // Approximately 50% of matching conditions
    runBenchmark("stringCol < '5'");
  }

  @Benchmark
  @Threads(1)
  public void readString90PercentBenchmark() {
    // Approximately 90% of matching conditions
    runBenchmark("stringCol < '9'");
  }

  public void runBenchmark(String filter) {
    withTableProperties(
            tablePropsWithVectorizationEnabled(5000),
            () -> {
              String tableLocation = table().location();
              Dataset<Row> df =
                  spark().read().format("iceberg").load(tableLocation).select("*").filter(filter);
              materialize(df);
            });
  }

  private static Map<String, String> tablePropsWithVectorizationEnabled(int batchSize) {
    Map<String, String> tableProperties = Maps.newHashMap();
    tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true");
    tableProperties.put(TableProperties.PARQUET_BATCH_SIZE, String.valueOf(batchSize));
    return tableProperties;
  }
}
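
For reference, I ran it with Iceberg's JMH Gradle task, along these lines (the exact module path depends on the Spark/Scala versions being built):

./gradlew :iceberg-spark:iceberg-spark-3.5_2.12:jmh \
    -PjmhIncludeRegex=VectorizedPageSkippingParquetDataBenchmark \
    -PjmhOutputPath=benchmark/result.txt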
Benchmark (without page skipping)                                         Mode  Cnt  Score   Error  Units
VectorizedPageSkippingParquetDataBenchmark.readLong10PercentBenchmark      ss    5  2.429 ± 0.053   s/op
VectorizedPageSkippingParquetDataBenchmark.readLong50PercentBenchmark      ss    5  3.462 ± 0.035   s/op
VectorizedPageSkippingParquetDataBenchmark.readLong90PercentBenchmark      ss    5  4.471 ± 0.046   s/op
VectorizedPageSkippingParquetDataBenchmark.readString10PercentBenchmark    ss    5  2.551 ± 0.143   s/op
VectorizedPageSkippingParquetDataBenchmark.readString50PercentBenchmark    ss    5  3.307 ± 0.080   s/op
VectorizedPageSkippingParquetDataBenchmark.readString90PercentBenchmark    ss    5  4.682 ± 0.095   s/op

Benchmark (page skipping)                                                 Mode  Cnt  Score   Error  Units
VectorizedPageSkippingParquetDataBenchmark.readLong10PercentBenchmark      ss    5  0.620 ± 0.106   s/op
VectorizedPageSkippingParquetDataBenchmark.readLong50PercentBenchmark      ss    5  2.511 ± 0.145   s/op
VectorizedPageSkippingParquetDataBenchmark.readLong90PercentBenchmark      ss    5  4.405 ± 0.212   s/op
VectorizedPageSkippingParquetDataBenchmark.readString10PercentBenchmark    ss    5  0.717 ± 0.054   s/op
VectorizedPageSkippingParquetDataBenchmark.readString50PercentBenchmark    ss    5  2.463 ± 0.129   s/op
VectorizedPageSkippingParquetDataBenchmark.readString90PercentBenchmark    ss    5  4.729 ± 0.156   s/op

@lurnagao-dahua (Contributor, Author) commented Feb 5, 2026

> @lurnagao-dahua Do you have any benchmark numbers?

Hi, I have added a simple benchmark, and the results indicate that page skipping can improve performance noticeably at lower selectivities (roughly 4x faster for the 10% filters) while staying on par at 90%. Could you please review it when you have time?
