Add integration test for scan rows with selection #2158
tustvold merged 9 commits into apache:master
Codecov Report
@@ Coverage Diff @@
## master #2158 +/- ##
==========================================
+ Coverage 82.85% 83.06% +0.21%
==========================================
Files 237 237
Lines 61381 61620 +239
==========================================
+ Hits 50856 51186 +330
+ Misses 10525 10434 -91
Some(remaining) => {
    selection.push_front(RowSelection::skip(remaining));
    // if page row count less than batch_size we must set batch size to page row count.
    // add check avoid dead loop
Fix incorrect logic: the remaining records still need to be read.
None => {
    // If we skip records before all read operation
    // we need set `column_reader` by `set_page_reader`
    if let Some(page_reader) = pages.next() {
Fix skipping before any read operation: the column_reader must be set first.
@tustvold @thinkharderdev PTAL 😊
tustvold
left a comment
Had a brief look, will review in more detail later (flying today)
// we need set `column_reader` by `set_page_reader`
if let Some(page_reader) = pages.next() {
    self.set_page_reader(page_reader?)?;
    false
This is wrong, as it will now only mark end_of_column when it reaches the end of the file, instead of the end of a column chunk within a row group. This will break record delimiting for repeated fields.
@tustvold I moved it out to:
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
    if self.record_reader.column_reader().is_none() {
        // If we skip records before all read operation
        // we need set `column_reader` by `set_page_reader`
        if let Some(page_reader) = self.pages.next() {
            self.record_reader.set_page_reader(page_reader?)?;
        } else {
            return Ok(0);
        }
    }
    self.record_reader.skip_records(num_records)
}
I think the column_reader is None in only one situation: when we skip the first page without having read any records. Related: #2171. If we create it in colchunk, we can remove this check.
parquet/src/column/reader.rs
Outdated
  // If page has less rows than the remaining records to
  // be skipped, skip entire page
- if metadata.num_rows < remaining {
+ while metadata.num_rows < remaining {
Why is this necessary? There is already an outer while loop.
Because I first added the lines below:
// because self.num_buffered_values == self.num_decoded_values means
// we need reads a new page and set up the decoders for levels
self.read_new_page()?;
If we still use an if statement here, we may read an unnecessary page header.
This while loop should result in the same behaviour as the previous continue??
Oh... it's a useless loop.
Thank you @Ted-Jiang -- the project to add page index and skipping is really coming along very nicely. It is a very nice piece of work.
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
    if self.record_reader.column_reader().is_none() {
This now behaves differently from next_batch which will potentially read from multiple column chunks for the same "batch". Can we extract this logic into a free function, similar to read_records, that performs the same loop?
This would also avoid duplicating this code in every ArrayReader
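A free function of the kind requested here could look roughly like the sketch below. It is not the crate's code: MockRecordReader, set_chunk, and skip_across_chunks are hypothetical names, and each column chunk is reduced to a plain row count so the loop can be shown in isolation. The point is only that one skip request may span several column chunks, just as a single read batch can.

```rust
/// Hypothetical stand-in for a record reader bound to one column chunk.
struct MockRecordReader {
    /// Rows left in the current column chunk; `None` means no chunk is loaded yet.
    remaining_in_chunk: Option<usize>,
}

impl MockRecordReader {
    /// Points the reader at a new column chunk holding `rows` rows.
    fn set_chunk(&mut self, rows: usize) {
        self.remaining_in_chunk = Some(rows);
    }

    /// Skips up to `n` rows in the current chunk and returns how many were skipped.
    fn skip_records(&mut self, n: usize) -> usize {
        match self.remaining_in_chunk.as_mut() {
            Some(left) => {
                let skipped = n.min(*left);
                *left -= skipped;
                skipped
            }
            // No chunk loaded yet, so nothing can be skipped.
            None => 0,
        }
    }
}

/// Skips `to_skip` rows, moving on to the next column chunk whenever the
/// current one is exhausted. Returns the number of rows actually skipped.
fn skip_across_chunks(
    reader: &mut MockRecordReader,
    chunks: &mut dyn Iterator<Item = usize>,
    to_skip: usize,
) -> usize {
    let mut skipped = 0;
    while skipped < to_skip {
        skipped += reader.skip_records(to_skip - skipped);
        if skipped < to_skip {
            // The current chunk is exhausted (or none was loaded):
            // advance to the next chunk, or stop at the end of input.
            match chunks.next() {
                Some(rows) => reader.set_chunk(rows),
                None => break,
            }
        }
    }
    skipped
}
```

Because the loop lives in one function, every array reader could call it instead of repeating the "is a column reader set yet?" check.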
parquet/src/arrow/arrow_reader.rs
Outdated
        selection.push_front(RowSelection::select(remaining));
        self.batch_size
    }
    Some(_) => self.batch_size,
- Some(_) => self.batch_size,
+ _ => self.batch_size,
And remove the None case below. If remaining == 0 then front.row_count == self.batch_size
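To make the match under discussion concrete, here is a minimal sketch of splitting a selection queue into reads of at most batch_size rows. Selector, Step, and plan_steps are hypothetical names, not the crate's API, and the sketch ignores how the real reader accumulates rows from several selectors into one batch. The fallback arm returns front.row_count, which equals batch_size when the remainder is 0 and is smaller when the run is shorter than a batch.

```rust
use std::collections::VecDeque;

/// Hypothetical selector: a run of `row_count` rows to either read or skip.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Selector {
    row_count: usize,
    skip: bool,
}

/// One action the reader would take.
#[derive(Debug, PartialEq)]
enum Step {
    Skip(usize),
    Read(usize),
}

/// Drains the selection queue, splitting every "select" run into reads of at
/// most `batch_size` rows.
fn plan_steps(mut selection: VecDeque<Selector>, batch_size: usize) -> Vec<Step> {
    assert!(batch_size > 0, "a zero batch size could never make progress");
    let mut steps = Vec::new();
    while let Some(front) = selection.pop_front() {
        if front.skip {
            steps.push(Step::Skip(front.row_count));
            continue;
        }
        let to_read = match front.row_count.checked_sub(batch_size) {
            // More rows than one batch: read a full batch and requeue the rest.
            // Excluding `remaining == 0` avoids requeueing an empty selector.
            Some(remaining) if remaining != 0 => {
                selection.push_front(Selector { row_count: remaining, skip: false });
                batch_size
            }
            // Exactly one batch, or fewer rows than a batch: read what is there.
            _ => front.row_count,
        };
        steps.push(Step::Read(to_read));
    }
    steps
}
```

With a batch size of 2, a select run of 5 rows becomes reads of 2, 2, and 1 rows, and a run of 4 rows becomes two reads of 2.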
parquet/src/column/reader.rs
Outdated
}
// because self.num_buffered_values == self.num_decoded_values means
// we need reads a new page and set up the decoders for levels
self.read_new_page()?;
Perhaps we could check the return type of this, and short-circuit if it returns false?
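The short-circuit suggested here can be sketched as follows, assuming read_new_page reports false once no page is left, as the comment implies. MockColumnReader is a hypothetical stand-in that models each page as a flat row count and ignores repetition and definition levels entirely.

```rust
/// Hypothetical stand-in for a column reader; each page is just a row count.
struct MockColumnReader {
    pages: std::vec::IntoIter<usize>,
    num_buffered_values: usize,
    num_decoded_values: usize,
}

impl MockColumnReader {
    fn new(pages: Vec<usize>) -> Self {
        Self {
            pages: pages.into_iter(),
            num_buffered_values: 0,
            num_decoded_values: 0,
        }
    }

    /// Loads the next page. Returns `Ok(false)` when no pages remain.
    fn read_new_page(&mut self) -> Result<bool, String> {
        match self.pages.next() {
            Some(rows) => {
                self.num_buffered_values = rows;
                self.num_decoded_values = 0;
                Ok(true)
            }
            None => Ok(false),
        }
    }

    /// Skips up to `num_records` rows and returns how many were skipped.
    fn skip_records(&mut self, num_records: usize) -> Result<usize, String> {
        let mut remaining = num_records;
        while remaining != 0 {
            if self.num_buffered_values == self.num_decoded_values {
                // Current page is used up. If there is no next page,
                // short-circuit and report what was skipped so far.
                if !self.read_new_page()? {
                    return Ok(num_records - remaining);
                }
            }
            let available = self.num_buffered_values - self.num_decoded_values;
            let n = remaining.min(available);
            self.num_decoded_values += n;
            remaining -= n;
        }
        Ok(num_records - remaining)
    }
}
```

Returning early keeps the loop from spinning when the caller asks to skip more rows than the column chunk contains.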
Benchmark runs are scheduled for baseline = e96ae8a and contender = d10d962. d10d962 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
@tustvold thanks for your patient review 👍
FYI I'm working on a follow-up PR to address some remaining items, e.g. getting this integrated into the fuzz tests.
Which issue does this PR close?
Closes #2106.
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?