Skip to content

Commit 221db82

Browse files
authored
Let applyAnd to be applied using different window sizes (#10372)
1 parent 7c3c8e8 commit 221db82

File tree

7 files changed

+89
-30
lines changed

7 files changed

+89
-30
lines changed

pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap, int numDocs) {
5252
_docIdBuffer = new int[Math.min(numDocs, DocIdSetPlanNode.MAX_DOC_PER_CALL)];
5353
}
5454

55+
public BitmapDocIdSetOperator(IntIterator intIterator, int[] docIdBuffer) {
56+
_intIterator = intIterator;
57+
_docIdBuffer = docIdBuffer;
58+
}
59+
5560
public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap, int[] docIdBuffer) {
5661
_intIterator = bitmap.getIntIterator();
5762
_docIdBuffer = docIdBuffer;

pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Collections;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.OptionalInt;
2425
import org.apache.pinot.core.common.Operator;
2526
import org.apache.pinot.core.operator.BaseOperator;
2627
import org.apache.pinot.core.operator.BitmapDocIdSetOperator;
@@ -33,7 +34,9 @@
3334
import org.apache.pinot.core.plan.DocIdSetPlanNode;
3435
import org.apache.pinot.segment.spi.Constants;
3536
import org.apache.pinot.segment.spi.datasource.DataSource;
37+
import org.roaringbitmap.BatchIterator;
3638
import org.roaringbitmap.BitmapDataProvider;
39+
import org.roaringbitmap.IntIterator;
3740
import org.roaringbitmap.PeekableIntIterator;
3841
import org.roaringbitmap.RoaringBitmap;
3942
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
@@ -110,6 +113,19 @@ public int advance(int targetDocId) {
110113
return next();
111114
}
112115

116+
@Override
117+
public MutableRoaringBitmap applyAnd(BatchIterator batchIterator, OptionalInt firstDoc, OptionalInt lastDoc) {
118+
IntIterator intIterator = batchIterator.asIntIterator(new int[OPTIMAL_ITERATOR_BATCH_SIZE]);
119+
ProjectionOperator projectionOperator =
120+
new ProjectionOperator(_dataSourceMap, new BitmapDocIdSetOperator(intIterator, _docIdBuffer));
121+
MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
122+
ProjectionBlock projectionBlock;
123+
while ((projectionBlock = projectionOperator.nextBlock()) != null) {
124+
processProjectionBlock(projectionBlock, matchingDocIds);
125+
}
126+
return matchingDocIds;
127+
}
128+
113129
@Override
114130
public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
115131
ProjectionOperator projectionOperator =

pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pinot.core.operator.dociditerators;
2020

21+
import java.util.OptionalInt;
2122
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
2223
import org.apache.pinot.segment.spi.Constants;
2324
import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -26,7 +27,6 @@
2627
import org.apache.pinot.spi.utils.CommonConstants.Query.OptimizationConstants;
2728
import org.roaringbitmap.BatchIterator;
2829
import org.roaringbitmap.RoaringBitmapWriter;
29-
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
3030
import org.roaringbitmap.buffer.MutableRoaringBitmap;
3131

3232

@@ -79,13 +79,21 @@ public int advance(int targetDocId) {
7979
}
8080

8181
@Override
82-
public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
83-
if (docIds.isEmpty()) {
82+
public MutableRoaringBitmap applyAnd(BatchIterator docIdIterator, OptionalInt firstDoc, OptionalInt lastDoc) {
83+
if (!docIdIterator.hasNext()) {
8484
return new MutableRoaringBitmap();
8585
}
86-
RoaringBitmapWriter<MutableRoaringBitmap> result = RoaringBitmapWriter.bufferWriter()
87-
.expectedRange(docIds.first(), docIds.last()).runCompress(false).get();
88-
BatchIterator docIdIterator = docIds.getBatchIterator();
86+
RoaringBitmapWriter<MutableRoaringBitmap> result;
87+
if (firstDoc.isPresent() && lastDoc.isPresent()) {
88+
result = RoaringBitmapWriter.bufferWriter()
89+
.expectedRange(firstDoc.getAsInt(), lastDoc.getAsInt())
90+
.runCompress(false)
91+
.get();
92+
} else {
93+
result = RoaringBitmapWriter.bufferWriter()
94+
.runCompress(false)
95+
.get();
96+
}
8997
int[] buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
9098
while (docIdIterator.hasNext()) {
9199
int limit = docIdIterator.nextBatch(buffer);

pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
*/
1919
package org.apache.pinot.core.operator.dociditerators;
2020

21+
import java.util.OptionalInt;
2122
import javax.annotation.Nullable;
23+
import org.apache.pinot.core.common.BlockDocIdIterator;
2224
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
2325
import org.apache.pinot.segment.spi.Constants;
2426
import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -44,7 +46,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator {
4446
private final ForwardIndexReaderContext _readerContext;
4547
private final int _numDocs;
4648
private final ValueMatcher _valueMatcher;
47-
private final int[] _batch = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
49+
private final int[] _batch;
4850
private int _firstMismatch;
4951
private int _cursor;
5052
private final int _cardinality;
@@ -53,7 +55,8 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator {
5355
private long _numEntriesScanned = 0L;
5456

5557
public SVScanDocIdIterator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs,
56-
@Nullable NullValueVectorReader nullValueReader) {
58+
@Nullable NullValueVectorReader nullValueReader, int batchSize) {
59+
_batch = new int[batchSize];
5760
_predicateEvaluator = predicateEvaluator;
5861
_reader = dataSource.getForwardIndex();
5962
_readerContext = _reader.createContext();
@@ -69,6 +72,7 @@ public SVScanDocIdIterator(PredicateEvaluator predicateEvaluator, DataSource dat
6972
// for testing
7073
public SVScanDocIdIterator(PredicateEvaluator predicateEvaluator, ForwardIndexReader reader, int numDocs,
7174
@Nullable NullValueVectorReader nullValueReader) {
75+
_batch = new int[BlockDocIdIterator.OPTIMAL_ITERATOR_BATCH_SIZE];
7276
_predicateEvaluator = predicateEvaluator;
7377
_reader = reader;
7478
_readerContext = reader.createContext();
@@ -87,7 +91,7 @@ public int next() {
8791
int limit;
8892
int batchSize = 0;
8993
do {
90-
limit = Math.min(_numDocs - _nextDocId, OPTIMAL_ITERATOR_BATCH_SIZE);
94+
limit = Math.min(_numDocs - _nextDocId, _batch.length);
9195
if (limit > 0) {
9296
for (int i = 0; i < limit; i++) {
9397
_batch[i] = _nextDocId + i;
@@ -121,14 +125,22 @@ public int advance(int targetDocId) {
121125
}
122126

123127
@Override
124-
public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
125-
if (docIds.isEmpty()) {
128+
public MutableRoaringBitmap applyAnd(BatchIterator docIdIterator, OptionalInt firstDoc, OptionalInt lastDoc) {
129+
if (!docIdIterator.hasNext()) {
126130
return new MutableRoaringBitmap();
127131
}
128-
RoaringBitmapWriter<MutableRoaringBitmap> result = RoaringBitmapWriter.bufferWriter()
129-
.expectedRange(docIds.first(), docIds.last()).runCompress(false).get();
130-
BatchIterator docIdIterator = docIds.getBatchIterator();
131-
int[] buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
132+
RoaringBitmapWriter<MutableRoaringBitmap> result;
133+
if (firstDoc.isPresent() && lastDoc.isPresent()) {
134+
result = RoaringBitmapWriter.bufferWriter()
135+
.expectedRange(firstDoc.getAsInt(), lastDoc.getAsInt())
136+
.runCompress(false)
137+
.get();
138+
} else {
139+
result = RoaringBitmapWriter.bufferWriter()
140+
.runCompress(false)
141+
.get();
142+
}
143+
int[] buffer = new int[_batch.length];
132144
while (docIdIterator.hasNext()) {
133145
int limit = docIdIterator.nextBatch(buffer);
134146
if (limit > 0) {
@@ -264,7 +276,7 @@ public static int removeNullDocs(int[] docIds, double[] values, int limit, Immut
264276

265277
private class DictIdMatcher implements ValueMatcher {
266278

267-
private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
279+
private final int[] _buffer = new int[_batch.length];
268280

269281
@Override
270282
public boolean doesValueMatch(int docId) {
@@ -280,7 +292,7 @@ public int matchValues(int limit, int[] docIds) {
280292

281293
private class DictIdMatcherAndNullHandler implements ValueMatcher {
282294

283-
private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
295+
private final int[] _buffer = new int[_batch.length];
284296
private final ImmutableRoaringBitmap _nullBitmap;
285297

286298
public DictIdMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
@@ -308,7 +320,7 @@ public int matchValues(int limit, int[] docIds) {
308320

309321
private class IntMatcher implements ValueMatcher {
310322

311-
private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
323+
private final int[] _buffer = new int[_batch.length];
312324

313325
@Override
314326
public boolean doesValueMatch(int docId) {
@@ -325,7 +337,7 @@ public int matchValues(int limit, int[] docIds) {
325337
private class IntMatcherAndNullHandler implements ValueMatcher {
326338

327339
private final ImmutableRoaringBitmap _nullBitmap;
328-
private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
340+
private final int[] _buffer = new int[_batch.length];
329341

330342
public IntMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
331343
_nullBitmap = nullBitmap;
@@ -349,7 +361,7 @@ public int matchValues(int limit, int[] docIds) {
349361

350362
private class LongMatcher implements ValueMatcher {
351363

352-
private final long[] _buffer = new long[OPTIMAL_ITERATOR_BATCH_SIZE];
364+
private final long[] _buffer = new long[_batch.length];
353365

354366
@Override
355367
public boolean doesValueMatch(int docId) {
@@ -366,7 +378,7 @@ public int matchValues(int limit, int[] docIds) {
366378
private class LongMatcherAndNullHandler implements ValueMatcher {
367379

368380
private final ImmutableRoaringBitmap _nullBitmap;
369-
private final long[] _buffer = new long[OPTIMAL_ITERATOR_BATCH_SIZE];
381+
private final long[] _buffer = new long[_batch.length];
370382

371383
public LongMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
372384
_nullBitmap = nullBitmap;
@@ -390,7 +402,7 @@ public int matchValues(int limit, int[] docIds) {
390402

391403
private class FloatMatcher implements ValueMatcher {
392404

393-
private final float[] _buffer = new float[OPTIMAL_ITERATOR_BATCH_SIZE];
405+
private final float[] _buffer = new float[_batch.length];
394406

395407
@Override
396408
public boolean doesValueMatch(int docId) {
@@ -407,7 +419,7 @@ public int matchValues(int limit, int[] docIds) {
407419
private class FloatMatcherAndNullHandler implements ValueMatcher {
408420

409421
private final ImmutableRoaringBitmap _nullBitmap;
410-
private final float[] _buffer = new float[OPTIMAL_ITERATOR_BATCH_SIZE];
422+
private final float[] _buffer = new float[_batch.length];
411423

412424
public FloatMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
413425
_nullBitmap = nullBitmap;
@@ -431,7 +443,7 @@ public int matchValues(int limit, int[] docIds) {
431443

432444
private class DoubleMatcher implements ValueMatcher {
433445

434-
private final double[] _buffer = new double[OPTIMAL_ITERATOR_BATCH_SIZE];
446+
private final double[] _buffer = new double[_batch.length];
435447

436448
@Override
437449
public boolean doesValueMatch(int docId) {
@@ -448,7 +460,7 @@ public int matchValues(int limit, int[] docIds) {
448460
private class DoubleMatcherAndNullHandler implements ValueMatcher {
449461

450462
private final ImmutableRoaringBitmap _nullBitmap;
451-
private final double[] _buffer = new double[OPTIMAL_ITERATOR_BATCH_SIZE];
463+
private final double[] _buffer = new double[_batch.length];
452464

453465
public DoubleMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
454466
_nullBitmap = nullBitmap;

pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ScanBasedDocIdIterator.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
*/
1919
package org.apache.pinot.core.operator.dociditerators;
2020

21+
import java.util.OptionalInt;
2122
import org.apache.pinot.core.common.BlockDocIdIterator;
23+
import org.roaringbitmap.BatchIterator;
2224
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
2325
import org.roaringbitmap.buffer.MutableRoaringBitmap;
2426

@@ -34,10 +36,17 @@
3436
*/
3537
public interface ScanBasedDocIdIterator extends BlockDocIdIterator {
3638

39+
MutableRoaringBitmap applyAnd(BatchIterator batchIterator, OptionalInt firstDoc, OptionalInt lastDoc);
40+
3741
/**
3842
* Applies AND operation to the given bitmap of document ids, returns a bitmap of the matching document ids.
3943
*/
40-
MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds);
44+
default MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
45+
if (docIds.isEmpty()) {
46+
return new MutableRoaringBitmap();
47+
}
48+
return applyAnd(docIds.getBatchIterator(), OptionalInt.of(docIds.first()), OptionalInt.of(docIds.last()));
49+
}
4150

4251
/**
4352
* Returns the number of entries (SV value contains one entry, MV value contains multiple entries) scanned during the

pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ public final class SVScanDocIdSet implements BlockDocIdSet {
2929
private final SVScanDocIdIterator _docIdIterator;
3030

3131
public SVScanDocIdSet(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs,
32-
boolean nullHandlingEnabled) {
32+
boolean nullHandlingEnabled, int batchSize) {
3333
NullValueVectorReader nullValueVector = nullHandlingEnabled ? dataSource.getNullValueVector() : null;
34-
_docIdIterator = new SVScanDocIdIterator(predicateEvaluator, dataSource, numDocs, nullValueVector);
34+
_docIdIterator = new SVScanDocIdIterator(predicateEvaluator, dataSource, numDocs, nullValueVector, batchSize);
3535
}
3636

3737
@Override

pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.common.base.Preconditions;
2222
import java.util.Collections;
2323
import java.util.List;
24+
import org.apache.pinot.core.common.BlockDocIdIterator;
2425
import org.apache.pinot.core.common.Operator;
2526
import org.apache.pinot.core.operator.blocks.FilterBlock;
2627
import org.apache.pinot.core.operator.docidsets.MVScanDocIdSet;
@@ -37,23 +38,31 @@ public class ScanBasedFilterOperator extends BaseFilterOperator {
3738
private final DataSource _dataSource;
3839
private final int _numDocs;
3940
private final boolean _nullHandlingEnabled;
41+
private final int _batchSize;
4042

41-
ScanBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs,
43+
public ScanBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs,
4244
boolean nullHandlingEnabled) {
45+
this(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled, BlockDocIdIterator.OPTIMAL_ITERATOR_BATCH_SIZE);
46+
}
47+
48+
public ScanBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs,
49+
boolean nullHandlingEnabled, int batchSize) {
4350
_predicateEvaluator = predicateEvaluator;
4451
_dataSource = dataSource;
4552
_numDocs = numDocs;
4653
_nullHandlingEnabled = nullHandlingEnabled;
4754
Preconditions.checkState(_dataSource.getForwardIndex() != null,
4855
"Forward index disabled for column: %s, scan based filtering not supported!",
4956
_dataSource.getDataSourceMetadata().getFieldSpec().getName());
57+
_batchSize = batchSize;
5058
}
5159

5260
@Override
5361
protected FilterBlock getNextBlock() {
5462
DataSourceMetadata dataSourceMetadata = _dataSource.getDataSourceMetadata();
5563
if (dataSourceMetadata.isSingleValue()) {
56-
return new FilterBlock(new SVScanDocIdSet(_predicateEvaluator, _dataSource, _numDocs, _nullHandlingEnabled));
64+
return new FilterBlock(new SVScanDocIdSet(_predicateEvaluator, _dataSource, _numDocs, _nullHandlingEnabled,
65+
_batchSize));
5766
} else {
5867
return new FilterBlock(new MVScanDocIdSet(_predicateEvaluator, _dataSource, _numDocs));
5968
}

0 commit comments

Comments
 (0)