Allow colocated join with tables of different partitions #15764
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #15764 +/- ##
============================================
+ Coverage 62.90% 63.26% +0.36%
- Complexity 1386 1394 +8
============================================
Files 2867 2889 +22
Lines 163354 165310 +1956
Branches 24952 25274 +322
============================================
+ Hits 102755 104587 +1832
+ Misses 52847 52819 -28
- Partials 7752 7904 +152
private static final String PARTITION_PARALLELISM = "SET inferPartitionHint = true; "
    + "SELECT COUNT(*) "
    + "FROM userAttributes /*+ tableOptions(partition_parallelism='2') */ ua "
I am adding support for the same feature this week and the PR should be out in 2-3 days.
Can you folks directly use the new optimizer instead?
Does this change conflict with yours? If not, I'd suggest keeping them separate for now so that we can evaluate the new optimizer without blocking on this new feature.
It is good that both approaches arrive at the same feature.
private static final String MULTIPLE_PARTITIONS_PER_WORKER_AND_PARALLELISM = "SET inferPartitionHint = true; "
    + "SELECT COUNT(*) "
    + "FROM userAttributes /*+ tableOptions(partition_size='2', partition_parallelism='2') */ ua "
    + "JOIN userGroups /*+ tableOptions(partition_size='2', partition_parallelism='2') */ ug "
Related: how are partition_parallelism and stageParallelism different? Could we consolidate them into a single config?
It was a big challenge to wrap my head around what these two are semantically supposed to do. These are quite hard to explain to users too.
partition_parallelism is specific to leaf-stage, partition-based worker assignment, where each partition is split across multiple downstream workers. stageParallelism defines how many workers to schedule per server for the intermediate stage. Another difference is that stageParallelism is a query option rather than a hint.
if (offlineSegments == null) {
  offlineSegments = new ArrayList<>(partitionInfo._offlineSegments);
} else {
  offlineSegments.addAll(partitionInfo._offlineSegments);
}
nit: This seems like over-engineering. We could allocate the list at the beginning. Remember that a new ArrayList() doesn't actually allocate any Object[] until the first element is added.
Here null and an empty list are slightly different. Since I'm sharing a helper method getSegmentsMap(), I need the list to be null when there is no segment.
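To make the null-versus-empty distinction concrete, here is a minimal sketch of the lazy-allocation pattern from the snippet above. The class and method names are hypothetical and are not taken from the PR; the only assumption is that a null result signals "no segment list at all" to the caller, while a non-null list (even an empty one) signals that at least one partition contributed.

```java
import java.util.ArrayList;
import java.util.List;

public class NullableSegmentListSketch {
  // Hypothetical helper (not the PR's code): merges per-partition segment lists.
  // A null entry means that partition has no segment list. The result stays null
  // unless at least one partition contributes a non-null list.
  static List<String> mergeSegments(List<List<String>> perPartitionSegments) {
    List<String> merged = null;
    for (List<String> segments : perPartitionSegments) {
      if (segments == null) {
        continue;
      }
      if (merged == null) {
        // Allocate lazily so "nothing merged" remains distinguishable from "merged, but empty".
        merged = new ArrayList<>(segments);
      } else {
        merged.addAll(segments);
      }
    }
    return merged;
  }
}
```

Allocating the list up front would be simpler, but a caller that checks for null could then no longer tell the two cases apart.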
// Round-robin partitions to workers, where each worker gets numPartitionsPerWorker partitions. This setup works
// only if all segments for these partitions are assigned to the same group of servers. This is useful when user
// wants to colocate tables with different partition count, but same partition function.
// E.g. when there are 16 partitions for table A and 4 partitions for table B, we may assign 16 partitions for
// table A to 4 workers, where partition 0, 4, 8, 12 goes to worker 0, partition 1, 5, 9, 13 goes to worker 1,
// etc.
for (int i = 0; i < numWorkers; i++) {
The code of this method is getting very complex. Could we split it into some methods and/or reuse code between branches of numPartitionsPerWorker?
Extracted helper methods, but it is hard to reuse code across them.
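For readers following this thread, the round-robin mapping described in the code comment (16 partitions of table A spread over 4 workers) can be sketched in isolation as follows. This is an illustrative sketch only, not the PR's implementation: the class and method names are hypothetical, and the check that the partition count is an exact multiple of the worker count is an assumption made here so that every worker receives the same number of partitions.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinPartitionSketch {
  // Hypothetical helper (not the PR's code): partition p is assigned to worker (p % numWorkers).
  // Assumption: numPartitions is an exact multiple of numWorkers.
  static List<List<Integer>> assignPartitions(int numPartitions, int numWorkers) {
    if (numWorkers <= 0 || numPartitions % numWorkers != 0) {
      throw new IllegalArgumentException("numPartitions must be a positive multiple of numWorkers");
    }
    List<List<Integer>> workers = new ArrayList<>(numWorkers);
    for (int i = 0; i < numWorkers; i++) {
      List<Integer> partitions = new ArrayList<>();
      // Worker i takes partitions i, i + numWorkers, i + 2 * numWorkers, ...
      for (int p = i; p < numPartitions; p += numWorkers) {
        partitions.add(p);
      }
      workers.add(partitions);
    }
    return workers;
  }

  public static void main(String[] args) {
    List<List<Integer>> workers = assignPartitions(16, 4);
    for (int i = 0; i < workers.size(); i++) {
      System.out.println("worker " + i + " -> " + workers.get(i));
    }
  }
}
```

With 16 partitions and 4 workers, worker 0 receives partitions 0, 4, 8, 12 and worker 1 receives 1, 5, 9, 13, matching the example in the code comment.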
public class ColocatedJoinEngineQuickStart extends MultistageEngineQuickStart {
  private static final String QUICKSTART_IDENTIFIER = "COLOCATED_JOIN";
  private static final String[] COLOCATED_JOIN_DIRECTORIES = new String[]{
      "examples/batch/colocated/userAttributes", "examples/batch/colocated/userGroups"
  };
Any reason to remove this quickstart? It's actually the one I use the most. Not because of the configs, which are equal to the defaults, but because:
- Tables are configured exactly as needed for colocation
- There are fewer tables than in the other quickstarts, so it starts faster
It is just renamed to ColocatedJoinQuickStart, with several newly added queries.
Related to #14518
Currently, a colocated join requires the tables on both sides to have the same number of partitions. This PR enhances the worker assignment to allow a colocated join where one side has more partitions than the other, if the following condition is met:
To enable this, explicitly set partition_size on the larger side to match the smaller side, e.g. /*+ tableOptions(partition_size='4') */
Added queries in ColocatedJoinQuickStart and verified the results.
TODO: Currently the queries test setup doesn't align with the colocation requirement. Tests will be added separately.