proposal: Make AsyncArrowWriter accept AsyncFileWriter trait instead #5738

@Xuanwo

Description

This proposal intends to make AsyncArrowWriter accept a new trait called AsyncFileWriter instead of AsyncWrite, similar to how ParquetRecordBatchStream accepts AsyncFileReader.

AsyncArrowWriter accepts AsyncWrite currently:

impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
    /// Try to create a new Async Arrow Writer
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }
    ...
}

AsyncWrite is a low-level, poll-based API. Users whose writers expose an async fn write() must wrap it in a hand-written future state machine in order to implement AsyncWrite.

For example:

impl AsyncWrite for BufWriter {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, Error>> {
        let cap = self.capacity;
        let max_concurrency = self.max_concurrency;
        ...
    }
    ...
}
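
To make that cost concrete, here is a minimal sketch of such a hand-written adapter, using only the standard library. `PollWrite` is a hypothetical stand-in for the `poll_write` part of AsyncWrite. `Client`, `Adapter`, and `demo_poll_write` are illustrative names, not part of arrow-rs or object_store. The sketch assumes the client is cheap to clone, so the in-flight future can own everything it needs.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the shape of `AsyncWrite::poll_write`.
pub trait PollWrite {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;
}

/// Hypothetical storage client that only exposes an `async fn`.
#[derive(Clone, Default)]
pub struct Client {
    stored: Arc<Mutex<Vec<u8>>>,
}

impl Client {
    pub async fn write(&self, data: Vec<u8>) -> io::Result<usize> {
        let n = data.len();
        self.stored.lock().unwrap().extend_from_slice(&data);
        Ok(n)
    }
}

type WriteFuture = Pin<Box<dyn Future<Output = io::Result<usize>> + Send>>;

/// The state machine: either idle (`None`) or holding an in-flight write.
pub struct Adapter {
    client: Client,
    in_flight: Option<WriteFuture>,
}

impl PollWrite for Adapter {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // `Adapter` is `Unpin`, so a plain mutable reborrow is allowed.
        let this = &mut *self;
        if this.in_flight.is_none() {
            // Start a new write; the future owns a copy of the buffer.
            // Note: while a write is in flight, later `buf` arguments are ignored.
            let client = this.client.clone();
            let data = buf.to_vec();
            this.in_flight = Some(Box::pin(async move { client.write(data).await }));
        }
        let fut = this.in_flight.as_mut().unwrap();
        match fut.as_mut().poll(cx) {
            Poll::Ready(res) => {
                // Back to idle so the next call starts a fresh write.
                this.in_flight = None;
                Poll::Ready(res)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing; enough for futures that complete immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drives one `poll_write` call by hand and returns what the client stored.
pub fn demo_poll_write() -> Vec<u8> {
    let client = Client::default();
    let mut adapter = Adapter { client: client.clone(), in_flight: None };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match Pin::new(&mut adapter).poll_write(&mut cx, b"hello") {
        Poll::Ready(Ok(n)) => assert_eq!(n, 5),
        _ => panic!("write did not complete"),
    }
    let stored = client.stored.lock().unwrap().clone();
    stored
}
```

Even in this toy version the writer has to store a boxed future, copy the buffer, and reset its own state. A real AsyncWrite implementation needs the same bookkeeping for flush and shutdown as well.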

Describe the solution you'd like

I propose making AsyncArrowWriter accept a new trait called AsyncFileWriter:

pub struct AsyncArrowWriter<W> {
    /// Underlying sync writer
    sync_writer: ArrowWriter<Vec<u8>>,

    /// Async writer provided by caller
    async_writer: W,
}

impl<W: AsyncFileWriter + Unpin + Send> AsyncArrowWriter<W> {
  ...
}

pub trait AsyncFileWriter: Send {
    async fn write(&mut self, bs: Bytes) -> Result<()>;
    async fn complete(&mut self) -> Result<()>;
}

// `Send` is required by the `AsyncFileWriter: Send` supertrait
impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
    ...
}
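
For comparison, a rough sketch of what implementing the proposed trait could look like for a simple in-memory writer, using only the standard library. This is a simplified variant and not the final API: `Vec<u8>` stands in for `Bytes`, `io::Result` stands in for the parquet `Result`, and boxed futures are used instead of `async fn` in the trait (the same style AsyncFileReader uses). `MemoryWriter`, `write_all`, `block_on`, and `demo_memory_writer` are hypothetical names for illustration.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Simplified variant of the proposed trait (assumption: boxed futures,
/// `Vec<u8>` instead of `Bytes`, `io::Result` instead of the parquet `Result`).
pub trait AsyncFileWriter: Send {
    fn write(&mut self, bs: Vec<u8>) -> BoxFuture<'_, io::Result<()>>;
    fn complete(&mut self) -> BoxFuture<'_, io::Result<()>>;
}

/// Hypothetical in-memory writer.
#[derive(Default)]
pub struct MemoryWriter {
    pub data: Vec<u8>,
    pub completed: bool,
}

impl AsyncFileWriter for MemoryWriter {
    fn write(&mut self, bs: Vec<u8>) -> BoxFuture<'_, io::Result<()>> {
        // No poll function and no stored state: the body is plain async code.
        Box::pin(async move {
            if self.completed {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "writer already completed",
                ));
            }
            self.data.extend_from_slice(&bs);
            Ok(())
        })
    }

    fn complete(&mut self) -> BoxFuture<'_, io::Result<()>> {
        Box::pin(async move {
            self.completed = true;
            Ok::<(), io::Error>(())
        })
    }
}

/// Roughly how a caller such as AsyncArrowWriter might drive the trait:
/// hand over each encoded buffer, then finish the file.
pub async fn write_all<W: AsyncFileWriter>(
    writer: &mut W,
    chunks: Vec<Vec<u8>>,
) -> io::Result<()> {
    for chunk in chunks {
        writer.write(chunk).await?;
    }
    writer.complete().await
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, only suitable for this demo.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn demo_memory_writer() -> (Vec<u8>, bool) {
    let mut writer = MemoryWriter::default();
    let chunks = vec![b"PAR1".to_vec(), b"data".to_vec()];
    block_on(write_all(&mut writer, chunks)).unwrap();
    (writer.data, writer.completed)
}
```

The point of the sketch is that an implementor writes ordinary async code in `write` and `complete`, with no hand-managed future state. In a real application an async runtime would replace the toy `block_on`.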

Describe alternatives you've considered

Not yet.

Additional context

ParquetRecordBatchStream accepts AsyncFileReader:

/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range`
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;

    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());
            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }
            Ok(result)
        }
        .boxed()
    }

    /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
    /// allowing fine-grained control over how metadata is sourced, in particular allowing
    /// for caching, pre-fetching, catalog metadata, etc...
    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
}


I'm willing to help implement this proposal.
