Skip to content

Commit d50d6d6

Browse files
committed
[Multi-stage] Remove PhysicalPlanContext and clean up executor logic
1 parent 2b40362 commit d50d6d6

23 files changed

+247
-391
lines changed

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,8 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
166166
return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage));
167167
}
168168

169-
boolean traceEnabled = Boolean.parseBoolean(
170-
sqlNodeAndOptions.getOptions().getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
169+
Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();
170+
boolean traceEnabled = Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
171171

172172
ResultTable queryResults;
173173
Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
@@ -177,8 +177,8 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
177177

178178
long executionStartTimeNs = System.nanoTime();
179179
try {
180-
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs,
181-
sqlNodeAndOptions.getOptions(), stageIdStatsMap, traceEnabled);
180+
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions,
181+
stageIdStatsMap);
182182
} catch (Throwable t) {
183183
String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t);
184184
LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage);

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java

Lines changed: 45 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,12 @@
2020

2121
import com.google.common.annotations.VisibleForTesting;
2222
import java.util.ArrayList;
23-
import java.util.Collections;
2423
import java.util.List;
2524
import java.util.Map;
2625
import java.util.concurrent.ExecutorService;
2726
import java.util.concurrent.TimeoutException;
2827
import javax.annotation.Nullable;
2928
import org.apache.helix.HelixManager;
30-
import org.apache.helix.store.zk.ZkHelixPropertyStore;
31-
import org.apache.helix.zookeeper.datamodel.ZNRecord;
3229
import org.apache.pinot.common.exception.QueryException;
3330
import org.apache.pinot.common.metrics.ServerMetrics;
3431
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -39,7 +36,6 @@
3936
import org.apache.pinot.query.mailbox.MailboxIdUtils;
4037
import org.apache.pinot.query.mailbox.MailboxService;
4138
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
42-
import org.apache.pinot.query.planner.plannode.PlanNode;
4339
import org.apache.pinot.query.routing.MailboxMetadata;
4440
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
4541
import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
@@ -50,14 +46,12 @@
5046
import org.apache.pinot.query.runtime.operator.OpChain;
5147
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
5248
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
53-
import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
5449
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
5550
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
5651
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
5752
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
5853
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
5954
import org.apache.pinot.spi.env.PinotConfiguration;
60-
import org.apache.pinot.spi.metrics.PinotMetricUtils;
6155
import org.apache.pinot.spi.utils.CommonConstants;
6256
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
6357
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
@@ -72,17 +66,13 @@ public class QueryRunner {
7266
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class);
7367
private static final String PINOT_V1_SERVER_QUERY_CONFIG_PREFIX = "pinot.server.query.executor";
7468

75-
// This is a temporary before merging the 2 type of executor.
76-
private ServerQueryExecutorV1Impl _serverExecutor;
7769
private HelixManager _helixManager;
78-
private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
79-
private MailboxService _mailboxService;
80-
private String _hostname;
81-
private int _port;
70+
private ServerMetrics _serverMetrics;
8271

8372
private ExecutorService _opChainExecutor;
84-
8573
private OpChainSchedulerService _scheduler;
74+
private MailboxService _mailboxService;
75+
private ServerQueryExecutorV1Impl _leafQueryExecutor;
8676

8777
// Group-by settings
8878
@Nullable
@@ -102,12 +92,14 @@ public class QueryRunner {
10292
*/
10393
public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, HelixManager helixManager,
10494
ServerMetrics serverMetrics) {
95+
_helixManager = helixManager;
96+
_serverMetrics = serverMetrics;
97+
10598
String instanceName = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
106-
_hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring(
99+
String hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring(
107100
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
108-
_port = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
101+
int port = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
109102
CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
110-
_helixManager = helixManager;
111103

112104
// TODO: Consider using separate config for intermediate stage and leaf stage
113105
String numGroupsLimitStr = config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT);
@@ -121,29 +113,28 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
121113
String joinOverflowModeStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
122114
_joinOverflowMode = joinOverflowModeStr != null ? JoinOverFlowMode.valueOf(joinOverflowModeStr) : null;
123115

116+
//TODO: make this configurable
117+
_opChainExecutor =
118+
ExecutorServiceUtils.create(config, "pinot.query.runner.opchain", "op_chain_worker_on_" + port + "_port");
119+
_scheduler = new OpChainSchedulerService(getOpChainExecutorService());
120+
_mailboxService = new MailboxService(hostname, port, config);
124121
try {
125-
//TODO: make this configurable
126-
_opChainExecutor = ExecutorServiceUtils.create(config, "pinot.query.runner.opchain",
127-
"op_chain_worker_on_" + _port + "_port");
128-
_scheduler = new OpChainSchedulerService(getOpChainExecutorService());
129-
_mailboxService = new MailboxService(_hostname, _port, config);
130-
_serverExecutor = new ServerQueryExecutorV1Impl();
131-
_serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
122+
_leafQueryExecutor = new ServerQueryExecutorV1Impl();
123+
_leafQueryExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
132124
} catch (Exception e) {
133125
throw new RuntimeException(e);
134126
}
127+
128+
LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", hostname, port);
135129
}
136130

137-
public void start()
138-
throws TimeoutException {
139-
_helixPropertyStore = _helixManager.getHelixPropertyStore();
131+
public void start() {
140132
_mailboxService.start();
141-
_serverExecutor.start();
133+
_leafQueryExecutor.start();
142134
}
143135

144-
public void shutDown()
145-
throws TimeoutException {
146-
_serverExecutor.shutDown();
136+
public void shutDown() {
137+
_leafQueryExecutor.shutDown();
147138
_mailboxService.shutdown();
148139
ExecutorServiceUtils.close(_opChainExecutor);
149140
}
@@ -156,17 +147,15 @@ public void shutDown()
156147
*/
157148
public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadata) {
158149
long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
159-
long timeoutMs = Long.parseLong(requestMetadata.get(QueryOptionKey.TIMEOUT_MS));
160-
boolean isTraceEnabled =
161-
Boolean.parseBoolean(requestMetadata.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
150+
long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
162151
long deadlineMs = System.currentTimeMillis() + timeoutMs;
163152

164153
setStageCustomProperties(distributedStagePlan.getStageMetadata().getCustomProperties(), requestMetadata);
165154

166155
// run pre-stage execution for all pipeline breakers
167156
PipelineBreakerResult pipelineBreakerResult =
168-
PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, deadlineMs,
169-
requestId, isTraceEnabled);
157+
PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
158+
requestMetadata, requestId, deadlineMs);
170159

171160
// Send error block to all the receivers if pipeline breaker fails
172161
if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) {
@@ -193,13 +182,15 @@ public void processQuery(DistributedStagePlan distributedStagePlan, Map<String,
193182
}
194183

195184
// run OpChain
185+
OpChainExecutionContext executionContext =
186+
new OpChainExecutionContext(_mailboxService, requestId, distributedStagePlan.getStageId(),
187+
distributedStagePlan.getServer(), deadlineMs, requestMetadata, distributedStagePlan.getStageMetadata(),
188+
pipelineBreakerResult);
196189
OpChain opChain;
197190
if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
198-
opChain = compileLeafStage(requestId, distributedStagePlan, requestMetadata, pipelineBreakerResult, deadlineMs,
199-
isTraceEnabled);
191+
opChain = compileLeafStage(executionContext, distributedStagePlan);
200192
} else {
201-
opChain = compileIntermediateStage(requestId, distributedStagePlan, requestMetadata, pipelineBreakerResult,
202-
deadlineMs, isTraceEnabled);
193+
opChain = PhysicalPlanVisitor.walkPlanNode(distributedStagePlan.getStageRoot(), executionContext);
203194
}
204195
_scheduler.register(opChain);
205196
}
@@ -248,51 +239,36 @@ public ExecutorService getOpChainExecutorService() {
248239
return _opChainExecutor;
249240
}
250241

251-
private OpChain compileIntermediateStage(long requestId, DistributedStagePlan distributedStagePlan,
252-
Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, long deadlineMs,
253-
boolean isTraceEnabled) {
254-
PlanNode stageRoot = distributedStagePlan.getStageRoot();
255-
OpChainExecutionContext opChainContext = new OpChainExecutionContext(_mailboxService, requestId,
256-
stageRoot.getPlanFragmentId(), distributedStagePlan.getServer(), deadlineMs,
257-
distributedStagePlan.getStageMetadata(), pipelineBreakerResult, isTraceEnabled);
258-
return PhysicalPlanVisitor.walkPlanNode(stageRoot,
259-
new PhysicalPlanContext(opChainContext, pipelineBreakerResult));
260-
}
261-
262-
private OpChain compileLeafStage(long requestId, DistributedStagePlan distributedStagePlan,
263-
Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, long deadlineMs,
264-
boolean isTraceEnabled) {
265-
OpChainExecutionContext opChainContext = new OpChainExecutionContext(_mailboxService, requestId,
266-
distributedStagePlan.getStageId(), distributedStagePlan.getServer(), deadlineMs,
267-
distributedStagePlan.getStageMetadata(), pipelineBreakerResult, isTraceEnabled);
268-
PhysicalPlanContext planContext = new PhysicalPlanContext(opChainContext, pipelineBreakerResult);
269-
List<ServerPlanRequestContext> serverPlanRequestContexts = ServerPlanRequestUtils.constructServerQueryRequests(
270-
planContext, distributedStagePlan, requestMetadataMap, _helixPropertyStore);
242+
private OpChain compileLeafStage(OpChainExecutionContext executionContext,
243+
DistributedStagePlan distributedStagePlan) {
244+
List<ServerPlanRequestContext> serverPlanRequestContexts =
245+
ServerPlanRequestUtils.constructServerQueryRequests(executionContext, distributedStagePlan,
246+
_helixManager.getHelixPropertyStore());
271247
List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size());
248+
long queryArrivalTimeMs = System.currentTimeMillis();
272249
for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
273-
serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(),
274-
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis()));
250+
serverQueryRequests.add(
251+
new ServerQueryRequest(requestContext.getInstanceRequest(), _serverMetrics, queryArrivalTimeMs));
275252
}
276253
MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
277-
OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(planContext);
278254
MultiStageOperator leafStageOperator =
279-
new LeafStageTransferableBlockOperator(opChainExecutionContext, this::processServerQueryRequest,
280-
serverQueryRequests, sendNode.getDataSchema());
255+
new LeafStageTransferableBlockOperator(executionContext, this::processServerQueryRequest, serverQueryRequests,
256+
sendNode.getDataSchema());
281257
MailboxSendOperator mailboxSendOperator =
282-
new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getDistributionType(),
258+
new MailboxSendOperator(executionContext, leafStageOperator, sendNode.getDistributionType(),
283259
sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(),
284260
sendNode.isSortOnSender(), sendNode.getReceiverStageId());
285-
return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList());
261+
return new OpChain(executionContext, mailboxSendOperator);
286262
}
287263

288264
private InstanceResponseBlock processServerQueryRequest(ServerQueryRequest request) {
289265
InstanceResponseBlock result;
290266
try {
291-
result = _serverExecutor.execute(request, getOpChainExecutorService());
267+
result = _leafQueryExecutor.execute(request, getOpChainExecutorService());
292268
} catch (Exception e) {
293269
InstanceResponseBlock errorResponse = new InstanceResponseBlock();
294-
errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
295-
e.getMessage() + QueryException.getTruncatedStackTrace(e));
270+
errorResponse.getExceptions()
271+
.put(QueryException.QUERY_EXECUTION_ERROR_CODE, e.getMessage() + QueryException.getTruncatedStackTrace(e));
296272
result = errorResponse;
297273
}
298274
return result;

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.pinot.query.runtime.operator;
2020

21-
import java.util.List;
2221
import java.util.function.Consumer;
2322
import org.apache.pinot.core.common.Operator;
2423
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -34,31 +33,21 @@
3433
public class OpChain implements AutoCloseable {
3534
private static final Logger LOGGER = LoggerFactory.getLogger(OpChain.class);
3635

37-
private final MultiStageOperator _root;
38-
private final List<String> _receivingMailboxIds;
3936
private final OpChainId _id;
4037
private final OpChainStats _stats;
41-
private final Consumer<OpChainId> _opChainFinishCallback;
38+
private final MultiStageOperator _root;
39+
private final Consumer<OpChainId> _finishCallback;
4240

43-
public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<String> receivingMailboxIds) {
44-
this(context, root, receivingMailboxIds, (id) -> { });
41+
public OpChain(OpChainExecutionContext context, MultiStageOperator root) {
42+
this(context, root, (id) -> {
43+
});
4544
}
4645

47-
public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<String> receivingMailboxIds,
48-
Consumer<OpChainId> opChainFinishCallback) {
49-
_root = root;
50-
_receivingMailboxIds = receivingMailboxIds;
46+
public OpChain(OpChainExecutionContext context, MultiStageOperator root, Consumer<OpChainId> finishCallback) {
5147
_id = context.getId();
5248
_stats = context.getStats();
53-
_opChainFinishCallback = opChainFinishCallback;
54-
}
55-
56-
public Operator<TransferableBlock> getRoot() {
57-
return _root;
58-
}
59-
60-
public List<String> getReceivingMailboxIds() {
61-
return _receivingMailboxIds;
49+
_root = root;
50+
_finishCallback = finishCallback;
6251
}
6352

6453
public OpChainId getId() {
@@ -70,6 +59,10 @@ public OpChainStats getStats() {
7059
return _stats;
7160
}
7261

62+
public Operator<TransferableBlock> getRoot() {
63+
return _root;
64+
}
65+
7366
@Override
7467
public String toString() {
7568
return "OpChain{" + _id + "}";
@@ -86,7 +79,7 @@ public void close() {
8679
try {
8780
_root.close();
8881
} finally {
89-
_opChainFinishCallback.accept(getId());
82+
_finishCallback.accept(getId());
9083
LOGGER.trace("OpChain callback called");
9184
}
9285
}
@@ -102,7 +95,7 @@ public void cancel(Throwable e) {
10295
try {
10396
_root.cancel(e);
10497
} finally {
105-
_opChainFinishCallback.accept(getId());
98+
_finishCallback.accept(getId());
10699
LOGGER.trace("OpChain callback called");
107100
}
108101
}

0 commit comments

Comments
 (0)