feat: Add stream upload (multi-part upload) #20
kodiakhq[bot] merged 19 commits into influxdata:main
Conversation
tustvold
left a comment
Awesome that you're working on this 🎉 This will open up all sorts of cool things, like doing streaming parquet writes to object storage.
That being said I'm not sure that BoxStream will give us the optimal interface. The main challenge with streams is that they are pull-based, which can complicate things like offloading CPU bound work to a separate threadpool.
One option might be to use futures::Sink i.e. something like
type ChunkWriter<'a> = Pin<Box<dyn Sink<Bytes, Error=Error>>>
async fn put_chunked(&self, location: &Path) -> Result<ChunkWriter<'_>>
But this has two issues:
- It isn't clear how to handle cancellation
- It still forces data to be produced serially
I wonder if instead we might do something like
#[async_trait]
trait ChunkWriter {
/// Write a chunk at index `idx`
///
/// Takes a shared reference so can be performed in parallel
async fn put(&self, idx: usize, data: Bytes) -> Result<()>;
/// Abort the multipart upload
///
/// Takes a mutable reference to ensure no inflight requests
async fn abort(&mut self) -> Result<()>;
/// Finish the multipart upload
///
/// Takes a mutable reference to ensure no inflight requests
async fn finish(&mut self) -> Result<()>;
}
#[async_trait]
trait ObjectStore {
async fn put_chunked(&self) -> Result<Box<dyn ChunkWriter + '_>>;
...
}
We could then perhaps impl Sink for ChunkWriter as a usability enhancement for simple use-cases. This would let people do
stream.forward(store.put_chunked(location).await?).await?
What do you think?
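To make the ownership argument concrete, here is a toy, std-only model of that interface (method names follow the sketch above; everything else, including the minimal `block_on`, is invented so the example runs without an async runtime). `put` takes `&self`, so chunks can be written concurrently and out of order, while `finish` and `abort` take `&mut self`, so the borrow checker proves no puts are still in flight:

```rust
use std::collections::BTreeMap;
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Toy in-memory stand-in for the proposed ChunkWriter.
struct InMemoryChunkWriter {
    parts: Mutex<BTreeMap<usize, Vec<u8>>>,
}

impl InMemoryChunkWriter {
    fn new() -> Self {
        Self { parts: Mutex::new(BTreeMap::new()) }
    }

    /// Write a chunk at index `idx`; `&self` allows concurrent puts.
    async fn put(&self, idx: usize, data: Vec<u8>) -> Result<(), String> {
        self.parts.lock().unwrap().insert(idx, data);
        Ok(())
    }

    /// Abort the upload; `&mut self` rules out in-flight puts.
    async fn abort(&mut self) -> Result<(), String> {
        self.parts.lock().unwrap().clear();
        Ok(())
    }

    /// Finish the upload, stitching the parts together in index order,
    /// regardless of the order they arrived in.
    async fn finish(&mut self) -> Result<Vec<u8>, String> {
        let parts = std::mem::take(&mut *self.parts.lock().unwrap());
        Ok(parts.into_values().flatten().collect())
    }
}

// Minimal executor so the sketch runs without tokio; only suitable for
// futures that never genuinely park.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

A real implementation would issue network requests in `put` and a completion call in `finish`, but the reference-taking rules are the interesting part here.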
src/azure.rs
Outdated
    Ok(())
}

async fn upload(
I think put_chunked or put_multi_part might be a better name
I was starting to think that about streams. It seems like I initially liked the idea of the
Apologies if this is a naive question, but are there file format serializers that don't do this? I was expecting that something like the parquet writer would produce buffers serially, and the upload would receive them and initiate concurrent upload calls as they came in. (Possibly using something like
So far, I'm more inclined to go with just:
#[async_trait]
trait AbortableUpload {
async fn abort(&mut self) -> Result<()>;
}
type ChunkWriter<'a> = Pin<Box<dyn Sink<Bytes, Error=Error> + AbortableUpload>>
async fn put_chunked(&self, location: &Path) -> Result<ChunkWriter<'_>>
Aah yes, yeah that would be annoying for users to have to worry about
You could theoretically encode parquet row groups separately, potentially in parallel, or chunks of a CSV or ndjson file, etc... Although tbh uploading them out of order is probably of only limited benefit, especially if internally the
I suspect FuturesUnordered or similar might be easier to use in a push-based context.
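To sketch what that could look like (toy code, not from the PR): the helper below is a crude stand-in for `futures::stream::FuturesUnordered`, polling a bag of in-flight part uploads and collecting each result as it completes, so a push-based producer can keep handing off chunks while earlier ones are still uploading.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxedFut<T> = Pin<Box<dyn Future<Output = T>>>;

/// Crude stand-in for FuturesUnordered: repeatedly poll every pending
/// future, harvesting results in completion order. Only suitable for
/// futures that make progress each time they are polled (a real
/// implementation would sleep until woken instead of spinning).
fn drive_all<T>(mut futs: Vec<BoxedFut<T>>) -> Vec<T> {
    struct NoopWaker;
    impl Wake for NoopWaker {
        fn wake(self: Arc<Self>) {}
    }
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut done = Vec::new();
    while !futs.is_empty() {
        let mut still_pending = Vec::new();
        for mut fut in futs {
            match fut.as_mut().poll(&mut cx) {
                Poll::Ready(value) => done.push(value),
                Poll::Pending => still_pending.push(fut),
            }
        }
        futs = still_pending;
    }
    done
}
```

The point is that completion order is decoupled from submission order, which is exactly what an out-of-order multipart upload needs.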
Sounds good to me 👍
Thinking a bit more on this:
As such I wonder if we need multipart uploads to be a higher level concept on the trait, maybe something like
What do you think?
c5134a2 to 5a27004 (force-push)
I've been away from this for a bit, but got back into it this weekend. I decided to go ahead with the implementation to get a better sense of what the low-level details look like. I learned that Azure does not provide a way to clean up parts ("blocks", as they call them). There is no unique ID associated with a particular upload other than the object location; they just expire after 7 days. So abort does nothing for now.
Pretty much, I just remember being stung in the past by S3 bills for aborted multipart uploads which were just sitting around eating up storage space.
Yeah, tbh this seems like a pretty sensible policy. https://aws.amazon.com/blogs/aws-cloud-financial-management/discovering-and-deleting-incomplete-multipart-uploads-to-lower-amazon-s3-costs/ describes how you can now achieve the same with S3, so perhaps we don't need to add explicit support for this after all...
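For reference, the mechanism that post describes is a bucket lifecycle rule. Something like the following (rule ID is a placeholder) tells S3 to abort and clean up any multipart upload that hasn't completed within 7 days:

```json
{
  "Rules": [
    {
      "ID": "abort-incomplete-multipart-uploads",
      "Status": "Enabled",
      "Filter": {},
      "AbortIncompleteMultipartUpload": {
        "DaysAfterInitiation": 7
      }
    }
  ]
}
```

It can be applied with `aws s3api put-bucket-lifecycle-configuration`, so the storage-cost concern is addressable without explicit abort support in the library.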
I see, yeah let's do the simple thing that is supported by all use-cases and we can extend as needed later
src/local.rs
Outdated
    buf: &[u8],
) -> std::task::Poll<Result<usize, io::Error>> {
    loop {
        match &mut self.state {
I think this could be simplified with maybe_spawn_blocking and BoxFuture instead of JoinHandle
The main reason I didn't use maybe_spawn_blocking was that it forces the wrong error type for this function, and I don't see any easy way to make that generic. But I agree BoxFuture would be better.
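For illustration, a stripped-down version of that suggestion (hypothetical types, not the PR's code): the writer stores the in-flight operation as a boxed future and drives it from poll_write, with no JoinHandle and no fixed error type baked into a helper.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

type BoxedIoFuture = Pin<Box<dyn Future<Output = std::io::Result<usize>> + Send>>;

enum WriteState {
    Idle,
    Busy(BoxedIoFuture),
}

impl WriteState {
    /// Accept a buffer once any in-flight write has completed.
    fn poll_write(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
        loop {
            match self {
                WriteState::Idle => {
                    let n = buf.len();
                    // The real store would box a spawn_blocking call (or an
                    // inline fallback) here; this stand-in is ready at once.
                    *self = WriteState::Busy(Box::pin(async move { Ok(n) }));
                }
                WriteState::Busy(fut) => {
                    let n = ready!(fut.as_mut().poll(cx))?;
                    *self = WriteState::Idle;
                    return Poll::Ready(Ok(n));
                }
            }
        }
    }
}

// Waker plumbing so the sketch can be exercised without a runtime.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
```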
5be7502 to ad4af52 (force-push)
"multi-part" seems to be an S3 specific concept, though it also is a decent description. Right now the difference between
2e67be7 to eb4da24 (force-push)
@wjones127 and @tustvold -- Given we plan to donate the
How soon are we planning to move the crate? I will add the GCS implementation to the PR this weekend, but I can also hold off on that if we want to get this in sooner. I think this should be ready to merge in the next week, but feel free to let me know if you believe otherwise @tustvold 😄
I was imagining sometime next week -- I think we can have a race to see what gets merged first, and the worst case is I'll port this PR's code if needed.
We won't be able to test GCS XML multipart uploads in CI until it's added upstream: fsouza/fake-gcs-server#852. But I can test manually for now.
/// Configuration for connecting to [Google Cloud Storage](https://cloud.google.com/storage/).
#[derive(Debug)]
pub struct GoogleCloudStorage {
    client: Arc<GoogleCloudStorageClient>,
I needed to have a reference to the underlying token function and client from the multipart writer struct, so I moved the request data into another struct and referenced it with an Arc.
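The shape of that refactor might be sketched like this (the types here are hypothetical placeholders, not the actual structs): the shared request state lives behind an Arc that both the store and any in-progress multipart writer hold.

```rust
use std::sync::Arc;

// Hypothetical placeholder for the shared request state: credentials,
// HTTP client, bucket name, and so on.
struct Client {
    bucket: String,
}

struct Store {
    client: Arc<Client>,
}

struct MultipartWriter {
    client: Arc<Client>,
}

impl Store {
    /// Hand the writer its own handle to the shared client; a cheap
    /// pointer clone, so the writer doesn't borrow from the store.
    fn start_multipart(&self) -> MultipartWriter {
        MultipartWriter { client: Arc::clone(&self.client) }
    }
}
```

This avoids tying the writer's lifetime to a borrow of the store, which matters once the writer is long-lived.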
tustvold
left a comment
This is looking extremely cool, just some relatively minor suggestions
Cargo.toml
Outdated
futures = "0.3"
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1.0", default-features = false, optional = true }
serde-xml-rs = { version = "0.5.1", default-features = false, optional = true }
I know this is currently a dependency of azure_storage, but I am a bit apprehensive about its maintenance; it doesn't seem to be receiving contributions particularly actively, and it has some pretty long-standing issues - RReverser/serde-xml-rs#135
The same actually holds for xml-rs netvl/xml-rs#219, which is used by rusoto...
https://github.com/tafia/quick-xml is possibly a more future-proof option? Perhaps something to bear in mind as we work on #18
Good point. quick-xml does look like it might be a better choice. I will try switching to that.
It's a little better, but there is one issue they will not fix that we have to work around: tafia/quick-xml#350
    buf: &[u8],
) -> std::task::Poll<Result<usize, io::Error>> {
    // Poll current tasks
    self.as_mut().poll_tasks(cx)?;
I had no idea how this was working, turns out Rust 1.36 added some magic to make this work
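For anyone else puzzled by this: the magic is (I believe) the std `Try` implementation for `Poll<Result<_, _>>`, which lets `?` short-circuit `Ready(Err(..))` while `Ready(Ok(..))` and `Pending` fall through as a plain `Poll`. A minimal illustration:

```rust
use std::task::Poll;

fn poll_tasks(fail: bool) -> Poll<Result<u32, String>> {
    if fail {
        Poll::Ready(Err("boom".to_string()))
    } else {
        Poll::Ready(Ok(7))
    }
}

fn poll_write(fail: bool) -> Poll<Result<u32, String>> {
    // `?` on a Poll<Result<..>>: Ready(Err(e)) returns early; the
    // remaining cases continue as a Poll<u32>.
    let polled: Poll<u32> = poll_tasks(fail)?;
    match polled {
        Poll::Ready(n) => Poll::Ready(Ok(n + 1)),
        Poll::Pending => Poll::Pending,
    }
}
```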
src/throttle.rs
Outdated
    Waiting,
}

struct ThrottledUpload {
As far as I can tell this simply delays the first write or flush; I'm not really sure that's particularly meaningful. As this cannot delay the underlying API calls (they're hidden behind the AsyncWrite abstraction), I would personally be inclined to just not support these APIs in the ThrottledStore
Is ThrottledStore mostly used just for testing?
I've removed the ThrottledStore implementation.
Co-authored-by: Raphael Taylor-Davies <[email protected]>
cac4ad4 to 070edfb (force-push)
Implements stream / multi-part upload for all object stores except GCP. Exposed as two new methods in the ObjectStore trait.

Follow-up work:
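As a rough picture of the surface area, here is a heavily simplified, synchronous model of the two additions (names and signatures are my guess, since the summary above doesn't reproduce them; the real methods are async and hand back a writer rather than taking parts directly):

```rust
use std::collections::HashMap;

type MultipartId = u64;

/// Toy in-memory model: one method opens a multipart upload and
/// returns an id, another appends bytes, and abort discards whatever
/// was received so far.
#[derive(Default)]
struct MemoryStore {
    next_id: u64,
    uploads: HashMap<MultipartId, Vec<u8>>,
}

impl MemoryStore {
    fn put_multipart(&mut self) -> MultipartId {
        let id = self.next_id;
        self.next_id += 1;
        self.uploads.insert(id, Vec::new());
        id
    }

    fn write_part(&mut self, id: MultipartId, data: &[u8]) {
        self.uploads
            .get_mut(&id)
            .expect("unknown upload")
            .extend_from_slice(data);
    }

    fn abort_multipart(&mut self, id: MultipartId) {
        self.uploads.remove(&id);
    }
}
```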