Skip to content
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions rust/parquet/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,17 @@ pub fn parquet_to_arrow_schema(
key_value_metadata: &Option<Vec<KeyValue>>,
) -> Result<Schema> {
let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
let arrow_schema_metadata = metadata
let maybe_schema = metadata
.remove(super::ARROW_SCHEMA_META_KEY)
.map(|encoded| get_arrow_schema_from_metadata(&encoded));

let arrow_schema_metadata = match maybe_schema {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be an idiom for this in Rust (one that I am not aware of).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jorgecarleitao thank you for the tip!

The code was updated to use `map_or` to flip `Option<Result>` into `Result<Option>`.
BTW, I found a similar usage of `map_or` in parquet/src/column/writer.rs 🥇

Some(v) => Some(v?),
_ => None,
};

match arrow_schema_metadata {
Some(Some(schema)) => Ok(schema),
Some(schema) => Ok(schema),
_ => parquet_to_arrow_schema_by_columns(
parquet_schema,
0..parquet_schema.columns().len(),
Expand Down Expand Up @@ -120,10 +125,14 @@ where
T: IntoIterator<Item = usize>,
{
let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
let arrow_schema_metadata = metadata
let maybe_schema = metadata
.remove(super::ARROW_SCHEMA_META_KEY)
.map(|encoded| get_arrow_schema_from_metadata(&encoded))
.unwrap_or_default();
.map(|encoded| get_arrow_schema_from_metadata(&encoded));

let arrow_schema_metadata = match maybe_schema {
Some(v) => Some(v?),
_ => None,
};

// add the Arrow metadata to the Parquet metadata
if let Some(arrow_schema) = &arrow_schema_metadata {
Expand Down Expand Up @@ -175,7 +184,7 @@ where
}

/// Try to convert Arrow schema metadata into a schema
fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option<Schema> {
fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
let decoded = base64::decode(encoded_meta);
match decoded {
Ok(bytes) => {
Expand All @@ -187,28 +196,25 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option<Schema> {
match arrow::ipc::root_as_message(slice) {
Ok(message) => message
.header_as_schema()
.map(arrow::ipc::convert::fb_to_schema),
.map(arrow::ipc::convert::fb_to_schema)
.ok_or(ArrowError("the message is not Arrow Schema".to_string())),
Err(err) => {
// The flatbuffers implementation returns an error on verification error.
// TODO: return error to caller?
eprintln!(
Err(ArrowError(format!(
"Unable to get root as message stored in {}: {:?}",
super::ARROW_SCHEMA_META_KEY,
err
);
None
)))
}
}
}
Err(err) => {
// The C++ implementation returns an error if the schema can't be parsed.
// To prevent this, we explicitly log this, then compute the schema without the metadata
eprintln!(
Err(ArrowError(format!(
"Unable to decode the encoded schema stored in {}, {:?}",
super::ARROW_SCHEMA_META_KEY,
err
);
None
)))
}
}
}
Expand Down