Skip to content

Comments

GH-38865 [C++][Parquet] support passing a RowRange to RecordBatchReader #38867

Closed
binmahone wants to merge 25 commits intoapache:maint-13.0.0from
binmahone:mhb_page_pruning
Closed

GH-38865 [C++][Parquet] support passing a RowRange to RecordBatchReader #38867
binmahone wants to merge 25 commits intoapache:maint-13.0.0from
binmahone:mhb_page_pruning

Conversation

@binmahone
Copy link

@binmahone binmahone commented Nov 23, 2023

!!! this PR is based on arrow-13 for early review

Rationale for this change

see #38865

What changes are included in this PR?

add a parameter in GetRecordBatchReader to accept row_ranges.
The row ranges can be used to:

  1. skip decompressing and decoding unnecessary pages. This part is done by leveraging an existing hook called DataPageFilter
  2. Now only necessary pages are remaining. In Parquet, row number is not aligned across different columns' pages, so the remaining rows for different columns are different. We introduced a new class called RecordSkipper to i. coordinate all Column Readers' progress so that they can still resemble row records correctly ii. skip unnecessary rows in necessary pages

Are these changes tested?

a new test file callled range_reader_test.cc is added

Are there any user-facing changes?

a new GetRecordBatchReader API overload is added. NO existing API is broken

@binmahone binmahone requested a review from wgtmac as a code owner November 23, 2023 13:22
@github-actions github-actions bot added the awaiting review Awaiting review label Nov 23, 2023
@github-actions
Copy link

⚠️ GitHub issue #38865 has been automatically assigned in GitHub to PR creator.

Copy link
Member

@mapleFU mapleFU left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RowRanges API LGTM. I think we can separate a patch for RowRanges

The underlying implement logic is ok, but I need to review a pass for it. Also, Seems that this patch is merged into 13.0.0...


virtual ::arrow::Status GetRecordBatchReader(
const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
const std::shared_ptr<std::map<int, RowRangesPtr>>& row_ranges_map,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, does RowRanges index means row-idx in this RowGroup or the row-idx in this file?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

row-idx in this RowGroup

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document this in the method contract. The way this is being passed (as a shared pointer and values being pointer seems a little strange to me) is there a reason how this contract was arrived at (I need to take a closer look at how Range was arrived at).

I'm not sure if keeping consistency for a brand new method is worth it or if this should return arrow::Result<std::unique_ptr> at some point we should convert all the methods to return result.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its also not clear to me that we should include both RowGroupIndices and ranges within those indicies. It seems like a cleaner API to only include one or the other (which also makes passing in ranges easier?)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to File based

@github-actions github-actions bot added awaiting committer review Awaiting committer review and removed awaiting review Awaiting review labels Nov 23, 2023
RecordSkipper(RowRanges& pages, RowRanges& row_ranges_)
: row_ranges(row_ranges_) { // copy row_ranges
RowRanges will_process_pages, skip_pages;
for (auto& page : pages.getRanges()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires records not to exceed page boundary. So we may need to check when RecordSkipper cannot be used.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will this case happen?

}
auto offset_index = rg_pg_index_reader->GetOffsetIndex(input_->column_index());

if (!offset_index) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this too strict? Sometimes not all columns have populated offset index.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it requires more code to proper handle the case in which page index is missing. Exception on missing seems to be a simple&easy solution for now

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting committer review Awaiting committer review labels Nov 27, 2023
int32_t* dict_len) = 0;
};

struct Range {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document the struct.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

};

struct Range {
static Range unionRange(const Range& left, const Range& right) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, the Google style guide is used this means typically method names start with an upper-case character.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

};

struct Range {
static Range unionRange(const Range& left, const Range& right) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please provide documentation on what this method is intended to do.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is deleted. and I have added some comments to critical classes & methods


struct Range {
static Range unionRange(const Range& left, const Range& right) {
if (left.from <= right.from) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in general too much code is inlined in these classes.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do this before merging PR

/// if return values is positive, it means to read N records
/// if return values is negative, it means to skip N records
/// if return values is 0, it means end of RG
int64_t advise_next(const int64_t current_rg_procesed) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its not claear to me what the input parameter is here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

total rows that current RG has processed

};

class RowRanges {
std::vector<Range> ranges;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please follow class layout conventions of putting private members at the end.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
};

class RowRanges {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a relatively heavy-weight represenation if most ranges aren't runs. did you consider other alternatives?

CC @fatemehp

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a bitmap interface in RowRanges. However it is not implemented yet.

return result;
}

RowRanges slice(const int64_t from, const int64_t to) const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please document intended semantics here. Is this method really necessary? why wouldn't it consider partial ranges?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is deleted


class PARQUET_EXPORT RecordSkipper {
public:
RecordSkipper(RowRanges& pages, RowRanges& row_ranges_)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

second variable looks off

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you elaborate on this? I don't quite get it.

@binmahone
Copy link
Author

I think it might be worth having more discussion on the issue with representation of RowRanges and on the newly exposed API on the issue to make sure we come to a consensus

The latest version follows the design in https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM/edit

@binmahone
Copy link
Author

hi @emkornfield , I've just pushed my refinements to this PR and will go through a review with @wgtmac . It may save you your time if you wait until @wgtmac has approved it

Copy link
Member

@wgtmac wgtmac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just did another pass, mainly on the API and style. Please directly open a PR on the main branch. Thanks!

std::shared_ptr<::arrow::RecordBatchReader>* out);
::arrow::Status GetRecordBatchReader(std::shared_ptr<::arrow::RecordBatchReader>* out);


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the blank line.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

struct End {};

// Represent a set of ranges to read. The ranges are sorted and non-overlapping.
class RowRanges {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to relocate the RowRanges related classes to a separate source file.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// virtual std::unique_ptr<Iterator> NewIterator() = 0;

private:
std::vector<IntervalRange> ranges;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, RowRanges class should hide the implemetation detail and the private member std::vector<IntervalRange> ranges should not appear here. If we don't want to pay the cost for virtual class, we can use PIMPL idiom by following example in parquet/metadata.h

};

// Represent a range to read. The range is inclusive on both ends.
struct IntervalRange {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we simply define the IntervalRange class as below and move any functions to an individual utility class?

struct IntervalRange {
  // inclusive
  int64_t start;
  // inclusive
  int64_t end;
};

struct IntervalRangeUtils {
  static IntervalRange Intersection(const IntervalRange& rhs,
                                    const IntervalRange& rhs);
  static bool IsBefore(const IntervalRange& lhs, const IntervalRange& rhs);
  ...
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, the above separation would make the public API RowRanges looks much simpler to external user and those utility functions are what we need only in the implementation.

Comment on lines 578 to 579
size_t row_range_idx = 0;
size_t total_rows_to_process = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
size_t row_range_idx = 0;
size_t total_rows_to_process = 0;
size_t row_range_idx_ = 0;
size_t total_rows_to_process_ = 0;

This is the style convection for class member variables.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got. The convention seems not enforced everywhere, for example: ReaderContext::reader

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change all member variables in my PR

bool operator()(const DataPageStats& stats) {
++page_range_idx;

IntervalRange current_page_range = (*page_ranges)[page_range_idx];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we find some way to not directly assume all ranges are IntervalRange? Probably we can use the iterator of RowRanges and throw when a BitmapRange is obtained for now. Otherwise it would be difficult for future refactoring.

// Column reader implementations

struct RowRangesPageFilter {
explicit RowRangesPageFilter(const RowRanges& row_ranges_,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove underscore from input parameters.

// ----------------------------------------------------------------------
// Column reader implementations

struct RowRangesPageFilter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some comment to explain what it does.

Comment on lines 603 to 604
void checkAndGetPageRanges(const RowRanges & row_ranges,
std::shared_ptr<RowRanges>& page_ranges) const {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void checkAndGetPageRanges(const RowRanges & row_ranges,
std::shared_ptr<RowRanges>& page_ranges) const {
void CheckAndGetPageRanges(const RowRanges& row_ranges,
std::shared_ptr<RowRanges>* page_ranges) const {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why std::shared_ptr& -> std::shared_ptr* ?

if (page_locations.size() >= 1) {
page_ranges->Add(
{page_locations[page_locations.size() - 1].first_row_index,
ctx_->reader->metadata()->RowGroup(input_->current_row_group())->num_rows() -
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cost (RowGroup(input_->current_row_group())) may not be negligible if there are many columns to do this repeatedly.

}
}

size_t Count() const { return end - start + 1; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IntervalRange({-1, -1}).Count() == 1 ? Would this be weird?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a check of validness

Comment on lines 363 to 365
RowRanges(const RowRanges& other) { ranges = other.ranges; }

RowRanges(RowRanges&& other) noexcept { ranges = std::move(other.ranges); }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

= default?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

std::string result = "[";
for (const IntervalRange& range : ranges) {
result +=
"(" + std::to_string(range.start) + ", " + std::to_string(range.end) + "), ";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uses range.ToString()?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@github-actions github-actions bot added the Status: stale-warning Issues and PRs flagged as stale which are due to be closed if no indication otherwise label Nov 18, 2025
@thisisnic
Copy link
Member

Thank you for your contribution. Unfortunately, this
pull request has been marked as stale because it has had no activity in the past 365 days. Please remove the stale label
or comment below, or this PR will be closed in 14 days. Feel free to re-open this if it has been closed in error. If you
do not have repository permissions to reopen the PR, please tag a maintainer.

@thisisnic thisisnic closed this Dec 19, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

awaiting change review Awaiting change review Component: C++ Component: Parquet Status: stale-warning Issues and PRs flagged as stale which are due to be closed if no indication otherwise

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants