Skip to content

Conversation

@gortiz
Copy link
Contributor

@gortiz gortiz commented Aug 1, 2024

This PR introduces a new way to explain multi-stage queries in Pinot.
The main goal is to provide a more detailed explanation of the query execution plan, including information about
the physical operators that are being used.

Warning

Edit on Jan 2025: The next paragraph is incorrect. The new explain plan is disabled in 1.3.0 and we plan to enable it by default in future versions. Please refer to Pinot explain documentation to see the different versions and how they could be enabled.

By default, explain plan for will return the new plan. If you want to use the old plan you can use
explain plan without implementation for. This may be problematic, so we can discuss to introduce a new flag for this.
The main reason to break the default behavior is that the new plan is more verbose actually what a user should expect
when asking for implementation, at least following Calcite terminology.
Alternatively we can change the syntax in the same way we already did with explain physical plan for.

At architectural level, the new explain mode is closer to the one used in single stage.
The broker parses and optimizes the query generating a logical plan, generating RelNodes.
These nodes are transformed into PlanNodes as usual and sent to the servers.
But instead of asking to execute the plan, the broker asks to explain it using a new protobuf endpoint.
This new endpoint returns a list of PlanNodes.

When the server receives the explain request, it analyzes the plan looking for leaf operators and creates single-stage
operators as usual.
There are two key differences with respect of the execution mode:

  1. The server tracks which PlanNodes have been converted into single-stage operators.
  2. The server does not execute the operator. Instead it calls a new introduced method Operator.getOperatorInfo,
    which returns the same information returned by Operator.explainPlan but in POJOs.

The server then convert these POJOs into PlanNodes and substitute the tracked PlanNodes with the new ones.
Finally the new plan is sent back to the broker.

In order to be able to introduce physical (aka index used, etc) information in the PlanNode, a new ExplainedPlanNode is
created.
These nodes are not meant to be translated into actual operators, but to be used to explain the query execution plan.
When the broker receives the PlanNodes, it converts them back into a RelNode using a new class PinotExplainedRelNode.

Then the broker substitutes the original logical RelNodes with the new ones returned by the servers.
Finally, it explains the RelNode as expected in Calcite.

The result can be see in the following pictures:

Without implementation (similar to current explain)
image

With implementation:
image

The PR is still a work in progress, but it is already partially functional.

  • Combine different plans for segments (right now the first one is used)
    • Ideally we should group by plan and count how many segments use that plan
  • Change behavior when verbose explain mode is used
    • Right now verbose mode is ignored.
  • Add a flag so that we can use to decide if we want to use the new plan or the old one by default
  • PlanNodeToRelConverted
    • Support Window
    • Support SetOp
    • Decide what to do with send and receive mailboxes
  • Support pipeline breaker
    • Given current explain does not support pipeline breaker, we decided to do not support pipeline breaker in the first version of the new explain
  • PinotTable.getRowType may not use TypeFactory
  • Try to simplify LeafStageTransferableBlockOperator.explain
  • Try to simplify QueryServer.explain (similar to what was done in QueryDispatcher)
  • Try to simplify QueryRunner.explain (similar to what was done in QueryDispatcher)
  • Resolve corner cases in MultiStageBrokerRequestHandler
  • Review ServerQueryExecutorV1Impl
  • Decide if we need to keep MultiStageExplainAskingServersUtils
  • Verify tests
  • Split PinotExplainedRelNode and PinotExplainedRelNode.Info
    • Edit: PinotExplainedRelNode.Info was renamed as ExplainInfo and moved out from PinotExplainedRelNode

@gortiz gortiz force-pushed the multi-stage-explain branch from 8722ab7 to a3b3874 Compare August 5, 2024 10:01
@codecov-commenter
Copy link

codecov-commenter commented Aug 5, 2024

Codecov Report

Attention: Patch coverage is 17.52161% with 1431 lines in your changes missing coverage. Please review.

Project coverage is 64.20%. Comparing base (59551e4) to head (6641543).
Report is 1081 commits behind head on master.

Files with missing lines Patch % Lines
...he/pinot/query/planner/explain/PlanNodeMerger.java 0.00% 251 Missing ⚠️
.../query/planner/logical/PlanNodeToRelConverter.java 0.00% 239 Missing ⚠️
...he/pinot/query/planner/explain/PlanNodeSorter.java 0.00% 79 Missing ⚠️
...inot/query/planner/logical/RexExpressionUtils.java 1.36% 72 Missing ⚠️
...apache/pinot/query/service/server/QueryServer.java 43.01% 51 Missing and 2 partials ⚠️
...t/query/planner/explain/ExplainNodeSimplifier.java 0.00% 49 Missing ⚠️
...ry/planner/explain/AskingServerStageExplainer.java 0.00% 47 Missing ⚠️
.../apache/pinot/core/plan/PinotExplainedRelNode.java 0.00% 43 Missing ⚠️
...e/operator/LeafStageTransferableBlockOperator.java 0.00% 38 Missing ⚠️
...va/org/apache/pinot/query/runtime/QueryRunner.java 2.63% 37 Missing ⚠️
... and 74 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #13733      +/-   ##
============================================
+ Coverage     61.75%   64.20%   +2.44%     
- Complexity      207     1534    +1327     
============================================
  Files          2436     2594     +158     
  Lines        133233   142748    +9515     
  Branches      20636    21864    +1228     
============================================
+ Hits          82274    91646    +9372     
+ Misses        44911    44349     -562     
- Partials       6048     6753     +705     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 64.18% <17.52%> (+2.47%) ⬆️
java-21 64.07% <17.52%> (+2.45%) ⬆️
skip-bytebuffers-false 64.19% <17.52%> (+2.45%) ⬆️
skip-bytebuffers-true 64.05% <17.52%> (+36.32%) ⬆️
temurin 64.20% <17.52%> (+2.44%) ⬆️
unittests 64.19% <17.52%> (+2.44%) ⬆️
unittests1 55.62% <17.59%> (+8.73%) ⬆️
unittests2 34.55% <1.84%> (+6.81%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@yashmayya yashmayya added multi-stage Related to the multi-stage query engine feature release-notes Referenced by PRs that need attention when compiling the next release notes labels Aug 5, 2024
@gortiz gortiz force-pushed the multi-stage-explain branch from 5725a84 to 67c8e5c Compare August 7, 2024 14:37
gortiz added 20 commits August 13, 2024 12:12
Add a flag we can use to decide if we want to use the new plan or the old one by default
@gortiz gortiz force-pushed the multi-stage-explain branch from d834ba9 to eca86d3 Compare August 13, 2024 10:12
Copy link
Contributor

@yashmayya yashmayya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gortiz something I just realized is that we can lose the table related information in the new explain plan. For instance, this query on the basic quickstart:

EXPLAIN PLAN WITHOUT IMPLEMENTATION FOR WITH tmp AS (
  select playerID,
    teamID,
    SUM(homeRuns) as totalHomeRuns
  from baseballStats
  WHERE yearID > 2000
  GROUP BY playerID,
    teamID
  ORDER BY totalHomeRuns DESC
)
SELECT *
FROM tmp
  JOIN dimBaseballTeams ON tmp.teamID = dimBaseballTeams.teamID;

returns:

Execution Plan
LogicalJoin(condition=[=($1, $3)], joinType=[inner])
  PinotLogicalExchange(distribution=[hash[1]])
    PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)])
      PinotLogicalExchange(distribution=[hash[0, 1]])
        LeafStageCombineOperator
          StreamingInstanceResponse
            CombineGroupBy
              GroupBy(groupKeys=[[playerID, teamID]], aggregations=[[sum(homeRuns)]])
                Project(columns=[[homeRuns, teamID, playerID]])
                  DocIdSet(maxDocs=[10000])
                    FilterFullScan(predicate=[yearID > '2000'], operator=[RANGE])
  PinotLogicalExchange(distribution=[hash[0]])
    LeafStageCombineOperator
      StreamingInstanceResponse
        StreamingCombineSelect
          SelectStreaming(segment=[dimBaseballTeams_OFFLINE_0], table=[dimBaseballTeams], totalDocs=[51])
            Project(columns=[[teamName, teamID]])
              DocIdSet(maxDocs=[10000])
                FilterMatchEntireSegment(numDocs=[51])

whereas earlier it would've returned:

Execution Plan
LogicalJoin(condition=[=($1, $3)], joinType=[inner])
  PinotLogicalExchange(distribution=[hash[1]])
    PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)])
      PinotLogicalExchange(distribution=[hash[0, 1]])
        PinotLogicalAggregate(group=[{16, 25}], agg#0=[$SUM0($11)])
          LogicalFilter(condition=[>($27, 2000)])
            LogicalTableScan(table=[[default, baseballStats]])
  PinotLogicalExchange(distribution=[hash[0]])
    LogicalProject(teamID=[$3], teamName=[$4])
      LogicalTableScan(table=[[default, dimBaseballTeams]])

For simpler queries, this probably won't be a big issue but for complex queries with lots of joins, CTEs etc. I think it's pretty important to include this information.

Looks like you updated StreamingSelectionOnlyOperator to include the table name in its explain attributes; I guess we should do something similar for all other such operators that can be used in the leaf stage (for instance, StreamingInstanceResponseOperator)?

public static final int V1 = 1;
}

public static final String ASK_SERVERS_FOR_EXPLAIN_PLAN = "pinot.query.explain.ask.servers";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed on usage of physical, what about something like pinot.multistage.explain.query.include.segment.level.plan? It's fairly verbose but hopefully should be able to convey intent clearly to users.

Comment on lines +181 to +194
protected List<ExplainInfo> getChildrenExplainInfo() {
return getChildOperators().stream()
.filter(Objects::nonNull)
.map(Operator::getExplainInfo)
.collect(Collectors.toList());
}

protected String getExplainName() {
return toExplainString();
}

protected Map<String, Plan.ExplainNode.AttributeValue> getExplainAttributes() {
return Collections.emptyMap();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, agree on the current state of the Operator interface. Thanks for adding Javadocs to all the explain related methods - that should help out quite a bit. I think we can discuss potential refactoring of that interface separately, this looks good for now.

explain.getDetailLevel() == null ? SqlExplainLevel.DIGEST_ATTRIBUTES : explain.getDetailLevel();
Set<String> tableNames = RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel);
return new QueryPlannerResult(null, PlannerUtils.explainPlan(relRoot.rel, format, level), tableNames);
if (!explain.withImplementation() || !askServers) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, we can also plan to deprecate the EXPLAIN IMPLEMENTATION PLAN FOR syntax and change it to one of the WITH <extension> in the future.


public void explain(Worker.QueryRequest request, QueryServerInstance virtualServer, Deadline deadline,
Consumer<AsyncResponse<List<Worker.ExplainResponse>>> callback) {
_dispatchStub.withDeadline(deadline).explain(request, new AllValuesDispatchObserver<>(virtualServer, callback));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I don't get is why the proto file is defined as:

rpc Submit(ServerRequest) returns (stream ServerResponse);
instead of

rpc Submit(ServerRequest) returns (ServerResponse);

That's the proto definition for GrpcQueryServer right (which is for v1 streaming queries FWICT)? The proto definition for the multi-stage engine's QueryServer Submit RPC is -

rpc Submit(QueryRequest) returns (QueryResponse);

So I guess this makes sense now since the new Explain RPC returns a stream ExplainResponse and the implementation does call onNext multiple times.

return visitor.build();
}

private static class ConverterVisitor implements PlanNodeVisitor<Void, Void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thanks for elaborating! 😄

Comment on lines 280 to 284
if (PipelineBreakerExecutor.hasPipelineBreakers(stagePlan)) {
// TODO: Support pipeline breakers before merging this feature.
LOGGER.error("Pipeline breaker is not supported in explain query");
return stagePlan;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I hadn't thought of that either, thanks for the explanation! I guess we can just update that TODO comment for now, it makes sense to defer this considering the current explain also doesn't properly support pipeline breaker.

# Conflicts:
#	pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
#	pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@gortiz
Copy link
Contributor Author

gortiz commented Sep 18, 2024

something I just realized is that we can lose the table related information in the new explain plan. For instance, this query on the basic quickstart:
Looks like you updated StreamingSelectionOnlyOperator to include the table name in its explain attributes; I guess we should do something similar for all other such operators that can be used in the leaf stage (for instance, StreamingInstanceResponseOperator)?

Nice catch. Yes, I've found the same problem and tried to solve it that way, but it doesn't seem to be a scalable solution. Very easily we can end up having an operator that is not registering the table. Instead what I've done is to add the table in the LeafStageTransferableBlockOperator, which by definition knows the table

@gortiz gortiz force-pushed the multi-stage-explain branch from 8bea4a1 to 4fc70cd Compare September 23, 2024 10:44
Copy link
Contributor

@yashmayya yashmayya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @gortiz, LGTM! There are some new tests added in #13999 failing with an NPE here though (and looks related to the explain changes).

return cmp2;
}
int cmp3;
switch (value1.getValueCase()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need to handle the STRINGLIST case here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably because either I missed it or because JSON (the previous type) is difficult to sort. Adding it now that it is just a list of strings

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed in 2722631

@gortiz
Copy link
Contributor Author

gortiz commented Sep 23, 2024

Thanks @gortiz, LGTM! There are some new tests added in #13999 failing with an NPE here though (and looks related to the explain changes).

Yes, I've fixed that, but now the same tests fail due to some assertion in the test I don't understand. That is why we need to merge this ASAP. The code in ServerQueryExecutorV1Impl is very sensible and it has been modified more often that it used due to the timeseries code

@gortiz
Copy link
Contributor Author

gortiz commented Sep 23, 2024

The issue should be fixed. Anyway, my error merging this code shown that #13999 is not testing the empty case and also to me it looks like the code that was added in ServerQueryExecutorV1Impl in that PR should have been added into ResultsBlockUtils.buildEmptyQueryResults. @ankitsultana can you take a look at that?

@gortiz gortiz merged commit c484fef into apache:master Sep 24, 2024
@gortiz gortiz deleted the multi-stage-explain branch September 24, 2024 17:37
*
* Use false in order to mimic behavior of Pinot 1.2.0 and previous.
*/
public static final String EXPLAIN_ASKING_SERVERS = "explainAskingServers";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should update the query option for consistency since we updated the broker config from pinot.query.explain.ask.servers to pinot.query.multistage.explain.include.segment.plan. Probably to explainIncludeSegmentPlan?


private final WorkerManager _workerManager;
private final QueryDispatcher _queryDispatcher;
private final boolean _explainAskingServerDefault;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's update this member variable name too.

@yashmayya
Copy link
Contributor

#14193

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature multi-stage Related to the multi-stage query engine release-notes Referenced by PRs that need attention when compiling the next release notes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants