Parquet Arrow Error: missing required field PageHeader.type_ #6581

@jroddev

Which part is this question about
Parquet, Arrow, Async
Trying to adapt the external_metadata example with a custom AsyncFileReader

Describe your question
Not sure if this is a bug or a problem with my implementation.
I use this parquet file but have also tested with others and get the same error.

The error that I am seeing is

ArrowError("Parquet argument error: External: protocol error")

and after some digging I found the underlying error is

ParquetError::External from thrift ProtocolError {
    kind: Unknown,
    message: "missing required field PageHeader.type_",
}

This happens because the first field identifier read in thrift's read_from_in_protocol is of type TType::Stop, so the function returns and type_ is never set. When the metadata is initially parsed, it reads i32, list, i32, string, i32 before it hits the first TType::Stop, so it seems the page header is not being read from its start. Do I need to reset the reader back to the beginning somehow?

Context

This is the CustomAsyncFileReader. The eventual plan is to have this reading from an API so that I can page through RowGroups without loading them all into memory at once.

struct CustomAsyncFileReader {
    pub file_path: String,
    pub parquet_metadata: Arc<ParquetMetaData>,
}

impl AsyncFileReader for CustomAsyncFileReader {
    fn get_bytes(&mut self, range: std::ops::Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            let length = range.end - range.start;
            let mut file = File::open(&self.file_path).await.unwrap();
            let _ = file.seek(io::SeekFrom::Start(range.start as u64));
            let mut buffer = vec![0; length];
            match file.read_exact(&mut buffer).await {
                Ok(_) => Ok(Bytes::from(buffer)),
                Err(e) => Err(ParquetError::General(e.to_string())),
            }
        }
        .boxed()
    }

    fn get_metadata(
        &mut self,
    ) -> BoxFuture<'_, Result<Arc<parquet::file::metadata::ParquetMetaData>>> {
        async move { Ok(self.parquet_metadata.clone()) }.boxed()
    }
}
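
One thing that may be worth checking in the reader above: the `file.seek(...)` call in `get_bytes` is bound to `let _` and never awaited. Futures in Rust are lazy, so the seek may never actually run, and every read would then start at offset 0 instead of `range.start`. The sketch below is not the async code from this issue. It is a synchronous, std-only illustration of the same byte-range read, and `read_range`, `read_range_without_seek`, and `write_sample` are hypothetical helper names introduced only for this example.

```rust
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::ops::Range;
use std::path::Path;

/// Read the bytes in `range` from the file at `path`.
/// Hypothetical helper for illustration; not part of the parquet crate.
fn read_range(path: &Path, range: Range<usize>) -> io::Result<Vec<u8>> {
    let mut file = File::open(path)?;
    // The seek has to complete, and its result be checked, before reading.
    file.seek(SeekFrom::Start(range.start as u64))?;
    let mut buffer = vec![0u8; range.end - range.start];
    file.read_exact(&mut buffer)?;
    Ok(buffer)
}

/// Same read with the seek skipped, which is the effect of a seek that
/// never runs: the bytes always come from the start of the file.
fn read_range_without_seek(path: &Path, range: Range<usize>) -> io::Result<Vec<u8>> {
    let mut file = File::open(path)?;
    let mut buffer = vec![0u8; range.end - range.start];
    file.read_exact(&mut buffer)?;
    Ok(buffer)
}

/// Write a small scratch file so the two helpers can be compared.
/// The contents are made up for this example.
fn write_sample(path: &Path) -> io::Result<()> {
    let mut file = File::create(path)?;
    file.write_all(b"PAR1abcdefgh")
}
```

Asking both helpers for the range `4..8` of the sample file gives different bytes: the first returns the requested slice, the second returns the first four bytes of the file. In the async reader, the equivalent change would presumably be to `.await` the seek and propagate its error before calling `read_exact`.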

This is the processing of the rows. The println is where the error shows up.

async fn process_parquet_stream(
    remote_reader: CustomAsyncFileReader,
    metadata: Arc<ParquetMetaData>,
) {
    let options = ArrowReaderOptions::new().with_page_index(true);
    let arrow_reader_metadata =
        ArrowReaderMetadata::try_new(metadata.clone().into(), options).unwrap();
    let reader =
        ParquetRecordBatchStreamBuilder::new_with_metadata(remote_reader, arrow_reader_metadata)
            .build()
            .unwrap();
    reader
        .for_each(|row_group| async move {
            println!("process row group: {:#?}", row_group);
            sleep(Duration::from_secs(1)).await;
        })
        .await;
}

Finally, this is how I read the metadata in the main function. I tried a few other variants, but they all result in the same error.

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    let current_dir = env::current_dir().unwrap();
    let path = format!(
        "{}/data/green_tripdata_2024-01.parquet",
        current_dir.display()
    );
    let mut file = File::open(&path).await.unwrap();
    let file_size = file.metadata().await.unwrap().len();

    let metadata = Arc::new(
        ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut file, file_size as usize)
            .await
            .unwrap(),
    );

    let remote_reader = CustomAsyncFileReader {
        file_path: path.to_string(),
        parquet_metadata: metadata.clone(),
    };
    process_parquet_stream(remote_reader, metadata).await;
    Ok(())
}

Any help is greatly appreciated.
