Use offset index in ParquetRecordBatchStream #2526
Conversation
```rust
let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
let length = lengths.iter().sum::<usize>();

if length == 0 {
```
Not sure if this is right or we should return an empty vec?
parquet/src/arrow/async_reader.rs (outdated)
```rust
/// Provides asynchronous access to the page index for each column chunk in a
/// row group. Will panic if `row_group_idx` is greater than or equal to `num_row_groups`
fn get_column_indexes(
```
Not sure if we should make this a separate constructor or just have it always use the index. If the index doesn't exist, the cost of determining that is minimal.
I'm not sure of the value of exposing these on AsyncFileReader, and not just handling the logic internally. Ultimately if the implementer wants to override the way the index is fetched, they can just return ParquetMetadata from get_metadata with the index information already loaded.
tustvold left a comment
I think I would opt to keep more of this logic hidden, i.e. not exposed on AsyncFileReader, and make use of get_byte_ranges to avoid making lots of separate small fetch requests.
object_store/src/local.rs (outdated)
```rust
integration.head(&path).await.unwrap();
}

#[ignore]
```
parquet/src/arrow/async_reader.rs (outdated)
```rust
Self::new_builder(AsyncReader(input), metadata, Default::default())
}

pub async fn new_with_index(mut input: T) -> Result<Self> {
```
I think it would be more consistent to have `new_with_options` accepting `ArrowReaderOptions`. This already has a field on it for the page index.
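A minimal mock of the API shape this suggests: a `new_with_options` constructor taking an options struct whose `page_index` flag controls whether the index is fetched. The types below are simplified stand-ins for illustration, not the real crate API.

```rust
// Simplified stand-in for the crate's ArrowReaderOptions
#[derive(Default, Clone, Copy)]
struct ArrowReaderOptions {
    page_index: bool,
}

impl ArrowReaderOptions {
    fn with_page_index(mut self, enabled: bool) -> Self {
        self.page_index = enabled;
        self
    }
}

struct StreamBuilder {
    page_index: bool,
}

impl StreamBuilder {
    // The plain constructor keeps the old behaviour by delegating
    // with default options (page index disabled)
    fn new() -> Self {
        Self::new_with_options(ArrowReaderOptions::default())
    }

    fn new_with_options(options: ArrowReaderOptions) -> Self {
        // a real implementation would fetch the page index here when requested
        StreamBuilder {
            page_index: options.page_index,
        }
    }
}

fn main() {
    assert!(!StreamBuilder::new().page_index);
    let with_index =
        StreamBuilder::new_with_options(ArrowReaderOptions::default().with_page_index(true));
    assert!(with_index.page_index);
}
```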
parquet/src/arrow/async_reader.rs (outdated)
```rust
    }
}

let metadata = Arc::new(ParquetMetaData::new_with_page_index(
```
Not part of this PR, but I still feel something is off with the way the index information is located on ParquetMetadata...
Edit: filed #2530
Yeah, I agree.
```rust
) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
    let (offset, total_length) = get_location_offset_and_total_length(chunks)?;

    if total_length == 0 {
```
```diff
     }
-} else {
-    break;
+if !(selector.skip || current_page_included) {
```
What are the implications of this change?
Before we would break if we were on the last page. So if you skipped from inside the second to last page into the last page, then this would short circuit and the last page range wouldn't be selected.
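The behaviour described above can be sketched with a toy version of the selection walk (this is an illustrative re-implementation, not the crate's `scan_ranges`): skip/select runs are walked across page boundaries, and each page touched by a select run is recorded. Breaking out of the loop upon reaching the last page (the old behaviour) would drop a selection that starts inside that final page.

```rust
// page_rows: row count of each page; selectors: (skip, row_count) runs.
// Returns the indexes of pages that must be fetched.
fn pages_needed(page_rows: &[usize], selectors: &[(bool, usize)]) -> Vec<usize> {
    let mut needed = Vec::new();
    let mut page = 0;
    let mut rows_left = page_rows[0];
    for &(skip, count) in selectors {
        let mut n = count;
        while n > 0 {
            if rows_left == 0 {
                // advance to the next page; do NOT break early on the last one
                page += 1;
                if page == page_rows.len() {
                    return needed;
                }
                rows_left = page_rows[page];
            }
            let take = n.min(rows_left);
            if !skip && needed.last() != Some(&page) {
                needed.push(page);
            }
            n -= take;
            rows_left -= take;
        }
    }
    needed
}

fn main() {
    // skip 25 rows (through pages 0 and 1, into page 2), then select 5:
    // the final page must still be selected
    assert_eq!(pages_needed(&[10, 10, 10], &[(true, 25), (false, 5)]), vec![2]);
}
```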
parquet/src/arrow/async_reader.rs (outdated)
```rust
// read all needed data into buffer
let data = self
    .get_bytes(offset as usize..offset as usize + length)
```
This will perform separate get_bytes requests to fetch the page index and column index information for each column chunk. This is likely not a good idea, especially since this will be performed serially.
Ideally we would identify the ranges of all the index information and then call get_byte_ranges; this allows coalescing proximate requests to ObjectStore, parallel fetch, etc.
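A sketch of the range coalescing being suggested: sort the index byte ranges and merge any whose gap falls below a threshold, so a handful of larger fetches replaces many small serial ones. `ObjectStore::get_ranges` performs this kind of coalescing internally; the function and threshold names here are illustrative.

```rust
use std::ops::Range;

// Merge sorted byte ranges whose gaps are below `max_gap`,
// so one larger request replaces many small ones.
fn coalesce_ranges(mut ranges: Vec<Range<usize>>, max_gap: usize) -> Vec<Range<usize>> {
    ranges.sort_by_key(|r| r.start);
    let mut out: Vec<Range<usize>> = Vec::new();
    for r in ranges {
        match out.last_mut() {
            // close enough to the previous range: extend it instead of
            // issuing a separate fetch
            Some(last) if r.start <= last.end + max_gap => last.end = last.end.max(r.end),
            _ => out.push(r),
        }
    }
    out
}

fn main() {
    let merged = coalesce_ranges(vec![12..20, 0..10, 100..110], 5);
    assert_eq!(merged, vec![0..20, 100..110]);
}
```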
Yeah, that makes sense.
parquet/src/arrow/async_reader.rs (outdated)
```rust
let mut offset_indexes = vec![];

for (idx, rg) in row_groups.iter_mut().enumerate() {
    let column_index = input.get_column_indexes(metadata.clone(), idx).await?;
```
We should check whether the column index has already been fetched into the metadata, and not fetch it again if it is already present.
```rust
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
match page_locations[idx].first() {
```
Discovered this bug as well: we weren't fetching a dictionary page if it existed.
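The fix can be illustrated with a small sketch: if the offset index says the first data page starts after the column chunk's first byte, the bytes in between hold a dictionary page, so the fetch must start at the chunk start rather than at the first data page. The function name and signature below are illustrative, not the crate's internals.

```rust
use std::ops::Range;

// Compute the byte range to fetch for a column chunk, given the chunk's
// start/length from the metadata and the first data page offset from the
// offset index.
fn column_fetch_range(chunk_start: u64, chunk_len: u64, first_data_page: u64) -> Range<u64> {
    let end = chunk_start + chunk_len;
    if first_data_page > chunk_start {
        // a dictionary page precedes the first data page: start at the chunk
        chunk_start..end
    } else {
        first_data_page..end
    }
}

fn main() {
    // dictionary page occupies bytes 100..120 before the first data page
    assert_eq!(column_fetch_range(100, 50, 120), 100..150);
    // no dictionary page: first data page starts at the chunk start
    assert_eq!(column_fetch_range(100, 50, 100), 100..150);
}
```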
```diff
 // If page has less rows than the remaining records to
 // be skipped, skip entire page
-if metadata.num_rows < remaining {
+if metadata.num_rows <= remaining {
```
Another bug I uncovered while testing. This caused the reader to unnecessarily try to fetch pages that were not pre-fetched.
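A sketch of the off-by-one: a page whose row count exactly equals the rows remaining to skip can be skipped whole. With `<` instead of `<=`, such a page was instead decoded row by row, forcing a fetch of a page that was never pre-fetched. The helper below is illustrative, not the crate's code.

```rust
// Count how many leading pages can be skipped whole when `remaining`
// rows are still to be skipped.
fn whole_pages_skipped(page_rows: &[usize], mut remaining: usize) -> usize {
    let mut skipped = 0;
    for &rows in page_rows {
        // with `<` here, a page holding exactly `remaining` rows would
        // not be skipped whole, even though it could be
        if rows <= remaining {
            remaining -= rows;
            skipped += 1;
        } else {
            break;
        }
    }
    skipped
}

fn main() {
    // skipping exactly two pages' worth of rows skips both pages whole
    assert_eq!(whole_pages_skipped(&[10, 10, 10], 20), 2);
}
```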
```rust
return Self::new_builder(AsyncReader(input), metadata, options);
}

fetch_ranges.push(loc_offset as usize..loc_offset as usize + loc_length);
```
I think this will read one col_index and one page_location alternately, but they are written separately:
https://github.com/apache/parquet-format/blob/master/doc/images/PageIndexLayout.png
If we don't cache all the bytes in memory, we should read the whole col_index first, then the page_location.
Why not use 🤔

```rust
/// Read all of one row group's column indexes and convert into [`Index`].
/// If the format is not available, return an empty vector.
pub fn read_columns_indexes<R: ChunkReader>(
    reader: &R,
    chunks: &[ColumnChunkMetaData],
) -> Result<Vec<Index>, ParquetError> {
```

This can't use read_columns_indexes, as it needs to fetch the bytes asynchronously.
Oh! Thanks! But we could still try to combine all the col_index reads and all the page_location reads separately.
The onus is on AsyncFileReader::get_ranges to handle this, e.g. ObjectStore::get_ranges does this already
```rust
index_reader::get_location_offset_and_total_length(rg.columns())?;

let (idx_offset, idx_lengths) =
    index_reader::get_index_offset_and_lengths(rg.columns())?;
```
It occurs to me that this method is making a pretty strong assumption that the column index data is contiguous, I'm not sure this is actually guaranteed... Definitely a separate issue from this PR though
```rust
rg.set_page_offset(offset_index.clone());
offset_indexes.push(offset_index);
```
Again this interface seems really confused - something for #2530
tustvold left a comment
Looks good to me, thank you. There is definitely some cleanup to do with the page index plumbing, but that is beyond the scope of this PR.
Benchmark runs are scheduled for baseline = 0f45932 and contender = 1eb6c45. 1eb6c45 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes #2430
Closes #2434
Rationale for this change
Leverage OffsetIndex (if available) to prune IO in ParquetRecordBatchStream
What changes are included in this PR?
Allow the user to specify read options in ParquetRecordBatchStreamBuilder, which will fetch index metadata when building.
Assorted bug fixes:

- `RowSelection::scan_ranges` when skipping past the final page boundary
- Creating `SerializedPageReader` from `InMemoryColumnChunk` if we had a page index

Are there any user-facing changes?