Enable serialized_reader read specific Page by passing row ranges. #1977
Ted-Jiang wants to merge 1 commit into apache:master from
Conversation
For now this only supports primitive_reader.
parquet/src/arrow/async_reader.rs
Will add support in InMemoryReader.
parquet/src/file/metadata.rs
Need a 3-level vec: row_group -> column chunk -> page
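A hedged sketch of the three-level nesting described above: row group -> column chunk -> page. `PageIdx`, `sample_index`, and `lookup_first_row` are hypothetical stand-ins, not the real parquet-rs types.

```rust
// Hypothetical stand-in for the real page index entry type.
#[derive(Debug, Clone, PartialEq)]
struct PageIdx {
    first_row: i64,
}

// index[row_group][column_chunk][page]
type FileIndex = Vec<Vec<Vec<PageIdx>>>;

fn sample_index() -> FileIndex {
    // One row group with two column chunks. Note the chunks have
    // different page boundaries, which is why three levels are needed.
    vec![vec![
        vec![PageIdx { first_row: 0 }, PageIdx { first_row: 100 }],
        vec![PageIdx { first_row: 0 }, PageIdx { first_row: 150 }],
    ]]
}

fn lookup_first_row(index: &FileIndex, rg: usize, col: usize, page: usize) -> i64 {
    index[rg][col][page].first_row
}

fn main() {
    // Row group 0, column chunk 1, page 1 starts at row 150.
    assert_eq!(lookup_first_row(&sample_index(), 0, 1, 1), 150);
}
```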
I'm confused about the design of the new API described above. I think the column index reader should be a function of the parquet reader (parquet-rs); anyone who calls the parquet reader should get the benefit of this optimization when using a filter. From your implementation, I find the user needs to call the lower-level API and use the column index to calculate the
Yes, I think the row ranges are internal to
Yes, I agree it's better to keep it private. I think for now we make it pub; after full support for predicate pushdown, we will do it like in Java.
parquet/src/column/reader.rs
```rust
    }

    pub(crate) fn set_row_ranges(&mut self, row_ranges: RowRanges) {
        self.selected_row_ranges = Some(row_ranges);
```
We need these row_ranges for row alignment in the future.
```rust
        Ok(Box::new(page_reader))
    }

    fn get_column_page_reader_with_offset_index(
```
Because of the lack of test data in the parquet-testing subproject, I will add an end-to-end test after adding a test file there.
We can add a unit test using the writer API.
#1935 has been merged; the parquet file now contains the column index by default.
```rust
let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;

fn column_chunks(
    &self,
    i: usize,
```
Suggested change:
```diff
-    i: usize,
+    column_index: usize,
```
```rust
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];
for rg in &filtered_row_groups {
    let c = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
```
If a schema has col1, col2, col3, ..., col8, and we just need col1 and col3, do we need to load the other, unneeded index data?
I've had a quick review; unfortunately I think this is missing a key detail. In particular, the arrow reader must read the same records from each of its columns. As written, this simply skips reading pruned pages from columns. There is no relationship between the page boundaries across columns within a parquet file, and therefore this will return different rows for each of the columns.
As described in #1791 (review), you will need to extract the row selection in addition to the page selection, and push this into RecordReader and ColumnValueDecoder. This will also make the API clearer, as we aren't going behind their back and skipping pages at the block level.
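To illustrate the point about page boundaries differing across columns, here is a hedged sketch (the function and the half-open `(start, end)` row-range representation are hypothetical, not the parquet-rs API): if each column independently keeps only its own surviving pages, the rows the reader can actually return in lockstep are the intersection of the columns' covered ranges.

```rust
// Intersect two sorted sets of half-open row ranges. This models the
// shared row selection that must be pushed down to every column so all
// columns yield the same records.
fn intersect(a: &[(usize, usize)], b: &[(usize, usize)]) -> Vec<(usize, usize)> {
    let mut out = Vec::new();
    for &(s1, e1) in a {
        for &(s2, e2) in b {
            let s = s1.max(s2);
            let e = e1.min(e2);
            if s < e {
                out.push((s, e));
            }
        }
    }
    out
}

fn main() {
    // col1's surviving pages cover rows [0, 100); col2's cover
    // [0, 50) and [80, 100) because its page boundaries differ.
    let col1 = [(0, 100)];
    let col2 = [(0, 50), (80, 100)];
    // The reader must restrict BOTH columns to the intersection.
    assert_eq!(intersect(&col1, &col2), vec![(0, 50), (80, 100)]);
}
```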
parquet/src/arrow/arrow_reader.rs
Suggested change:
```diff
-        self.skip_arrow_metadata = skip_arrow_metadata;
-        self
+        {
+            skip_arrow_metadata,
+            ..self
+        }
```
And same below
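The suggestion uses Rust's struct-update syntax: instead of mutating a field on `self` and returning it, the builder method constructs a new value with `..self`. A minimal sketch, where `Options` and `with_skip_arrow_metadata` are stand-ins, not the real `ArrowReaderOptions` API:

```rust
#[derive(Debug, Clone, PartialEq)]
struct Options {
    skip_arrow_metadata: bool,
    batch_size: usize,
}

impl Options {
    // Builder-style setter using struct-update syntax: the named field
    // is overridden, `..self` carries over every other field unchanged.
    fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }
}

fn main() {
    let opts = Options { skip_arrow_metadata: false, batch_size: 1024 };
    let opts = opts.with_skip_arrow_metadata(true);
    assert!(opts.skip_arrow_metadata);
    // Untouched fields are preserved.
    assert_eq!(opts.batch_size, 1024);
}
```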
parquet/src/arrow/arrow_reader.rs
Suggested change:
```diff
-        if self.options.selected_rows.is_some() {
-            let ranges = &self.options.selected_rows.as_ref().unwrap().clone();
+        if let Some(ranges) = self.options.selected_rows.as_ref()
```
parquet/src/arrow/arrow_reader.rs
```rust
    batch_size: usize,
) -> Result<Self::RecordReader>;

fn get_record_reader_by_columns_and_row_ranges(
```
Do we need this, or is `ArrowReaderOptions` sufficient?
```diff
     row_groups: Vec<RowGroupMetaData>,
-    page_indexes: Option<Vec<Index>>,
     offset_indexes: Option<Vec<Vec<PageLocation>>>,
+    page_indexes: Option<Vec<Vec<Index>>>,
```
Suggested change:
```diff
-    page_indexes: Option<Vec<Vec<Index>>>,
+    /// Page index for all pages in each column chunk
+    page_indexes: Option<Vec<Vec<Index>>>,
```
Or something like that, same for the below
parquet/src/file/reader.rs
```rust
/// get a serially readable slice of the current reader
/// This should fail if the slice exceeds the current bounds
fn get_multi_range_read(
```
As discussed on #1955, I'm not a fan of this; I would much rather the page reader read pages than skip byte ranges behind its back.
It also changes the semantics of how a column chunk is read, as it now buffers in memory an extra time.
```rust
for (start, length) in start_list.into_iter().zip(length_list.into_iter()) {
    combine_vec.extend(self.slice(start..start + length).to_vec());
}
let reader = Bytes::copy_from_slice(combine_vec.as_slice()).reader();
```
This adds an additional copy of all the page bytes, which is definitely not ideal...
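A std-only illustration of the reviewer's concern (the real code uses the `bytes` crate; `multi_range_read` and the `(start, length)` range representation are hypothetical): stitching several byte ranges into one contiguous buffer necessarily copies every selected byte a second time.

```rust
// Combine multiple (start, length) ranges of `data` into one owned
// buffer. Each selected byte is copied out of `data` into `combined`,
// so the selected pages exist in memory twice.
fn multi_range_read(data: &[u8], ranges: &[(usize, usize)]) -> Vec<u8> {
    let mut combined = Vec::new();
    for &(start, len) in ranges {
        combined.extend_from_slice(&data[start..start + len]); // copy!
    }
    combined
}

fn main() {
    let data: Vec<u8> = (0u8..10).collect();
    let out = multi_range_read(&data, &[(0, 2), (5, 3)]);
    assert_eq!(out, vec![0, 1, 5, 6, 7]);
    // `out` owns its bytes: a second copy of the page data now exists.
}
```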
```rust
// read from the parquet file, before the footer.
offset_index: Vec<PageLocation>,

// used to keep the needed page indexes.
```
We pass all the PageLocations and RowRanges into this struct, then do the filter logic.
If we have 5 pages and, in try_new, we filter out 2 pages, we keep the index numbers of the remaining 3 pages in this index_map for the final calculate_offset_range.
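The filter described above can be sketched as follows. This is a hedged, simplified model: `filter_pages` and the `(start, end)` half-open row-range representation of pages and RowRanges are hypothetical, not the actual struct's fields.

```rust
// Keep the index numbers of pages whose row range overlaps any
// selected row range; these survive into the index_map used later
// to calculate the byte offset ranges to read.
fn filter_pages(page_rows: &[(usize, usize)], selected: &[(usize, usize)]) -> Vec<usize> {
    let mut kept = Vec::new();
    for (i, &(ps, pe)) in page_rows.iter().enumerate() {
        // Half-open overlap test: [ps, pe) intersects [ss, se).
        if selected.iter().any(|&(ss, se)| ps < se && ss < pe) {
            kept.push(i);
        }
    }
    kept
}

fn main() {
    // 5 pages of 20 rows each; the selection covers rows [0, 60).
    let pages = [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100)];
    let kept = filter_pages(&pages, &[(0, 60)]);
    // 2 pages filtered out, 3 page index numbers kept.
    assert_eq!(kept, vec![0, 1, 2]);
}
```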
Thanks @tustvold, you are right. Maybe I made the title confusing 😭. As you mentioned in #1791 (review):
This PR is only about the
As above, we need to pass the
I think small incremental PRs are a good approach. However, I have concerns with this specific PR.
I wonder if a plan of attack akin to the following might work:
Currently it feels like we're adding the high-level functionality before the necessary lower-level functionality exists, and this means low-level details, like the page delimiting, leak out of the high-level APIs. Edit: I'll try and stub out some APIs for what I mean over the next couple of days. This will also help me validate that my mental model checks out 😅
Got it, I will delete the
Tomorrow I will start with
After reading the code, I think if we want to skip pages in
I got worried about where I should pass the column index info.
Give me a day or so and I'll get a PR up with some stuff stubbed out; I think this exercise will help us both 😄
Sure! This really needs some time! 💪
Marking as a draft, as I think the approach in #1998 is what we will take forward.
@Ted-Jiang Can this be closed now?
Which issue does this PR close?
Closes #1976.
Rationale for this change
Partially supports #1792.
If we want to use the page index to get row ranges: first use `SerializedFileReader` to get the page index info, then use this index to get `row_ranges`, like below.
Finally, we can pass the `row_ranges` to the new API to read the parquet file (datafusion reads this way, but without `row_ranges`).
What changes are included in this PR?
One example: if we read col1 and col2 and apply a filter, to get the result we need to read row_ranges [20, 80].
For col1:
we need all data from page1, page2, page3.
For col2:
after this PR, we will filter out page2 and keep page0, page1:
as for page1: we need all of its data.
as for page0: we need part of its row_range (row alignment needed, TODO).
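The worked example above can be sketched in code. The page boundaries below are made up to match the described outcome (they are not from the PR), and pages are modeled as hypothetical half-open `(start, end)` row ranges with the selection being rows [20, 80).

```rust
// Return the index numbers of pages whose row range overlaps the
// selection. Pages that only partially overlap still need row
// alignment afterwards, as noted in the description.
fn overlapping_pages(pages: &[(usize, usize)], sel: (usize, usize)) -> Vec<usize> {
    pages
        .iter()
        .enumerate()
        .filter(|&(_, &(s, e))| s < sel.1 && sel.0 < e)
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    let sel = (20, 80);
    // col1: all three pages overlap [20, 80), so all data is needed.
    let col1 = [(0, 30), (30, 60), (60, 100)];
    assert_eq!(overlapping_pages(&col1, sel), vec![0, 1, 2]);
    // col2: page2 ([80, 100)) does not overlap and is filtered out;
    // page1 is fully inside the selection, page0 only partially
    // overlaps, so part of its row_range must still be trimmed.
    let col2 = [(0, 40), (40, 80), (80, 100)];
    assert_eq!(overlapping_pages(&col2, sel), vec![0, 1]);
}
```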
Are there any user-facing changes?