
Conversation

@ankitsultana (Contributor) commented May 13, 2025

Summary

The leaf-stage worker assignment rule was iterating over the PartitionInfo._segments list, but that list is not complete: SegmentPartitionMetadataManager was skipping newly added segments when building TablePartitionInfo.

This PR fixes that by:

  1. Renaming the existing TablePartitionInfo to TablePartitionReplicatedServersInfo (TPRSI), to make it clear that the TPRSI POJO also tracks segment replication (i.e. segment/instance assignment).
  2. Removing the usage of TablePartitionReplicatedServersInfo from LeafStageWorkerAssignmentRule, relying only on the new TablePartitionInfo (TPI).

Big Picture

The bigger picture here is that the v2 query optimizer relies on TablePartitionInfo solely for quickly accessing the segments belonging to a given partition. Prior to this PR, the TablePartitionInfo used by leaf-stage worker assignment was also silently skipping new segments from being tracked in the TPI.

This was bad design, because the v2 query optimizer delegates the responsibility of segment selection and assignment to the Routing Manager.
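To make that intended split concrete, here is a minimal sketch of the delegation. The interface names (`PartitionMetadataView`, `RoutingView`) and `selectSegments` are hypothetical illustrations, not Pinot APIs; only `getSegmentsInPartition` mirrors the TablePartitionInfo accessor used in this PR:

```java
import java.util.*;

public class SegmentSelectionSketch {
  // Complete per-partition segment listing (the TablePartitionInfo role).
  interface PartitionMetadataView {
    List<String> getSegmentsInPartition(int partitionNum);
  }

  // Decides which segments are actually routable right now (the Routing Manager role).
  interface RoutingView {
    boolean isRoutable(String segment);
  }

  // The optimizer combines the two: take the complete view, then let routing filter it.
  static List<String> selectSegments(PartitionMetadataView partitions, RoutingView routing,
      int partitionNum) {
    List<String> selected = new ArrayList<>();
    for (String segment : partitions.getSegmentsInPartition(partitionNum)) {
      if (routing.isRoutable(segment)) {
        selected.add(segment);
      }
    }
    return selected;
  }
}
```

The point of the sketch is that exclusion of (say) new segments lives entirely behind the routing side, while the partition-metadata side stays a complete view.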

Test Plan

Added unit tests. We also have existing unit tests and E2E plan tests that verify plan generation.

@ankitsultana added the `multi-stage` (Related to the multi-stage query engine) and `mse-physical-optimizer` labels on May 13, 2025
```java
List<String> selectedSegments = new ArrayList<>();
if (info != null) {
  List<String> segmentsForPartition = tablePartitionInfo.getSegmentsInPartition(partitionNum);
  if (!segmentsForPartition.isEmpty()) {
```
@ankitsultana (Contributor, Author) May 13, 2025

note: Though this check is not required, I think it makes the code slightly easier to read

```diff
 public TablePartitionInfo(String tableNameWithType, String partitionColumn, String partitionFunctionName,
-    int numPartitions, PartitionInfo[] partitionInfoMap, List<String> segmentsWithInvalidPartition) {
+    int numPartitions, PartitionInfo[] partitionInfoMap, List<String> segmentsWithInvalidPartition,
+    Map<Integer, List<String>> excludedNewSegments) {
```
@wirybeaver (Contributor) May 13, 2025

nit: mark the existing constructor with @VisibleForTesting and add a new constructor that accepts excludedNewSegments. That way the existing TablePartitionInfo unit tests don't need to be modified.

@ankitsultana (Contributor, Author)

imo having multiple constructors is more error-prone in the long run. It's easy to call a constructor with fewer arguments, and developers then don't think about everything that's required to create an accurate version of the object.

@wirybeaver (Contributor) commented May 13, 2025

LGTM. My hunch is that the broker routing manager could insert the partition number into each segment when the Helix thread invokes the update method. That would simplify the workerAssignment code a lot and reduce the overhead in the query path.

Specifically, the core part of workerAssignment is to ensure that a partition is not assigned to multiple servers. If we have a Map<ServerInstance, List<SegmentWrapper>> where the SegmentWrapper contains the partition number, we only need to walk through the routing table once to validate that there is no intersection of partitions between servers.
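A rough sketch of that single-pass check, under simplifying assumptions: servers are represented as plain strings and `SegmentWrapper` is a stand-in for whatever wrapper type would carry the partition number. This is not WorkerManager's actual code:

```java
import java.util.*;

public class PartitionOverlapCheck {
  // Stand-in for a segment annotated with its partition number.
  static final class SegmentWrapper {
    final String segmentName;
    final int partition;

    SegmentWrapper(String segmentName, int partition) {
      this.segmentName = segmentName;
      this.partition = partition;
    }
  }

  // Single pass over the routing table: record the first server seen for each
  // partition, and flag any partition that shows up under a second server.
  static boolean partitionsDisjoint(Map<String, List<SegmentWrapper>> routingTable) {
    Map<Integer, String> partitionOwner = new HashMap<>();
    for (Map.Entry<String, List<SegmentWrapper>> entry : routingTable.entrySet()) {
      for (SegmentWrapper sw : entry.getValue()) {
        String prev = partitionOwner.putIfAbsent(sw.partition, entry.getKey());
        if (prev != null && !prev.equals(entry.getKey())) {
          return false; // same partition served by two different servers
        }
      }
    }
    return true;
  }
}
```

The walk is O(total segments) with one hash map, which is the overhead reduction the comment is getting at.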

@wirybeaver (Contributor) left a comment
there's a global unit test failure on the pinot-core module

@shauryachats (Collaborator) left a comment
LGTM

@Jackie-Jiang (Contributor) left a comment

Seems like you need to compute the partition info without maintaining fully replicated servers? If so, I'd suggest adding a new type of TablePartitionInfo that directly calculates the info needed for the new physical planner, e.g. all segments within the partition, segments with invalid partition, etc.
This way, we don't need to pay the overhead of maintaining info that isn't needed by both approaches.

@ankitsultana (Contributor, Author)

> Seems like you need to compute the partition info without maintaining fully replicated servers? If so, I'd suggest adding a new type of TablePartitionInfo and directly calculate the info needed for the new physical planner, e.g. all segments within the partition, segments with invalid partition etc. This way, we don't need to pay overhead to maintain info not needed for both approaches.

I don't follow, which overhead is concerning here? Note that this change simply tracks the excludedNewSegments in the TPI. These were being created temporarily anyway; all I am doing is passing them to TablePartitionInfo, since otherwise SegmentPartitionMetadataManager was silently skipping some segments from TablePartitionInfo.

From a semantics point of view I think TablePartitionInfo should track all segments that were processed by the SegmentPartitionMetadataManager.

@Jackie-Jiang (Contributor)

What I meant is that the new LeafStageWorkerAssignmentRule and WorkerManager require different info from partitioning:

  • LeafStageWorkerAssignmentRule needs to find all segments for each partition, and track segments without any partition
  • WorkerManager needs to find segments for each partition with fully replicated servers

Because they need different info, I'd suggest adding a new class similar to TablePartitionInfo (you may also rename the existing one to something like TablePartitionInfoWithReplicatedServers), then adding a new method to retrieve that info. They serve different purposes, so it's better to separate them.

@ankitsultana (Contributor, Author)

Isn't that overkill?

Requirements of LeafStageWorkerAssignmentRule and WorkerManager are different, but from an abstraction point of view we can say that SegmentPartitionMetadataManager manages a TablePartitionInfo which:

  • Provides an easy-to-use API to work with segment partitions.
  • Is a complete view, in that all processed segments will be part of the TablePartitionInfo POJO.

Maybe I am missing something, is there a deeper reason to create another POJO here?

Maintaining two separate POJOs with mostly the same information will be an anti-pattern in my view.

@itschrispeck (Contributor) left a comment
Looks good from my side

@Jackie-Jiang (Contributor)

> Isn't that an overkill?
>
> Requirements of LeafStageWorkerAssignmentRule and WorkerManager are different, but from an abstraction point of view we can say that SegmentPartitionMetadataManager manages TablePartitionInfo which:
>
>   • Provides an easy to use API to work with segment partitions.
>   • Is a complete view, in that all processed segments will be part of the TablePartitionInfo POJO.
>
> Maybe I am missing something, is there a deeper reason to create another POJO here?
>
> Maintaining two separate POJOs with mostly the same information will be an anti-pattern in my view.

The main difference between them is as follows:

  • WorkerManager is trying to find segments of the same partition served by the same server.
  • LeafStageWorkerAssignmentRule only needs to find segments of the same partition.

The reason WorkerManager treats newly added segments differently is that they might not be available on the same server, and we don't want to fail the query, so it excludes them. What you are doing in this PR is adding the excluded ones back.

Because the info needed by each is actually different, even though they share common properties (e.g. which partition a segment belongs to), it will be easier to maintain if we keep the logic separate. With the current approach, there is overhead for both use cases:

  • When called from WorkerManager, storing the excluded segments is overhead.
  • When called from SegmentPartitionMetadataManager, maintaining the fully replicated servers is overhead.

@ankitsultana (Contributor, Author)

On second thought, the current changes in this PR are not semantically correct. Like Jackie mentioned, I am only interested in getting the segments corresponding to a given partition from the Segment Partition Metadata Manager. Segment selection, including the exclusion of new segments, is delegated to the Routing Manager, as it ideally should be.

Given that, adding excludedNewSegments to the existing POJO isn't ideal, since that would let the exclusion logic be owned by both SegmentPartitionMetadataManager and the Routing Manager.

I'll update this soon.



```java
/**
 * Tracks segments by partition for a table. Also tracks the invalid partition segments.
```
@ankitsultana (Contributor, Author)

note: moved the existing TablePartitionInfo to TablePartitionReplicatedServersInfo. This one only tracks segments by partition and the invalid-partition segments.
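For illustration, the slimmed-down POJO described in this note might look roughly like the following. The field names and constructor shape here are assumptions; only `getSegmentsInPartition` matches the accessor used by LeafStageWorkerAssignmentRule in this PR:

```java
import java.util.*;

public class TablePartitionInfoSketch {
  private final String _tableNameWithType;
  private final List<String>[] _segmentsByPartition;
  private final List<String> _segmentsWithInvalidPartition;

  @SuppressWarnings("unchecked")
  public TablePartitionInfoSketch(String tableNameWithType, int numPartitions,
      Map<Integer, List<String>> segmentsByPartition, List<String> segmentsWithInvalidPartition) {
    _tableNameWithType = tableNameWithType;
    _segmentsByPartition = new List[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      // Every partition gets a (possibly empty) list, so callers never see null.
      _segmentsByPartition[i] = segmentsByPartition.getOrDefault(i, List.of());
    }
    _segmentsWithInvalidPartition = segmentsWithInvalidPartition;
  }

  public String getTableNameWithType() {
    return _tableNameWithType;
  }

  public List<String> getSegmentsInPartition(int partitionNum) {
    return _segmentsByPartition[partitionNum];
  }

  public List<String> getSegmentsWithInvalidPartition() {
    return _segmentsWithInvalidPartition;
  }
}
```

Note there is no replicated-servers state here at all; that now lives only in TablePartitionReplicatedServersInfo.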


```java
private void computeTablePartitionInfo() {

@Override
public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView,
```
@ankitsultana (Contributor, Author)
note: GitHub is not great at showing this, but I have simply moved the compute-table-partition-info methods towards the end. However, reviewers can double-check this claim.

@codecov-commenter commented May 29, 2025

Codecov Report

Attention: Patch coverage is 72.11538% with 29 lines in your changes missing coverage. Please review.

Project coverage is 63.33%. Comparing base (1a476de) to head (baf8f62).
Report is 161 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...mentpartition/SegmentPartitionMetadataManager.java | 74.13% | 11 Missing and 4 partials ⚠️ |
| .../org/apache/pinot/query/routing/WorkerManager.java | 52.94% | 7 Missing and 1 partial ⚠️ |
| ...che/pinot/broker/routing/BrokerRoutingManager.java | 0.00% | 5 Missing ⚠️ |
| ...e/routing/TablePartitionReplicatedServersInfo.java | 94.44% | 1 Missing ⚠️ |
Additional details and impacted files
```
@@             Coverage Diff              @@
##             master   #15780      +/-   ##
============================================
+ Coverage     62.90%   63.33%   +0.43%
+ Complexity     1386     1354      -32
============================================
  Files          2867     2898      +31
  Lines        163354   166410    +3056
  Branches      24952    25453     +501
============================================
+ Hits         102755   105398    +2643
- Misses        52847    53042     +195
- Partials       7752     7970     +218
```
| Flag | Coverage Δ |
|---|---|
| custom-integration1 | 100.00% <ø> (ø) |
| integration | 100.00% <ø> (ø) |
| integration1 | 100.00% <ø> (ø) |
| integration2 | 0.00% <ø> (ø) |
| java-11 | 63.30% <72.11%> (+0.43%) ⬆️ |
| java-21 | 63.29% <72.11%> (+0.47%) ⬆️ |
| skip-bytebuffers-false | ? |
| skip-bytebuffers-true | ? |
| temurin | 63.33% <72.11%> (+0.43%) ⬆️ |
| unittests | 63.33% <72.11%> (+0.43%) ⬆️ |
| unittests1 | 56.45% <78.04%> (+0.63%) ⬆️ |
| unittests2 | 33.34% <56.73%> (-0.23%) ⬇️ |

Flags with carried forward coverage won't be shown.

```java
  _segmentInfoMap.put(segment, segmentInfo);
}
computeTablePartitionInfo();
computeAllTablePartitionInfo();
```
Contributor:
Consider adding config to turn each of them on/off to reduce overhead. Can be done as a follow up

@ankitsultana ankitsultana merged commit de04b5d into apache:master May 29, 2025
18 checks passed