Skip to content

Commit 0f2d51c

Browse files
walterddrRong Rong
andauthored
[multistage] optimize limit without order by (#10241)
Co-authored-by: Rong Rong <[email protected]>
1 parent b284154 commit 0f2d51c

File tree

1 file changed

+42
-13
lines changed
  • pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator

1 file changed

+42
-13
lines changed

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.common.annotations.VisibleForTesting;
2222
import com.google.common.collect.ImmutableList;
23+
import java.util.ArrayList;
2324
import java.util.Comparator;
2425
import java.util.LinkedList;
2526
import java.util.List;
@@ -44,7 +45,8 @@ public class SortOperator extends MultiStageOperator {
4445
private final int _fetch;
4546
private final int _offset;
4647
private final DataSchema _dataSchema;
47-
private final PriorityQueue<Object[]> _rows;
48+
private final PriorityQueue<Object[]> _priorityQueue;
49+
private final ArrayList<Object[]> _rows;
4850
private final int _numRowsToKeep;
4951

5052
private boolean _readyToConstruct;
@@ -70,8 +72,15 @@ public SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> col
7072
_upstreamErrorBlock = null;
7173
_isSortedBlockConstructed = false;
7274
_numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultHolderCapacity;
73-
_rows = new PriorityQueue<>(_numRowsToKeep,
74-
new SortComparator(collationKeys, collationDirections, dataSchema, false));
75+
// When there's no collationKeys, the SortOperator is a simple selection with row trim on limit & offset
76+
if (collationKeys.isEmpty()) {
77+
_priorityQueue = null;
78+
_rows = new ArrayList<>();
79+
} else {
80+
_priorityQueue = new PriorityQueue<>(_numRowsToKeep,
81+
new SortComparator(collationKeys, collationDirections, dataSchema, false));
82+
_rows = null;
83+
}
7584
}
7685

7786
@Override
@@ -107,16 +116,25 @@ private TransferableBlock produceSortedBlock() {
107116
}
108117

109118
if (!_isSortedBlockConstructed) {
110-
LinkedList<Object[]> rows = new LinkedList<>();
111-
while (_rows.size() > _offset) {
112-
Object[] row = _rows.poll();
113-
rows.addFirst(row);
114-
}
115119
_isSortedBlockConstructed = true;
116-
if (rows.size() == 0) {
117-
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
120+
if (_priorityQueue == null) {
121+
if (_rows.size() > _offset) {
122+
List<Object[]> row = _rows.subList(_offset, _rows.size());
123+
return new TransferableBlock(row, _dataSchema, DataBlock.Type.ROW);
124+
} else {
125+
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
126+
}
118127
} else {
119-
return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
128+
LinkedList<Object[]> rows = new LinkedList<>();
129+
while (_priorityQueue.size() > _offset) {
130+
Object[] row = _priorityQueue.poll();
131+
rows.addFirst(row);
132+
}
133+
if (rows.size() == 0) {
134+
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
135+
} else {
136+
return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
137+
}
120138
}
121139
} else {
122140
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
@@ -137,8 +155,19 @@ private void consumeInputBlocks() {
137155
}
138156

139157
List<Object[]> container = block.getContainer();
140-
for (Object[] row : container) {
141-
SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
158+
if (_priorityQueue == null) {
159+
// TODO: when push-down properly, we shouldn't get more than _numRowsToKeep
160+
if (_rows.size() <= _numRowsToKeep) {
161+
if (_rows.size() + container.size() <= _numRowsToKeep) {
162+
_rows.addAll(container);
163+
} else {
164+
_rows.addAll(container.subList(0, _numRowsToKeep - _rows.size()));
165+
}
166+
}
167+
} else {
168+
for (Object[] row : container) {
169+
SelectionOperatorUtils.addToPriorityQueue(row, _priorityQueue, _numRowsToKeep);
170+
}
142171
}
143172
block = _upstreamOperator.nextBlock();
144173
}

0 commit comments

Comments
 (0)