43 changes: 8 additions & 35 deletions datafusion/catalog-listing/src/table.rs
@@ -23,16 +23,13 @@ use async_trait::async_trait;
 use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    Constraints, DataFusionError, SchemaExt, Statistics, internal_datafusion_err,
-    plan_err, project_schema,
+    Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
 };
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_groups::FileGroup;
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
 use datafusion_datasource::file_sink_config::FileSinkConfig;
-use datafusion_datasource::schema_adapter::{
-    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
-};
+use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_datasource::{
     ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
 };
@@ -331,20 +328,6 @@ impl ListingTable {
         self.schema_adapter_factory.as_ref()
     }
 
-    /// Creates a schema adapter for mapping between file and table schemas
-    ///
-    /// Uses the configured schema adapter factory if available, otherwise falls back
-    /// to the default implementation.
-    fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
-        let table_schema = self.schema();
-        match &self.schema_adapter_factory {
-            Some(factory) => {
-                factory.create_with_projected_schema(Arc::clone(&table_schema))
-            }
-            None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)),
-        }
-    }
-
     /// Creates a file source and applies schema adapter factory if available
     fn create_file_source_with_schema_adapter(
         &self,
@@ -359,10 +342,8 @@ impl ListingTable {
         );
 
         let mut source = self.options.format.file_source(table_schema);
-        // Apply schema adapter to source if available
-        //
+        // Apply schema adapter to source if available.
         // The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
-        // Note: ListingTable also creates a SchemaAdapter in `scan()` but that is only used to adapt collected statistics.
         if let Some(factory) = &self.schema_adapter_factory {
             source = source.with_schema_adapter_factory(Arc::clone(factory))?;
         }
@@ -709,25 +690,17 @@ impl ListingTable {
            )
        };
 
-        let (mut file_groups, mut stats) = compute_all_files_statistics(
+        let (file_groups, stats) = compute_all_files_statistics(
             file_groups,
             self.schema(),
             self.options.collect_stat,
             inexact_stats,
         )?;
 
-        let schema_adapter = self.create_schema_adapter();
-        let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;
Comment on lines -719 to -720 (PR author):
This was mapping from file schema -> table schema by filling in unknown stats for partition columns. It did not do any per-file adaptation; it assumed all files already had the same schema.
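To make that concrete, here is a minimal self-contained sketch of the padding behavior (the `Stat` and `ColumnStats` types are simplified stand-ins for DataFusion's `Precision` and `ColumnStatistics`, not the real API):

```rust
#[derive(Clone, Debug, PartialEq)]
enum Stat {
    Exact(usize),
    Absent, // unknown
}

#[derive(Clone, Debug, PartialEq)]
struct ColumnStats {
    null_count: Stat,
}

// Pad file-level column statistics out to the table schema width; columns
// missing from the file (e.g. partition columns) get unknown stats.
fn map_column_statistics(file_stats: &[ColumnStats], table_width: usize) -> Vec<ColumnStats> {
    let mut out = file_stats.to_vec();
    out.resize(table_width, ColumnStats { null_count: Stat::Absent });
    out
}

fn main() {
    let file_stats = vec![
        ColumnStats { null_count: Stat::Exact(0) },
        ColumnStats { null_count: Stat::Exact(3) },
    ];
    // Table schema = 2 file columns + 1 partition column.
    let mapped = map_column_statistics(&file_stats, 3);
    assert_eq!(mapped[2], ColumnStats { null_count: Stat::Absent });
}
```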


-        stats.column_statistics =
-            schema_mapper.map_column_statistics(&stats.column_statistics)?;
-        file_groups.iter_mut().try_for_each(|file_group| {
-            if let Some(stat) = file_group.statistics_mut() {
-                stat.column_statistics =
-                    schema_mapper.map_column_statistics(&stat.column_statistics)?;
-            }
-            Ok::<_, DataFusionError>(())
-        })?;
-
+        // Note: Statistics already include both file columns and partition columns.
+        // PartitionedFile::with_statistics automatically appends exact partition column
+        // statistics (min=max=partition_value, null_count=0, distinct_count=1) computed
+        // from partition_values.
         Ok(ListFilesResult {
             file_groups,
             statistics: stats,
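As a side note on the comment added above: because every row in a file shares that file's partition values, exact per-file partition-column statistics can be derived directly from the path. A rough illustration of that derivation (the `PartitionStats` type and helper below are hypothetical, not DataFusion's actual `PartitionedFile::with_statistics`):

```rust
// Hypothetical sketch: exact statistics for one partition column of one file.
// All rows in the file share the partition value, so min == max == value,
// null_count == 0, and distinct_count == 1.
#[derive(Debug, PartialEq)]
struct PartitionStats {
    min: String,
    max: String,
    null_count: usize,
    distinct_count: usize,
}

fn partition_column_stats(partition_value: &str) -> PartitionStats {
    PartitionStats {
        min: partition_value.to_string(),
        max: partition_value.to_string(),
        null_count: 0,
        distinct_count: 1,
    }
}

fn main() {
    // e.g. a file under `.../date=2024-01-01/` in a Hive-partitioned layout
    let stats = partition_column_stats("2024-01-01");
    assert_eq!(stats.min, stats.max);
    assert_eq!(stats.null_count, 0);
    assert_eq!(stats.distinct_count, 1);
}
```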
230 changes: 2 additions & 228 deletions datafusion/core/src/datasource/listing/table.rs
@@ -129,16 +129,13 @@ mod tests {
         ListingOptions, ListingTable, ListingTableConfig, SchemaSource,
     };
     use datafusion_common::{
-        assert_contains, plan_err,
+        assert_contains,
         stats::Precision,
         test_util::{batches_to_string, datafusion_test_data},
-        ColumnStatistics, DataFusionError, Result, ScalarValue,
+        DataFusionError, Result, ScalarValue,
     };
     use datafusion_datasource::file_compression_type::FileCompressionType;
     use datafusion_datasource::file_format::FileFormat;
-    use datafusion_datasource::schema_adapter::{
-        SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
-    };
     use datafusion_datasource::ListingTableUrl;
     use datafusion_expr::dml::InsertOp;
     use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
@@ -147,15 +144,12 @@ mod tests {
     use datafusion_physical_expr_common::sort_expr::LexOrdering;
     use datafusion_physical_plan::empty::EmptyExec;
     use datafusion_physical_plan::{collect, ExecutionPlanProperties};
-    use rstest::rstest;
     use std::collections::HashMap;
     use std::io::Write;
     use std::sync::Arc;
     use tempfile::TempDir;
     use url::Url;
 
-    const DUMMY_NULL_COUNT: Precision<usize> = Precision::Exact(42);
-
     /// Creates a test schema with standard field types used in tests
     fn create_test_schema() -> SchemaRef {
         Arc::new(Schema::new(vec![
@@ -1448,31 +1442,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_statistics_mapping_with_custom_factory() -> Result<()> {
-        let ctx = SessionContext::new();
-        let table = create_test_listing_table_with_json_and_adapter(
-            &ctx,
-            false,
-            // NullStatsAdapterFactory sets column_statistics null_count to DUMMY_NULL_COUNT
-            Arc::new(NullStatsAdapterFactory {}),
-        )?;
-
-        let result = table.list_files_for_scan(&ctx.state(), &[], None).await?;
-
-        assert_eq!(
-            result.statistics.column_statistics[0].null_count,
-            DUMMY_NULL_COUNT
-        );
-        for g in result.file_groups {
-            if let Some(s) = g.file_statistics(None) {
-                assert_eq!(s.column_statistics[0].null_count, DUMMY_NULL_COUNT);
-            }
-        }
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn test_statistics_mapping_with_default_factory() -> Result<()> {
         let ctx = SessionContext::new();
@@ -1513,199 +1482,4 @@ mod tests {
 
         Ok(())
     }
-
-    #[rstest]
-    #[case(MapSchemaError::TypeIncompatible, "Cannot map incompatible types")]
-    #[case(MapSchemaError::GeneralFailure, "Schema adapter mapping failed")]
-    #[case(
-        MapSchemaError::InvalidProjection,
-        "Invalid projection in schema mapping"
-    )]
-    #[tokio::test]
-    async fn test_schema_adapter_map_schema_errors(
-        #[case] error_type: MapSchemaError,
-        #[case] expected_error_msg: &str,
-    ) -> Result<()> {
-        let ctx = SessionContext::new();
-        let table = create_test_listing_table_with_json_and_adapter(
-            &ctx,
-            false,
-            Arc::new(FailingMapSchemaAdapterFactory { error_type }),
-        )?;
-
-        // The error should bubble up from the scan operation when schema mapping fails
-        let scan_result = table.scan(&ctx.state(), None, &[], None).await;
-
-        assert!(scan_result.is_err());
-        let error_msg = scan_result.unwrap_err().to_string();
-        assert!(
-            error_msg.contains(expected_error_msg),
-            "Expected error containing '{expected_error_msg}', got: {error_msg}"
-        );
-
-        Ok(())
-    }
-
-    // Test that errors during file listing also bubble up correctly
-    #[tokio::test]
-    async fn test_schema_adapter_error_during_file_listing() -> Result<()> {
-        let ctx = SessionContext::new();
-        let table = create_test_listing_table_with_json_and_adapter(
-            &ctx,
-            true,
-            Arc::new(FailingMapSchemaAdapterFactory {
-                error_type: MapSchemaError::TypeIncompatible,
-            }),
-        )?;
-
-        // The error should bubble up from list_files_for_scan when collecting statistics
-        let list_result = table.list_files_for_scan(&ctx.state(), &[], None).await;
-
-        assert!(list_result.is_err());
-        let error_msg = list_result.unwrap_err().to_string();
-        assert!(
-            error_msg.contains("Cannot map incompatible types"),
-            "Expected type incompatibility error during file listing, got: {error_msg}"
-        );
-
-        Ok(())
-    }
-
-    #[derive(Debug, Copy, Clone)]
-    enum MapSchemaError {
-        TypeIncompatible,
-        GeneralFailure,
-        InvalidProjection,
-    }
-
-    #[derive(Debug)]
-    struct FailingMapSchemaAdapterFactory {
-        error_type: MapSchemaError,
-    }
-
-    impl SchemaAdapterFactory for FailingMapSchemaAdapterFactory {
-        fn create(
-            &self,
-            projected_table_schema: SchemaRef,
-            _table_schema: SchemaRef,
-        ) -> Box<dyn SchemaAdapter> {
-            Box::new(FailingMapSchemaAdapter {
-                schema: projected_table_schema,
-                error_type: self.error_type,
-            })
-        }
-    }
-
-    #[derive(Debug)]
-    struct FailingMapSchemaAdapter {
-        schema: SchemaRef,
-        error_type: MapSchemaError,
-    }
-
-    impl SchemaAdapter for FailingMapSchemaAdapter {
-        fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-            let field = self.schema.field(index);
-            file_schema.fields.find(field.name()).map(|(i, _)| i)
-        }
-
-        fn map_schema(
-            &self,
-            _file_schema: &Schema,
-        ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-            // Always fail with different error types based on the configured error_type
-            match self.error_type {
-                MapSchemaError::TypeIncompatible => {
-                    plan_err!(
-                        "Cannot map incompatible types: Boolean cannot be cast to Utf8"
-                    )
-                }
-                MapSchemaError::GeneralFailure => {
-                    plan_err!("Schema adapter mapping failed due to internal error")
-                }
-                MapSchemaError::InvalidProjection => {
-                    plan_err!("Invalid projection in schema mapping: column index out of bounds")
-                }
-            }
-        }
-    }
-
-    #[derive(Debug)]
-    struct NullStatsAdapterFactory;
-
-    impl SchemaAdapterFactory for NullStatsAdapterFactory {
-        fn create(
-            &self,
-            projected_table_schema: SchemaRef,
-            _table_schema: SchemaRef,
-        ) -> Box<dyn SchemaAdapter> {
-            Box::new(NullStatsAdapter {
-                schema: projected_table_schema,
-            })
-        }
-    }
-
-    #[derive(Debug)]
-    struct NullStatsAdapter {
-        schema: SchemaRef,
-    }
-
-    impl SchemaAdapter for NullStatsAdapter {
-        fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-            let field = self.schema.field(index);
-            file_schema.fields.find(field.name()).map(|(i, _)| i)
-        }
-
-        fn map_schema(
-            &self,
-            file_schema: &Schema,
-        ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-            let projection = (0..file_schema.fields().len()).collect();
-            Ok((Arc::new(NullStatsMapper {}), projection))
-        }
-    }
-
-    #[derive(Debug)]
-    struct NullStatsMapper;
-
-    impl SchemaMapper for NullStatsMapper {
-        fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-            Ok(batch)
-        }
-
-        fn map_column_statistics(
-            &self,
-            stats: &[ColumnStatistics],
-        ) -> Result<Vec<ColumnStatistics>> {
-            Ok(stats
-                .iter()
-                .map(|s| {
-                    let mut s = s.clone();
-                    s.null_count = DUMMY_NULL_COUNT;
-                    s
-                })
-                .collect())
-        }
-    }
-
-    /// Helper function to create a test ListingTable with JSON format and custom schema adapter factory
-    fn create_test_listing_table_with_json_and_adapter(
-        ctx: &SessionContext,
-        collect_stat: bool,
-        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
-    ) -> Result<ListingTable> {
-        let path = "table/file.json";
-        register_test_store(ctx, &[(path, 10)]);
-
-        let format = JsonFormat::default();
-        let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(collect_stat);
-        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
-        let table_path = ListingTableUrl::parse("test:///table/")?;
-
-        let config = ListingTableConfig::new(table_path)
-            .with_listing_options(opt)
-            .with_schema(Arc::new(schema))
-            .with_schema_adapter_factory(schema_adapter_factory);
-
-        ListingTable::try_new(config)
-    }
 }