@Jackie-Jiang
Contributor

Related to #14518

Currently, a colocated join requires the tables on both sides to have the same number of partitions. This PR enhances worker assignment to allow a colocated join where one side has more partitions than the other, provided the following conditions are met:

  • The partition count of the larger side is a multiple of that of the smaller side, e.g. 16 vs. 4
  • Partitions are actually assigned so that matching partitions are colocated, e.g. partitions 0, 4, 8, 12 of the larger side are assigned to the same server that hosts partition 0 of the smaller side

To enable this, explicitly set partition_size on the larger side to match the smaller side, e.g. /*+ tableOptions(partition_size='4') */
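
The two colocation conditions listed above can be sketched as a small check. This is only an illustration, not the code in WorkerManager: the class and method names are hypothetical, and it assumes, for simplicity, that each partition is hosted by exactly one server identified by a string.

```java
import java.util.List;

class ColocationCheck {
  // Hypothetical helper (not Pinot's API). Index i of each list holds the server
  // hosting partition i. Returns true when the larger partition count is a multiple
  // of the smaller one and partition p of the larger table lives on the same server
  // as partition (p % smallCount) of the smaller table.
  static boolean isColocated(List<String> largeSideServers, List<String> smallSideServers) {
    int largeCount = largeSideServers.size();
    int smallCount = smallSideServers.size();
    if (smallCount == 0 || largeCount % smallCount != 0) {
      return false;
    }
    for (int p = 0; p < largeCount; p++) {
      if (!largeSideServers.get(p).equals(smallSideServers.get(p % smallCount))) {
        return false;
      }
    }
    return true;
  }
}
```

With 8 vs. 4 partitions, partitions 0 and 4 of the larger side must share a server with partition 0 of the smaller side, partitions 1 and 5 with partition 1, and so on.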

Added queries to ColocatedJoinQuickStart and verified the results.

TODO: The current query test setup doesn't meet the colocation requirement. Tests will be added separately.

@Jackie-Jiang Jackie-Jiang requested review from gortiz and xiangfu0 May 10, 2025 01:45
@Jackie-Jiang Jackie-Jiang added enhancement documentation release-notes Referenced by PRs that need attention when compiling the next release notes multi-stage Related to the multi-stage query engine labels May 10, 2025
@Jackie-Jiang Jackie-Jiang force-pushed the mse_merge_partitions branch from 3de20c7 to f9b903e Compare May 12, 2025 18:13
@codecov-commenter

codecov-commenter commented May 12, 2025

Codecov Report

Attention: Patch coverage is 70.00000% with 21 lines in your changes missing coverage. Please review.

Project coverage is 63.26%. Comparing base (1a476de) to head (8d046fc).
Report is 73 commits behind head on master.

Files with missing lines Patch % Lines
.../org/apache/pinot/query/routing/WorkerManager.java 70.00% 10 Missing and 11 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #15764      +/-   ##
============================================
+ Coverage     62.90%   63.26%   +0.36%     
- Complexity     1386     1394       +8     
============================================
  Files          2867     2889      +22     
  Lines        163354   165310    +1956     
  Branches      24952    25274     +322     
============================================
+ Hits         102755   104587    +1832     
+ Misses        52847    52819      -28     
- Partials       7752     7904     +152     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.24% <70.00%> (+0.37%) ⬆️
java-21 63.23% <70.00%> (+0.41%) ⬆️
skip-bytebuffers-false ?
skip-bytebuffers-true ?
temurin 63.26% <70.00%> (+0.36%) ⬆️
unittests 63.26% <70.00%> (+0.36%) ⬆️
unittests1 56.28% <70.00%> (+0.46%) ⬆️
unittests2 33.51% <0.00%> (-0.07%) ⬇️


private static final String PARTITION_PARALLELISM = "SET inferPartitionHint = true; "
    + "SELECT COUNT(*) "
    + "FROM userAttributes /*+ tableOptions(partition_parallelism='2') */ ua "
Contributor

I am adding support for the same feature this week and the PR should be out in 2-3 days.

Can you folks directly use the new optimizer instead?

Contributor Author

Does this change conflict with yours? If not, I'd suggest keeping them separate for now so that we can evaluate the new optimizer without blocking on this new feature.

It is good that both approaches converge on the same feature.

private static final String MULTIPLE_PARTITIONS_PER_WORKER_AND_PARALLELISM = "SET inferPartitionHint = true; "
    + "SELECT COUNT(*) "
    + "FROM userAttributes /*+ tableOptions(partition_size='2', partition_parallelism='2') */ ua "
    + "JOIN userGroups /*+ tableOptions(partition_size='2', partition_parallelism='2') */ ug "
Contributor

Related: how do partition_parallelism and stageParallelism differ? Could we consolidate them into a single config?

It was a big challenge to wrap my head around what these two are semantically supposed to do. These are quite hard to explain to users too.

Contributor Author

partition_parallelism is specific to leaf-stage, partition-based worker assignment, where each partition is split across multiple workers downstream; stageParallelism defines how many workers to schedule per server for the intermediate stage. Another difference is that stageParallelism is a query option rather than a hint.
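
One rough way to read that distinction is as two separate multipliers. The sketch below only restates the description above as arithmetic; the class and method names are hypothetical, and the assumption that worker counts multiply this simply is mine, not something confirmed against Pinot's planner.

```java
class ParallelismSketch {
  // Hint level (partition_parallelism): assumes each leaf-stage partition worker
  // fans out to partitionParallelism workers in the downstream stage.
  static int downstreamWorkers(int numLeafPartitionWorkers, int partitionParallelism) {
    return numLeafPartitionWorkers * partitionParallelism;
  }

  // Query-option level (stageParallelism): assumes the intermediate stage
  // schedules stageParallelism workers on each participating server.
  static int intermediateWorkers(int numServers, int stageParallelism) {
    return numServers * stageParallelism;
  }
}
```

Under these assumptions, 4 leaf partition workers with partition_parallelism='2' would feed 8 downstream workers, while 3 servers with stageParallelism set to 2 would run 6 intermediate-stage workers.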

Comment on lines 615 to 619
if (offlineSegments == null) {
  offlineSegments = new ArrayList<>(partitionInfo._offlineSegments);
} else {
  offlineSegments.addAll(partitionInfo._offlineSegments);
}
Contributor

nit: This seems like over-engineering. We could allocate the list at the beginning. Remember that a new ArrayList() doesn't actually allocate any Object[] until the first element is added.

Contributor Author

Here null and an empty list are slightly different. Since I'm sharing the helper method getSegmentsMap(), I need the list to be null when there are no segments.
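
A minimal sketch of why the null-versus-empty distinction can matter when a downstream helper is shared. The names below are hypothetical, and toSegmentsMap is only a stand-in for getSegmentsMap(), whose real signature and behavior are not shown in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NullableSegments {
  // Lazily merges source into target. The result stays null as long as no
  // source list was ever offered, so "absent" is distinguishable from "empty".
  static List<String> merge(List<String> target, List<String> source) {
    if (source == null) {
      return target;
    }
    if (target == null) {
      return new ArrayList<>(source);
    }
    target.addAll(source);
    return target;
  }

  // Hypothetical stand-in for a shared helper: a null list means the table
  // type is absent, so no entry is created for it.
  static Map<String, List<String>> toSegmentsMap(List<String> offline, List<String> realtime) {
    Map<String, List<String>> segmentsMap = new HashMap<>();
    if (offline != null) {
      segmentsMap.put("OFFLINE", offline);
    }
    if (realtime != null) {
      segmentsMap.put("REALTIME", realtime);
    }
    return segmentsMap;
  }
}
```

Allocating the list eagerly would make every table type look present to such a helper, even when no segments exist for it.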

Comment on lines 594 to 600
// Round-robin partitions to workers, where each worker gets numPartitionsPerWorker partitions. This setup works
// only if all segments for these partitions are assigned to the same group of servers. This is useful when user
// wants to colocate tables with different partition count, but same partition function.
// E.g. when there are 16 partitions for table A and 4 partitions for table B, we may assign 16 partitions for
// table A to 4 workers, where partition 0, 4, 8, 12 goes to worker 0, partition 1, 5, 9, 13 goes to worker 1,
// etc.
for (int i = 0; i < numWorkers; i++) {
Contributor

The code of this method is getting very complex. Could we split it into some methods and/or reuse code between branches of numPartitionsPerWorker?

Contributor Author

Extracted helper methods, but it is hard to reuse code across them.
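
For reference, the round-robin grouping described in the code comment under review can be sketched in isolation as follows. This is not the WorkerManager implementation; the class and method names are hypothetical, and the sketch ignores segments and servers entirely.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinPartitions {
  // Returns, for each worker i, the partitions i, i + numWorkers, i + 2 * numWorkers, ...
  // Requires numPartitions to be a multiple of numWorkers.
  static List<List<Integer>> assign(int numPartitions, int numWorkers) {
    if (numWorkers <= 0 || numPartitions % numWorkers != 0) {
      throw new IllegalArgumentException("numPartitions must be a multiple of numWorkers");
    }
    List<List<Integer>> workers = new ArrayList<>(numWorkers);
    for (int i = 0; i < numWorkers; i++) {
      List<Integer> partitions = new ArrayList<>(numPartitions / numWorkers);
      for (int p = i; p < numPartitions; p += numWorkers) {
        partitions.add(p);
      }
      workers.add(partitions);
    }
    return workers;
  }
}
```

For 16 partitions and 4 workers, assign(16, 4) gives worker 0 the partitions [0, 4, 8, 12] and worker 1 the partitions [1, 5, 9, 13], matching the example in the comment.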

Comment on lines -31 to -35
public class ColocatedJoinEngineQuickStart extends MultistageEngineQuickStart {
  private static final String QUICKSTART_IDENTIFIER = "COLOCATED_JOIN";
  private static final String[] COLOCATED_JOIN_DIRECTORIES = new String[]{
      "examples/batch/colocated/userAttributes", "examples/batch/colocated/userGroups"
  };
Contributor

Any reason to remove this quickstart? It's actually the one I use the most. Not because of the configs, which are equal to the defaults, but because:

  • Tables are configured exactly as needed for colocation
  • There are fewer tables than in the other quickstarts, so it starts faster

Contributor Author

It is just renamed to ColocatedJoinQuickStart, with several newly added queries.

@Jackie-Jiang Jackie-Jiang force-pushed the mse_merge_partitions branch from f9b903e to 8d046fc Compare May 14, 2025 00:27
@ankitsultana ankitsultana merged commit 7063671 into apache:master May 14, 2025
17 checks passed
@Jackie-Jiang Jackie-Jiang deleted the mse_merge_partitions branch May 14, 2025 21:23