Stub out Skip Records API (#1792) #1998
Conversation
Codecov Report

```
@@            Coverage Diff             @@
##           master    #1998      +/-   ##
==========================================
- Coverage   83.58%   83.42%   -0.16%
==========================================
  Files         222      222
  Lines       57522    57906     +384
==========================================
+ Hits        48078    48309     +231
- Misses       9444     9597     +153
```

Continue to review full report at Codecov.
cool! 👍 @tustvold Are you the Flash 😄! I will try to go through this and give you my opinion today.
parquet/src/arrow/arrow_reader.rs
Outdated
Could we add a total_row_count to check that this selection is valid (e.g. that it is contiguous)?
Is it actually an issue if it isn't, e.g. if I only want the first 100 rows?
Yes, got it. The check should happen on the user side.
parquet/src/column/page.rs
Outdated
👍 really need this abstraction!
parquet/src/arrow/arrow_reader.rs
Outdated
👍 Passing the mask here rather than to each column is more reasonable 😂
parquet/src/column/page.rs
Outdated
Is it the case that here we only need the offset index, without the min/max index? 🤔
Is this for the situation where a page has already been partially consumed by read_records, leaving some unread data in the buffer?
Sorry, I don't get this point. Why not directly call column_reader.skip_records(num_records)? Could you give me a hint?
RecordReader is a bit of an odd cookie, let me try to explain what it is doing.
In the absence of repetition levels, it can simply read batch size levels, and the corresponding number of values.
However, if repetition levels are present, it will likely need to read more than batch_size levels in order to read batch_size actual records (rows).
To achieve this it reads to its internal buffer and then splits off the data corresponding to batch_size rows, leaving the excess behind.
It is this excess of data that has been read to its buffers but not yielded to the caller yet, which we must consume here
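The level-counting behaviour described above can be sketched as follows. This is an illustrative helper, not the arrow-rs implementation: `levels_for_records` is an invented name showing why, with repetition levels present, yielding `batch_size` records can require consuming more than `batch_size` levels (a record starts wherever the repetition level is 0).

```rust
/// Hypothetical helper (not part of arrow-rs): count how many levels must
/// be consumed to yield `num_records` complete records. A repetition level
/// of 0 marks the start of a new record, so nested values inflate the
/// number of levels per record.
fn levels_for_records(rep_levels: &[i16], num_records: usize) -> usize {
    let mut records = 0;
    for (i, &rep) in rep_levels.iter().enumerate() {
        if rep == 0 {
            if records == num_records {
                // The next record starts here: everything before index `i`
                // belongs to the requested records.
                return i;
            }
            records += 1;
        }
    }
    // Fewer than `num_records` records are buffered: consume everything.
    rep_levels.len()
}

fn main() {
    // Two records [a, b, c] and [d]; the trailing levels begin a third record.
    let rep_levels = [0, 1, 1, 0, 0, 1];
    // Reading 2 records consumes 4 levels; the excess stays buffered.
    assert_eq!(levels_for_records(&rep_levels, 2), 4);
}
```

It is exactly that excess (levels read but not yet attributable to a yielded record) that the RecordReader keeps in its internal buffers.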
👍 Nice write-up! It saved me some time 😄!
So, I get it now. Some more specific details to ask about:
This is part of the skip: we need to read the repetition and definition levels to skip some records in the page (records which may or may not have already been read).
```rust
let (buffered_records, buffered_values) = self.count_records(num_records);
self.num_records += buffered_records;
self.num_values += buffered_values;
self.consume_def_levels();
self.consume_rep_levels();
self.consume_record_data();
self.consume_bitmap();
self.reset();
let remaining = num_records - buffered_records;
```
This is also part of the skip: when remaining > 0, I think the skip starts at a new page.
```rust
if remaining == 0 {
    return Ok(buffered_records);
}
let skipped = match self.column_reader.as_mut() {
    Some(column_reader) => column_reader.skip_records(remaining)?,
    None => 0,
};
```
> This is part of the skip: we need to read the repetition and definition levels to skip some records in the page (records which may or may not have already been read).
Yes, this is just to consume the data that has been read into the internal buffers of RecordReader, if any.
> This is also part of the skip: when remaining > 0, I think the skip starts at a new page.
Not necessarily. The only thing RecordReader needs to handle is skipping any data that has already been read from ColumnReader into its own buffers. It can then delegate to ColumnReader to skip the remaining rows, with no requirement that this happens at a page boundary: ColumnReader must be able to handle any case.
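That two-phase behaviour can be sketched minimally. The names here (`Skipper`, `downstream_skipped`) are invented for illustration: the struct stands in for RecordReader's buffer, and the counter stands in for delegation to ColumnReader.

```rust
/// Illustrative sketch only: `Skipper` stands in for RecordReader, and the
/// `downstream_skipped` counter stands in for delegation to ColumnReader.
struct Skipper {
    /// Records already decoded into local buffers but not yet yielded.
    buffered: usize,
    /// Records skipped by the underlying column reader on our behalf.
    downstream_skipped: usize,
}

impl Skipper {
    fn skip_records(&mut self, num_records: usize) -> usize {
        // Phase 1: discard records already sitting in the local buffer.
        let from_buffer = self.buffered.min(num_records);
        self.buffered -= from_buffer;

        // Phase 2: delegate the remainder downstream. There is no
        // requirement that this lands on a page boundary; the downstream
        // reader must handle skips starting and ending anywhere.
        let remaining = num_records - from_buffer;
        self.downstream_skipped += remaining;

        from_buffer + remaining
    }
}

fn main() {
    let mut s = Skipper { buffered: 3, downstream_skipped: 0 };
    // Skipping 5 records drains 3 from the buffer and delegates 2 downstream.
    assert_eq!(s.skip_records(5), 5);
    assert_eq!(s.downstream_skipped, 2);
}
```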
Co-authored-by: Yang Jiang <[email protected]>
alamb
left a comment
The API looks good to me -- I had some questions, and I think it would be nicer to return NotImplemented errors rather than panic in certain cases, but I think this PR could also be merged as is to unblock further dev work.
```rust
fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
    todo!()
}
```
I think adding a ticket reference here like `unimplemented!("See https://github.com/apache/arrow-rs/.....")` would help future readers. Bonus points for returning `ArrowError::Unimplemented`. This comment applies to everything below as well.
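A sketch of the suggested shape, using a plain `String` error in place of a real `ArrowError` variant (the exact variant name is an assumption, and the issue reference is illustrative):

```rust
/// Sketch only: a stub that surfaces a "not yet implemented" error instead
/// of panicking via `todo!()`. A real implementation would use the crate's
/// error type rather than `String`.
fn skip_values(_num_values: usize) -> Result<usize, String> {
    Err("skip_values is not yet implemented, see apache/arrow-rs#1792".to_string())
}

fn main() {
    // Callers get a recoverable error instead of a panic.
    assert!(skip_values(10).is_err());
}
```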
parquet/src/arrow/arrow_reader.rs
Outdated
```rust
/// Scan rows from the parquet file according to the provided `selection`
///
/// TODO: Make public once row selection fully implemented

/// [`RowSelection`] allows selecting or skipping a provided number of rows
/// when scanning the parquet file
#[derive(Debug, Clone, Copy)]
pub(crate) struct RowSelection {
```
You have probably already thought about this, but I would expect that in certain scenarios non-contiguous rows/skips would be desired, like "fetch the first 100 rows, skip the next 200, and then fetch the remaining". Would this interface handle that case?
See `with_row_selection`, which takes a `Vec` to allow for this use-case.
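For illustration, here is how a `Vec` of selections could express that "fetch, skip, fetch" pattern. The `row_count`/`skip` field names mirror the `RowSelection` struct stubbed in this PR, but treat the exact shape as an assumption about the eventual API:

```rust
/// Mirrors the shape of the `RowSelection` stub in this PR; treat the
/// field names as assumptions about the eventual public API.
#[derive(Debug, Clone, Copy)]
struct RowSelection {
    row_count: usize,
    skip: bool,
}

/// Total number of rows a selection would actually yield to the caller.
fn selected_rows(selection: &[RowSelection]) -> usize {
    selection
        .iter()
        .filter(|s| !s.skip)
        .map(|s| s.row_count)
        .sum()
}

fn main() {
    // "Fetch the first 100 rows, skip the next 200, then fetch the
    // remaining 700" expressed as a Vec of selections.
    let selection = vec![
        RowSelection { row_count: 100, skip: false },
        RowSelection { row_count: 200, skip: true },
        RowSelection { row_count: 700, skip: false },
    ];
    assert_eq!(selected_rows(&selection), 800);
}
```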
```rust
fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
    todo!()
}
```
ditto returning "not yet implemented" would probably be nicer
```diff
-pub struct DefinitionLevelDecoder {
+pub struct DefinitionLevelBufferDecoder {
```
Is this rename a public API change as well? It does not appear in the docs.
No, it is crate local.
Which issue does this PR close?
Part of #1792
Rationale for this change
Stubs out an API for providing skip records functionality within parquet. I think this will work to support #1792, #1191 and potentially other functionality down the line.
Let me know what you think @Ted-Jiang @sunchao
What changes are included in this PR?
Stubs out APIs for adding row skipping logic to the parquet implementation
Are there any user-facing changes?
No 🎉