Add SchemaAdapterFactory Support for ListingTable with Schema Evolution and Mapping #16583
kosiew merged 21 commits into apache:main
…sertions in `ListingTableConfig` methods
…ion support details
…ma_adapter method
…chema state representation
…s_empty() for clarity
adriangb left a comment
Leaving some questions / suggestions
/// # // Custom schema adapter for handling schema evolution
/// # #[derive(Debug)]
/// # struct EvolutionSchemaAdapterFactory;
/// # impl SchemaAdapterFactory for EvolutionSchemaAdapterFactory {
/// #     fn create(&self, projected_table_schema: SchemaRef, file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
/// #         unimplemented!("Custom schema adapter implementation")
/// #     }
/// # }
/// #
There was a problem hiding this comment.
I don't think this should be here. It belongs alongside the SchemaAdapter trait / might already be there.
There was a problem hiding this comment.
Before this PR, schema adapters were not used anywhere in the repo except in tests.
Using a schema adapter with a ListingTable is the first illustration of how to use them.
Therefore, I added the example here.
There was a problem hiding this comment.
Deleted this example, because a nearly identical example already sits next to fn with_schema_adapter_factory, where it belongs.
There was a problem hiding this comment.
I still think that (1) most users won't be using a SchemaAdapter and (2) linking to the existing docs on SchemaAdapter and maybe enhancing those should be enough.
// Note: We preserve existing options state, but downstream code may expect
// options to be set. Consider calling with_listing_options() or infer_options()
// before operations that require options to be present.
debug_assert!(
    self.options.is_some() || cfg!(test),
    "ListingTableConfig::with_schema called without options set. \
     Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
);

Self {
    table_paths: self.table_paths,
    file_schema: Some(schema),
    options: self.options,
    schema_source: SchemaSource::Specified,
    ..self
There was a problem hiding this comment.
Is this related to this change, or is it a drive-by?
There was a problem hiding this comment.
This is a minor refactor because I derived Default for ListingTableConfig.
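The refactor described here leans on two Rust features: `#[derive(Default)]` and struct update syntax (`..self`). The following is a minimal sketch of that pattern, not DataFusion's code; `TableConfig` and its fields are hypothetical stand-ins for `ListingTableConfig`.

```rust
// Hypothetical stand-in for a builder-style config; not DataFusion's real type.
#[derive(Debug, Default, Clone, PartialEq)]
struct TableConfig {
    table_paths: Vec<String>,
    file_schema: Option<Vec<String>>,
    options: Option<String>,
}

impl TableConfig {
    fn new(path: &str) -> Self {
        Self {
            table_paths: vec![path.to_string()],
            // Every field not named above comes from Default::default().
            ..Default::default()
        }
    }

    fn with_schema(self, schema: Vec<String>) -> Self {
        Self {
            file_schema: Some(schema),
            // All remaining fields are moved out of `self` unchanged.
            ..self
        }
    }

    fn with_options(self, options: &str) -> Self {
        Self {
            options: Some(options.to_string()),
            ..self
        }
    }
}

fn main() {
    let config = TableConfig::new("file:///path/to/data")
        .with_schema(vec!["id".to_string(), "name".to_string()])
        .with_options("parquet");
    println!("{config:?}");
}
```

With `..self` in place, a builder method only has to name the fields it changes, so a field added to the struct later does not require touching every method.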
///
/// # Example: Configuring Parquet Files with Custom Options
/// ```rust
/// # use std::sync::Arc;
/// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
/// # use datafusion::datasource::file_format::parquet::ParquetFormat;
/// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
/// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
///     .with_file_extension(".parquet")
///     .with_collect_stat(true);
///
/// let config = ListingTableConfig::new(table_paths)
///     .with_listing_options(options); // Configure file format and options
/// ```
pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
    // Note: This method properly sets options, but be aware that downstream
    // methods like infer_schema() and try_new() require both schema and options
    // to be set to function correctly.
    debug_assert!(
        !self.table_paths.is_empty() || cfg!(test),
        "ListingTableConfig::with_listing_options called without table_paths set. \
         Consider calling new() or new_with_multi_paths() first to establish table paths."
    );

    Self {
        table_paths: self.table_paths,
        file_schema: self.file_schema,
        options: Some(listing_options),
        schema_source: self.schema_source,
        ..self
    }
}
There was a problem hiding this comment.
This is a minor refactor because I derived Default for ListingTableConfig.
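The snippet above also guards the builder with `debug_assert!(... || cfg!(test), ...)`. A minimal sketch of that guard in isolation follows; `ScanConfig` and its fields are hypothetical and only illustrate the pattern.

```rust
// Hypothetical config type used only to illustrate the guard pattern.
#[derive(Debug, Default)]
struct ScanConfig {
    table_paths: Vec<String>,
    options: Option<String>,
}

impl ScanConfig {
    fn with_options(self, options: &str) -> Self {
        // `debug_assert!` is compiled out when debug assertions are disabled
        // (the default for release builds). `cfg!(test)` expands to a
        // compile-time boolean, so the check is always satisfied when the
        // crate is built for its own tests.
        debug_assert!(
            !self.table_paths.is_empty() || cfg!(test),
            "with_options called without table_paths set"
        );
        Self {
            options: Some(options.to_string()),
            ..self
        }
    }
}

fn main() {
    let config = ScanConfig {
        table_paths: vec!["file:///path/to/data".to_string()],
        options: None,
    }
    .with_options("parquet");
    println!("{config:?}");
}
```

The guard is advisory: it surfaces likely misuse during development without adding a runtime check or a panic path to optimized builds.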
/// # #[derive(Debug)]
/// # struct MySchemaAdapterFactory;
/// # impl SchemaAdapterFactory for MySchemaAdapterFactory {
/// #     fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
/// #         unimplemented!()
/// #     }
/// # }
There was a problem hiding this comment.
I think the example can just show setting the DefaultSchemaAdapterFactory
There was a problem hiding this comment.
I retained this as an example of a custom schema adapter factory....
There was a problem hiding this comment.
I feel that showing an example using DefaultSchemaAdapterFactory here and linking to proper extensive docs on how to build a custom SchemaAdapter and the multiple layers of factories is more helpful.
/// # #[derive(Debug)]
/// # struct EvolutionAdapterFactory;
/// # impl SchemaAdapterFactory for EvolutionAdapterFactory {
/// #     fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
/// #         unimplemented!()
/// #     }
/// # }
There was a problem hiding this comment.
Same as above, we can use DefaultSchemaAdapterFactory in the examples
There was a problem hiding this comment.
... and changed this to DefaultSchemaAdapterFactory, with a note that readers can refer to ListingTableConfig::with_schema_adapter_factory for a custom example.
let table_schema = self.schema();
match &self.schema_adapter_factory {
    Some(factory) => {
        factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema))
There was a problem hiding this comment.
Not this PR's fault but this is such a sad unfortunate API... why in the world do we pass in the same thing twice!?
There was a problem hiding this comment.
Added create_with_projected_schema to the API.
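One way to picture the convenience method is as a default trait method that forwards a single schema to both parameters of `create`. The sketch below assumes that shape; `AdapterFactory`, `Adapter`, the parameter names, and the `SchemaRef` alias are simplified stand-ins, not DataFusion's actual definitions.

```rust
use std::sync::Arc;

// Hypothetical stand-in: a schema is modeled as a shared list of column names.
type SchemaRef = Arc<Vec<String>>;

#[derive(Debug)]
struct Adapter {
    projected: SchemaRef,
    table: SchemaRef,
}

trait AdapterFactory {
    /// Two-argument form, mirroring the shape of `create` in the snippet above.
    fn create(&self, projected_table_schema: SchemaRef, table_schema: SchemaRef) -> Adapter;

    /// Convenience form for callers that only have one schema:
    /// it passes the same schema for both arguments.
    fn create_with_projected_schema(&self, projected_table_schema: SchemaRef) -> Adapter {
        self.create(Arc::clone(&projected_table_schema), projected_table_schema)
    }
}

struct DefaultFactory;

impl AdapterFactory for DefaultFactory {
    fn create(&self, projected_table_schema: SchemaRef, table_schema: SchemaRef) -> Adapter {
        Adapter {
            projected: projected_table_schema,
            table: table_schema,
        }
    }
}

fn main() {
    let schema: SchemaRef = Arc::new(vec!["id".to_string(), "name".to_string()]);
    let adapter = DefaultFactory.create_with_projected_schema(Arc::clone(&schema));
    // Both fields point at the same underlying schema allocation.
    assert!(Arc::ptr_eq(&adapter.projected, &adapter.table));
    println!("{adapter:?}");
}
```

Because the method has a default body, existing implementors of the trait would keep compiling, and call sites no longer need to clone the same schema twice by hand.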
let schema_adapter = self.create_schema_adapter();
let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;
There was a problem hiding this comment.
I guess this is just for the column statistics?
Co-authored-by: Adrian Garcia Badaracco <[email protected]>
Co-authored-by: Adrian Garcia Badaracco <[email protected]>
This reverts commit a300a30.
… ListingTableConfig
…method for convenience
…nto listing-config-14757
adriangb left a comment
Seems like a good improvement to me!
Btw, do you use ListingTable in production? There are plans to move it to an external repo or modularize it, so it might be interesting for you to share your use case.
Co-authored-by: Adrian Garcia Badaracco <[email protected]>
…on and Mapping (apache#16583)
- Introduced a new `schema_adapter_factory` field in `ListingTableConfig` and `ListingTable`
- Added `with_schema_adapter_factory` and `schema_adapter_factory()` methods to both structs
- Modified execution planning logic to apply schema adapters during scanning
- Updated statistics collection to use mapped schemas
- Implemented detailed documentation and example usage in doc comments
- Added new unit and integration tests validating schema adapter behavior and error cases
Which issue does this PR close?
This is the last of a series of PRs re-implementing #15295 to close #14757 by adding schema‐evolution support for:
in DataFusion.
Rationale for this change
This change introduces schema adapter support to improve DataFusion's ability to handle schema evolution scenarios in listing-based tables. It provides a more flexible and robust mechanism for adapting schemas between files and logical table definitions, including support for custom adapters that handle field renaming, type coercion, and column reordering.
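As a rough illustration of what adapting a file schema to a table schema means, the sketch below maps file columns into table order and fills columns missing from the file with nulls. It is a std-only model under stated assumptions, not DataFusion's implementation: `Column`, `map_schema`, and `map_batch` are hypothetical names, and renaming and type coercion are left out.

```rust
// Hypothetical model: a column is a vector of nullable integers.
type Column = Vec<Option<i64>>;

/// For each table column, find the index of the matching file column (if any).
fn map_schema(table_schema: &[&str], file_schema: &[&str]) -> Vec<Option<usize>> {
    table_schema
        .iter()
        .map(|name| file_schema.iter().position(|f| f == name))
        .collect()
}

/// Reorder file columns into table order, filling absent columns with nulls.
fn map_batch(mapping: &[Option<usize>], file_columns: &[Column], num_rows: usize) -> Vec<Column> {
    mapping
        .iter()
        .map(|idx| match idx {
            Some(i) => file_columns[*i].clone(),
            None => vec![None; num_rows],
        })
        .collect()
}

fn main() {
    // The table expects three columns; the file has two, in a different order.
    let table_schema = ["id", "label", "score"];
    let file_schema = ["score", "id"];
    let file_columns: Vec<Column> = vec![
        vec![Some(10), Some(20)], // score
        vec![Some(1), Some(2)],   // id
    ];

    let mapping = map_schema(&table_schema, &file_schema);
    let adapted = map_batch(&mapping, &file_columns, 2);
    println!("{mapping:?}");
    println!("{adapted:?}");
}
```

A custom adapter could plug different matching rules into the first step (for example, matching on an alias instead of the exact name) while leaving the second step unchanged.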
What changes are included in this PR?
- `schema_adapter_factory` field in `ListingTableConfig` and `ListingTable`
- `with_schema_adapter_factory` and `schema_adapter_factory()` methods to both structs
Are these changes tested?
Yes, the PR includes a suite of tests to verify:
Are there any user-facing changes?
Yes:
- `SchemaAdapterFactory` in `ListingTableConfig` to customize how file schemas are interpreted relative to table schemas