General virtual columns support + row numbers as a first use-case#8715
General virtual columns support + row numbers as a first use-case#8715alamb merged 64 commits intoapache:mainfrom
Conversation
Co-authored-by: scovich <scovich@users.noreply.github.com>
…eature tests pass
| } | ||
|
|
||
| fn skip_records(&mut self, num_records: usize) -> Result<usize> { | ||
| // TODO: Use advance_by when it stabilizes to improve performance |
There was a problem hiding this comment.
TODO from original PR
| } | ||
|
|
||
| fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> { | ||
| Box::new(std::iter::once(self.metadata.row_group(self.row_group_idx))) |
There was a problem hiding this comment.
this duplicates a lot, not sure if anything can be done here
parquet/src/arrow/schema/complex.rs
Outdated
| /// - If nullable: def_level = parent_def_level + 1 | ||
| /// - If required: def_level = parent_def_level | ||
| /// - rep_level = parent_rep_level (virtual fields are not repeated) | ||
| fn convert_virtual_field( |
There was a problem hiding this comment.
the name used here is not aligned with what other convert_ functions do
…hen metadata parsing may skip row groups
…ef/arrow-rs into feature/parquet-virtual-row-numbers
| /// # Ok(()) | ||
| /// # } | ||
| /// ``` | ||
| pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Self { |
There was a problem hiding this comment.
@vustef I think this is where we'd be able to detect row group filtering. There would be a with_row_group_selection() or some such function added to control skipping, and a check could be added both here and in the new function to disallow setting both.
There was a problem hiding this comment.
Thanks for figuring this out. I guess there's no action that we can take right now then, please let me know if it's otherwise.
|
Also thanks to @jkylling whose started this project |
|
Once the Ci is green I'll merge this PR. Thank you @vustef |
|
gogoogogogogo!!! |
|
The 57.1.0 patch release may be the most epic minor release we have ever had |
scovich
left a comment
There was a problem hiding this comment.
Post-merge drive by review
| // Sort ranges by ordinal to maintain original row group order | ||
| ranges.sort_by_key(|(ordinal, _)| *ordinal); |
There was a problem hiding this comment.
I don't understand this part? The row groups were supplied in some particular order (by the row_groups iterator), and we're reordering by row group ordinal instead? Wouldn't that cause row number mismatches with other columns that continue reading in the original order? It seems like we actually need:
let selected_ordinals = HashMap<i16, usize> = row_groups
.enumerate()
.map(...)
.collect::<Result<_>>()?;and then ranges needs to use that enumeration ordinal (not the row group ordinal):
if let Some(i) = selected_ordinals.get(&ordinal) {
ranges.push((i, ...);
}... so that the sorted ranges match the original row_group iterator's order?
There was a problem hiding this comment.
I messed this up when I started computing first row indexes...thank you for catching this. WIll follow up shortly with tests and the fix.
| .extension_type_name() | ||
| .map(|name| name.starts_with(VIRTUAL_PREFIX!())) | ||
| .unwrap_or(false) | ||
| } |
There was a problem hiding this comment.
| } | |
| .map_or(false, |name| name.starts_with(VIRTUAL_PREFIX!())) |
| if !is_virtual_column(field) { | ||
| panic!( |
There was a problem hiding this comment.
not a fan of panics, but if we're going to panic why not just
assert!(
is_virtual_column(field),
"...",
field.name()
);There was a problem hiding this comment.
Me neither, but seemed like a unwritten rule that all these with_ methods in ArrowReaderOptions return Self rather than a Result. Please comment in the new PR if I should change that behaviour.
@alamb also checking for your opinion on this.
There was a problem hiding this comment.
Yes, the builder-like with_ functions don't give much opportunity for validity checking. We could probably use a proper ArrowReaderOptionsBuilder and do that kind of checking in build(). Or just change this one to a setter (so it's obvious it can't be chained) and return a Result<()>.
There was a problem hiding this comment.
I think it is ok to return errors when trying to build options, like
let options = options.with_virtual_columns(cols)?;If there is an error it is unlikely the code wants to continue configuring the options anyways
# Which issue does this PR close? Closes #8864. # Rationale for this change #8715 introduced row numbers feature last week. However, it had a bug, which luckily for us @scovich pointed out soon after the merge. The issue is that the row numbers are produced in ordinal-based order of the row groups, instead of user-requested order of row groups. The former is wrong, and is being fixed here by switching to user-requested order. # What changes are included in this PR? Just fixing the bug as explained above, and adding test. Also addressing two small comments from post-merge review: #8715 (review) # Are these changes tested? Yes. # Are there any user-facing changes? No, this wasn't released yet. --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
…ic on invalid input (#8867) # Which issue does this PR close? - Follow on to #8715 - related to #8863 # Rationale for this change per #8715 (comment), @scovich rightly says > not a fan of panics It is much better for a user facing API to return an error on invalid input than painc # What changes are included in this PR? 1. Make `ArrowReaderOptions::with_virtual_columns` error rather than panic on invalid input 2. Update tests to match # Are these changes tested? Yes by CI # Are there any user-facing changes? While this is an API change it was introduced in #8715 which has not yet been released. Therefore, we can make this change without breaking semver.
…rquet It would be useful to expose the virtual columns of the arrow Parquet reader in the datasource-parquet `ParquetSource` added in apache/arrow-rs#8715. Then engines can use both DataFusion's partition value machinery and the virtual columns. I made a go at it in this PR, but hit some rough edges. This is closer to an issue than a PR, but it is easier to explain with code. The virtual columns we added are a bit difficult to integrate cleanly today. They are part of the physical schema of the Parquet reader, but cannot currently be projected. We need some additional handling to avoid predicate pushdown for virtual columns, to build the correct projection mask, and to build the correct stream schema. See the changes to `opener.rs` in this PR. One alternative would be to modify the arrow-rs implementation to remove these workarounds. Then the only change to `opener.rs` would be `.with_virtual_columns(virtual_columns.to_vec())?` (and maybe even that could be avoided? See the discussion below). What would be the best way forward here? It is redundant that the user needs to specify both `Field::new("row_index", DataType::Int64, false).with_extension_type(RowNumber)`, and add the column in a special way to the reader options with `.with_virtual_columns(virtual_columns.to_vec())?`. When the extension type `RowNumber` is added, we know that it is a virtual column. All users of the `TableSchema/ParquetSource` must know that a schema is built out of three parts: the physical Parquet columns, the virtual columns and the partition columns. From a user perspective, the user would just like to supply a schema. One alternative is to only indicate the column kind using extension types, and the user only supplies a schema. That is, there would be an extension type indicating that a column is a partition column or virtual column, instead of the user supplying this information piecemeal. This may have a performance impact, as we would likely need to extract different extension type columns during planning, which could be problematic for large schemas. Signed-off-by: Jonas Irgens Kylling <jkylling@gmail.com>
…rquet It would be useful to expose the virtual columns of the arrow Parquet reader in the datasource-parquet `ParquetSource` added in apache/arrow-rs#8715. Then engines can use both DataFusion's partition value machinery and the virtual columns. I made a go at it in this PR, but hit some rough edges. This is closer to an issue than a PR, but it is easier to explain with code. The virtual columns we added are a bit difficult to integrate cleanly today. They are part of the physical schema of the Parquet reader, but cannot currently be projected. We need some additional handling to avoid predicate pushdown for virtual columns, to build the correct projection mask, and to build the correct stream schema. See the changes to `opener.rs` in this PR. One alternative would be to modify the arrow-rs implementation to remove these workarounds. Then the only change to `opener.rs` would be `.with_virtual_columns(virtual_columns.to_vec())?` (and maybe even that could be avoided? See the discussion below). What would be the best way forward here? It is redundant that the user needs to specify both `Field::new("row_index", DataType::Int64, false).with_extension_type(RowNumber)`, and add the column in a special way to the reader options with `.with_virtual_columns(virtual_columns.to_vec())?`. When the extension type `RowNumber` is added, we know that it is a virtual column. All users of the `TableSchema/ParquetSource` must know that a schema is built out of three parts: the physical Parquet columns, the virtual columns and the partition columns. From a user perspective, the user would just like to supply a schema. One alternative is to only indicate the column kind using extension types, and the user only supplies a schema. That is, there would be an extension type indicating that a column is a partition column or virtual column, instead of the user supplying this information piecemeal. This may have a performance impact, as we would likely need to extract different extension type columns during planning, which could be problematic for large schemas. Signed-off-by: Jonas Irgens Kylling <jkylling@gmail.com>
Upstream has a solution to this in arrow v57 as [apache#8715](apache#8715)
Upstream has a solution to this in arrow v57 as [apache#8715](apache#8715)
Upstream has a solution to this in arrow v57 as [apache#8715](apache#8715)
Upstream has a solution to this in arrow v57 as [apache#8715](apache#8715)
Based on #7307.
Which issue does this PR close?
Rationale for this change
We need row numbers for many of the downstream features, e.g. computing unique row identifier in iceberg.
What changes are included in this PR?
New API to get row numbers as a virtual column:
This column is defined as an extension type.
Parquet metadata is propagated to the array builder to compute first row indexes.
New Virtual column is included in addition to Primitive and Group.
Are these changes tested?
Yes
Are there any user-facing changes?
This is user facing feature, and has added docstrings.
No breaking changes, at least I tried not to, by creating a duplicate of public method to add more parameters.