Skip to content

Conversation

@ankitsultana
Copy link
Contributor

Summary

Goal: Prototype the MSE Lite Mode to enable testing in our clusters so we can make informed decisions about its semantics and overall guarantees. (e.g. what is the most intuitive way to add limit to the leaf stage that is least confusing and maximally transparent to users)

Prototypes the Multistage Engine Lite Mode as described in #14640. The feature is hidden behind a flag and can only be enabled if one has also set the usePhysicalOptimizer=true query option.

This builds on top of the existing Optimizer changes and leverages many of the optimizations that are provided by that optimizer.

Specifics

Worker Assignment

Worker and Exchange are assigned using a new rule. This is because the generic Worker/Exchange assignment rule handles the general case, and for the Lite Mode we have some custom logic like picking a random server instance out of the ones assigned to the leaf stage.

The assignment is quite simple: the leaf stage assignment is done by the existing LeafStageWorkerAssignmentRule, and the new Lite Mode assignment rule simply samples a server instance from the leaf stage workers, and uses it for the all plan nodes except the leaf stage.

Integration with Sort/Aggregate Pushdown

Both of these rules are still run since there are scenarios where we would like to push down the Sort or Aggregate to the leaf. e.g. if I have a query like SELECT col1, COUNT(*) FROM tbl GROUP BY col1, then the aggregate should be pushed down to the leaf stage in most cases for obvious reasons.

Sort Insert Rule

If there doesn't exist a limit already in the leaf stage, we add it. Currently I am using a hardcoded value but in the future we'll make it configurable per se.

Semantics

None of the semantics in this PR are final and are subject to broader community review. Based on testing from this PR, I'll file a PEP to describe the semantics in detail.

Test Plan

Added Unit Tests. We are also testing this out in our clusters.

]
},
{
"description": "Auto elimination of partial aggregate when group-by on partitioning column. There's no sort because the limit is added to Agg.",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll raise a small change to update the explainTerms of PhysicalAggregate to match that of PinotLogicalAggregate. Didn't do it in this PR because it will change quite a lot of tests in this json file.

},
{
"description": "Query with single semi join and aggregation",
"sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN PLAN FOR SELECT COUNT(*), col2 FROM a WHERE col1 = 'foo' AND col2 IN (SELECT col1 FROM b) GROUP BY col2",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This query demonstrates one challenge. Most of the users would expect that the Semi Join filter will be evaluated first and then the aggregation will take place. But the aggregation can only be done after the filter.

This can be fixed with semi-join dynamic filtering, but the same issue also holds true for anti semi-joins too.

One way to address this is to use the automated Sub-Plan fragmenter or dynamically switching to pipeline breaker like how the MSE does it.

I think before we think about optimizations, we'll iron out the semantics so they remain as intuitive as possible for the users.

@codecov-commenter
Copy link

codecov-commenter commented May 8, 2025

Codecov Report

Attention: Patch coverage is 81.01266% with 15 lines in your changes missing coverage. Please review.

Project coverage is 63.30%. Comparing base (1a476de) to head (159419f).
Report is 52 commits behind head on master.

Files with missing lines Patch % Lines
.../physical/v2/opt/rules/LiteModeSortInsertRule.java 65.21% 6 Missing and 2 partials ⚠️
...he/pinot/query/context/PhysicalPlannerContext.java 57.14% 2 Missing and 1 partial ⚠️
.../query/planner/physical/v2/nodes/PhysicalSort.java 0.00% 2 Missing ⚠️
...cal/v2/opt/rules/LiteModeWorkerAssignmentRule.java 94.59% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #15743      +/-   ##
============================================
+ Coverage     62.90%   63.30%   +0.40%     
- Complexity     1386     1388       +2     
============================================
  Files          2867     2886      +19     
  Lines        163354   165064    +1710     
  Branches      24952    25217     +265     
============================================
+ Hits         102755   104491    +1736     
+ Misses        52847    52686     -161     
- Partials       7752     7887     +135     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.29% <81.01%> (+0.41%) ⬆️
java-21 63.26% <81.01%> (+0.44%) ⬆️
skip-bytebuffers-false ?
skip-bytebuffers-true ?
temurin 63.30% <81.01%> (+0.40%) ⬆️
unittests 63.29% <81.01%> (+0.40%) ⬆️
unittests1 56.39% <81.01%> (+0.57%) ⬆️
unittests2 33.47% <0.00%> (-0.10%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@ankitsultana ankitsultana marked this pull request as ready for review May 8, 2025 20:22
* plan nodes.
*/
public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer {
private static final Random RANDOM = new Random();
Copy link
Contributor

@wirybeaver wirybeaver May 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might consider to abstract it as a functional interface. One selection strategy that can reduce the network cost is to select the server has most of segments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Abstraction can definitely be considered as we evolve this. I think right now it's a bit too early. I am planning to add support for parallelism, server pruning, etc. soon so that will also increase the case handling of this Rule a bit.

Copy link
Contributor

@wirybeaver wirybeaver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm with nit comments

@ankitsultana ankitsultana requested a review from itschrispeck May 14, 2025 04:39
@itschrispeck itschrispeck merged commit fbfe10c into apache:master May 14, 2025
17 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

mse-lite-mode mse-physical-optimizer multi-stage Related to the multi-stage query engine

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants