Python: Implement Hive create and load table #5447
Conversation
Adds the missing parts of creating and loading Hive tables, including reading/writing the table metadata. Tested against a local metastore (with map/list/struct types).
python/pyiceberg/catalog/hive.py (outdated)

```python
    return f"struct<{', '.join(field_results)}>"


def field(self, field: NestedField, field_result: str) -> str:
    return f"{field.name}: {field_result}"
```
I'm not sure that there can be a space here. The Java converter doesn't have a space and I seem to remember Hive expecting a certain format in some cases.
Ahh, great catch! It was writing fine, but when reading the table it would throw an error:
```
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Error: type expected at the position 83 of 'string:int:boolean:array<string>:map<string,map<string,int>>:array<struct<latitude: float,longitude: float>>:struct<name: string,age: int>' but ' float' is found. (state=08S01,code=1)
```
Blame it on validate on read.
After removing all the spaces, it works fine:
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
| col_name | data_type | comment |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
| # col_name | data_type | comment |
| foo | string | |
| bar | int | |
| baz | boolean | |
| qux | array<string> | |
| quux | map<string,map<string,int>> | |
| location | array<struct<latitude:float,longitude:float>> | |
| person | struct<name:string,age:int> | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | nyc | NULL |
| OwnerType: | USER | NULL |
| Owner: | fokkodriesprong | NULL |
| CreateTime: | Mon Aug 08 11:58:50 UTC 2022 | NULL |
| LastAccessTime: | Mon Aug 08 11:58:50 UTC 2022 | NULL |
| Retention: | 0 | NULL |
| Location: | file:/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/nyc.db/complex | NULL |
| Table Type: | EXTERNAL_TABLE | NULL |
| Table Parameters: | NULL | NULL |
| | EXTERNAL | TRUE |
| | metadata_location | file:/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/nyc.db/complex/metadata/3400b5e4-d454-4788-b269-18ea6a4beac0.metadata.json |
| | numFiles | 0 |
| | table_type | ICEBERG |
| | totalSize | 0 |
| | transient_lastDdlTime | 1659959930 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL |
| InputFormat: | org.apache.hadoop.mapred.FileInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.mapred.FileOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | 0 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
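The no-spaces rule discussed above can be sketched as follows. This is an illustrative reconstruction, not pyiceberg's actual visitor API: the `Field` dataclass and `struct_to_hive` name are assumptions, but the output format matches the working table output shown.

```python
# Sketch of the fix: join field name and type with a bare colon and no spaces,
# e.g. "latitude:float", because Hive's type-string parser rejects " float".
from dataclasses import dataclass
from typing import List


@dataclass
class Field:
    name: str
    hive_type: str  # already-converted Hive type string


def struct_to_hive(fields: List[Field]) -> str:
    # No space after the colon or after the comma separator
    return f"struct<{','.join(f'{f.name}:{f.hive_type}' for f in fields)}>"


print(struct_to_hive([Field("latitude", "float"), Field("longitude", "float")]))
# struct<latitude:float,longitude:float>
```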
python/pyiceberg/catalog/hive.py (outdated)

```python
if database_location := database_properties.get(LOCATION):
    database_location = database_location.rstrip("/")
    return f"{database_location}/{table_name}/"
raise ValueError("Cannot determine location from warehouse, please provide an explicit location")
```
This should respect the warehouse property used to construct the catalog, rather than failing with a ValueError.
Nice, added it, including a test
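The fallback being discussed can be sketched roughly like this. The function name, the property keys, and the `{database}.db/{table}` layout are assumptions for illustration (the layout matches the warehouse path visible in the table output above), not the exact pyiceberg code.

```python
# Sketch: prefer an explicit database location, then fall back to the
# catalog's "warehouse" property, and only raise when neither is set.
from typing import Dict

LOCATION = "location"
WAREHOUSE = "warehouse"


def resolve_default_location(
    catalog_properties: Dict[str, str],
    database_properties: Dict[str, str],
    database_name: str,
    table_name: str,
) -> str:
    # 1. Explicit location set on the database wins
    if database_location := database_properties.get(LOCATION):
        return f"{database_location.rstrip('/')}/{table_name}"
    # 2. Otherwise derive a path from the catalog's warehouse property
    if warehouse := catalog_properties.get(WAREHOUSE):
        return f"{warehouse.rstrip('/')}/{database_name}.db/{table_name}"
    # 3. Only now do we give up (hypothetical error message)
    raise ValueError("Cannot determine location, please provide an explicit location")


print(resolve_default_location({WAREHOUSE: "s3://bucket/wh"}, {}, "nyc", "taxis"))
# s3://bucket/wh/nyc.db/taxis
```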
```python
metadata_location = f"{location}metadata/{uuid.uuid4()}.metadata.json"
metadata = TableMetadataV2(
    location=location,
    schemas=[schema],
```
The schema field IDs and partition field IDs need to be reassigned to ensure that they are consistent because we don't trust that users pass them in correctly. Passing them directly is okay for this PR, but we should not release until we have reassignment done.
Good one, we do have validators on them when we initialize the Pydantic models, but there is no re-assignment being done. Do you want to set them to zero? We could also set the schema_id by default to zero?
Here's what we do in Java: https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/TableMetadata.java#L88-L133
When the table metadata builder adds a schema, spec, or order, the builder will check whether it is an existing order and find out the ID to assign. We should do something similar here.
This probably isn't something to do in this PR (since this is focused on Hive) but introducing the builder would probably be a good idea. That's how we also accumulate the change set that we pass to the REST catalog when committing table changes. I'd say let's move forward with what you have here (except passing the IDs rather than hard-coding to 0) and we can add reassignment later.
Thanks for pointing out the Java code. I like it and agree that it is best to do this in a separate PR.
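The reassignment deferred to a later PR could look roughly like this: walk the schema depth-first and hand out fresh, sequential field IDs instead of trusting user-supplied ones. The `SimpleField` type and `reassign_ids` helper are purely illustrative stand-ins; the real implementation would use pyiceberg's `NestedField` and visitor machinery, mirroring the Java `TableMetadata` builder linked above.

```python
# Sketch: never trust incoming field IDs; assign fresh sequential IDs
# to every field, including fields nested inside structs.
import itertools
from dataclasses import dataclass, field
from typing import List


@dataclass
class SimpleField:
    name: str
    field_id: int
    children: List["SimpleField"] = field(default_factory=list)


def reassign_ids(fields: List[SimpleField]) -> List[SimpleField]:
    counter = itertools.count(1)

    def visit(f: SimpleField) -> SimpleField:
        # Assign the parent a fresh ID, then recurse into nested fields
        return SimpleField(f.name, next(counter), [visit(c) for c in f.children])

    return [visit(f) for f in fields]


# User passed in arbitrary (untrusted) IDs: 99, 7, 42
schema = [SimpleField("id", 99), SimpleField("person", 7, [SimpleField("name", 42)])]
print([(f.name, f.field_id) for f in reassign_ids(schema)])
```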
python/pyiceberg/catalog/hive.py (outdated)

```python
metadata = FromInputFile.table_metadata(file)
return Table(identifier=(table.dbName, table.tableName), metadata=metadata, metadata_location=metadata_location)


def _write_metadata(self, metadata: TableMetadataV2, io: FileIO, metadata_path: str):
```
Why does this accept metadata v2 instead of either v1 or v2? I don't think that we want to assume that we will only write v2 metadata. We can't upgrade tables automatically.
I've updated the signature. We don't have any logic for updating a table yet; this is only used for creating new tables, and I think it makes sense to only allow V2 tables (or at least push the user in that direction).
python/pyiceberg/catalog/hive.py (outdated)

```python
    if partition_spec.fields
    else DEFAULT_LAST_PARTITION_ID,
)
io = load_file_io({**self.properties})
```
If we're always using self.properties then I think it makes sense to have an io instance for the catalog. That can be used here rather than creating a new one per table create.
I actually think that we should mix in the properties from the table as well. That way you can override them at the table level.
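The merge being proposed is a simple layered dict, with table properties winning over catalog properties. The property keys used in the example are hypothetical; `load_file_io` here is a stand-in for pyiceberg's loader, which takes the merged dict.

```python
# Sketch: layer table properties over catalog properties before loading the
# FileIO, so a table can override e.g. an endpoint configured on the catalog.
from typing import Dict


def io_properties(catalog_properties: Dict[str, str], table_properties: Dict[str, str]) -> Dict[str, str]:
    # Later keys win: table-level settings override catalog-level ones
    return {**catalog_properties, **table_properties}


merged = io_properties(
    {"s3.endpoint": "https://catalog-default", "s3.region": "us-east-1"},
    {"s3.endpoint": "https://table-override"},
)
print(merged)
# {'s3.endpoint': 'https://table-override', 's3.region': 'us-east-1'}
```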
python/pyiceberg/catalog/hive.py (outdated)

```python
        raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
    return self._convert_hive_into_iceberg(hive_table)
```

```python
    return self._convert_hive_into_iceberg(hive_table, io, metadata_location)
```
Why pass metadata_location here? Since it is set on the table, I think it would make sense to always pull it from table parameters. That way we never create a situation where we've forgotten to set the table parameter, but successfully returned a table instance. Or one where we've forgotten to update it and returned an updated table.
I like that a lot. I think that the metadata_location was still there for historical reasons. Thanks!
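Pulling the location from the table parameters, as suggested, could look like this. The `metadata_location` parameter key matches the table output earlier in the thread; the plain dict stands in for the Thrift `Table.parameters` field, and the helper name and error message are assumptions.

```python
# Sketch: always read metadata_location from the Hive table parameters, so we
# can never return a Table instance whose parameter was forgotten or stale.
from typing import Dict

METADATA_LOCATION = "metadata_location"


def metadata_location_from(parameters: Dict[str, str]) -> str:
    if location := parameters.get(METADATA_LOCATION):
        return location
    # Hypothetical error: the table is not a valid Iceberg table
    raise ValueError(f"Table is missing the {METADATA_LOCATION} parameter")


params = {"table_type": "ICEBERG", METADATA_LOCATION: "s3://wh/nyc.db/taxis/metadata/00000.metadata.json"}
print(metadata_location_from(params))
```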
```python
catalog._client.__enter__().get_table.assert_called_with(dbname="default", tbl_name="table")
table = catalog.load_table(("default", "new_tabl2e"))


catalog._client.__enter__().get_table.assert_called_with(dbname="default", tbl_name="new_tabl2e")
```
Did you intend to have 2 in the table name?
@Fokko, looks like this is ready except that it conflicts with the order-by PR that was just merged. I tried to fix it, but I didn't get it right.

Awesome @rdblue, just resolved the pre-commit error 👍🏻

Thanks, @Fokko! Great to have this working.