Support local replicated join and local exchange parallelism #14893
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #14893 +/- ##
============================================
+ Coverage 61.75% 63.70% +1.95%
- Complexity 207 1470 +1263
============================================
Files 2436 2713 +277
Lines 133233 151804 +18571
Branches 20636 23440 +2804
============================================
+ Hits 82274 96712 +14438
- Misses 44911 47826 +2915
- Partials 6048 7266 +1218
List<String> getSegments(BrokerRequest brokerRequest) {
  Set<String> selectedSegments = _segmentSelector.select(brokerRequest);
  if (!selectedSegments.isEmpty()) {
nit: isn't this `if` check a bit redundant?
We want to short-circuit it. This is the same as in calculateRouting().
RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId);

/**
 * Returns the segments that are relevant for the given broker request.
Let's specify here what null means.
Done
gortiz
left a comment
I would need more time to review the code and, ideally, some explanation of the decisions you made here. The changes look to me more complex than I would have expected. We are deviating from the standard Calcite semantics here (i.e., with singleton + parallelism), and I'm not sure why we need to do that.
What I would expect in this situation is that the join node uses the broadcast distribution for its right side (meaning that each incarnation of the join will see all the data). The main difference with the regular broadcast is that instead of picking one server per segment and broadcasting from them, we pick all servers that will execute the left side and read from them, sending the information to its own node.
  private PinotLogicalExchange(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RelDistribution distribution,
-     PinotRelExchangeType exchangeType) {
+     PinotRelExchangeType exchangeType, List<Integer> keys) {
In which cases will the keys be different from distribution.getKeys()? What is in fact the meaning of having keys = {X, Y, Z} and a distribution like random that doesn't support keys? Wouldn't it be better to change the distribution value depending on the keys? If we want to use a distribution + keys combination that is not permitted by Calcite, we can create our own implementation of RelDistribution.
Let me add more comments explaining this. We use SINGLETON to represent local exchange, but we also want to support parallelism for local exchange, where keys are needed. We can revisit this as we add more custom distribution types.
// NOTE: We use SINGLETON to represent local distribution. Add keys to the exchange because we might want to
// switch it to HASH distribution to increase parallelism. See MailboxAssignmentVisitor for details.
Reading RelDistribution.Type, shouldn't this be broadcast? The definition of broadcast is:
There are multiple instances of the stream, and all records appear in each instance
While the definition of singleton is:
There is only one instance of the stream. It sees all records.
BTW, I don't get why we set DistributionType as SINGLETON in cases where we want to use HASH.
This is not broadcast because we don't want to send data to other servers. This is not strictly SINGLETON if we want to add parallelism to local exchange (split one block into multiple and spread them into multiple operators). If there is no extra parallelism (1-to-1 distribution), then it is SINGLETON.
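To make the distinction concrete, here is a minimal, hypothetical Java sketch of the two local-exchange behaviors described above. It is not the PR's actual code: `LocalExchangeSketch`, `singleton`, and `hashSplit` are illustrative names, rows are modeled as plain `Object[]`, and the hash function is an assumption chosen for simplicity. Nothing here sends data to another server; both methods only decide which local receiver gets each row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class LocalExchangeSketch {
  /** 1-to-1 local exchange: a single local receiver sees the whole block (SINGLETON-like). */
  public static List<List<Object[]>> singleton(List<Object[]> block) {
    List<List<Object[]>> receivers = new ArrayList<>();
    receivers.add(new ArrayList<>(block));
    return receivers;
  }

  /**
   * Parallel local exchange: split one block across `parallelism` local receivers
   * by hashing the key columns (HASH-like, but still local to this server).
   */
  public static List<List<Object[]>> hashSplit(List<Object[]> block, List<Integer> keys, int parallelism) {
    List<List<Object[]>> receivers = new ArrayList<>();
    for (int i = 0; i < parallelism; i++) {
      receivers.add(new ArrayList<>());
    }
    for (Object[] row : block) {
      int hash = 0;
      for (int key : keys) {
        // Assumed hash combination; the real implementation may differ.
        hash = 31 * hash + Objects.hashCode(row[key]);
      }
      // floorMod keeps the index non-negative even for negative hashes.
      receivers.get(Math.floorMod(hash, parallelism)).add(row);
    }
    return receivers;
  }

  public static void main(String[] args) {
    List<Object[]> block = new ArrayList<>();
    block.add(new Object[]{1, "a"});
    block.add(new Object[]{2, "b"});
    block.add(new Object[]{1, "c"});
    System.out.println("singleton receivers: " + singleton(block).size());
    System.out.println("hash-split receivers: " + hashSplit(block, List.of(0), 2).size());
  }
}
```

In this sketch the keys only matter once the parallelism is greater than one, which mirrors why the exchange carries keys even though its distribution type is SINGLETON.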
  if (node instanceof MailboxSendNode) {
    MailboxSendNode sendNode = (MailboxSendNode) node;
-   int senderStageId = sendNode.getStageId();
+   Integer senderStageId = sendNode.getStageId();
Why Integer? BaseNode.getStageId() always returns an int, right?
Correct, but using Integer can avoid a lot of boxing. I changed this to Integer to align with receiverStageId
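As a rough illustration of the boxing point, the hypothetical sketch below boxes a stage id once and reuses the resulting `Integer` for repeated map lookups. The class, method, and map names are assumptions made for the example, not code from the PR. Note that `Integer.valueOf` caches small values (by default -128 to 127), so for small stage ids the saving is mostly in avoided conversion calls rather than allocations.

```java
import java.util.HashMap;
import java.util.Map;

public class StageIdBoxingSketch {
  /** Sums the worker count for one stage over several lookups, boxing the key only once. */
  public static int sumWorkers(Map<Integer, Integer> workersPerStage, int stageId, int lookups) {
    Integer boxedStageId = stageId; // boxed once, up front
    int total = 0;
    for (int i = 0; i < lookups; i++) {
      // Passing the already-boxed key avoids re-boxing the int on every lookup.
      total += workersPerStage.getOrDefault(boxedStageId, 0);
    }
    return total;
  }

  public static void main(String[] args) {
    Map<Integer, Integer> workersPerStage = new HashMap<>();
    workersPerStage.put(7, 3);
    System.out.println(sumWorkers(workersPerStage, 7, 4));
  }
}
```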
Broadcast is supported in #14797, but there could still be data shuffling.
I'll merge it for now since it contains some important performance optimizations. We can revisit it later since there is no backward compatibility issue.
Related to #14518
Added a new table hint: is_replicated(boolean)
Support local replicated join by configuring both sides as local distribution and hinting the right table as replicated.
Also support parallelism for local exchange, to increase the parallelism of the intermediate stage, with the table hint partition_parallelism.