Skip to content

Commit 8965b59

Browse files
Dandandan authored and alamb committed
ARROW-11033 [Rust] Csv writing performance improvements
Some performance improvements for the csv writer * Use lexical core for numeric types * Allow setting batch size in convert (slightly faster reading when using higher batch size, could also be helpful in writing) * Avoid allocation of vec PR: `cargo run --release --bin tpch -- convert --input path --output ./output --format csv -s 20000` Orders / lineitems: ``` Conversion completed in 2050 ms Conversion completed in 16955 ms ``` Master `cargo run --release --bin tpch -- convert --input path --output ./output --format csv` ``` Conversion completed in 2336 ms Conversion completed in 19070 ms ``` Closes #9010 from Dandandan/csv_write_perf Lead-authored-by: Daniël Heres <danielheres@gmail.com> Co-authored-by: Heres, Daniel <danielheres@gmail.com> Signed-off-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 2f68741 commit 8965b59

2 files changed

Lines changed: 38 additions & 13 deletions

File tree

rust/arrow/src/csv/writer.rs

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,33 @@ use crate::array::*;
7373
use crate::datatypes::*;
7474
use crate::error::{ArrowError, Result};
7575
use crate::record_batch::RecordBatch;
76-
7776
const DEFAULT_DATE_FORMAT: &str = "%F";
7877
const DEFAULT_TIME_FORMAT: &str = "%T";
7978
const DEFAULT_TIMESTAMP_FORMAT: &str = "%FT%H:%M:%S.%9f";
8079

80+
pub fn to_string<N: lexical_core::ToLexical>(n: N) -> String {
81+
let mut buf = Vec::<u8>::with_capacity(N::FORMATTED_SIZE_DECIMAL);
82+
unsafe {
83+
// JUSTIFICATION
84+
// Benefit
85+
// Allows using the faster serializer lexical core and convert to string
86+
// Soundness
87+
// Length of buf is set as written length afterwards. lexical_core
88+
// creates a valid string, so doesn't need to be checked.
89+
let slice = std::slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity());
90+
let len = lexical_core::write(n, slice).len();
91+
buf.set_len(len);
92+
String::from_utf8_unchecked(buf)
93+
}
94+
}
95+
8196
fn write_primitive_value<T>(array: &ArrayRef, i: usize) -> String
8297
where
8398
T: ArrowNumericType,
84-
T::Native: std::string::ToString,
99+
T::Native: lexical_core::ToLexical,
85100
{
86101
let c = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
87-
c.value(i).to_string()
102+
to_string(c.value(i))
88103
}
89104

90105
/// A CSV writer
@@ -124,14 +139,18 @@ impl<W: Write> Writer<W> {
124139
}
125140

126141
/// Convert a record to a string vector
127-
fn convert(&self, batch: &RecordBatch, row_index: usize) -> Result<Vec<String>> {
142+
fn convert(
143+
&self,
144+
batch: &RecordBatch,
145+
row_index: usize,
146+
buffer: &mut [String],
147+
) -> Result<()> {
128148
// TODO: it'd be more efficient if we could create `record: Vec<&[u8]>
129-
let mut record: Vec<String> = Vec::with_capacity(batch.num_columns());
130-
for col_index in 0..batch.num_columns() {
149+
for (col_index, item) in buffer.iter_mut().enumerate() {
131150
let col = batch.column(col_index);
132151
if col.is_null(row_index) {
133152
// write an empty value
134-
record.push(String::from(""));
153+
*item = "".to_string();
135154
continue;
136155
}
137156
let string = match col.data_type() {
@@ -243,10 +262,9 @@ impl<W: Write> Writer<W> {
243262
)));
244263
}
245264
};
246-
247-
record.push(string);
265+
*item = string;
248266
}
249-
Ok(record)
267+
Ok(())
250268
}
251269

252270
/// Write a vector of record batches to a writable object
@@ -265,9 +283,11 @@ impl<W: Write> Writer<W> {
265283
self.beginning = false;
266284
}
267285

286+
let mut buffer = vec!["".to_string(); batch.num_columns()];
287+
268288
for row_index in 0..batch.num_rows() {
269-
let record = self.convert(batch, row_index)?;
270-
self.writer.write_record(&record[..])?;
289+
self.convert(batch, row_index, &mut buffer)?;
290+
self.writer.write_record(&buffer)?;
271291
}
272292
self.writer.flush()?;
273293

rust/benchmarks/src/bin/tpch.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ struct ConvertOpt {
8989
/// Number of partitions to produce
9090
#[structopt(short = "p", long = "partitions", default_value = "1")]
9191
partitions: usize,
92+
93+
/// Batch size when reading CSV or Parquet files
94+
#[structopt(short = "s", long = "batch-size", default_value = "4096")]
95+
batch_size: usize,
9296
}
9397

9498
#[derive(Debug, StructOpt)]
@@ -1019,7 +1023,8 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
10191023
.delimiter(b'|')
10201024
.file_extension(".tbl");
10211025

1022-
let mut ctx = ExecutionContext::new();
1026+
let config = ExecutionConfig::new().with_batch_size(opt.batch_size);
1027+
let mut ctx = ExecutionContext::with_config(config);
10231028

10241029
// build plan to read the TBL file
10251030
let mut csv = ctx.read_csv(&input_path, options)?;

0 commit comments

Comments
 (0)