Skip to content

Commit 4ebed46

Browse files
authored
[Multi-stage] Refactor query dispatcher to remove pipeline breaker (#11422)
1 parent 8ca6435 commit 4ebed46

File tree

8 files changed

+231
-324
lines changed

8 files changed

+231
-324
lines changed

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Set;
27-
import java.util.concurrent.ExecutorService;
2827
import java.util.concurrent.TimeUnit;
2928
import javax.annotation.Nullable;
3029
import javax.ws.rs.WebApplicationException;
@@ -60,8 +59,6 @@
6059
import org.apache.pinot.query.mailbox.MailboxService;
6160
import org.apache.pinot.query.planner.DispatchableSubPlan;
6261
import org.apache.pinot.query.routing.WorkerManager;
63-
import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
64-
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
6562
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
6663
import org.apache.pinot.query.type.TypeFactory;
6764
import org.apache.pinot.query.type.TypeSystem;
@@ -76,15 +73,10 @@
7673

7774
public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
7875
private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageBrokerRequestHandler.class);
79-
private final String _reducerHostname;
80-
private final int _reducerPort;
81-
82-
private final MailboxService _mailboxService;
83-
private final OpChainSchedulerService _reducerScheduler;
8476

8577
private final QueryEnvironment _queryEnvironment;
78+
private final MailboxService _mailboxService;
8679
private final QueryDispatcher _queryDispatcher;
87-
private final ExecutorService _opChainExecutor;
8880

8981
public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerIdFromConfig,
9082
BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
@@ -101,19 +93,14 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
10193
brokerId = StringUtils.split(brokerId, "_").length > 1 ? StringUtils.split(brokerId, "_")[0] : brokerId;
10294
reducerHostname = brokerId;
10395
}
104-
_reducerHostname = reducerHostname;
10596
// This config has to be set to a valid port number.
106-
_reducerPort = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
97+
int reducerPort =
98+
Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
10799
_queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
108100
CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
109-
new WorkerManager(_reducerHostname, _reducerPort, routingManager), _tableCache);
110-
_queryDispatcher = new QueryDispatcher();
111-
112-
String opChainExecConfigPrefix = "pinot.query.runner.opchain";
113-
String opChainExecNamePrefix = "query_broker_reducer_" + _reducerPort + "port";
114-
_opChainExecutor = ExecutorServiceUtils.create(config, opChainExecConfigPrefix, opChainExecNamePrefix);
115-
_reducerScheduler = new OpChainSchedulerService(_opChainExecutor);
116-
_mailboxService = new MailboxService(_reducerHostname, _reducerPort, config);
101+
new WorkerManager(reducerHostname, reducerPort, routingManager), _tableCache);
102+
_mailboxService = new MailboxService(reducerHostname, reducerPort, config);
103+
_queryDispatcher = new QueryDispatcher(_mailboxService);
117104

118105
// TODO: move this to a startUp() function.
119106
_mailboxService.start();
@@ -151,12 +138,12 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
151138
break;
152139
}
153140
} catch (Exception e) {
154-
LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query,
155-
ExceptionUtils.consolidateExceptionMessages(e));
141+
String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(e);
142+
LOGGER.info("Caught exception compiling request {}: {}, {}", requestId, query, consolidatedMessage);
156143
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1);
157144
requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
158-
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
159-
ExceptionUtils.consolidateExceptionMessages(e)));
145+
return new BrokerResponseNative(
146+
QueryException.getException(QueryException.SQL_PARSING_ERROR, consolidatedMessage));
160147
}
161148

162149
DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan();
@@ -190,12 +177,14 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
190177

191178
long executionStartTimeNs = System.nanoTime();
192179
try {
193-
queryResults =
194-
_queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, _mailboxService, _reducerScheduler,
195-
queryTimeoutMs, sqlNodeAndOptions.getOptions(), stageIdStatsMap, traceEnabled);
180+
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs,
181+
sqlNodeAndOptions.getOptions(), stageIdStatsMap, traceEnabled);
196182
} catch (Throwable t) {
197-
LOGGER.error("query execution failed", t);
198-
return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, t));
183+
String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t);
184+
LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage);
185+
requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
186+
return new BrokerResponseNative(
187+
QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, consolidatedMessage));
199188
}
200189
long executionEndTimeNs = System.nanoTime();
201190
updatePhaseTimingForTables(tableNames, BrokerQueryPhase.QUERY_EXECUTION, executionEndTimeNs - executionStartTimeNs);
@@ -301,8 +290,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
301290
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
302291
@Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest,
303292
@Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
304-
RequestContext requestContext)
305-
throws Exception {
293+
RequestContext requestContext) {
306294
throw new UnsupportedOperationException();
307295
}
308296

@@ -315,6 +303,5 @@ public void start() {
315303
public void shutDown() {
316304
_queryDispatcher.shutdown();
317305
_mailboxService.shutdown();
318-
ExecutorServiceUtils.close(_opChainExecutor);
319306
}
320307
}

pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
*/
1919
package org.apache.pinot.query.planner.logical;
2020

21-
import com.google.common.collect.ImmutableList;
2221
import java.util.ArrayList;
22+
import java.util.Collections;
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
@@ -88,26 +88,22 @@ public SubPlan makePlan(QueryPlan queryPlan) {
8888
new PlanFragment(1, subPlanRoot, new PlanFragmentMetadata(), new ArrayList<>()));
8989
subPlanRoot = subPlanRoot.visit(PlanFragmenter.INSTANCE, planFragmentContext);
9090

91-
// Sub plan root needs to send results back to the Broker ROOT, a.k.a. the client response node. the last stage
92-
// only has one
93-
// receiver so doesn't matter what the exchange type is. setting it to SINGLETON by default.
91+
// Sub plan root needs to send final results back to the Broker
92+
// TODO: Should be SINGLETON (currently SINGLETON has to be local, so use BROADCAST_DISTRIBUTED instead)
9493
PlanNode subPlanRootSenderNode =
95-
new MailboxSendNode(subPlanRoot.getPlanFragmentId(), subPlanRoot.getDataSchema(),
96-
0, RelDistribution.Type.RANDOM_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
94+
new MailboxSendNode(subPlanRoot.getPlanFragmentId(), subPlanRoot.getDataSchema(), 0,
95+
RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
9796
false);
9897
subPlanRootSenderNode.addInput(subPlanRoot);
99-
100-
PlanNode subPlanRootReceiverNode =
101-
new MailboxReceiveNode(0, subPlanRoot.getDataSchema(), subPlanRoot.getPlanFragmentId(),
102-
RelDistribution.Type.RANDOM_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
103-
false, false, subPlanRootSenderNode);
104-
subPlanRoot = subPlanRootReceiverNode;
98+
subPlanRoot = new MailboxReceiveNode(0, subPlanRoot.getDataSchema(), subPlanRoot.getPlanFragmentId(),
99+
RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null, false,
100+
false, subPlanRootSenderNode);
105101
PlanFragment planFragment1 = planFragmentContext._planFragmentIdToRootNodeMap.get(1);
106102
planFragmentContext._planFragmentIdToRootNodeMap.put(1,
107103
new PlanFragment(1, subPlanRootSenderNode, planFragment1.getFragmentMetadata(), planFragment1.getChildren()));
108-
PlanFragment rootPlanFragment
109-
= new PlanFragment(subPlanRoot.getPlanFragmentId(), subPlanRoot, new PlanFragmentMetadata(),
110-
ImmutableList.of(planFragmentContext._planFragmentIdToRootNodeMap.get(1)));
104+
PlanFragment rootPlanFragment =
105+
new PlanFragment(subPlanRoot.getPlanFragmentId(), subPlanRoot, new PlanFragmentMetadata(),
106+
Collections.singletonList(planFragmentContext._planFragmentIdToRootNodeMap.get(1)));
111107
planFragmentContext._planFragmentIdToRootNodeMap.put(0, rootPlanFragment);
112108
for (Map.Entry<Integer, List<Integer>> planFragmentToChildrenEntry
113109
: planFragmentContext._planFragmentIdToChildrenMap.entrySet()) {

pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -477,18 +477,18 @@ private Object[][] provideQueriesWithExplainedPhysicalPlan() {
477477
//@formatter:off
478478
return new Object[][] {
479479
new Object[]{"EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a",
480-
"[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n"
481-
+ "├── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
480+
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
481+
+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
482482
+ "│ └── [1]@localhost:1 PROJECT\n"
483483
+ "│ └── [1]@localhost:1 TABLE SCAN (a) null\n"
484-
+ "└── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
484+
+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
485485
+ " └── [1]@localhost:2 PROJECT\n"
486486
+ " └── [1]@localhost:2 TABLE SCAN (a) null\n"},
487487
new Object[]{"EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR "
488488
+ "SELECT col1, COUNT(*) FROM a GROUP BY col1",
489-
"[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n"
490-
+ "├── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
491-
+ "└── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
489+
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
490+
+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
491+
+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
492492
+ " └── [1]@localhost:2 AGGREGATE_FINAL\n"
493493
+ " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
494494
+ " ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
@@ -498,9 +498,9 @@ private Object[][] provideQueriesWithExplainedPhysicalPlan() {
498498
+ " └── [2]@localhost:2 AGGREGATE_LEAF\n"
499499
+ " └── [2]@localhost:2 TABLE SCAN (a) null\n"},
500500
new Object[]{"EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
501-
"[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n"
502-
+ "├── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
503-
+ "└── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
501+
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
502+
+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
503+
+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
504504
+ " └── [1]@localhost:2 PROJECT\n"
505505
+ " └── [1]@localhost:2 JOIN\n"
506506
+ " ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.pinot.query.runtime.plan;
2020

21-
import com.google.common.annotations.VisibleForTesting;
2221
import org.apache.pinot.query.mailbox.MailboxService;
2322
import org.apache.pinot.query.routing.VirtualServerAddress;
2423
import org.apache.pinot.query.runtime.operator.OpChainId;
@@ -42,7 +41,6 @@ public class OpChainExecutionContext {
4241
private final OpChainStats _stats;
4342
private final boolean _traceEnabled;
4443

45-
@VisibleForTesting
4644
public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId,
4745
VirtualServerAddress server, long deadlineMs, StageMetadata stageMetadata,
4846
PipelineBreakerResult pipelineBreakerResult, boolean traceEnabled) {
@@ -55,8 +53,7 @@ public OpChainExecutionContext(MailboxService mailboxService, long requestId, in
5553
_id = new OpChainId(requestId, server.workerId(), stageId);
5654
_stats = new OpChainStats(_id.toString());
5755
if (pipelineBreakerResult != null && pipelineBreakerResult.getOpChainStats() != null) {
58-
_stats.getOperatorStatsMap().putAll(
59-
pipelineBreakerResult.getOpChainStats().getOperatorStatsMap());
56+
_stats.getOperatorStatsMap().putAll(pipelineBreakerResult.getOpChainStats().getOperatorStatsMap());
6057
}
6158
_traceEnabled = traceEnabled;
6259
}

0 commit comments

Comments
 (0)