
Python: Implement Hive create and load table #5447

Merged
rdblue merged 13 commits into apache:master from Fokko:fd-read-metadata on Aug 15, 2022

Conversation

@Fokko (Author) commented Aug 5, 2022

No description provided.

@github-actions bot added the python label on Aug 5, 2022
@Fokko changed the title from "Python: Read Metadata" to "Python: Implement Hive create and load Hive table" on Aug 5, 2022
@Fokko changed the title from "Python: Implement Hive create and load Hive table" to "Python: Implement Hive create and load table" on Aug 5, 2022
Missing parts of creating and loading Hive tables,
including reading/writing the table metadata.

Tested against a local metastore (with map/list/struct types).
return f"struct<{', '.join(field_results)}>"

def field(self, field: NestedField, field_result: str) -> str:
return f"{field.name}: {field_result}"
rdblue (Contributor):

I'm not sure that there can be a space here. The Java converter doesn't have a space and I seem to remember Hive expecting a certain format in some cases.

Fokko (Author):

Ahh, great catch! It was writing fine, but when reading the table it would throw an error:

Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Error: type expected at the position 83 of 'string:int:boolean:array<string>:map<string,map<string,int>>:array<struct<latitude: float,longitude: float>>:struct<name: string,age: int>' but ' float' is found. (state=08S01,code=1)

Blame it on validate-on-read.

After removing all the spaces, it works fine:

+-------------------------------+----------------------------------------------------+----------------------------------------------------+
|           col_name            |                     data_type                      |                      comment                       |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
| # col_name                    | data_type                                          | comment                                            |
| foo                           | string                                             |                                                    |
| bar                           | int                                                |                                                    |
| baz                           | boolean                                            |                                                    |
| qux                           | array<string>                                      |                                                    |
| quux                          | map<string,map<string,int>>                        |                                                    |
| location                      | array<struct<latitude:float,longitude:float>>      |                                                    |
| person                        | struct<name:string,age:int>                        |                                                    |
|                               | NULL                                               | NULL                                               |
| # Detailed Table Information  | NULL                                               | NULL                                               |
| Database:                     | nyc                                                | NULL                                               |
| OwnerType:                    | USER                                               | NULL                                               |
| Owner:                        | fokkodriesprong                                    | NULL                                               |
| CreateTime:                   | Mon Aug 08 11:58:50 UTC 2022                       | NULL                                               |
| LastAccessTime:               | Mon Aug 08 11:58:50 UTC 2022                       | NULL                                               |
| Retention:                    | 0                                                  | NULL                                               |
| Location:                     | file:/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/nyc.db/complex | NULL                                               |
| Table Type:                   | EXTERNAL_TABLE                                     | NULL                                               |
| Table Parameters:             | NULL                                               | NULL                                               |
|                               | EXTERNAL                                           | TRUE                                               |
|                               | metadata_location                                  | file:/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/nyc.db/complex/metadata/3400b5e4-d454-4788-b269-18ea6a4beac0.metadata.json |
|                               | numFiles                                           | 0                                                  |
|                               | table_type                                         | ICEBERG                                            |
|                               | totalSize                                          | 0                                                  |
|                               | transient_lastDdlTime                              | 1659959930                                         |
|                               | NULL                                               | NULL                                               |
| # Storage Information         | NULL                                               | NULL                                               |
| SerDe Library:                | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL                                               |
| InputFormat:                  | org.apache.hadoop.mapred.FileInputFormat           | NULL                                               |
| OutputFormat:                 | org.apache.hadoop.mapred.FileOutputFormat          | NULL                                               |
| Compressed:                   | No                                                 | NULL                                               |
| Num Buckets:                  | 0                                                  | NULL                                               |
| Bucket Columns:               | []                                                 | NULL                                               |
| Sort Columns:                 | []                                                 | NULL                                               |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
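For reference, a minimal sketch of the corrected visitor methods with both spaces dropped (same fragment style as the snippet above; the surrounding visitor class and type imports are assumed from the PR's code):

def struct(self, struct: StructType, field_results: List[str]) -> str:
    # Join without a space after the comma; Hive's DDL parser rejects it.
    return f"struct<{','.join(field_results)}>"

def field(self, field: NestedField, field_result: str) -> str:
    # No space after the colon, matching the Java converter's output.
    return f"{field.name}:{field_result}"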

if database_location := database_properties.get(LOCATION):
database_location = database_location.rstrip("/")
return f"{database_location}/{table_name}/"
raise ValueError("Cannot determine location from warehouse, please provide an explicit location")
rdblue (Contributor):

This should respect the warehouse property used to construct the catalog rather than failing with a ValueError.

Fokko (Author):

Nice, added it, including a test.
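A minimal sketch of that fallback, assuming a warehouse key in the catalog's properties (the helper name, error message, and path layout are illustrative, not the PR's exact code):

def _get_default_warehouse_location(self, database_name: str, table_name: str) -> str:
    # Prefer an explicit location set on the Hive database.
    database_properties = self._get_hive_database_properties(database_name)  # hypothetical helper
    if database_location := database_properties.get(LOCATION):
        return f"{database_location.rstrip('/')}/{table_name}/"
    # Fall back to the warehouse property the catalog was constructed with.
    if warehouse := self.properties.get("warehouse"):
        return f"{warehouse.rstrip('/')}/{database_name}.db/{table_name}/"
    raise ValueError("Cannot determine location: set a database location, a warehouse property, or an explicit table location")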

metadata_location = f"{location}metadata/{uuid.uuid4()}.metadata.json"
metadata = TableMetadataV2(
location=location,
schemas=[schema],
rdblue (Contributor):

The schema field IDs and partition field IDs need to be reassigned to ensure that they are consistent because we don't trust that users pass them in correctly. Passing them directly is okay for this PR, but we should not release until we have reassignment done.

Fokko (Author):

Good one! We do have validators on them when we initialize the Pydantic models, but there is no reassignment being done. Do you want to set them to zero? We could also default the schema_id to zero.

rdblue (Contributor):

Here's what we do in Java: https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/TableMetadata.java#L88-L133

When the table metadata builder adds a schema, spec, or sort order, it checks whether an equivalent one already exists and reuses that ID. We should do something similar here.

This probably isn't something to do in this PR (since this is focused on Hive) but introducing the builder would probably be a good idea. That's how we also accumulate the change set that we pass to the REST catalog when committing table changes. I'd say let's move forward with what you have here (except passing the IDs rather than hard-coding to 0) and we can add reassignment later.
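A hedged sketch of that builder behavior in Python (all names here are illustrative; nothing in this PR implements it yet):

def add_schema(self, new_schema: Schema) -> int:
    """Reuse the ID of a structurally equal schema, or assign the next free one."""
    for existing in self.schemas:
        # Compare structure only; IDs supplied by the caller are not trusted.
        if existing.as_struct() == new_schema.as_struct():
            return existing.schema_id
    next_id = 1 + max((s.schema_id for s in self.schemas), default=-1)
    self.schemas.append(Schema(*new_schema.fields, schema_id=next_id))
    return next_id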

Fokko (Author):

Thanks for pointing out the Java code. I like it and agree that it is best to do this in a separate PR.

@Fokko force-pushed the fd-read-metadata branch from 91cb7fe to 75c2af7 on August 8, 2022 18:31
@Fokko force-pushed the fd-read-metadata branch from 75c2af7 to dc7d33d on August 8, 2022 18:33
metadata = FromInputFile.table_metadata(file)
return Table(identifier=(table.dbName, table.tableName), metadata=metadata, metadata_location=metadata_location)

def _write_metadata(self, metadata: TableMetadataV2, io: FileIO, metadata_path: str):
rdblue (Contributor):

Why does this accept metadata v2 instead of either v1 or v2? I don't think that we want to assume that we will only write v2 metadata. We can't upgrade tables automatically.

Fokko (Author):

I've updated the signature. This is because we don't have any logic that allows updating a table yet. This is only used for creating new tables, and I think it makes sense to only allow V2 tables (or at least push the user in that direction).
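The updated signature would then presumably take either version through a union (a sketch; the alias name and the ToOutputFile writer, assumed to mirror the FromInputFile reader seen above, are assumptions):

from typing import Union

TableMetadata = Union[TableMetadataV1, TableMetadataV2]  # assumed alias

def _write_metadata(self, metadata: TableMetadata, io: FileIO, metadata_path: str) -> None:
    # Assumed writer counterpart of the FromInputFile reader used for loading.
    ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))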

if partition_spec.fields
else DEFAULT_LAST_PARTITION_ID,
)
io = load_file_io({**self.properties})
@rdblue (Contributor) commented Aug 10, 2022:

If we're always using self.properties then I think it makes sense to have an io instance for the catalog. That can be used here rather than creating a new one per table create.

Fokko (Author):

I actually think that we should mix in the properties from the table as well. This way you can override this at the table level.
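A sketch of that merge, with table-level properties overriding the catalog's (names here are illustrative):

def _load_file_io(self, table_properties: Properties) -> FileIO:
    # Catalog properties first; table-level properties override them.
    return load_file_io({**self.properties, **table_properties})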

raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
return self._convert_hive_into_iceberg(hive_table)

return self._convert_hive_into_iceberg(hive_table, io, metadata_location)
@rdblue (Contributor) commented Aug 10, 2022:

Why pass metadata_location here? Since it is set on the table, I think it would make sense to always pull it from table parameters. That way we never create a situation where we've forgotten to set the table parameter, but successfully returned a table instance. Or one where we've forgotten to update it and returned an updated table.

Fokko (Author):

I like that a lot. I think that the metadata_location was still there for historical reasons. Thanks!
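A sketch of resolving the location from the Hive table parameters instead (the metadata_location key matches the table parameters shown in the DESCRIBE output above; the error type is illustrative):

def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
    properties = table.parameters
    if "metadata_location" not in properties:
        raise NoSuchTableError(f"Table {table.dbName}.{table.tableName} is missing the metadata_location property")
    metadata_location = properties["metadata_location"]
    file = io.new_input(metadata_location)
    metadata = FromInputFile.table_metadata(file)
    return Table(identifier=(table.dbName, table.tableName), metadata=metadata, metadata_location=metadata_location)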

catalog._client.__enter__().get_table.assert_called_with(dbname="default", tbl_name="table")
table = catalog.load_table(("default", "new_tabl2e"))

catalog._client.__enter__().get_table.assert_called_with(dbname="default", tbl_name="new_tabl2e")
rdblue (Contributor):

Did you intend to have 2 in the table name?

@rdblue (Contributor) commented Aug 12, 2022

@Fokko, looks like this is ready except that it conflicts with the order by PR that was just merged. I tried to fix it, but I didn't get it right.

@Fokko (Author) commented Aug 15, 2022

Awesome @rdblue, I just resolved the pre-commit error 👍🏻

@rdblue merged commit 5b55d00 into apache:master on Aug 15, 2022
@rdblue (Contributor) commented Aug 15, 2022

Thanks, @Fokko! Great to have this working.
