Skip to content

Commit b8a3a78

Browse files
authored
Compressed CSV/JSON support (#3642)
* Compression text support
* Fix the path joining issue on Windows test
* Debug code for Windows CI
* Utilize `std::path::Path` instead of `url::Url`
1 parent 58afdf7 commit b8a3a78

18 files changed

Lines changed: 976 additions & 177 deletions

File tree

datafusion-examples/examples/parquet_sql_multiple_files.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use datafusion::datasource::file_format::parquet::{
19-
ParquetFormat, DEFAULT_PARQUET_EXTENSION,
20-
};
18+
use datafusion::datasource::file_format::file_type::{FileType, GetExt};
19+
use datafusion::datasource::file_format::parquet::ParquetFormat;
2120
use datafusion::datasource::listing::ListingOptions;
2221
use datafusion::error::Result;
2322
use datafusion::prelude::*;
@@ -35,7 +34,7 @@ async fn main() -> Result<()> {
3534
// Configure listing options
3635
let file_format = ParquetFormat::default().with_enable_pruning(true);
3736
let listing_options = ListingOptions {
38-
file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
37+
file_extension: FileType::PARQUET.get_ext(),
3938
format: Arc::new(file_format),
4039
table_partition_cols: vec![],
4140
collect_stat: true,

datafusion/common/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,12 @@ impl Display for DataFusionError {
318318

319319
impl error::Error for DataFusionError {}
320320

321+
impl From<DataFusionError> for io::Error {
322+
fn from(e: DataFusionError) -> Self {
323+
io::Error::new(io::ErrorKind::Other, e)
324+
}
325+
}
326+
321327
#[cfg(test)]
322328
mod test {
323329
use crate::error::DataFusionError;

datafusion/core/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion
5757
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
5858
apache-avro = { version = "0.14", optional = true }
5959
arrow = { version = "24.0.0", features = ["prettyprint"] }
60+
async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "futures-io", "tokio"] }
6061
async-trait = "0.1.41"
6162
bytes = "1.1"
63+
bzip2 = "0.4.3"
6264
chrono = { version = "0.4", default-features = false }
6365
datafusion-common = { path = "../common", version = "13.0.0", features = ["parquet", "object_store"] }
6466
datafusion-expr = { path = "../expr", version = "13.0.0" }
@@ -67,6 +69,7 @@ datafusion-optimizer = { path = "../optimizer", version = "13.0.0" }
6769
datafusion-physical-expr = { path = "../physical-expr", version = "13.0.0" }
6870
datafusion-row = { path = "../row", version = "13.0.0" }
6971
datafusion-sql = { path = "../sql", version = "13.0.0" }
72+
flate2 = "1.0.24"
7073
futures = "0.3"
7174
glob = "0.3.0"
7275
hashbrown = { version = "0.12", features = ["raw"] }
@@ -90,6 +93,7 @@ sqlparser = "0.25"
9093
tempfile = "3"
9194
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
9295
tokio-stream = "0.1"
96+
tokio-util = { version = "0.7.4", features = ["io"] }
9397
url = "2.2"
9498
uuid = { version = "1.0", features = ["v4"] }
9599

@@ -102,6 +106,7 @@ ctor = "0.1.22"
102106
doc-comment = "0.3"
103107
env_logger = "0.9"
104108
fuzz-utils = { path = "fuzz-utils" }
109+
rstest = "0.15.0"
105110

106111
[[bench]]
107112
harness = false

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,21 @@
1818
//! CSV format abstractions
1919
2020
use std::any::Any;
21+
2122
use std::sync::Arc;
2223

2324
use arrow::datatypes::Schema;
2425
use arrow::{self, datatypes::SchemaRef};
2526
use async_trait::async_trait;
27+
use bytes::Buf;
28+
2629
use datafusion_common::DataFusionError;
30+
2731
use futures::TryFutureExt;
2832
use object_store::{ObjectMeta, ObjectStore};
2933

3034
use super::FileFormat;
35+
use crate::datasource::file_format::file_type::FileCompressionType;
3136
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
3237
use crate::error::Result;
3338
use crate::logical_plan::Expr;
@@ -43,6 +48,7 @@ pub struct CsvFormat {
4348
has_header: bool,
4449
delimiter: u8,
4550
schema_infer_max_rec: Option<usize>,
51+
file_compression_type: FileCompressionType,
4652
}
4753

4854
impl Default for CsvFormat {
@@ -51,6 +57,7 @@ impl Default for CsvFormat {
5157
schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
5258
has_header: true,
5359
delimiter: b',',
60+
file_compression_type: FileCompressionType::UNCOMPRESSED,
5461
}
5562
}
5663
}
@@ -82,6 +89,16 @@ impl CsvFormat {
8289
self
8390
}
8491

92+
/// Set the `FileCompressionType` used when reading the CSV data
93+
/// - defaults to `FileCompressionType::UNCOMPRESSED`
94+
pub fn with_file_compression_type(
95+
mut self,
96+
file_compression_type: FileCompressionType,
97+
) -> Self {
98+
self.file_compression_type = file_compression_type;
99+
self
100+
}
101+
85102
/// The delimiter character.
86103
pub fn delimiter(&self) -> u8 {
87104
self.delimiter
@@ -110,8 +127,9 @@ impl FileFormat for CsvFormat {
110127
.await
111128
.map_err(|e| DataFusionError::External(Box::new(e)))?;
112129

130+
let decoder = self.file_compression_type.convert_read(data.reader());
113131
let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
114-
&mut data.as_ref(),
132+
decoder,
115133
self.delimiter,
116134
Some(records_to_read),
117135
self.has_header,
@@ -144,7 +162,12 @@ impl FileFormat for CsvFormat {
144162
conf: FileScanConfig,
145163
_filters: &[Expr],
146164
) -> Result<Arc<dyn ExecutionPlan>> {
147-
let exec = CsvExec::new(conf, self.has_header, self.delimiter);
165+
let exec = CsvExec::new(
166+
conf,
167+
self.has_header,
168+
self.delimiter,
169+
self.file_compression_type.to_owned(),
170+
);
148171
Ok(Arc::new(exec))
149172
}
150173
}

0 commit comments

Comments
 (0)