@@ -91,7 +91,7 @@ impl FileOpener for ParquetOpener {
9191
9292 let metadata_size_hint = file_meta. metadata_size_hint . or ( self . metadata_size_hint ) ;
9393
94- let mut reader : Box < dyn AsyncFileReader > =
94+ let mut async_file_reader : Box < dyn AsyncFileReader > =
9595 self . parquet_file_reader_factory . create_reader (
9696 self . partition_index ,
9797 file_meta,
@@ -121,23 +121,40 @@ impl FileOpener for ParquetOpener {
121121 let enable_page_index = self . enable_page_index ;
122122
123123 Ok ( Box :: pin ( async move {
124- // Don't load the page index yet - we will decide later if we need it
125- let options = ArrowReaderOptions :: new ( ) . with_page_index ( false ) ;
126-
124+ // Don't load the page index yet. Since it is not stored inline in
125+ // the footer, loading the page index if it is not needed will do
126+ // unecessary I/O. We decide later if it is needed to evaluate the
127+ // pruning predicates. Thus default to not requesting if from the
128+ // underlying reader.
129+ let mut options = ArrowReaderOptions :: new ( ) . with_page_index ( false ) ;
127130 let mut metadata_timer = file_metrics. metadata_load_time . timer ( ) ;
128- let mut metadata =
129- ArrowReaderMetadata :: load_async ( & mut reader, options. clone ( ) ) . await ?;
131+
132+ // Begin by loading the metadata from the underlying reader (note
133+ // the returned metadata may actually include page indexes as some
134+ // readers may return page indexes even when not requested -- for
135+ // example when they are cached)
136+ let mut reader_metadata =
137+ ArrowReaderMetadata :: load_async ( & mut async_file_reader, options. clone ( ) )
138+ . await ?;
139+
130140 // Note about schemas: we are actually dealing with **3 different schemas** here:
131141 // - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
132142 // - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
133143 // - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
134- let mut physical_file_schema = Arc :: clone ( metadata . schema ( ) ) ;
144+ let mut physical_file_schema = Arc :: clone ( reader_metadata . schema ( ) ) ;
135145
136- // read with view types
146+ // The schema loaded from the file may not be the same as the
147+ // desired schema (for example if we want to instruct the parquet
148+ // reader to read strings using Utf8View instead). Update if necessary
137149 if let Some ( merged) =
138150 apply_file_schema_type_coercions ( & table_schema, & physical_file_schema)
139151 {
140152 physical_file_schema = Arc :: new ( merged) ;
153+ options = options. with_schema ( Arc :: clone ( & physical_file_schema) ) ;
154+ reader_metadata = ArrowReaderMetadata :: try_new (
155+ Arc :: clone ( reader_metadata. metadata ( ) ) ,
156+ options. clone ( ) ,
157+ ) ?;
141158 }
142159
143160 // Build predicates for this specific file
@@ -147,23 +164,25 @@ impl FileOpener for ParquetOpener {
147164 & predicate_creation_errors,
148165 ) ;
149166
150- // Now check if we should load the page index
167+ // The page index is not stored inline in the parquet footer so the
168+ // code above may not have raed the page index structures yet. If we
169+ // need them for reading and they aren't yet loaded, we need to load them now.
151170 if should_enable_page_index ( enable_page_index, & page_pruning_predicate) {
152- metadata = load_page_index (
153- metadata ,
154- & mut reader ,
171+ reader_metadata = load_page_index (
172+ reader_metadata ,
173+ & mut async_file_reader ,
155174 // Since we're manually loading the page index the option here should not matter but we pass it in for consistency
156- ArrowReaderOptions :: new ( )
157- . with_page_index ( true )
158- . with_schema ( Arc :: clone ( & physical_file_schema) ) ,
175+ options. with_page_index ( true ) ,
159176 )
160177 . await ?;
161178 }
162179
163180 metadata_timer. stop ( ) ;
164181
165- let mut builder =
166- ParquetRecordBatchStreamBuilder :: new_with_metadata ( reader, metadata) ;
182+ let mut builder = ParquetRecordBatchStreamBuilder :: new_with_metadata (
183+ async_file_reader,
184+ reader_metadata,
185+ ) ;
167186
168187 let ( schema_mapping, adapted_projections) =
169188 schema_adapter. map_schema ( & physical_file_schema) ?;
@@ -372,12 +391,14 @@ fn build_pruning_predicates(
372391 ( pruning_predicate, Some ( page_pruning_predicate) )
373392}
374393
394+ /// Returns a `ArrowReaderMetadata` with the page index loaded, loading
395+ /// it from the underlying `AsyncFileReader` if necessary.
375396async fn load_page_index < T : AsyncFileReader > (
376- arrow_reader : ArrowReaderMetadata ,
397+ reader_metadata : ArrowReaderMetadata ,
377398 input : & mut T ,
378399 options : ArrowReaderOptions ,
379400) -> Result < ArrowReaderMetadata > {
380- let parquet_metadata = arrow_reader . metadata ( ) ;
401+ let parquet_metadata = reader_metadata . metadata ( ) ;
381402 let missing_column_index = parquet_metadata. column_index ( ) . is_none ( ) ;
382403 let missing_offset_index = parquet_metadata. offset_index ( ) . is_none ( ) ;
383404 // You may ask yourself: why are we even checking if the page index is already loaded here?
@@ -397,6 +418,6 @@ async fn load_page_index<T: AsyncFileReader>(
397418 Ok ( new_arrow_reader)
398419 } else {
399420 // No need to load the page index again, just return the existing metadata
400- Ok ( arrow_reader )
421+ Ok ( reader_metadata )
401422 }
402423}
0 commit comments