-
Notifications
You must be signed in to change notification settings - Fork 1.4k
[multistage] Split MailboxReceiveOperator into sorted and non-sorted versions #10570
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[multistage] Split MailboxReceiveOperator into sorted and non-sorted versions #10570
Conversation
Codecov Report
@@ Coverage Diff @@
## master #10570 +/- ##
============================================
+ Coverage 70.29% 70.39% +0.10%
- Complexity 6462 6481 +19
============================================
Files 2103 2105 +2
Lines 112767 112820 +53
Branches 16979 16989 +10
============================================
+ Hits 79265 79424 +159
+ Misses 27947 27834 -113
- Partials 5555 5562 +7
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 30 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
ankitsultana
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
High level looks good. Thanks for the quick turn-around!
...time/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
Outdated
Show resolved
Hide resolved
...time/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
Outdated
Show resolved
Hide resolved
| if (((openMailboxCount == 0) || (openMailboxCount <= eosMailboxCount)) | ||
| && (!CollectionUtils.isEmpty(_priorityQueue)) && !_isSortedBlockConstructed) { | ||
| // Some data is present in the PriorityQueue, these need to be sent upstream | ||
| LinkedList<Object[]> rows = new LinkedList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use an ArrayList? I'd assume that would be faster than a LinkedList
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, had to make a change to modify the SortUtil::SortComparator to handle directions differently.
| node.getDataSchema(), node.getSenderStageId(), node.getStageId()); | ||
| context.addReceivingMailboxes(mailboxReceiveOperator.getSendingMailbox()); | ||
| return mailboxReceiveOperator; | ||
| if (!CollectionUtils.isEmpty(node.getCollationKeys()) && node.isSortOnReceiver()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a case where node.isSortOnReceiver() is true but collation keys are empty?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collation keys should not be empty to use node.isSortOnReceiver(), but the opposite is possible. We can have collation keys as not empty, but the isSortOnReceiver is false. In this case I'll be creating a normal MailboxSendOperator instead of the sorted version. This is useful for scenarios where the data is partitioned and sorted on the sender in the future, in which case the receiver sort is a no-op.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah makes sense.
Should we change this condition to simply: node.isSortOnReceiver() and add a precondition under the if block? Might be helpful to catch bugs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh wait, actually today for ORDER BY queries I always set 'isSortOnReceiver' to true. I've modified that to explicitly set based on whether collation keys exist or not. let me know if this seems okay or not. cc @walterddr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not following the discussion on this one --> if node.isSortOnReceiver() == true, do we still generate the SortNode on top of the receive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So now with my changes the following will hold:
node.isSortOnReceiver() == true: only set if actual sorting needs to be performed on the receiver- Window functions that need ORDER BY
- ORDER BY queries that have at least one collation key
node.isSortOnReceiver() == false: set when sorting not required on receiver side.- Window functions without ORDER BY (collation key is empty)
- Queries with no collation key but with fetch and/or limit (which results in using a
SortNodewith empty collation key) - Can be
falseif collation key is non-empty (we don't use this code path today) such as when in the future we have partitioned data and decide to sort on sender side only.
The SortNode will still be created where needed, just if it's child operator is sorted then we set that the input is already sorted, otherwise we set input is not sorted (just as before). This discussion is more around when to create the sorted mailbox receive vs. non-sorted one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So now with my changes the following will hold:
node.isSortOnReceiver() == true: only set if actual sorting needs to be performed on the receiver
- Window functions that need ORDER BY
- ORDER BY queries that have at least one collation key
node.isSortOnReceiver() == false: set when sorting not required on receiver side.
- Can be
falseif collation key is non-empty (we don't use this code path today) such as when in the future we have partitioned data and decide to sort on sender side only.
[RR] hmm. could you explain more as: under what circumstances will this be true? did you mean there's an "ORDER BY " clause in the SQL, or there is actually a collation key specified on the SortExchange RelNode after Calcite has finish optimizer phase?
The
SortNodewill still be created where needed, just if it's child operator is sorted then we set that the input is already sorted, otherwise we set input is not sorted (just as before). This discussion is more around when to create the sorted mailbox receive vs. non-sorted one
[RR] the above-described behavior is identical to prior to #10408 yes? just instead of doing the sorting in SortExchange, previously we will create a SortNode on top of the Exchange Node?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[RR] hmm. could you explain more as: under what circumstances will this be true? did you mean there's an "ORDER BY " clause in the SQL, or there is actually a collation key specified on the SortExchange RelNode after Calcite has finish optimizer phase?
I meant if there is a collation key specified in the SortExchange RelNode after the Calcite optimizer phase. I was just giving an example of ORDER BY queries
[RR] the above-described behavior is identical to prior to #10408 yes? just instead of doing the sorting in SortExchange, previously we will create a SortNode on top of the Exchange Node?
yes the end behavior is the exact same In terms of creation of the SortNode. Earlier the plan indicated sortOnReceiver as true for all LogicalSortExchange created for SortNodes without checking if collation key is set, but we handled skipping sorting in the receive operator. Now we explicitly set that sortOnReceiver is false (see planner test changes). Does that help clarify?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The node creation is the exact same as before, it’s just about setting the isSortOnReceiver flag
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed this offline with @walterddr and we are in agreement about this change and discussion
...time/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
Outdated
Show resolved
Hide resolved
| "\nLogicalProject(EXPR$0=[dateTrunc('DAY', $4)])", | ||
| "\n LogicalSort(offset=[0], fetch=[10])", | ||
| "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])", | ||
| "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems correct in my book. we shouldn't run sort unless there's collation key
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes that's essentially why i decided to modify the sort exchange used for LogicalSort nodes to check collation keys to decide whether to sort or not. The mailbox receive operator was already handling this earlier (i.e. checked for both isSortedOnReceiver flag and collation key to perform sorting), whereas this changes makes this explicit on the planner side itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
|
LGTM. CC @ankitsultana please kindly share your feedback and we can follow up next |
I see @ankitsultana opened this PR: #10564 to address one of the issues with the callback mechanisms. Will let @ankitsultana talk about any others that he alluded to in the issue: #10555 |
This PR addresses the issue found in: #10555
Issue summary:
The logic added as part of PR #10408 added support to order as part of the
MailboxReceiveOperatorwhich essentially stored all the rows in aPriorityQueueinstead of returning them right away. Due to this we got into a situation where the blocks were added to thePriorityQueuebut we never re-polled immediately to check forEOSand instead returned a no-op block. No-op puts the opChain back into the scheduler wait queue. Since the SendMailbox had already sent out theEOSearlier there was no new blocks sent out. Due to this, since no new blocks were sent the_seenMailwas never set and the operator had to be reawakened and rescheduled after the default timeout.Fix:
This PR fixes the above by splitting the MailboxReceiveOperator into a
MailboxReceiveOperatorand aSortedMailboxReceiveOperator. The sorted version keeps polling for data from all the mailboxes until it gets only 'null' blocks or gets EOS from all mailboxes. TheMailboxReceiveOperatordoes not perform any sorting and will behave the same way it did prior to the PR #10408.Testing done:
cc @walterddr @ankitsultana @siddharthteotia @vvivekiyer