[Parquet] Support skipping pages with mask based evaluation#9118
sdf-jkl wants to merge 46 commits into apache:main
Conversation
alamb
left a comment
Thank you @sdf-jkl -- this actually makes a lot of sense to me 👏
I have a few concerns:
- I am worried about the performance overhead of this approach (copying the page index and the loop for each batch) -- I will run some benchmarks to assess this
- I do wonder if we have test coverage for this entire situation -- in particular, do we have tests that repeatedly call `next_mask_chunk` after the first page and make sure we get the right rows?
If the performance looks good, I think we should add some more tests -- maybe @hhhizzz has some ideas on how to do this (or I think I can try and find some time to help out / work with codex to do so)
```rust
/// Using the row selection to skip(4), page2 won't be read at all, so in this
/// case we can't decode all the rows and apply a mask. To correctly apply the
/// bit mask, we need all 6 values to be read, but page2 is not in memory.
fn override_selector_strategy_if_needed(
```
nice -- the idea is to avoid this function 👍
```rust
array_reader,
schema: Arc::new(schema),
read_plan,
page_offsets: page_offsets.map(|slice| Arc::new(slice.to_vec())),
```
So I think this will effectively copy the entire `OffsetIndexMetaData` structure (which I worry could be quite large).
I wonder if we need to find a way to avoid this (e.g. making the entire thing Arc'd in https://github.com/apache/arrow-rs/blob/67e04e758f1e62ec3d78d2f678daf433a4c54e30/parquet/src/file/metadata/mod.rs#L197-L196 somehow 🤔)
We could store only the `&Vec<PageLocation>` instead of the entire `OffsetIndexMetaData`: df9a493
```rust
while cursor < mask.len() && selected_rows < batch_size {
    let mut page_end = mask.len();
    if let Some(pages) = page_locations {
        for loc in pages {
```
I am also a little worried that this loop will take too long (it is O(N^2) in the number of pages, since each call scans through all the pages).
Maybe we could add a `PageLocationIterator` to the cursor itself (so we know where to pick up).
Maybe a binary search through a vec of page offsets? We would have to construct the vec once beforehand to avoid rebuilding it.
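That idea could look something like the following minimal sketch (hypothetical helper names and simplified types; in the real reader the offsets would be built once from each page's `first_row_index` in the offset index). `partition_point` gives the binary search, so finding the end of the current page is O(log pages) per chunk instead of a linear scan:

```rust
/// Row offsets at which each page starts, built once up front from the
/// page index (here just widened from i64; a stand-in for the real data).
fn page_start_rows(first_row_indexes: &[i64]) -> Vec<usize> {
    first_row_indexes.iter().map(|&r| r as usize).collect()
}

/// Returns the end of the page containing `cursor`, i.e. the first page
/// boundary strictly after it (or `total_rows` for the last page).
fn page_end_for(cursor: usize, starts: &[usize], total_rows: usize) -> usize {
    // Binary search: index of the first start greater than `cursor`.
    let idx = starts.partition_point(|&s| s <= cursor);
    starts.get(idx).copied().unwrap_or(total_rows)
}

fn main() {
    // Three pages starting at rows 0, 100, and 250; 300 rows total.
    let starts = page_start_rows(&[0, 100, 250]);
    assert_eq!(page_end_for(99, &starts, 300), 100);
    assert_eq!(page_end_for(100, &starts, 300), 250);
    assert_eq!(page_end_for(260, &starts, 300), 300);
}
```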
run benchmark arrow_reader_clickbench arrow_reader_row_filter

🤖: Benchmark completed
@alamb @Dandandan clickbench q12, 24, 30 show some degradation, but everything else looks like an overall improvement.
```diff
- let reader = ParquetRecordBatchReader::new(array_reader, plan);
+ let reader =
+     ParquetRecordBatchReader::new(array_reader, plan, page_offsets.cloned());
```
`cloned` may cause extra expense here -- can we use `Arc<[PageLocation]>` to avoid that?
It's a big API change to make `PageLocation` or `OffsetIndexMetaData` an `Arc` inside `ParquetMetaData`.
If we'd want to make that change, I can open an issue and work up a PR.
I agree with @hhhizzz that copying the offsets here is not good
I thought about it some more, and I think the reason the copy is currently needed is that the decision of whether the page should be skipped is postponed until the next `MaskChunk` is needed.
One potential idea I had to avoid this is to use the page index in the `ReadPlanBuilder` when building, rather than passing the page index into every call to `next_batch`.
So maybe that would look something like extending `MaskCursor` from

```rust
/// Cursor for iterating a mask-backed [`RowSelection`]
///
/// This is best for dense selections where there are many small skips
/// or selections. For example, selecting every other row.
#[derive(Debug)]
pub struct MaskCursor {
    mask: BooleanBuffer,
    /// Current absolute offset into the selection
    position: usize,
}
```

to also track which ranges should be skipped entirely. Maybe something like

```rust
#[derive(Debug)]
pub struct MaskCursor {
    mask: BooleanBuffer,
    /// Current absolute offset into the selection
    position: usize,
    /// Which row ranges should be skipped entirely?
    skip_ranges: Vec<Range<usize>>,
}
```

That I think would simplify the logic for `next_mask_chunk` significantly, and it would avoid the need to copy the entire page index.
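As a rough sketch of how `skip_ranges` could bound each chunk -- all names and types here are simplified stand-ins (`Vec<bool>` instead of `BooleanBuffer`, and this is not the real `next_mask_chunk` logic), assuming the ranges are non-empty and sorted:

```rust
use std::ops::Range;

// Simplified stand-in for the proposed cursor shape.
struct MaskCursor {
    mask: Vec<bool>,                // stand-in for BooleanBuffer
    position: usize,                // current absolute offset into the selection
    skip_ranges: Vec<Range<usize>>, // row ranges skipped entirely (sorted, non-empty)
}

impl MaskCursor {
    /// Jump over any skip range starting at `position`, then return the
    /// mask slice up to the next skip range (or the end of the mask).
    fn next_mask_chunk(&mut self) -> Option<&[bool]> {
        // Skipped ranges never produce a chunk; advance past them.
        while let Some(r) = self.skip_ranges.iter().find(|r| r.start == self.position) {
            self.position = r.end;
        }
        if self.position >= self.mask.len() {
            return None;
        }
        // The chunk ends at the nearest skip range ahead of the cursor.
        let end = self
            .skip_ranges
            .iter()
            .map(|r| r.start)
            .filter(|&s| s > self.position)
            .min()
            .unwrap_or(self.mask.len());
        let start = self.position;
        self.position = end;
        Some(&self.mask[start..end])
    }
}

fn main() {
    // 10 rows; the page covering rows 4..6 was skipped entirely.
    let mut c = MaskCursor {
        mask: vec![true; 10],
        position: 0,
        skip_ranges: vec![4..6],
    };
    assert_eq!(c.next_mask_chunk().unwrap().len(), 4); // rows 0..4
    assert_eq!(c.next_mask_chunk().unwrap().len(), 4); // rows 6..10
    assert!(c.next_mask_chunk().is_none());
}
```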

Thank you! @sdf-jkl -- the code looks great; just wondering if we could add more unit tests.

Here's the existing test: `parquet/src/arrow/async_reader/mod.rs` line 1218 (at 13d497a). I think we can just add one more UT to test skipping a page with `RowSelectionPolicy` set to `Mask` instead of `Auto`.
I want to write down my thoughts about this issue, hopefully it will be helpful. The original issue:
The simplest case where this happens is:
The case above is pretty simple and does not require different page layouts between columns. It only requires a skipped page in a column and a mask selection. This can happen:
This was fixed by:
The new issue (introduced in #9239):
Prerequisites for this to happen:
How we fixed it:
Possible positions where mask cursor can end up:
Cases where this issue could happen:
Invariant enforced by the fix:

So, the test cases we need for the new issue are:
# Which issue does this PR close?

Push after #9354 where `TestReader` is moved to a common place for other async reader tests.
- Part of #9348.
- Will help with #9118

# Rationale for this change

# What changes are included in this PR?

- Move sync tests from `parquet/tests/arrow_reader/row_filter.rs` to `parquet/tests/arrow_reader/row_filter/sync.rs`
- Move async tests from `parquet/src/async_reader/mod.rs` to `parquet/tests/arrow_reader/row_filter/async.rs`

# Are these changes tested?

Code movement

# Are there any user-facing changes?

Code movement
@alamb Above I argue that page-offset issues can only appear during filter reads or final reads. During filtering, it can happen when a predicate column is read; during the final read, it can happen when multiple columns are projected. In both cases the columns need to have different page offsets (due to types, encodings, or other reasons) to cause errors.
I have one more idea I want to try this weekend 💭
@alamb
The change I was thinking about is this: instead of using all page offsets as `MaskChunk` boundaries, only use the skipped pages.
Previously, we treated every page offset as a boundary. When reading loaded pages, this meant we had to stop at every page and start a new `MaskChunk`, which introduced significant function-call overhead. Skipped pages would coalesce because, once we start a new `MaskChunk`, the consecutive values at the start of the mask are zeros, so they merge into the initial skip.
By using only skipped pages as boundaries for `MaskChunk`s, we stop a chunk only when the next page is not loaded. This way, skipped pages still coalesce through the initial skip, but loaded pages are no longer bounded by every page offset and can coalesce until they reach a skipped page.
This should prevent the back-and-forth `MaskChunk` calls at every original page boundary in the projection.
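The idea above can be sketched as follows: given the selection mask and the page-start offsets, only pages containing no selected rows survive as chunk boundaries. The helper name and types here are hypothetical, not the actual PR code:

```rust
/// Keep only the starts of pages that have no selected rows -- those pages
/// were never loaded, so they are the only boundaries a chunk must stop at.
fn skipped_page_starts(mask: &[bool], page_starts: &[usize]) -> Vec<usize> {
    let mut out = Vec::new();
    for (i, &start) in page_starts.iter().enumerate() {
        let end = page_starts.get(i + 1).copied().unwrap_or(mask.len());
        // Entirely deselected page => it will be skipped by the reader.
        if mask[start..end].iter().all(|&b| !b) {
            out.push(start);
        }
    }
    out
}

fn main() {
    // 3 pages of 4 rows each; only the middle page is fully deselected.
    let mask = [
        true, false, true, true, // page 0: loaded
        false, false, false, false, // page 1: skipped
        true, true, false, true, // page 2: loaded
    ];
    // Pages 0 and 2 no longer force a MaskChunk boundary at their edges;
    // only the skipped page's start (row 4) bounds a chunk.
    assert_eq!(skipped_page_starts(&mask, &[0, 4, 8]), vec![4]);
}
```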
```rust
/// Returns true if selectors should be forced, preventing mask materialisation
pub(crate) fn should_force_selectors(
/// Returns row offsets for the starts of skipped pages across projected columns
```
Function to retrieve row offsets for skipped pages across projected columns.
```rust
mask: BooleanBuffer,
/// Current absolute offset into the selection
position: usize,
/// Index of the next page boundary candidate. This advances monotonically
```
Make `MaskCursor` track the next boundary index, like the idea in sdf-jkl#2.
```rust
// or until the mask is exhausted. This mirrors the behaviour of the legacy
// `RowSelector` queue-based iteration.
while cursor < mask.len() && selected_rows < batch_size {
    let max_chunk_rows = page_boundaries
```
This way we can avoid a binary search in every `MaskChunk`.
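A minimal sketch of that monotonically advancing boundary index (field names are hypothetical, not the PR's actual code): because the cursor only ever moves forward, the next boundary can be found by bumping an index, which is amortized O(1) over the whole read instead of O(log pages) per chunk:

```rust
// Stand-in for the cursor state relevant to boundary tracking.
struct BoundaryCursor {
    boundaries: Vec<usize>, // sorted row offsets of skipped-page starts
    next_boundary: usize,   // index of the next candidate; only increases
}

impl BoundaryCursor {
    /// Max rows a chunk may cover from `position` before hitting a boundary.
    fn max_chunk_rows(&mut self, position: usize, total_rows: usize) -> usize {
        // Advance past boundaries at or before the cursor; never rewinds.
        while self.next_boundary < self.boundaries.len()
            && self.boundaries[self.next_boundary] <= position
        {
            self.next_boundary += 1;
        }
        let bound = self
            .boundaries
            .get(self.next_boundary)
            .copied()
            .unwrap_or(total_rows);
        bound - position
    }
}

fn main() {
    // Skipped pages start at rows 100 and 250; 300 rows total.
    let mut c = BoundaryCursor { boundaries: vec![100, 250], next_boundary: 0 };
    assert_eq!(c.max_chunk_rows(0, 300), 100);
    assert_eq!(c.max_chunk_rows(100, 300), 150);
    assert_eq!(c.max_chunk_rows(250, 300), 50);
}
```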

@Dandandan could you run the benchmark again, since this could be related to apache/datafusion#20324
(I think the runner is stuck again)

run benchmark arrow_reader_clickbench

I'll check out the benchmark locally

🤖: Benchmark completed

Which issue does this PR close?

Rationale for this change

Check issue.

What changes are included in this PR?

Made `next_mask_chunk` page aware by adding `page_offsets` to `ParquetRecordBatchReader`.

Are these changes tested?

Yes, unit tests.

Are there any user-facing changes?