Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ public static <T> Builder<T> builder() {
.defaultValue(false)
.build();

public static final PolarisConfiguration<Boolean> ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING =
Copy link
Contributor

@eric-maynard eric-maynard Oct 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a catalog-level property, is there a need to make it specific to external catalogs?

I would prefer just ALLOW_CREDENTIAL_VENDING. Admins could just choose to set this on only external catalogs.

This would also extend nicely if we ever introduce overrides on the namespace or table level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think admins should be allowed to control it for INTERNAL and/or EXTERNAL catalogs independently. For now, this change focuses on EXTERNAL catalogs, but I think a subsequent PR that allows for disabling for INTERNAL catalogs as well would be a good approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Admins could do that by just setting the flag on either INTERNAL or EXTERNAL catalogs, couldn't they? I don't see a reason to make the parameter specific to one type or the other.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the current code will support different keys for yaml-level system-wide settings, while using the same catalogConfig for per-catalog overrides, which does seem to achieve both goals.

I would agree that the yaml-level config is the main thing that needs to be configurable separately by whoever is running the Polaris service.

If we can verify that it won't cause any problems to have the same catalogConfig("enable.credential.vending") for both ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING and ALLOW_INTERNAL_CATALOG_CREDENTIAL_VENDING then this seems like the best approach.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would agree with @eric-maynard and @sfc-gh-dhuo . Ideally I would probably like to be able to set this on a per catalog basis but having blanket coverage is nice too. In the future I would like to have a more flexible way defining policies like this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd also prefer a catalog property for this at this moment. We could consider policy-based one if there are solid use cases in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current impl allows setting the default value for the service and allowing for catalog-level overrides. This is confirmed in the tests here and here. This allows admins to turn off credential vending by default, but to enable it on a case-by-case basis if they trust the external catalog or if the potential for credential overreach is not a security concern.

PolarisConfiguration.<Boolean>builder()
.key("ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING")
.catalogConfig("enable.credential.vending")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this name is not specific to external catalogs: enable.credential.vending

.description("If set to true, allow credential vending for external catalogs.")
.defaultValue(true)
.build();

public static final PolarisConfiguration<List<String>> SUPPORTED_CATALOG_STORAGE_TYPES =
PolarisConfiguration.<List<String>>builder()
.key("SUPPORTED_CATALOG_STORAGE_TYPES")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.iceberg.rest.responses.LoadViewResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.polaris.core.PolarisConfiguration;
import org.apache.polaris.core.PolarisConfigurationStore;
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
import org.apache.polaris.core.auth.PolarisAuthorizer;
Expand Down Expand Up @@ -818,6 +819,24 @@ public LoadTableResponse loadTableWithAccessDelegation(
authorizeBasicTableLikeOperationOrThrow(read, PolarisEntitySubType.TABLE, tableIdentifier);
}

PolarisResolvedPathWrapper catalogPath = resolutionManifest.getResolvedReferenceCatalogEntity();
callContext
.getPolarisCallContext()
.getDiagServices()
.checkNotNull(catalogPath, "No catalog available for loadTable request");
CatalogEntity catalogEntity = CatalogEntity.of(catalogPath.getRawLeafEntity());
PolarisConfigurationStore configurationStore =
callContext.getPolarisCallContext().getConfigurationStore();
if (catalogEntity
.getCatalogType()
.equals(org.apache.polaris.core.admin.model.Catalog.TypeEnum.EXTERNAL)
&& !configurationStore.getConfiguration(
callContext.getPolarisCallContext(),
catalogEntity,
PolarisConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING)) {
throw new ForbiddenException("Access Delegation is not supported for this catalog");
}

// TODO: Find a way for the configuration or caller to better express whether to fail or omit
// when data-access is specified but access delegation grants are not found.
return doCatalogOperation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public String getRealmIdentifier() {
adminService.createCatalog(
new CatalogEntity.Builder()
.setName(CATALOG_NAME)
.setCatalogType("INTERNAL")
.setDefaultBaseLocation(storageLocation)
.setStorageConfigurationInfo(storageConfigModel, storageLocation)
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,22 @@
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.BaseTransaction;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateSchema;
Expand All @@ -52,6 +57,7 @@
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.auth.OAuth2Properties;
Expand Down Expand Up @@ -133,6 +139,26 @@ public class PolarisRestCatalogIntegrationTest extends CatalogTests<RESTCatalog>
private final String catalogBaseLocation =
S3_BUCKET_BASE + "/" + System.getenv("USER") + "/path/to/data";

private static final String[] DEFAULT_CATALOG_PROPERTIES = {
"allow.unstructured.table.location", "true",
"allow.external.table.location", "true"
};

@Retention(RetentionPolicy.RUNTIME)
private @interface CatalogConfig {
Catalog.TypeEnum value() default Catalog.TypeEnum.INTERNAL;

String[] properties() default {
"allow.unstructured.table.location", "true",
"allow.external.table.location", "true"
};
}

@Retention(RetentionPolicy.RUNTIME)
private @interface RestCatalogConfig {
String[] value() default {};
}

@BeforeAll
public static void setup(@PolarisRealm String realm) throws IOException {
// Set up test location
Expand Down Expand Up @@ -166,21 +192,27 @@ public void before(
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.setAllowedLocations(List.of("s3://my-old-bucket/path/to/data"))
.build();
Optional<CatalogConfig> catalogConfig =
testInfo
.getTestMethod()
.flatMap(m -> Optional.ofNullable(m.getAnnotation(CatalogConfig.class)));

org.apache.polaris.core.admin.model.CatalogProperties.Builder catalogPropsBuilder =
org.apache.polaris.core.admin.model.CatalogProperties.builder(catalogBaseLocation)
.addProperty(
PolarisConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION.catalogConfig(),
"true")
.addProperty(
PolarisConfiguration.ALLOW_EXTERNAL_TABLE_LOCATION.catalogConfig(),
"true");
org.apache.polaris.core.admin.model.CatalogProperties.builder(
catalogBaseLocation);
String[] properties =
catalogConfig.map(CatalogConfig::properties).orElse(DEFAULT_CATALOG_PROPERTIES);
for (int i = 0; i < properties.length; i += 2) {
catalogPropsBuilder.addProperty(properties[i], properties[i + 1]);
}
if (!S3_BUCKET_BASE.startsWith("file:/")) {
catalogPropsBuilder.addProperty(
CatalogEntity.REPLACE_NEW_LOCATION_PREFIX_WITH_CATALOG_DEFAULT_KEY, "file:");
}
Catalog catalog =
PolarisCatalog.builder()
.setType(Catalog.TypeEnum.INTERNAL)
.setType(
catalogConfig.map(CatalogConfig::value).orElse(Catalog.TypeEnum.INTERNAL))
.setName(currentCatalogName)
.setProperties(catalogPropsBuilder.build())
.setStorageConfigInfo(
Expand Down Expand Up @@ -287,21 +319,31 @@ public void before(
HTTPClient.builder(config)
.uri(config.get(CatalogProperties.URI))
.build());
this.restCatalog.initialize(
"polaris",
ImmutableMap.of(
CatalogProperties.URI,
"http://localhost:" + EXT.getLocalPort() + "/api/catalog",
OAuth2Properties.CREDENTIAL,
snowmanCredentials.clientId() + ":" + snowmanCredentials.clientSecret(),
OAuth2Properties.SCOPE,
BasePolarisAuthenticator.PRINCIPAL_ROLE_ALL,
CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO",
"warehouse",
currentCatalogName,
"header." + REALM_PROPERTY_KEY,
realm));
Optional<RestCatalogConfig> restCatalogConfig =
testInfo
.getTestMethod()
.flatMap(m -> Optional.ofNullable(m.getAnnotation(RestCatalogConfig.class)));
ImmutableMap.Builder<String, String> propertiesBuilder =
ImmutableMap.<String, String>builder()
.put(
CatalogProperties.URI,
"http://localhost:" + EXT.getLocalPort() + "/api/catalog")
.put(
OAuth2Properties.CREDENTIAL,
snowmanCredentials.clientId() + ":" + snowmanCredentials.clientSecret())
.put(OAuth2Properties.SCOPE, BasePolarisAuthenticator.PRINCIPAL_ROLE_ALL)
.put(
CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO")
.put("warehouse", currentCatalogName)
.put("header." + REALM_PROPERTY_KEY, realm);
restCatalogConfig.ifPresent(
config -> {
for (int i = 0; i < config.value().length; i += 2) {
propertiesBuilder.put(config.value()[i], config.value()[i + 1]);
}
});
this.restCatalog.initialize("polaris", propertiesBuilder.buildKeepingLast());
});
}

Expand Down Expand Up @@ -747,6 +789,104 @@ public void testCreateTableWithOverriddenBaseLocationMustResideInNsDirectory(
.isInstanceOf(ForbiddenException.class);
}

/**
* Create an EXTERNAL catalog. The test configuration, by default, disables access delegation for
* EXTERNAL catalogs, so register a table and try to load it with the REST client configured to
* try to fetch vended credentials. Expect a ForbiddenException.
*/
@CatalogConfig(Catalog.TypeEnum.EXTERNAL)
@RestCatalogConfig({"header.X-Iceberg-Access-Delegation", "vended-credentials"})
@Test
public void testLoadTableWithAccessDelegationForExternalCatalogWithConfigDisabled() {
Namespace ns1 = Namespace.of("ns1");
restCatalog.createNamespace(ns1);
TableMetadata tableMetadata =
TableMetadata.newTableMetadata(
new Schema(List.of(Types.NestedField.of(1, false, "col1", new Types.StringType()))),
PartitionSpec.unpartitioned(),
"file:///tmp/ns1/my_table",
Map.of());
try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) {
resolvingFileIO.initialize(Map.of());
resolvingFileIO.setConf(new Configuration());
String fileLocation = "file:///tmp/ns1/my_table/metadata/v1.metadata.json";
TableMetadataParser.write(tableMetadata, resolvingFileIO.newOutputFile(fileLocation));
restCatalog.registerTable(TableIdentifier.of(ns1, "my_table"), fileLocation);
try {
Assertions.assertThatThrownBy(
() -> restCatalog.loadTable(TableIdentifier.of(ns1, "my_table")))
.isInstanceOf(ForbiddenException.class)
.hasMessageContaining("Access Delegation is not supported for this catalog");
} finally {
resolvingFileIO.deleteFile(fileLocation);
}
}
}

/**
* Create an EXTERNAL catalog. The test configuration, by default, disables access delegation for
* EXTERNAL catalogs. Register a table and attempt to load it WITHOUT access delegation. This
* should succeed.
*/
@CatalogConfig(Catalog.TypeEnum.EXTERNAL)
@Test
public void testLoadTableWithoutAccessDelegationForExternalCatalogWithConfigDisabled() {
Namespace ns1 = Namespace.of("ns1");
restCatalog.createNamespace(ns1);
TableMetadata tableMetadata =
TableMetadata.newTableMetadata(
new Schema(List.of(Types.NestedField.of(1, false, "col1", new Types.StringType()))),
PartitionSpec.unpartitioned(),
"file:///tmp/ns1/my_table",
Map.of());
try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) {
resolvingFileIO.initialize(Map.of());
resolvingFileIO.setConf(new Configuration());
String fileLocation = "file:///tmp/ns1/my_table/metadata/v1.metadata.json";
TableMetadataParser.write(tableMetadata, resolvingFileIO.newOutputFile(fileLocation));
restCatalog.registerTable(TableIdentifier.of(ns1, "my_table"), fileLocation);
try {
restCatalog.loadTable(TableIdentifier.of(ns1, "my_table"));
} finally {
resolvingFileIO.deleteFile(fileLocation);
}
}
}

/**
* Create an EXTERNAL catalog. The test configuration, by default, disables access delegation for
* EXTERNAL catalogs. However, we set <code>enable.credential.vending</code> to <code>true</code>
* for this catalog, enabling it. Register a table and attempt to load it WITH access delegation.
* This should succeed.
*/
@CatalogConfig(
value = Catalog.TypeEnum.EXTERNAL,
properties = {"enable.credential.vending", "true"})
@RestCatalogConfig({"header.X-Iceberg-Access-Delegation", "vended-credentials"})
@Test
public void testLoadTableWithAccessDelegationForExternalCatalogWithConfigEnabledForCatalog() {
Namespace ns1 = Namespace.of("ns1");
restCatalog.createNamespace(ns1);
TableMetadata tableMetadata =
TableMetadata.newTableMetadata(
new Schema(List.of(Types.NestedField.of(1, false, "col1", new Types.StringType()))),
PartitionSpec.unpartitioned(),
"file:///tmp/ns1/my_table",
Map.of());
try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) {
resolvingFileIO.initialize(Map.of());
resolvingFileIO.setConf(new Configuration());
String fileLocation = "file:///tmp/ns1/my_table/metadata/v1.metadata.json";
TableMetadataParser.write(tableMetadata, resolvingFileIO.newOutputFile(fileLocation));
restCatalog.registerTable(TableIdentifier.of(ns1, "my_table"), fileLocation);
try {
restCatalog.loadTable(TableIdentifier.of(ns1, "my_table"));
} finally {
resolvingFileIO.deleteFile(fileLocation);
}
}
}

@Test
public void testSendNotificationInternalCatalog() {
NotificationRequest notification = new NotificationRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ featureConfiguration:
ALLOW_SPECIFYING_FILE_IO_IMPL: true
SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION: true
ALLOW_OVERLAPPING_CATALOG_URLS: true
ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING: false
SUPPORTED_CATALOG_STORAGE_TYPES:
- FILE
- S3
Expand Down