Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions c++/include/orc/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "orc/Common.hh"
#include "orc/orc-config.hh"
#include "orc/Statistics.hh"
#include "orc/sargs/SearchArgument.hh"
#include "orc/Type.hh"
#include "orc/Vector.hh"

Expand Down Expand Up @@ -191,6 +192,11 @@ namespace orc {
*/
RowReaderOptions& setEnableLazyDecoding(bool enable);

/**
* Set search argument for predicate push down
*/
RowReaderOptions& searchArgument(std::unique_ptr<SearchArgument> sargs);

/**
* Should enable encoding block mode
*/
Expand Down Expand Up @@ -245,6 +251,11 @@ namespace orc {
* What scale should all Hive 0.11 decimals be normalized to?
*/
int32_t getForcedScaleOnHive11Decimal() const;

/**
* Get search argument for predicate push down
*/
std::shared_ptr<SearchArgument> getSearchArgument() const;
};


Expand Down
10 changes: 10 additions & 0 deletions c++/src/Options.hh
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ namespace orc {
bool throwOnHive11DecimalOverflow;
int32_t forcedScaleOnHive11Decimal;
bool enableLazyDecoding;
std::shared_ptr<SearchArgument> sargs;

RowReaderOptionsPrivate() {
selection = ColumnSelection_NONE;
Expand Down Expand Up @@ -249,6 +250,15 @@ namespace orc {
privateBits->enableLazyDecoding = enable;
return *this;
}

/**
 * Store the search argument used for predicate push down.
 *
 * Ownership of the passed-in unique_ptr is transferred into the
 * shared_ptr kept in the private options state.
 *
 * @param sargs the search argument to take ownership of
 * @return this options object, so calls can be chained
 */
RowReaderOptions& RowReaderOptions::searchArgument(std::unique_ptr<SearchArgument> sargs) {
  // build the shared owner explicitly from the unique owner, then install it
  privateBits->sargs = std::shared_ptr<SearchArgument>(std::move(sargs));
  return *this;
}

std::shared_ptr<SearchArgument> RowReaderOptions::getSearchArgument() const {
return privateBits->sargs;
}
}

#endif
232 changes: 190 additions & 42 deletions c++/src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ namespace orc {
return columnPath.substr(0, columnPath.length() - 1);
}

/**
 * Read the writer version recorded in the file's postscript.
 *
 * @param contents file contents whose postscript is inspected
 * @return the postscript's writer version cast to WriterVersion, or
 *         WriterVersion_ORIGINAL when the postscript has no such field
 */
WriterVersion getWriterVersionImpl(const FileContents * contents) {
  const auto& postscript = *contents->postscript;
  return postscript.has_writerversion()
      ? static_cast<WriterVersion>(postscript.writerversion())
      : WriterVersion_ORIGINAL;
}

void ColumnSelector::selectChildren(std::vector<bool>& selectedColumns, const Type& type) {
size_t id = static_cast<size_t>(type.getColumnId());
Expand Down Expand Up @@ -227,6 +233,15 @@ namespace orc {

ColumnSelector column_selector(contents.get());
column_selector.updateSelected(selectedColumns, opts);

// prepare SargsApplier if SearchArgument is available
if (opts.getSearchArgument() && footer->rowindexstride() > 0) {
sargs = opts.getSearchArgument();
sargsApplier.reset(new SargsApplier(*contents->schema,
sargs.get(),
footer->rowindexstride(),
getWriterVersionImpl(_contents.get())));
}
}

CompressionKind RowReaderImpl::getCompression() const {
Expand Down Expand Up @@ -293,33 +308,43 @@ namespace orc {
previousRow = rowNumber;
startNextStripe();

uint64_t rowsToSkip = currentRowInStripe;
// when predicate push down is enabled, above call to startNextStripe()
// will move current row to 1st matching row group; here we only need
// to deal with the case when PPD is not enabled.
if (!sargsApplier) {
uint64_t rowsToSkip = currentRowInStripe;

if (footer->rowindexstride() > 0 &&
currentStripeInfo.indexlength() > 0) {
uint32_t rowGroupId =
static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride());
rowsToSkip -= static_cast<uint64_t>(rowGroupId) * footer->rowindexstride();
if (footer->rowindexstride() > 0 &&
currentStripeInfo.indexlength() > 0) {
if (rowIndexes.empty()) {
loadStripeIndex();
}
uint32_t rowGroupId =
static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride());
rowsToSkip -= static_cast<uint64_t>(rowGroupId) * footer->rowindexstride();

if (rowGroupId != 0) {
seekToRowGroup(rowGroupId);
if (rowGroupId != 0) {
seekToRowGroup(rowGroupId);
}
}
}

reader->skip(rowsToSkip);
reader->skip(rowsToSkip);
}
}

void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId) {
void RowReaderImpl::loadStripeIndex() {
// reset all previous row indexes
rowIndexes.clear();
bloomFilterIndex.clear();

// obtain row indexes for selected columns
uint64_t offset = currentStripeInfo.offset();
for (int i = 0; i < currentStripeFooter.streams_size(); ++i) {
const proto::Stream& pbStream = currentStripeFooter.streams(i);
uint64_t colId = pbStream.column();
if (selectedColumns[colId] && pbStream.has_kind()
&& pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
&& (pbStream.kind() == proto::Stream_Kind_ROW_INDEX ||
pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) {
std::unique_ptr<SeekableInputStream> inStream =
createDecompressor(getCompression(),
std::unique_ptr<SeekableInputStream>
Expand All @@ -331,16 +356,33 @@ namespace orc {
getCompressionSize(),
*contents->pool);

proto::RowIndex rowIndex;
if (!rowIndex.ParseFromZeroCopyStream(inStream.get())) {
throw ParseError("Failed to parse the row index");
if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
proto::RowIndex rowIndex;
if (!rowIndex.ParseFromZeroCopyStream(inStream.get())) {
throw ParseError("Failed to parse the row index");
}
rowIndexes[colId] = rowIndex;
} else { // Stream_Kind_BLOOM_FILTER_UTF8
proto::BloomFilterIndex pbBFIndex;
if (!pbBFIndex.ParseFromZeroCopyStream(inStream.get())) {
throw ParseError("Failed to parse bloom filter index");
}
BloomFilterIndex bfIndex;
for (int j = 0; j < pbBFIndex.bloomfilter_size(); j++) {
bfIndex.entries.push_back(BloomFilterUTF8Utils::deserialize(
pbStream.kind(),
currentStripeFooter.columns(static_cast<int>(pbStream.column())),
pbBFIndex.bloomfilter(j)));
}
// add bloom filters to result for one column
bloomFilterIndex[pbStream.column()] = bfIndex;
}

rowIndexes[colId] = rowIndex;
}
offset += pbStream.length();
}
}

void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId) {
// store positions for selected columns
std::vector<std::list<uint64_t>> positions;
// store position providers for selected columns
Expand Down Expand Up @@ -516,10 +558,7 @@ namespace orc {
}

/**
 * Get the writer version of the file being read.
 *
 * Delegates to getWriterVersionImpl() so that the postscript lookup
 * (including the WriterVersion_ORIGINAL fallback when the field is absent)
 * lives in one place instead of being duplicated here.
 *
 * @return the writer version derived from this reader's file contents
 */
WriterVersion ReaderImpl::getWriterVersion() const {
  return getWriterVersionImpl(contents.get());
}

uint64_t ReaderImpl::getContentLength() const {
Expand Down Expand Up @@ -892,29 +931,68 @@ namespace orc {

void RowReaderImpl::startNextStripe() {
reader.reset(); // ColumnReaders use lots of memory; free old memory first
currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
uint64_t fileLength = contents->stream->getLength();
if (currentStripeInfo.offset() + currentStripeInfo.indexlength() +
rowIndexes.clear();
bloomFilterIndex.clear();

do {
currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
uint64_t fileLength = contents->stream->getLength();
if (currentStripeInfo.offset() + currentStripeInfo.indexlength() +
currentStripeInfo.datalength() + currentStripeInfo.footerlength() >= fileLength) {
std::stringstream msg;
msg << "Malformed StripeInformation at stripe index " << currentStripe << ": fileLength="
<< fileLength << ", StripeInfo=(offset=" << currentStripeInfo.offset() << ", indexLength="
<< currentStripeInfo.indexlength() << ", dataLength=" << currentStripeInfo.datalength()
<< ", footerLength=" << currentStripeInfo.footerlength() << ")";
throw ParseError(msg.str());
std::stringstream msg;
msg << "Malformed StripeInformation at stripe index " << currentStripe << ": fileLength="
<< fileLength << ", StripeInfo=(offset=" << currentStripeInfo.offset() << ", indexLength="
<< currentStripeInfo.indexlength() << ", dataLength=" << currentStripeInfo.datalength()
<< ", footerLength=" << currentStripeInfo.footerlength() << ")";
throw ParseError(msg.str());
}
currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get());
rowsInCurrentStripe = currentStripeInfo.numberofrows();

if (sargsApplier) {
// read row group statistics and bloom filters of current stripe
loadStripeIndex();

// select row groups to read in the current stripe
sargsApplier->pickRowGroups(rowsInCurrentStripe,
rowIndexes,
bloomFilterIndex);
if (sargsApplier->hasSelectedFrom(currentRowInStripe)) {
// current stripe has at least one row group matching the predicate
break;
} else {
// advance to next stripe when current stripe has no matching rows
currentStripe += 1;
currentRowInStripe = 0;
}
}
} while (sargsApplier && currentStripe < lastStripe);

if (currentStripe < lastStripe) {
// get writer timezone info from stripe footer to help understand timestamp values.
const Timezone& writerTimezone =
currentStripeFooter.has_writertimezone() ?
getTimezoneByName(currentStripeFooter.writertimezone()) :
localTimezone;
StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo,
currentStripeFooter,
currentStripeInfo.offset(),
*contents->stream,
writerTimezone);
reader = buildReader(*contents->schema, stripeStreams);

if (sargsApplier) {
// move to the 1st selected row group when PPD is enabled.
currentRowInStripe = advanceToNextRowGroup(currentRowInStripe,
rowsInCurrentStripe,
footer->rowindexstride(),
sargsApplier->getRowGroups());
previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe - 1;
if (currentRowInStripe > 0) {
seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride()));
}
}
}
currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get());
rowsInCurrentStripe = currentStripeInfo.numberofrows();
const Timezone& writerTimezone =
currentStripeFooter.has_writertimezone() ?
getTimezoneByName(currentStripeFooter.writertimezone()) :
localTimezone;
StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo,
currentStripeFooter,
currentStripeInfo.offset(),
*(contents->stream.get()),
writerTimezone);
reader = buildReader(*contents->schema.get(), stripeStreams);
}

bool RowReaderImpl::next(ColumnVectorBatch& data) {
Expand All @@ -934,7 +1012,20 @@ namespace orc {
uint64_t rowsToRead =
std::min(static_cast<uint64_t>(data.capacity),
rowsInCurrentStripe - currentRowInStripe);
if (sargsApplier) {
rowsToRead = computeBatchSize(rowsToRead,
currentRowInStripe,
rowsInCurrentStripe,
footer->rowindexstride(),
sargsApplier->getRowGroups());
}
data.numElements = rowsToRead;
if (rowsToRead == 0) {
previousRow = lastStripe <= 0 ? footer->numberofrows() :
firstRowOfStripe[lastStripe - 1] +
footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows();
return false;
}
if (enableEncodedBlock) {
reader->nextEncoded(data, rowsToRead, nullptr);
}
Expand All @@ -944,13 +1035,70 @@ namespace orc {
// update row number
previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
currentRowInStripe += rowsToRead;

// check if we need to advance to next selected row group
if (sargsApplier) {
uint64_t nextRowToRead = advanceToNextRowGroup(currentRowInStripe,
rowsInCurrentStripe,
footer->rowindexstride(),
sargsApplier->getRowGroups());
if (currentRowInStripe != nextRowToRead) {
// it is guaranteed to be at start of a row group
currentRowInStripe = nextRowToRead;
if (currentRowInStripe < rowsInCurrentStripe) {
seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride()));
}
}
}

if (currentRowInStripe >= rowsInCurrentStripe) {
currentStripe += 1;
currentRowInStripe = 0;
}
return rowsToRead != 0;
}

/**
 * Compute how many rows the next batch may contain.
 *
 * With an empty includedRowGroups the batch may run to the end of the
 * stripe. Otherwise the batch is limited to the run of consecutive selected
 * row groups that starts at the row group containing currentRowInStripe,
 * capped at rowsInCurrentStripe. If that first row group is not selected,
 * the result is 0.
 *
 * @param requestedSize upper bound on the number of rows wanted
 * @param currentRowInStripe row offset within the current stripe
 * @param rowsInCurrentStripe total number of rows in the current stripe
 * @param rowIndexStride number of rows per row group; used as a divisor
 * @param includedRowGroups per-row-group selection flags; may be empty
 * @return min(requestedSize, end of readable range - currentRowInStripe)
 */
uint64_t RowReaderImpl::computeBatchSize(uint64_t requestedSize,
                                         uint64_t currentRowInStripe,
                                         uint64_t rowsInCurrentStripe,
                                         uint64_t rowIndexStride,
                                         const std::vector<bool>& includedRowGroups) {
  // no row group selection available: the readable range ends with the stripe
  if (includedRowGroups.empty()) {
    return std::min(requestedSize, rowsInCurrentStripe - currentRowInStripe);
  }

  // walk forward over consecutive selected row groups, extending the end
  // of the readable range to the end of each one (bounded by the stripe)
  uint64_t rangeEnd = currentRowInStripe;
  uint32_t group = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
  while (group < includedRowGroups.size() && includedRowGroups[group]) {
    rangeEnd = std::min(rowsInCurrentStripe, (group + 1) * rowIndexStride);
    ++group;
  }
  return std::min(requestedSize, rangeEnd - currentRowInStripe);
}

/**
 * Find the next row to read, skipping row groups that are not selected.
 *
 * Starting from the row group containing currentRowInStripe, the first
 * selected row group is looked up. If the starting group is selected the
 * position is returned unchanged; otherwise the position moves to the first
 * row of the next selected group. When no selected group remains (or
 * includedRowGroups is empty) the position is capped at rowsInCurrentStripe.
 *
 * @param currentRowInStripe row offset within the current stripe
 * @param rowsInCurrentStripe total number of rows in the current stripe
 * @param rowIndexStride number of rows per row group; used as a divisor
 * @param includedRowGroups per-row-group selection flags; may be empty
 * @return the row offset within the stripe at which reading should continue
 */
uint64_t RowReaderImpl::advanceToNextRowGroup(uint64_t currentRowInStripe,
                                              uint64_t rowsInCurrentStripe,
                                              uint64_t rowIndexStride,
                                              const std::vector<bool>& includedRowGroups) {
  if (includedRowGroups.empty()) {
    return std::min(currentRowInStripe, rowsInCurrentStripe);
  }

  uint64_t candidateRow = currentRowInStripe;
  uint32_t group = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
  while (group < includedRowGroups.size()) {
    if (includedRowGroups[group]) {
      // this row group is selected: continue reading from here
      return candidateRow;
    }
    // row group is skipped: jump to the first row of the following group
    candidateRow = (group + 1) * rowIndexStride;
    ++group;
  }
  return std::min(candidateRow, rowsInCurrentStripe);
}

std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch
(uint64_t capacity) const {
return getSelectedType().createRowBatch(capacity, *contents->pool, enableEncodedBlock);
Expand Down
Loading