[multistage] Enable runInBroker / useBrokerPruning by Default + Misc. Improvements #16204
Conversation
return Boolean.parseBoolean(option);
}
}
if (aggRel.getAggCallList().isEmpty()) {
Note for Reviewers: For queries such as WITH tmp AS (SELECT DISTINCT col1, col2 FROM tbl LIMIT 1000) SELECT ..., we were previously not trimming groups by default, which meant the group-by state could grow very large.
For such queries we can always safely leverage group trimming, and this change enables that.
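To make the reasoning concrete, here is a minimal sketch (hypothetical class and method names, not the PR's actual code) of why an aggregate with no aggregation calls can always have group trimming enabled under a limit:

```java
import org.apache.calcite.rel.core.Aggregate;

public final class GroupTrimHeuristic {
  private GroupTrimHeuristic() {
  }

  /**
   * Hypothetical helper: decides whether group trimming can be enabled without an
   * explicit hint. {@code limit} is the fetch pushed down from an enclosing
   * Sort/Limit, or -1 when there is none.
   */
  public static boolean canAutoEnableGroupTrim(Aggregate aggRel, int limit) {
    // With no aggregate calls, every output row is just its group key (DISTINCT
    // semantics), so keeping only `limit` groups cannot change the final result.
    return limit > 0 && aggRel.getAggCallList().isEmpty();
  }
}
```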
},
{
"description": "Example of query that avoids exchanges for aggregates",
"sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR with teamOne as (select col2, percentile(col3, 50) as sum_of_runs from a group by col2), teamTwo as (select col2, percentile(col3, 50) as sum_of_runs from a group by col2), all as (select col2, sum_of_runs from teamOne union all select col2, sum_of_runs from teamTwo) select col2, percentile(sum_of_runs, 50) from all group by col2",
Query taken from AggregatePlans.json:
pinot/pinot-query-planner/src/test/resources/queries/AggregatePlans.json
Lines 138 to 159 in 472c53d
| "description": "Select aggregates with literals on top of a union", | |
| "sql": "EXPLAIN PLAN FOR with teamOne as (select /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ col2, percentile(col3, 50) as sum_of_runs from a group by col2), teamTwo as (select /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ col2, percentile(col3, 50) as sum_of_runs from a group by col2), all as (select col2, sum_of_runs from teamOne union all select col2, sum_of_runs from teamTwo) select /*+ aggOption(is_skip_leaf_stage_group_by='true') */ col2, percentile(sum_of_runs, 50) from all group by col2", | |
| "output": [ | |
| "Execution Plan", | |
| "\nPinotLogicalAggregate(group=[{0}], agg#0=[PERCENTILE($1, 50)], aggType=[FINAL])", | |
| "\n PinotLogicalExchange(distribution=[hash[0]])", | |
| "\n PinotLogicalAggregate(group=[{0}], agg#0=[PERCENTILE($1, 50)], aggType=[LEAF])", | |
| "\n LogicalUnion(all=[true])", | |
| "\n PinotLogicalExchange(distribution=[hash[0, 1, 2]])", | |
| "\n LogicalProject(col2=[$0], sum_of_runs=[$1], $f2=[50])", | |
| "\n PinotLogicalAggregate(group=[{0}], agg#0=[PERCENTILE($1, 50)], aggType=[DIRECT])", | |
| "\n PinotLogicalExchange(distribution=[hash[0]])", | |
| "\n LogicalProject(col2=[$1], col3=[$2], $f2=[50])", | |
| "\n PinotLogicalTableScan(table=[[default, a]])", | |
| "\n PinotLogicalExchange(distribution=[hash[0, 1, 2]])", | |
| "\n LogicalProject(col2=[$0], sum_of_runs=[$1], $f2=[50])", | |
| "\n PinotLogicalAggregate(group=[{0}], agg#0=[PERCENTILE($1, 50)], aggType=[DIRECT])", | |
| "\n PinotLogicalExchange(distribution=[hash[0]])", | |
| "\n LogicalProject(col2=[$1], col3=[$2], $f2=[50])", | |
| "\n PinotLogicalTableScan(table=[[default, a]])", | |
| "\n" | |
| ] |
The plan with the v2 optimizer is better in multiple ways:
- We can automatically detect that the group-by column, col2, is the partitioning column, so the leaf stages execute the aggregate directly instead of splitting it.
- We can automatically skip the exchange required for the final aggregation above the union (a sketch of this co-location check follows).
In other words, what the existing optimizer cannot achieve even with hints, we achieve without any hints.
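As a rough illustration of the exchange-skipping point (assumed names, not the optimizer's real API), the final exchange is unnecessary whenever the input is already hash-distributed on a subset of the group-by keys:

```java
import java.util.List;

public final class ColocationCheck {
  private ColocationCheck() {
  }

  /**
   * Hypothetical check: if the input's hash-distribution keys are a non-empty subset
   * of the group-by keys, rows that share a group key are already on the same worker,
   * so the final aggregation needs no re-partitioning exchange.
   */
  public static boolean canSkipExchange(List<Integer> groupKeys, List<Integer> hashKeys) {
    return !hashKeys.isEmpty() && groupKeys.containsAll(hashKeys);
  }
}
```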
| "\n PhysicalSort(fetch=[100])", | ||
| "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL], limit=[100])", | ||
| "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1, 2]])", | ||
| "\n PhysicalAggregate(group=[{0, 1, 7}], aggType=[LEAF], limit=[100])", |
Shows the effectiveness of auto-enabling group trim for group-by queries with no aggregation calls.
}
newFieldCollations.add(fieldCollation.withFieldIndex(newFieldIndices.get(0)));
}
return RelCollations.of(newFieldCollations);
I have a question: is it intentional to select newFieldIndices.get(0)?
public static RelCollation apply(RelCollation relCollation, PinotDistMapping mapping) {
if (relCollation.getKeys().isEmpty()) {
return relCollation;
}
List<RelFieldCollation> newFieldCollations = new ArrayList<>();
for (RelFieldCollation fieldCollation : relCollation.getFieldCollations()) {
List<Integer> newFieldIndices = mapping.getTargets(fieldCollation.getFieldIndex());
if (CollectionUtils.isEmpty(newFieldIndices)) {
break;
}
newFieldCollations.add(fieldCollation.withFieldIndex(newFieldIndices.get(0)));
}
return RelCollations.of(newFieldCollations);
Yeah, that was by design because I wanted to keep it simple at the time. For context, this can occur in scenarios such as the following (which are rare or unlikely):
Project(col1=$0, col2=$0)
Sort(collation=[order by $0 desc])
TableScan(col1)
In this case, the project could be said to be ordered by both col1 and col2, but for now I am only preserving one of the indices to keep things simple. I think it should be fine to add all field indices too; I can take that up in a follow-up (see the sketch below).
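One possible shape for that follow-up, shown purely as a sketch (the method name is made up, and it assumes the same class context, imports, and PinotDistMapping type as the quoted apply method):

```java
public static RelCollation applyKeepingAllTargets(RelCollation relCollation, PinotDistMapping mapping) {
  if (relCollation.getKeys().isEmpty()) {
    return relCollation;
  }
  List<RelFieldCollation> newFieldCollations = new ArrayList<>();
  for (RelFieldCollation fieldCollation : relCollation.getFieldCollations()) {
    List<Integer> newFieldIndices = mapping.getTargets(fieldCollation.getFieldIndex());
    if (CollectionUtils.isEmpty(newFieldIndices)) {
      // A key that does not survive the mapping ends the usable collation prefix.
      break;
    }
    // Duplicated projections of the same source column (e.g. col1=$0, col2=$0) carry
    // identical values, so appending all mapped indices keeps an equivalent collation.
    for (int newFieldIndex : newFieldIndices) {
      newFieldCollations.add(fieldCollation.withFieldIndex(newFieldIndex));
    }
  }
  return RelCollations.of(newFieldCollations);
}
```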
wirybeaver left a comment:
LGTM
Codecov Report
Additional details and impacted files
@@ Coverage Diff @@
## master #16204 +/- ##
============================================
+ Coverage 62.90% 63.22% +0.32%
+ Complexity 1386 1366 -20
============================================
Files 2867 2953 +86
Lines 163354 170388 +7034
Branches 24952 26068 +1116
============================================
+ Hits 102755 107728 +4973
- Misses 52847 54502 +1655
- Partials 7752 8158 +406
Summary
- Enable runInBroker and useBrokerPruning by default. Both of these are only applicable when usePhysicalOptimizer is set, and runInBroker is only applicable when useLiteMode is set too (see the gating sketch below).
- PinotDataDistribution.apply(PinotDistMapping): previously we always dropped collation, but the method now takes a boolean so callers can choose whether or not to drop collation.
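For illustration, a hedged sketch of the gating described above (the helper class and the option lookup are assumptions, not Pinot's actual option-resolution code; only the option names and the new true defaults come from this PR):

```java
import java.util.Map;

public final class LiteModeOptionGating {
  private LiteModeOptionGating() {
  }

  /**
   * Hypothetical resolution of whether a query runs in the broker. Option keys come
   * from the summary above; the "false" defaults for the gating options are assumptions.
   */
  public static boolean isRunInBrokerEffective(Map<String, String> queryOptions) {
    boolean usePhysicalOptimizer =
        Boolean.parseBoolean(queryOptions.getOrDefault("usePhysicalOptimizer", "false"));
    boolean useLiteMode = Boolean.parseBoolean(queryOptions.getOrDefault("useLiteMode", "false"));
    // runInBroker now defaults to true, but it only takes effect when both lite mode
    // and the physical optimizer are enabled.
    boolean runInBroker = Boolean.parseBoolean(queryOptions.getOrDefault("runInBroker", "true"));
    return usePhysicalOptimizer && useLiteMode && runInBroker;
  }

  /** Hypothetical resolution of whether broker pruning applies. */
  public static boolean isBrokerPruningEffective(Map<String, String> queryOptions) {
    boolean usePhysicalOptimizer =
        Boolean.parseBoolean(queryOptions.getOrDefault("usePhysicalOptimizer", "false"));
    // useBrokerPruning also defaults to true now and is gated only on the physical optimizer.
    boolean useBrokerPruning =
        Boolean.parseBoolean(queryOptions.getOrDefault("useBrokerPruning", "true"));
    return usePhysicalOptimizer && useBrokerPruning;
  }
}
```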
Test Plan
Added more unit tests. We also have E2E query tests that run hundreds of queries and match results between the general V2 engine and the physically optimized queries.