Skip to content

Commit b2eb097

Browse files
author
Rong Rong
committed
[cleanup] clean up some code path change unnecessarily made
1 parent 3718326 commit b2eb097

File tree

3 files changed

+32
-32
lines changed

3 files changed

+32
-32
lines changed

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.pinot.core.query.request.ServerQueryRequest;
2626
import org.apache.pinot.query.planner.plannode.PlanNode;
2727
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
28-
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
28+
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
2929

3030

3131
/**
@@ -36,27 +36,24 @@
3636
* {@link org.apache.pinot.query.runtime.operator.OpChain} part.
3737
*/
3838
public class ServerPlanRequestContext {
39-
private final OpChainExecutionContext _executionContext;
4039
private final DistributedStagePlan _stagePlan;
4140
private final QueryExecutor _leafQueryExecutor;
4241
private final ExecutorService _executorService;
42+
private final PipelineBreakerResult _pipelineBreakerResult;
4343

4444
private final PinotQuery _pinotQuery;
4545
private PlanNode _leafStageBoundaryNode;
4646
private List<ServerQueryRequest> _serverQueryRequests;
4747

48-
public ServerPlanRequestContext(OpChainExecutionContext executionContext, DistributedStagePlan stagePlan,
49-
QueryExecutor leafQueryExecutor, ExecutorService executorService) {
50-
_executionContext = executionContext;
48+
public ServerPlanRequestContext(DistributedStagePlan stagePlan, QueryExecutor leafQueryExecutor,
49+
ExecutorService executorService, PipelineBreakerResult pipelineBreakerResult) {
5150
_stagePlan = stagePlan;
5251
_leafQueryExecutor = leafQueryExecutor;
5352
_executorService = executorService;
53+
_pipelineBreakerResult = pipelineBreakerResult;
5454
_pinotQuery = new PinotQuery();
5555
}
5656

57-
public OpChainExecutionContext getExecutionContext() {
58-
return _executionContext;
59-
}
6057
public DistributedStagePlan getStagePlan() {
6158
return _stagePlan;
6259
}
@@ -69,6 +66,10 @@ public ExecutorService getExecutorService() {
6966
return _executorService;
7067
}
7168

69+
public PipelineBreakerResult getPipelineBreakerResult() {
70+
return _pipelineBreakerResult;
71+
}
72+
7273
public PinotQuery getPinotQuery() {
7374
return _pinotQuery;
7475
}

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,13 @@ public static OpChain compileLeafStage(OpChainExecutionContext executionContext,
9292
DistributedStagePlan distributedStagePlan, HelixManager helixManager, ServerMetrics serverMetrics,
9393
QueryExecutor leafQueryExecutor, ExecutorService executorService) {
9494
long queryArrivalTimeMs = System.currentTimeMillis();
95-
ServerPlanRequestContext serverContext = new ServerPlanRequestContext(executionContext, distributedStagePlan,
96-
leafQueryExecutor, executorService);
95+
ServerPlanRequestContext serverContext = new ServerPlanRequestContext(distributedStagePlan, leafQueryExecutor,
96+
executorService, executionContext.getPipelineBreakerResult());
9797
// 1. compile the PinotQuery
9898
constructPinotQueryPlan(serverContext);
9999
// 2. convert PinotQuery into InstanceRequest list (one for each physical table)
100100
List<InstanceRequest> instanceRequestList =
101-
ServerPlanRequestUtils.constructServerQueryRequests(serverContext, distributedStagePlan,
101+
ServerPlanRequestUtils.constructServerQueryRequests(executionContext, serverContext, distributedStagePlan,
102102
helixManager.getHelixPropertyStore());
103103
serverContext.setServerQueryRequests(instanceRequestList.stream()
104104
.map(instanceRequest -> new ServerQueryRequest(instanceRequest, serverMetrics, queryArrivalTimeMs, true))
@@ -118,13 +118,6 @@ public static OpChain compileLeafStage(OpChainExecutionContext executionContext,
118118
private static void constructPinotQueryPlan(ServerPlanRequestContext serverContext) {
119119
DistributedStagePlan stagePlan = serverContext.getStagePlan();
120120
PinotQuery pinotQuery = serverContext.getPinotQuery();
121-
Integer leafNodeLimit =
122-
QueryOptionsUtils.getMultiStageLeafLimit(serverContext.getExecutionContext().getOpChainMetadata());
123-
if (leafNodeLimit != null) {
124-
pinotQuery.setLimit(leafNodeLimit);
125-
} else {
126-
pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
127-
}
128121
pinotQuery.setExplain(false);
129122
// visit the plan and create PinotQuery and determine the leaf stage boundary PlanNode.
130123
ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), serverContext);
@@ -137,8 +130,9 @@ private static void constructPinotQueryPlan(ServerPlanRequestContext serverConte
137130
* @param helixPropertyStore helix property store used to fetch table config and schema for leaf-stage execution.
138131
* @return a list of server instance request to be run.
139132
*/
140-
public static List<InstanceRequest> constructServerQueryRequests(ServerPlanRequestContext serverContext,
141-
DistributedStagePlan distributedStagePlan, ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
133+
public static List<InstanceRequest> constructServerQueryRequests(OpChainExecutionContext executionContext,
134+
ServerPlanRequestContext serverContext, DistributedStagePlan distributedStagePlan,
135+
ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
142136
StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
143137
WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
144138
String rawTableName = StageMetadata.getTableName(stageMetadata);
@@ -147,6 +141,7 @@ public static List<InstanceRequest> constructServerQueryRequests(ServerPlanReque
147141
List<InstanceRequest> requests = new ArrayList<>();
148142
for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) {
149143
String tableType = tableEntry.getKey();
144+
List<String> segmentList = tableEntry.getValue();
150145
// ZkHelixPropertyStore extends from ZkCacheBaseDataAccessor so it should not cause too much out-of-the-box
151146
// network traffic. but there's chance to improve this:
152147
// TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments.
@@ -155,15 +150,15 @@ public static List<InstanceRequest> constructServerQueryRequests(ServerPlanReque
155150
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
156151
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
157152
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
158-
requests.add(ServerPlanRequestUtils.compileInstanceRequest(serverContext, stageId, tableConfig, schema,
159-
StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue()));
153+
requests.add(ServerPlanRequestUtils.compileInstanceRequest(executionContext, serverContext, stageId,
154+
tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, segmentList));
160155
} else if (TableType.REALTIME.name().equals(tableType)) {
161156
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
162157
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
163158
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
164159
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
165-
requests.add(ServerPlanRequestUtils.compileInstanceRequest(serverContext, stageId, tableConfig, schema,
166-
StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue()));
160+
requests.add(ServerPlanRequestUtils.compileInstanceRequest(executionContext, serverContext, stageId,
161+
tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, segmentList));
167162
} else {
168163
throw new IllegalArgumentException("Unsupported table type key: " + tableType);
169164
}
@@ -174,17 +169,21 @@ public static List<InstanceRequest> constructServerQueryRequests(ServerPlanReque
174169
/**
175170
* Convert {@link PinotQuery} into an {@link InstanceRequest}.
176171
*/
177-
private static InstanceRequest compileInstanceRequest(ServerPlanRequestContext serverContext, int stageId,
178-
TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo, TableType tableType,
179-
List<String> segmentList) {
172+
private static InstanceRequest compileInstanceRequest(OpChainExecutionContext executionContext,
173+
ServerPlanRequestContext serverContext, int stageId, TableConfig tableConfig, Schema schema,
174+
TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> segmentList) {
180175
// Making a unique requestId for leaf stages otherwise it causes problem on stats/metrics/tracing.
181-
OpChainExecutionContext executionContext = serverContext.getExecutionContext();
182176
long requestId =
183177
(executionContext.getRequestId() << 16) + ((long) stageId << 8) + (tableType == TableType.REALTIME ? 1 : 0);
184-
Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(executionContext.getOpChainMetadata());
185-
LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit);
186178
// 1. make a deep copy of the pinotQuery and modify the PinotQuery accordingly
187179
PinotQuery pinotQuery = new PinotQuery(serverContext.getPinotQuery());
180+
// - attach leaf node limit
181+
Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(executionContext.getOpChainMetadata());
182+
if (leafNodeLimit != null) {
183+
pinotQuery.setLimit(leafNodeLimit);
184+
} else {
185+
pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
186+
}
188187
// - attach table type
189188
DataSource dataSource = pinotQuery.getDataSource();
190189
String rawTableName = dataSource.getTableName();
@@ -204,7 +203,7 @@ private static InstanceRequest compileInstanceRequest(ServerPlanRequestContext s
204203
// 2. set pinot query options according to requestMetadataMap
205204
updateQueryOptions(pinotQuery, executionContext);
206205

207-
// 3. wrapped around in broker request
206+
// 3. wrapped around in broker request and replace with actual table name with type.
208207
BrokerRequest brokerRequest = new BrokerRequest();
209208
brokerRequest.setPinotQuery(pinotQuery);
210209
QuerySource querySource = new QuerySource();

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public Void visitJoin(JoinNode node, ServerPlanRequestContext context) {
133133
staticSide = node.getInputs().get(1);
134134
}
135135
if (visit(staticSide, context)) {
136-
PipelineBreakerResult pipelineBreakerResult = context.getExecutionContext().getPipelineBreakerResult();
136+
PipelineBreakerResult pipelineBreakerResult = context.getPipelineBreakerResult();
137137
int resultMapId = pipelineBreakerResult.getNodeIdMap().get(dynamicSide);
138138
List<TransferableBlock> transferableBlocks =
139139
pipelineBreakerResult.getResultMap().getOrDefault(resultMapId, Collections.emptyList());

0 commit comments

Comments
 (0)