From 5321e25df65fb776bbcfb0b1dbc1e31ea2ec36d3 Mon Sep 17 00:00:00 2001 From: Chris Connelly Date: Thu, 18 Jul 2024 14:50:09 +0100 Subject: [PATCH 1/8] feat!: support `newlines_in_values` CSV option This significantly simplifies the UX when dealing with large CSV files that must support newlines in (quoted) values. By default, large CSV files will be repartitioned into multiple parallel range scans. This is great for performance in the common case but when large CSVs contain newlines in values the parallel scan will fail due to splitting on newlines within quotes rather than actual line terminators. With the current implementation, this behaviour can be controlled by the session-level `datafusion.optimizer.repartition_file_scans` and `datafusion.optimizer.repartition_file_min_size` settings. This commit introduces a `newlines_in_values` option to `CsvOptions` and plumbs it through to `CsvExec`, which includes it in the test for whether parallel execution is supported. This provides a convenient and searchable way to disable file scan repartitioning on a per-CSV basis. BREAKING CHANGE: This adds new public fields to types with all public fields, which is a breaking change. 
--- datafusion/common/src/config.rs | 13 ++++++ .../core/src/datasource/file_format/csv.rs | 46 +++++++++++++++++++ .../src/datasource/file_format/options.rs | 12 +++++ .../core/src/datasource/physical_plan/csv.rs | 22 +++++++-- .../enforce_distribution.rs | 3 ++ .../physical_optimizer/projection_pushdown.rs | 3 ++ .../replace_with_order_preserving_variants.rs | 1 + datafusion/core/src/test/mod.rs | 3 ++ .../proto/datafusion_common.proto | 1 + datafusion/proto-common/src/from_proto/mod.rs | 1 + .../proto-common/src/generated/pbjson.rs | 21 +++++++++ .../proto-common/src/generated/prost.rs | 3 ++ datafusion/proto-common/src/to_proto/mod.rs | 3 ++ datafusion/proto/proto/datafusion.proto | 1 + .../src/generated/datafusion_proto_common.rs | 3 ++ datafusion/proto/src/generated/pbjson.rs | 18 ++++++++ datafusion/proto/src/generated/prost.rs | 2 + datafusion/proto/src/physical_plan/mod.rs | 2 + 18 files changed, 155 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index b46b002baac02..1a301a786559c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -184,6 +184,10 @@ config_namespace! { /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE` /// if not specified explicitly in the statement. pub has_header: bool, default = false + + /// Default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` + // if not specified explicitly in the statement. + pub newlines_in_values: bool, default = false } } @@ -1593,6 +1597,7 @@ config_namespace! 
{ pub quote: u8, default = b'"' pub escape: Option, default = None pub double_quote: Option, default = None + pub newlines_in_values: Option, default = None pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED pub schema_infer_max_rec: usize, default = 100 pub date_format: Option, default = None @@ -1665,6 +1670,14 @@ impl CsvOptions { self } + /// Set true to ensure that newlines in (quoted) values are supported. + /// Note that setting this may reduce performance as large file scans will not be repartitioned. + /// - default is None + pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self { + self.newlines_in_values = Some(newlines_in_values); + self + } + /// Set a `CompressionTypeVariant` of CSV /// - defaults to `CompressionTypeVariant::UNCOMPRESSED` pub fn with_file_compression_type( diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 5daa8447551b1..bf5f6abd98afc 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -233,6 +233,14 @@ impl CsvFormat { self } + /// Set true to ensure that newlines in (quoted) values are supported. + /// Note that setting this may reduce performance as large file scans will not be repartitioned. 
+ /// - default is None + pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self { + self.options.newlines_in_values = Some(newlines_in_values); + self + } + /// Set a `FileCompressionType` of CSV /// - defaults to `FileCompressionType::UNCOMPRESSED` pub fn with_file_compression_type( @@ -330,6 +338,9 @@ impl FileFormat for CsvFormat { self.options.quote, self.options.escape, self.options.comment, + self.options + .newlines_in_values + .unwrap_or(state.config_options().catalog.newlines_in_values), self.options.compression.into(), ); Ok(Arc::new(exec)) @@ -1052,6 +1063,41 @@ mod tests { Ok(()) } + #[rstest(n_partitions, case(1), case(2), case(3), case(4))] + #[tokio::test] + async fn test_csv_parallel_newlines_in_values(n_partitions: usize) -> Result<()> { + let config = SessionConfig::new() + .with_repartition_file_scans(true) + .with_repartition_file_min_size(0) + .with_target_partitions(n_partitions); + let csv_options = CsvReadOptions::default() + .has_header(true) + .newlines_in_values(true); + let ctx = SessionContext::new_with_config(config); + let testdata = arrow_test_data(); + ctx.register_csv( + "aggr", + &format!("{testdata}/csv/aggregate_test_100.csv"), + csv_options, + ) + .await?; + + let query = "select sum(c3) from aggr;"; + let query_result = ctx.sql(query).await?.collect().await?; + let actual_partitions = count_query_csv_partitions(&ctx, query).await?; + + #[rustfmt::skip] + let expected = ["+--------------+", + "| sum(aggr.c3) |", + "+--------------+", + "| 781 |", + "+--------------+"]; + assert_batches_eq!(expected, &query_result); + assert_eq!(1, actual_partitions); // csv won't be scanned in parallel when newlines_in_values is set + + Ok(()) + } + /// Read a single empty csv file in parallel /// /// empty_0_byte.csv: diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index c6d143ed6749a..474e774cd042a 100644 --- 
a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -63,6 +63,10 @@ pub struct CsvReadOptions<'a> { pub escape: Option, /// If enabled, lines beginning with this byte are ignored. pub comment: Option, + /// Does the CSV file contain newlines in values? + /// + /// If enabled, parallel scanning will be disabled. + pub newlines_in_values: bool, /// An optional schema representing the CSV files. If None, CSV reader will try to infer it /// based on data in file. pub schema: Option<&'a Schema>, @@ -95,6 +99,7 @@ impl<'a> CsvReadOptions<'a> { delimiter: b',', quote: b'"', escape: None, + newlines_in_values: false, file_extension: DEFAULT_CSV_EXTENSION, table_partition_cols: vec![], file_compression_type: FileCompressionType::UNCOMPRESSED, @@ -133,6 +138,12 @@ impl<'a> CsvReadOptions<'a> { self } + /// Specify whether to support newlines in values. + pub fn newlines_in_values(mut self, newlines_in_values: bool) -> Self { + self.newlines_in_values = newlines_in_values; + self + } + /// Specify the file extension for CSV file selection pub fn file_extension(mut self, file_extension: &'a str) -> Self { self.file_extension = file_extension; @@ -490,6 +501,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_delimiter(self.delimiter) .with_quote(self.quote) .with_escape(self.escape) + .with_newlines_in_values(self.newlines_in_values) .with_schema_infer_max_rec(self.schema_infer_max_records) .with_file_compression_type(self.file_compression_type.to_owned()); diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 327fbd976e877..cfd52ae3addfa 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -59,6 +59,7 @@ pub struct CsvExec { quote: u8, escape: Option, comment: Option, + newlines_in_values: bool, /// Execution metrics metrics: ExecutionPlanMetricsSet, /// 
Compression type of the file associated with CsvExec @@ -75,6 +76,7 @@ impl CsvExec { quote: u8, escape: Option, comment: Option, + newlines_in_values: bool, file_compression_type: FileCompressionType, ) -> Self { let (projected_schema, projected_statistics, projected_output_ordering) = @@ -91,6 +93,7 @@ impl CsvExec { delimiter, quote, escape, + newlines_in_values, metrics: ExecutionPlanMetricsSet::new(), file_compression_type, cache, @@ -126,6 +129,13 @@ impl CsvExec { self.escape } + /// Whether newlines are always supported in values. + /// + /// When set, this will disable file repartitioning. + pub fn newlines_in_values(&self) -> bool { + self.newlines_in_values + } + fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning { Partitioning::UnknownPartitioning(file_scan_config.file_groups.len()) } @@ -196,15 +206,15 @@ impl ExecutionPlan for CsvExec { /// Redistribute files across partitions according to their size /// See comments on [`FileGroupPartitioner`] for more detail. /// - /// Return `None` if can't get repartitioned(empty/compressed file). + /// Return `None` if can't get repartitioned (empty, compressed file, or `newlines_in_values` set). fn repartitioned( &self, target_partitions: usize, config: &ConfigOptions, ) -> Result>> { let repartition_file_min_size = config.optimizer.repartition_file_min_size; - // Parallel execution on compressed CSV file is not supported yet. - if self.file_compression_type.is_compressed() { + // Parallel execution on compressed CSV files or files that must support newlines in values is not supported yet. 
+ if self.file_compression_type.is_compressed() || self.newlines_in_values { return Ok(None); } @@ -589,6 +599,7 @@ mod tests { b'"', None, None, + false, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -658,6 +669,7 @@ mod tests { b'"', None, None, + false, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -727,6 +739,7 @@ mod tests { b'"', None, None, + false, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -793,6 +806,7 @@ mod tests { b'"', None, None, + false, file_compression_type.to_owned(), ); assert_eq!(14, csv.base_config.file_schema.fields().len()); @@ -858,6 +872,7 @@ mod tests { b'"', None, None, + false, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -953,6 +968,7 @@ mod tests { b'"', None, None, + false, file_compression_type.to_owned(), ); diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index afed5dd375351..9791f23f963e0 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1472,6 +1472,7 @@ pub(crate) mod tests { b'"', None, None, + false, FileCompressionType::UNCOMPRESSED, )) } @@ -1496,6 +1497,7 @@ pub(crate) mod tests { b'"', None, None, + false, FileCompressionType::UNCOMPRESSED, )) } @@ -3770,6 +3772,7 @@ pub(crate) mod tests { b'"', None, None, + false, compression_type, )), vec![("a".to_string(), "a".to_string())], diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 84f898431762b..d0d0c985b8b64 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -186,6 +186,7 @@ fn 
try_swapping_with_csv( csv.quote(), csv.escape(), csv.comment(), + csv.newlines_in_values(), csv.file_compression_type, )) as _ }) @@ -1700,6 +1701,7 @@ mod tests { 0, None, None, + false, FileCompressionType::UNCOMPRESSED, )) } @@ -1723,6 +1725,7 @@ mod tests { 0, None, None, + false, FileCompressionType::UNCOMPRESSED, )) } diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 013155b8400a6..6565e3e7d0d23 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -1503,6 +1503,7 @@ mod tests { b'"', None, None, + false, FileCompressionType::UNCOMPRESSED, )) } diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index e8550a79cb0e0..5cb1b6ea7017b 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -99,6 +99,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result for CsvOptions { quote: proto_opts.quote[0], escape: proto_opts.escape.first().copied(), double_quote: proto_opts.has_header.first().map(|h| *h != 0), + newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0), compression: proto_opts.compression().into(), schema_infer_max_rec: proto_opts.schema_infer_max_rec as usize, date_format: (!proto_opts.date_format.is_empty()) diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index be3cc58b23dfe..4b34660ae2ef5 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1884,6 +1884,9 @@ impl serde::Serialize for CsvOptions { if !self.double_quote.is_empty() { len += 1; } + if !self.newlines_in_values.is_empty() { + len += 1; + } let mut struct_ser = 
serializer.serialize_struct("datafusion_common.CsvOptions", len)?; if !self.has_header.is_empty() { #[allow(clippy::needless_borrow)] @@ -1936,6 +1939,10 @@ impl serde::Serialize for CsvOptions { #[allow(clippy::needless_borrow)] struct_ser.serialize_field("doubleQuote", pbjson::private::base64::encode(&self.double_quote).as_str())?; } + if !self.newlines_in_values.is_empty() { + #[allow(clippy::needless_borrow)] + struct_ser.serialize_field("newlinesInValues", pbjson::private::base64::encode(&self.newlines_in_values).as_str())?; + } struct_ser.end() } } @@ -1969,6 +1976,8 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "comment", "double_quote", "doubleQuote", + "newlines_in_values", + "newlinesInValues", ]; #[allow(clippy::enum_variant_names)] @@ -1987,6 +1996,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { NullValue, Comment, DoubleQuote, + NewlinesInValues, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -2022,6 +2032,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "nullValue" | "null_value" => Ok(GeneratedField::NullValue), "comment" => Ok(GeneratedField::Comment), "doubleQuote" | "double_quote" => Ok(GeneratedField::DoubleQuote), + "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -2055,6 +2066,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut null_value__ = None; let mut comment__ = None; let mut double_quote__ = None; + let mut newlines_in_values__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::HasHeader => { @@ -2155,6 +2167,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) ; } + GeneratedField::NewlinesInValues => { + if newlines_in_values__.is_some() { + return Err(serde::de::Error::duplicate_field("newlinesInValues")); + } + newlines_in_values__ = + Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) + ; + } } } Ok(CsvOptions { @@ -2172,6 +2192,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { null_value: null_value__.unwrap_or_default(), comment: comment__.unwrap_or_default(), double_quote: double_quote__.unwrap_or_default(), + newlines_in_values: newlines_in_values__.unwrap_or_default(), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index b0674ff28d754..9a2770997f15e 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -633,6 +633,9 @@ pub struct CsvOptions { /// Indicates if quotes are doubled #[prost(bytes = "vec", tag = "14")] pub double_quote: ::prost::alloc::vec::Vec, + /// Indicates if newlines are supported in values + #[prost(bytes = "vec", tag = "15")] + pub newlines_in_values: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 705a479e01787..9dcb65444a470 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -900,6 +900,9 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { quote: vec![opts.quote], escape: opts.escape.map_or_else(Vec::new, |e| vec![e]), double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h as u8]), + newlines_in_values: opts + .newlines_in_values + .map_or_else(Vec::new, |h| vec![h as u8]), compression: compression.into(), 
schema_infer_max_rec: opts.schema_infer_max_rec as u64, date_format: opts.date_format.clone().unwrap_or_default(), diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index dc551778c5fb2..49d9f2dde67f0 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1007,6 +1007,7 @@ message CsvScanExecNode { oneof optional_comment { string comment = 6; } + bool newlines_in_values = 7; } message AvroScanExecNode { diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index b0674ff28d754..9a2770997f15e 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -633,6 +633,9 @@ pub struct CsvOptions { /// Indicates if quotes are doubled #[prost(bytes = "vec", tag = "14")] pub double_quote: ::prost::alloc::vec::Vec, + /// Indicates if newlines are supported in values + #[prost(bytes = "vec", tag = "15")] + pub newlines_in_values: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 8f77c24bd9117..25f6646d2a9af 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -3605,6 +3605,9 @@ impl serde::Serialize for CsvScanExecNode { if !self.quote.is_empty() { len += 1; } + if self.newlines_in_values { + len += 1; + } if self.optional_escape.is_some() { len += 1; } @@ -3624,6 +3627,9 @@ impl serde::Serialize for CsvScanExecNode { if !self.quote.is_empty() { struct_ser.serialize_field("quote", &self.quote)?; } + if self.newlines_in_values { + struct_ser.serialize_field("newlinesInValues", &self.newlines_in_values)?; + } if let Some(v) = self.optional_escape.as_ref() { match v { csv_scan_exec_node::OptionalEscape::Escape(v) => { @@ -3654,6 
+3660,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "hasHeader", "delimiter", "quote", + "newlines_in_values", + "newlinesInValues", "escape", "comment", ]; @@ -3664,6 +3672,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { HasHeader, Delimiter, Quote, + NewlinesInValues, Escape, Comment, } @@ -3691,6 +3700,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "hasHeader" | "has_header" => Ok(GeneratedField::HasHeader), "delimiter" => Ok(GeneratedField::Delimiter), "quote" => Ok(GeneratedField::Quote), + "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), "escape" => Ok(GeneratedField::Escape), "comment" => Ok(GeneratedField::Comment), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), @@ -3716,6 +3726,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { let mut has_header__ = None; let mut delimiter__ = None; let mut quote__ = None; + let mut newlines_in_values__ = None; let mut optional_escape__ = None; let mut optional_comment__ = None; while let Some(k) = map_.next_key()? 
{ @@ -3744,6 +3755,12 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { } quote__ = Some(map_.next_value()?); } + GeneratedField::NewlinesInValues => { + if newlines_in_values__.is_some() { + return Err(serde::de::Error::duplicate_field("newlinesInValues")); + } + newlines_in_values__ = Some(map_.next_value()?); + } GeneratedField::Escape => { if optional_escape__.is_some() { return Err(serde::de::Error::duplicate_field("escape")); @@ -3763,6 +3780,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { has_header: has_header__.unwrap_or_default(), delimiter: delimiter__.unwrap_or_default(), quote: quote__.unwrap_or_default(), + newlines_in_values: newlines_in_values__.unwrap_or_default(), optional_escape: optional_escape__, optional_comment: optional_comment__, }) diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 605c56fa946a3..ba288fe3d1b87 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1542,6 +1542,8 @@ pub struct CsvScanExecNode { pub delimiter: ::prost::alloc::string::String, #[prost(string, tag = "4")] pub quote: ::prost::alloc::string::String, + #[prost(bool, tag = "7")] + pub newlines_in_values: bool, #[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")] pub optional_escape: ::core::option::Option, #[prost(oneof = "csv_scan_exec_node::OptionalComment", tags = "6")] diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 1220f42ded836..9e17c19ecbc5d 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -211,6 +211,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { } else { None }, + scan.newlines_in_values, FileCompressionType::UNCOMPRESSED, ))), #[cfg(feature = "parquet")] @@ -1579,6 +1580,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { } else { None }, + newlines_in_values: exec.newlines_in_values(), }, )), 
}); From e05ca0e0d01b5f41bfab7b9a13ab6c4299a2ea9e Mon Sep 17 00:00:00 2001 From: Chris Connelly Date: Fri, 19 Jul 2024 12:30:12 +0100 Subject: [PATCH 2/8] docs: normalise `newlines_in_values` documentation --- datafusion/common/src/config.rs | 27 +++++++++++++++---- .../core/src/datasource/file_format/csv.rs | 10 ++++--- .../src/datasource/file_format/options.rs | 16 ++++++++--- .../core/src/datasource/physical_plan/csv.rs | 8 ++++-- 4 files changed, 48 insertions(+), 13 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1a301a786559c..3cbe14cb558eb 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -185,8 +185,14 @@ config_namespace! { /// if not specified explicitly in the statement. pub has_header: bool, default = false - /// Default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` - // if not specified explicitly in the statement. + /// Specifies whether newlines in (quoted) CSV values are supported. + /// + /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` + /// if not specified explicitly in the statement. + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. pub newlines_in_values: bool, default = false } } @@ -1597,6 +1603,13 @@ config_namespace! { pub quote: u8, default = b'"' pub escape: Option, default = None pub double_quote: Option, default = None + /// Specifies whether newlines in (quoted) values are supported. + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. 
+ /// + /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. pub newlines_in_values: Option, default = None pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED pub schema_infer_max_rec: usize, default = 100 @@ -1670,9 +1683,13 @@ impl CsvOptions { self } - /// Set true to ensure that newlines in (quoted) values are supported. - /// Note that setting this may reduce performance as large file scans will not be repartitioned. - /// - default is None + /// Specifies whether newlines in (quoted) values are supported. + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + /// + /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self { self.newlines_in_values = Some(newlines_in_values); self diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index bf5f6abd98afc..185f50883b2ce 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -233,9 +233,13 @@ impl CsvFormat { self } - /// Set true to ensure that newlines in (quoted) values are supported. - /// Note that setting this may reduce performance as large file scans will not be repartitioned. - /// - default is None + /// Specifies whether newlines in (quoted) values are supported. + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + /// + /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. 
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self { self.options.newlines_in_values = Some(newlines_in_values); self diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 474e774cd042a..552977baba17b 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -63,9 +63,13 @@ pub struct CsvReadOptions<'a> { pub escape: Option, /// If enabled, lines beginning with this byte are ignored. pub comment: Option, - /// Does the CSV file contain newlines in values? + /// Specifies whether newlines in (quoted) values are supported. /// - /// If enabled, parallel scanning will be disabled. + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + /// + /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. pub newlines_in_values: bool, /// An optional schema representing the CSV files. If None, CSV reader will try to infer it /// based on data in file. @@ -138,7 +142,13 @@ impl<'a> CsvReadOptions<'a> { self } - /// Specify whether to support newlines in values. + /// Specifies whether newlines in (quoted) values are supported. + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + /// + /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. 
pub fn newlines_in_values(mut self, newlines_in_values: bool) -> Self { self.newlines_in_values = newlines_in_values; self diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index cfd52ae3addfa..700aec379023a 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -129,9 +129,13 @@ impl CsvExec { self.escape } - /// Whether newlines are always supported in values. + /// Specifies whether newlines in (quoted) values are supported. /// - /// When set, this will disable file repartitioning. + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + /// + /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. pub fn newlines_in_values(&self) -> bool { self.newlines_in_values } From 9ca9065bf2f943d4f028883799592e9bac1bc907 Mon Sep 17 00:00:00 2001 From: Chris Connelly Date: Fri, 19 Jul 2024 12:30:57 +0100 Subject: [PATCH 3/8] test: add/fix sqllogictests for `newlines_in_values` --- .../core/tests/data/newlines_in_values.csv | 13 ++++++ .../sqllogictest/test_files/csv_files.slt | 42 +++++++++++++++++++ .../test_files/information_schema.slt | 2 + 3 files changed, 57 insertions(+) create mode 100644 datafusion/core/tests/data/newlines_in_values.csv diff --git a/datafusion/core/tests/data/newlines_in_values.csv b/datafusion/core/tests/data/newlines_in_values.csv new file mode 100644 index 0000000000000..de0cdb94a5d4a --- /dev/null +++ b/datafusion/core/tests/data/newlines_in_values.csv @@ -0,0 +1,13 @@ +id,message +1,"hello +world" +2,"something +else" +3," +many +lines +make +good test +" +4,unquoted +value,end diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index 
ca3bebe79f279..f7f5aa54dd0d5 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -293,3 +293,45 @@ id0 "value0" id1 "value1" id2 "value2" id3 "value3" + +# Handling of newlines in values + +statement ok +SET datafusion.optimizer.repartition_file_min_size = 1; + +statement ok +CREATE EXTERNAL TABLE stored_table_with_newlines_in_values_unsafe ( +col1 TEXT, +col2 TEXT +) STORED AS CSV +LOCATION '../core/tests/data/newlines_in_values.csv'; + +statement error incorrect number of fields +select * from stored_table_with_newlines_in_values_unsafe; + +statement ok +CREATE EXTERNAL TABLE stored_table_with_newlines_in_values_safe ( +col1 TEXT, +col2 TEXT +) STORED AS CSV +LOCATION '../core/tests/data/newlines_in_values.csv' +OPTIONS ('format.newlines_in_values' 'true'); + +query TT +select * from stored_table_with_newlines_in_values_safe; +---- +id message +1 +01)hello +02)world +2 +01)something +02)else +3 +01) +02)many +03)lines +04)make +05)good test +4 unquoted +value end diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index f7b755b019118..c8c0d1d45b974 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -168,6 +168,7 @@ datafusion.catalog.format NULL datafusion.catalog.has_header false datafusion.catalog.information_schema true datafusion.catalog.location NULL +datafusion.catalog.newlines_in_values false datafusion.execution.aggregate.scalar_update_factor 10 datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true @@ -252,6 +253,7 @@ datafusion.catalog.format NULL Type of `TableProvider` to use when loading `defa datafusion.catalog.has_header false Default value for `format.has_header` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. 
datafusion.catalog.information_schema true Should DataFusion provide access to `information_schema` virtual tables for displaying schema information datafusion.catalog.location NULL Location scanned to load tables for `default` schema +datafusion.catalog.newlines_in_values false Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance. datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. 
The target batch size is determined by the configuration setting From 34dcdb0199125924cd2aeca7779b5d08d06ad357 Mon Sep 17 00:00:00 2001 From: Chris Connelly Date: Fri, 19 Jul 2024 12:38:30 +0100 Subject: [PATCH 4/8] docs: document `datafusion.catalog.newlines_in_values` --- docs/source/user-guide/configs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 8d3ecbc985447..4bf9f5d769d2e 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -44,6 +44,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | | datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | | datafusion.catalog.has_header | false | Default value for `format.has_header` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. | +| datafusion.catalog.newline_in_values | false | Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance. | | datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | | datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. 
The target batch size is determined by the configuration setting | | datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | From 8c2d98d0345a9cfac8cd27fb8d8d5fe7315c099a Mon Sep 17 00:00:00 2001 From: Chris Connelly Date: Fri, 19 Jul 2024 22:55:27 +0100 Subject: [PATCH 5/8] fix: typo in configs.md --- docs/source/user-guide/configs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 4bf9f5d769d2e..5e5de016e375e 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -44,7 +44,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | | datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | | datafusion.catalog.has_header | false | Default value for `format.has_header` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. | -| datafusion.catalog.newline_in_values | false | Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance. | +| datafusion.catalog.newlines_in_values | false | Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. 
Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance. | | datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | | datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | | datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | From ed0075d416f6f1890dc529ed37718bc1ef7df033 Mon Sep 17 00:00:00 2001 From: Chris Connelly Date: Fri, 19 Jul 2024 23:06:42 +0100 Subject: [PATCH 6/8] chore: suppress lint on too many arguments for `CsvExec::new` --- datafusion/core/src/datasource/physical_plan/csv.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 700aec379023a..fb0e23c6c1648 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -69,6 +69,7 @@ pub struct CsvExec { impl CsvExec { /// Create a new CSV reader execution plan provided base and specific configurations + #[allow(clippy::too_many_arguments)] pub fn new( base_config: FileScanConfig, has_header: bool, From b9cc96b5036e027619b37562703494aa830d7afc Mon Sep 17 00:00:00 2001 From: Chris Connelly Date: Sat, 20 Jul 2024 18:21:08 +0100 Subject: [PATCH 7/8] fix: always checkout `*.slt` with LF line endings This is a bit of a stab in the dark, but it might fix multiline tests on Windows. 
--- .gitattributes | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitattributes b/.gitattributes index bcdeffc09a113..dc8dc101359f7 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,3 +1,4 @@ +*.slt text eol=lf .github/ export-ignore datafusion/proto/src/generated/prost.rs linguist-generated datafusion/proto/src/generated/pbjson.rs linguist-generated From 35198b6ce912743768f81f361a6f743e386ab306 Mon Sep 17 00:00:00 2001 From: Chris Connelly Date: Sat, 20 Jul 2024 20:19:46 +0100 Subject: [PATCH 8/8] fix: always checkout `newlines_in_values.csv` with `LF` line endings The default git behaviour of converting line endings for checked out files causes the `csv_files.slt` test to fail when testing `newlines_in_values`. This appears to be due to the quoted newlines being converted to CRLF, which are not then normalised when the CSV is read. Assuming that the sqllogictests do normalise line endings in the expected output, this could then lead to a "spurious" diff from the actual output. --- .gitattributes | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitattributes b/.gitattributes index dc8dc101359f7..84b47a6fc56e1 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,4 +1,4 @@ -*.slt text eol=lf .github/ export-ignore +datafusion/core/tests/data/newlines_in_values.csv text eol=lf datafusion/proto/src/generated/prost.rs linguist-generated datafusion/proto/src/generated/pbjson.rs linguist-generated