
feat: Translate Hadoop S3A configurations to object_store configurations#1817

Merged
andygrove merged 3 commits into apache:main from Kontinuation:native-datafusion-parquet-s3
Jun 3, 2025

Conversation

Member

@Kontinuation Kontinuation commented May 30, 2025

Which issue does this PR close?

Closes #1766.

Rationale for this change

The native_datafusion parquet scanner does not configure the object_store client using Hadoop S3A configurations. The AWS credentials for accessing S3, such as spark.hadoop.fs.s3a.access.key, are ignored by native_datafusion, which leads to authentication failures when reading data on S3.

What changes are included in this PR?

This patch translates commonly used Hadoop S3A configurations (mostly for setting up credentials) to their object_store counterparts. This is similar to what Apache Gluten does to support S3 (see https://gluten.apache.org/docs/getting-started/s3/#working-with-s3).

AWS allows accessing S3 using a variety of authentication methods, while object_store only supports some of them. We depend on aws-config and aws-credential-types to provide better support for our complex AWS S3 credential use cases, including WebIdentityToken and AssumedRole credentials.

Hadoop S3A configuration translation is only added to native_datafusion. native_iceberg_compat may need to integrate with the Iceberg catalog configuration, and its config translation could be handled differently, so we leave it as future work.
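As a rough illustration of the kind of translation involved, a few common S3A keys map onto object_store option names along these lines. This is a hypothetical sketch, not the PR's actual code: the function name is invented, the real mapping also handles per-bucket overrides and credential providers, and the exact object_store option strings may differ between versions.

```rust
/// Hypothetical helper: map a Hadoop S3A configuration key to an
/// object_store-style option name. Unknown keys are ignored (None).
/// Option names here are illustrative; check the object_store docs
/// for the exact keys accepted by AmazonS3Builder::with_config.
fn translate_s3a_key(hadoop_key: &str) -> Option<&'static str> {
    match hadoop_key {
        "fs.s3a.access.key" => Some("aws_access_key_id"),
        "fs.s3a.secret.key" => Some("aws_secret_access_key"),
        "fs.s3a.session.token" => Some("aws_session_token"),
        "fs.s3a.endpoint" => Some("aws_endpoint"),
        "fs.s3a.endpoint.region" => Some("aws_region"),
        _ => None, // anything unrecognized is left to object_store defaults
    }
}
```

A translation layer like this lets Spark users keep their existing spark.hadoop.fs.s3a.* settings while the native scanner configures object_store behind the scenes.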

How are these changes tested?

  1. We define a structural stub, CredentialProviderMetadata, to make it easier to test the correctness of the AWS credential providers built by the native code. Please refer to the tests in s3.rs for details.
  2. We added an end-to-end test using a MinIO testcontainer (ParquetReadFromS3Suite). This test runs in both native_comet and native_datafusion modes.
  3. We manually tested this locally and in our cloud environment. It works for all the AWS credentials we use, including anonymous credentials, simple/temporary credentials, EC2 instance profile credentials, web identity token credentials, and assumed-role credentials that use a web identity token as the base credential provider.

@Kontinuation Kontinuation changed the title feat: Translate Hadoop S3A credential configurations to object_store configurations feat: Translate Hadoop S3A configurations to object_store configurations May 30, 2025
@Kontinuation Kontinuation force-pushed the native-datafusion-parquet-s3 branch from e09266b to 69b33b9 Compare May 30, 2025 06:13

codecov-commenter commented May 30, 2025

Codecov Report

Attention: Patch coverage is 68.75000% with 10 lines in your changes missing coverage. Please review.

Project coverage is 59.42%. Comparing base (f09f8af) to head (f0efcb7).
Report is 228 commits behind head on main.

Files with missing lines Patch % Lines
...la/org/apache/comet/objectstore/NativeConfig.scala 61.90% 7 Missing and 1 partial ⚠️
.../scala/org/apache/comet/serde/QueryPlanSerde.scala 81.81% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1817      +/-   ##
============================================
+ Coverage     56.12%   59.42%   +3.29%     
- Complexity      976     1151     +175     
============================================
  Files           119      130      +11     
  Lines         11743    12656     +913     
  Branches       2251     2370     +119     
============================================
+ Hits           6591     7521     +930     
+ Misses         4012     3928      -84     
- Partials       1140     1207      +67     


@Kontinuation Kontinuation force-pushed the native-datafusion-parquet-s3 branch from 69b33b9 to c53dee7 Compare May 30, 2025 09:17
@Kontinuation Kontinuation force-pushed the native-datafusion-parquet-s3 branch from c53dee7 to f0efcb7 Compare May 30, 2025 09:20
@Kontinuation Kontinuation marked this pull request as ready for review May 30, 2025 09:22
Contributor

@parthchandra parthchandra left a comment


Great work @Kontinuation !

df.write.format("parquet").mode(SaveMode.Overwrite).save(filePath)
}

// native_iceberg_compat mode does not have comprehensive S3 support, so we don't run tests
Contributor

native_iceberg_compat has the same underlying code as native_datafusion, so why would it be any different?
(We would need a different mechanism to pass in the config values, though, since it doesn't use QueryPlanSerde but passes in parameters via JNI in the call to init.)

Member Author

We can make native_iceberg_compat honor the Hadoop configuration just like native_datafusion, so that we can use this scanner impl to read plain parquet files. However, to my understanding, native_iceberg_compat is primarily for integrating with Iceberg, and Iceberg configures S3 in a different way. This is an example of a Spark catalog configuration that stores the warehouse in S3:

spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
    --conf spark.sql.catalog.my_catalog.type=glue \
    --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
    --conf spark.sql.catalog.my_catalog.s3.access-key-id=xxxxx \
    --conf spark.sql.catalog.my_catalog.s3.secret-access-key=xxxxxx

Also, the Apache Iceberg project has its own implementation of S3 file IO that directly depends on AWS SDK V2 and does not use Hadoop S3A, so Hadoop S3A configurations are not effective when reading from S3 using Iceberg. This means we would have to take configurations from multiple sources to properly configure the native_iceberg_compat scanner.

Contributor

Ah, some misunderstanding here. Think of native_iceberg_compat as a variant of CometScan which uses the Datafusion reader instead of the original Comet parquet reader. Comet-Iceberg integration involves the iceberg scan replacing its own parquet file reader with the comet parquet reader. Iceberg w/Comet calls into the Comet scan's Batch reader and Column readers directly. native_iceberg_compat is written to provide a replacement for these classes.
Neither native_datafusion nor native_iceberg_compat will (or can) integrate with S3FileIO. S3FileIO configuration is ignored if Comet is enabled in Iceberg.

source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"

[[package]]
Contributor

Would including these increase the size of the final jar by a big amount?

Member Author

The release jar built from this branch on macOS is 32 MB, while the main branch is 30 MB, so the final jar size increases by about 2 MB.

@parthchandra
Contributor

I have another thought on this. Any number of users have developed custom AWSCredentialsProviders in Java, but we would not have corresponding implementations in Rust (the AWS Rust API is rather new). One way is to have a mechanism by which we call the configured credentials provider on the Java side and simply pass the session credentials to the native side instead of the conf. Another option is to invoke a configured credentials provider from the native side via JNI.
Do you think this implementation can be extended to support something like this (later, as an enhancement to this PR)?

@Kontinuation
Member Author


I think calling the Java credential provider from the native side via JNI is the better option. The credentials may need to be refreshed while we scan the parquet files; if we pass the session credentials when creating the scanner, they may expire mid-scan and cause authentication failures.

The credential provider APIs in both AWS SDK V1 and V2 are relatively straightforward:

  • V1: AWSCredentialsProvider.getCredentials() → AWSCredentials
  • V2: AwsCredentialsProvider.resolveCredentials() → AwsCredentials

So it won't be hard to call it from the native side using JNI. One nuance is that AWS SDK V1 and V2 define the credential provider and credential classes differently, so our JNI-based native credential provider has to support both versions.
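The refresh concern above can be sketched as follows. This is a hypothetical Rust illustration, not part of this PR: the type and field names are invented, and a real JNI-backed version would perform the Java-side getCredentials()/resolveCredentials() call inside the resolve closure instead of a plain function.

```rust
use std::time::{Duration, Instant};

// Hypothetical credential record; a real one would also carry a session token.
struct Credentials {
    access_key_id: String,
    secret_access_key: String,
    expires_at: Option<Instant>, // None means the credentials never expire
}

// A provider that re-resolves credentials when they are missing or near
// expiry, rather than capturing a static session credential at scanner
// creation time. `resolve` stands in for the JNI call into the configured
// Java credential provider.
struct RefreshingProvider<F: Fn() -> Credentials> {
    resolve: F,
    cached: Option<Credentials>,
}

impl<F: Fn() -> Credentials> RefreshingProvider<F> {
    fn new(resolve: F) -> Self {
        Self { resolve, cached: None }
    }

    /// Return cached credentials, refreshing when absent or within one
    /// minute of expiry, so a long scan never uses stale credentials.
    fn get(&mut self) -> &Credentials {
        let stale = match &self.cached {
            None => true,
            Some(c) => c
                .expires_at
                .map(|t| t <= Instant::now() + Duration::from_secs(60))
                .unwrap_or(false),
        };
        if stale {
            self.cached = Some((self.resolve)());
        }
        self.cached.as_ref().unwrap()
    }
}
```

The point of the sketch is the call-per-use shape: each read path asks the provider for credentials, and the provider decides whether a refresh (a JNI round trip) is actually needed.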

@parthchandra
Contributor

parthchandra commented Jun 2, 2025


I agree, I thought of the same thing some time after I wrote the above comment.
The credentials provider API is pretty straightforward, and the JNI-based provider can wrap both the V1 and V2 providers.

@mbutrovich, @andygrove, fyi.

@parthchandra
Contributor

One more thought. Would you be able to write some documentation on configuring/using this?

@parthchandra
Contributor

We can address some of the open items in a follow up. Logged: #1829

Comment on lines +109 to +110
if (sys.env.getOrElse("COMET_PARQUET_SCAN_IMPL", "") != SCAN_NATIVE_ICEBERG_COMPAT) {
test("read parquet file from MinIO") {
Member

This could be replaced with assume, like this:

Suggested change
if (sys.env.getOrElse("COMET_PARQUET_SCAN_IMPL", "") != SCAN_NATIVE_ICEBERG_COMPAT) {
test("read parquet file from MinIO") {
test("read parquet file from MinIO") {
assume(sys.env.getOrElse("COMET_PARQUET_SCAN_IMPL", "") != SCAN_NATIVE_ICEBERG_COMPAT)

Member

@andygrove andygrove left a comment


Thanks @Kontinuation and @parthchandra. Let's merge this and keep iterating on the follow-on items?



Development

Successfully merging this pull request may close these issues.

Support reading data from S3 using native_datafusion Parquet scanner

4 participants