Skip to content

Commit dee4261

Browse files
committed
Fix: handle NULL input in lead/lag window function
1 parent e00af2c commit dee4261

2 files changed

Lines changed: 81 additions & 9 deletions

File tree

datafusion/physical-plan/src/windows/mod.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -257,31 +257,55 @@ fn create_built_in_window_expr(
257257
}
258258
}
259259
BuiltInWindowFunction::Lag => {
260-
let arg = Arc::clone(&args[0]);
260+
let mut arg = Arc::clone(&args[0]);
261261
let shift_offset = get_scalar_value_from_args(args, 1)?
262262
.map(get_signed_integer)
263263
.map_or(Ok(None), |v| v.map(Some))?;
264-
let default_value =
265-
get_casted_value(get_scalar_value_from_args(args, 2)?, out_data_type)?;
264+
// If value is NULL, we use default data type as output data type, no need to cast data type
265+
let default_value = match out_data_type {
266+
DataType::Null => match get_scalar_value_from_args(args, 2)? {
267+
Some(value) => {
268+
let null_value = ScalarValue::try_from(value.data_type())?;
269+
arg = Arc::new(Literal::new(null_value));
270+
value
271+
}
272+
None => ScalarValue::try_from(DataType::Null)?,
273+
},
274+
_ => {
275+
get_casted_value(get_scalar_value_from_args(args, 2)?, out_data_type)?
276+
}
277+
};
266278
Arc::new(lag(
267279
name,
268-
out_data_type.clone(),
280+
default_value.data_type().clone(),
269281
arg,
270282
shift_offset,
271283
default_value,
272284
ignore_nulls,
273285
))
274286
}
275287
BuiltInWindowFunction::Lead => {
276-
let arg = Arc::clone(&args[0]);
288+
let mut arg = Arc::clone(&args[0]);
277289
let shift_offset = get_scalar_value_from_args(args, 1)?
278290
.map(get_signed_integer)
279291
.map_or(Ok(None), |v| v.map(Some))?;
280-
let default_value =
281-
get_casted_value(get_scalar_value_from_args(args, 2)?, out_data_type)?;
292+
// If value is NULL, we use default data type as output data type, no need to cast data type
293+
let default_value = match out_data_type {
294+
DataType::Null => match get_scalar_value_from_args(args, 2)? {
295+
Some(value) => {
296+
let null_value = ScalarValue::try_from(value.data_type())?;
297+
arg = Arc::new(Literal::new(null_value));
298+
value
299+
}
300+
None => ScalarValue::try_from(DataType::Null)?,
301+
},
302+
_ => {
303+
get_casted_value(get_scalar_value_from_args(args, 2)?, out_data_type)?
304+
}
305+
};
282306
Arc::new(lead(
283307
name,
284-
out_data_type.clone(),
308+
default_value.data_type().clone(),
285309
arg,
286310
shift_offset,
287311
default_value,

datafusion/sqllogictest/test_files/window.slt

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4932,4 +4932,52 @@ SELECT v1, NTH_VALUE(v2, 0) OVER (PARTITION BY v1 ORDER BY v2) FROM t;
49324932
statement ok
49334933
DROP TABLE t;
49344934

4935-
## end test handle NULL and 0 of NTH_VALUE
4935+
## end test handle NULL and 0 of NTH_VALUE
4936+
4937+
## test handle NULL of lead
4938+
4939+
statement ok
4940+
create table t1(v1 int);
4941+
4942+
statement ok
4943+
insert into t1 values (1);
4944+
4945+
query B
4946+
SELECT LEAD(NULL, 0, false) OVER () FROM t1;
4947+
----
4948+
NULL
4949+
4950+
query B
4951+
SELECT LAG(NULL, 0, false) OVER () FROM t1;
4952+
----
4953+
NULL
4954+
4955+
query B
4956+
SELECT LEAD(NULL, 1, false) OVER () FROM t1;
4957+
----
4958+
false
4959+
4960+
query B
4961+
SELECT LAG(NULL, 1, false) OVER () FROM t1;
4962+
----
4963+
false
4964+
4965+
statement ok
4966+
insert into t1 values (2);
4967+
4968+
query B
4969+
SELECT LEAD(NULL, 1, false) OVER () FROM t1;
4970+
----
4971+
NULL
4972+
false
4973+
4974+
query B
4975+
SELECT LAG(NULL, 1, false) OVER () FROM t1;
4976+
----
4977+
false
4978+
NULL
4979+
4980+
statement ok
4981+
DROP TABLE t1;
4982+
4983+
## end test handle NULL of lead

0 commit comments

Comments
 (0)