Skip to content

Commit f2799e0

Browse files
committed
Extract more logic into submodule from execution::context
1 parent 1d44ba6 commit f2799e0

4 files changed

Lines changed: 264 additions & 165 deletions

File tree

datafusion/core/src/execution/context.rs

Lines changed: 5 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
//! [`SessionContext`] contains methods for registering data sources and executing queries
1919
20+
mod avro;
21+
mod csv;
22+
mod json;
2023
#[cfg(feature = "parquet")]
2124
mod parquet;
2225

@@ -81,7 +84,6 @@ use datafusion_sql::{
8184
use sqlparser::dialect::dialect_from_str;
8285

8386
use crate::config::ConfigOptions;
84-
use crate::datasource::physical_plan::{plan_to_csv, plan_to_json};
8587
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
8688
use crate::physical_plan::udaf::AggregateUDF;
8789
use crate::physical_plan::udf::ScalarUDF;
@@ -113,7 +115,7 @@ use crate::execution::options::ArrowReadOptions;
113115
pub use datafusion_execution::config::SessionConfig;
114116
pub use datafusion_execution::TaskContext;
115117

116-
use super::options::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ReadOptions};
118+
use super::options::ReadOptions;
117119

118120
/// DataFilePaths adds a method to convert strings and vector of strings to vector of [`ListingTableUrl`] URLs.
119121
/// This allows methods such [`SessionContext::read_csv`] and [`SessionContext::read_avro`]
@@ -857,34 +859,6 @@ impl SessionContext {
857859
self.read_table(Arc::new(provider))
858860
}
859861

860-
/// Creates a [`DataFrame`] for reading an Avro data source.
861-
///
862-
/// For more control such as reading multiple files, you can use
863-
/// [`read_table`](Self::read_table) with a [`ListingTable`].
864-
///
865-
/// For an example, see [`read_csv`](Self::read_csv)
866-
pub async fn read_avro<P: DataFilePaths>(
867-
&self,
868-
table_paths: P,
869-
options: AvroReadOptions<'_>,
870-
) -> Result<DataFrame> {
871-
self._read_type(table_paths, options).await
872-
}
873-
874-
/// Creates a [`DataFrame`] for reading an JSON data source.
875-
///
876-
/// For more control such as reading multiple files, you can use
877-
/// [`read_table`](Self::read_table) with a [`ListingTable`].
878-
///
879-
/// For an example, see [`read_csv`](Self::read_csv)
880-
pub async fn read_json<P: DataFilePaths>(
881-
&self,
882-
table_paths: P,
883-
options: NdJsonReadOptions<'_>,
884-
) -> Result<DataFrame> {
885-
self._read_type(table_paths, options).await
886-
}
887-
888862
/// Creates a [`DataFrame`] for reading an Arrow data source.
889863
///
890864
/// For more control such as reading multiple files, you can use
@@ -907,34 +881,6 @@ impl SessionContext {
907881
))
908882
}
909883

910-
/// Creates a [`DataFrame`] for reading a CSV data source.
911-
///
912-
/// For more control such as reading multiple files, you can use
913-
/// [`read_table`](Self::read_table) with a [`ListingTable`].
914-
///
915-
/// Example usage is given below:
916-
///
917-
/// ```
918-
/// use datafusion::prelude::*;
919-
/// # use datafusion::error::Result;
920-
/// # #[tokio::main]
921-
/// # async fn main() -> Result<()> {
922-
/// let ctx = SessionContext::new();
923-
/// // You can read a single file using `read_csv`
924-
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
925-
/// // you can also read multiple files:
926-
/// let df = ctx.read_csv(vec!["tests/data/example.csv", "tests/data/example.csv"], CsvReadOptions::new()).await?;
927-
/// # Ok(())
928-
/// # }
929-
/// ```
930-
pub async fn read_csv<P: DataFilePaths>(
931-
&self,
932-
table_paths: P,
933-
options: CsvReadOptions<'_>,
934-
) -> Result<DataFrame> {
935-
self._read_type(table_paths, options).await
936-
}
937-
938884
/// Creates a [`DataFrame`] for a [`TableProvider`] such as a
939885
/// [`ListingTable`] or a custom user defined provider.
940886
pub fn read_table(&self, provider: Arc<dyn TableProvider>) -> Result<DataFrame> {
@@ -995,70 +941,6 @@ impl SessionContext {
995941
Ok(())
996942
}
997943

998-
/// Registers a CSV file as a table which can referenced from SQL
999-
/// statements executed against this context.
1000-
pub async fn register_csv(
1001-
&self,
1002-
name: &str,
1003-
table_path: &str,
1004-
options: CsvReadOptions<'_>,
1005-
) -> Result<()> {
1006-
let listing_options = options.to_listing_options(&self.copied_config());
1007-
1008-
self.register_listing_table(
1009-
name,
1010-
table_path,
1011-
listing_options,
1012-
options.schema.map(|s| Arc::new(s.to_owned())),
1013-
None,
1014-
)
1015-
.await?;
1016-
1017-
Ok(())
1018-
}
1019-
1020-
/// Registers a JSON file as a table that it can be referenced
1021-
/// from SQL statements executed against this context.
1022-
pub async fn register_json(
1023-
&self,
1024-
name: &str,
1025-
table_path: &str,
1026-
options: NdJsonReadOptions<'_>,
1027-
) -> Result<()> {
1028-
let listing_options = options.to_listing_options(&self.copied_config());
1029-
1030-
self.register_listing_table(
1031-
name,
1032-
table_path,
1033-
listing_options,
1034-
options.schema.map(|s| Arc::new(s.to_owned())),
1035-
None,
1036-
)
1037-
.await?;
1038-
Ok(())
1039-
}
1040-
1041-
/// Registers an Avro file as a table that can be referenced from
1042-
/// SQL statements executed against this context.
1043-
pub async fn register_avro(
1044-
&self,
1045-
name: &str,
1046-
table_path: &str,
1047-
options: AvroReadOptions<'_>,
1048-
) -> Result<()> {
1049-
let listing_options = options.to_listing_options(&self.copied_config());
1050-
1051-
self.register_listing_table(
1052-
name,
1053-
table_path,
1054-
listing_options,
1055-
options.schema.map(|s| Arc::new(s.to_owned())),
1056-
None,
1057-
)
1058-
.await?;
1059-
Ok(())
1060-
}
1061-
1062944
/// Registers an Arrow file as a table that can be referenced from
1063945
/// SQL statements executed against this context.
1064946
pub async fn register_arrow(
@@ -1234,24 +1116,6 @@ impl SessionContext {
12341116
self.state().create_physical_plan(logical_plan).await
12351117
}
12361118

1237-
/// Executes a query and writes the results to a partitioned CSV file.
1238-
pub async fn write_csv(
1239-
&self,
1240-
plan: Arc<dyn ExecutionPlan>,
1241-
path: impl AsRef<str>,
1242-
) -> Result<()> {
1243-
plan_to_csv(self.task_ctx(), plan, path).await
1244-
}
1245-
1246-
/// Executes a query and writes the results to a partitioned JSON file.
1247-
pub async fn write_json(
1248-
&self,
1249-
plan: Arc<dyn ExecutionPlan>,
1250-
path: impl AsRef<str>,
1251-
) -> Result<()> {
1252-
plan_to_json(self.task_ctx(), plan, path).await
1253-
}
1254-
12551119
/// Get a new TaskContext to run in this session
12561120
pub fn task_ctx(&self) -> Arc<TaskContext> {
12571121
Arc::new(TaskContext::from(self))
@@ -2195,6 +2059,7 @@ impl<'a> TreeNodeVisitor for BadPlanVisitor<'a> {
21952059

21962060
#[cfg(test)]
21972061
mod tests {
2062+
use super::super::options::CsvReadOptions;
21982063
use super::*;
21992064
use crate::assert_batches_eq;
22002065
use crate::execution::context::QueryPlanner;
@@ -2698,29 +2563,4 @@ mod tests {
26982563

26992564
Ok(ctx)
27002565
}
2701-
2702-
// Test for compilation error when calling read_* functions from an #[async_trait] function.
2703-
// See https://github.com/apache/arrow-datafusion/issues/1154
2704-
#[async_trait]
2705-
trait CallReadTrait {
2706-
async fn call_read_csv(&self) -> DataFrame;
2707-
async fn call_read_avro(&self) -> DataFrame;
2708-
}
2709-
2710-
struct CallRead {}
2711-
2712-
#[async_trait]
2713-
impl CallReadTrait for CallRead {
2714-
async fn call_read_csv(&self) -> DataFrame {
2715-
let ctx = SessionContext::new();
2716-
ctx.read_csv("dummy", CsvReadOptions::new()).await.unwrap()
2717-
}
2718-
2719-
async fn call_read_avro(&self) -> DataFrame {
2720-
let ctx = SessionContext::new();
2721-
ctx.read_avro("dummy", AvroReadOptions::default())
2722-
.await
2723-
.unwrap()
2724-
}
2725-
}
27262566
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use super::super::options::{AvroReadOptions, ReadOptions};
21+
use super::{DataFilePaths, DataFrame, Result, SessionContext};
22+
23+
impl SessionContext {
24+
/// Creates a [`DataFrame`] for reading an Avro data source.
25+
///
26+
/// For more control such as reading multiple files, you can use
27+
/// [`read_table`](Self::read_table) with a [`super::ListingTable`].
28+
///
29+
/// For an example, see [`read_csv`](Self::read_csv)
30+
pub async fn read_avro<P: DataFilePaths>(
31+
&self,
32+
table_paths: P,
33+
options: AvroReadOptions<'_>,
34+
) -> Result<DataFrame> {
35+
self._read_type(table_paths, options).await
36+
}
37+
38+
/// Registers an Avro file as a table that can be referenced from
39+
/// SQL statements executed against this context.
40+
pub async fn register_avro(
41+
&self,
42+
name: &str,
43+
table_path: &str,
44+
options: AvroReadOptions<'_>,
45+
) -> Result<()> {
46+
let listing_options = options.to_listing_options(&self.copied_config());
47+
48+
self.register_listing_table(
49+
name,
50+
table_path,
51+
listing_options,
52+
options.schema.map(|s| Arc::new(s.to_owned())),
53+
None,
54+
)
55+
.await?;
56+
Ok(())
57+
}
58+
}
59+
60+
#[cfg(test)]
61+
mod tests {
62+
use super::*;
63+
use async_trait::async_trait;
64+
65+
// Test for compilation error when calling read_* functions from an #[async_trait] function.
66+
// See https://github.com/apache/arrow-datafusion/issues/1154
67+
#[async_trait]
68+
trait CallReadTrait {
69+
async fn call_read_avro(&self) -> DataFrame;
70+
}
71+
72+
struct CallRead {}
73+
74+
#[async_trait]
75+
impl CallReadTrait for CallRead {
76+
async fn call_read_avro(&self) -> DataFrame {
77+
let ctx = SessionContext::new();
78+
ctx.read_avro("dummy", AvroReadOptions::default())
79+
.await
80+
.unwrap()
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)