[multistage] framework to back-propagate metadata across opChains #11746
Codecov Report
@@ Coverage Diff @@
## master #11746 +/- ##
=============================================
+ Coverage 14.41% 63.13% +48.72%
- Complexity 201 1140 +939
=============================================
Files 2342 2343 +1
Lines 125896 126100 +204
Branches 19362 19403 +41
=============================================
+ Hits 18146 79619 +61473
+ Misses 106213 40822 -65391
- Partials 1537 5659 +4122
77cd516 to fc4185d
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java (outdated, resolved)
053450a to 76fa636
Jackie-Jiang
left a comment
The logic looks good
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java (resolved)
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java (outdated, resolved)
...ery-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java (outdated, resolved)
...uery-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java (outdated, resolved)
  private static final ExpressionContext PLACEHOLDER_IDENTIFIER = ExpressionContext.forIdentifier("__PLACEHOLDER__");

- private final MultiStageOperator _inputOperator;
+ private final MultiStageOperator _upstreamOperator;
This is confusing. In the mailbox, upstream is the next operator (e.g. the receiver is upstream of the sender), but here upstream is the input operator. Suggest reverting this change because input is clearer.
Yes, some are named upstream and some are named input. Will revert this change and make another PR to change the naming.
  TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
      OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
- sendTransferableBlock(eosBlockWithStats);
+ isEarlyTerminated = sendTransferableBlock(eosBlockWithStats);
We don't need to set early terminate when it is already EOS
// acquire extra metadata block
block = _upstreamOperator.nextBlock();
This is not very robust. Simply remove the break and let it follow the regular execution flow
  private static final Set<String> RANKING_FUNCTION_NAMES = ImmutableSet.of("RANK", "DENSE_RANK");

- private final MultiStageOperator _inputOperator;
+ private final MultiStageOperator _upstreamOperator;
Same here, suggest not changing it
/**
 * API to send a block to the destination mailboxes.
 * @param block the block to be transferred
 * @return true if any of the upstream mailboxes requested EOS (e.g. early termination)
This comment is incorrect. It returns true only if all receiving mailboxes are early terminated.
Side comment: Do you realize this upstream is the opposite of other upstream changes in the PR (in this place it refers to the next operator in the data flow, but in other places it refers to the previous operator in the data flow). Thus suggest not using upstream because it can be confusing
yeah it is confusing, will create a separate PR for naming fix
  boolean isEarlyTerminated = true;
  for (SendingMailbox sendingMailbox : _sendingMailboxes) {
-   if (!sendingMailbox.isTerminated()) {
+   if (!sendingMailbox.isEarlyTerminated()) {
We can do the block.isEndOfStreamBlock() check first, then check if all mailboxes are early terminated
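The ordering suggested here, combined with the earlier note that the method should return true only when all receiving mailboxes are early terminated, can be sketched as follows. This is a minimal illustration and not the PR's code: `SketchSendingMailbox`, `RecordingMailbox`, `SketchExchange`, and the `String` block type are hypothetical stand-ins, and sending to every mailbox regardless of its flag is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a sending mailbox; not Pinot's actual interface.
interface SketchSendingMailbox {
  void send(String block);

  boolean isEarlyTerminated();
}

// Simple recording implementation so the sketch can be exercised.
final class RecordingMailbox implements SketchSendingMailbox {
  final List<String> _sent = new ArrayList<>();
  volatile boolean _earlyTerminated;

  @Override
  public void send(String block) {
    _sent.add(block);
  }

  @Override
  public boolean isEarlyTerminated() {
    return _earlyTerminated;
  }
}

// Hypothetical exchange that fans a block out to all sending mailboxes.
final class SketchExchange {
  private final List<? extends SketchSendingMailbox> _sendingMailboxes;

  SketchExchange(List<? extends SketchSendingMailbox> sendingMailboxes) {
    _sendingMailboxes = sendingMailboxes;
  }

  // Returns true only for a non-EOS block when every receiver has asked to stop.
  boolean send(String block, boolean isEndOfStream) {
    for (SketchSendingMailbox mailbox : _sendingMailboxes) {
      mailbox.send(block);
    }
    if (isEndOfStream) {
      return false; // the stream is ending anyway, so there is nothing to terminate early
    }
    for (SketchSendingMailbox mailbox : _sendingMailboxes) {
      if (!mailbox.isEarlyTerminated()) {
        return false; // at least one receiver still wants data
      }
    }
    return true;
  }
}

final class SketchExchangeDemo {
  public static void main(String[] args) {
    RecordingMailbox a = new RecordingMailbox();
    RecordingMailbox b = new RecordingMailbox();
    SketchExchange exchange = new SketchExchange(Arrays.asList(a, b));
    a._earlyTerminated = true;
    System.out.println(exchange.send("data", false)); // false: b still wants data
    b._earlyTerminated = true;
    System.out.println(exchange.send("data", false)); // true: every receiver is done
    System.out.println(exchange.send("EOS", true));   // false: EOS is not reported as early termination
  }
}
```

Note that with an empty mailbox list this sketch returns true for a data block; whether that case can occur in practice is not stated in the thread.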
bd61449 to 48ed081
Jackie-Jiang
left a comment
LGTM!
  @Override
  public void cancel(Throwable t) {
-   if (_isTerminated) {
+   if (_isEarlyTerminated || _isTerminated) {
We should not change this. Cancel should be applied even if early terminate is called
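A minimal sketch of the behavior the reviewer asks for, assuming a mailbox that tracks the two flags separately. `SketchCancellableMailbox` and its accessors are hypothetical; only the `_isEarlyTerminated` and `_isTerminated` names come from the diff above.

```java
// Hypothetical mailbox state holder; only the two flag names come from the diff.
final class SketchCancellableMailbox {
  private boolean _isEarlyTerminated;
  private boolean _isTerminated;
  private Throwable _cancelCause;

  // The receiver needs no more data, but the mailbox is not fully closed yet.
  synchronized void earlyTerminate() {
    _isEarlyTerminated = true;
  }

  // Cancel is skipped only once the mailbox is fully terminated,
  // so it still takes effect after earlyTerminate().
  synchronized void cancel(Throwable t) {
    if (_isTerminated) {
      return;
    }
    _isTerminated = true;
    _cancelCause = t;
  }

  synchronized boolean isEarlyTerminated() {
    return _isEarlyTerminated;
  }

  synchronized boolean isTerminated() {
    return _isTerminated;
  }

  synchronized Throwable getCancelCause() {
    return _cancelCause;
  }
}
```

The sketch uses `synchronized` so the check-then-set in `cancel` is atomic; the actual class may rely on a different mechanism.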
// TODO: Revisit if this is the correct way to apply back pressure
private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();
private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean(false);
(minor) This can be defined as a volatile boolean
private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;

private final AtomicInteger _bufferSize = new AtomicInteger(DEFAULT_MAILBOX_QUEUE_CAPACITY);
private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean();
(minor) This can be defined as a volatile boolean
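To illustrate the suggestion: if the flag is only ever written as a plain assignment and read elsewhere, with no compare-and-set, a `volatile boolean` already guarantees that the write is visible to other threads. The sketch below assumes exactly that usage pattern; `SketchStatusObserver` and its method names are hypothetical.

```java
// Hypothetical observer holding a one-way flag (false -> true).
final class SketchStatusObserver {
  // volatile gives cross-thread visibility; no atomic read-modify-write is needed here.
  private volatile boolean _isEarlyTerminated;

  void markEarlyTerminated() {
    _isEarlyTerminated = true;
  }

  boolean isEarlyTerminated() {
    return _isEarlyTerminated;
  }
}

final class SketchStatusObserverDemo {
  public static void main(String[] args) throws InterruptedException {
    SketchStatusObserver observer = new SketchStatusObserver();
    // One thread sets the flag, another (here: main) reads it afterwards.
    Thread receiver = new Thread(observer::markEarlyTerminated);
    receiver.start();
    receiver.join();
    System.out.println(observer.isEarlyTerminated());
  }
}
```

If the code ever needed to act only on the first transition (for example via `compareAndSet`), `AtomicBoolean` would still be the better fit.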
  isEarlyTerminated = false;
  } else {
- sendTransferableBlock(block);
+ isEarlyTerminated = sendTransferableBlock(block);
Can be simplified (remove isEarlyTerminated)
- isEarlyTerminated = sendTransferableBlock(block);
+ if (sendTransferableBlock(block)) {
+   earlyTerminate();
+ }
earlyTerminateMailboxes();
}

protected void earlyTerminateMailboxes() {
Any specific reason for making this a separate method?
Changes
MultiStageOperator
- Adds a setEarlyTerminate() API, which sets the early termination flag to true. This API is meant to propagate the flag upstream.
- Once _isEarlyTerminate is set to true, the next call to nextBlock() should return EOS if it has not already done so.
Mailboxes
- SendingMailbox and ReceivingMailbox now bear a contract to indicate whether early termination has occurred from upstream (receiving --> sending).
- ReceivingMailboxes have their early termination flag set by ReceiveOperator.
- ReceivingMailbox propagates this info back to SendingMailbox.
- SendingMailbox returns the boolean flag to SendingOperator; SendingOperator checks this flag and subsequently calls its opChain's setEarlyTermination() method.
TODO
- No extra call to operator.nextBlock() should occur if the operator in question has already returned EOS.
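
The flow described above (receiving side sets the flag, the flag travels back through the send call, and the input operator then returns EOS on its next nextBlock()) can be sketched end to end. This is a single-threaded illustration under stated assumptions, not the PR's implementation: all `Flow*` classes are hypothetical, blocks are plain strings, and the receiving mailbox flips its own flag after a fixed number of data blocks to stand in for a ReceiveOperator that has seen enough rows.

```java
// Hypothetical input operator producing a fixed number of data blocks, then EOS.
final class FlowSourceOperator {
  static final String EOS = "EOS";
  private int _remaining;
  private boolean _isEarlyTerminated;

  FlowSourceOperator(int blocks) {
    _remaining = blocks;
  }

  void setEarlyTerminate() {
    _isEarlyTerminated = true;
  }

  // Once the flag is set, the very next call returns EOS.
  String nextBlock() {
    if (_isEarlyTerminated || _remaining == 0) {
      return EOS;
    }
    _remaining--;
    return "data";
  }
}

// Hypothetical receiving mailbox; the limit stands in for the ReceiveOperator's decision.
final class FlowReceivingMailbox {
  private final int _limit;
  private int _dataBlocks;
  private volatile boolean _isEarlyTerminated;

  FlowReceivingMailbox(int limit) {
    _limit = limit;
  }

  // Accepts a block and reports back whether early termination was requested.
  boolean offer(String block) {
    if (!FlowSourceOperator.EOS.equals(block) && ++_dataBlocks >= _limit) {
      _isEarlyTerminated = true;
    }
    return _isEarlyTerminated;
  }

  boolean isEarlyTerminated() {
    return _isEarlyTerminated;
  }

  int getDataBlocks() {
    return _dataBlocks;
  }
}

// Hypothetical send operator: forwards blocks and back-propagates the flag to its input.
final class FlowSendOperator {
  private final FlowSourceOperator _input;
  private final FlowReceivingMailbox _mailbox;

  FlowSendOperator(FlowSourceOperator input, FlowReceivingMailbox mailbox) {
    _input = input;
    _mailbox = mailbox;
  }

  // Runs until EOS and returns the number of data blocks pulled from the input.
  int run() {
    int pulled = 0;
    while (true) {
      String block = _input.nextBlock();
      boolean receiverDone = _mailbox.offer(block);
      if (FlowSourceOperator.EOS.equals(block)) {
        return pulled; // no further nextBlock() call after EOS
      }
      pulled++;
      if (receiverDone) {
        _input.setEarlyTerminate();
      }
    }
  }
}

final class FlowDemo {
  public static void main(String[] args) {
    FlowSourceOperator source = new FlowSourceOperator(10);
    FlowReceivingMailbox mailbox = new FlowReceivingMailbox(3);
    int pulled = new FlowSendOperator(source, mailbox).run();
    System.out.println("data blocks pulled: " + pulled); // 3 of the 10 available
  }
}
```

In this sketch the source has 10 blocks available but only 3 are pulled, because the flag set on the receiving side reaches the input operator before the fourth nextBlock() call.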