Use `object_store::BufWriter` to replace `put_multipart` #9648

tustvold merged 5 commits into apache:main
Conversation
Close and reopen to rerun CI.

We could drop the `AbortableWrite`. I believe this is in line with what @tustvold is thinking with ongoing improvements of `object_store`.

Another option is to push an empty commit.
Thanks for your review! @devinjdangelo

FWIW the multipart ID is exposed, but it might take some more type plumbing. IMO removing `AbortableWrite` is probably a simpler path forward.
yyy1000 left a comment
Thanks for your review! @devinjdangelo @tustvold
I updated the PR to address your comment, let me know whether it needs further improvement :)
```rust
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
    let buf_writer = BufWriter::new(object_store, location.clone());
    Ok(file_compression_type.convert_async_writer(buf_writer)?)
```
I think `create_writer` could still be kept, since it creates the writer with compression?
```rust
writer.shutdown()
    .await
    .map_err(|_| internal_datafusion_err!("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!"))?;
```
I'm not sure whether it's proper to just shut everything down here, but according to #9648 (comment) I think it's OK? 👀
```rust
        Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
    }
}
```
I also removed the `MultiPart` struct since it's not used anymore.
```diff
 pub fn convert_async_writer(
     &self,
-    w: Box<dyn AsyncWrite + Send + Unpin>,
+    w: BufWriter,
```
I'm wondering whether it's OK to change the param type here since it's public, but keeping `Box<dyn AsyncWrite + Send + Unpin>` makes the types incompatible. 🤔
I think it depends on if this code is always used with object_store (aka if DataFusion code always writes output using the object_store API). If this is the case, then switching to BufWriter here makes sense to me
BTW I think we need to update the comments on this function to match the new implementation
alamb left a comment
Thank you very much @yyy1000 -- this looks like a good improvement to me
I would like a few more opinions about the change in abort behavior, but as I understand the implications it seems a reasonable change to me.
With a few more updates to doc comments I think this PR will be ready to go in my view
```rust
    }
}

/// Stores data needed during abortion of MultiPart writers
```
Is it correct to say that the implications of removing AbortableWrite is that if certain (larger) writes to object store fail / abort for some reason, "garbage" (unreferenced partial uploads) may be left around indefinitely on the provider?
While I understand that some object stores (maybe all) can be configured to automatically clean up such parts, I think reverting the "try to cleanup on failure" behavior is worth reconsidering.
I think I could be convinced with an argument like "the software can't ensure clean up anyways (for example, if it is SIGKILLed) for some reason, and thus we don't even try to clean up in paths we could", but if we go that route I think we should explicitly document the behavior and rationale in comments somewhere
I think @metesynnada or @mustafasrepo originally added this code (though I may be wrong) so perhaps they have some perspective to share
Yeah, I think according to #9648 (comment), 'garbage' cleanup will only happen on the cloud provider side if we remove `AbortableWrite` 🤔. @devinjdangelo, is that right?
> Is it correct to say that the implications of removing AbortableWrite is that if certain (larger) writes to object store fail / abort for some reason, "garbage" (unreferenced partial uploads) may be left around indefinitely on the provider?
Yes. I have mixed feelings about removing any attempt to clean up on failure.
> I think I could be convinced with an argument like "the software can't ensure clean up anyways (for example, if it is SIGKILLed) for some reason, and thus we don't even try to clean up in paths we could", but if we go that route I think we should explicitly document the behavior and rationale in comments somewhere
This argument is valid. A hardware/network fault will prevent any cleanup code we write from working, so to truly protect against partial writes would require logic outside of DataFusion's process (e.g. on the cloud service provider side).
On the other hand, this change may be annoying for simple failures when writing to a local file system. Encountering any execution error will leave dangling files when before they often could be cleaned up.
I think this is a case where one will draw different conclusions depending on whether they are imagining an individual user of something like datafusion-cli vs. a production database system implemented on top of DataFusion. The latter will have little use for our attempts at cleanup (they will need something much better anyway), but the former may appreciate it.
Local file system automatically cleans up on drop, or at least makes a best effort to do so. FWIW this same mechanism is used for ALL uploads, even the non-multipart ones so as to provide atomicity. Given nobody has complained about this, I suspect it is doing a fairly good job
I am not aware of a cloud provider that provides multipart uploads without some automated way to reap aborted uploads after a given time
Ok, so I vote we leave the code as is (no attempt to explicitly abort in write) and add a note in the documentation. If it turns out this is an important behavior, we can add it back in
@yyy1000 can you handle adding the note in the documentation?
Sure, I added it around the `create_writer` function, would that be enough?
Looks reasonable to me -- thank you
```rust
    location: &Path,
    object_store: Arc<dyn ObjectStore>,
) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
    let (multipart_id, writer) = object_store
```
For anyone else following along, the BufWriter internally does a multi-part put when appropriate
I can check this PR as well tomorrow.
* feat: use BufWriter to replace put_multipart
* feat: remove AbortableWrite
* fix clippy
* fix: add doc comment
Which issue does this PR close?
Closes #9614.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?