Commit ecc89f9 ("a lot of comments")
1 parent: 054b415

8 files changed: 743 additions & 430 deletions

File tree

datafusion/core/tests/parquet/filter_pushdown.rs (375 additions & 1 deletion)
```diff
@@ -26,18 +26,31 @@
 //! select * from data limit 10;
 //! ```
 
+use std::fs::{self, File};
 use std::path::Path;
+use std::sync::Arc;
 
+use arrow::array::{Int64Array, StringArray, StructArray};
 use arrow::compute::concat_batches;
 use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::{DataType, Field, Fields, Schema};
+use datafusion::assert_batches_eq;
+use datafusion::config::TableParquetOptions;
+use datafusion::datasource::listing::ListingOptions;
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::metrics::MetricsSet;
-use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr, SessionContext};
+use datafusion::prelude::{
+    col, lit, lit_timestamp_nano, Expr, SessionConfig, SessionContext,
+};
 use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
 use datafusion_common::instant::Instant;
+use datafusion_common::{assert_contains, Result};
+use datafusion_datasource_parquet::ParquetFormat;
 use datafusion_expr::utils::{conjunction, disjunction, split_conjunction};
 
 use itertools::Itertools;
+use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
 use tempfile::TempDir;
 use test_utils::AccessLogGenerator;
```
```diff
@@ -597,3 +610,364 @@ fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
         }
     }
 }
```

The rest of the hunk is all new code. First, a small harness for the dynamic-filter tests:

```rust
struct DynamicFilterTestCase {
    query: String,
    path: String,
}

impl DynamicFilterTestCase {
    fn new(query: String, path: String) -> Self {
        Self { query, path }
    }

    async fn run_query(&self, query: &str) -> Vec<RecordBatch> {
        // Force 1 partition and 1 row group per partition because if we widen
        // the plan and read all batches at once we won't get any dynamic pushdown.
        let mut cfg = SessionConfig::new();
        cfg = cfg.set_u64("datafusion.execution.parquet.max_row_group_size", 1);
        let ctx = SessionContext::new_with_config(cfg);

        let mut pq_options = TableParquetOptions::default();
        pq_options.global.max_row_group_size = 1;
        pq_options.global.pushdown_filters = true;
        let fmt = ParquetFormat::default().with_options(pq_options);
        let opt = ListingOptions::new(Arc::new(fmt)).with_target_partitions(1);
        ctx.register_listing_table("base_table", &self.path, opt, None, None)
            .await
            .unwrap();

        ctx.sql(query).await.unwrap().collect().await.unwrap()
    }

    async fn results(&self) -> Vec<RecordBatch> {
        self.run_query(&self.query).await
    }

    async fn explain_plan(&self) -> String {
        let query = format!("EXPLAIN ANALYZE {}", self.query);
        let batches = self.run_query(&query).await;

        pretty_format_batches(&batches)
            .map(|s| format!("{}", s))
            .unwrap_or_else(|_| "No explain plan generated".to_string())
    }
}
```
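As a side note, the same two knobs can also be set through SQL. A minimal sketch, assuming the standard DataFusion config keys used by the harness above (`datafusion.execution.parquet.max_row_group_size` and `datafusion.execution.parquet.pushdown_filters`); this is illustrative and not part of the commit:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Hedged sketch: configure the same options as `run_query` via SQL SET
// statements instead of the SessionConfig / TableParquetOptions builders.
async fn configure_via_sql() -> Result<SessionContext> {
    let ctx = SessionContext::new();
    // Tiny row groups so each row lands in its own row group.
    ctx.sql("SET datafusion.execution.parquet.max_row_group_size = 1")
        .await?;
    // Evaluate pushed-down predicates inside the Parquet scan.
    ctx.sql("SET datafusion.execution.parquet.pushdown_filters = true")
        .await?;
    Ok(ctx)
}
```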

Next, helpers that write the test Parquet files, one row per row group:

```rust
fn write_file_with_non_null_ids(file: &String, value: i64) {
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, true),
        Field::new("name", DataType::Utf8, false),
    ]);
    let id_array = Int64Array::from(vec![Some(value)]);
    let name_array = StringArray::from(vec![Some("test")]);
    let schema = Arc::new(schema);

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(id_array), Arc::new(name_array)],
    )
    .unwrap();
    write_record_batch(file, batch).unwrap();
}

fn write_file_with_null_ids(file: &String) {
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, true),
        Field::new("name", DataType::Utf8, false),
    ]);
    let id_array = Int64Array::from(vec![None]);
    let name_array = StringArray::from(vec![Some(format!("test{:02}", "null"))]);
    let schema = Arc::new(schema);

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(id_array), Arc::new(name_array)],
    )
    .unwrap();
    write_record_batch(file, batch).unwrap();
}

fn write_record_batch(file: &String, batch: RecordBatch) -> Result<()> {
    let file = File::create(file)?;
    let w_opt = WriterProperties::builder()
        .set_max_row_group_size(1)
        .build();
    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(w_opt))?;
    writer.write(&batch)?;
    writer.flush()?;
    writer.close()?;
    Ok(())
}

fn write_file(file: &String) {
    let struct_fields = Fields::from(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]);
    let schema = Schema::new(vec![
        Field::new("struct", DataType::Struct(struct_fields.clone()), false),
        Field::new("id", DataType::Int64, true),
        Field::new("name", DataType::Utf8, false),
    ]);
    let id_array = Int64Array::from(vec![Some(2), Some(1)]);
    let columns = vec![
        Arc::new(Int64Array::from(vec![3, 4])) as _,
        Arc::new(StringArray::from(vec!["zzz", "aaa"])) as _,
    ];
    let struct_array = StructArray::new(struct_fields, columns, None);

    let name_array = StringArray::from(vec![Some("test02"), Some("test01")]);
    let schema = Arc::new(schema);

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(struct_array),
            Arc::new(id_array),
            Arc::new(name_array),
        ],
    )
    .unwrap();
    write_record_batch(file, batch).unwrap();
}
```
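These helpers rely on `set_max_row_group_size(1)` placing every row in its own row group. A hedged verification sketch (not part of the commit) that reads the metadata back with the `parquet` crate to confirm that layout:

```rust
use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

// Illustrative check: every row group in the file at `path` should contain
// exactly one row, which is what lets the TopK dynamic filter prune at
// row-group granularity in the tests below.
fn assert_one_row_per_row_group(path: &str) {
    let reader = SerializedFileReader::new(File::open(path).unwrap()).unwrap();
    for rg in reader.metadata().row_groups() {
        assert_eq!(rg.num_rows(), 1);
    }
}
```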

The first test runs a TopK query over several files and asserts both the result and the number of row groups pruned:

```rust
#[tokio::test]
async fn test_topk_predicate_pushdown() {
    let tmp_dir = TempDir::new().unwrap();
    let path = tmp_dir.path().to_str().unwrap().to_string();

    for file in 0..5 {
        // write several files so that one is processed before the other
        let name = format!("test{:02}.parquet", file);
        write_file(&format!("{path}/{name}"));
    }

    let query = "select name from base_table order by id desc limit 3";

    let test_case = DynamicFilterTestCase::new(query.to_string(), path);

    let batches = test_case.results().await;
    #[rustfmt::skip]
    let expected = [
        "+--------+",
        "| name   |",
        "+--------+",
        "| test02 |",
        "| test02 |",
        "| test02 |",
        "+--------+",
    ];
    assert_batches_eq!(expected, &batches);

    let plan = test_case.explain_plan().await;
    assert_contains!(&plan, "row_groups_pruned_statistics=2");
}
```
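To make the assertion concrete: `row_groups_pruned_statistics=2` means two row groups were skipped using their min/max statistics against the dynamic filter produced by the TopK operator. A conceptual sketch of that pruning decision (a simplification for illustration, not DataFusion's actual code):

```rust
// For `ORDER BY id DESC LIMIT k`: once the TopK heap holds k rows, any row
// group whose max(id) statistic is <= the smallest id currently in the heap
// cannot contribute a better row, so it can be skipped without being read.
fn can_skip_row_group(row_group_max_id: i64, heap_threshold: Option<i64>) -> bool {
    match heap_threshold {
        None => false, // heap not full yet: every row group must be read
        Some(kth_smallest) => row_group_max_id <= kth_smallest,
    }
}
```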

Variations cover NULLS FIRST ordering and multi-column sort keys:

```rust
#[tokio::test]
async fn test_topk_predicate_pushdown_nulls_first() {
    let tmp_dir = TempDir::new().unwrap();
    let path = tmp_dir.path().to_str().unwrap().to_string();

    for file in 0..5 {
        // write multiple files to ensure we get pushdown of dynamic filters
        // from one file to another
        let name = format!("test{:02}.parquet", file);
        write_file(&format!("{path}/{name}"));
    }

    let name = format!("test{:02}.parquet", 100);
    write_file_with_null_ids(&format!("{path}/{name}"));

    // nulls first by default
    let query = "select name from base_table order by id desc limit 3";
    let test_case = DynamicFilterTestCase::new(query.to_string(), path);

    let batches = test_case.results().await;
    #[rustfmt::skip]
    let expected = [
        "+----------+",
        "| name     |",
        "+----------+",
        "| testnull |",
        "| test02   |",
        "| test02   |",
        "+----------+",
    ];
    assert_batches_eq!(expected, &batches);

    let plan = test_case.explain_plan().await;
    assert_contains!(&plan, "row_groups_pruned_statistics=2");
}

#[tokio::test]
async fn test_topk_predicate_pushdown_multi_key() {
    let tmp_dir = TempDir::new().unwrap();
    let path = tmp_dir.path().to_str().unwrap().to_string();
    for file in 0..5 {
        // write multiple files to ensure we get pushdown of dynamic filters
        // from one file to another; ensure files are read in order
        let name = format!("test{:02}.parquet", file);
        write_file_with_non_null_ids(&format!("{path}/{name}"), file);
    }

    let query = "select id from base_table order by name desc, id limit 3";
    let test_case = DynamicFilterTestCase::new(query.to_string(), path.clone());
    let batches = test_case.results().await;
    #[rustfmt::skip]
    let expected = [
        "+----+",
        "| id |",
        "+----+",
        "| 0  |",
        "| 1  |",
        "| 2  |",
        "+----+",
    ];
    assert_batches_eq!(expected, &batches);
    let plan = test_case.explain_plan().await;
    assert_contains!(&plan, "row_groups_pruned_statistics=1");

    let query1 = "select id from base_table order by name desc, id desc limit 3";
    let test_case = DynamicFilterTestCase::new(query1.to_string(), path.clone());
    let batches = test_case.results().await;
    #[rustfmt::skip]
    let expected = [
        "+----+",
        "| id |",
        "+----+",
        "| 4  |",
        "| 3  |",
        "| 2  |",
        "+----+",
    ];
    assert_batches_eq!(expected, &batches);
    let plan = test_case.explain_plan().await;
    assert_contains!(&plan, "row_groups_pruned_statistics=0");

    let query1 = "select id from base_table order by name asc, id desc limit 3";
    let test_case = DynamicFilterTestCase::new(query1.to_string(), path);
    let batches = test_case.results().await;
    #[rustfmt::skip]
    let expected = [
        "+----+",
        "| id |",
        "+----+",
        "| 4  |",
        "| 3  |",
        "| 2  |",
        "+----+",
    ];
    assert_batches_eq!(expected, &batches);
    let plan = test_case.explain_plan().await;
    assert_contains!(&plan, "row_groups_pruned_statistics=0");
}
```

NULLS LAST and the single-file case are covered as well:

```rust
#[tokio::test]
async fn test_topk_predicate_pushdown_nulls_last() {
    let tmp_dir = TempDir::new().unwrap();
    let path = tmp_dir.path().to_str().unwrap().to_string();

    for file in 0..5 {
        let name = format!("test{:02}.parquet", file);
        write_file(&format!("{path}/{name}"));
    }
    let name = format!("test{:02}.parquet", 100);
    write_file_with_null_ids(&format!("{path}/{name}"));

    let query = "select name from base_table order by id desc nulls last limit 3";
    let test_case = DynamicFilterTestCase::new(query.to_string(), path);

    let batches = test_case.results().await;
    #[rustfmt::skip]
    let expected = [
        "+--------+",
        "| name   |",
        "+--------+",
        "| test02 |",
        "| test02 |",
        "| test02 |",
        "+--------+",
    ];
    assert_batches_eq!(expected, &batches);

    let plan = test_case.explain_plan().await;
    assert_contains!(&plan, "row_groups_pruned_statistics=3");
}

#[tokio::test]
async fn test_topk_predicate_pushdown_single_file() {
    let tmp_dir = TempDir::new().unwrap();
    let path = tmp_dir.path().to_str().unwrap().to_string();

    write_file(&format!("{path}/test.parquet"));

    let query = "select name from base_table order by id desc nulls last limit 1";
    let test_case = DynamicFilterTestCase::new(query.to_string(), path);

    let batches = test_case.results().await;
    #[rustfmt::skip]
    let expected = [
        "+--------+",
        "| name   |",
        "+--------+",
        "| test02 |",
        "+--------+",
    ];
    assert_batches_eq!(expected, &batches);

    let plan = test_case.explain_plan().await;
    assert_contains!(&plan, "pushdown_rows_pruned=1");
}
```

Finally, partition columns must be ignored by the pushdown:

```rust
#[tokio::test]
async fn test_topk_predicate_pushdown_ignores_partition_columns() {
    // The TopK operator will try to push down predicates on `file_id`.
    // But since `file_id` is a partition column and not part of the file itself,
    // we cannot actually do any filtering on it at the file level.
    // Thus it has to be ignored by `ParquetSource`.
    // This test only shows that this does not result in any errors or panics;
    // it is expected that "nothing exciting" happens here.
    // I do think it would be interesting to re-design how partition columns are
    // handled in the future, in particular by pushing them into SchemaAdapter
    // so that the table schema == file schema and we can do predicate pushdown
    // on them as well, without relying on each TableProvider to do special
    // handling of partition columns.

    let ctx = SessionContext::new();
    let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_table_partition_cols(vec![("file_id".to_string(), DataType::UInt32)])
        // We need to force 1 partition because TopK predicate pushdown happens
        // on a per-partition basis. If we had 1 file per partition (as an
        // example) no pushdown would happen.
        .with_target_partitions(1);

    let tmp_dir = TempDir::new().unwrap();
    let path = tmp_dir.path().to_str().unwrap().to_string();
    for file in 0..5 {
        // create a directory for the partition
        fs::create_dir_all(format!("{path}/file_id={file}")).unwrap();
        let name = format!("file_id={file}/test.parquet");
        write_file(&format!("{path}/{name}"));
    }
    ctx.register_listing_table("base_table", path, opt, None, None)
        .await
        .unwrap();

    let query = "select file_id from base_table order by file_id asc limit 3";

    let batches = ctx.sql(query).await.unwrap().collect().await.unwrap();
    #[rustfmt::skip]
    let expected = [
        "+---------+",
        "| file_id |",
        "+---------+",
        "| 0       |",
        "| 0       |",
        "| 1       |",
        "+---------+",
    ];
    assert_batches_eq!(expected, &batches);

    let sql = format!("explain analyze {query}");
    let batches = ctx.sql(&sql).await.unwrap().collect().await.unwrap();
    let explain_plan = format!("{}", pretty_format_batches(&batches).unwrap());
    assert_contains!(explain_plan, "row_groups_pruned_statistics=0"); // just documenting current behavior
}
```
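Assuming the usual DataFusion repository layout, where this file is compiled into the core integration test binary, the new tests should be runnable from the repository root with a name filter such as `cargo test -p datafusion topk_predicate_pushdown`.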
