crates/catalog/sql/src/catalog.rs
Outdated
    let rows = connection.transaction(|txn| {
        let name = self.name.clone();
        Box::pin(async move {
            sqlx::query(&format!("select distinct table_namespace from iceberg_tables where catalog_name = '{}';", &name)).fetch_all(&mut **txn).await
Should you care about SQL injection here? Or are the catalog, namespace, and table names assumed to be safe?
Good point, it's better to use a prepared statement here.
There are several issues with this implementation:
- If parent is None, we should list all namespaces.
- We should also count namespaces in iceberg_namespace_properties.
- We should list only sub-namespaces.
See the Java implementation here.
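The three points above can be sketched without a database. This is a minimal illustration, not the PR's code: `list_namespaces` is a hypothetical helper, its two slices stand in for the distinct namespace names read from iceberg_tables and iceberg_namespace_properties, and "sub-namespaces" is assumed to mean direct children only.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper (not part of the PR): merges the namespace names found
/// in both tables and filters them relative to `parent`.
/// Assumption: "sub-namespaces" means direct children of `parent`.
fn list_namespaces(
    table_namespaces: &[&str],
    property_namespaces: &[&str],
    parent: Option<&[&str]>,
) -> Vec<Vec<String>> {
    // A BTreeSet removes duplicates and yields a stable, sorted order.
    let mut found: BTreeSet<Vec<String>> = BTreeSet::new();
    for name in table_namespaces.iter().chain(property_namespaces) {
        let levels: Vec<&str> = name.split('.').collect();
        let keep = match parent {
            // No parent: keep every namespace at its full depth.
            None => Some(levels.len()),
            // Strictly below the parent: truncate to one level below it, so
            // a.b.d still contributes a.b when listing under a.
            Some(p) if levels.len() > p.len() && levels.starts_with(p) => Some(p.len() + 1),
            // Unrelated namespace, or the parent itself.
            Some(_) => None,
        };
        if let Some(depth) = keep {
            found.insert(levels[..depth].iter().map(|s| s.to_string()).collect());
        }
    }
    found.into_iter().collect()
}
```

In a real catalog the two slices would come from parameterized queries against the two tables; only the filtering step is shown here.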
    }

    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
        let metadata_location = {
Do we need to check the cache first? Given that it's inserted later.
Also, the insertion should not be blind; we need to check its version first. My suggestion is to remove the cache for now so that things don't get too complicated.
The cache is only intended for update_table. Optimistically assuming that the metadata_location hasn't been changed since loading the table, the metadata and metadata_location from the cache can directly be used to perform the update. This way the database has to be queried only once for the optimistic case.
If the metadata_location changed, the update method has to be more involved.
I would not use the cache for loading tables.
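The optimistic path described above amounts to a compare-and-swap on metadata_location. The sketch below models it in memory under stated assumptions: `FakeDb`, `swap_location`, and `update_with_cache` are hypothetical names, the metadata is a plain String, and the single conditional write stands in for one SQL UPDATE whose WHERE clause includes the expected metadata_location.

```rust
use std::collections::HashMap;

/// In-memory stand-in for the iceberg_tables rows: table name -> metadata_location.
struct FakeDb {
    rows: HashMap<String, String>,
}

impl FakeDb {
    /// Models a single conditional UPDATE: the row changes only if its
    /// current metadata_location still equals `expected`.
    fn swap_location(&mut self, table: &str, expected: &str, new: &str) -> bool {
        match self.rows.get_mut(table) {
            Some(loc) if loc.as_str() == expected => {
                *loc = new.to_string();
                true
            }
            _ => false,
        }
    }
}

/// Hypothetical update path. The cache maps table name -> (metadata_location, metadata).
fn update_with_cache(
    db: &mut FakeDb,
    cache: &mut HashMap<String, (String, String)>,
    table: &str,
    new_location: &str,
    new_metadata: &str,
) -> bool {
    // Optimistic assumption: the location cached at load time is still current.
    let cached_location = match cache.get(table) {
        Some((location, _metadata)) => location.clone(),
        None => return false,
    };
    if db.swap_location(table, &cached_location, new_location) {
        // One round trip sufficed; remember the new state.
        cache.insert(
            table.to_string(),
            (new_location.to_string(), new_metadata.to_string()),
        );
        true
    } else {
        // Another writer committed first: drop the stale entry so the caller
        // reloads the table before retrying.
        cache.remove(table);
        false
    }
}
```

When the swap fails, the caller is expected to reload the table and retry, which is the more involved path mentioned above.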
I'm thinking maybe we should have a standalone data structure for caching, just like CachingCatalog in Java.
crates/catalog/sql/src/catalog.rs
Outdated
    Box::pin(async move {
        sqlx::query(
            "create table if not exists iceberg_namespace_properties (
                catalog_name text not null,
Use VARCHAR(255) here. Maybe we can copy the SQL from the Java Iceberg implementation?
+ CATALOG_NAME
+ " VARCHAR(255) NOT NULL,"
+ NAMESPACE_NAME
+ " VARCHAR(255) NOT NULL,"
+ NAMESPACE_PROPERTY_KEY
+ " VARCHAR(255),"
+ NAMESPACE_PROPERTY_VALUE
+ " VARCHAR(1000),"
crates/catalog/sql/src/catalog.rs
Outdated
    y.table_namespace
        .split('.')
        .map(ToString::to_string)
        .collect::<Vec<_>>(),
How about extracting this into a common method on NamespaceIdent?
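A minimal sketch of what such a shared method could look like. The `NamespaceIdent` below is a stand-in defined only for this example, and `from_dotted` / `to_dotted` are hypothetical names; the crate's real type and API may differ.

```rust
/// Stand-in for the crate's NamespaceIdent; the real type may differ.
#[derive(Debug, Clone, PartialEq, Eq)]
struct NamespaceIdent(Vec<String>);

impl NamespaceIdent {
    /// Hypothetical helper: parses the dot-separated form stored in the
    /// table_namespace column into its levels.
    fn from_dotted(s: &str) -> Self {
        NamespaceIdent(s.split('.').map(ToString::to_string).collect())
    }

    /// Inverse of `from_dotted`, for writing the value back to the database.
    fn to_dotted(&self) -> String {
        self.0.join(".")
    }
}
```

Keeping both directions in one place means every query site encodes and decodes namespaces the same way.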
    pub type TableMetadataRef = Arc<TableMetadata>;

    - #[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
    + #[derive(Debug, PartialEq, Deserialize, Eq, Clone, TypedBuilder)]
We had a discussion in this PR about the table metadata builder. I have concerns about this derived builder, since it's error-prone and not easy to review. TableMetadataBuilder will be heavily used by the transaction API, and we will need to do a lot of checks in it. I would suggest maintaining this struct manually. What do you think?
cc @JanKaul Is this PR ready for review, or do you need to make more updates?
I have to add a couple more changes. I'll notify you when I'm finished.
@JanKaul WDYT? I think this PR is ready for review; I can add the update and delete in a separate PR.
Cool, I'll take a look first.
Thank you all for your helpful comments. I think the PR is ready for review again.
liurenjie1024 left a comment
Thanks @JanKaul for this PR, we moved a huge step forward! I think there are some places we can improve a little to make it more robust.
crates/catalog/sql/src/catalog.rs
Outdated
    name: String,
    connection: AnyPool,
    storage: FileIO,
    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
I'm hesitant to add a cache here. Maybe we can add something like CachingCatalog in Java, so that all catalog implementations could benefit from it?
        Ok(table)
    }

    async fn create_table(
There are some things missing here:
- We should first check that the namespace exists.
- The location is optional; when it is absent, a subdirectory of the warehouse should be used as the location.
I would suggest referring to the Python implementation,
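The two checks above can be sketched as a small pre-flight step. Everything here is an assumption made for illustration: `plan_create_table` is a hypothetical helper, namespaces are passed as dotted strings, and the default layout (warehouse, then namespace levels as directories, then the table name) is one plausible choice, not necessarily what the Python or Java implementations do.

```rust
/// Hypothetical pre-flight step for create_table: verifies that the namespace
/// exists and resolves the table location.
/// Assumed default layout: <warehouse>/<namespace levels as dirs>/<table>.
fn plan_create_table(
    existing_namespaces: &[&str],
    warehouse: &str,
    namespace: &str,
    table: &str,
    requested_location: Option<&str>,
) -> Result<String, String> {
    // Refuse to create a table in a namespace that does not exist.
    if !existing_namespaces.contains(&namespace) {
        return Err(format!("namespace {namespace} does not exist"));
    }
    Ok(match requested_location {
        // An explicit location always wins.
        Some(location) => location.to_string(),
        // Otherwise fall back to a subdirectory of the warehouse.
        None => format!(
            "{}/{}/{}",
            warehouse.trim_end_matches('/'),
            namespace.replace('.', "/"),
            table
        ),
    })
}
```

Returning the resolved location before any write keeps the fallback rule in one testable place.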
fix connection pool issue for sql catalog
create sqlconfig, fix rest of the tests and remove todo
Co-authored-by: Renjie Liu <[email protected]>
Co-authored-by: Renjie Liu <[email protected]>
the bind placeholder
Thanks for bringing this up; I hadn't thought about it. It's odd, though: the test uses SQLite, which is not supposed to work with ? according to the docs.
This PR implements the basic operations for a SQL catalog. The implementation uses the sqlx crate, which enables Postgres, MySQL, and SQLite. The update_table method is to be implemented later.