Skip to content

Commit 73d82ec

Browse files
walterddrRong Rong
andauthored
[multistage][bugfix] fix operator eos pull (#11970)
* clean up single input stop-the-world operator * clean up 2-input stop-the-world on right, stream on left operator * clean up single-input stream operator * refactor try-catch block into base class; and fix set op right-side error handling --------- Co-authored-by: Rong Rong <[email protected]>
1 parent a986dd1 commit 73d82ec

File tree

9 files changed

+127
-156
lines changed

9 files changed

+127
-156
lines changed

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class AggregateOperator extends MultiStageOperator {
7070
private final MultistageAggregationExecutor _aggregationExecutor;
7171
private final MultistageGroupByExecutor _groupByExecutor;
7272

73-
private boolean _hasReturnedAggregateBlock;
73+
private boolean _hasConstructedAggregateBlock;
7474

7575
public AggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator, DataSchema resultSchema,
7676
List<RexExpression> aggCalls, List<RexExpression> groupSet, AggType aggType, List<Integer> filterArgIndices,
@@ -131,26 +131,19 @@ public String toExplainString() {
131131

132132
@Override
133133
protected TransferableBlock getNextBlock() {
134-
try {
135-
TransferableBlock finalBlock = _aggregationExecutor != null ? consumeAggregation() : consumeGroupBy();
136-
137-
// setting upstream error block
138-
if (finalBlock.isErrorBlock()) {
139-
return finalBlock;
140-
}
141-
142-
if (!_hasReturnedAggregateBlock) {
143-
return produceAggregatedBlock();
144-
} else {
145-
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
146-
}
147-
} catch (Exception e) {
148-
return TransferableBlockUtils.getErrorTransferableBlock(e);
134+
if (_hasConstructedAggregateBlock) {
135+
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
136+
}
137+
TransferableBlock finalBlock = _aggregationExecutor != null ? consumeAggregation() : consumeGroupBy();
138+
// returning upstream error block if finalBlock contains error.
139+
if (finalBlock.isErrorBlock()) {
140+
return finalBlock;
149141
}
142+
return produceAggregatedBlock();
150143
}
151144

152145
private TransferableBlock produceAggregatedBlock() {
153-
_hasReturnedAggregateBlock = true;
146+
_hasConstructedAggregateBlock = true;
154147
if (_aggregationExecutor != null) {
155148
return new TransferableBlock(_aggregationExecutor.getResult(), _resultSchema, DataBlock.Type.ROW);
156149
} else {

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -188,24 +188,21 @@ public String toExplainString() {
188188
}
189189

190190
@Override
191-
protected TransferableBlock getNextBlock() {
192-
try {
193-
if (_isTerminated) {
194-
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
195-
}
196-
if (!_isHashTableBuilt) {
197-
// Build JOIN hash table
198-
buildBroadcastHashTable();
199-
}
200-
if (_upstreamErrorBlock != null) {
201-
return _upstreamErrorBlock;
202-
}
203-
TransferableBlock leftBlock = _leftTableOperator.nextBlock();
204-
// JOIN each left block with the right block.
205-
return buildJoinedDataBlock(leftBlock);
206-
} catch (Exception e) {
207-
return TransferableBlockUtils.getErrorTransferableBlock(e);
191+
protected TransferableBlock getNextBlock()
192+
throws ProcessingException {
193+
if (_isTerminated) {
194+
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
195+
}
196+
if (!_isHashTableBuilt) {
197+
// Build JOIN hash table
198+
buildBroadcastHashTable();
199+
}
200+
if (_upstreamErrorBlock != null) {
201+
return _upstreamErrorBlock;
208202
}
203+
TransferableBlock leftBlock = _leftTableOperator.nextBlock();
204+
// JOIN each left block with the constructed right hash table.
205+
return buildJoinedDataBlock(leftBlock);
209206
}
210207

211208
private void buildBroadcastHashTable()

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -112,29 +112,26 @@ public String toExplainString() {
112112
}
113113

114114
@Override
115-
protected TransferableBlock getNextBlock() {
115+
protected TransferableBlock getNextBlock()
116+
throws InterruptedException, TimeoutException {
116117
if (_executionFuture == null) {
117118
_executionFuture = startExecution();
118119
}
119-
try {
120-
BaseResultsBlock resultsBlock =
121-
_blockingQueue.poll(_context.getDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
122-
if (resultsBlock == null) {
123-
throw new TimeoutException("Timed out waiting for results block");
124-
}
125-
// Terminate when receiving exception block
126-
Map<Integer, String> exceptions = _exceptions;
127-
if (exceptions != null) {
128-
return TransferableBlockUtils.getErrorTransferableBlock(exceptions);
129-
}
130-
if (_isEarlyTerminated || resultsBlock == LAST_RESULTS_BLOCK) {
131-
return constructMetadataBlock();
132-
} else {
133-
// Regular data block
134-
return composeTransferableBlock(resultsBlock, _dataSchema);
135-
}
136-
} catch (Exception e) {
137-
return TransferableBlockUtils.getErrorTransferableBlock(e);
120+
BaseResultsBlock resultsBlock =
121+
_blockingQueue.poll(_context.getDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
122+
if (resultsBlock == null) {
123+
throw new TimeoutException("Timed out waiting for results block");
124+
}
125+
// Terminate when receiving exception block
126+
Map<Integer, String> exceptions = _exceptions;
127+
if (exceptions != null) {
128+
return TransferableBlockUtils.getErrorTransferableBlock(exceptions);
129+
}
130+
if (_isEarlyTerminated || resultsBlock == LAST_RESULTS_BLOCK) {
131+
return constructMetadataBlock();
132+
} else {
133+
// Regular data block
134+
return composeTransferableBlock(resultsBlock, _dataSchema);
138135
}
139136
}
140137

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import org.apache.pinot.core.common.Operator;
2424
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
25+
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
2526
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
2627
import org.apache.pinot.spi.exception.EarlyTerminationException;
2728
import org.apache.pinot.spi.trace.InvocationScope;
@@ -55,11 +56,19 @@ public TransferableBlock nextBlock() {
5556
if (shouldCollectStats()) {
5657
OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
5758
operatorStats.startTimer();
58-
nextBlock = getNextBlock();
59+
try {
60+
nextBlock = getNextBlock();
61+
} catch (Exception e) {
62+
nextBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
63+
}
5964
operatorStats.recordRow(1, nextBlock.getNumRows());
6065
operatorStats.endTimer(nextBlock);
6166
} else {
62-
nextBlock = getNextBlock();
67+
try {
68+
nextBlock = getNextBlock();
69+
} catch (Exception e) {
70+
nextBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
71+
}
6372
}
6473
return nextBlock;
6574
}
@@ -70,7 +79,8 @@ public String getOperatorId() {
7079
}
7180

7281
// Make it protected because we should always call nextBlock()
73-
protected abstract TransferableBlock getNextBlock();
82+
protected abstract TransferableBlock getNextBlock()
83+
throws Exception;
7484

7585
protected void earlyTerminate() {
7686
_isEarlyTerminated = true;

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public class OperatorStats {
4343
private long _startTimeMs = -1;
4444
private long _endTimeMs = -1;
4545
private final Map<String, String> _executionStats;
46-
private boolean _processingStarted = false;
4746

4847
public OperatorStats(OpChainExecutionContext context) {
4948
this(context.getRequestId(), context.getStageId(), context.getServer());
@@ -69,7 +68,6 @@ public void endTimer(TransferableBlock block) {
6968
_executeStopwatch.stop();
7069
_endTimeMs = System.currentTimeMillis();
7170
}
72-
_processingStarted = true;
7371
}
7472

7573
public void recordRow(int numBlock, int numRows) {

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public abstract class SetOperator extends MultiStageOperator {
5050
private final DataSchema _dataSchema;
5151

5252
private boolean _isRightSetBuilt;
53+
private boolean _isTerminated;
5354
private TransferableBlock _upstreamErrorBlock;
5455

5556
public SetOperator(OpChainExecutionContext opChainExecutionContext, List<MultiStageOperator> upstreamOperators,
@@ -60,6 +61,8 @@ public SetOperator(OpChainExecutionContext opChainExecutionContext, List<MultiSt
6061
_leftChildOperator = getChildOperators().get(0);
6162
_rightChildOperator = getChildOperators().get(1);
6263
_rightRowSet = new HashSet<>();
64+
_isRightSetBuilt = false;
65+
_isTerminated = false;
6366
}
6467

6568
@Override
@@ -89,10 +92,17 @@ public ExecutionStatistics getExecutionStatistics() {
8992

9093
@Override
9194
protected TransferableBlock getNextBlock() {
92-
// A blocking call to construct a set with all the right side rows.
95+
if (_isTerminated) {
96+
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
97+
}
9398
if (!_isRightSetBuilt) {
99+
// construct a SET with all the right side rows.
94100
constructRightBlockSet();
95101
}
102+
if (_upstreamErrorBlock != null) {
103+
return _upstreamErrorBlock;
104+
}
105+
// UNION each left block with the constructed right block set.
96106
TransferableBlock leftBlock = _leftChildOperator.nextBlock();
97107
return constructResultBlockSet(leftBlock);
98108
}
@@ -107,7 +117,11 @@ protected void constructRightBlockSet() {
107117
}
108118
block = _rightChildOperator.nextBlock();
109119
}
110-
_isRightSetBuilt = true;
120+
if (block.isErrorBlock()) {
121+
_upstreamErrorBlock = block;
122+
} else {
123+
_isRightSetBuilt = true;
124+
}
111125
}
112126

113127
protected TransferableBlock constructResultBlockSet(TransferableBlock leftBlock) {

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java

Lines changed: 48 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ public class SortOperator extends MultiStageOperator {
5151
private final ArrayList<Object[]> _rows;
5252
private final int _numRowsToKeep;
5353

54-
private boolean _hasReturnedSortedBlock;
55-
private TransferableBlock _upstreamErrorBlock;
54+
private boolean _hasConstructedSortedBlock;
5655

5756
public SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator,
5857
List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
@@ -73,8 +72,7 @@ public SortOperator(OpChainExecutionContext context, MultiStageOperator upstream
7372
_fetch = fetch;
7473
_offset = Math.max(offset, 0);
7574
_dataSchema = dataSchema;
76-
_upstreamErrorBlock = null;
77-
_hasReturnedSortedBlock = false;
75+
_hasConstructedSortedBlock = false;
7876
// Setting numRowsToKeep as default maximum on Broker if limit not set.
7977
// TODO: make this default behavior configurable.
8078
_numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultResponseLimit;
@@ -110,74 +108,65 @@ public String toExplainString() {
110108

111109
@Override
112110
protected TransferableBlock getNextBlock() {
113-
try {
114-
consumeInputBlocks();
115-
return produceSortedBlock();
116-
} catch (Exception e) {
117-
return TransferableBlockUtils.getErrorTransferableBlock(e);
111+
if (_hasConstructedSortedBlock) {
112+
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
113+
}
114+
TransferableBlock finalBlock = consumeInputBlocks();
115+
// returning upstream error block if finalBlock contains error.
116+
if (finalBlock.isErrorBlock()) {
117+
return finalBlock;
118118
}
119+
return produceSortedBlock();
119120
}
120121

121122
private TransferableBlock produceSortedBlock() {
122-
if (_upstreamErrorBlock != null) {
123-
return _upstreamErrorBlock;
124-
}
125-
126-
if (!_hasReturnedSortedBlock) {
127-
_hasReturnedSortedBlock = true;
128-
if (_priorityQueue == null) {
129-
if (_rows.size() > _offset) {
130-
List<Object[]> row = _rows.subList(_offset, _rows.size());
131-
return new TransferableBlock(row, _dataSchema, DataBlock.Type.ROW);
132-
} else {
133-
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
134-
}
123+
_hasConstructedSortedBlock = true;
124+
if (_priorityQueue == null) {
125+
if (_rows.size() > _offset) {
126+
List<Object[]> row = _rows.subList(_offset, _rows.size());
127+
return new TransferableBlock(row, _dataSchema, DataBlock.Type.ROW);
135128
} else {
136-
LinkedList<Object[]> rows = new LinkedList<>();
137-
while (_priorityQueue.size() > _offset) {
138-
Object[] row = _priorityQueue.poll();
139-
rows.addFirst(row);
140-
}
141-
if (rows.size() == 0) {
142-
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
143-
} else {
144-
return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
145-
}
129+
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
146130
}
147131
} else {
148-
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
132+
LinkedList<Object[]> rows = new LinkedList<>();
133+
while (_priorityQueue.size() > _offset) {
134+
Object[] row = _priorityQueue.poll();
135+
rows.addFirst(row);
136+
}
137+
if (rows.size() == 0) {
138+
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
139+
} else {
140+
return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
141+
}
149142
}
150143
}
151144

152-
private void consumeInputBlocks() {
153-
if (!_hasReturnedSortedBlock) {
154-
TransferableBlock block = _upstreamOperator.nextBlock();
155-
while (block.isDataBlock()) {
156-
List<Object[]> container = block.getContainer();
157-
if (_priorityQueue == null) {
158-
// TODO: when push-down properly, we shouldn't get more than _numRowsToKeep
159-
int numRows = _rows.size();
160-
if (numRows < _numRowsToKeep) {
161-
if (numRows + container.size() < _numRowsToKeep) {
162-
_rows.addAll(container);
163-
} else {
164-
_rows.addAll(container.subList(0, _numRowsToKeep - numRows));
165-
LOGGER.debug("Early terminate at SortOperator - operatorId={}, opChainId={}", _operatorId,
166-
_context.getId());
167-
// setting operator to be early terminated and awaits EOS block next.
168-
earlyTerminate();
169-
}
170-
}
171-
} else {
172-
for (Object[] row : container) {
173-
SelectionOperatorUtils.addToPriorityQueue(row, _priorityQueue, _numRowsToKeep);
145+
private TransferableBlock consumeInputBlocks() {
146+
TransferableBlock block = _upstreamOperator.nextBlock();
147+
while (block.isDataBlock()) {
148+
List<Object[]> container = block.getContainer();
149+
if (_priorityQueue == null) {
150+
// TODO: when push-down properly, we shouldn't get more than _numRowsToKeep
151+
int numRows = _rows.size();
152+
if (numRows < _numRowsToKeep) {
153+
if (numRows + container.size() < _numRowsToKeep) {
154+
_rows.addAll(container);
155+
} else {
156+
_rows.addAll(container.subList(0, _numRowsToKeep - numRows));
157+
LOGGER.debug("Early terminate at SortOperator - operatorId={}, opChainId={}", _operatorId,
158+
_context.getId());
159+
// setting operator to be early terminated and awaits EOS block next.
160+
earlyTerminate();
174161
}
175162
}
176-
block = _upstreamOperator.nextBlock();
177-
}
178-
if (block.isErrorBlock()) {
179-
_upstreamErrorBlock = block;
163+
} else {
164+
for (Object[] row : container) {
165+
SelectionOperatorUtils.addToPriorityQueue(row, _priorityQueue, _numRowsToKeep);
166+
}
180167
}
168+
block = _upstreamOperator.nextBlock();
181169
}
170+
return block;
182171
}
183172
}

0 commit comments

Comments
 (0)