Description
Which part is this question about
Parquet, Arrow, Async
Trying to adapt the `external_metadata` example with a custom `AsyncFileReader`
Describe your question
I'm not sure whether this is a bug or a problem with my implementation.
I'm using this Parquet file (`green_tripdata_2024-01.parquet`), but I have also tested with others and get the same error.
The error that I am seeing is:

```
ArrowError("Parquet argument error: External: protocol error")
```

After some digging, I found that the underlying error is:

```
ParquetError::External from thrift ProtocolError {
    kind: Unknown,
    message: "missing required field PageHeader.type_",
}
```

This happens because the first field identifier read in thrift's `read_from_in_protocol` has type `TType::Stop`, so it returns immediately and `type_` is never set. When the metadata is initially parsed, the reader sees an `i32`, `list`, `i32`, `string`, and `i32` before it hits the first `TType::Stop`, so it seems like this read isn't starting from the beginning. Do I need to reset it back to the beginning somehow?
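Parquet serializes page headers with the Thrift compact protocol, where each field header byte packs the field-id delta into the high nibble and a type code into the low nibble, and type code 0 means STOP. The sketch below uses hypothetical helpers (`decode_field_header`, `first_field_is_stop`; not part of the parquet or thrift crates) and assumes the decoder inspects only the low nibble, which is consistent with seeing `TType::Stop` as the first identifier. Under that assumption, bytes taken from offset 0 begin with the `P` (0x50) of the `PAR1` magic, which decodes as STOP, whereas a genuine `PageHeader` would start with 0x15 (field 1, type i32).

```rust
/// Thrift compact-protocol type code for STOP (end of struct).
const COMPACT_STOP: u8 = 0x0;

/// Hypothetical helper: split a compact-protocol field header byte into
/// (field-id delta, type code). High nibble = delta, low nibble = type.
fn decode_field_header(byte: u8) -> (u8, u8) {
    (byte >> 4, byte & 0x0F)
}

/// Hypothetical helper: true if a struct decoder reading `bytes` would see
/// STOP as its very first field header (so no required field gets set).
fn first_field_is_stop(bytes: &[u8]) -> bool {
    match bytes.first() {
        Some(&b) => decode_field_header(b).1 == COMPACT_STOP,
        None => false,
    }
}

fn main() {
    // Every Parquet file begins with the 4-byte magic "PAR1".
    let file_start = b"PAR1";
    let (delta, ty) = decode_field_header(file_start[0]);
    println!("'P' = {:#04x}: delta = {}, type = {}", file_start[0], delta, ty);
    println!("stops immediately: {}", first_field_is_stop(file_start));

    // A real PageHeader starts with field 1 (`type`, an i32 enum): 0x15.
    println!("0x15 stops immediately: {}", first_field_is_stop(&[0x15]));
}
```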
Context
This is the `CustomAsyncFileReader`. The eventual plan is to have it read from an API so that I can page through row groups without loading them all into memory at once.
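```rust
// Imports for the whole program (the three snippets here live in one file).
use std::{env, io::SeekFrom, ops::Range, sync::Arc, time::Duration};

use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, StreamExt};
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::time::sleep;

struct CustomAsyncFileReader {
    pub file_path: String,
    pub parquet_metadata: Arc<ParquetMetaData>,
}

impl AsyncFileReader for CustomAsyncFileReader {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            let length = range.end - range.start;
            let mut file = File::open(&self.file_path)
                .await
                .map_err(|e| ParquetError::General(e.to_string()))?;
            // NOTE: this `seek` future is never awaited.
            let _ = file.seek(SeekFrom::Start(range.start as u64));
            let mut buffer = vec![0; length];
            match file.read_exact(&mut buffer).await {
                Ok(_) => Ok(Bytes::from(buffer)),
                Err(e) => Err(ParquetError::General(e.to_string())),
            }
        }
        .boxed()
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        // The metadata was loaded up front, so just hand out another Arc to it.
        async move { Ok(self.parquet_metadata.clone()) }.boxed()
    }
}
```

This is the processing of the rows. The `println!` is where the error is shown: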
```rust
async fn process_parquet_stream(
    remote_reader: CustomAsyncFileReader,
    metadata: Arc<ParquetMetaData>,
) {
    // Enable reading the page index, if present.
    let options = ArrowReaderOptions::new().with_page_index(true);
    // `try_new` takes an `Arc<ParquetMetaData>` directly, so no `.into()` is needed.
    let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata, options).unwrap();
    let reader =
        ParquetRecordBatchStreamBuilder::new_with_metadata(remote_reader, arrow_reader_metadata)
            .build()
            .unwrap();
    // Each stream item is a `Result<RecordBatch>`; the error shows up here as an `Err`.
    reader
        .for_each(|batch| async move {
            println!("process batch: {:#?}", batch);
            sleep(Duration::from_secs(1)).await;
        })
        .await;
}
```

Finally, this is how I read the metadata in the `main` function. I tried a few other variants, but they all result in the same error.
```rust
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    let current_dir = env::current_dir().unwrap();
    let path = format!(
        "{}/data/green_tripdata_2024-01.parquet",
        current_dir.display()
    );
    // Load the footer metadata (including page indexes) once, up front.
    let mut file = File::open(&path).await.unwrap();
    let file_size = file.metadata().await.unwrap().len();
    let metadata = Arc::new(
        ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size as usize)
            .await?,
    );
    let remote_reader = CustomAsyncFileReader {
        file_path: path.clone(),
        parquet_metadata: metadata.clone(),
    };
    process_parquet_stream(remote_reader, metadata).await;
    Ok(())
}
```

Any help is greatly appreciated.
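One thing worth ruling out in `get_bytes`: tokio's `AsyncSeekExt::seek` returns a future, and Rust futures do nothing until they are polled, so `let _ = file.seek(...)` without `.await` would leave the cursor at offset 0 and every range would be read from the start of the file. The sketch below models this with only the standard library; `MockFile`, `read_range_unawaited`, `read_range_awaited`, `NoopWaker`, and `block_on` are hypothetical names for illustration, not tokio or parquet APIs, and `block_on` is a toy busy-polling executor.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical in-memory stand-in for `tokio::fs::File`.
struct MockFile {
    data: Vec<u8>,
    pos: usize,
}

impl MockFile {
    // The cursor only moves when this future is actually polled.
    async fn seek(&mut self, pos: usize) {
        self.pos = pos;
    }

    // Fill `buf` from the current cursor position and advance the cursor.
    async fn read_exact(&mut self, buf: &mut [u8]) {
        let end = self.pos + buf.len();
        buf.copy_from_slice(&self.data[self.pos..end]);
        self.pos = end;
    }
}

/// Mirrors the original `get_bytes`: the seek future is created and dropped.
async fn read_range_unawaited(file: &mut MockFile, start: usize, len: usize) -> Vec<u8> {
    let _ = file.seek(start); // never polled, so the cursor stays at 0
    let mut buf = vec![0; len];
    file.read_exact(&mut buf).await;
    buf
}

/// Same read, but the seek is awaited before reading.
async fn read_range_awaited(file: &mut MockFile, start: usize, len: usize) -> Vec<u8> {
    file.seek(start).await;
    let mut buf = vec![0; len];
    file.read_exact(&mut buf).await;
    buf
}

/// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let bytes = b"PAR1abcdPAR1".to_vec();
    let mut f = MockFile { data: bytes.clone(), pos: 0 };
    let wrong = block_on(read_range_unawaited(&mut f, 4, 4));
    let mut g = MockFile { data: bytes, pos: 0 };
    let right = block_on(read_range_awaited(&mut g, 4, 4));
    println!("unawaited seek: {:?}", String::from_utf8_lossy(&wrong));
    println!("awaited seek:   {:?}", String::from_utf8_lossy(&right));
}
```

In the real reader, the corresponding change would be to await the seek (`file.seek(io::SeekFrom::Start(range.start as u64)).await`, mapping the `io::Error` the same way as for `read_exact`) before calling `read_exact`.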