Push ChunkReader into SerializedPageReader (#2463) #2464
tustvold merged 1 commit into apache:master
Conversation
| let row_count = calculate_row_count(
|     offset_index,
|     *seen_num_data_pages,
|     self.total_num_values,
This was actually incorrect previously: it was using total_num_values instead of total_row_count. For repeated (list) columns the two differ, since a single row can contain many values. We don't currently support skipping with lists (#2122), but this would cause issues if we did.
| /// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read
| fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
| /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
| struct TrackedRead<R> {
This is kind of ugly, but the length of the header isn't stored anywhere
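For context, a minimal sketch of what such a byte-counting wrapper can look like (illustrative, not necessarily the exact implementation in this PR): it forwards reads to the inner reader and tallies the bytes handed out, so the caller can recover how long the thrift-encoded header was after decoding it.

```rust
use std::io::Read;

/// Wraps a reader and counts how many bytes have been pulled through it.
struct TrackedRead<R> {
    inner: R,
    bytes_read: usize,
}

impl<R: Read> Read for TrackedRead<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.bytes_read += n;
        Ok(n)
    }
}
```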
| Ok(())
| }
| match &mut self.state {
|     SerializedPageReaderState::Values { .. } => Err(general_err!(
|         "Must set page_offset_index when using skip_next_page in SerializedPageReader."
|     )),
It should now be possible to read the header, and then skip by just incrementing the offset @Ted-Jiang
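Roughly what that could look like (a sketch only; `PageHeaderStub` and the `read_header` closure stand in for the real thrift `PageHeader` and the `read_page_header_len` helper from this diff): decode just the header, then step over the compressed body without decompressing it.

```rust
use std::io::{self, Read};

/// Stand-in for the decoded page header; only the field needed here.
struct PageHeaderStub {
    compressed_page_size: usize,
}

/// Skip one page: decode only its header, then drain the compressed body
/// without decompressing it. In the real reader this would instead bump the
/// stored offset in `SerializedPageReaderState::Values`.
fn skip_one_page<R: Read>(
    input: &mut R,
    read_header: impl Fn(&mut R) -> io::Result<(usize, PageHeaderStub)>,
) -> io::Result<u64> {
    let (_header_len, header) = read_header(input)?;
    io::copy(
        &mut input.take(header.compressed_page_size as u64),
        &mut io::sink(),
    )
}
```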
| .build()
| .unwrap();
|
| let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
This is somewhat cumbersome, but I opted to take ColumnChunkMetadata in the constructor as it is more future-proof and makes it more obvious what the values are. In particular, it avoids ambiguity over whether values includes nulls (it does).
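For illustration, construction then looks roughly like the following (a hypothetical sketch; the helper name is made up and the exact argument list should be checked against the rustdoc):

```rust
use std::fs::File;
use std::sync::Arc;

use parquet::errors::Result;
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::serialized_reader::SerializedPageReader;

/// Hypothetical helper: build a page reader for column `i` of a row group,
/// given an open file and the row-group metadata.
fn page_reader_for_column(
    file: File,
    row_group: &RowGroupMetaData,
    i: usize,
) -> Result<SerializedPageReader<File>> {
    SerializedPageReader::new(
        Arc::new(file),                // anything implementing ChunkReader
        row_group.column(i),           // &ColumnChunkMetaData: byte range, codec, type
        row_group.num_rows() as usize, // total rows in the row group
        None,                          // optional page locations from the page index
    )
}
```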
|
| let page_len = front.compressed_page_size as usize;
|
| // TODO: Add ChunkReader get_bytes to potentially avoid copy
This is what will allow removing InMemoryRowGroupReader and just using SerializedPageReader. I intend to do this as a follow up
You mean InMemoryColumnChunkReader?
Is the idea that we make InMemoryColumnChunk impl ChunkReader?
I think we could just replace InMemoryColumnChunk with SerializedPageReader<Bytes>
Cool, I was working on #2426. If you want I can rebase onto this branch and do that as part of that ticket
That would be fantastic if you could 👍
alamb left a comment
Looks reasonable to me but I have not been down in this code for a while. I would defer to @thinkharderdev @Ted-Jiang @nevi-me or @sunchao
Thanks for pushing this along @tustvold 👍
| 7,
| Compression::UNCOMPRESSED,
| Type::INT32,
| Arc::new(Bytes::from(buf)),
I thought Bytes was already ref-counted -- is there any need to wrap this in an additional Arc?
The signature needs Arc<T: ChunkReader> because ChunkReader doesn't impl Clone
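As a quick illustration (a minimal sketch, arbitrary buffer size): the extra Arc is just a cheaply clonable handle around the already ref-counted Bytes, so cloning it never copies the buffer.

```rust
use std::sync::Arc;

use bytes::Bytes;

fn main() {
    let data = Bytes::from(vec![0u8; 1 << 20]);
    let shared: Arc<Bytes> = Arc::new(data); // wraps the handle, not the buffer
    let a = Arc::clone(&shared);             // bumps a refcount, no data copy
    let b = Arc::clone(&shared);
    assert_eq!(a.len(), b.len());
}
```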
| // The file source buffer which references exactly the bytes for the column chunk
| // to be read by this page reader.
| buf: SerializedPages<T>,
| pub struct SerializedPageReader<R: ChunkReader> {
This is a non-trivial change, right? To make this different from std::io::Read? But on the other hand the SerializedFileReader isn't changed -- https://docs.rs/parquet/20.0.0/parquet/file/index.html#example-of-reading-an-existing-file
Maybe to ease the transition we can add some docstring example showing how to create a ChunkReader from a std::io::Read? Or maybe it doesn't matter.
I think we should be sensitive and over communicate a change like this in the SerializedPageReader.
@zeevm do you have any comments on this change (especially in regards to your comments on #2394 (comment))?
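For instance, such a docstring example could look roughly like this (a sketch, assuming Bytes can serve as a ChunkReader, as the in-memory discussion above implies; for on-disk data, std::fs::File already works directly):

```rust
use std::io::Read;

use bytes::Bytes;

/// Adapt an arbitrary `std::io::Read` by buffering it fully in memory;
/// the resulting `Bytes` can then back a page reader.
fn buffer_entire_reader<R: Read>(mut input: R) -> std::io::Result<Bytes> {
    let mut buf = Vec::new();
    input.read_to_end(&mut buf)?;
    Ok(Bytes::from(buf))
}
```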
> This is a non-trivial change, right?
It brings SerializedPageReader into line with all the other readers, so I'm not really sure I agree that this is a major change. I suspect almost all users are using RowGroupReader::get_column_page_reader and not calling this constructor. Tbh I'm not entirely sure why this method is even public... Perhaps I should take the opportunity to make it crate private as part of slowly reducing the amount of implementation detail that leaks out of the crate?
> Perhaps I should take the opportunity to make it crate private as part of slowly reducing the amount of implementation detail that leaks out of the crate?
github codesearch https://cs.github.com/?q=SerializedPageReader%20language%3ARust&scopeName=All%20repos&scope= seems to suggest that most of the uses of this structure are forks of the arrow-rs codebase in various states of divergence.
Not that github codesearch would find all the possible issues, but it is a good sanity check that this isn't widely used.
| Ok(result)
| }
| let decompressor = create_codec(meta.compression())?;
| let (start, len) = meta.byte_range();
I find:

```rust
/// Returns the offset and length in bytes of the column chunk within the file
pub fn byte_range(&self) -> (u64, u64) {
    let col_start = match self.dictionary_page_offset() {
        Some(dictionary_page_offset) => dictionary_page_offset,
        None => self.data_page_offset(),
    };
    let col_len = self.compressed_size();
    assert!(
        col_start >= 0 && col_len >= 0,
        "column start and length should not be negative"
    );
    (col_start as u64, col_len as u64)
}
```

It may return the dictionary_page_start, which is not right when checking whether this is a dict page.
That is intentional: we want it to return the dictionary_page_start if there is one, as we use this to construct the dictionary page location.
Oh! forgot there is no dict_offset in locations 😂
| Ok(result)
| }
| let decompressor = create_codec(meta.compression())?;
| let (start, len) = meta.byte_range();
This is not correct, as it will potentially miss the dictionary page. I originally tried to use the presence of the dictionary_page_offset to infer the existence of a dictionary page, but this isn't consistently set. In particular, "alltypes_tiny_pages.parquet" does not set the dictionary page offset, and yet its first page is a dictionary page.
Benchmark runs are scheduled for baseline = ecc6210 and contender = 63ab69e. 63ab69e is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Push ChunkReader into SerializedPageReader (#2463)
Which issue does this PR close?
Closes #2463
Rationale for this change
This pushes ChunkReader into SerializedPageReader. This not only simplifies the code, but will allow removing the InMemoryColumnChunkReader in async_reader.rs and implementing page skipping in the absence of a PageIndex (#2460).
What changes are included in this PR?
Are there any user-facing changes?
This changes the interface of SerializedPageReader, which is public, albeit a very low-level API.