GH-39392: [C++][Parquet] Support page pruning#39393
huberylee wants to merge 3 commits into apache:main
Conversation
@mapleFU Would you mind taking a look? This merge request fully implements what I commented in document https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM
This patch is extremely huge... At first glance it satisfies the interface, but I need time to review it.
/// returned by the Read interface is processed completely before calling the
/// Read interface again. Otherwise, fatal errors may occur due to data in the
/// Buffer being overwritten.
class ARROW_EXPORT ChunkBufferedInputStream : public InputStream {
What's the exact difference between this and CachedFileInputStream? Can we just implement this on it?

Do you mean ReadRangeCache or BufferedInputStream?

Yeah, I mean ReadRangeCache. IMO ReadRangeCache would benefit from existing code and the Wait interface for async reads.

If we use ReadRangeCache, we need to differentiate between scenarios where data is read with RowRange and scenarios where data is read without RowRange.

IMO, this seems to replace ::arrow::io::BufferedInputStream. There are two possible cases:

- pre_buffer = True: this re-encapsulates an IO-coalescing layer above ::arrow::io::BufferedInputStream.
- pre_buffer = False: it just does single-column IO.
Emm, in the current code, are buffered_stream_enabled_ and pre_buffer related? Maybe I forgot this 🤔

> Emm, in the current code, are buffered_stream_enabled_ and pre_buffer related? Maybe I forgot this 🤔

Sort of... IIRC the properties_.GetStream() won't be invoked for prebuffered column chunks.
Oh I forgot this:
if (cached_source_ && prebuffered_column_chunks_bitmap_ != nullptr &&
::arrow::bit_util::GetBit(prebuffered_column_chunks_bitmap_->data(), i)) {
// PARQUET-1698: if read coalescing is enabled, read from pre-buffered
// segments.
PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range));
stream = std::make_shared<::arrow::io::BufferReader>(buffer);
} else {
stream = properties_.GetStream(source_, col_range.offset, col_range.length);
  }

Thanks for pointing it out!
When pre_buffer = true, ReadRangeCache will be used to perform IO coalescing between columns; only when pre_buffer = false and buffered_stream_enabled_ = true will BufferedInputStream be used to do single-column IO. ChunkBufferedInputStream does the same thing as BufferedInputStream, while also providing simple IO merging capabilities and shielding the upper layer from the discontinuity of IO.
Do you have a plan to implement the prebuffer feature when ReadRanges are provided? It seems that we are still incapable of doing this, which I think is worth doing.
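For illustration only, the stream-selection rules described above can be sketched as follows; `SelectStream` and its string labels are hypothetical helpers, not part of the patch:

```cpp
#include <string>

// Hypothetical sketch of which stream serves a column-chunk read, per the
// discussion: prebuffer wins without row ranges, page pruning uses the new
// chunk-buffered stream, otherwise the existing single-column paths apply.
std::string SelectStream(bool pre_buffer, bool buffered_stream_enabled,
                         bool has_row_ranges) {
  if (pre_buffer && !has_row_ranges) {
    // Coalesced reads across columns via ReadRangeCache (PARQUET-1698).
    return "ReadRangeCache";
  }
  if (has_row_ranges) {
    // Page pruning: read only hit pages, with small-gap IO merging.
    return "ChunkBufferedInputStream";
  }
  if (buffered_stream_enabled) {
    return "BufferedInputStream";
  }
  return "plain InputStream";
}
```

Note the row-ranges case takes precedence even when pre_buffer is enabled, matching the statement later in the thread that prebuffer is avoided whenever row ranges are provided.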
> Do you have a plan to implement the prebuffer feature when ReadRanges are provided? It seems that we are still incapable of doing this, which I think is worth doing.

ReadRangeCache seems to meet the capabilities you mentioned, right?
return Status::OK();
}

read_gaps_.clear();
It will be triggered every time before actual IO occurs.

Oh, I got this: it maintains a current buffer and does a range read when NextPage is called... and Peek / Read will be triggered in this case.
// set byte_width to the length of "fixed": 10
// todo: find a way to generate test data with more diversity.
BuilderType builder(::arrow::fixed_size_binary(5));
const int byte_width = 10;
Hmm, what's the purpose here?

Just to be consistent with NullableArray for fixed_size_binary.
// The num of rows to skip before reading each row range
std::vector<int64_t> skip_row_nums_;

// The last row index for echo row range, start counting from within the page
What does "echo row range" mean, is it "each row range"?

Yes.
// Skip info for current page
PageSkipInfo* skip_info_{nullptr};
If the page is v1 without a page index, what would this be?

In our internal system, this cannot happen. It may indeed happen here, but the way to deal with it is very simple: we only need to fall back to the original logic when invoking NextChunk.
IMO fallback or checking is important. Some cases are likely to read external tables or disable column statistics in some wide tables.

In the current implementation, we only check it in GetColumnPageReader under debug mode. Fallback and more checks will be added later.
if (read_states != nullptr) {
  PageSkipInfo& skip_info = read_states->NextPageSkipInfo();
  const int64_t records_to_skip = skip_info.SkipRowNum();
  const int64_t skipped_records = SkipRecords(records_to_skip);
IMO, doing SkipRecords in HasNextInternal would be a bit tricky...

Maybe we can implement the row skipping logic in ReadRecords.
do we need to document clearly how (or if?) the data page filter and the page pruning can work with each other?

Is data_page_filter used in this patch? And how would a page be skipped here?

the ReadNextPage may use the data page filter. Assuming users set both the filter and row ranges, would SkipRecords(records_to_skip) skip the wrong records?

Seems multiple mechanisms are applied here...

@jp0317 Should we combine ShouldSkipPage with PageIndex? Our current codebase relies on something weird here. 🤔

> do we need to document clearly how (or if?) the data page filter and the page pruning can work with each other?

data_page_filter is used to filter out non-hit pages. In the current implementation, SkipRecords is used to skip unnecessary rows within the hit pages, so they do not conflict with each other. Using this together with data_page_filter is not currently supported, but such scenarios can be supported with very minor modifications to NextPage.
> @jp0317 Should we combine ShouldSkipPage with PageIndex? Our current codebase relies on something weird here. 🤔

It seems ShouldSkipPage targets page header statistics.. IIUC we might want two versions of page skipping (rather than combining them into one): one based on the PageIndex before any actual IO on that page, and one based on the page header after the IO is done.
I think we'd need clearer docs on those skip-related APIs, as currently it seems they are supposed to be used in a certain way that is not stated in the doc..
return bytes_for_values;
}

// Two parts different from original HasNextInternal:
What would a SkipRecords(k) do? During SkipRecords, HasNext might also be called; would a hidden HasNextInternal() skip unexpected rows?
- Within a page, the hit rows may not be contiguous. For a page that has already been hit, any non-hit rows within it need to be skipped using SkipRecords. It may be clearer to look at this logic in conjunction with the code comments in column_reader.h:164~173:
//
// | <--------------------------- column chunk -------------------------------> |
// | <-------------------- page N -----------------------> |
// first_row_idx last_row_idx
// |-- ... --|-------------------------------------------------------|--- ... ---|
// |---- range0 ----| |---- range1 ----|
// |--skip0--| |--skip1--|
// |------last_row_index0-----|
// |-------------------last_row_index1-------------------|
- During the execution of the SkipRecords method, it does indeed call the HasNextInternal method. The current implementation and testing have covered such scenarios, ensuring that no errors occur.
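As a hypothetical illustration of the skip0/skip1 values in the diagram above: the rows to skip before each hit range are just the gaps between consecutive ranges. `SkipRowNums` and the inclusive [from, to] pairs below are made-up names, not the PR's actual types:

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// Computes the rows to skip before each hit range inside a page, matching
// the diagram: skip0 is the gap from the page's first row to range0, skip1
// is the gap between range0's end and range1's start. Ranges are given as
// inclusive [from, to] pairs in page-relative row indices.
std::vector<int64_t> SkipRowNums(
    int64_t page_first_row,
    const std::vector<std::pair<int64_t, int64_t>>& ranges) {
  std::vector<int64_t> skips;
  int64_t cursor = page_first_row;  // next unread row in the page
  for (const auto& [from, to] : ranges) {
    skips.push_back(from - cursor);  // non-hit rows before this range
    cursor = to + 1;                 // resume right after the range
  }
  return skips;
}
```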
Yeah, I mean it limits the usage of TypedColumnReader and only allows internal skips. External skips would introduce inconsistent skipping. In the current case, Skip(skip_last) would skip more than skip_last.
Oh, I get to understand this. SkipRecords() now means "trying to skip the existing records and skip any remaining data"...
> Yeah, I mean it limits the usage of TypedColumnReader and only allows internal skips. External skips would introduce inconsistent skipping. In the current case, Skip(skip_last) would skip more than skip_last.

First of all, it is strange to execute SkipRecords on the basis of hit rows. Secondly, whether it is consistent depends on how the semantics of SkipRecords are understood. If some rows are skipped on the basis of hit rows, the current implementation can theoretically guarantee consistency, but more tests need to be added for verification; if SkipRecords applies to all rows in a page, then the existing implementation will indeed have problems.
> First of all, it is strange to execute SkipRecords on the basis of hit rows.

I think if a Parquet user is doing a dense read with a selection vector, or something like incremental filtering, it's possible to do this. Since SkipRecords is exported, and it's used when skipping the records within a page, I think the interface is a bit weird 🤔
I get your point; maybe this is just a code implementation problem. We can perform the row skipping operations in ReadRecords. What do you think?
I agree with @mapleFU that this PR is very large. I'd recommend at least breaking up the components added to Arrow from the parquet changes (and possibly splitting out the metrics class, as these seem like core orthogonal concepts that should likely get a closer review).
this probably belongs in util?

it also doesn't seem like it has tests associated with it?

These stats are not relevant to this MR and I will remove them.
return ss.str();
}

static constexpr Range EMPTY_RANGE{std::numeric_limits<int64_t>::max(),
shouldn't this be something like {0, 0}? As it stands now it is an invalid range. Also, kEmptyRange I think is preferred in new code.

> shouldn't this be something like {0, 0}, as it stands now it is an invalid range

EMPTY_RANGE represents InvalidRange or EmptyRange in different scenarios. {0, 0} means hitting the first row, so we can't use it to mean an empty range. kEmptyRange is more appropriate; I will modify it later.
// specific language governing permissions and limitations
// under the License.

#pragma once
overall this doesn't appear to be updated to reflect the interfaces discussed in https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM/edit ?

@huberylee Do you mind taking a look at this patch: #38867? It does a similar feature but uses a different approach than the one discussed in the doc, without optimizing I/O; it would be good to consolidate that effort with yours.

> overall this doesn't appear to be updated to reflect the interfaces discussed in https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM/edit ?

I had already started working on it during the documentation discussion, so the implementation in this MR will be a little different from the documentation.

> @huberylee Do you mind taking a look at this patch: #38867?

I'd be glad to. I will look at patch #38867 later.
I commented on specific sections, but looking over the PR it seems quite different than what was discussed in https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM/edit — I'm not sure I understand the cause of the discrepancies?
Sorry for not chiming in sooner. I agree that it would be good to break down this PR into smaller ones. Otherwise, it would be challenging to get it properly reviewed. To provide some context, this doc was originally inspired by offline discussion among @mapleFU @baibaichen @binmahone and me, and further reviewed by @emkornfield @fatemehp. We have somewhat reached a consensus on the API and need more discussion on concrete PRs to refine the implementation. @huberylee I have seen your detailed comments and replied to some of them in that doc. It would be good to go through the doc before heading into the implementation; otherwise it might waste the time of you and the people involved in the original discussion. I will take a look at this PR over the weekend and see if we can get on the same page quickly.
wgtmac left a comment
I haven't looked at the change on the reader side yet, will take another pass this weekend.
/// distance between them is less than io_merge_threshold, and the actual size
/// in one io request will be limited by the buffer_size.
///
/// \attention It is important to note that since the data returned by the Read
This breaks the contract of the InputStream interface.
// At the beginning of a new read range and required bytes is more than
// read range length, we think it's ok because there are some tentative
// read requests when getting next page data.
Is this for checking the record boundary of a repeated column?

No. In SerializedPageReader::NextPage, it tries to read 16KB, 32KB, 64KB..., until the read data contains the whole page header. In an actual scenario, the 16KB read in the first attempt may already exceed the size of the entire page.
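The doubling behaviour described here can be illustrated with a small sketch; `HeaderReadAttempts` and the 16 KiB starting size are illustrative assumptions, not the reader's actual constants:

```cpp
#include <cstdint>
#include <vector>

// Returns the sequence of speculative read sizes tried until header_size
// bytes are buffered, mimicking the 16KB/32KB/64KB... growth described
// above. For a small header, the very first 16 KiB attempt already covers
// it, and may exceed the remaining length of the read range.
std::vector<int64_t> HeaderReadAttempts(int64_t header_size) {
  constexpr int64_t kStart = 16 * 1024;
  std::vector<int64_t> attempts;
  for (int64_t size = kStart;; size *= 2) {
    attempts.push_back(size);
    if (size >= header_size) break;  // whole header now buffered
  }
  return attempts;
}
```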
return Status::OK();
}

if (read_ranges_idx_ != read_ranges_.size()) {
Why not check this before line 230?

We cannot check this before line 230 because there may be more ranges to read.
namespace arrow::io {

struct IoMetrics {
  Metric io_time;
We may need extra metrics, like the number of actual I/Os and others, for effectiveness evaluation.
CheckReadPosition(start, num_bytes, source);

// Prefetch data
PARQUET_THROW_NOT_OK(source->WillNeed(*read_ranges));
What will happen if the read behavior of source->WillNeed(*read_ranges) is different from the IO merging process in the ChunkBufferedInputStream?
std::shared_ptr<ArrowInputStream> GetStream(std::shared_ptr<ArrowInputFile> source,
                                            int64_t start, int64_t num_bytes);
                                            int64_t start, int64_t num_bytes,
                                            const ReadRanges* read_ranges = NULLPTR);
This function now returns two different streams depending on the availability of the read_ranges input. It would be good to add some comments to make that clear.
virtual ::arrow::Status GetRecordBatchReader(
    const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
    const std::optional<std::unordered_map<int, RowRanges>>& row_ranges,
    std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
It looks duplicative to provide row_group_indices and row_ranges at the same time.
::arrow::Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
                                     std::shared_ptr<::arrow::RecordBatchReader>* out);
::arrow::Status GetRecordBatchReader(std::shared_ptr<::arrow::RecordBatchReader>* out);
::arrow::Status GetRecordBatchReader(
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

if (reader_properties_.pre_buffer()) {
  // When row_ranges has value, only the data of hit pages should be load,
If the IO coalescing process is predictable, we can still enable prebuffer here with some code change. If we need to implement this, some logic of ChunkBufferedInputStream should be refactored out to be shared by prebuffer.

Yes. In the current implementation, the use of prebuffer is avoided regardless of whether pre_buffer is enabled or not.
};
}

FileColumnIteratorFactory SomeRowGroupsFactory(std::vector<int> row_groups,
I'd suggest using the interface below:
FileColumnIteratorFactory SomeRowGroupsFactory(std::map<int, RowRanges> ranges);
In this way, GetFieldReader and GetFieldReaders can be changed without adding an extra optional parameter.
// RowGroups can be either std::vector<int> or std::map<int, RowRanges>
template <typename RowGroups>
Status GetFieldReader(int i,
const std::shared_ptr<std::unordered_set<int>>& included_leaves,
const RowGroups& row_groups,
std::unique_ptr<ColumnReaderImpl>* out)
template <typename RowGroups>
Status GetFieldReaders(const std::vector<int>& column_indices,
const RowGroups& row_groups,
std::vector<std::shared_ptr<ColumnReaderImpl>>* out,
std::shared_ptr<::arrow::Schema>* out_schema)
If the interface requires all row groups to provide ranges information, we can make this change.
row_groups_(row_groups.begin(), row_groups.end()) {}

explicit FileColumnIterator(int column_index, ParquetFileReader* reader,
                            std::vector<int> row_groups, RowRangesOpt row_ranges)
Consider combining row_groups and row_ranges.

Splitting row_groups and row_ranges into two different args is meaningful: some row groups only hit some rows, while other row groups hit all rows in the row group.
auto row_ranges_map = row_ranges_.value();
auto it = row_ranges_map.find(row_group_ordinal);
if (it != row_ranges_map.end()) {
  auto index_reader = reader_->GetPageIndexReader()->RowGroup(row_group_ordinal);
Error handling is required here when index_reader is not available.

Yes, it should fall back to the original logic when index_reader is not available.

The current implementation in this MR is completely different from the one in https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM. It provides a new implementation idea. If the community finds it meaningful, let us work together to integrate these two different implementations.
}

if (cached_source_ && prebuffered_column_chunks_bitmap_ != nullptr &&
    ::arrow::bit_util::GetBit(prebuffered_column_chunks_bitmap_->data(), i)) {
Why do we still have to read all the data here?
Thank you for your contribution. Unfortunately, this
Rationale for this change
FileReader supports reading data based on specified RowRanges to provide the most fundamental filter pushdown capability to various upper-level computing engines. For more details, please refer to #39392.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?