[multistage] Multistage Engine Lite Mode (prototype) #15743
| ] | ||
| }, | ||
| { | ||
| "description": "Auto elimination of partial aggregate when group-by on partitioning column. There's no sort because the limit is added to Agg.", |
I'll raise a small change to update the explainTerms of PhysicalAggregate to match that of PinotLogicalAggregate. Didn't do it in this PR because it will change quite a lot of tests in this json file.
| }, | ||
| { | ||
| "description": "Query with single semi join and aggregation", | ||
| "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN PLAN FOR SELECT COUNT(*), col2 FROM a WHERE col1 = 'foo' AND col2 IN (SELECT col1 FROM b) GROUP BY col2", |
This query demonstrates one challenge. Most users would expect the semi-join filter to be evaluated first and the aggregation to take place afterwards. But the aggregation can only be done after the filter.
This can be fixed with semi-join dynamic filtering, but the same issue also holds for anti semi-joins.
One way to address this is to use the automated Sub-Plan fragmenter, or to switch dynamically to a pipeline breaker the way the MSE does.
Before we think about optimizations, we'll iron out the semantics so they remain as intuitive as possible for users.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #15743 +/- ##
============================================
+ Coverage 62.90% 63.30% +0.40%
- Complexity 1386 1388 +2
============================================
Files 2867 2886 +19
Lines 163354 165064 +1710
Branches 24952 25217 +265
============================================
+ Hits 102755 104491 +1736
+ Misses 52847 52686 -161
- Partials 7752 7887 +135
| * plan nodes. | ||
| */ | ||
| public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer { | ||
| private static final Random RANDOM = new Random(); |
Consider abstracting this as a functional interface. One selection strategy that could reduce the network cost is to select the server that hosts the most segments.
Abstraction can definitely be considered as we evolve this. I think right now it's a bit too early. I am planning to add support for parallelism, server pruning, etc. soon so that will also increase the case handling of this Rule a bit.
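To make the suggestion in this thread concrete, here is a minimal sketch of what such a functional interface could look like. Everything in it is hypothetical: `WorkerSelectionStrategy`, `randomServer`, `mostSegments`, and the `segmentsByServer` map (server name to the segments it hosts for the leaf stage) are illustrative names, not types or methods from this PR or from Pinot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeSet;

/** Hypothetical sketch only; none of these names exist in the PR. */
final class WorkerSelectionSketch {
  /** Picks the single server that would run every non-leaf plan node. */
  @FunctionalInterface
  interface WorkerSelectionStrategy {
    String select(Map<String, List<String>> segmentsByServer);
  }

  /** Random choice among the leaf-stage servers (fails on an empty map). */
  static WorkerSelectionStrategy randomServer(Random random) {
    return segmentsByServer -> {
      // Sort the names so the pick depends only on the Random, not on map order.
      List<String> servers = new ArrayList<>(new TreeSet<>(segmentsByServer.keySet()));
      return servers.get(random.nextInt(servers.size()));
    };
  }

  /** The reviewer's idea: prefer the server hosting the most segments. */
  static String mostSegments(Map<String, List<String>> segmentsByServer) {
    String best = null;
    int bestCount = -1;
    // Iterate in name order so ties resolve to the alphabetically first server.
    for (String server : new TreeSet<>(segmentsByServer.keySet())) {
      int count = segmentsByServer.get(server).size();
      if (count > bestCount) {
        best = server;
        bestCount = count;
      }
    }
    if (best == null) {
      throw new IllegalArgumentException("no leaf-stage servers to choose from");
    }
    return best;
  }
}
```

With this shape, a rule could accept either `WorkerSelectionSketch.randomServer(new Random())` or the method reference `WorkerSelectionSketch::mostSegments` without changing its own logic.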
.../java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
wirybeaver
left a comment
lgtm with nit comments
Summary
Goal: Prototype the MSE Lite Mode to enable testing in our clusters so we can make informed decisions about its semantics and overall guarantees (e.g. the most intuitive way to add a limit to the leaf stage, one that is least confusing and maximally transparent to users).
Prototypes the Multistage Engine Lite Mode as described in #14640. The feature is hidden behind a flag and can only be enabled if one has also set the usePhysicalOptimizer=true query option.
This builds on top of the existing Optimizer changes and leverages many of the optimizations that are provided by that optimizer.
Specifics
Worker Assignment
Worker and Exchange are assigned using a new rule. This is because the generic Worker/Exchange assignment rule handles the general case, and for the Lite Mode we have some custom logic like picking a random server instance out of the ones assigned to the leaf stage.
The assignment is quite simple: the leaf stage assignment is done by the existing LeafStageWorkerAssignmentRule, and the new Lite Mode assignment rule simply samples a server instance from the leaf stage workers and uses it for all plan nodes except the leaf stage.
Integration with Sort/Aggregate Pushdown
Both of these rules are still run since there are scenarios where we would like to push down the Sort or Aggregate to the leaf. For example, for a query like SELECT col1, COUNT(*) FROM tbl GROUP BY col1, the aggregate should be pushed down to the leaf stage in most cases, since aggregating at the leaf reduces the data sent to later stages.
Sort Insert Rule
If the leaf stage doesn't already have a limit, we add one. Currently I am using a hardcoded value, but in the future we'll make it configurable.
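The behaviour described for this rule can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: `Node`, `hasLimit`, `apply`, and the value of `ASSUMED_LEAF_LIMIT` are all hypothetical, and the sketch assumes that a limit anywhere inside the leaf stage (for instance on an aggregate) counts as "already has a limit".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch only; the real rule works on the PR's own plan-node types. */
final class LeafLimitSketch {
  /** Assumed placeholder; the PR does not state the hardcoded value. */
  static final int ASSUMED_LEAF_LIMIT = 1000;

  /** Simplified plan node: operator name, leaf-stage flag, optional limit, inputs. */
  static final class Node {
    final String op;
    final boolean inLeafStage;
    final Integer fetch; // null when the node carries no limit
    final List<Node> inputs;

    Node(String op, boolean inLeafStage, Integer fetch, List<Node> inputs) {
      this.op = op;
      this.inLeafStage = inLeafStage;
      this.fetch = fetch;
      this.inputs = inputs;
    }
  }

  /** True if this node or any node below it carries a limit. */
  static boolean hasLimit(Node node) {
    if (node.fetch != null) {
      return true;
    }
    for (Node input : node.inputs) {
      if (hasLimit(input)) {
        return true;
      }
    }
    return false;
  }

  /** Wraps every leaf-stage root that has no limit in a Sort node carrying one. */
  static Node apply(Node node) {
    if (node.inLeafStage) {
      // The first leaf-stage node reached from the top is the leaf-stage root.
      return hasLimit(node)
          ? node
          : new Node("Sort", true, ASSUMED_LEAF_LIMIT, List.of(node));
    }
    List<Node> newInputs = new ArrayList<>();
    for (Node input : node.inputs) {
      newInputs.add(apply(input));
    }
    return new Node(node.op, false, node.fetch, newInputs);
  }
}
```

A leaf stage whose aggregate already carries a limit is returned unchanged, which mirrors the test description elsewhere in this PR noting that no sort appears when the limit is added to the aggregate.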
Semantics
None of the semantics in this PR are final; all are subject to broader community review. Based on testing from this PR, I'll file a PEP to describe the semantics in detail.
Test Plan
Added Unit Tests. We are also testing this out in our clusters.