diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 69901aa2fa37d..364987021a1fc 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -41,7 +41,7 @@ use arrow::record_batch::RecordBatch; use datafusion_common::stats::Precision; use datafusion_common::{exec_err, internal_err, Result}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement}; use futures::Stream; use itertools::Itertools; @@ -225,6 +225,47 @@ impl ExecutionPlan for UnionExec { } } + /// This will do similarly to maintains_input_order, but: + /// 1. will return a vector of PhysicalSortRequirements rather than bools + /// 2. if the output ordering is a subset of the child ordering because constants were removed, + /// it will return the output ordering + fn required_input_ordering( + &self, + ) -> Vec>> { + if let Some(output_ordering) = self.properties().output_ordering() { + self.inputs() + .iter() + .map(|child| { + if let Some(child_ordering) = child.output_ordering() { + if output_ordering.len() <= child_ordering.len() { + // verify output_ordering is a part of child_ordering + let is_subset = + output_ordering.iter().all(|output_ordering_expr| { + child_ordering.iter().any(|child_ordering_expr| { + output_ordering_expr == child_ordering_expr + }) + }); + + if is_subset { + Some(PhysicalSortRequirement::from_sort_exprs( + output_ordering, + )) + } else { + None + } + } else { + None + } + } else { + None + } + }) + .collect() + } else { + vec![None; self.inputs().len()] + } + } + fn with_new_children( self: Arc, children: Vec>,