Enable parallel writing across row groups when writing encrypted parquet#8162
Enable parallel writing across row groups when writing encrypted parquet#8162alamb merged 16 commits intoapache:mainfrom
Conversation
bcab6f9 to
b701695
Compare
|
@alamb @adamreeve I think this will make for a better multithreading API. |
adamreeve
left a comment
There was a problem hiding this comment.
Looks good to me thanks Rok, just some minor comments
|
Thanks for the review @adamreeve ! I suppose it's time to ask @alamb to do a pass :) |
alamb
left a comment
There was a problem hiding this comment.
Thanks @rok and @adamreeve -- I went over the PR more carefully and it makes sense to me. The examples are quite cool.
Thanks again
|
I merged up from main and plan to merge this PR in once the tests pass |
|
Hi @rok and @adamreeve -- there appears to be a clippy error now (due to a new API that was added on main). Can you possibly take a look? Again, I apologize for the delay in review |
|
Thanks for the review @alamb! |
|
@alamb I've pushed a couple of ignores just to confirm the codepath. Please don't merge yet. |
|
@alamb the deprecation warning comes from |
15ed2de to
c4e38db
Compare
|
I don't have the mixed sync/async-write test quite right yet, but I wonder if we really need it? |
|
I am getting ready to create the 56.2.0 release candidate. Shall we try and get this one in, or can it wait for 57.0.0 in October? |
|
@alamb it'd be great to get this one in. Let me take a look if I can fix up the |
I am not sure -- what I would love to see is an example like this https://docs.rs/parquet/latest/parquet/arrow/arrow_writer/struct.ArrowColumnWriter.html#example-encoding-two-arrow-arrays-in-parallel That shows how to use whatever APIs we have to write data in parallel I am sorry I haven't found time to study this PR in detail. @adamreeve it sounds like you have some good ideas and have devoted time here. What would you recommend? |
alamb
left a comment
There was a problem hiding this comment.
Ok, I went over this PR quite carefully this morning, and reviewed all the PRs / APIs and I think it makes sense. Thank you @rok and @adamreeve
Divergent Writing APIs
One thought I had (which we can do as a follow on PR) would be to unify the APIs for doing concurrent writing so they always use ArrowRowGroupWriterFactory, which I think would mean:
- Deprecate
get_column_writers - Make
ArrowRowGroupWriterFactory::newpublic - Update the examples to use ArrowRowGroupWriterFactory
If this sounds reasonable, I can file a ticket.
I will also see if I can debug the test failures
| } | ||
|
|
||
| /// Create a new row group writer and return its column writers. | ||
| pub async fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> { |
There was a problem hiding this comment.
I double checked and this code is not yet released, so this is not a public API change. It was added in
There was a problem hiding this comment.
It is very fresh indeed. I hope we're not spoiling some plan here. We did find the ArrowRowGroupWriterFactory route better so I think it should be ok.
| } | ||
|
|
||
| /// Create a new row group writer and return its column writers. | ||
| #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")] |
There was a problem hiding this comment.
Note to myself, these APIs were added in
- Support multi-threaded writing of Parquet files with modular encryption #8029
So they are relatively new
|
|
||
| let mut writers = writer.get_column_writers().await.unwrap(); | ||
| // Use low-level API to write an Arrow group | ||
| let arrow_writer = writer.sync_writer; |
There was a problem hiding this comment.
I think it would be good to use public APIs in the tests, rather than accessing the inner fields directly (which is not possible from other crates)
|
|
||
| /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`]. | ||
| /// This can be useful to provide more control over how files are written. | ||
| pub fn into_serialized_writer( |
There was a problem hiding this comment.
I spent quite some time trying to figure out why this API is needed -- specifically "why do we need an ArrowWriter at all, why not use SerializedFileWriter and get_column_writers directly, as shown in this example
After study I concluded the reason we need to expose ArrowRowGroupWriterFactory is that ArrowRowGroupWriterFactory::create_column_writers also has the appropriate encryption properties.
It is unfortunate that we'll now have two different sets of APIs for creating column writers -- via get_column_writers AND ArrowRowGroupWriterFactory
There was a problem hiding this comment.
I filed a ticket with a suggestion for a unified API:
| Ok(()) | ||
| } | ||
|
|
||
| /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`]. |
There was a problem hiding this comment.
I spent quite a while trying to figure out why we can't just use get_column_writers as in the example to use ArrowRowGroupWriterFactory and I (finally) realized the reason is the encryption configuration isn't passed to get_column_writers. ArrowRowGroupWriterFactory does have the encryption details and thus can make the correct ArrowColumnWriters.
I think it is somewhat of a strange API to create an ArrowWriter only to immediately destructure it into a SerializedWriter / the underlying writer. It is also unfortunate we now have two different APIs for writing row groups in parallel, depending on encryption.
I have an idea to make the APIs better as a follow on.
There was a problem hiding this comment.
Agreed it's odd. Motivation was to introduce as few new pubs as possible. Would be very curious about alternative API shapes.
There was a problem hiding this comment.
#8162 (review) is my suggestion , basically TLDR is to make ArrowRowGroupWriterFactory constructor public and deprecate get_column_writers
There was a problem hiding this comment.
I spent quite a while trying to figure out why we can't just use get_column_writers.
Sorry, I realise now that I didn't make this very clear in my original issue (#7359).
One other factor is that we couldn't just use the WriterProperties passed to get_column_writers to internally create a new FileEncryptor. When a FileEncryptor is created for the SerializedFileWriter, it generates random AAD (additional authentication data), and this AAD has to be the same for all encrypted modules in the file.
There was a problem hiding this comment.
I filed a ticket with a suggestion for a unified API:
| } | ||
|
|
||
| /// Create column writers for a new row group. | ||
| pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> { |
There was a problem hiding this comment.
I see now that this is the key API -- create column writers with the relevant encryption properties, if relevant
There was a problem hiding this comment.
What do you thinks to set create_row_group_writer pub and remove this function and use function into_writers add in issue #8260
There was a problem hiding this comment.
So I am not sure making ArrowRowGroupWriter public gets us much of anything, and it would not allow per-column parallel encoding
One benefit of getting the column writers individually, is that then the columns can be encoded in parallel. The ArrowRowGroupWriter can only write RowGroups in parallel.
I looked at ArrowRowGroupWriter a bit more, and the only substantial thing it does is call a loop with compute_leaves which is already public.
arrow-rs/parquet/src/arrow/arrow_writer/mod.rs
Lines 831 to 835 in bac3690
| assert_eq!(to_write, read); | ||
| } | ||
|
|
||
| #[tokio::test] |
There was a problem hiding this comment.
- I removed this test because it was added in [Parquet] Write row group with async writer #8262 along with
AsyncArrowWriter::get_column_writersandAsyncArrowWriter::append_row_group
Since we removed those APIs in this PR, we should also remove the test
|
Sorry for the late response @alamb and thanks for the added changes. I think this is good as is and can be included in the release. |
This was my fault for not reviewing this PR before in more detail. I leave this open until tomorrow to give people a chance to respond, and if not I'll merge it in and make the RC |
I've read through the latest comments and it sounds like you have a good understanding of the motivation for this change now, thanks for taking a close look Andrew. I agree it would be good to consolidate on using If you'd rather not have the new |
Yeah I agree -- one method is not a big deal if we are going to go with the ArrowRowGroupWriterFactory I'll merge this PR and file some follow on tickets shortly |
|
Thank you @rok @lilianm and @adamreeve |
# Which issue does this PR close? - related to #8162 # Rationale for this change - While reviewing #8162 I read a bunch more of the parquet code and I wanted to capture some of my understanding in comments. # What changes are included in this PR? Add more documentation to various parquet writing APIs # Are these changes tested? By CI # Are there any user-facing changes? Documentation only, no function changes --------- Co-authored-by: Ed Seidl <[email protected]>
Rationale for this change
#8029 introduced
pub ArrowWriter.get_column_writersandpub ArrowWriter.append_row_groupto enable multi-threaded parquet encrypted writing. However testing downstream showed the API is not feasible, see #8115.What changes are included in this PR?
This introduces
pub ArrowWriter.into_serialized_writerand deprecatespub ArrowWriter.get_column_writersandpub ArrowWriter.append_row_group. It also makesArrowRowGroupWriterFactorypublic and adds apub ArrowRowGroupWriterFactory.create_column_writers.Are these changes tested?
This includes a DataFusion inspired test for concurrent writing across columns and row groups to make sure parallel writing is and remains possible with
ArrowWriters API. Further we created a draft PR in DataFusion apache/datafusion#16738 to test for multithreaded writing support.Are there any user-facing changes?
See description of changes.