-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Multi stage explain #13733
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Multi stage explain #13733
Conversation
8722ab7 to
a3b3874
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #13733 +/- ##
============================================
+ Coverage 61.75% 64.20% +2.44%
- Complexity 207 1534 +1327
============================================
Files 2436 2594 +158
Lines 133233 142748 +9515
Branches 20636 21864 +1228
============================================
+ Hits 82274 91646 +9372
+ Misses 44911 44349 -562
- Partials 6048 6753 +705
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
5725a84 to
67c8e5c
Compare
Add a flag we can use to decide if we want to use the new plan or the old one by default
…xplainAskingServersUtils
…stHandler cannot obtain the physical plan
…apache.pinot.query.planner.explain package
…x send It was an error introduced in 2ec071c
d834ba9 to
eca86d3
Compare
yashmayya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gortiz something I just realized is that we can lose the table related information in the new explain plan. For instance, this query on the basic quickstart:
EXPLAIN PLAN WITHOUT IMPLEMENTATION FOR WITH tmp AS (
select playerID,
teamID,
SUM(homeRuns) as totalHomeRuns
from baseballStats
WHERE yearID > 2000
GROUP BY playerID,
teamID
ORDER BY totalHomeRuns DESC
)
SELECT *
FROM tmp
JOIN dimBaseballTeams ON tmp.teamID = dimBaseballTeams.teamID;
returns:
Execution Plan
LogicalJoin(condition=[=($1, $3)], joinType=[inner])
PinotLogicalExchange(distribution=[hash[1]])
PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)])
PinotLogicalExchange(distribution=[hash[0, 1]])
LeafStageCombineOperator
StreamingInstanceResponse
CombineGroupBy
GroupBy(groupKeys=[[playerID, teamID]], aggregations=[[sum(homeRuns)]])
Project(columns=[[homeRuns, teamID, playerID]])
DocIdSet(maxDocs=[10000])
FilterFullScan(predicate=[yearID > '2000'], operator=[RANGE])
PinotLogicalExchange(distribution=[hash[0]])
LeafStageCombineOperator
StreamingInstanceResponse
StreamingCombineSelect
SelectStreaming(segment=[dimBaseballTeams_OFFLINE_0], table=[dimBaseballTeams], totalDocs=[51])
Project(columns=[[teamName, teamID]])
DocIdSet(maxDocs=[10000])
FilterMatchEntireSegment(numDocs=[51])
whereas earlier it would've returned:
Execution Plan
LogicalJoin(condition=[=($1, $3)], joinType=[inner])
PinotLogicalExchange(distribution=[hash[1]])
PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)])
PinotLogicalExchange(distribution=[hash[0, 1]])
PinotLogicalAggregate(group=[{16, 25}], agg#0=[$SUM0($11)])
LogicalFilter(condition=[>($27, 2000)])
LogicalTableScan(table=[[default, baseballStats]])
PinotLogicalExchange(distribution=[hash[0]])
LogicalProject(teamID=[$3], teamName=[$4])
LogicalTableScan(table=[[default, dimBaseballTeams]])
For simpler queries, this probably won't be a big issue but for complex queries with lots of joins, CTEs etc. I think it's pretty important to include this information.
Looks like you updated StreamingSelectionOnlyOperator to include the table name in its explain attributes; I guess we should do something similar for all other such operators that can be used in the leaf stage (for instance, StreamingInstanceResponseOperator)?
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
Show resolved
Hide resolved
| public static final int V1 = 1; | ||
| } | ||
|
|
||
| public static final String ASK_SERVERS_FOR_EXPLAIN_PLAN = "pinot.query.explain.ask.servers"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed on usage of physical, what about something like pinot.multistage.explain.query.include.segment.level.plan? It's fairly verbose but hopefully should be able to convey intent clearly to users.
| protected List<ExplainInfo> getChildrenExplainInfo() { | ||
| return getChildOperators().stream() | ||
| .filter(Objects::nonNull) | ||
| .map(Operator::getExplainInfo) | ||
| .collect(Collectors.toList()); | ||
| } | ||
|
|
||
| protected String getExplainName() { | ||
| return toExplainString(); | ||
| } | ||
|
|
||
| protected Map<String, Plan.ExplainNode.AttributeValue> getExplainAttributes() { | ||
| return Collections.emptyMap(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, agree on the current state of the Operator interface. Thanks for adding Javadocs to all the explain related methods - that should help out quite a bit. I think we can discuss potential refactoring of that interface separately, this looks good for now.
| explain.getDetailLevel() == null ? SqlExplainLevel.DIGEST_ATTRIBUTES : explain.getDetailLevel(); | ||
| Set<String> tableNames = RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel); | ||
| return new QueryPlannerResult(null, PlannerUtils.explainPlan(relRoot.rel, format, level), tableNames); | ||
| if (!explain.withImplementation() || !askServers) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, we can also plan to deprecate the EXPLAIN IMPLEMENTATION PLAN FOR syntax and change it to one of the WITH <extension> in the future.
|
|
||
| public void explain(Worker.QueryRequest request, QueryServerInstance virtualServer, Deadline deadline, | ||
| Consumer<AsyncResponse<List<Worker.ExplainResponse>>> callback) { | ||
| _dispatchStub.withDeadline(deadline).explain(request, new AllValuesDispatchObserver<>(virtualServer, callback)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I don't get is why the proto file is defined as:
rpc Submit(ServerRequest) returns (stream ServerResponse);
instead of
rpc Submit(ServerRequest) returns (ServerResponse);
That's the proto definition for GrpcQueryServer right (which is for v1 streaming queries FWICT)? The proto definition for the multi-stage engine's QueryServer Submit RPC is -
| rpc Submit(QueryRequest) returns (QueryResponse); |
So I guess this makes sense now since the new Explain RPC returns a stream ExplainResponse and the implementation does call onNext multiple times.
| return visitor.build(); | ||
| } | ||
|
|
||
| private static class ConverterVisitor implements PlanNodeVisitor<Void, Void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, thanks for elaborating! 😄
| if (PipelineBreakerExecutor.hasPipelineBreakers(stagePlan)) { | ||
| // TODO: Support pipeline breakers before merging this feature. | ||
| LOGGER.error("Pipeline breaker is not supported in explain query"); | ||
| return stagePlan; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I hadn't thought of that either, thanks for the explanation! I guess we can just update that TODO comment for now, it makes sense to defer this considering the current explain also doesn't properly support pipeline breaker.
# Conflicts: # pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java # pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
Nice catch. Yes, I've found the same problem and tried to solve it that way, but it doesn't seem to be a scalable solution. Very easily we can end up having an operator that is not registering the table. Instead what I've done is to add the table in the |
…as pinot.query.multistage.explain.include.segment.plan
…egment.plan by default
…ableName to table
8bea4a1 to
4fc70cd
Compare
yashmayya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| return cmp2; | ||
| } | ||
| int cmp3; | ||
| switch (value1.getValueCase()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we not need to handle the STRINGLIST case here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably because either I missed it or because JSON (the previous type) is difficult to sort. Adding it now that it is just a list of strings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed in 2722631
...rc/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
Outdated
Show resolved
Hide resolved
Yes, I've fixed that, but now the same tests fail due to some assertion in the test I don't understand. That is why we need to merge this ASAP. The code in |
|
The issue should be fixed. Anyway, my error merging this code shown that #13999 is not testing the empty case and also to me it looks like the code that was added in |
| * | ||
| * Use false in order to mimic behavior of Pinot 1.2.0 and previous. | ||
| */ | ||
| public static final String EXPLAIN_ASKING_SERVERS = "explainAskingServers"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should update the query option for consistency since we updated the broker config from pinot.query.explain.ask.servers to pinot.query.multistage.explain.include.segment.plan. Probably to explainIncludeSegmentPlan?
|
|
||
| private final WorkerManager _workerManager; | ||
| private final QueryDispatcher _queryDispatcher; | ||
| private final boolean _explainAskingServerDefault; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: let's update this member variable name too.
This PR introduces a new way to explain multi-stage queries in Pinot.
The main goal is to provide a more detailed explanation of the query execution plan, including information about
the physical operators that are being used.
Warning
Edit on Jan 2025: The next paragraph is incorrect. The new explain plan is disabled in 1.3.0 and we plan to enable it by default in future versions. Please refer to Pinot explain documentation to see the different versions and how they could be enabled.
By default,
explain plan forwill return the new plan. If you want to use the old plan you can useexplain plan without implementation for. This may be problematic, so we can discuss to introduce a new flag for this.The main reason to break the default behavior is that the new plan is more verbose actually what a user should expect
when asking for implementation, at least following Calcite terminology.
Alternatively we can change the syntax in the same way we already did with
explain physical plan for.At architectural level, the new explain mode is closer to the one used in single stage.
The broker parses and optimizes the query generating a logical plan, generating RelNodes.
These nodes are transformed into PlanNodes as usual and sent to the servers.
But instead of asking to execute the plan, the broker asks to explain it using a new protobuf endpoint.
This new endpoint returns a list of PlanNodes.
When the server receives the explain request, it analyzes the plan looking for leaf operators and creates single-stage
operators as usual.
There are two key differences with respect of the execution mode:
Operator.getOperatorInfo,which returns the same information returned by
Operator.explainPlanbut in POJOs.The server then convert these POJOs into PlanNodes and substitute the tracked PlanNodes with the new ones.
Finally the new plan is sent back to the broker.
In order to be able to introduce physical (aka index used, etc) information in the PlanNode, a new ExplainedPlanNode is
created.
These nodes are not meant to be translated into actual operators, but to be used to explain the query execution plan.
When the broker receives the PlanNodes, it converts them back into a RelNode using a new class
PinotExplainedRelNode.Then the broker substitutes the original logical RelNodes with the new ones returned by the servers.
Finally, it explains the RelNode as expected in Calcite.
The result can be see in the following pictures:
Without implementation (similar to current explain)

With implementation:

The PR is still a work in progress, but it is already partially functional.