Skip to content

Conversation

@ankitsultana
Copy link
Contributor

@ankitsultana ankitsultana commented Jun 25, 2025

Summary

  • Enables group-by trim by default for group-by without any aggregation functions, since it always makes sense to trim groups in that case.
  • Enables runInBroker and useBrokerPruning by default. Both of these are only applicable when usePhysicalOptimizer is set and runInBroker is only applicable when useLiteMode is set too.
  • Fixes Sort collation info propagation. When a Sort exists, we have the opportunity to let the PDD know that the output will be sorted. This wasn't being leveraged before which was causing unnecessary Sort and Exchange to be added for some queries.
  • Also fixed PinotDataDistribution.apply(PinotDistMapping). Currently we were always dropping collation, but now we take in a boolean to allow callers to choose whether they want to drop collation or not.
  • Added tests for group-by hint options.
  • Fixed bug in setting group-trim limit in lite mode. I was always choosing the server stage limit before and had left a TODO on it. Cleared the TODO and have resorted to the group-trim configured in the aggregate.

Test Plan

Added more unit-tests. We also have E2E query tests that run 100s of queries and matches results between general V2 engine and the physical optimized queries.

@ankitsultana ankitsultana added multi-stage Related to the multi-stage query engine mse-physical-optimizer mse-lite-mode labels Jun 25, 2025
@ankitsultana ankitsultana changed the title [multistage] Enable runInBroker / useBrokerPruning by Default and Many Other Improvements [multistage] Enable runInBroker / useBrokerPruning by Default + Misc. Improvements Jun 25, 2025
return Boolean.parseBoolean(option);
}
}
if (aggRel.getAggCallList().isEmpty()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for Reviewers: For queries such as WITH tmp AS (SELECT DISTINCT col1, col2 FROM tbl LIMIT 1000) SELECT ..., at present we were not trimming groups by default which would have meant that the group-by could have grown really huge.

For such queries, we can always leverage group-trimming, and this change enables that.

},
{
"description": "Example of query that avoids exchanges for aggregates",
"sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR with teamOne as (select col2, percentile(col3, 50) as sum_of_runs from a group by col2), teamTwo as (select col2, percentile(col3, 50) as sum_of_runs from a group by col2), all as (select col2, sum_of_runs from teamOne union all select col2, sum_of_runs from teamTwo) select col2, percentile(sum_of_runs, 50) from all group by col2",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query taken from AggregatePlans.json:

"description": "Select aggregates with literals on top of a union",
"sql": "EXPLAIN PLAN FOR with teamOne as (select /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ col2, percentile(col3, 50) as sum_of_runs from a group by col2), teamTwo as (select /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ col2, percentile(col3, 50) as sum_of_runs from a group by col2), all as (select col2, sum_of_runs from teamOne union all select col2, sum_of_runs from teamTwo) select /*+ aggOption(is_skip_leaf_stage_group_by='true') */ col2, percentile(sum_of_runs, 50) from all group by col2",
"output": [
"Execution Plan",
"\nPinotLogicalAggregate(group=[{0}], agg#0=[PERCENTILE($1, 50)], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[PERCENTILE($1, 50)], aggType=[LEAF])",
"\n LogicalUnion(all=[true])",
"\n PinotLogicalExchange(distribution=[hash[0, 1, 2]])",
"\n LogicalProject(col2=[$0], sum_of_runs=[$1], $f2=[50])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[PERCENTILE($1, 50)], aggType=[DIRECT])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2], $f2=[50])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[hash[0, 1, 2]])",
"\n LogicalProject(col2=[$0], sum_of_runs=[$1], $f2=[50])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[PERCENTILE($1, 50)], aggType=[DIRECT])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2], $f2=[50])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
]

The plan with v2 optimizer is better in the multiple ways:

  1. We automatically can detect that the group-by column, col2, is the partitioning column. So the leaf stages execute the aggregate directly instead of splitting them.
  2. We are automatically able to skip the exchange required for the final aggregation above the union.

In other words, what the existing optimizer is not able to achieve even with hints, we can achieve without any hints.

"\n PhysicalSort(fetch=[100])",
"\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL], limit=[100])",
"\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1, 2]])",
"\n PhysicalAggregate(group=[{0, 1, 7}], aggType=[LEAF], limit=[100])",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shows effectiveness of auto-enabling group-trim for group-by queries with no agg calls.

@ankitsultana ankitsultana marked this pull request as ready for review June 25, 2025 23:56
}
newFieldCollations.add(fieldCollation.withFieldIndex(newFieldIndices.get(0)));
}
return RelCollations.of(newFieldCollations);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have a question. Is it intentionally to select the newFieldIndices.get(0)?

  public static RelCollation apply(RelCollation relCollation, PinotDistMapping mapping) {
    if (relCollation.getKeys().isEmpty()) {
      return relCollation;
    }
    List<RelFieldCollation> newFieldCollations = new ArrayList<>();
    for (RelFieldCollation fieldCollation : relCollation.getFieldCollations()) {
      List<Integer> newFieldIndices = mapping.getTargets(fieldCollation.getFieldIndex());
      if (CollectionUtils.isEmpty(newFieldIndices)) {
        break;
      }
      newFieldCollations.add(fieldCollation.withFieldIndex(newFieldIndices.get(0)));
    }
    return RelCollations.of(newFieldCollations);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that was by design because I wanted to keep it simple at the time. For context, this can occur in scenarios such as follows (which are rare or unlikely):

Project(col1=$0, col2=$0)
   Sort(collation=[order by $0 desc])
      TableScan(col1)

In this case, the project could be said to be ordered by both col1 and col2, but for now I am only preserving one of the indices to keep things simple. But I think it should be okay to add all field indexes too. I can take that up in a follow-up.

Copy link
Contributor

@wirybeaver wirybeaver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@codecov-commenter
Copy link

codecov-commenter commented Jun 26, 2025

Codecov Report

❌ Patch coverage is 88.88889% with 5 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.22%. Comparing base (1a476de) to head (b77a466).
⚠️ Report is 1255 commits behind head on master.

Files with missing lines Patch % Lines
...cal/v2/opt/rules/LiteModeWorkerAssignmentRule.java 87.50% 2 Missing and 1 partial ⚠️
...ache/pinot/calcite/rel/traits/TraitAssignment.java 0.00% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #16204      +/-   ##
============================================
+ Coverage     62.90%   63.22%   +0.32%     
+ Complexity     1386     1366      -20     
============================================
  Files          2867     2953      +86     
  Lines        163354   170388    +7034     
  Branches      24952    26068    +1116     
============================================
+ Hits         102755   107728    +4973     
- Misses        52847    54502    +1655     
- Partials       7752     8158     +406     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.20% <88.88%> (+0.33%) ⬆️
java-21 63.18% <88.88%> (+0.36%) ⬆️
skip-bytebuffers-false ?
skip-bytebuffers-true ?
temurin 63.22% <88.88%> (+0.32%) ⬆️
unittests 63.22% <88.88%> (+0.32%) ⬆️
unittests1 64.74% <88.88%> (+8.92%) ⬆️
unittests2 33.39% <0.00%> (-0.18%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@ankitsultana ankitsultana merged commit 0e5d80e into apache:master Jun 26, 2025
18 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

mse-lite-mode mse-physical-optimizer multi-stage Related to the multi-stage query engine

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants