GH-38865 [C++][Parquet] support passing a RowRange to RecordBatchReader #38867
binmahone wants to merge 25 commits into apache:maint-13.0.0 from
Conversation
mapleFU
left a comment
There was a problem hiding this comment.
The RowRanges API LGTM. I think we can split RowRanges out into a separate patch.
The underlying implementation logic looks OK, but I still need to do a review pass on it. Also, it seems that this patch is being merged into 13.0.0...
cpp/src/parquet/arrow/reader.h
Outdated
|
|
||
| virtual ::arrow::Status GetRecordBatchReader( | ||
| const std::vector<int>& row_group_indices, const std::vector<int>& column_indices, | ||
| const std::shared_ptr<std::map<int, RowRangesPtr>>& row_ranges_map, |
There was a problem hiding this comment.
So, does a RowRanges index mean the row index within this RowGroup or the row index within the file?
There was a problem hiding this comment.
Please document this in the method contract. The way this is being passed (as a shared pointer, with values that are themselves pointers) seems a little strange to me. Is there a reason this contract was arrived at? (I need to take a closer look at how Range was arrived at.)
I'm not sure if keeping consistency for a brand new method is worth it, or if this should return arrow::Result<std::unique_ptr>. At some point we should convert all the methods to return Result.
There was a problem hiding this comment.
It's also not clear to me that we should include both RowGroupIndices and ranges within those indices. It seems like a cleaner API to include only one or the other (which also makes passing in ranges easier?).
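To make the row-group-local versus file-level question concrete, here is a minimal sketch of how file-level ranges could be mapped onto row-group-local ranges. Everything in it is hypothetical and not taken from the PR: the `Range` struct, the `SplitByRowGroup` helper, and the assumption that ranges are inclusive on both ends.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical range type: inclusive on both ends.
struct Range {
  int64_t from;
  int64_t to;
};

// Hypothetical helper: splits file-level row ranges into per-row-group
// ranges whose indices are relative to the start of each row group.
// row_group_num_rows[i] is the number of rows in row group i.
std::map<int, std::vector<Range>> SplitByRowGroup(
    const std::vector<Range>& file_ranges,
    const std::vector<int64_t>& row_group_num_rows) {
  std::map<int, std::vector<Range>> result;
  int64_t rg_first = 0;  // file-level index of the first row in this row group
  for (size_t rg = 0; rg < row_group_num_rows.size(); ++rg) {
    const int64_t rg_last = rg_first + row_group_num_rows[rg] - 1;
    for (const Range& r : file_ranges) {
      // Clip the file-level range to the rows covered by this row group.
      const int64_t from = std::max(r.from, rg_first);
      const int64_t to = std::min(r.to, rg_last);
      if (from <= to) {
        result[static_cast<int>(rg)].push_back({from - rg_first, to - rg_first});
      }
    }
    rg_first = rg_last + 1;
  }
  return result;
}
```

With a helper like this, a caller could pass only file-level ranges and let the reader derive the row groups to touch, which is one way to avoid passing both row group indices and ranges.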
cpp/src/parquet/column_reader.h
Outdated
| RecordSkipper(RowRanges& pages, RowRanges& row_ranges_) | ||
| : row_ranges(row_ranges_) { // copy row_ranges | ||
| RowRanges will_process_pages, skip_pages; | ||
| for (auto& page : pages.getRanges()) { |
There was a problem hiding this comment.
This requires that records do not cross page boundaries, so we may need to check for cases where RecordSkipper cannot be used.
| } | ||
| auto offset_index = rg_pg_index_reader->GetOffsetIndex(input_->column_index()); | ||
|
|
||
| if (!offset_index) { |
There was a problem hiding this comment.
Is this too strict? Sometimes not all columns have populated offset index.
There was a problem hiding this comment.
It requires more code to properly handle the case in which the page index is missing. Throwing an exception when it is missing seems to be a simple and easy solution for now.
cpp/src/parquet/column_reader.h
Outdated
| int32_t* dict_len) = 0; | ||
| }; | ||
|
|
||
| struct Range { |
There was a problem hiding this comment.
Please document the struct.
cpp/src/parquet/column_reader.h
Outdated
| }; | ||
|
|
||
| struct Range { | ||
| static Range unionRange(const Range& left, const Range& right) { |
There was a problem hiding this comment.
Generally, the Google style guide is used. This means method names typically start with an upper-case character.
cpp/src/parquet/column_reader.h
Outdated
| }; | ||
|
|
||
| struct Range { | ||
| static Range unionRange(const Range& left, const Range& right) { |
There was a problem hiding this comment.
please provide documentation on what this method is intended to do.
There was a problem hiding this comment.
This method has been deleted, and I have added some comments to critical classes and methods.
cpp/src/parquet/column_reader.h
Outdated
|
|
||
| struct Range { | ||
| static Range unionRange(const Range& left, const Range& right) { | ||
| if (left.from <= right.from) { |
There was a problem hiding this comment.
I think in general too much code is inlined in these classes.
There was a problem hiding this comment.
Will do this before merging PR
cpp/src/parquet/column_reader.h
Outdated
| /// if return values is positive, it means to read N records | ||
| /// if return values is negative, it means to skip N records | ||
| /// if return values is 0, it means end of RG | ||
| int64_t advise_next(const int64_t current_rg_procesed) { |
There was a problem hiding this comment.
It's not clear to me what the input parameter is here.
There was a problem hiding this comment.
The total number of rows that the current RG has processed so far.
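As an illustration of the return-value contract described in the doc comment (positive means read N records, negative means skip N records, 0 means the end), a minimal sketch might look like the following. It is not the PR's RecordSkipper: the `Range` and `SkipAdvisor` names are hypothetical, ranges are assumed to be inclusive, sorted, non-overlapping, and relative to the row group, and returning 0 once no requested range remains is an assumption about what "end of RG" means.

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical range type: inclusive on both ends, row-group-local indices.
struct Range {
  int64_t from;
  int64_t to;
};

// Hypothetical advisor over sorted, non-overlapping ranges.
class SkipAdvisor {
 public:
  explicit SkipAdvisor(std::vector<Range> ranges) : ranges_(std::move(ranges)) {}

  // rows_processed: rows of the current row group already read or skipped.
  // Returns N > 0 to read N records, N < 0 to skip -N records,
  // and 0 when no requested range remains.
  int64_t AdviseNext(int64_t rows_processed) {
    // Drop ranges that lie entirely before the current position.
    while (idx_ < ranges_.size() && ranges_[idx_].to < rows_processed) ++idx_;
    if (idx_ == ranges_.size()) return 0;
    const Range& r = ranges_[idx_];
    // Before the next range: skip up to its first row.
    if (rows_processed < r.from) return -(r.from - rows_processed);
    // Inside the range: read through its last row.
    return r.to - rows_processed + 1;
  }

 private:
  std::vector<Range> ranges_;
  size_t idx_ = 0;
};
```

A caller would add the absolute value of each returned number to its running count of processed rows before calling `AdviseNext` again.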
cpp/src/parquet/column_reader.h
Outdated
| }; | ||
|
|
||
| class RowRanges { | ||
| std::vector<Range> ranges; |
There was a problem hiding this comment.
please follow class layout conventions of putting private members at the end.
cpp/src/parquet/column_reader.h
Outdated
| } | ||
| }; | ||
|
|
||
| class RowRanges { |
There was a problem hiding this comment.
This seems like a relatively heavy-weight representation if most ranges aren't runs. Did you consider other alternatives?
CC @fatemehp
There was a problem hiding this comment.
Added a bitmap interface to RowRanges. However, it is not implemented yet.
cpp/src/parquet/column_reader.h
Outdated
| return result; | ||
| } | ||
|
|
||
| RowRanges slice(const int64_t from, const int64_t to) const { |
There was a problem hiding this comment.
Please document the intended semantics here. Is this method really necessary? Why wouldn't it consider partial ranges?
cpp/src/parquet/column_reader.h
Outdated
|
|
||
| class PARQUET_EXPORT RecordSkipper { | ||
| public: | ||
| RecordSkipper(RowRanges& pages, RowRanges& row_ranges_) |
There was a problem hiding this comment.
second variable looks off
There was a problem hiding this comment.
can you elaborate on this? I don't quite get it.
The latest version follows the design in https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM/edit
Hi @emkornfield, I've just pushed my refinements to this PR and will go through a review with @wgtmac. It may save you time if you wait until @wgtmac has approved it.
wgtmac
left a comment
There was a problem hiding this comment.
I just did another pass, mainly on the API and style. Please directly open a PR on the main branch. Thanks!
cpp/src/parquet/arrow/reader.h
Outdated
| std::shared_ptr<::arrow::RecordBatchReader>* out); | ||
| ::arrow::Status GetRecordBatchReader(std::shared_ptr<::arrow::RecordBatchReader>* out); | ||
|
|
||
|
|
cpp/src/parquet/column_reader.h
Outdated
| struct End {}; | ||
|
|
||
| // Represent a set of ranges to read. The ranges are sorted and non-overlapping. | ||
| class RowRanges { |
There was a problem hiding this comment.
It would be good to relocate the RowRanges related classes to a separate source file.
cpp/src/parquet/column_reader.h
Outdated
| // virtual std::unique_ptr<Iterator> NewIterator() = 0; | ||
|
|
||
| private: | ||
| std::vector<IntervalRange> ranges; |
There was a problem hiding this comment.
IMHO, the RowRanges class should hide the implementation detail, and the private member std::vector<IntervalRange> ranges should not appear here. If we don't want to pay the cost of a virtual class, we can use the PIMPL idiom, following the example in parquet/metadata.h
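For readers unfamiliar with the idiom, a minimal PIMPL sketch could look like this. It is only an illustration under assumptions: the `Add` and `RowCount` methods and the pair-based storage are hypothetical, and the header and source parts are shown in one block for brevity.

```cpp
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// ---- what would live in the header ----
class RowRanges {
 public:
  RowRanges();
  ~RowRanges();  // defined where Impl is a complete type
  RowRanges(RowRanges&&) noexcept;
  RowRanges& operator=(RowRanges&&) noexcept;

  // Hypothetical API: adds a range that is inclusive on both ends.
  void Add(int64_t start, int64_t end);
  // Hypothetical API: total number of rows covered by all ranges.
  int64_t RowCount() const;

 private:
  class Impl;  // only forward-declared here; the layout stays private
  std::unique_ptr<Impl> impl_;
};

// ---- what would live in the .cc file ----
class RowRanges::Impl {
 public:
  std::vector<std::pair<int64_t, int64_t>> ranges;
};

RowRanges::RowRanges() : impl_(std::make_unique<Impl>()) {}
RowRanges::~RowRanges() = default;
RowRanges::RowRanges(RowRanges&&) noexcept = default;
RowRanges& RowRanges::operator=(RowRanges&&) noexcept = default;

void RowRanges::Add(int64_t start, int64_t end) {
  impl_->ranges.emplace_back(start, end);
}

int64_t RowRanges::RowCount() const {
  int64_t total = 0;
  for (const auto& r : impl_->ranges) total += r.second - r.first + 1;
  return total;
}
```

The storage could later be swapped for a bitmap-based one without touching the header, at the cost of one extra heap allocation and pointer indirection per object.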
cpp/src/parquet/column_reader.h
Outdated
| }; | ||
|
|
||
| // Represent a range to read. The range is inclusive on both ends. | ||
| struct IntervalRange { |
There was a problem hiding this comment.
Should we simply define the IntervalRange class as below and move any functions to an individual utility class?
struct IntervalRange {
  // inclusive
  int64_t start;
  // inclusive
  int64_t end;
};
struct IntervalRangeUtils {
  static IntervalRange Intersection(const IntervalRange& lhs,
                                    const IntervalRange& rhs);
  static bool IsBefore(const IntervalRange& lhs, const IntervalRange& rhs);
  ...
};
There was a problem hiding this comment.
IMO, the above separation would make the public RowRanges API look much simpler to external users, and those utility functions are only needed in the implementation.
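One possible way to fill in the two declared helpers is sketched below. The bodies are assumptions, not code from the PR: in particular, the `IsValid` helper and the choice to return `{-1, -1}` for a missing overlap are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Plain data struct, inclusive on both ends.
struct IntervalRange {
  int64_t start;  // inclusive
  int64_t end;    // inclusive
};

// Helpers kept out of the public struct.
struct IntervalRangeUtils {
  // Hypothetical helper: true when the range holds at least one row.
  static bool IsValid(const IntervalRange& r) {
    return r.start >= 0 && r.end >= r.start;
  }

  // True when lhs ends strictly before rhs begins.
  static bool IsBefore(const IntervalRange& lhs, const IntervalRange& rhs) {
    return lhs.end < rhs.start;
  }

  // Overlap of two valid ranges. Returning {-1, -1} when they do not
  // overlap is an assumption made for this sketch.
  static IntervalRange Intersection(const IntervalRange& lhs,
                                    const IntervalRange& rhs) {
    assert(IsValid(lhs) && IsValid(rhs));
    const int64_t start = std::max(lhs.start, rhs.start);
    const int64_t end = std::min(lhs.end, rhs.end);
    if (start > end) return {-1, -1};
    return {start, end};
  }
};
```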
cpp/src/parquet/column_reader.h
Outdated
| size_t row_range_idx = 0; | ||
| size_t total_rows_to_process = 0; |
There was a problem hiding this comment.
| size_t row_range_idx = 0; | |
| size_t total_rows_to_process = 0; | |
| size_t row_range_idx_ = 0; | |
| size_t total_rows_to_process_ = 0; |
This is the style convention for class member variables.
There was a problem hiding this comment.
Got it. The convention does not seem to be enforced everywhere, for example: ReaderContext::reader
There was a problem hiding this comment.
I'll change all member variables in my PR
cpp/src/parquet/arrow/reader.cc
Outdated
| bool operator()(const DataPageStats& stats) { | ||
| ++page_range_idx; | ||
|
|
||
| IntervalRange current_page_range = (*page_ranges)[page_range_idx]; |
There was a problem hiding this comment.
Could we find some way to not directly assume all ranges are IntervalRange? Probably we can use the iterator of RowRanges and throw when a BitmapRange is obtained for now. Otherwise it would be difficult for future refactoring.
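A rough sketch of that idea, written against C++17, might look like this. All names and layouts here are hypothetical (`RangeItem`, `RangeIterator`, `CountIntervalRows`, and the `BitmapRange` fields are not from the PR); the point is only that the consumer pulls ranges through an iterator and fails loudly on a kind it cannot handle yet.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <utility>
#include <variant>
#include <vector>

struct IntervalRange {
  int64_t start;  // inclusive
  int64_t end;    // inclusive
};
// Hypothetical layout: 64 rows starting at `offset`, one bit per row.
struct BitmapRange {
  int64_t offset;
  uint64_t bitmap;
};
struct End {};

using RangeItem = std::variant<IntervalRange, BitmapRange, End>;

// Hypothetical iterator that yields End{} once all items are consumed.
class RangeIterator {
 public:
  explicit RangeIterator(std::vector<RangeItem> items) : items_(std::move(items)) {}

  RangeItem NextRange() {
    if (pos_ >= items_.size()) return End{};
    return items_[pos_++];
  }

 private:
  std::vector<RangeItem> items_;
  size_t pos_ = 0;
};

// Consumer that handles only IntervalRange and throws on BitmapRange.
int64_t CountIntervalRows(RangeIterator& it) {
  int64_t total = 0;
  while (true) {
    RangeItem item = it.NextRange();
    if (std::holds_alternative<End>(item)) return total;
    if (std::holds_alternative<BitmapRange>(item)) {
      throw std::runtime_error("BitmapRange is not supported yet");
    }
    const auto& r = std::get<IntervalRange>(item);
    total += r.end - r.start + 1;
  }
}
```

Callers written this way do not index into a vector of IntervalRange, so adding bitmap support later would only require handling one more alternative.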
cpp/src/parquet/arrow/reader.cc
Outdated
| // Column reader implementations | ||
|
|
||
| struct RowRangesPageFilter { | ||
| explicit RowRangesPageFilter(const RowRanges& row_ranges_, |
There was a problem hiding this comment.
Please remove underscore from input parameters.
cpp/src/parquet/arrow/reader.cc
Outdated
| // ---------------------------------------------------------------------- | ||
| // Column reader implementations | ||
|
|
||
| struct RowRangesPageFilter { |
There was a problem hiding this comment.
Please add some comment to explain what it does.
cpp/src/parquet/arrow/reader.cc
Outdated
| void checkAndGetPageRanges(const RowRanges & row_ranges, | ||
| std::shared_ptr<RowRanges>& page_ranges) const { |
There was a problem hiding this comment.
| void checkAndGetPageRanges(const RowRanges & row_ranges, | |
| std::shared_ptr<RowRanges>& page_ranges) const { | |
| void CheckAndGetPageRanges(const RowRanges& row_ranges, | |
| std::shared_ptr<RowRanges>* page_ranges) const { |
There was a problem hiding this comment.
why std::shared_ptr& -> std::shared_ptr* ?
| if (page_locations.size() >= 1) { | ||
| page_ranges->Add( | ||
| {page_locations[page_locations.size() - 1].first_row_index, | ||
| ctx_->reader->metadata()->RowGroup(input_->current_row_group())->num_rows() - |
There was a problem hiding this comment.
This cost (RowGroup(input_->current_row_group())) may not be negligible if there are many columns to do this repeatedly.
cpp/src/parquet/column_reader.h
Outdated
| } | ||
| } | ||
|
|
||
| size_t Count() const { return end - start + 1; } |
There was a problem hiding this comment.
IntervalRange({-1, -1}).Count() == 1 ? Would this be weird?
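One way to avoid that surprise, assuming `{-1, -1}` is meant as an "empty" sentinel (the source does not say so), is to make `Count()` return 0 for invalid ranges. The `IsValid` helper below is hypothetical.

```cpp
#include <cstddef>
#include <cstdint>

struct IntervalRange {
  int64_t start;  // inclusive
  int64_t end;    // inclusive

  // Hypothetical helper: a range is valid when it starts at a
  // non-negative row and does not end before it starts.
  bool IsValid() const { return start >= 0 && end >= start; }

  // Number of rows covered; invalid ranges such as {-1, -1} count as 0.
  size_t Count() const {
    return IsValid() ? static_cast<size_t>(end - start + 1) : 0;
  }
};
```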
cpp/src/parquet/column_reader.h
Outdated
| RowRanges(const RowRanges& other) { ranges = other.ranges; } | ||
|
|
||
| RowRanges(RowRanges&& other) noexcept { ranges = std::move(other.ranges); } |
cpp/src/parquet/column_reader.h
Outdated
| std::string result = "["; | ||
| for (const IntervalRange& range : ranges) { | ||
| result += | ||
| "(" + std::to_string(range.start) + ", " + std::to_string(range.end) + "), "; |
Thank you for your contribution. Unfortunately, this |
!!! this PR is based on arrow-13 for early review
Rationale for this change
see #38865
What changes are included in this PR?
add a parameter in GetRecordBatchReader to accept row_ranges.
The row ranges can be used to:
Are these changes tested?
A new test file called range_reader_test.cc is added.
Are there any user-facing changes?
A new GetRecordBatchReader API overload is added. No existing API is broken.