@walterddr
Contributor

@walterddr walterddr commented Oct 5, 2023

Changes

MultiStageOperator

  • add setEarlyTerminate() API, which sets the early-termination flag to true. This API is meant to propagate the signal upstream.
    • contract: once _isEarlyTerminate is set to true, the next call to nextBlock() should return EOS if the operator has not already done so.
    • all future status propagation should follow the same flag-setting contract.
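
A minimal sketch of the flag-setting contract described above. The class names SketchOperator and CountingOperator and the String-based blocks are hypothetical stand-ins for illustration, not the classes touched by this PR; only setEarlyTerminate(), nextBlock() and _isEarlyTerminate come from the description.

```java
// Hypothetical stand-ins for illustration; not the actual operator classes.
abstract class SketchOperator {
  static final String EOS = "EOS";

  private boolean _isEarlyTerminate;

  // Only flips the flag; the effect is observed on the next nextBlock() call.
  void setEarlyTerminate() {
    _isEarlyTerminate = true;
  }

  final String nextBlock() {
    // Once the flag is set, stop producing data and report end-of-stream.
    return _isEarlyTerminate ? EOS : getNextBlock();
  }

  protected abstract String getNextBlock();
}

// Produces a fixed number of data blocks, then EOS.
class CountingOperator extends SketchOperator {
  private int _remaining;

  CountingOperator(int blocks) {
    _remaining = blocks;
  }

  @Override
  protected String getNextBlock() {
    if (_remaining > 0) {
      _remaining--;
      return "DATA";
    }
    return EOS;
  }
}
```

In this sketch, an operator that still has data left returns EOS on the first nextBlock() call after setEarlyTerminate(), which is the behavior the contract asks for.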

Mailboxes

  • SendingMailbox and ReceivingMailbox now carry a contract to indicate whether early termination has been requested from upstream (receiving --> sending)
  • ReceiveOperator sets the earlyTermination flag on its ReceivingMailboxes
  • ReceivingMailbox propagates this info back to SendingMailbox
  • SendingMailbox returns the boolean flag to SendingOperator; SendingOperator checks this flag and subsequently calls its opChain's setEarlyTermination() method.
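
The four steps above can be sketched as a single in-memory hop. Every class and method name below (SketchReceivingMailbox, offer, SketchSendingMailbox, SketchSendOperator) is an assumption made for illustration; the real mailboxes also have a gRPC path and may propagate the flag differently.

```java
// Hypothetical in-memory sketch; names and signatures are assumptions.
class SketchReceivingMailbox {
  private volatile boolean _isEarlyTerminated;

  // Called by the receive operator when it needs no more data.
  void earlyTerminate() {
    _isEarlyTerminated = true;
  }

  // Accepts a block and reports back whether the receiver wants the sender to stop.
  boolean offer(String block) {
    return _isEarlyTerminated;
  }
}

class SketchSendingMailbox {
  private final SketchReceivingMailbox _receiver;
  private volatile boolean _isEarlyTerminated;

  SketchSendingMailbox(SketchReceivingMailbox receiver) {
    _receiver = receiver;
  }

  void send(String block) {
    // Remember the receiver's answer so the send operator can query it.
    if (_receiver.offer(block)) {
      _isEarlyTerminated = true;
    }
  }

  boolean isEarlyTerminated() {
    return _isEarlyTerminated;
  }
}

class SketchSendOperator {
  private final SketchSendingMailbox _mailbox;
  private boolean _opChainEarlyTerminated;

  SketchSendOperator(SketchSendingMailbox mailbox) {
    _mailbox = mailbox;
  }

  void sendBlock(String block) {
    _mailbox.send(block);
    if (_mailbox.isEarlyTerminated()) {
      // Stands in for calling the opChain's setEarlyTermination().
      _opChainEarlyTerminated = true;
    }
  }

  boolean isOpChainEarlyTerminated() {
    return _opChainEarlyTerminated;
  }
}
```

The signal travels against the data flow: the receiver sets the flag, and the sender only learns about it on its next send.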

TODO

  • add checks to ensure that an EOS block is requested and returned only once. We are not even logging a warning for now, but the contract is that no call to operator.nextBlock() should occur once the operator in question has already returned EOS.
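
One possible shape for such a check, as a sketch only: a guard that remembers whether EOS has been returned and rejects any later call. EosOnceGuard is a hypothetical name, blocks are modeled as strings, and throwing is just one option; logging a warning would be a softer alternative.

```java
// Hypothetical guard; not part of this PR.
final class EosOnceGuard {
  static final String EOS = "EOS";

  private final java.util.function.Supplier<String> _delegate;
  private boolean _eosReturned;

  EosOnceGuard(java.util.function.Supplier<String> delegate) {
    _delegate = delegate;
  }

  String nextBlock() {
    if (_eosReturned) {
      // Contract violation: the caller asked for a block after EOS was returned.
      throw new IllegalStateException("nextBlock() called after EOS was already returned");
    }
    String block = _delegate.get();
    if (EOS.equals(block)) {
      _eosReturned = true;
    }
    return block;
  }
}
```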

@codecov-commenter
codecov-commenter commented Oct 5, 2023

Codecov Report

❌ Patch coverage is 87.32394% with 9 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.13%. Comparing base (87996d2) to head (59a1228).
⚠️ Report is 3325 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| .../query/mailbox/channel/MailboxContentObserver.java | 40.00% | 3 Missing ⚠️ |
| ...g/apache/pinot/query/mailbox/ReceivingMailbox.java | 71.42% | 1 Missing and 1 partial ⚠️ |
| ...apache/pinot/query/mailbox/GrpcSendingMailbox.java | 50.00% | 0 Missing and 1 partial ⚠️ |
| ...he/pinot/query/mailbox/InMemorySendingMailbox.java | 80.00% | 0 Missing and 1 partial ⚠️ |
| ...e/operator/LeafStageTransferableBlockOperator.java | 83.33% | 0 Missing and 1 partial ⚠️ |
| ...t/query/runtime/operator/LiteralValueOperator.java | 0.00% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@              Coverage Diff              @@
##             master   #11746       +/-   ##
=============================================
+ Coverage     14.41%   63.13%   +48.72%     
- Complexity      201     1140      +939     
=============================================
  Files          2342     2343        +1     
  Lines        125896   126100      +204     
  Branches      19362    19403       +41     
=============================================
+ Hits          18146    79619    +61473     
+ Misses       106213    40822    -65391     
- Partials       1537     5659     +4122     
| Flag | Coverage Δ | |
|---|---|---|
| integration | <0.01% <0.00%> (+<0.01%) | ⬆️ |
| integration1 | <0.01% <0.00%> (?) | |
| integration2 | 0.00% <0.00%> (ø) | |
| java-11 | 63.10% <87.32%> (?) | |
| java-17 | 62.98% <87.32%> (+48.57%) | ⬆️ |
| java-20 | 62.98% <87.32%> (+48.57%) | ⬆️ |
| temurin | 63.13% <87.32%> (+48.72%) | ⬆️ |
| unittests | 63.13% <87.32%> (+48.72%) | ⬆️ |
| unittests1 | 67.30% <87.32%> (?) | |
| unittests2 | 14.43% <0.00%> (+0.02%) | ⬆️ |

@walterddr walterddr force-pushed the pr_stats_with_feedback branch 2 times, most recently from 77cd516 to fc4185d Compare October 5, 2023 19:26
@walterddr walterddr force-pushed the pr_stats_with_feedback branch from 053450a to 76fa636 Compare October 6, 2023 17:28
@walterddr walterddr changed the title [multistage] initial draft with a metadata block API [multistage] framework to back-propagate metadata across opChains Oct 6, 2023
@walterddr walterddr marked this pull request as ready for review October 6, 2023 17:45
@Jackie-Jiang Jackie-Jiang added the enhancement and multi-stage (Related to the multi-stage query engine) labels Oct 9, 2023
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

The logic looks good

private static final ExpressionContext PLACEHOLDER_IDENTIFIER = ExpressionContext.forIdentifier("__PLACEHOLDER__");

- private final MultiStageOperator _inputOperator;
+ private final MultiStageOperator _upstreamOperator;
Contributor

This is confusing. In the mailbox, upstream is the next operator (e.g. the receiver is upstream of the sender), but here upstream is the input operator. Suggest reverting this change because input is clearer.

Contributor Author

Yes, some are named upstream and some are named input. Will revert this change and open another PR to fix the naming.

TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
- sendTransferableBlock(eosBlockWithStats);
+ isEarlyTerminated = sendTransferableBlock(eosBlockWithStats);
Contributor

We don't need to set early terminate when it is already EOS

Comment on lines 168 to 169
// acquire extra metadata block
block = _upstreamOperator.nextBlock();
Contributor

This is not very robust. Simply remove the break and let it follow the regular execution flow

private static final Set<String> RANKING_FUNCTION_NAMES = ImmutableSet.of("RANK", "DENSE_RANK");

- private final MultiStageOperator _inputOperator;
+ private final MultiStageOperator _upstreamOperator;
Contributor

Same here, suggest not changing it

/**
* API to send a block to the destination mailboxes.
* @param block the block to be transferred
* @return true if any of the upstream mailboxes requested EOS (e.g. early termination)
Contributor

This comment is incorrect. It returns true only if all receiving mailboxes are early terminated.

Side comment: Do you realize this upstream is the opposite of the other upstream changes in this PR? Here it refers to the next operator in the data flow, but elsewhere it refers to the previous operator in the data flow. I suggest not using upstream because it can be confusing.

Contributor Author

yeah it is confusing, will create a separate PR for naming fix

boolean isEarlyTerminated = true;
for (SendingMailbox sendingMailbox : _sendingMailboxes) {
- if (!sendingMailbox.isTerminated()) {
+ if (!sendingMailbox.isEarlyTerminated()) {
Contributor

We can do the block.isEndOfStreamBlock() check first, then check if all mailboxes are early terminated
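
A sketch of the ordering suggested here, combined with the earlier points that the result is true only when all mailboxes are early terminated and that nothing needs to be set once the block is already EOS. The Mailbox interface, FixedMailbox, and the isEndOfStream parameter are hypothetical; the real method takes a TransferableBlock and its routing depends on the exchange type.

```java
// Hypothetical sketch; not the actual send-operator code.
final class EarlyTerminationCheck {
  interface Mailbox {
    void send(String block);

    boolean isEarlyTerminated();
  }

  // Returns true only if the block is not EOS and every mailbox is early terminated.
  static boolean sendBlock(java.util.List<? extends Mailbox> mailboxes, String block, boolean isEndOfStream) {
    for (Mailbox mailbox : mailboxes) {
      mailbox.send(block);
    }
    if (isEndOfStream) {
      // The stream is finished anyway, so there is nothing to terminate early.
      return false;
    }
    for (Mailbox mailbox : mailboxes) {
      if (!mailbox.isEarlyTerminated()) {
        return false;
      }
    }
    return true;
  }
}

// Test double whose early-termination state is fixed at construction.
final class FixedMailbox implements EarlyTerminationCheck.Mailbox {
  private final boolean _isEarlyTerminated;

  FixedMailbox(boolean isEarlyTerminated) {
    _isEarlyTerminated = isEarlyTerminated;
  }

  @Override
  public void send(String block) {
    // No-op: this sketch only models the termination check.
  }

  @Override
  public boolean isEarlyTerminated() {
    return _isEarlyTerminated;
  }
}
```

Note that with an empty mailbox list this sketch returns true for a non-EOS block; whether that case can occur in practice is not stated in the thread.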

@walterddr walterddr force-pushed the pr_stats_with_feedback branch from bd61449 to 48ed081 Compare October 10, 2023 23:13
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

LGTM!

@Override
public void cancel(Throwable t) {
- if (_isTerminated) {
+ if (_isEarlyTerminated || _isTerminated) {
Contributor

We should not change this. Cancel should be applied even if early terminate is called

// TODO: Revisit if this is the correct way to apply back pressure
private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();
private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean(false);
Contributor

(minor) This can be defined as a volatile boolean

private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;

private final AtomicInteger _bufferSize = new AtomicInteger(DEFAULT_MAILBOX_QUEUE_CAPACITY);
private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean();
Contributor

(minor) This can be defined as a volatile boolean
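
A sketch of what the volatile alternative could look like. The class name EarlyTerminationFlag is hypothetical. The assumption behind the suggestion is that the flag is only ever set to true and read, with no compare-and-set, in which case a volatile field gives the cross-thread visibility needed without an AtomicBoolean wrapper.

```java
// Hypothetical holder class; in the PR the flag is a field on the mailbox itself.
final class EarlyTerminationFlag {
  // volatile guarantees that a write by one thread is visible to later reads by others.
  private volatile boolean _isEarlyTerminated;

  void earlyTerminate() {
    _isEarlyTerminated = true;
  }

  boolean isEarlyTerminated() {
    return _isEarlyTerminated;
  }
}
```

If the code ever needs an atomic check-then-set (for example, to run a callback exactly once), AtomicBoolean.compareAndSet would again be the better fit.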

isEarlyTerminated = false;
} else {
- sendTransferableBlock(block);
+ isEarlyTerminated = sendTransferableBlock(block);
Contributor

Can be simplified (remove isEarlyTerminated)

Suggested change
- isEarlyTerminated = sendTransferableBlock(block);
+ if (sendTransferableBlock(block)) {
+   earlyTerminate();
+ }

earlyTerminateMailboxes();
}

protected void earlyTerminateMailboxes() {
Contributor

Any specific reason for making this a separate method?
