Skip to content

Conversation

@somandal
Copy link
Contributor

@somandal somandal commented Apr 6, 2023

This PR addresses the issue found in: #10555

Issue summary:
The logic added as part of PR #10408 added support to order as part of the MailboxReceiveOperator which essentially stored all the rows in a PriorityQueue instead of returning them right away. Due to this we got into a situation where the blocks were added to the PriorityQueue but we never re-polled immediately to check for EOS and instead returned a no-op block. No-op puts the opChain back into the scheduler wait queue. Since the SendMailbox had already sent out the EOS earlier there was no new blocks sent out. Due to this, since no new blocks were sent the _seenMail was never set and the operator had to be reawakened and rescheduled after the default timeout.

Fix:
This PR fixes the above by splitting the MailboxReceiveOperator into a MailboxReceiveOperator and a SortedMailboxReceiveOperator. The sorted version keeps polling for data from all the mailboxes until it gets only 'null' blocks or gets EOS from all mailboxes. The MailboxReceiveOperator does not perform any sorting and will behave the same way it did prior to the PR #10408.

Testing done:

  • Unit tests in multi-stage planning and runtime modules
  • MultiStageEngineIntegrationTest - to ensure that this fixes the increase in runtime from 30s to 500s (and fixes the regression in ORDER BY queries)
  • MultiStageEngineCustomTenantIntegrationTest

cc @walterddr @ankitsultana @siddharthteotia @vvivekiyer

@codecov-commenter
Copy link

codecov-commenter commented Apr 6, 2023

Codecov Report

Merging #10570 (97265bf) into master (e3d7433) will increase coverage by 0.10%.
The diff coverage is 92.64%.

@@             Coverage Diff              @@
##             master   #10570      +/-   ##
============================================
+ Coverage     70.29%   70.39%   +0.10%     
- Complexity     6462     6481      +19     
============================================
  Files          2103     2105       +2     
  Lines        112767   112820      +53     
  Branches      16979    16989      +10     
============================================
+ Hits          79265    79424     +159     
+ Misses        27947    27834     -113     
- Partials       5555     5562       +7     
Flag Coverage Δ
integration1 24.55% <0.00%> (+0.05%) ⬆️
integration2 24.04% <0.00%> (-0.07%) ⬇️
unittests1 67.86% <92.64%> (+0.02%) ⬆️
unittests2 13.89% <0.00%> (+0.02%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...che/pinot/query/runtime/operator/SortOperator.java 93.84% <ø> (ø)
.../pinot/query/service/dispatch/QueryDispatcher.java 94.00% <ø> (+0.04%) ⬆️
...runtime/operator/SortedMailboxReceiveOperator.java 87.67% <87.67%> (ø)
...y/runtime/operator/BaseMailboxReceiveOperator.java 97.72% <97.72%> (ø)
...query/runtime/operator/MailboxReceiveOperator.java 92.50% <100.00%> (-1.14%) ⬇️
.../pinot/query/runtime/operator/utils/SortUtils.java 100.00% <100.00%> (ø)
.../pinot/query/runtime/plan/PhysicalPlanVisitor.java 97.77% <100.00%> (+0.27%) ⬆️

... and 30 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@somandal somandal changed the title Split MailboxReceiveOperator into sorted and non-sorted versions [multistage] Split MailboxReceiveOperator into sorted and non-sorted versions Apr 6, 2023
Copy link
Contributor

@ankitsultana ankitsultana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High level looks good. Thanks for the quick turn-around!

if (((openMailboxCount == 0) || (openMailboxCount <= eosMailboxCount))
&& (!CollectionUtils.isEmpty(_priorityQueue)) && !_isSortedBlockConstructed) {
// Some data is present in the PriorityQueue, these need to be sent upstream
LinkedList<Object[]> rows = new LinkedList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use an ArrayList? I'd assume that would be faster than a LinkedList

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, had to make a change to modify the SortUtil::SortComparator to handle directions differently.

node.getDataSchema(), node.getSenderStageId(), node.getStageId());
context.addReceivingMailboxes(mailboxReceiveOperator.getSendingMailbox());
return mailboxReceiveOperator;
if (!CollectionUtils.isEmpty(node.getCollationKeys()) && node.isSortOnReceiver()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a case where node.isSortOnReceiver() is true but collation keys are empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collation keys should not be empty to use node.isSortOnReceiver(), but the opposite is possible. We can have collation keys as not empty, but the isSortOnReceiver is false. In this case I'll be creating a normal MailboxSendOperator instead of the sorted version. This is useful for scenarios where the data is partitioned and sorted on the sender in the future, in which case the receiver sort is a no-op.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah makes sense.

Should we change this condition to simply: node.isSortOnReceiver() and add a precondition under the if block? Might be helpful to catch bugs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh wait, actually today for ORDER BY queries I always set 'isSortOnReceiver' to true. I've modified that to explicitly set based on whether collation keys exist or not. let me know if this seems okay or not. cc @walterddr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not following the discussion on this one --> if node.isSortOnReceiver() == true, do we still generate the SortNode on top of the receive?

Copy link
Contributor Author

@somandal somandal Apr 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So now with my changes the following will hold:

  • node.isSortOnReceiver() == true: only set if actual sorting needs to be performed on the receiver
    • Window functions that need ORDER BY
    • ORDER BY queries that have at least one collation key
  • node.isSortOnReceiver() == false: set when sorting not required on receiver side.
    • Window functions without ORDER BY (collation key is empty)
    • Queries with no collation key but with fetch and/or limit (which results in using a SortNode with empty collation key)
    • Can be false if collation key is non-empty (we don't use this code path today) such as when in the future we have partitioned data and decide to sort on sender side only.

The SortNode will still be created where needed, just if it's child operator is sorted then we set that the input is already sorted, otherwise we set input is not sorted (just as before). This discussion is more around when to create the sorted mailbox receive vs. non-sorted one

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So now with my changes the following will hold:

  • node.isSortOnReceiver() == true: only set if actual sorting needs to be performed on the receiver

    • Window functions that need ORDER BY
    • ORDER BY queries that have at least one collation key
  • node.isSortOnReceiver() == false: set when sorting not required on receiver side.

    • Can be false if collation key is non-empty (we don't use this code path today) such as when in the future we have partitioned data and decide to sort on sender side only.

[RR] hmm. could you explain more as: under what circumstances will this be true? did you mean there's an "ORDER BY " clause in the SQL, or there is actually a collation key specified on the SortExchange RelNode after Calcite has finish optimizer phase?

The SortNode will still be created where needed, just if it's child operator is sorted then we set that the input is already sorted, otherwise we set input is not sorted (just as before). This discussion is more around when to create the sorted mailbox receive vs. non-sorted one

[RR] the above-described behavior is identical to prior to #10408 yes? just instead of doing the sorting in SortExchange, previously we will create a SortNode on top of the Exchange Node?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[RR] hmm. could you explain more as: under what circumstances will this be true? did you mean there's an "ORDER BY " clause in the SQL, or there is actually a collation key specified on the SortExchange RelNode after Calcite has finish optimizer phase?

I meant if there is a collation key specified in the SortExchange RelNode after the Calcite optimizer phase. I was just giving an example of ORDER BY queries

[RR] the above-described behavior is identical to prior to #10408 yes? just instead of doing the sorting in SortExchange, previously we will create a SortNode on top of the Exchange Node?

yes the end behavior is the exact same In terms of creation of the SortNode. Earlier the plan indicated sortOnReceiver as true for all LogicalSortExchange created for SortNodes without checking if collation key is set, but we handled skipping sorting in the receive operator. Now we explicitly set that sortOnReceiver is false (see planner test changes). Does that help clarify?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The node creation is the exact same as before, it’s just about setting the isSortOnReceiver flag

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this offline with @walterddr and we are in agreement about this change and discussion

@somandal somandal requested a review from ankitsultana April 6, 2023 23:02
"\nLogicalProject(EXPR$0=[dateTrunc('DAY', $4)])",
"\n LogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems correct in my book. we shouldn't run sort unless there's collation key

Copy link
Contributor Author

@somandal somandal Apr 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes that's essentially why i decided to modify the sort exchange used for LogicalSort nodes to check collation keys to decide whether to sort or not. The mailbox receive operator was already handling this earlier (i.e. checked for both isSortedOnReceiver flag and collation key to perform sorting), whereas this changes makes this explicit on the planner side itself.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@walterddr walterddr merged commit 6283ee7 into apache:master Apr 7, 2023
@walterddr
Copy link
Contributor

LGTM.
However IIUC this only hides the issue of slow mailbox receiver (e.g. there still can be possibility that the seenMail signal got lost). did we capture this in any of the action items in #10424 ?

CC @ankitsultana please kindly share your feedback and we can follow up next

@somandal
Copy link
Contributor Author

somandal commented Apr 7, 2023

LGTM. However IIUC this only hides the issue of slow mailbox receiver (e.g. there still can be possibility that the seenMail signal got lost). did we capture this in any of the action items in #10424 ?

CC @ankitsultana please kindly share your feedback and we can follow up next

I see @ankitsultana opened this PR: #10564 to address one of the issues with the callback mechanisms. Will let @ankitsultana talk about any others that he alluded to in the issue: #10555

@somandal somandal deleted the fix-ordering-regression-mailboxreceiver branch April 7, 2023 18:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants