Description
This proposal intends to make AsyncArrowWriter accept a new trait called AsyncFileWriter, similar to what we do for ParquetRecordBatchStream.
AsyncArrowWriter currently accepts AsyncWrite:
arrow-rs/parquet/src/arrow/async_writer/mod.rs
Lines 100 to 109 in 19a3bb0
```rust
impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
    /// Try to create a new Async Arrow Writer
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }
```
AsyncWrite is a low-level, poll-based API. Users whose writers expose an `async fn write()` will need to wrap it in a manually written future state machine.
For example:
arrow-rs/object_store/src/buffered.rs
Lines 306 to 313 in 08af471
```rust
impl AsyncWrite for BufWriter {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, Error>> {
        let cap = self.capacity;
        let max_concurrency = self.max_concurrency;
        // ...
```
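To make the friction concrete, here is a minimal, dependency-free sketch of what such a manual state machine looks like. `Inner`, `PollAdapter`, and `noop_waker` are all hypothetical names invented for illustration (a simplified stand-in for implementing tokio's `AsyncWrite`, not arrow-rs code): the adapter must store the in-flight future across polls and copy `buf`, because the stored future cannot borrow from the caller.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type BoxedWrite = Pin<Box<dyn Future<Output = std::io::Result<usize>> + Send>>;

// Hypothetical writer whose natural interface is future-returning
// (think `async fn write`); illustrative only, not arrow-rs API.
struct Inner;
impl Inner {
    fn write(&mut self, buf: Vec<u8>) -> BoxedWrite {
        let n = buf.len();
        Box::pin(async move { Ok(n) })
    }
}

// Adapting it to a poll-based `poll_write` (the shape AsyncWrite requires)
// forces a hand-written state machine: the in-flight future is stored in
// `inflight` and `buf` is copied into an owned Vec.
struct PollAdapter {
    inner: Inner,
    inflight: Option<BoxedWrite>,
}

impl PollAdapter {
    fn poll_write(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
        if self.inflight.is_none() {
            self.inflight = Some(self.inner.write(buf.to_vec()));
        }
        let fut = self.inflight.as_mut().unwrap();
        match fut.as_mut().poll(cx) {
            Poll::Ready(res) => {
                // Clear the finished future so the next call starts a new write.
                self.inflight = None;
                Poll::Ready(res)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

// No-op waker, just enough to drive the adapter in this example.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker { RawWaker::new(std::ptr::null(), &VTABLE) }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut adapter = PollAdapter { inner: Inner, inflight: None };
    match adapter.poll_write(&mut cx, b"abc") {
        Poll::Ready(Ok(n)) => assert_eq!(n, 3),
        _ => panic!("expected Ready"),
    }
}
```

A trait based on owned buffers and whole futures would let the library absorb this boilerplate once, instead of every caller rewriting it.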
Describe the solution you'd like
I propose to make AsyncArrowWriter accept a new trait called AsyncFileWriter:
```rust
pub struct AsyncArrowWriter<W> {
    /// Underlying sync writer
    sync_writer: ArrowWriter<Vec<u8>>,
    /// Async writer provided by caller
    async_writer: W,
}

impl<W: AsyncFileWriter + Unpin + Send> AsyncArrowWriter<W> {
    ...
}

pub trait AsyncFileWriter: Send {
    async fn write(&mut self, bs: Bytes) -> Result<()>;
    async fn complete(&mut self) -> Result<()>;
}

impl<T: AsyncWrite> AsyncFileWriter for T {
    ...
}
```
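As a self-contained sketch of how this trait could be exercised, the version below spells the two methods with boxed futures (the same style AsyncFileReader uses, since `async fn` in traits was not yet generally usable) and implements it for a hypothetical in-memory writer. `InMemoryWriter`, the `String` error alias, and the toy `block_on` are all illustrative inventions, not arrow-rs API:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
type Result<T> = std::result::Result<T, String>; // stand-in error type

// The proposed trait, written with boxed futures instead of `async fn`.
trait AsyncFileWriter: Send {
    fn write(&mut self, bs: Vec<u8>) -> BoxFuture<'_, Result<()>>;
    fn complete(&mut self) -> BoxFuture<'_, Result<()>>;
}

// Hypothetical in-memory implementation, for illustration only.
struct InMemoryWriter {
    buf: Vec<u8>,
    completed: bool,
}

impl AsyncFileWriter for InMemoryWriter {
    fn write(&mut self, bs: Vec<u8>) -> BoxFuture<'_, Result<()>> {
        Box::pin(async move {
            self.buf.extend_from_slice(&bs);
            Ok(())
        })
    }
    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        Box::pin(async move {
            self.completed = true;
            Ok(())
        })
    }
}

// Toy executor, good enough for futures that never stay Pending.
fn block_on<T>(mut fut: Pin<Box<dyn Future<Output = T> + '_>>) -> T {
    fn clone(_: *const ()) -> RawWaker { RawWaker::new(std::ptr::null(), &VTABLE) }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) };
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

fn main() {
    let mut w = InMemoryWriter { buf: Vec::new(), completed: false };
    block_on(Box::pin(async {
        // The caller just awaits whole operations; no poll_write state machine.
        w.write(b"parquet bytes".to_vec()).await.unwrap();
        w.complete().await.unwrap();
    }));
    assert_eq!(w.buf, b"parquet bytes");
    assert!(w.completed);
}
```

Note how the caller hands over an owned buffer and awaits a complete operation, so writers built on `async fn`-style APIs (like object_store's BufWriter) can implement this directly.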
Describe alternatives you've considered
Not yet.
Additional context
ParquetRecordBatchStream accepts AsyncFileReader:
arrow-rs/parquet/src/arrow/async_reader/mod.rs
Lines 123 to 147 in f38283b
```rust
/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range`
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;

    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());
            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }
            Ok(result)
        }
        .boxed()
    }

    /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
    /// allowing fine-grained control over how metadata is sourced, in particular allowing
    /// for caching, pre-fetching, catalog metadata, etc...
    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
}
```
I'm willing to help implement this proposal.