Skip to content

Commit 8b53342

Browse files
committed
change the null type in the row filter
1 parent 9fbee1a commit 8b53342

1 file changed

Lines changed: 20 additions & 6 deletions

File tree

datafusion/core/src/physical_plan/file_format/row_filter.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use arrow::array::{Array, BooleanArray};
1919
use arrow::datatypes::{DataType, Field, Schema};
2020
use arrow::error::{ArrowError, Result as ArrowResult};
2121
use arrow::record_batch::RecordBatch;
22-
use datafusion_common::{Column, Result, ScalarValue, ToDFSchema};
22+
use datafusion_common::{Column, DataFusionError, Result, ScalarValue, ToDFSchema};
2323
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion};
2424

2525
use datafusion_expr::{uncombine_filter, Expr};
@@ -202,7 +202,18 @@ impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
202202
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
203203
if let Expr::Column(Column { name, .. }) = &expr {
204204
if self.file_schema.field_with_name(name).is_err() {
205-
return Ok(Expr::Literal(ScalarValue::Null));
205+
// the column expr must be in the table schema
206+
return match self.table_schema.field_with_name(name) {
207+
Ok(field) => {
208+
// return the null value corresponding to the data type
209+
let null_value = ScalarValue::try_from(field.data_type())?;
210+
Ok(Expr::Literal(null_value))
211+
}
212+
Err(e) => {
213+
// If the column is not in the table schema either, return an error
214+
Err(DataFusionError::ArrowError(e))
215+
}
216+
};
206217
}
207218
}
208219

@@ -314,12 +325,13 @@ mod test {
314325
use crate::physical_plan::file_format::row_filter::FilterCandidateBuilder;
315326
use arrow::datatypes::{DataType, Field, Schema};
316327
use datafusion_common::ScalarValue;
317-
use datafusion_expr::{col, lit};
328+
use datafusion_expr::{cast, col, lit};
318329
use parquet::arrow::parquet_to_arrow_schema;
319330
use parquet::file::reader::{FileReader, SerializedFileReader};
320331

321332
// A column expression for a column missing from the table schema is no longer ignored as a projected column: building the candidate now fails with an error
322333
#[test]
334+
#[should_panic(expected = "building candidate failed")]
323335
fn test_filter_candidate_builder_ignore_projected_columns() {
324336
let testdata = crate::test_util::parquet_test_data();
325337
let file = std::fs::File::open(&format!("{}/alltypes_plain.parquet", testdata))
@@ -337,7 +349,7 @@ mod test {
337349

338350
let candidate = FilterCandidateBuilder::new(expr, &table_schema, &table_schema)
339351
.build(metadata)
340-
.expect("building candidate");
352+
.expect("building candidate failed");
341353

342354
assert!(candidate.is_none());
343355
}
@@ -386,8 +398,10 @@ mod test {
386398
Field::new("float_col", DataType::Float32, true),
387399
]);
388400

389-
let expr = col("bigint_col").eq(col("int_col"));
390-
let expected_candidate_expr = col("bigint_col").eq(lit(ScalarValue::Null));
401+
// The parquet file with `file_schema` has only the `bigint_col` and `float_col` columns, and does not have the `int_col`
402+
let expr = col("bigint_col").eq(cast(col("int_col"), DataType::Int64));
403+
let expected_candidate_expr =
404+
col("bigint_col").eq(cast(lit(ScalarValue::Int32(None)), DataType::Int64));
391405

392406
let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
393407
.build(metadata)

0 commit comments

Comments
 (0)