From f33f775bc12977e6bcb703dfead96d78bacc6c10 Mon Sep 17 00:00:00 2001 From: Adez017 Date: Fri, 25 Apr 2025 14:53:59 +0530 Subject: [PATCH 01/25] Update function.md --- docs/source/user-guide/sql/window_functions.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/sql/window_functions.md b/docs/source/user-guide/sql/window_functions.md index 68a7003803123..a2d4a2d0196a3 100644 --- a/docs/source/user-guide/sql/window_functions.md +++ b/docs/source/user-guide/sql/window_functions.md @@ -333,3 +333,4 @@ FROM employees; | 40000 | +-----------+ ``` + From 8c38ca5a4fa65c098c96a26f2d1a64c7c2f9c642 Mon Sep 17 00:00:00 2001 From: Adez017 Date: Fri, 25 Apr 2025 14:53:59 +0530 Subject: [PATCH 02/25] Update function.md --- docs/source/user-guide/sql/window_functions.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/sql/window_functions.md b/docs/source/user-guide/sql/window_functions.md index 68a7003803123..a2d4a2d0196a3 100644 --- a/docs/source/user-guide/sql/window_functions.md +++ b/docs/source/user-guide/sql/window_functions.md @@ -333,3 +333,4 @@ FROM employees; | 40000 | +-----------+ ``` + From 40f0f52beb7ca18b3402a846a7cacaec7bec91ac Mon Sep 17 00:00:00 2001 From: Adez017 Date: Fri, 2 May 2025 08:31:22 +0530 Subject: [PATCH 03/25] Prettier Fix --- docs/source/user-guide/sql/window_functions.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/source/user-guide/sql/window_functions.md b/docs/source/user-guide/sql/window_functions.md index a2d4a2d0196a3..68a7003803123 100644 --- a/docs/source/user-guide/sql/window_functions.md +++ b/docs/source/user-guide/sql/window_functions.md @@ -333,4 +333,3 @@ FROM employees; | 40000 | +-----------+ ``` - From f81a38e570b7a4d2e5810ae91f09c9e18d880e27 Mon Sep 17 00:00:00 2001 From: aditya singh rathore <142787780+Adez017@users.noreply.github.com> Date: Wed, 23 Apr 2025 22:31:47 +0530 Subject: [PATCH 04/25] Update extending-operators.md --- .../library-user-guide/extending-operators.md | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 631bdc67975a4..06c0c2a10a9e5 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -18,5 +18,28 @@ --> # Extending DataFusion's operators: custom LogicalPlan and Execution Plans +This module contains an end to end demonstration of creatinga user defined operator in DataFusion.12 .Specifically, it shows how to define a `TopKNode` that implements `ExtensionPlanNode` that add an OptimizerRule to rewrite a `LogicalPlan` to use that node as a `LogicalPlan`, create an `ExecutionPlan` and finally produce results. + +## TopK Background: +A "Top K" node is a common query optimization which is used for queries such as "find the top 3 customers by revenue". The(simplified) SQL for such a query might be: + + ```sql +CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) + STORED AS CSV location 'tests/data/customer.csv'; + SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; +``` + And a naive plan would be: + ``` +> explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; + +--------------+----------------------------------------+ + | plan_type | plan | + +--------------+----------------------------------------+ + | logical_plan | Limit: 3 | + | | Sort: revenue DESC NULLS FIRST | + | | Projection: customer_id, revenue | + | | TableScan: sales | + +--------------+----------------------------------------+ + ``` +While this plan produces the correct answer, the careful reader will note it fully sorts the input before discarding everythingother than the top 3 elements. +The same answer can be produced by simply keeping track of the top N elements, reducing the total amount of required buffer memory. -Coming soon From 03fe815c1d84392d8e0f3013a8a8c14b0ea01b8f Mon Sep 17 00:00:00 2001 From: Adez017 Date: Thu, 24 Apr 2025 23:15:22 +0530 Subject: [PATCH 05/25] fmt fix --- docs/source/library-user-guide/extending-operators.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 06c0c2a10a9e5..eda79163b1247 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -29,8 +29,10 @@ CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; ``` And a naive plan would be: + ```sql + explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; ``` -> explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; + ```text +--------------+----------------------------------------+ | plan_type | plan | +--------------+----------------------------------------+ From c4fb35f13b4d2b6c40b267b9b916a60b354acbf4 Mon Sep 17 00:00:00 2001 From: Adez017 Date: Thu, 24 Apr 2025 23:26:50 +0530 Subject: [PATCH 06/25] prettier fix --- .../library-user-guide/extending-operators.md | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index eda79163b1247..c0fcc3f54bba0 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -18,30 +18,35 @@ --> # Extending DataFusion's operators: custom LogicalPlan and Execution Plans + This module contains an end to end demonstration of creatinga user defined operator in DataFusion.12 .Specifically, it shows how to define a `TopKNode` that implements `ExtensionPlanNode` that add an OptimizerRule to rewrite a `LogicalPlan` to use that node as a `LogicalPlan`, create an `ExecutionPlan` and finally produce results. ## TopK Background: + A "Top K" node is a common query optimization which is used for queries such as "find the top 3 customers by revenue". The(simplified) SQL for such a query might be: - ```sql +```sql CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) - STORED AS CSV location 'tests/data/customer.csv'; - SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; + STORED AS CSV location 'tests/data/customer.csv'; +SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; +``` + +And a naive plan would be: + +```sql +explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; ``` - And a naive plan would be: - ```sql - explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; - ``` - ```text - +--------------+----------------------------------------+ - | plan_type | plan | - +--------------+----------------------------------------+ - | logical_plan | Limit: 3 | - | | Sort: revenue DESC NULLS FIRST | - | | Projection: customer_id, revenue | - | | TableScan: sales | - +--------------+----------------------------------------+ - ``` + +```text ++--------------+----------------------------------------+ +| plan_type | plan | ++--------------+----------------------------------------+ +| logical_plan | Limit: 3 | +| | Sort: revenue DESC NULLS FIRST | +| | Projection: customer_id, revenue | +| | TableScan: sales | ++--------------+----------------------------------------+ +``` + While this plan produces the correct answer, the careful reader will note it fully sorts the input before discarding everythingother than the top 3 elements. The same answer can be produced by simply keeping track of the top N elements, reducing the total amount of required buffer memory. - From 2eef6554422d600136bed66ede86cc26290352a6 Mon Sep 17 00:00:00 2001 From: Adez017 Date: Fri, 25 Apr 2025 11:24:29 +0530 Subject: [PATCH 07/25] Update extending_operators.md --- .../library-user-guide/extending-operators.md | 349 ++++++++++++++++++ 1 file changed, 349 insertions(+) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index c0fcc3f54bba0..1fd1f3f582144 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -50,3 +50,352 @@ explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; While this plan produces the correct answer, the careful reader will note it fully sorts the input before discarding everythingother than the top 3 elements. The same answer can be produced by simply keeping track of the top N elements, reducing the total amount of required buffer memory. +## Process for Defining Extending Operator +The below example illustrates the example of topK node : +### LogicalPlan Node Definition +- This section defines the custom logical plan node `TopKPlanNode`, which represents the `TopK` operation. +- It includes trait implementations like `UserDefinedLogicalNodeCore` and Debug. +code: +```rust +#[derive(Debug)] +struct TopKQueryPlanner {} + +#[async_trait] +impl QueryPlanner for TopKQueryPlanner { + /// Given a `LogicalPlan` created from above, create an + /// `ExecutionPlan` suitable for execution + async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + session_state: &SessionState, + ) -> Result> { + // Teach the default physical planner how to plan TopK nodes. + let physical_planner = + DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new( + TopKPlanner {}, + )]); + // Delegate most work of physical planning to the default physical planner + physical_planner + .create_physical_plan(logical_plan, session_state) + .await + } +} +``` +### Optimizer Rule +- Implements the `TopKOptimizerRule` to detect a `Limit` followed by a Sort and replace it with the `TopKPlanNode`. +- Includes the logic for transforming the logical plan. +code: +```rust +#[derive(Default, Debug)] +struct TopKOptimizerRule { + /// A testing-only hashable fixture. + invariant_mock: Option, +} + +impl OptimizerRule for TopKOptimizerRule { + fn name(&self) -> &str { + "topk" + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } + + fn supports_rewrite(&self) -> bool { + true + } + + // Example rewrite pass to insert a user defined LogicalPlanNode + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result, DataFusionError> { + // Note: this code simply looks for the pattern of a Limit followed by a + // Sort and replaces it by a TopK node. It does not handle many + // edge cases (e.g multiple sort columns, sort ASC / DESC), etc. + let LogicalPlan::Limit(ref limit) = plan else { + return Ok(Transformed::no(plan)); + }; + let FetchType::Literal(Some(fetch)) = limit.get_fetch_type()? else { + return Ok(Transformed::no(plan)); + }; + + if let LogicalPlan::Sort(Sort { + ref expr, + ref input, + .. + }) = limit.input.as_ref() + { + if expr.len() == 1 { + // we found a sort with a single sort expr, replace with a a TopK + return Ok(Transformed::yes(LogicalPlan::Extension(Extension { + node: Arc::new(TopKPlanNode { + k: fetch, + input: input.as_ref().clone(), + expr: expr[0].clone(), + invariant_mock: self.invariant_mock.clone(), + }), + }))); + } + } + + Ok(Transformed::no(plan)) + } +} +``` +### Physical planner +-The `TopKPlanner` is implemented to map the custom logical plan node (`TopKPlanNode`) to a physical execution plan (`TopKExec`). +```rust +struct TopKPlanner {} + +#[async_trait] +impl ExtensionPlanner for TopKPlanner { + /// Create a physical plan for an extension node + async fn plan_extension( + &self, + _planner: &dyn PhysicalPlanner, + node: &dyn UserDefinedLogicalNode, + logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc], + _session_state: &SessionState, + ) -> Result>> { + Ok( + if let Some(topk_node) = node.as_any().downcast_ref::() { + assert_eq!(logical_inputs.len(), 1, "Inconsistent number of inputs"); + assert_eq!(physical_inputs.len(), 1, "Inconsistent number of inputs"); + // figure out input name + Some(Arc::new(TopKExec::new( + physical_inputs[0].clone(), + topk_node.k, + ))) + } else { + None + }, + ) + } +} +``` + +### Physical Execution Plan +- Defines the physical execution operator `TopKExec` and its properties. +- Implements the `ExecutionPlan` trait to describe how the operator is executed. +code: +```rust +struct TopKExec { + input: Arc, + /// The maximum number of values + k: usize, + cache: PlanProperties, +} + +impl TopKExec { + fn new(input: Arc, k: usize) -> Self { + let cache = Self::compute_properties(input.schema()); + Self { input, k, cache } + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties(schema: SchemaRef) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } +} + +impl Debug for TopKExec { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "TopKExec") + } +} + +impl DisplayAs for TopKExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "TopKExec: k={}", self.k) + } + DisplayFormatType::TreeRender => { + // TODO: collect info + write!(f, "") + } + } + } +} + +#[async_trait] +impl ExecutionPlan for TopKExec { + fn name(&self) -> &'static str { + Self::static_name() + } + + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn required_input_distribution(&self) -> Vec { + vec![Distribution::SinglePartition] + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(TopKExec::new(children[0].clone(), self.k))) + } + + /// Execute one partition and return an iterator over RecordBatch + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + if 0 != partition { + return internal_err!("TopKExec invalid partition {partition}"); + } + + Ok(Box::pin(TopKReader { + input: self.input.execute(partition, context)?, + k: self.k, + done: false, + state: BTreeMap::new(), + })) + } + + fn statistics(&self) -> Result { + // to improve the optimizability of this plan + // better statistics inference could be provided + Ok(Statistics::new_unknown(&self.schema())) + } +} +``` +### Execution Logic +- Implements the `TopKReade`r, which processes input batches to calculate the top `k` values. +- Contains helper functions like `add_row`, `remove_lowest_value`, and `accumulate_batch` for execution logic. + + + +```rust +struct TopKReader { + /// The input to read data from + input: SendableRecordBatchStream, + /// Maximum number of output values + k: usize, + /// Have we produced the output yet? + done: bool, + /// Output + state: BTreeMap, +} + +/// Keeps track of the revenue from customer_id and stores if it +/// is the top values we have seen so far. +fn add_row( + top_values: &mut BTreeMap, + customer_id: &str, + revenue: i64, + k: &usize, +) { + top_values.insert(revenue, customer_id.into()); + // only keep top k + while top_values.len() > *k { + remove_lowest_value(top_values) + } +} + +fn remove_lowest_value(top_values: &mut BTreeMap) { + if !top_values.is_empty() { + let smallest_revenue = { + let (revenue, _) = top_values.iter().next().unwrap(); + *revenue + }; + top_values.remove(&smallest_revenue); + } +} + +fn accumulate_batch( + input_batch: &RecordBatch, + mut top_values: BTreeMap, + k: &usize, +) -> BTreeMap { + let num_rows = input_batch.num_rows(); + // Assuming the input columns are + // column[0]: customer_id / UTF8 + // column[1]: revenue: Int64 + let customer_id = + as_string_array(input_batch.column(0)).expect("Column 0 is not customer_id"); + + let revenue = as_int64_array(input_batch.column(1)).unwrap(); + + for row in 0..num_rows { + add_row( + &mut top_values, + customer_id.value(row), + revenue.value(row), + k, + ); + } + top_values +} + +impl Stream for TopKReader { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + if self.done { + return Poll::Ready(None); + } + // this aggregates and thus returns a single RecordBatch. + + // take this as immutable + let k = self.k; + let schema = self.schema(); + let poll = self.input.poll_next_unpin(cx); + + match poll { + Poll::Ready(Some(Ok(batch))) => { + self.state = accumulate_batch(&batch, self.state.clone(), &k); + Poll::Ready(Some(Ok(RecordBatch::new_empty(schema)))) + } + Poll::Ready(None) => { + self.done = true; + let (revenue, customer): (Vec, Vec<&String>) = + self.state.iter().rev().unzip(); + + let customer: Vec<&str> = customer.iter().map(|&s| &**s).collect(); + Poll::Ready(Some( + RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(customer)), + Arc::new(Int64Array::from(revenue)), + ], + ) + .map_err(Into::into), + )) + } + other => other, + } + } +} + +impl RecordBatchStream for TopKReader { + fn schema(&self) -> SchemaRef { + self.input.schema() + } +} +``` From 1fc512c071ecdfa7dcaba532a29c279285c6c2d5 Mon Sep 17 00:00:00 2001 From: Adez017 Date: Fri, 25 Apr 2025 11:26:47 +0530 Subject: [PATCH 08/25] cargo fmt --- docs/source/library-user-guide/extending-operators.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 1fd1f3f582144..7b0fdb82bf7bb 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -399,3 +399,5 @@ impl RecordBatchStream for TopKReader { } } ``` + + From d128bc6ecef18e3997b75e318617367fd4b6aa57 Mon Sep 17 00:00:00 2001 From: Adez017 Date: Fri, 25 Apr 2025 11:30:09 +0530 Subject: [PATCH 09/25] Fix Prettier formatting issues --- .../library-user-guide/extending-operators.md | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 7b0fdb82bf7bb..97a21888f5d20 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -50,12 +50,17 @@ explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; While this plan produces the correct answer, the careful reader will note it fully sorts the input before discarding everythingother than the top 3 elements. The same answer can be produced by simply keeping track of the top N elements, reducing the total amount of required buffer memory. + ## Process for Defining Extending Operator + The below example illustrates the example of topK node : + ### LogicalPlan Node Definition + - This section defines the custom logical plan node `TopKPlanNode`, which represents the `TopK` operation. - It includes trait implementations like `UserDefinedLogicalNodeCore` and Debug. -code: + code: + ```rust #[derive(Debug)] struct TopKQueryPlanner {} @@ -81,10 +86,13 @@ impl QueryPlanner for TopKQueryPlanner { } } ``` + ### Optimizer Rule + - Implements the `TopKOptimizerRule` to detect a `Limit` followed by a Sort and replace it with the `TopKPlanNode`. - Includes the logic for transforming the logical plan. -code: + code: + ```rust #[derive(Default, Debug)] struct TopKOptimizerRule { @@ -144,8 +152,11 @@ impl OptimizerRule for TopKOptimizerRule { } } ``` + ### Physical planner --The `TopKPlanner` is implemented to map the custom logical plan node (`TopKPlanNode`) to a physical execution plan (`TopKExec`). + +-The `TopKPlanner` is implemented to map the custom logical plan node (`TopKPlanNode`) to a physical execution plan (`TopKExec`). + ```rust struct TopKPlanner {} @@ -178,9 +189,11 @@ impl ExtensionPlanner for TopKPlanner { ``` ### Physical Execution Plan + - Defines the physical execution operator `TopKExec` and its properties. - Implements the `ExecutionPlan` trait to describe how the operator is executed. -code: + code: + ```rust struct TopKExec { input: Arc, @@ -281,12 +294,12 @@ impl ExecutionPlan for TopKExec { } } ``` + ### Execution Logic + - Implements the `TopKReade`r, which processes input batches to calculate the top `k` values. - Contains helper functions like `add_row`, `remove_lowest_value`, and `accumulate_batch` for execution logic. - - ```rust struct TopKReader { /// The input to read data from @@ -399,5 +412,3 @@ impl RecordBatchStream for TopKReader { } } ``` - - From 6540cbe302b61520cf81109d522b2f684176d7d3 Mon Sep 17 00:00:00 2001 From: aditya singh rathore <142787780+Adez017@users.noreply.github.com> Date: Mon, 5 May 2025 13:42:13 +0530 Subject: [PATCH 10/25] Update extending-operators.md --- .../library-user-guide/extending-operators.md | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 97a21888f5d20..52ce2f7c7553a 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -62,6 +62,38 @@ The below example illustrates the example of topK node : code: ```rust +# use datafusion::common::{ +# cast::{as_int64_array, as_string_array}, +# tree_node::Transformed, +# types::TypeSignature::Extension, +# DataFusionError, +# }; +# use datafusion::execution::{SessionState, TaskContext}; +# use datafusion::logical_expr::{ +# FetchType, LogicalPlan, LogicalPlan::Sort, UserDefinedLogicalNode, +# }; +# use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule}; +# use datafusion::physical_expr::EquivalenceProperties; +# use datafusion::physical_plan::{ +# execution_plan::{Boundedness, EmissionType}, +# internal_err, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, +# PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, +# }; +# use datafusion::physical_planner::{ +# DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner, +# }; +# use arrow::array::{Int64Array, StringArray}; +# use arrow::datatypes::SchemaRef; +# use arrow::record_batch::RecordBatch; +# use async_trait::async_trait; +# use futures::Stream; +# use std::{ +# any::Any, +# collections::BTreeMap, +# fmt::{self, Debug}, +# sync::Arc, +# task::{Context, Poll}, +# }; #[derive(Debug)] struct TopKQueryPlanner {} From ab62ec31bf7dae4c169da2dac40a3a1c538b752a Mon Sep 17 00:00:00 2001 From: aditya singh rathore <142787780+Adez017@users.noreply.github.com> Date: Mon, 5 May 2025 14:06:21 +0530 Subject: [PATCH 11/25] Update extending-operators.md --- .../library-user-guide/extending-operators.md | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 52ce2f7c7553a..0787a329ad196 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -62,32 +62,32 @@ The below example illustrates the example of topK node : code: ```rust -# use datafusion::common::{ +# pub use datafusion::common::{ # cast::{as_int64_array, as_string_array}, # tree_node::Transformed, # types::TypeSignature::Extension, # DataFusionError, # }; -# use datafusion::execution::{SessionState, TaskContext}; -# use datafusion::logical_expr::{ +# pub use datafusion::execution::{SessionState, TaskContext}; +# pub use datafusion::logical_expr::{ # FetchType, LogicalPlan, LogicalPlan::Sort, UserDefinedLogicalNode, # }; -# use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule}; -# use datafusion::physical_expr::EquivalenceProperties; -# use datafusion::physical_plan::{ +# pub use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule}; +# pub use datafusion::physical_expr::EquivalenceProperties; +# pub use datafusion::physical_plan::{ # execution_plan::{Boundedness, EmissionType}, # internal_err, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, # PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, # }; -# use datafusion::physical_planner::{ +# pub use datafusion::physical_planner::{ # DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner, # }; -# use arrow::array::{Int64Array, StringArray}; -# use arrow::datatypes::SchemaRef; -# use arrow::record_batch::RecordBatch; -# use async_trait::async_trait; -# use futures::Stream; -# use std::{ +# pub use arrow::array::{Int64Array, StringArray}; +# pub use arrow::datatypes::SchemaRef; +# pub use arrow::record_batch::RecordBatch; +# pub use async_trait::async_trait; +# pub use futures::Stream; +# pub use std::{ # any::Any, # collections::BTreeMap, # fmt::{self, Debug}, From 4061916440aa004d5169c1eb26223e3068181847 Mon Sep 17 00:00:00 2001 From: aditya singh rathore <142787780+Adez017@users.noreply.github.com> Date: Mon, 5 May 2025 14:25:06 +0530 Subject: [PATCH 12/25] Update extending-operators.md --- docs/source/library-user-guide/extending-operators.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 0787a329ad196..1569968db91c2 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -94,6 +94,11 @@ The below example illustrates the example of topK node : # sync::Arc, # task::{Context, Poll}, # }; +#use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream}; +#use datafusion_execution::TaskContext; +#use std::sync::Arc; +#use std::collections::BTreeMap; +#use parquet::file::statistics::Statistics; #[derive(Debug)] struct TopKQueryPlanner {} From 8eeb34973350ecc7e6f6e2cb1deb261c13ded8ba Mon Sep 17 00:00:00 2001 From: aditya singh rathore <142787780+Adez017@users.noreply.github.com> Date: Mon, 5 May 2025 15:37:12 +0530 Subject: [PATCH 13/25] Update lib.rs --- datafusion/core/src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index db8cc919c59bf..3a5c6aef83eae 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -748,6 +748,11 @@ pub mod common { // Backwards compatibility pub use common::config; +pub use std::sync::*; +pub use datafusion_execution::TaskContext; +pub use datafusion_expr_common::statistics::Distribution; +pub use parquet::file::statistics::Statistics; + // NB datafusion execution is re-exported in the `execution` module @@ -841,6 +846,7 @@ pub mod variable { #[cfg(not(target_arch = "wasm32"))] pub mod test; + mod schema_equivalence; pub mod test_util; From 6e9c19c4c5ba9d8e1d67b3cf511aa7209cc3857d Mon Sep 17 00:00:00 2001 From: aditya singh rathore <142787780+Adez017@users.noreply.github.com> Date: Mon, 5 May 2025 15:39:19 +0530 Subject: [PATCH 14/25] Update lib.rs --- datafusion/core/src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index db8cc919c59bf..3a5c6aef83eae 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -748,6 +748,11 @@ pub mod common { // Backwards compatibility pub use common::config; +pub use std::sync::*; +pub use datafusion_execution::TaskContext; +pub use datafusion_expr_common::statistics::Distribution; +pub use parquet::file::statistics::Statistics; + // NB datafusion execution is re-exported in the `execution` module @@ -841,6 +846,7 @@ pub mod variable { #[cfg(not(target_arch = "wasm32"))] pub mod test; + mod schema_equivalence; pub mod test_util; From ba03eea4290519226ee9bd02f1a67aea3707f5b0 Mon Sep 17 00:00:00 2001 From: aditya singh rathore <142787780+Adez017@users.noreply.github.com> Date: Mon, 5 May 2025 16:01:16 +0530 Subject: [PATCH 15/25] Update lib.rs --- datafusion/core/src/lib.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 3a5c6aef83eae..db8cc919c59bf 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -748,11 +748,6 @@ pub mod common { // Backwards compatibility pub use common::config; -pub use std::sync::*; -pub use datafusion_execution::TaskContext; -pub use datafusion_expr_common::statistics::Distribution; -pub use parquet::file::statistics::Statistics; - // NB datafusion execution is re-exported in the `execution` module @@ -846,7 +841,6 @@ pub mod variable { #[cfg(not(target_arch = "wasm32"))] pub mod test; - mod schema_equivalence; pub mod test_util; From 7c4c5b02105ddd1dfd6a34c13430883bd4afaeb8 Mon Sep 17 00:00:00 2001 From: aditya singh rathore <142787780+Adez017@users.noreply.github.com> Date: Mon, 5 May 2025 16:03:33 +0530 Subject: [PATCH 16/25] Update lib.rs --- datafusion/core/src/lib.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 3a5c6aef83eae..db8cc919c59bf 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -748,11 +748,6 @@ pub mod common { // Backwards compatibility pub use common::config; -pub use std::sync::*; -pub use datafusion_execution::TaskContext; -pub use datafusion_expr_common::statistics::Distribution; -pub use parquet::file::statistics::Statistics; - // NB datafusion execution is re-exported in the `execution` module @@ -846,7 +841,6 @@ pub mod variable { #[cfg(not(target_arch = "wasm32"))] pub mod test; - mod schema_equivalence; pub mod test_util; From 80c0de8f4b57ae2de12c9b22d478ab62baf5f30f Mon Sep 17 00:00:00 2001 From: Adez017 Date: Tue, 6 May 2025 19:12:52 +0530 Subject: [PATCH 17/25] Update extend-operators.md --- .../library-user-guide/extending-operators.md | 102 +++++++++++------- 1 file changed, 61 insertions(+), 41 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 1569968db91c2..0f810fe56fc96 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -19,7 +19,7 @@ # Extending DataFusion's operators: custom LogicalPlan and Execution Plans -This module contains an end to end demonstration of creatinga user defined operator in DataFusion.12 .Specifically, it shows how to define a `TopKNode` that implements `ExtensionPlanNode` that add an OptimizerRule to rewrite a `LogicalPlan` to use that node as a `LogicalPlan`, create an `ExecutionPlan` and finally produce results. +This module contains an end-to-end demonstration of creating a user-defined operator in DataFusion. Specifically, it shows how to define a `TopKNode` that implements `ExtensionPlanNode`, add an OptimizerRule to rewrite a `LogicalPlan` to use that node, create an `ExecutionPlan`, and finally produce results. ## TopK Background: @@ -48,57 +48,55 @@ explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; +--------------+----------------------------------------+ ``` -While this plan produces the correct answer, the careful reader will note it fully sorts the input before discarding everythingother than the top 3 elements. +While this plan produces the correct answer, the careful reader will note it fully sorts the input before discarding everything other than the top 3 elements. The same answer can be produced by simply keeping track of the top N elements, reducing the total amount of required buffer memory. ## Process for Defining Extending Operator -The below example illustrates the example of topK node : +The following example illustrates the implementation of a `TopK` node: ### LogicalPlan Node Definition - This section defines the custom logical plan node `TopKPlanNode`, which represents the `TopK` operation. - It includes trait implementations like `UserDefinedLogicalNodeCore` and Debug. - code: +**Code:** ```rust -# pub use datafusion::common::{ -# cast::{as_int64_array, as_string_array}, -# tree_node::Transformed, -# types::TypeSignature::Extension, -# DataFusionError, -# }; -# pub use datafusion::execution::{SessionState, TaskContext}; -# pub use datafusion::logical_expr::{ -# FetchType, LogicalPlan, LogicalPlan::Sort, UserDefinedLogicalNode, -# }; -# pub use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule}; -# pub use datafusion::physical_expr::EquivalenceProperties; -# pub use datafusion::physical_plan::{ -# execution_plan::{Boundedness, EmissionType}, -# internal_err, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, -# PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, -# }; -# pub use datafusion::physical_planner::{ -# DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner, -# }; -# pub use arrow::array::{Int64Array, StringArray}; -# pub use arrow::datatypes::SchemaRef; -# pub use arrow::record_batch::RecordBatch; -# pub use async_trait::async_trait; -# pub use futures::Stream; -# pub use std::{ -# any::Any, -# collections::BTreeMap, -# fmt::{self, Debug}, -# sync::Arc, -# task::{Context, Poll}, -# }; -#use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream}; -#use datafusion_execution::TaskContext; -#use std::sync::Arc; -#use std::collections::BTreeMap; -#use parquet::file::statistics::Statistics; +use datafusion::common::{ + cast::{as_int64_array, as_string_array}, + tree_node::Transformed, + types::TypeSignature::Extension, + DataFusionError, +}; +use datafusion::execution::{SessionState, TaskContext}; +use datafusion::logical_expr::{ + FetchType, LogicalPlan, LogicalPlan::Sort, UserDefinedLogicalNode, +}; +use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule}; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::{ + execution_plan::{Boundedness, EmissionType}, + internal_err, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, + PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, +}; +use datafusion::physical_planner::{ + DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner, +}; +use arrow::array::{Int64Array, StringArray}; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use futures::Stream; +use std::{ + any::Any, + collections::BTreeMap, + fmt::{self, Debug}, + sync::Arc, + task::{Context, Poll}, +}; +use datafusion::physical_planner::planner::QueryPlanner; + + #[derive(Debug)] struct TopKQueryPlanner {} @@ -131,6 +129,11 @@ impl QueryPlanner for TopKQueryPlanner { code: ```rust +use std::sync::Arc; +use datafusion::logical_expr::{Expr, Extension}; +use datafusion::logical_expr::Limit; +use datafusion::logical_expr::Sort as LogicalSort; +use crate::invariant_mock::InvariantMock; #[derive(Default, Debug)] struct TopKOptimizerRule { /// A testing-only hashable fixture. @@ -195,6 +198,8 @@ impl OptimizerRule for TopKOptimizerRule { -The `TopKPlanner` is implemented to map the custom logical plan node (`TopKPlanNode`) to a physical execution plan (`TopKExec`). ```rust +use datafusion::logical_expr::UserDefinedLogicalNode; + struct TopKPlanner {} #[async_trait] @@ -232,6 +237,12 @@ impl ExtensionPlanner for TopKPlanner { code: ```rust +use datafusion::physical_plan::{Distribution, ExecutionPlan, PlanProperties}; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; // if used instead of a custom stream +use datafusion::error::Result; +use datafusion::common::internal_err; + struct TopKExec { input: Arc, /// The maximum number of values @@ -338,6 +349,15 @@ impl ExecutionPlan for TopKExec { - Contains helper functions like `add_row`, `remove_lowest_value`, and `accumulate_batch` for execution logic. ```rust +use futures::Stream; +use std::task::{Context, Poll}; +use std::collections::BTreeMap; +use std::sync::Arc; +use arrow::record_batch::RecordBatch; +use arrow::array::{Int64Array, StringArray}; +use datafusion::common::cast::{as_int64_array, as_string_array}; +use datafusion::error::Result; + struct TopKReader { /// The input to read data from input: SendableRecordBatchStream, From fac8673d9107107141a47e77afe065660cf44b9c Mon Sep 17 00:00:00 2001 From: Adez017 Date: Tue, 6 May 2025 19:17:53 +0530 Subject: [PATCH 18/25] pretier fix --- docs/source/library-user-guide/extending-operators.md | 9 --------- 1 file changed, 9 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 0f810fe56fc96..458aaf03e1bfb 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -349,15 +349,6 @@ impl ExecutionPlan for TopKExec { - Contains helper functions like `add_row`, `remove_lowest_value`, and `accumulate_batch` for execution logic. ```rust -use futures::Stream; -use std::task::{Context, Poll}; -use std::collections::BTreeMap; -use std::sync::Arc; -use arrow::record_batch::RecordBatch; -use arrow::array::{Int64Array, StringArray}; -use datafusion::common::cast::{as_int64_array, as_string_array}; -use datafusion::error::Result; - struct TopKReader { /// The input to read data from input: SendableRecordBatchStream, From 908b195ba8aed0964b769af39c6373b7890683e0 Mon Sep 17 00:00:00 2001 From: Adez017 Date: Tue, 6 May 2025 19:21:28 +0530 Subject: [PATCH 19/25] pretier fix --- docs/source/library-user-guide/extending-operators.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 458aaf03e1bfb..49ce699d25d78 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -59,7 +59,7 @@ The following example illustrates the implementation of a `TopK` node: - This section defines the custom logical plan node `TopKPlanNode`, which represents the `TopK` operation. - It includes trait implementations like `UserDefinedLogicalNodeCore` and Debug. -**Code:** + **Code:** ```rust use datafusion::common::{ From 330d75d0d92d078a9abaedb66bace9d790c701c3 Mon Sep 17 00:00:00 2001 From: Adez017 Date: Tue, 6 May 2025 22:24:48 +0530 Subject: [PATCH 20/25] Update imports --- docs/source/library-user-guide/extending-operators.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 49ce699d25d78..9cfe12992c52c 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -198,8 +198,12 @@ impl OptimizerRule for TopKOptimizerRule { -The `TopKPlanner` is implemented to map the custom logical plan node (`TopKPlanNode`) to a physical execution plan (`TopKExec`). ```rust -use datafusion::logical_expr::UserDefinedLogicalNode; - +use std::sync::Arc; +use async_trait::async_trait; +use datafusion::execution::{SessionState, TaskContext}; +use datafusion::logical_expr::LogicalPlan; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner, ExtensionPlanner}; struct TopKPlanner {} #[async_trait] From 6d95bd9272f895451a4590a6d6d7db8696d7637c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 9 May 2025 17:49:22 -0400 Subject: [PATCH 21/25] Tweaks --- .../library-user-guide/extending-operators.md | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 9cfe12992c52c..661d9b33cb0ad 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -19,24 +19,22 @@ # Extending DataFusion's operators: custom LogicalPlan and Execution Plans -This module contains an end-to-end demonstration of creating a user-defined operator in DataFusion. Specifically, it shows how to define a `TopKNode` that implements `ExtensionPlanNode`, add an OptimizerRule to rewrite a `LogicalPlan` to use that node, create an `ExecutionPlan`, and finally produce results. +This section contains an end-to-end demonstration of creating a user-defined operator in DataFusion. Specifically, it shows how to define a `TopKNode` that implements `ExtensionPlanNode`, add an `OptimizerRule` to rewrite a `LogicalPlan` to use that node, create an `ExecutionPlan`, and finally produce results. ## TopK Background: -A "Top K" node is a common query optimization which is used for queries such as "find the top 3 customers by revenue". The(simplified) SQL for such a query might be: +Note: DataFusion contains a highly optimized version of the `TopK` operator, but we present a simplified version in this section for explanatory purposes. For more information, see the full implementation in the [DataFusion repository]. -```sql -CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) - STORED AS CSV location 'tests/data/customer.csv'; -SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; -``` +[DataFusion repository]: https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html -And a naive plan would be: +"Top K" operator is a common query optimization used for queries such as "find the top 3 customers by revenue". The(simplified) SQL for such a query might be: ```sql explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; ``` +And a naive plan would be: + ```text +--------------+----------------------------------------+ | plan_type | plan | @@ -49,7 +47,7 @@ explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; ``` While this plan produces the correct answer, the careful reader will note it fully sorts the input before discarding everything other than the top 3 elements. -The same answer can be produced by simply keeping track of the top N elements, reducing the total amount of required buffer memory. +The same answer can be produced more efficiently by simply keeping track of the top N elements, reducing the total amount of memory buffer. ## Process for Defining Extending Operator From e3087b38d762f9014ec997e8bdf969ef32b380bf Mon Sep 17 00:00:00 2001 From: Adez017 Date: Wed, 14 May 2025 22:45:16 +0530 Subject: [PATCH 22/25] tweaks fix --- .../library-user-guide/extending-operators.md | 300 +++++++++--------- 1 file changed, 156 insertions(+), 144 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 661d9b33cb0ad..b0b22c046e54d 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -53,155 +53,135 @@ The same answer can be produced more efficiently by simply keeping track of the The following example illustrates the implementation of a `TopK` node: -### LogicalPlan Node Definition - -- This section defines the custom logical plan node `TopKPlanNode`, which represents the `TopK` operation. -- It includes trait implementations like `UserDefinedLogicalNodeCore` and Debug. - **Code:** - ```rust -use datafusion::common::{ - cast::{as_int64_array, as_string_array}, - tree_node::Transformed, - types::TypeSignature::Extension, - DataFusionError, -}; -use datafusion::execution::{SessionState, TaskContext}; -use datafusion::logical_expr::{ - FetchType, LogicalPlan, LogicalPlan::Sort, UserDefinedLogicalNode, -}; -use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule}; -use datafusion::physical_expr::EquivalenceProperties; -use datafusion::physical_plan::{ - execution_plan::{Boundedness, EmissionType}, - internal_err, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, - PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, +use std::fmt::Debug; +use std::hash::Hash; +use std::task::{Context, Poll}; +use std::{any::Any, collections::BTreeMap, fmt, sync::Arc}; + +use arrow::{ + array::{Int64Array, StringArray}, + datatypes::SchemaRef, + record_batch::RecordBatch, + util::pretty::pretty_format_batches, }; -use datafusion::physical_planner::{ - DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner, +use datafusion::execution::session_state::SessionStateBuilder; +use datafusion::{ + common::cast::{as_int64_array, as_string_array}, + common::{arrow_datafusion_err, internal_err, DFSchemaRef}, + error::{DataFusionError, Result}, + execution::{ + context::{QueryPlanner, SessionState, TaskContext}, + runtime_env::RuntimeEnv, + }, + logical_expr::{ + Expr, Extension, LogicalPlan, Sort, UserDefinedLogicalNode, + UserDefinedLogicalNodeCore, + }, + optimizer::{OptimizerConfig, OptimizerRule}, + physical_expr::EquivalenceProperties, + physical_plan::{ + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, + }, + physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}, + prelude::{SessionConfig, SessionContext}, }; -use arrow::array::{Int64Array, StringArray}; -use arrow::datatypes::SchemaRef; -use arrow::record_batch::RecordBatch; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::ScalarValue; +use datafusion_expr::{FetchType, InvariantLevel, Projection, SortExpr}; +use datafusion_optimizer::optimizer::ApplyOrder; +use datafusion_optimizer::AnalyzerRule; +use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; + use async_trait::async_trait; -use futures::Stream; -use std::{ - any::Any, - collections::BTreeMap, - fmt::{self, Debug}, - sync::Arc, - task::{Context, Poll}, -}; -use datafusion::physical_planner::planner::QueryPlanner; +use futures::{Stream, StreamExt}; +#[derive(PartialEq, Eq, PartialOrd, Hash)] +struct TopKPlanNode { + k: usize, + input: LogicalPlan, + /// The sort expression (this example only supports a single sort + /// expr) + expr: SortExpr, -#[derive(Debug)] -struct TopKQueryPlanner {} + /// A testing-only hashable fixture. + /// For actual use, define the [`Invariant`] in the [`UserDefinedLogicalNodeCore::invariants`]. + invariant_mock: Option, +} -#[async_trait] -impl QueryPlanner for TopKQueryPlanner { - /// Given a `LogicalPlan` created from above, create an - /// `ExecutionPlan` suitable for execution - async fn create_physical_plan( - &self, - logical_plan: &LogicalPlan, - session_state: &SessionState, - ) -> Result> { - // Teach the default physical planner how to plan TopK nodes. - let physical_planner = - DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new( - TopKPlanner {}, - )]); - // Delegate most work of physical planning to the default physical planner - physical_planner - .create_physical_plan(logical_plan, session_state) - .await +impl Debug for TopKPlanNode { + /// For TopK, use explain format for the Debug format. Other types + /// of nodes may + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + UserDefinedLogicalNodeCore::fmt_for_explain(self, f) } } -``` - -### Optimizer Rule -- Implements the `TopKOptimizerRule` to detect a `Limit` followed by a Sort and replace it with the `TopKPlanNode`. -- Includes the logic for transforming the logical plan. - code: - -```rust -use std::sync::Arc; -use datafusion::logical_expr::{Expr, Extension}; -use datafusion::logical_expr::Limit; -use datafusion::logical_expr::Sort as LogicalSort; -use crate::invariant_mock::InvariantMock; -#[derive(Default, Debug)] -struct TopKOptimizerRule { - /// A testing-only hashable fixture. - invariant_mock: Option, +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] +struct InvariantMock { + should_fail_invariant: bool, + kind: InvariantLevel, } -impl OptimizerRule for TopKOptimizerRule { +impl UserDefinedLogicalNodeCore for TopKPlanNode { fn name(&self) -> &str { - "topk" + "TopK" } - fn apply_order(&self) -> Option { - Some(ApplyOrder::TopDown) + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![&self.input] } - fn supports_rewrite(&self) -> bool { - true + /// Schema for TopK is the same as the input + fn schema(&self) -> &DFSchemaRef { + self.input.schema() } - // Example rewrite pass to insert a user defined LogicalPlanNode - fn rewrite( - &self, - plan: LogicalPlan, - _config: &dyn OptimizerConfig, - ) -> Result, DataFusionError> { - // Note: this code simply looks for the pattern of a Limit followed by a - // Sort and replaces it by a TopK node. It does not handle many - // edge cases (e.g multiple sort columns, sort ASC / DESC), etc. - let LogicalPlan::Limit(ref limit) = plan else { - return Ok(Transformed::no(plan)); - }; - let FetchType::Literal(Some(fetch)) = limit.get_fetch_type()? else { - return Ok(Transformed::no(plan)); - }; - - if let LogicalPlan::Sort(Sort { - ref expr, - ref input, - .. - }) = limit.input.as_ref() + fn check_invariants(&self, check: InvariantLevel, _plan: &LogicalPlan) -> Result<()> { + if let Some(InvariantMock { + should_fail_invariant, + kind, + }) = self.invariant_mock.clone() { - if expr.len() == 1 { - // we found a sort with a single sort expr, replace with a a TopK - return Ok(Transformed::yes(LogicalPlan::Extension(Extension { - node: Arc::new(TopKPlanNode { - k: fetch, - input: input.as_ref().clone(), - expr: expr[0].clone(), - invariant_mock: self.invariant_mock.clone(), - }), - }))); + if should_fail_invariant && check == kind { + return internal_err!("node fails check, such as improper inputs"); } } + Ok(()) + } - Ok(Transformed::no(plan)) + fn expressions(&self) -> Vec { + vec![self.expr.expr.clone()] + } + + /// For example: `TopK: k=10` + fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "TopK: k={}", self.k) } -} -``` -### Physical planner + fn with_exprs_and_inputs( + &self, + mut exprs: Vec, + mut inputs: Vec, + ) -> Result { + assert_eq!(inputs.len(), 1, "input size inconsistent"); + assert_eq!(exprs.len(), 1, "expression size inconsistent"); + Ok(Self { + k: self.k, + input: inputs.swap_remove(0), + expr: self.expr.with_expr(exprs.swap_remove(0)), + invariant_mock: self.invariant_mock.clone(), + }) + } --The `TopKPlanner` is implemented to map the custom logical plan node (`TopKPlanNode`) to a physical execution plan (`TopKExec`). + fn supports_limit_pushdown(&self) -> bool { + false // Disallow limit push-down by default + } +} -```rust -use std::sync::Arc; -use async_trait::async_trait; -use datafusion::execution::{SessionState, TaskContext}; -use datafusion::logical_expr::LogicalPlan; -use datafusion::physical_plan::ExecutionPlan; -use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner, ExtensionPlanner}; +/// Physical planner for TopK nodes struct TopKPlanner {} #[async_trait] @@ -230,21 +210,9 @@ impl ExtensionPlanner for TopKPlanner { ) } } -``` - -### Physical Execution Plan - -- Defines the physical execution operator `TopKExec` and its properties. -- Implements the `ExecutionPlan` trait to describe how the operator is executed. - code: - -```rust -use datafusion::physical_plan::{Distribution, ExecutionPlan, PlanProperties}; -use datafusion::execution::TaskContext; -use datafusion::physical_plan::stream::RecordBatchStreamAdapter; // if used instead of a custom stream -use datafusion::error::Result; -use datafusion::common::internal_err; +/// Physical operator that implements TopK for u64 data types. This +/// code is not general and is meant as an illustration only struct TopKExec { input: Arc, /// The maximum number of values @@ -343,14 +311,8 @@ impl ExecutionPlan for TopKExec { Ok(Statistics::new_unknown(&self.schema())) } } -``` - -### Execution Logic - -- Implements the `TopKReade`r, which processes input batches to calculate the top `k` values. -- Contains helper functions like `add_row`, `remove_lowest_value`, and `accumulate_batch` for execution logic. -```rust +// A very specialized TopK implementation struct TopKReader { /// The input to read data from input: SendableRecordBatchStream, @@ -461,4 +423,54 @@ impl RecordBatchStream for TopKReader { self.input.schema() } } -``` + +#[derive(Default, Debug)] +struct MyAnalyzerRule {} + +impl AnalyzerRule for MyAnalyzerRule { + fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result { + Self::analyze_plan(plan) + } + + fn name(&self) -> &str { + "my_analyzer_rule" + } +} + +impl MyAnalyzerRule { + fn analyze_plan(plan: LogicalPlan) -> Result { + plan.transform(|plan| { + Ok(match plan { + LogicalPlan::Projection(projection) => { + let expr = Self::analyze_expr(projection.expr.clone())?; + Transformed::yes(LogicalPlan::Projection(Projection::try_new( + expr, + projection.input, + )?)) + } + _ => Transformed::no(plan), + }) + }) + .data() + } + + fn analyze_expr(expr: Vec) -> Result> { + expr.into_iter() + .map(|e| { + e.transform(|e| { + Ok(match e { + Expr::Literal(ScalarValue::Int64(i)) => { + // transform to UInt64 + Transformed::yes(Expr::Literal(ScalarValue::UInt64( + i.map(|i| i as u64), + ))) + } + _ => Transformed::no(e), + }) + }) + .data() + }) + .collect() + } +} +``` \ No newline at end of file From 6d51a3f7bcefc1cf172f8c611f41c402bd107d87 Mon Sep 17 00:00:00 2001 From: Adez017 Date: Wed, 14 May 2025 22:58:28 +0530 Subject: [PATCH 23/25] prettier fix --- docs/source/library-user-guide/extending-operators.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index b4c1293b4e66b..9495b7dd0f7a2 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -19,7 +19,6 @@ # Extending DataFusion's operators: custom LogicalPlan and Execution Plans - This section contains an end-to-end demonstration of creating a user-defined operator in DataFusion. Specifically, it shows how to define a `TopKNode` that implements `ExtensionPlanNode`, add an `OptimizerRule` to rewrite a `LogicalPlan` to use that node, create an `ExecutionPlan`, and finally produce results. ## TopK Background: @@ -28,7 +27,7 @@ Note: DataFusion contains a highly optimized version of the `TopK` operator, but [DataFusion repository]: https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html -"Top K" operator is a common query optimization used for queries such as "find the top 3 customers by revenue". The(simplified) SQL for such a query might be: +"Top K" operator is a common query optimization used for queries such as "find the top 3 customers by revenue". The(simplified) SQL for such a query might be: ```sql explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; @@ -475,6 +474,7 @@ impl MyAnalyzerRule { } } ``` + DataFusion supports extension of operators by transforming logical plan and execution plan through customized [optimizer rules](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html). This section will use the µWheel project to illustrate such capabilities. ## About DataFusion µWheel @@ -513,4 +513,3 @@ fn agg_to_table_scan(result: f64, schema: SchemaRef) -> Result { ``` To get a deeper dive into the usage of the µWheel project, visit the [blog post](https://uwheel.rs/post/datafusion_uwheel/) by Max Meldrum. - From 0e918a45d355ede25328c9e5719446dc2486e167 Mon Sep 17 00:00:00 2001 From: Adez017 Date: Thu, 15 May 2025 09:04:00 +0530 Subject: [PATCH 24/25] imports fix --- .../library-user-guide/extending-operators.md | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 9495b7dd0f7a2..8742d67010b82 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -54,19 +54,19 @@ The same answer can be produced more efficiently by simply keeping track of the The following example illustrates the implementation of a `TopK` node: ```rust -use std::fmt::Debug; -use std::hash::Hash; -use std::task::{Context, Poll}; -use std::{any::Any, collections::BTreeMap, fmt, sync::Arc}; +# use std::fmt::Debug; +# use std::hash::Hash; +# use std::task::{Context, Poll}; +# use std::{any::Any, collections::BTreeMap, fmt, sync::Arc}; -use arrow::{ +# use arrow::{ array::{Int64Array, StringArray}, datatypes::SchemaRef, record_batch::RecordBatch, util::pretty::pretty_format_batches, }; -use datafusion::execution::session_state::SessionStateBuilder; -use datafusion::{ +# use datafusion::execution::session_state::SessionStateBuilder; +# use datafusion::{ common::cast::{as_int64_array, as_string_array}, common::{arrow_datafusion_err, internal_err, DFSchemaRef}, error::{DataFusionError, Result}, @@ -87,16 +87,16 @@ use datafusion::{ physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}, prelude::{SessionConfig, SessionContext}, }; -use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::ScalarValue; -use datafusion_expr::{FetchType, InvariantLevel, Projection, SortExpr}; -use datafusion_optimizer::optimizer::ApplyOrder; -use datafusion_optimizer::AnalyzerRule; -use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; - -use async_trait::async_trait; -use futures::{Stream, StreamExt}; +# use datafusion_common::config::ConfigOptions; +# use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +# use datafusion_common::ScalarValue; +# use datafusion_expr::{FetchType, InvariantLevel, Projection, SortExpr}; +# use datafusion_optimizer::optimizer::ApplyOrder; +# use datafusion_optimizer::AnalyzerRule; +# use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; + +# use async_trait::async_trait; +# use futures::{Stream, StreamExt}; #[derive(PartialEq, Eq, PartialOrd, Hash)] struct TopKPlanNode { From d3f31162e658989fb44bd385a0ad7b638fad75b8 Mon Sep 17 00:00:00 2001 From: Adez017 Date: Fri, 16 May 2025 18:16:06 +0530 Subject: [PATCH 25/25] duplicates uWheel content --- .../library-user-guide/extending-operators.md | 39 +------------------ 1 file changed, 1 insertion(+), 38 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 8742d67010b82..0283eb7c2dfd8 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -475,41 +475,4 @@ impl MyAnalyzerRule { } ``` -DataFusion supports extension of operators by transforming logical plan and execution plan through customized [optimizer rules](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html). This section will use the µWheel project to illustrate such capabilities. - -## About DataFusion µWheel - -[DataFusion µWheel](https://github.com/uwheel/datafusion-uwheel/tree/main) is a native DataFusion optimizer which improves query performance for time-based analytics through fast temporal aggregation and pruning using custom indices. The integration of µWheel into DataFusion is a joint effort with the DataFusion community. - -### Optimizing Logical Plan - -The `rewrite` function transforms logical plans by identifying temporal patterns and aggregation functions that match the stored wheel indices. When match is found, it queries the corresponding index to retrieve pre-computed aggregate values, stores these results in a [MemTable](https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html), and returns as a new `LogicalPlan::TableScan`. If no match is found, the original plan proceeds unchanged through DataFusion's standard execution path. - -```rust,ignore -fn rewrite( - &self, - plan: LogicalPlan, - _config: &dyn OptimizerConfig, -) -> Result> { - // Attemps to rewrite a logical plan to a uwheel-based plan that either provides - // plan-time aggregates or skips execution based on min/max pruning. - if let Some(rewritten) = self.try_rewrite(&plan) { - Ok(Transformed::yes(rewritten)) - } else { - Ok(Transformed::no(plan)) - } -} -``` - -```rust,ignore -// Converts a uwheel aggregate result to a TableScan with a MemTable as source -fn agg_to_table_scan(result: f64, schema: SchemaRef) -> Result { - let data = Float64Array::from(vec![result]); - let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)])?; - let df_schema = Arc::new(DFSchema::try_from(schema.clone())?); - let mem_table = MemTable::try_new(schema, vec![vec![record_batch]])?; - mem_table_as_table_scan(mem_table, df_schema) -} -``` - -To get a deeper dive into the usage of the µWheel project, visit the [blog post](https://uwheel.rs/post/datafusion_uwheel/) by Max Meldrum. +DataFusion supports extension of operators by transforming logical plan and execution plan through customized [optimizer rules](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html).