feat: Translate Hadoop S3A configurations to object_store configurations #1817
Codecov Report

```
@@            Coverage Diff             @@
##               main    #1817     +/-  ##
============================================
+ Coverage     56.12%   59.42%   +3.29%
- Complexity      976     1151     +175
============================================
  Files           119      130      +11
  Lines         11743    12656     +913
  Branches       2251     2370     +119
============================================
+ Hits           6591     7521     +930
+ Misses         4012     3928      -84
- Partials       1140     1207      +67
```
parthchandra left a comment:
Great work @Kontinuation !
```scala
df.write.format("parquet").mode(SaveMode.Overwrite).save(filePath)
}

// native_iceberg_compat mode does not have comprehensive S3 support, so we don't run tests
```
native_iceberg_compat has the same underlying code as native_datafusion, so why would it be any different?
(We would need a different mechanism to pass in the config values, though, since it doesn't use QueryPlanSerde but passes in parameters via JNI in the call to init.)
We can make native_iceberg_compat honor Hadoop configuration just like native_datafusion, so that we can use this scanner impl to read plain parquet files. However, to my understanding native_iceberg_compat is primarily for integrating with Iceberg, and Iceberg configures S3 in a different way. Here is an example of a Spark catalog configuration that stores the warehouse in S3:
```shell
spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
    --conf spark.sql.catalog.my_catalog.type=glue \
    --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
    --conf spark.sql.catalog.my_catalog.s3.access-key-id=xxxxx \
    --conf spark.sql.catalog.my_catalog.s3.secret-access-key=xxxxxx
```
Also, the Apache Iceberg project has its own S3 file IO implementation that depends directly on AWS SDK V2 and does not use Hadoop S3A, so Hadoop S3A configurations are not effective when reading S3 through Iceberg. This means we would need to take configurations from multiple sources to properly configure the native_iceberg_compat scanner.
Ah, some misunderstanding here. Think of native_iceberg_compat as a variant of CometScan which uses the DataFusion reader instead of the original Comet parquet reader. Comet-Iceberg integration involves the Iceberg scan replacing its own parquet file reader with the Comet parquet reader. Iceberg with Comet calls into the Comet scan's batch reader and column readers directly. native_iceberg_compat is written to provide a replacement for these classes.
Neither native_datafusion nor native_iceberg_compat will (or can) integrate with S3FileIO. S3FileIO configuration is ignored if Comet is enabled in Iceberg.
```toml
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"

[[package]]
```
Would including these increase the size of the final jar by a big amount?
The size of release jar built from this branch on macOS is 32MB, while the main branch is 30MB. The final jar size increases by 2MB.
I have another thought on this. Any number of users have developed custom credential providers.
I think calling the Java credential provider from the native side via JNI is a better option. The credentials may need to be refreshed while we scan the parquet files: if we pass the session credentials when creating the scanner, they may expire mid-scan and cause authentication failures. The credential provider APIs in both AWS SDK V1 and V2 are relatively straightforward:
So it won't be hard to call them from the native side using JNI. One nuance is that AWS SDK V1 and V2 define the credential provider and credential classes differently, so our JNI-based native credential provider has to support both versions.
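To make the two provider shapes concrete, here is a sketch using simplified local stand-ins (the traits below mirror, but are not, the real AWS SDK V1/V2 interfaces, and `fetchForNative` is a hypothetical name for the JNI-facing entry point, not code from this PR):

```scala
// Simplified stand-ins for the two SDK generations (NOT the real AWS classes).
trait AWSCredentials { def getAWSAccessKeyId: String; def getAWSSecretKey: String }
trait AWSCredentialsProvider { def getCredentials(): AWSCredentials } // SDK V1 shape

trait AwsCredentials { def accessKeyId: String; def secretAccessKey: String }
trait AwsCredentialsProvider { def resolveCredentials(): AwsCredentials } // SDK V2 shape

// Hypothetical JNI-facing entry point: native code calls this on each credential
// refresh and receives a plain (accessKeyId, secretKey) pair either way.
// (Sketch only: an unknown provider type would throw a MatchError here.)
def fetchForNative(provider: AnyRef): (String, String) = provider match {
  case v1: AWSCredentialsProvider =>
    val c = v1.getCredentials(); (c.getAWSAccessKeyId, c.getAWSSecretKey)
  case v2: AwsCredentialsProvider =>
    val c = v2.resolveCredentials(); (c.accessKeyId, c.secretAccessKey)
}
```

Normalizing both SDK generations behind one entry point keeps the native side agnostic to which SDK version a user's custom provider was written against.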
I agree, I thought of the same thing some time after I wrote the above comment. @mbutrovich, @andygrove, FYI.
One more thought. Would you be able to write some documentation on configuring/using this?
We can address some of the open items in a follow-up. Logged: #1829
```scala
if (sys.env.getOrElse("COMET_PARQUET_SCAN_IMPL", "") != SCAN_NATIVE_ICEBERG_COMPAT) {
  test("read parquet file from MinIO") {
```

This could be replaced with assume, like this:

```scala
test("read parquet file from MinIO") {
  assume(sys.env.getOrElse("COMET_PARQUET_SCAN_IMPL", "") != SCAN_NATIVE_ICEBERG_COMPAT)
```
andygrove left a comment:
Thanks @Kontinuation and @parthchandra. Let's merge this and keep iterating on the follow-on items?
Which issue does this PR close?
Closes #1766.
Rationale for this change

The `native_datafusion` parquet scanner does not configure the object_store client using Hadoop S3A configurations. AWS credentials for accessing S3, such as `spark.hadoop.fs.s3a.access.key`, are ignored by `native_datafusion`, which leads to authentication failures when reading data on S3.

What changes are included in this PR?
This patch translates commonly used Hadoop S3A configurations (mostly for setting up credentials) to their object_store counterparts. This is similar to what Apache Gluten does to support S3 (see https://gluten.apache.org/docs/getting-started/s3/#working-with-s3).
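As a rough illustration of the translation (a sketch only: the helper and its name are hypothetical, not the PR's actual code, and the right-hand key strings follow object_store's `AmazonS3ConfigKey` naming):

```scala
// Hypothetical translation table from Hadoop S3A options to object_store
// configuration keys (a small subset, for illustration only).
val s3aToObjectStore: Map[String, String] = Map(
  "fs.s3a.access.key"      -> "aws_access_key_id",
  "fs.s3a.secret.key"      -> "aws_secret_access_key",
  "fs.s3a.session.token"   -> "aws_session_token",
  "fs.s3a.endpoint"        -> "aws_endpoint",
  "fs.s3a.endpoint.region" -> "aws_region"
)

// Translate a Hadoop configuration map; options without a known
// object_store counterpart are simply dropped.
def translateS3aOptions(hadoopConf: Map[String, String]): Map[String, String] =
  hadoopConf.flatMap { case (key, value) =>
    s3aToObjectStore.get(key.stripPrefix("spark.hadoop.")).map(_ -> value)
  }
```

A lookup-table approach like this keeps the mapping auditable and makes it easy to extend as more S3A options need object_store equivalents.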
AWS allows accessing S3 using a variety of authentication methods, while object_store only supports some of them. We depend on `aws-config` and `aws-credential-types` to provide better support for complex AWS S3 credential use cases, including WebIdentityToken and AssumedRole credentials.

The Hadoop S3A configuration translation is only added to `native_datafusion`. `native_iceberg_compat` may need to integrate with Iceberg catalog configuration, and the config translation could be handled differently there, so we leave it as future work.

How are these changes tested?
- Added `CredentialProviderMetadata` for easier testing of the correctness of the AWS credential providers built by the native code. Please refer to the tests in `s3.rs` for details.
- Added a test suite (`ParquetReadFromS3Suite`) that reads parquet files from MinIO. This test runs in `native_comet` and `native_datafusion` mode.
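For local reproduction, a MinIO-backed run could use settings along these lines (a hedged sketch: the endpoint, bucket, and credential values below are placeholders, not taken from the actual suite):

```scala
// Hypothetical Spark settings pointing the S3A connector (and, via the
// translation in this PR, the native object_store client) at a local MinIO.
val minioConf: Map[String, String] = Map(
  "spark.hadoop.fs.s3a.endpoint"          -> "http://127.0.0.1:9000",
  "spark.hadoop.fs.s3a.access.key"        -> "minioadmin",
  "spark.hadoop.fs.s3a.secret.key"        -> "minioadmin",
  // MinIO serves buckets under the path, not as virtual-hosted subdomains.
  "spark.hadoop.fs.s3a.path.style.access" -> "true"
)
// These would be applied via SparkSession.builder().config(...) before
// reading, e.g. spark.read.parquet("s3a://test-bucket/data.parquet").
```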