Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.encryption.KeyManagementClient;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand Down Expand Up @@ -167,6 +169,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private CloseableGroup closeables = null;
private Set<Endpoint> endpoints;
private Supplier<Map<String, String>> mutationHeaders = Map::of;
private KeyManagementClient keyManagementClient = null;
private String namespaceSeparator = null;

private RESTTableCache tableCache;
Expand Down Expand Up @@ -272,6 +275,12 @@ public void initialize(String name, Map<String, String> unresolved) {
mergedProps,
RESTCatalogProperties.METRICS_REPORTING_ENABLED,
RESTCatalogProperties.METRICS_REPORTING_ENABLED_DEFAULT);

if (mergedProps.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) {
this.keyManagementClient = EncryptionUtil.createKmsClient(mergedProps);
this.closeables.addCloseable(this.keyManagementClient);
}

this.namespaceSeparator =
PropertyUtil.propertyAsString(
mergedProps,
Expand Down Expand Up @@ -575,6 +584,7 @@ private Supplier<BaseTable> createTableSupplier(
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, credentials),
keyManagementClient,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have you all thought about the IRC remote scan planning and it integration with encryption ?

do we have test for the same.

tableMetadata,
endpoints);

Expand Down Expand Up @@ -674,6 +684,7 @@ public Table registerTable(
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, response.credentials()),
keyManagementClient,
response.tableMetadata(),
endpoints);

Expand Down Expand Up @@ -943,6 +954,7 @@ public Table create() {
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, response.credentials()),
keyManagementClient,
response.tableMetadata(),
endpoints);

Expand Down Expand Up @@ -976,6 +988,7 @@ public Transaction createTransaction() {
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, response.credentials()),
keyManagementClient,
RESTTableOperations.UpdateType.CREATE,
createChanges(meta),
meta,
Expand Down Expand Up @@ -1041,6 +1054,7 @@ public Transaction replaceTransaction() {
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, response.credentials()),
keyManagementClient,
RESTTableOperations.UpdateType.REPLACE,
changes.build(),
base,
Expand Down Expand Up @@ -1181,6 +1195,7 @@ private FileIO tableFileIO(
* @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation
* requests (POST/DELETE)
* @param fileIO the FileIO implementation for reading and writing table metadata and data files
* @param kmsClient the {@link KeyManagementClient} for encrypted tables
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pointing out that I'm changing the signature of this method. #14740 did the same recently, and I think #14465 that hasn't released yet so there shouldn't be problems with this

* @param current the current table metadata
* @param supportedEndpoints the set of supported REST endpoints
* @return a new RESTTableOperations instance
Expand All @@ -1191,10 +1206,18 @@ protected RESTTableOperations newTableOps(
Supplier<Map<String, String>> readHeaders,
Supplier<Map<String, String>> mutationHeaderSupplier,
FileIO fileIO,
KeyManagementClient kmsClient,
TableMetadata current,
Set<Endpoint> supportedEndpoints) {
return new RESTTableOperations(
restClient, path, readHeaders, mutationHeaderSupplier, fileIO, current, supportedEndpoints);
restClient,
path,
readHeaders,
mutationHeaderSupplier,
fileIO,
kmsClient,
current,
supportedEndpoints);
}

/**
Expand All @@ -1211,6 +1234,7 @@ protected RESTTableOperations newTableOps(
* @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation
* requests (POST/DELETE)
* @param fileIO the FileIO implementation for reading and writing table metadata and data files
* @param kmsClient the {@link KeyManagementClient} for encrypted tables
* @param updateType the {@link RESTTableOperations.UpdateType} being performed
* @param createChanges the list of metadata updates to apply during table creation or replacement
* @param current the current table metadata (may be null for CREATE operations)
Expand All @@ -1223,6 +1247,7 @@ protected RESTTableOperations newTableOps(
Supplier<Map<String, String>> readHeaders,
Supplier<Map<String, String>> mutationHeaderSupplier,
FileIO fileIO,
KeyManagementClient kmsClient,
RESTTableOperations.UpdateType updateType,
List<MetadataUpdate> createChanges,
TableMetadata current,
Expand All @@ -1233,6 +1258,7 @@ protected RESTTableOperations newTableOps(
readHeaders,
mutationHeaderSupplier,
fileIO,
kmsClient,
updateType,
createChanges,
current,
Expand Down
Loading