@@ -92,13 +92,13 @@ public static OpChain compileLeafStage(OpChainExecutionContext executionContext,
       DistributedStagePlan distributedStagePlan, HelixManager helixManager, ServerMetrics serverMetrics,
       QueryExecutor leafQueryExecutor, ExecutorService executorService) {
     long queryArrivalTimeMs = System.currentTimeMillis();
-    ServerPlanRequestContext serverContext = new ServerPlanRequestContext(executionContext, distributedStagePlan,
-        leafQueryExecutor, executorService);
+    ServerPlanRequestContext serverContext = new ServerPlanRequestContext(distributedStagePlan, leafQueryExecutor,
+        executorService, executionContext.getPipelineBreakerResult());
     // 1. compile the PinotQuery
     constructPinotQueryPlan(serverContext);
     // 2. convert PinotQuery into InstanceRequest list (one for each physical table)
     List<InstanceRequest> instanceRequestList =
-        ServerPlanRequestUtils.constructServerQueryRequests(serverContext, distributedStagePlan,
+        ServerPlanRequestUtils.constructServerQueryRequests(executionContext, serverContext, distributedStagePlan,
             helixManager.getHelixPropertyStore());
     serverContext.setServerQueryRequests(instanceRequestList.stream()
         .map(instanceRequest -> new ServerQueryRequest(instanceRequest, serverMetrics, queryArrivalTimeMs, true))
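The change above narrows what `ServerPlanRequestContext` carries: instead of the whole `OpChainExecutionContext`, it now holds just the pipeline breaker result, while the execution context itself is threaded explicitly through the call chain. A minimal caller sketch, assuming `compileLeafStage` sits in `ServerPlanRequestUtils` alongside the other methods in this diff and that the argument values come from the server's dispatch path:

```java
// Sketch only: method and parameter names are taken from this diff; where the
// arguments are obtained from is assumed.
OpChain leafOpChain = ServerPlanRequestUtils.compileLeafStage(
    executionContext,     // supplies requestId, opChainMetadata and the pipeline breaker result
    distributedStagePlan, // leaf-stage plan for this worker
    helixManager, serverMetrics, leafQueryExecutor, executorService);
```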
@@ -118,13 +118,6 @@ public static OpChain compileLeafStage(OpChainExecutionContext executionContext,
   private static void constructPinotQueryPlan(ServerPlanRequestContext serverContext) {
     DistributedStagePlan stagePlan = serverContext.getStagePlan();
     PinotQuery pinotQuery = serverContext.getPinotQuery();
-    Integer leafNodeLimit =
-        QueryOptionsUtils.getMultiStageLeafLimit(serverContext.getExecutionContext().getOpChainMetadata());
-    if (leafNodeLimit != null) {
-      pinotQuery.setLimit(leafNodeLimit);
-    } else {
-      pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
-    }
     pinotQuery.setExplain(false);
     // visit the plan and create PinotQuery and determine the leaf stage boundary PlanNode.
     ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), serverContext);
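The limit-setting block removed here is not dropped; it moves into `compileInstanceRequest` below, where it runs against the per-table deep copy instead of the shared `PinotQuery` template. A minimal sketch of why the ordering matters, assuming `PinotQuery` is a Thrift-generated struct whose copy constructor performs a deep copy (which is what `new PinotQuery(serverContext.getPinotQuery())` below relies on):

```java
// Sketch: mutate the per-request copy, not the shared template. The limit
// value is illustrative; the real code uses the multi-stage leaf limit query
// option, falling back to DEFAULT_LEAF_NODE_LIMIT.
PinotQuery template = serverContext.getPinotQuery();
PinotQuery perTableCopy = new PinotQuery(template); // Thrift deep copy
perTableCopy.setLimit(100_000);
// template is untouched, so the OFFLINE and REALTIME requests built from the
// same template each get their own limit and table-specific rewrites.
```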
@@ -137,8 +130,9 @@ private static void constructPinotQueryPlan(ServerPlanRequestContext serverConte
    * @param helixPropertyStore helix property store used to fetch table config and schema for leaf-stage execution.
    * @return a list of server instance requests to run.
    */
-  public static List<InstanceRequest> constructServerQueryRequests(ServerPlanRequestContext serverContext,
-      DistributedStagePlan distributedStagePlan, ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
+  public static List<InstanceRequest> constructServerQueryRequests(OpChainExecutionContext executionContext,
+      ServerPlanRequestContext serverContext, DistributedStagePlan distributedStagePlan,
+      ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
     StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
     WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
     String rawTableName = StageMetadata.getTableName(stageMetadata);
@@ -147,6 +141,7 @@ public static List<InstanceRequest> constructServerQueryRequests(ServerPlanReque
     List<InstanceRequest> requests = new ArrayList<>();
     for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) {
       String tableType = tableEntry.getKey();
+      List<String> segmentList = tableEntry.getValue();
       // ZkHelixPropertyStore extends ZkCacheBaseDataAccessor, so it should not cause too much out-of-the-box
       // network traffic, but there is a chance to improve this:
       // TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments.
@@ -155,15 +150,15 @@ public static List<InstanceRequest> constructServerQueryRequests(ServerPlanReque
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
-        requests.add(ServerPlanRequestUtils.compileInstanceRequest(serverContext, stageId, tableConfig, schema,
-            StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue()));
+        requests.add(ServerPlanRequestUtils.compileInstanceRequest(executionContext, serverContext, stageId,
+            tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, segmentList));
       } else if (TableType.REALTIME.name().equals(tableType)) {
         TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
-        requests.add(ServerPlanRequestUtils.compileInstanceRequest(serverContext, stageId, tableConfig, schema,
-            StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue()));
+        requests.add(ServerPlanRequestUtils.compileInstanceRequest(executionContext, serverContext, stageId,
+            tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, segmentList));
       } else {
         throw new IllegalArgumentException("Unsupported table type key: " + tableType);
       }
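The OFFLINE and REALTIME branches differ only in the `TableType` used to derive the physical table name for the ZK metadata lookups. As a reference for readers unfamiliar with `TableNameBuilder`, it appends the type suffix to the raw name; a small sketch (table name illustrative):

```java
// Sketch: raw table name -> per-type physical table names used as ZK keys.
String raw = "myTable";
String offline = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(raw);   // "myTable_OFFLINE"
String realtime = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(raw); // "myTable_REALTIME"
```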
@@ -174,17 +169,21 @@ public static List<InstanceRequest> constructServerQueryRequests(ServerPlanReque
   /**
    * Convert {@link PinotQuery} into an {@link InstanceRequest}.
    */
-  private static InstanceRequest compileInstanceRequest(ServerPlanRequestContext serverContext, int stageId,
-      TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo, TableType tableType,
-      List<String> segmentList) {
+  private static InstanceRequest compileInstanceRequest(OpChainExecutionContext executionContext,
+      ServerPlanRequestContext serverContext, int stageId, TableConfig tableConfig, Schema schema,
+      TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> segmentList) {
     // Make a unique requestId for leaf stages; otherwise it causes problems with stats/metrics/tracing.
-    OpChainExecutionContext executionContext = serverContext.getExecutionContext();
     long requestId =
         (executionContext.getRequestId() << 16) + ((long) stageId << 8) + (tableType == TableType.REALTIME ? 1 : 0);
-    Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(executionContext.getOpChainMetadata());
-    LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit);
     // 1. make a deep copy of the pinotQuery and modify the PinotQuery accordingly
     PinotQuery pinotQuery = new PinotQuery(serverContext.getPinotQuery());
+    // - attach leaf node limit
+    Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(executionContext.getOpChainMetadata());
+    if (leafNodeLimit != null) {
+      pinotQuery.setLimit(leafNodeLimit);
+    } else {
+      pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
+    }
     // - attach table type
     DataSource dataSource = pinotQuery.getDataSource();
     String rawTableName = dataSource.getTableName();
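The requestId packing above puts the original request id in the high bits, the stage id in the next byte, and the table type in the lowest bit, so the two leaf requests produced for a hybrid table never collide on stats/metrics/tracing keys. A worked sketch with made-up inputs:

```java
// Sketch: requestId = (originalId << 16) + (stageId << 8) + tableTypeBit.
long originalId = 42L;
int stageId = 3;
long offlineId = (originalId << 16) + ((long) stageId << 8); // 42 * 65536 + 3 * 256 = 2753280
long realtimeId = offlineId + 1;                             // 2753281, lowest bit marks REALTIME
```

Note that the leaf-limit logic removed from `constructPinotQueryPlan` re-lands here, after the deep copy, while the old `LOGGER.debug` line is dropped rather than moved.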
@@ -204,7 +203,7 @@ private static InstanceRequest compileInstanceRequest(ServerPlanRequestContext s
     // 2. set pinot query options according to requestMetadataMap
     updateQueryOptions(pinotQuery, executionContext);
 
-    // 3. wrapped around in broker request
+    // 3. wrap in a broker request and replace with the actual table name with type.
     BrokerRequest brokerRequest = new BrokerRequest();
     brokerRequest.setPinotQuery(pinotQuery);
     QuerySource querySource = new QuerySource();
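The hunk ends just before the table name is attached to the `QuerySource`. Presumably the wrapping finishes with the type-suffixed physical table name, per the step-3 comment; a hedged sketch of that shape, using the Thrift-generated `QuerySource`/`BrokerRequest` setters (the concrete table name is illustrative):

```java
// Sketch: complete the BrokerRequest wrapping with the physical table name.
querySource.setTableName("myTable_OFFLINE");
brokerRequest.setQuerySource(querySource);
```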