fix: default UDWFImpl::expressions returns all expressions#13169
fix: default UDWFImpl::expressions returns all expressions#13169alamb merged 5 commits intoapache:mainfrom
Conversation
timsaucer
left a comment
There was a problem hiding this comment.
Thank you! I tested this same code on datafusion-python. We do have a unit test there that caught the issue. I wonder if we should add one here. Not a blocking suggestion, because I do think this is a regression.
Thank you @Michael-J-Ward and @timsaucer The reason we introduced a regression is likely due to the lack of a test and thus I think a test with the fix would be super helpful. Any thoughts about how to trigger it ? I can potentially help write such a test |
I'll write it up. |
|
This is a possible way to unit test the default implementation of But putting this in test #[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field};
use datafusion_common::Result;
use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDF, WindowUDFImpl};
use datafusion_functions_window_common::expr::ExpressionArgs;
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use std::any::Any;
use datafusion_physical_expr::expressions::lit;
#[derive(Debug)]
struct ThreeArgWindowUDF {
signature: Signature,
}
impl ThreeArgWindowUDF {
fn new() -> Self {
Self {
signature: Signature::uniform(
3,
vec![DataType::Int32, DataType::Boolean, DataType::Float32],
Volatility::Immutable,
),
}
}
}
impl WindowUDFImpl for ThreeArgWindowUDF {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"three_arg_window_udf"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn partition_evaluator(&self, _: PartitionEvaluatorArgs) -> Result<Box<dyn PartitionEvaluator>> {
todo!()
}
fn field(&self, _: WindowUDFFieldArgs) -> Result<Field> {
todo!()
}
}
#[test]
fn test_input_expressions() -> Result<()> {
let udwf = WindowUDF::from(ThreeArgWindowUDF::new());
let input_exprs = vec![lit(1), lit(false), lit(0.5)]; // Vec<Arc<dyn PhysicalExpr>>
let input_types =[DataType::Int32, DataType::Boolean, DataType::Float32]; // Vec<DataType>
let actual = udwf.expressions(ExpressionArgs::new(&input_exprs, &input_types));
assert_eq!(actual.len(), 3);
assert_eq!(
format!("{:?}", actual.first().unwrap()),
format!("{:?}", input_exprs.first().unwrap()),
);
assert_eq!(
format!("{:?}", actual.get(1).unwrap()),
format!("{:?}", input_exprs.get(1).unwrap())
);
assert_eq!(
format!("{:?}", actual.get(2).unwrap()),
format!("{:?}", input_exprs.get(2).unwrap())
);
Ok(())
}
} |
|
Thank you for writing that up! I added it, verified it fails without @Michael-J-Ward 's fix and passes with the correction. @Michael-J-Ward do you mind if I push it to your branch? |
|
Since the testing approach made sense, I rewrote it to be a bit more exhaustive than the initial iteration. I added more test cases to cover when 0, 1, 2 and 3 input expressions are provided. Earlier version only checks for exactly 3 arguments and I think that is insufficient testing to avoid any regressions in the future. It passes in $ cargo test -p datafusion-functions-window test_default_expressions
running 1 test
test tests::test_default_expressions ... FAILED
failures:
---- tests::test_default_expressions stdout ----
thread 'tests::test_default_expressions' panicked at datafusion/functions-window/src/lib.rs:192:13:
assertion `left == right` failed:
Input expressions: [Column { name: "a", index: 0 }, Column { name: "b", index: 1 }]
Returned expressions: [Column { name: "a", index: 0 }]
left: 2
right: 1
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
failures:
tests::test_default_expressions
test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 10 filtered out; finished in 0.00s
The same limitation as earlier applies. It is not possible to place this unit test in I think we can add this to the @timsaucer Please feel free to make any improvements you feel would be beneficial. Code: #[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::{
PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF,
WindowUDFImpl,
};
use datafusion_functions_window_common::expr::ExpressionArgs;
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr::expressions::{col, lit};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use std::any::Any;
#[derive(Debug)]
struct VariadicWindowUDF {
signature: Signature,
}
impl VariadicWindowUDF {
fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Any(0),
TypeSignature::Any(1),
TypeSignature::Any(2),
TypeSignature::Any(3),
],
Volatility::Immutable,
),
}
}
}
impl WindowUDFImpl for VariadicWindowUDF {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"variadic_window_udf"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn partition_evaluator(
&self,
_: PartitionEvaluatorArgs,
) -> Result<Box<dyn PartitionEvaluator>> {
unimplemented!("unnecessary for testing");
}
fn field(&self, _: WindowUDFFieldArgs) -> Result<Field> {
unimplemented!("unnecessary for testing");
}
}
#[test]
// Fixes: default implementation of `WindowUDFImpl::expressions`
// returns all input expressions to the user-defined window
// function unmodified.
//
// See: https://github.com/apache/datafusion/pull/13169
fn test_default_expressions() -> Result<()> {
let udwf = WindowUDF::from(VariadicWindowUDF::new());
let field_a = Field::new("a", DataType::Int32, false);
let field_b = Field::new("b", DataType::Float32, false);
let field_c = Field::new("c", DataType::Boolean, false);
let schema = Schema::new(vec![field_a, field_b, field_c]);
let test_cases = vec![
//
// Zero arguments
//
vec![],
//
// Single argument
//
vec![col("a", &schema)?],
vec![lit(1)],
//
// Two arguments
//
vec![col("a", &schema)?, col("b", &schema)?],
vec![col("a", &schema)?, lit(2)],
vec![lit(false), col("a", &schema)?],
//
// Three arguments
//
vec![col("a", &schema)?, col("b", &schema)?, col("c", &schema)?],
vec![col("a", &schema)?, col("b", &schema)?, lit(false)],
vec![col("a", &schema)?, lit(0.5), col("c", &schema)?],
vec![lit(3), col("b", &schema)?, col("c", &schema)?],
];
for input_exprs in &test_cases {
let input_types = input_exprs
.iter()
.map(|expr: &std::sync::Arc<dyn PhysicalExpr>| {
expr.data_type(&schema).unwrap()
})
.collect::<Vec<_>>();
let expr_args = ExpressionArgs::new(input_exprs, &input_types);
let ret_exprs = udwf.expressions(expr_args);
// Verify same number of input expressions are returned
assert_eq!(
input_exprs.len(),
ret_exprs.len(),
"\nInput expressions: {:?}\nReturned expressions: {:?}",
input_exprs,
ret_exprs
);
// Compares each returned expression with original input expressions
for (expected, actual) in input_exprs.iter().zip(&ret_exprs) {
assert_eq!(
format!("{expected:?}"),
format!("{actual:?}"),
"\nInput expressions: {:?}\nReturned expressions: {:?}",
input_exprs,
ret_exprs
);
}
}
Ok(())
}
} |
We can always put the tests in https://github.com/apache/datafusion/blob/main/datafusion/core/tests/core_integration.rs which has access to all the functionality |
|
Thank you @jcsherin . I appreciate you taking point on writing up the unit test. I've tested and pushed it. Does anyone else want to review? |
alamb
left a comment
There was a problem hiding this comment.
Looks awesome -- thank you everyone 🙏
Which issue does this PR close?
Closes #13168.
Rationale for this change
Same as in #13168.
What changes are included in this PR?
WindowUDFImpl::expressionsis changed to return all the input expressions by default instead of only the 1st one.Are these changes tested?
Covered by existing tests.
Are there any user-facing changes?
Not from a released version.