Skip to content

Commit b3b32d4

Browse files
committed
[test] added proper integration tests
1 parent 2afd695 commit b3b32d4

5 files changed

Lines changed: 287 additions & 98 deletions

File tree

datafusion/src/execution/context.rs

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,8 +1152,6 @@ mod tests {
11521152
use crate::physical_plan::functions::{make_scalar_function, Volatility};
11531153
use crate::physical_plan::{collect, collect_partitioned};
11541154
use crate::test;
1155-
use crate::test::object_store::TestObjectStore;
1156-
use crate::test_util::arrow_test_data;
11571155
use crate::variable::VarType;
11581156
use crate::{
11591157
assert_batches_eq, assert_batches_sorted_eq,
@@ -3292,51 +3290,6 @@ mod tests {
32923290
Ok(())
32933291
}
32943292

3295-
#[tokio::test]
3296-
async fn read_files_from_partitioned_path() -> Result<()> {
3297-
let mut ctx = ExecutionContext::new();
3298-
3299-
let testdata = arrow_test_data();
3300-
let csv_file_path = format!("{}/csv/aggregate_test_100.csv", testdata);
3301-
let file_schema = aggr_test_schema();
3302-
let object_store = TestObjectStore::new_mirror(
3303-
csv_file_path,
3304-
&[
3305-
"mytable/date=2021-10-27/file.csv",
3306-
"mytable/date=2021-10-28/file.csv",
3307-
],
3308-
);
3309-
3310-
let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
3311-
options.table_partition_cols = vec!["date".to_owned()];
3312-
3313-
let table =
3314-
ListingTable::new(object_store, "mytable".to_owned(), file_schema, options);
3315-
3316-
ctx.register_table("t", Arc::new(table))?;
3317-
3318-
let result = ctx
3319-
.sql("SELECT c1, date FROM t WHERE date='2021-10-27' LIMIT 5")
3320-
.await?
3321-
.collect()
3322-
.await?;
3323-
3324-
let expected = vec![
3325-
"+----+------------+",
3326-
"| c1 | date |",
3327-
"+----+------------+",
3328-
"| a | 2021-10-27 |",
3329-
"| b | 2021-10-27 |",
3330-
"| b | 2021-10-27 |",
3331-
"| c | 2021-10-27 |",
3332-
"| d | 2021-10-27 |",
3333-
"+----+------------+",
3334-
];
3335-
assert_batches_sorted_eq!(expected, &result);
3336-
3337-
Ok(())
3338-
}
3339-
33403293
#[tokio::test]
33413294
async fn custom_query_planner() -> Result<()> {
33423295
let mut ctx = ExecutionContext::with_config(

datafusion/src/test/object_store.rs

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,51 +14,35 @@
1414
// KIND, either express or implied. See the License for the
1515
// specific language governing permissions and limitations
1616
// under the License.
17-
1817
//! Object store implementation used for testing
1918
2019
use std::{
21-
fs, io,
20+
io,
2221
io::{Cursor, Read},
2322
sync::Arc,
2423
};
2524

26-
use crate::datasource::object_store::{
27-
local::LocalFileSystem, FileMeta, FileMetaStream, ListEntryStream, ObjectReader,
28-
ObjectStore, SizedFile,
25+
use crate::{
26+
datasource::object_store::{
27+
FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile,
28+
},
29+
error::{DataFusionError, Result},
2930
};
30-
use crate::error::{DataFusionError, Result};
31-
3231
use async_trait::async_trait;
3332
use futures::{stream, AsyncRead, StreamExt};
3433

3534
#[derive(Debug)]
3635
/// An object store implementation that is useful for testing.
37-
/// Can either generate `ObjectReader`s that are filled with zero-
38-
/// bytes or mirror a given file to multiple paths.
36+
/// `ObjectReader`s are filled with zero bytes.
3937
pub struct TestObjectStore {
4038
/// The `(path,size)` of the files that "exist" in the store
4139
files: Vec<(String, u64)>,
42-
/// The file that will be read at all paths. If `None`, fill the
43-
/// file with zero-bytes.
44-
mirrored_file: Option<String>,
4540
}
4641

4742
impl TestObjectStore {
4843
pub fn new_arc(files: &[(&str, u64)]) -> Arc<dyn ObjectStore> {
4944
Arc::new(Self {
5045
files: files.iter().map(|f| (f.0.to_owned(), f.1)).collect(),
51-
mirrored_file: None,
52-
})
53-
}
54-
pub fn new_mirror(mirrored_file: String, paths: &[&str]) -> Arc<dyn ObjectStore> {
55-
let metadata = fs::metadata(&mirrored_file).expect("Local file metadata");
56-
Arc::new(Self {
57-
files: paths
58-
.iter()
59-
.map(|&f| (f.to_owned(), metadata.len()))
60-
.collect(),
61-
mirrored_file: Some(mirrored_file),
6246
})
6347
}
6448
}
@@ -96,15 +80,8 @@ impl ObjectStore for TestObjectStore {
9680

9781
fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
9882
match self.files.iter().find(|item| file.path == item.0) {
99-
Some(&(_, size)) if size == file.size => {
100-
if let Some(mirrored_file) = &self.mirrored_file {
101-
Ok(LocalFileSystem {}.file_reader(SizedFile {
102-
path: mirrored_file.clone(),
103-
size,
104-
})?)
105-
} else {
106-
Ok(Arc::new(EmptyObjectReader(size)))
107-
}
83+
Some((_, size)) if *size == file.size => {
84+
Ok(Arc::new(EmptyObjectReader(*size)))
10885
}
10986
Some(_) => Err(DataFusionError::IoError(io::Error::new(
11087
io::ErrorKind::NotFound,

datafusion/tests/common.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Helper functions that are common to multiple integration test setups
19+
20+
use std::sync::Arc;
21+
22+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
23+
24+
/// Builds the 13-column schema (`c1` through `c13`) shared by integration tests.
///
/// Returns a reference-counted [`Schema`] whose fields are, in order:
/// `c1: Utf8`, `c2: UInt32`, `c3: Int8`, `c4: Int16`, `c5: Int32`, `c6: Int64`,
/// `c7: UInt8`, `c8: UInt16`, `c9: UInt32`, `c10: UInt64`, `c11: Float32`,
/// `c12: Float64`, `c13: Utf8`. Every field is created with `false` as its
/// third argument (the nullable flag in arrow's `Field::new`).
///
/// NOTE(review): presumably mirrors the column layout of
/// `aggregate_test_100.csv` from the arrow test data — confirm against callers.
pub fn aggr_test_schema() -> SchemaRef {
25+
Arc::new(Schema::new(vec![
26+
Field::new("c1", DataType::Utf8, false),
27+
Field::new("c2", DataType::UInt32, false),
28+
Field::new("c3", DataType::Int8, false),
29+
Field::new("c4", DataType::Int16, false),
30+
Field::new("c5", DataType::Int32, false),
31+
Field::new("c6", DataType::Int64, false),
32+
Field::new("c7", DataType::UInt8, false),
33+
Field::new("c8", DataType::UInt16, false),
34+
Field::new("c9", DataType::UInt32, false),
35+
Field::new("c10", DataType::UInt64, false),
36+
Field::new("c11", DataType::Float32, false),
37+
Field::new("c12", DataType::Float64, false),
38+
Field::new("c13", DataType::Utf8, false),
39+
]))
40+
}

0 commit comments

Comments
 (0)