-
Notifications
You must be signed in to change notification settings - Fork 1.4k
[multistage] Implement ordering for SortExchange #10408
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report
@@ Coverage Diff @@
## master #10408 +/- ##
=============================================
- Coverage 35.19% 24.33% -10.86%
+ Complexity 282 49 -233
=============================================
Files 2053 2029 -24
Lines 111397 110704 -693
Branches 16939 16863 -76
=============================================
- Hits 39203 26939 -12264
- Misses 68781 80946 +12165
+ Partials 3413 2819 -594
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 507 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a related PR by @KKcorps where we are switching to a operator context instead of individual args which may help simplify this. https://github.com/apache/pinot/pull/10413/files#diff-7ccb4a6b83288d4c1e1547c71a602c36dfbc90a8a283e9835c6d867cc2eeb2f3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 but i dont think this PR would be related since the refactoring only deal with visitor invariant, shared info across the stage such as partition, stageID, etc.
but i would anticipate a rebase issue. thank you for calling it out @ankitsultana
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 this will only help with future shared info. The fields I've added as part of this PR aren't shared.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a TODO or Preconditions check to indicate that the sorting is not supported?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
walterddr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the design approach look good to me overall. will take a look at the implementation details next. thank you for the contribution @somandal
93e6be4 to
c81e937
Compare
walterddr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good to me mostly.
i have some concerns regarding how the mailbox (1) handles received block; (2) what should be returned when normal block or error block is being received. please kindly take a look
pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java
Outdated
Show resolved
Hide resolved
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
Outdated
Show resolved
Hide resolved
pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java
Outdated
Show resolved
Hide resolved
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
Outdated
Show resolved
Hide resolved
...-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
Outdated
Show resolved
Hide resolved
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's an issue here with this approach. I am not sure if we should return a no-op block here or continue to grab more. is it possible to box this into a util. say.
| if (!block.isEndOfStreamBlock()) { | |
| return block; | |
| if (_priorityQueue != null) { | |
| List<Object[]> container = block.getContainer(); | |
| _priorityQueue.addAll(container); | |
| } else { | |
| return block; | |
| } | |
| if (!block.isEndOfStreamBlock()) { | |
| return processReceivedBlock(block); // box the logic in processReceivedBlock(block) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the code here does return a no-op block (after exiting out of the loop, i.e processing all the mailboxes in the _sendingMailbox list). Are you recommending instead to return a no-op block for each mailbox? (at least it seems so from the recommended change)
I've created the util as recommended and it explicitly returns a no-op block after adding all the rows to the PriorityQueue if the data needs to be ordered for that given mailbox.
Any reason why we should not keep processing all mailboxes and instead return a no-op for each mailbox in the _sendingMailbox list as recommended?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
based on this logic. as long as there's a _priorityQueue and size is not zero you should return the priority queue results.
when any mailbox receives an error. it should also dropped any _priorityQueue content and stopped the consumption.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, I've modified the code to drop the contents of the _priorityQueue on receiving an error block. Keeping this block as is. Let me know if you intended for a different change here.
...query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
Outdated
Show resolved
Hide resolved
c81e937 to
598d199
Compare
Hey @walterddr can you check if I addressed your comments in this area correctly? We can discuss this some more if required. Basically made changes to:
|
Yeah when I thought about it more last night I think we shouldn't return a no-op block to every mailbox received.
for now the implementation looks good. i will create an issue to systematically address both points above |
walterddr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
…e as it will be sender and receiver aware Add support for ordering based on collation keys in the MailboxReceiveOperator when PinotLogicalSortExchange is used. MailboxSendOperator will be modified later to add sort support. Modify SortOperator to avoid sorting if the input is already sorted. It should still apply offset + limit
15272ec to
bfa4811
Compare
…versions (#10570) * Split MailboxReceiveOperator into sorted and non-sorted versions - sorted mailbox receiver will keep polling for data from all the mailboxes until it gets only 'null' blocks or gets EOS from all mailboxes. - non-sorted mailbox receiver will return immediately when data arrives (behavior identical to prior to #10408 * Fix tests - ORDER BY with only limit should set isSortOnReceiver to false - when there's fetch/offset in SortNode but no Collation, skip sorting thus speed up performance
This PR adds support for ordering on the collation keys when a
SortExchangeis present in the relational tree. To do this the following changes have been made:PinotLogicalSortExchangeto replace usage ofLogicalSortExchange. This is needed because laterLogicalSortExchangeis broken into aMailboxSendNodeandMailboxReceiveNode. Information on where to perform ordering (sender, receiver or both) needs to be passed down to these nodes and this is not possible via theLogicalSortExchangeclass alone.Exchangehintable, but based on the hint semantics it should only be applied at the table orSELECTlevel which doesn't apply toExchangenodes. The alternative was to extend theSortExchangewith our own implementation.MailboxReceiveOperatorwhenPinotLogicalSortExchangeis used.MailboxSendOperatorwill be modified later to add sort support. Once the sender can sort, the receiver needs to be modified to do a k-way merge instead ofPriorityQueuebased sort if the input is sorted.SortOperatorto avoid sorting if the input is already sorted. It should still apply offset + limitWe will also integrate with
SqlHintsupport to decide where to sort (sender, receiver, or both) once the path forward has been figured out on how to get hint support for some of the Apache Calcite nodes such as Window, Sort, Filter, etc.SqlHintsupport has not been added to this PR.This PR is a pre-requisite to adding support for Window Functions with
ORDER BY.LogicalSortExchange)OVER()andOVER(PARTITION BY): [multistage] Initial (phase 1) Query runtime for window functions - empty OVER() and OVER(PARTITION BY) #10286cc @walterddr @siddharthteotia @vvivekiyer @ankitsultana