diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs index 90e0947f64f62..3d58d5f54d4ba 100644 --- a/benchmarks/src/imdb/run.rs +++ b/benchmarks/src/imdb/run.rs @@ -570,7 +570,7 @@ mod tests { let plan = ctx.sql(&query).await?; let plan = plan.create_physical_plan().await?; let bytes = physical_plan_to_bytes(plan.clone())?; - let plan2 = physical_plan_from_bytes(&bytes, &ctx)?; + let plan2 = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?; let plan_formatted = format!("{}", displayable(plan.as_ref()).indent(false)); let plan2_formatted = format!("{}", displayable(plan2.as_ref()).indent(false)); diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 30ecb4d33baa3..b93bdf254a279 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -424,7 +424,7 @@ mod tests { let plan = ctx.sql(&query).await?; let plan = plan.create_physical_plan().await?; let bytes = physical_plan_to_bytes(plan.clone())?; - let plan2 = physical_plan_from_bytes(&bytes, &ctx)?; + let plan2 = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?; let plan_formatted = format!("{}", displayable(plan.as_ref()).indent(false)); let plan2_formatted = format!("{}", displayable(plan2.as_ref()).indent(false)); diff --git a/datafusion-examples/examples/composed_extension_codec.rs b/datafusion-examples/examples/composed_extension_codec.rs index d3548167f197e..57f2c370413aa 100644 --- a/datafusion-examples/examples/composed_extension_codec.rs +++ b/datafusion-examples/examples/composed_extension_codec.rs @@ -32,12 +32,11 @@ use std::any::Any; use std::fmt::Debug; -use std::ops::Deref; use std::sync::Arc; use datafusion::common::internal_err; use datafusion::common::Result; -use datafusion::logical_expr::registry::FunctionRegistry; +use datafusion::execution::TaskContext; use datafusion::physical_plan::{DisplayAs, ExecutionPlan}; use datafusion::prelude::SessionContext; use datafusion_proto::physical_plan::{ @@ -71,9 +70,8 @@ async fn main() { .expect("to 
proto"); // deserialize proto back to execution plan - let runtime = ctx.runtime_env(); let result_exec_plan: Arc = proto - .try_into_physical_plan(&ctx, runtime.deref(), &composed_codec) + .try_into_physical_plan(&ctx.task_ctx(), &composed_codec) .expect("from proto"); // assert that the original and deserialized execution plans are equal @@ -124,7 +122,7 @@ impl ExecutionPlan for ParentExec { fn execute( &self, _partition: usize, - _context: Arc, + _context: Arc, ) -> Result { unreachable!() } @@ -139,7 +137,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec { &self, buf: &[u8], inputs: &[Arc], - _registry: &dyn FunctionRegistry, + _ctx: &TaskContext, ) -> Result> { if buf == "ParentExec".as_bytes() { Ok(Arc::new(ParentExec { @@ -200,7 +198,7 @@ impl ExecutionPlan for ChildExec { fn execute( &self, _partition: usize, - _context: Arc, + _context: Arc, ) -> Result { unreachable!() } @@ -215,7 +213,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec { &self, buf: &[u8], _inputs: &[Arc], - _registry: &dyn FunctionRegistry, + _ctx: &TaskContext, ) -> Result> { if buf == "ChildExec".as_bytes() { Ok(Arc::new(ChildExec {})) diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs index 832e82dda35b5..48c2698a58c75 100644 --- a/datafusion/ffi/src/plan_properties.rs +++ b/datafusion/ffi/src/plan_properties.rs @@ -181,6 +181,7 @@ impl TryFrom for PlanProperties { // TODO Extend FFI to get the registry and codex let default_ctx = SessionContext::new(); + let task_context = default_ctx.task_ctx(); let codex = DefaultPhysicalExtensionCodec {}; let ffi_orderings = unsafe { (ffi_props.output_ordering)(&ffi_props) }; @@ -190,7 +191,7 @@ impl TryFrom for PlanProperties { .map_err(|e| DataFusionError::External(Box::new(e)))?; let sort_exprs = parse_physical_sort_exprs( &proto_output_ordering.physical_sort_expr_nodes, - &default_ctx, + &task_context, &schema, &codex, )?; @@ -202,7 +203,7 @@ impl TryFrom for PlanProperties 
{ .map_err(|e| DataFusionError::External(Box::new(e)))?; let partitioning = parse_protobuf_partitioning( Some(&proto_output_partitioning), - &default_ctx, + &task_context, &schema, &codex, )? diff --git a/datafusion/ffi/src/udaf/accumulator_args.rs b/datafusion/ffi/src/udaf/accumulator_args.rs index 2cd2fa5f51035..594b839458b03 100644 --- a/datafusion/ffi/src/udaf/accumulator_args.rs +++ b/datafusion/ffi/src/udaf/accumulator_args.rs @@ -116,16 +116,17 @@ impl TryFrom for ForeignAccumulatorArgs { let schema = Schema::try_from(&value.schema.0)?; let default_ctx = SessionContext::new(); + let task_ctx = default_ctx.task_ctx(); let codex = DefaultPhysicalExtensionCodec {}; let order_bys = parse_physical_sort_exprs( &proto_def.ordering_req, - &default_ctx, + &task_ctx, &schema, &codex, )?; - let exprs = parse_physical_exprs(&proto_def.expr, &default_ctx, &schema, &codex)?; + let exprs = parse_physical_exprs(&proto_def.expr, &task_ctx, &schema, &codex)?; Ok(Self { return_field, diff --git a/datafusion/ffi/src/udwf/partition_evaluator_args.rs b/datafusion/ffi/src/udwf/partition_evaluator_args.rs index dffeb23741b66..b6f9d2a13ecea 100644 --- a/datafusion/ffi/src/udwf/partition_evaluator_args.rs +++ b/datafusion/ffi/src/udwf/partition_evaluator_args.rs @@ -148,7 +148,7 @@ impl TryFrom for ForeignPartitionEvaluatorArgs { .map_err(|e| DataFusionError::Execution(e.to_string()))? 
.iter() .map(|expr_node| { - parse_physical_expr(expr_node, &default_ctx, &schema, &codec) + parse_physical_expr(expr_node, &default_ctx.task_ctx(), &schema, &codec) }) .collect::>>()?; diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs index 16d65c419ae6c..5b07e59e807f0 100644 --- a/datafusion/proto/src/bytes/mod.rs +++ b/datafusion/proto/src/bytes/mod.rs @@ -24,6 +24,7 @@ use crate::physical_plan::{ AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec, }; use crate::protobuf; +use datafusion::execution::TaskContext; use datafusion_common::{plan_datafusion_err, Result}; use datafusion_expr::{ create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan, Volatility, @@ -316,13 +317,13 @@ pub fn physical_plan_from_json( let back: protobuf::PhysicalPlanNode = serde_json::from_str(json) .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?; let extension_codec = DefaultPhysicalExtensionCodec {}; - back.try_into_physical_plan(ctx, &ctx.runtime_env(), &extension_codec) + back.try_into_physical_plan(&ctx.task_ctx(), &extension_codec) } /// Deserialize a PhysicalPlan from bytes pub fn physical_plan_from_bytes( bytes: &[u8], - ctx: &SessionContext, + ctx: &TaskContext, ) -> Result> { let extension_codec = DefaultPhysicalExtensionCodec {}; physical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec) @@ -331,10 +332,10 @@ pub fn physical_plan_from_bytes( /// Deserialize a PhysicalPlan from bytes pub fn physical_plan_from_bytes_with_extension_codec( bytes: &[u8], - ctx: &SessionContext, + ctx: &TaskContext, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let protobuf = protobuf::PhysicalPlanNode::decode(bytes) .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?; - protobuf.try_into_physical_plan(ctx, &ctx.runtime_env(), extension_codec) + protobuf.try_into_physical_plan(ctx, extension_codec) } diff --git a/datafusion/proto/src/lib.rs 
b/datafusion/proto/src/lib.rs index b4d72aa1b6cb3..691ee3f067bab 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -115,7 +115,7 @@ //! let bytes = physical_plan_to_bytes(physical_plan.clone())?; //! //! // Decode bytes from somewhere (over network, etc.) back to ExecutionPlan -//! let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx)?; +//! let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?; //! assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip)); //! # Ok(()) //! # } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 910a3fd73c606..ee69ab75b25c8 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -38,7 +38,7 @@ use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ FileGroup, FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource, }; -use datafusion::execution::FunctionRegistry; +use datafusion::execution::{FunctionRegistry, TaskContext}; use datafusion::logical_expr::WindowFunctionDefinition; use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ @@ -47,8 +47,6 @@ use datafusion::physical_plan::expressions::{ }; use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field}; use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; -use datafusion::prelude::SessionContext; -use datafusion_common::config::ConfigOptions; use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_proto_common::common::proto_error; @@ -76,7 +74,7 @@ impl From<&protobuf::PhysicalColumn> for Column { /// * `codec` - An extension codec used to decode custom UDFs. 
pub fn parse_physical_sort_expr( proto: &protobuf::PhysicalSortExprNode, - ctx: &SessionContext, + ctx: &TaskContext, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result { @@ -103,7 +101,7 @@ pub fn parse_physical_sort_expr( /// * `codec` - An extension codec used to decode custom UDFs. pub fn parse_physical_sort_exprs( proto: &[protobuf::PhysicalSortExprNode], - ctx: &SessionContext, + ctx: &TaskContext, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { @@ -125,7 +123,7 @@ pub fn parse_physical_sort_exprs( /// * `codec` - An extension codec used to decode custom UDFs. pub fn parse_physical_window_expr( proto: &protobuf::PhysicalWindowExprNode, - ctx: &SessionContext, + ctx: &TaskContext, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { @@ -186,7 +184,7 @@ pub fn parse_physical_window_expr( pub fn parse_physical_exprs<'a, I>( protos: I, - ctx: &SessionContext, + ctx: &TaskContext, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result>> @@ -210,7 +208,7 @@ where /// * `codec` - An extension codec used to decode custom UDFs. 
pub fn parse_physical_expr( proto: &protobuf::PhysicalExprNode, - ctx: &SessionContext, + ctx: &TaskContext, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { @@ -364,11 +362,8 @@ pub fn parse_physical_expr( let scalar_fun_def = Arc::clone(&udf); let args = parse_physical_exprs(&e.args, ctx, input_schema, codec)?; - let config_options = - match ctx.state().execution_props().config_options.as_ref() { - Some(config_options) => Arc::clone(config_options), - None => Arc::new(ConfigOptions::default()), - }; + + let config_options = Arc::clone(ctx.session_config().options()); Arc::new( ScalarFunctionExpr::new( @@ -419,7 +414,7 @@ pub fn parse_physical_expr( fn parse_required_physical_expr( expr: Option<&protobuf::PhysicalExprNode>, - ctx: &SessionContext, + ctx: &TaskContext, field: &str, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, @@ -433,7 +428,7 @@ fn parse_required_physical_expr( pub fn parse_protobuf_hash_partitioning( partitioning: Option<&protobuf::PhysicalHashRepartition>, - ctx: &SessionContext, + ctx: &TaskContext, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { @@ -453,7 +448,7 @@ pub fn parse_protobuf_hash_partitioning( pub fn parse_protobuf_partitioning( partitioning: Option<&protobuf::Partitioning>, - ctx: &SessionContext, + ctx: &TaskContext, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { @@ -491,7 +486,7 @@ pub fn parse_protobuf_file_scan_schema( pub fn parse_protobuf_file_scan_config( proto: &protobuf::FileScanExecConf, - ctx: &SessionContext, + ctx: &TaskContext, codec: &dyn PhysicalExtensionCodec, file_source: Arc, ) -> Result { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 04a4372c19ca1..e7d8479c1405a 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -57,8 +57,7 @@ use datafusion::datasource::physical_plan::{ }; use 
datafusion::datasource::sink::DataSinkExec; use datafusion::datasource::source::{DataSource, DataSourceExec}; -use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::execution::FunctionRegistry; +use datafusion::execution::{FunctionRegistry, TaskContext}; use datafusion::functions_table::generate_series::{ Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue, }; @@ -98,7 +97,6 @@ use datafusion_common::config::TableParquetOptions; use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; -use datafusion::prelude::SessionContext; use prost::bytes::BufMut; use prost::Message; @@ -127,8 +125,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { fn try_into_physical_plan( &self, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let plan = self.physical_plan_type.as_ref().ok_or_else(|| { @@ -137,171 +135,118 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { )) })?; match plan { - PhysicalPlanType::Explain(explain) => self.try_into_explain_physical_plan( - explain, - ctx, - runtime, - extension_codec, - ), - PhysicalPlanType::Projection(projection) => self - .try_into_projection_physical_plan( - projection, - ctx, - runtime, - extension_codec, - ), + PhysicalPlanType::Explain(explain) => { + self.try_into_explain_physical_plan(explain, ctx, extension_codec) + } + PhysicalPlanType::Projection(projection) => { + self.try_into_projection_physical_plan(projection, ctx, extension_codec) + } PhysicalPlanType::Filter(filter) => { - self.try_into_filter_physical_plan(filter, ctx, runtime, extension_codec) + self.try_into_filter_physical_plan(filter, ctx, extension_codec) } PhysicalPlanType::CsvScan(scan) => { - self.try_into_csv_scan_physical_plan(scan, ctx, runtime, extension_codec) + self.try_into_csv_scan_physical_plan(scan, ctx, extension_codec) } 
PhysicalPlanType::JsonScan(scan) => { - self.try_into_json_scan_physical_plan(scan, ctx, runtime, extension_codec) + self.try_into_json_scan_physical_plan(scan, ctx, extension_codec) } #[cfg_attr(not(feature = "parquet"), allow(unused_variables))] - PhysicalPlanType::ParquetScan(scan) => self - .try_into_parquet_scan_physical_plan(scan, ctx, runtime, extension_codec), + PhysicalPlanType::ParquetScan(scan) => { + self.try_into_parquet_scan_physical_plan(scan, ctx, extension_codec) + } #[cfg_attr(not(feature = "avro"), allow(unused_variables))] PhysicalPlanType::AvroScan(scan) => { - self.try_into_avro_scan_physical_plan(scan, ctx, runtime, extension_codec) + self.try_into_avro_scan_physical_plan(scan, ctx, extension_codec) + } + PhysicalPlanType::MemoryScan(scan) => { + self.try_into_memory_scan_physical_plan(scan, ctx, extension_codec) } - PhysicalPlanType::MemoryScan(scan) => self - .try_into_memory_scan_physical_plan(scan, ctx, runtime, extension_codec), PhysicalPlanType::CoalesceBatches(coalesce_batches) => self .try_into_coalesce_batches_physical_plan( coalesce_batches, ctx, - runtime, extension_codec, ), PhysicalPlanType::Merge(merge) => { - self.try_into_merge_physical_plan(merge, ctx, runtime, extension_codec) + self.try_into_merge_physical_plan(merge, ctx, extension_codec) + } + PhysicalPlanType::Repartition(repart) => { + self.try_into_repartition_physical_plan(repart, ctx, extension_codec) + } + PhysicalPlanType::GlobalLimit(limit) => { + self.try_into_global_limit_physical_plan(limit, ctx, extension_codec) + } + PhysicalPlanType::LocalLimit(limit) => { + self.try_into_local_limit_physical_plan(limit, ctx, extension_codec) + } + PhysicalPlanType::Window(window_agg) => { + self.try_into_window_physical_plan(window_agg, ctx, extension_codec) + } + PhysicalPlanType::Aggregate(hash_agg) => { + self.try_into_aggregate_physical_plan(hash_agg, ctx, extension_codec) + } + PhysicalPlanType::HashJoin(hashjoin) => { + self.try_into_hash_join_physical_plan(hashjoin, 
ctx, extension_codec) } - PhysicalPlanType::Repartition(repart) => self - .try_into_repartition_physical_plan( - repart, - ctx, - runtime, - extension_codec, - ), - PhysicalPlanType::GlobalLimit(limit) => self - .try_into_global_limit_physical_plan( - limit, - ctx, - runtime, - extension_codec, - ), - PhysicalPlanType::LocalLimit(limit) => self - .try_into_local_limit_physical_plan(limit, ctx, runtime, extension_codec), - PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan( - window_agg, - ctx, - runtime, - extension_codec, - ), - PhysicalPlanType::Aggregate(hash_agg) => self - .try_into_aggregate_physical_plan( - hash_agg, - ctx, - runtime, - extension_codec, - ), - PhysicalPlanType::HashJoin(hashjoin) => self - .try_into_hash_join_physical_plan( - hashjoin, - ctx, - runtime, - extension_codec, - ), PhysicalPlanType::SymmetricHashJoin(sym_join) => self .try_into_symmetric_hash_join_physical_plan( sym_join, ctx, - runtime, extension_codec, ), PhysicalPlanType::Union(union) => { - self.try_into_union_physical_plan(union, ctx, runtime, extension_codec) + self.try_into_union_physical_plan(union, ctx, extension_codec) + } + PhysicalPlanType::Interleave(interleave) => { + self.try_into_interleave_physical_plan(interleave, ctx, extension_codec) + } + PhysicalPlanType::CrossJoin(crossjoin) => { + self.try_into_cross_join_physical_plan(crossjoin, ctx, extension_codec) } - PhysicalPlanType::Interleave(interleave) => self - .try_into_interleave_physical_plan( - interleave, - ctx, - runtime, - extension_codec, - ), - PhysicalPlanType::CrossJoin(crossjoin) => self - .try_into_cross_join_physical_plan( - crossjoin, - ctx, - runtime, - extension_codec, - ), PhysicalPlanType::Empty(empty) => { - self.try_into_empty_physical_plan(empty, ctx, runtime, extension_codec) + self.try_into_empty_physical_plan(empty, ctx, extension_codec) } PhysicalPlanType::PlaceholderRow(placeholder) => self .try_into_placeholder_row_physical_plan( placeholder, ctx, - runtime, 
extension_codec, ), PhysicalPlanType::Sort(sort) => { - self.try_into_sort_physical_plan(sort, ctx, runtime, extension_codec) + self.try_into_sort_physical_plan(sort, ctx, extension_codec) } PhysicalPlanType::SortPreservingMerge(sort) => self - .try_into_sort_preserving_merge_physical_plan( - sort, - ctx, - runtime, - extension_codec, - ), - PhysicalPlanType::Extension(extension) => self - .try_into_extension_physical_plan( - extension, - ctx, - runtime, - extension_codec, - ), - PhysicalPlanType::NestedLoopJoin(join) => self - .try_into_nested_loop_join_physical_plan( - join, - ctx, - runtime, - extension_codec, - ), - PhysicalPlanType::Analyze(analyze) => self.try_into_analyze_physical_plan( - analyze, - ctx, - runtime, - extension_codec, - ), + .try_into_sort_preserving_merge_physical_plan(sort, ctx, extension_codec), + PhysicalPlanType::Extension(extension) => { + self.try_into_extension_physical_plan(extension, ctx, extension_codec) + } + PhysicalPlanType::NestedLoopJoin(join) => { + self.try_into_nested_loop_join_physical_plan(join, ctx, extension_codec) + } + PhysicalPlanType::Analyze(analyze) => { + self.try_into_analyze_physical_plan(analyze, ctx, extension_codec) + } PhysicalPlanType::JsonSink(sink) => { - self.try_into_json_sink_physical_plan(sink, ctx, runtime, extension_codec) + self.try_into_json_sink_physical_plan(sink, ctx, extension_codec) } PhysicalPlanType::CsvSink(sink) => { - self.try_into_csv_sink_physical_plan(sink, ctx, runtime, extension_codec) + self.try_into_csv_sink_physical_plan(sink, ctx, extension_codec) } #[cfg_attr(not(feature = "parquet"), allow(unused_variables))] - PhysicalPlanType::ParquetSink(sink) => self - .try_into_parquet_sink_physical_plan(sink, ctx, runtime, extension_codec), + PhysicalPlanType::ParquetSink(sink) => { + self.try_into_parquet_sink_physical_plan(sink, ctx, extension_codec) + } PhysicalPlanType::Unnest(unnest) => { - self.try_into_unnest_physical_plan(unnest, ctx, runtime, extension_codec) + 
self.try_into_unnest_physical_plan(unnest, ctx, extension_codec) + } + PhysicalPlanType::Cooperative(cooperative) => { + self.try_into_cooperative_physical_plan(cooperative, ctx, extension_codec) } - PhysicalPlanType::Cooperative(cooperative) => self - .try_into_cooperative_physical_plan( - cooperative, - ctx, - runtime, - extension_codec, - ), PhysicalPlanType::GenerateSeries(generate_series) => { self.try_into_generate_series_physical_plan(generate_series) } PhysicalPlanType::SortMergeJoin(sort_join) => { - self.try_into_sort_join(sort_join, ctx, runtime, extension_codec) + self.try_into_sort_join(sort_join, ctx, extension_codec) } } } @@ -546,8 +491,8 @@ impl protobuf::PhysicalPlanNode { fn try_into_explain_physical_plan( &self, explain: &protobuf::ExplainExecNode, - _ctx: &SessionContext, - _runtime: &RuntimeEnv, + _ctx: &TaskContext, + _extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { Ok(Arc::new(ExplainExec::new( @@ -564,12 +509,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_projection_physical_plan( &self, projection: &protobuf::ProjectionExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let input: Arc = - into_physical_plan(&projection.input, ctx, runtime, extension_codec)?; + into_physical_plan(&projection.input, ctx, extension_codec)?; let exprs = projection .expr .iter() @@ -596,12 +541,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_filter_physical_plan( &self, filter: &protobuf::FilterExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let input: Arc = - into_physical_plan(&filter.input, ctx, runtime, extension_codec)?; + into_physical_plan(&filter.input, ctx, extension_codec)?; let predicate = filter .expr @@ -644,8 +589,8 @@ impl protobuf::PhysicalPlanNode { fn try_into_csv_scan_physical_plan( &self, scan: &protobuf::CsvScanExecNode, - ctx: 
&SessionContext, - _runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let escape = @@ -691,8 +636,8 @@ impl protobuf::PhysicalPlanNode { fn try_into_json_scan_physical_plan( &self, scan: &protobuf::JsonScanExecNode, - ctx: &SessionContext, - _runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let scan_conf = parse_protobuf_file_scan_config( @@ -708,8 +653,8 @@ impl protobuf::PhysicalPlanNode { fn try_into_parquet_scan_physical_plan( &self, scan: &protobuf::ParquetScanExecNode, - ctx: &SessionContext, - _runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { #[cfg(feature = "parquet")] @@ -769,8 +714,8 @@ impl protobuf::PhysicalPlanNode { fn try_into_avro_scan_physical_plan( &self, scan: &protobuf::AvroScanExecNode, - ctx: &SessionContext, - _runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { #[cfg(feature = "avro")] @@ -790,8 +735,8 @@ impl protobuf::PhysicalPlanNode { fn try_into_memory_scan_physical_plan( &self, scan: &protobuf::MemoryScanExecNode, - ctx: &SessionContext, - _runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let partitions = scan @@ -841,12 +786,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_coalesce_batches_physical_plan( &self, coalesce_batches: &protobuf::CoalesceBatchesExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let input: Arc = - into_physical_plan(&coalesce_batches.input, ctx, runtime, extension_codec)?; + into_physical_plan(&coalesce_batches.input, ctx, extension_codec)?; Ok(Arc::new( CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize) .with_fetch(coalesce_batches.fetch.map(|f| f as usize)), @@ -856,12 +801,12 @@ impl protobuf::PhysicalPlanNode { 
fn try_into_merge_physical_plan( &self, merge: &protobuf::CoalescePartitionsExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let input: Arc = - into_physical_plan(&merge.input, ctx, runtime, extension_codec)?; + into_physical_plan(&merge.input, ctx, extension_codec)?; Ok(Arc::new( CoalescePartitionsExec::new(input) .with_fetch(merge.fetch.map(|f| f as usize)), @@ -871,12 +816,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_repartition_physical_plan( &self, repart: &protobuf::RepartitionExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let input: Arc = - into_physical_plan(&repart.input, ctx, runtime, extension_codec)?; + into_physical_plan(&repart.input, ctx, extension_codec)?; let partitioning = parse_protobuf_partitioning( repart.partitioning.as_ref(), ctx, @@ -892,12 +837,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_global_limit_physical_plan( &self, limit: &protobuf::GlobalLimitExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let input: Arc = - into_physical_plan(&limit.input, ctx, runtime, extension_codec)?; + into_physical_plan(&limit.input, ctx, extension_codec)?; let fetch = if limit.fetch >= 0 { Some(limit.fetch as usize) } else { @@ -913,24 +858,24 @@ impl protobuf::PhysicalPlanNode { fn try_into_local_limit_physical_plan( &self, limit: &protobuf::LocalLimitExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let input: Arc = - into_physical_plan(&limit.input, ctx, runtime, extension_codec)?; + into_physical_plan(&limit.input, ctx, extension_codec)?; Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize))) } fn try_into_window_physical_plan( &self, window_agg: 
&protobuf::WindowAggExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let input: Arc = - into_physical_plan(&window_agg.input, ctx, runtime, extension_codec)?; + into_physical_plan(&window_agg.input, ctx, extension_codec)?; let input_schema = input.schema(); let physical_window_expr: Vec> = window_agg @@ -983,12 +928,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_aggregate_physical_plan( &self, hash_agg: &protobuf::AggregateExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let input: Arc = - into_physical_plan(&hash_agg.input, ctx, runtime, extension_codec)?; + into_physical_plan(&hash_agg.input, ctx, extension_codec)?; let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| { proto_error(format!( "Received a AggregateNode message with unknown AggregateMode {}", @@ -1151,14 +1096,14 @@ impl protobuf::PhysicalPlanNode { fn try_into_hash_join_physical_plan( &self, hashjoin: &protobuf::HashJoinExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let left: Arc = - into_physical_plan(&hashjoin.left, ctx, runtime, extension_codec)?; + into_physical_plan(&hashjoin.left, ctx, extension_codec)?; let right: Arc = - into_physical_plan(&hashjoin.right, ctx, runtime, extension_codec)?; + into_physical_plan(&hashjoin.right, ctx, extension_codec)?; let left_schema = left.schema(); let right_schema = right.schema(); let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin @@ -1269,12 +1214,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_symmetric_hash_join_physical_plan( &self, sym_join: &protobuf::SymmetricHashJoinExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { - let left = 
into_physical_plan(&sym_join.left, ctx, runtime, extension_codec)?; - let right = into_physical_plan(&sym_join.right, ctx, runtime, extension_codec)?; + let left = into_physical_plan(&sym_join.left, ctx, extension_codec)?; + let right = into_physical_plan(&sym_join.right, ctx, extension_codec)?; let left_schema = left.schema(); let right_schema = right.schema(); let on = sym_join @@ -1397,13 +1342,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_union_physical_plan( &self, union: &protobuf::UnionExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let mut inputs: Vec> = vec![]; for input in &union.inputs { - inputs.push(input.try_into_physical_plan(ctx, runtime, extension_codec)?); + inputs.push(input.try_into_physical_plan(ctx, extension_codec)?); } UnionExec::try_new(inputs) } @@ -1411,13 +1356,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_interleave_physical_plan( &self, interleave: &protobuf::InterleaveExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let mut inputs: Vec> = vec![]; for input in &interleave.inputs { - inputs.push(input.try_into_physical_plan(ctx, runtime, extension_codec)?); + inputs.push(input.try_into_physical_plan(ctx, extension_codec)?); } Ok(Arc::new(InterleaveExec::try_new(inputs)?)) } @@ -1425,22 +1370,22 @@ impl protobuf::PhysicalPlanNode { fn try_into_cross_join_physical_plan( &self, crossjoin: &protobuf::CrossJoinExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let left: Arc = - into_physical_plan(&crossjoin.left, ctx, runtime, extension_codec)?; + into_physical_plan(&crossjoin.left, ctx, extension_codec)?; let right: Arc = - into_physical_plan(&crossjoin.right, ctx, runtime, extension_codec)?; + into_physical_plan(&crossjoin.right, ctx, extension_codec)?; 
Ok(Arc::new(CrossJoinExec::new(left, right))) } fn try_into_empty_physical_plan( &self, empty: &protobuf::EmptyExecNode, - _ctx: &SessionContext, - _runtime: &RuntimeEnv, + _ctx: &TaskContext, + _extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let schema = Arc::new(convert_required!(empty.schema)?); @@ -1450,8 +1395,8 @@ impl protobuf::PhysicalPlanNode { fn try_into_placeholder_row_physical_plan( &self, placeholder: &protobuf::PlaceholderRowExecNode, - _ctx: &SessionContext, - _runtime: &RuntimeEnv, + _ctx: &TaskContext, + _extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let schema = Arc::new(convert_required!(placeholder.schema)?); @@ -1461,11 +1406,11 @@ impl protobuf::PhysicalPlanNode { fn try_into_sort_physical_plan( &self, sort: &protobuf::SortExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { - let input = into_physical_plan(&sort.input, ctx, runtime, extension_codec)?; + let input = into_physical_plan(&sort.input, ctx, extension_codec)?; let exprs = sort .expr .iter() @@ -1513,11 +1458,11 @@ impl protobuf::PhysicalPlanNode { fn try_into_sort_preserving_merge_physical_plan( &self, sort: &protobuf::SortPreservingMergeExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { - let input = into_physical_plan(&sort.input, ctx, runtime, extension_codec)?; + let input = into_physical_plan(&sort.input, ctx, extension_codec)?; let exprs = sort .expr .iter() @@ -1566,14 +1511,14 @@ impl protobuf::PhysicalPlanNode { fn try_into_extension_physical_plan( &self, extension: &protobuf::PhysicalExtensionNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let inputs: Vec> = extension .inputs .iter() - .map(|i| i.try_into_physical_plan(ctx, runtime, extension_codec)) + .map(|i| 
i.try_into_physical_plan(ctx, extension_codec)) .collect::>()?; let extension_node = @@ -1585,14 +1530,14 @@ impl protobuf::PhysicalPlanNode { fn try_into_nested_loop_join_physical_plan( &self, join: &protobuf::NestedLoopJoinExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let left: Arc = - into_physical_plan(&join.left, ctx, runtime, extension_codec)?; + into_physical_plan(&join.left, ctx, extension_codec)?; let right: Arc = - into_physical_plan(&join.right, ctx, runtime, extension_codec)?; + into_physical_plan(&join.right, ctx, extension_codec)?; let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| { proto_error(format!( "Received a NestedLoopJoinExecNode message with unknown JoinType {}", @@ -1659,12 +1604,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_analyze_physical_plan( &self, analyze: &protobuf::AnalyzeExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let input: Arc = - into_physical_plan(&analyze.input, ctx, runtime, extension_codec)?; + into_physical_plan(&analyze.input, ctx, extension_codec)?; Ok(Arc::new(AnalyzeExec::new( analyze.verbose, analyze.show_statistics, @@ -1676,11 +1621,11 @@ impl protobuf::PhysicalPlanNode { fn try_into_json_sink_physical_plan( &self, sink: &protobuf::JsonSinkExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { - let input = into_physical_plan(&sink.input, ctx, runtime, extension_codec)?; + let input = into_physical_plan(&sink.input, ctx, extension_codec)?; let data_sink: JsonSink = sink .sink @@ -1714,11 +1659,11 @@ impl protobuf::PhysicalPlanNode { fn try_into_csv_sink_physical_plan( &self, sink: &protobuf::CsvSinkExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn 
PhysicalExtensionCodec, ) -> Result> { - let input = into_physical_plan(&sink.input, ctx, runtime, extension_codec)?; + let input = into_physical_plan(&sink.input, ctx, extension_codec)?; let data_sink: CsvSink = sink .sink @@ -1752,13 +1697,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_parquet_sink_physical_plan( &self, sink: &protobuf::ParquetSinkExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { #[cfg(feature = "parquet")] { - let input = into_physical_plan(&sink.input, ctx, runtime, extension_codec)?; + let input = into_physical_plan(&sink.input, ctx, extension_codec)?; let data_sink: ParquetSink = sink .sink @@ -1795,11 +1740,11 @@ impl protobuf::PhysicalPlanNode { fn try_into_unnest_physical_plan( &self, unnest: &protobuf::UnnestExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { - let input = into_physical_plan(&unnest.input, ctx, runtime, extension_codec)?; + let input = into_physical_plan(&unnest.input, ctx, extension_codec)?; Ok(Arc::new(UnnestExec::new( input, @@ -1826,13 +1771,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_sort_join( &self, sort_join: &SortMergeJoinExecNode, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { - let left = into_physical_plan(&sort_join.left, ctx, runtime, extension_codec)?; + let left = into_physical_plan(&sort_join.left, ctx, extension_codec)?; let left_schema = left.schema(); - let right = into_physical_plan(&sort_join.right, ctx, runtime, extension_codec)?; + let right = into_physical_plan(&sort_join.right, ctx, extension_codec)?; let right_schema = right.schema(); let filter = sort_join @@ -2003,12 +1948,11 @@ impl protobuf::PhysicalPlanNode { fn try_into_cooperative_physical_plan( &self, field_stream: &protobuf::CooperativeExecNode, - ctx: 
&SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { - let input = - into_physical_plan(&field_stream.input, ctx, runtime, extension_codec)?; + let input = into_physical_plan(&field_stream.input, ctx, extension_codec)?; Ok(Arc::new(CooperativeExec::new(input))) } @@ -3276,8 +3220,8 @@ pub trait AsExecutionPlan: Debug + Send + Sync + Clone { fn try_into_physical_plan( &self, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result>; @@ -3294,7 +3238,7 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync { &self, buf: &[u8], inputs: &[Arc], - registry: &dyn FunctionRegistry, + ctx: &TaskContext, ) -> Result>; fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()>; @@ -3350,7 +3294,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec { &self, _buf: &[u8], _inputs: &[Arc], - _registry: &dyn FunctionRegistry, + _ctx: &TaskContext, ) -> Result> { not_impl_err!("PhysicalExtensionCodec is not provided") } @@ -3452,9 +3396,9 @@ impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec { &self, buf: &[u8], inputs: &[Arc], - registry: &dyn FunctionRegistry, + ctx: &TaskContext, ) -> Result> { - self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, registry)) + self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx)) } fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { @@ -3480,12 +3424,12 @@ impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec { fn into_physical_plan( node: &Option>, - ctx: &SessionContext, - runtime: &RuntimeEnv, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { if let Some(field) = node { - field.try_into_physical_plan(ctx, runtime, extension_codec) + field.try_into_physical_plan(ctx, extension_codec) } else { Err(proto_error("Missing required field in protobuf")) } diff --git 
a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 4b4403a5f35d2..c88c62952a500 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -17,7 +17,6 @@ use std::any::Any; use std::fmt::{Display, Formatter}; -use std::ops::Deref; use std::sync::Arc; use std::vec; @@ -53,7 +52,7 @@ use datafusion::datasource::physical_plan::{ }; use datafusion::datasource::sink::DataSinkExec; use datafusion::datasource::source::DataSourceExec; -use datafusion::execution::FunctionRegistry; +use datafusion::execution::TaskContext; use datafusion::functions_aggregate::count::count_udaf; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::functions_window::nth_value::nth_value_udwf; @@ -138,9 +137,8 @@ fn roundtrip_test_and_return( let proto: protobuf::PhysicalPlanNode = protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), codec) .expect("to proto"); - let runtime = ctx.runtime_env(); let result_exec_plan: Arc = proto - .try_into_physical_plan(ctx, runtime.deref(), codec) + .try_into_physical_plan(&ctx.task_ctx(), codec) .expect("from proto"); pretty_assertions::assert_eq!( @@ -1024,7 +1022,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { &self, _buf: &[u8], _inputs: &[Arc], - _registry: &dyn FunctionRegistry, + _ctx: &TaskContext, ) -> Result> { unreachable!() } @@ -1132,7 +1130,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { &self, _buf: &[u8], _inputs: &[Arc], - _registry: &dyn FunctionRegistry, + _ctx: &TaskContext, ) -> Result> { not_impl_err!("No extension codec provided") } @@ -1736,11 +1734,8 @@ async fn roundtrip_coalesce() -> Result<()> { )?; let node = PhysicalPlanNode::decode(node.encode_to_vec().as_slice()) .map_err(|e| DataFusionError::External(Box::new(e)))?; - let restored = node.try_into_physical_plan( - &ctx, - ctx.runtime_env().as_ref(), - 
&DefaultPhysicalExtensionCodec {}, - )?; + let restored = + node.try_into_physical_plan(&ctx.task_ctx(), &DefaultPhysicalExtensionCodec {})?; assert_eq!( plan.schema(), @@ -1775,11 +1770,8 @@ async fn roundtrip_generate_series() -> Result<()> { )?; let node = PhysicalPlanNode::decode(node.encode_to_vec().as_slice()) .map_err(|e| DataFusionError::External(Box::new(e)))?; - let restored = node.try_into_physical_plan( - &ctx, - ctx.runtime_env().as_ref(), - &DefaultPhysicalExtensionCodec {}, - )?; + let restored = + node.try_into_physical_plan(&ctx.task_ctx(), &DefaultPhysicalExtensionCodec {})?; assert_eq!( plan.schema(), @@ -1901,11 +1893,7 @@ async fn roundtrip_physical_plan_node() { .unwrap(); let plan = node - .try_into_physical_plan( - &ctx, - &ctx.runtime_env(), - &DefaultPhysicalExtensionCodec {}, - ) + .try_into_physical_plan(&ctx.task_ctx(), &DefaultPhysicalExtensionCodec {}) .unwrap(); let _ = plan.execute(0, ctx.task_ctx()).unwrap(); @@ -1985,7 +1973,7 @@ async fn test_serialize_deserialize_tpch_queries() -> Result<()> { // deserialize the physical plan let _deserialized_plan = - proto.try_into_physical_plan(&ctx, ctx.runtime_env().as_ref(), &codec)?; + proto.try_into_physical_plan(&ctx.task_ctx(), &codec)?; } } @@ -2104,8 +2092,7 @@ async fn test_tpch_part_in_list_query_with_real_parquet_data() -> Result<()> { let proto = PhysicalPlanNode::try_from_physical_plan(physical_plan.clone(), &codec)?; // This will fail with the bug, but should succeed when fixed - let _deserialized_plan = - proto.try_into_physical_plan(&ctx, ctx.runtime_env().as_ref(), &codec)?; + let _deserialized_plan = proto.try_into_physical_plan(&ctx.task_ctx(), &codec)?; Ok(()) } @@ -2133,11 +2120,8 @@ async fn analyze_roundtrip_unoptimized() -> Result<()> { let node = PhysicalPlanNode::decode(node.encode_to_vec().as_slice()) .map_err(|e| DataFusionError::External(Box::new(e)))?; - let unoptimized = node.try_into_physical_plan( - &ctx, - ctx.runtime_env().as_ref(), - 
&DefaultPhysicalExtensionCodec {}, - )?; + let unoptimized = + node.try_into_physical_plan(&ctx.task_ctx(), &DefaultPhysicalExtensionCodec {})?; let physical_planner = datafusion::physical_planner::DefaultPhysicalPlanner::default(); diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 97adc5a2c3109..d70413467a954 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -23,7 +23,7 @@ **Note:** DataFusion `51.0.0` has not been released yet. The information provided in this section pertains to features and changes that have already been merged to the main branch and are awaiting release in this version. -You can see the current [status of the `51.0.0 `release here](https://github.com/apache/datafusion/issues/17558) +You can see the current [status of the `51.0.0` release here](https://github.com/apache/datafusion/issues/17558) ### `MSRV` updated to 1.87.0 @@ -31,6 +31,42 @@ The Minimum Supported Rust Version (MSRV) has been updated to [`1.87.0`]. [`1.87.0`]: https://releases.rs/docs/1.87.0/ +### `datafusion-proto` uses `TaskContext` rather than `SessionContext` in physical plan serde methods + +There have been changes in the public API methods of `datafusion-proto` that handle physical plan serde. + +Methods like `physical_plan_from_bytes`, `parse_physical_expr` and similar now expect `TaskContext` instead of `SessionContext`: + +```diff +- let plan2 = physical_plan_from_bytes(&bytes, &ctx)?; ++ let plan2 = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?; +``` + +As `TaskContext` contains `RuntimeEnv`, methods such as `try_into_physical_plan` will not have an explicit `RuntimeEnv` parameter. + +```diff +let result_exec_plan: Arc = proto +- .try_into_physical_plan(&ctx, runtime.deref(), &composed_codec) ++ 
.try_into_physical_plan(&ctx.task_ctx(), &composed_codec) +``` + +`PhysicalExtensionCodec::try_decode()` expects `TaskContext` instead of `FunctionRegistry`: + +```diff +pub trait PhysicalExtensionCodec { + fn try_decode( + &self, + buf: &[u8], + inputs: &[Arc], +- registry: &dyn FunctionRegistry, ++ ctx: &TaskContext, + ) -> Result>; +``` + +See [issue #17601] for more details. + +[issue #17601]: https://github.com/apache/datafusion/issues/17601 + ## DataFusion `50.0.0` ### ListingTable automatically detects Hive Partitioned tables