2020
2121import com .google .common .annotations .VisibleForTesting ;
2222import java .util .ArrayList ;
23- import java .util .Collections ;
2423import java .util .List ;
2524import java .util .Map ;
2625import java .util .concurrent .ExecutorService ;
2726import java .util .concurrent .TimeoutException ;
2827import javax .annotation .Nullable ;
2928import org .apache .helix .HelixManager ;
30- import org .apache .helix .store .zk .ZkHelixPropertyStore ;
31- import org .apache .helix .zookeeper .datamodel .ZNRecord ;
3229import org .apache .pinot .common .exception .QueryException ;
3330import org .apache .pinot .common .metrics .ServerMetrics ;
3431import org .apache .pinot .common .utils .config .QueryOptionsUtils ;
3936import org .apache .pinot .query .mailbox .MailboxIdUtils ;
4037import org .apache .pinot .query .mailbox .MailboxService ;
4138import org .apache .pinot .query .planner .plannode .MailboxSendNode ;
42- import org .apache .pinot .query .planner .plannode .PlanNode ;
4339import org .apache .pinot .query .routing .MailboxMetadata ;
4440import org .apache .pinot .query .runtime .blocks .TransferableBlock ;
4541import org .apache .pinot .query .runtime .executor .ExecutorServiceUtils ;
5046import org .apache .pinot .query .runtime .operator .OpChain ;
5147import org .apache .pinot .query .runtime .plan .DistributedStagePlan ;
5248import org .apache .pinot .query .runtime .plan .OpChainExecutionContext ;
53- import org .apache .pinot .query .runtime .plan .PhysicalPlanContext ;
5449import org .apache .pinot .query .runtime .plan .PhysicalPlanVisitor ;
5550import org .apache .pinot .query .runtime .plan .pipeline .PipelineBreakerExecutor ;
5651import org .apache .pinot .query .runtime .plan .pipeline .PipelineBreakerResult ;
5752import org .apache .pinot .query .runtime .plan .server .ServerPlanRequestContext ;
5853import org .apache .pinot .query .runtime .plan .server .ServerPlanRequestUtils ;
5954import org .apache .pinot .spi .env .PinotConfiguration ;
60- import org .apache .pinot .spi .metrics .PinotMetricUtils ;
6155import org .apache .pinot .spi .utils .CommonConstants ;
6256import org .apache .pinot .spi .utils .CommonConstants .Broker .Request .QueryOptionKey ;
6357import org .apache .pinot .spi .utils .CommonConstants .MultiStageQueryRunner .JoinOverFlowMode ;
@@ -72,17 +66,13 @@ public class QueryRunner {
7266 private static final Logger LOGGER = LoggerFactory .getLogger (QueryRunner .class );
7367 private static final String PINOT_V1_SERVER_QUERY_CONFIG_PREFIX = "pinot.server.query.executor" ;
7468
75- // This is a temporary before merging the 2 type of executor.
76- private ServerQueryExecutorV1Impl _serverExecutor ;
7769 private HelixManager _helixManager ;
78- private ZkHelixPropertyStore <ZNRecord > _helixPropertyStore ;
79- private MailboxService _mailboxService ;
80- private String _hostname ;
81- private int _port ;
70+ private ServerMetrics _serverMetrics ;
8271
8372 private ExecutorService _opChainExecutor ;
84-
8573 private OpChainSchedulerService _scheduler ;
74+ private MailboxService _mailboxService ;
75+ private ServerQueryExecutorV1Impl _leafQueryExecutor ;
8676
8777 // Group-by settings
8878 @ Nullable
@@ -102,12 +92,14 @@ public class QueryRunner {
10292 */
10393 public void init (PinotConfiguration config , InstanceDataManager instanceDataManager , HelixManager helixManager ,
10494 ServerMetrics serverMetrics ) {
95+ _helixManager = helixManager ;
96+ _serverMetrics = serverMetrics ;
97+
10598 String instanceName = config .getProperty (CommonConstants .MultiStageQueryRunner .KEY_OF_QUERY_RUNNER_HOSTNAME );
106- _hostname = instanceName .startsWith (CommonConstants .Helix .PREFIX_OF_SERVER_INSTANCE ) ? instanceName .substring (
99+ String hostname = instanceName .startsWith (CommonConstants .Helix .PREFIX_OF_SERVER_INSTANCE ) ? instanceName .substring (
107100 CommonConstants .Helix .SERVER_INSTANCE_PREFIX_LENGTH ) : instanceName ;
108- _port = config .getProperty (CommonConstants .MultiStageQueryRunner .KEY_OF_QUERY_RUNNER_PORT ,
101+ int port = config .getProperty (CommonConstants .MultiStageQueryRunner .KEY_OF_QUERY_RUNNER_PORT ,
109102 CommonConstants .MultiStageQueryRunner .DEFAULT_QUERY_RUNNER_PORT );
110- _helixManager = helixManager ;
111103
112104 // TODO: Consider using separate config for intermediate stage and leaf stage
113105 String numGroupsLimitStr = config .getProperty (CommonConstants .Server .CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT );
@@ -121,29 +113,28 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
121113 String joinOverflowModeStr = config .getProperty (CommonConstants .MultiStageQueryRunner .KEY_OF_JOIN_OVERFLOW_MODE );
122114 _joinOverflowMode = joinOverflowModeStr != null ? JoinOverFlowMode .valueOf (joinOverflowModeStr ) : null ;
123115
116+ //TODO: make this configurable
117+ _opChainExecutor =
118+ ExecutorServiceUtils .create (config , "pinot.query.runner.opchain" , "op_chain_worker_on_" + port + "_port" );
119+ _scheduler = new OpChainSchedulerService (getOpChainExecutorService ());
120+ _mailboxService = new MailboxService (hostname , port , config );
124121 try {
125- //TODO: make this configurable
126- _opChainExecutor = ExecutorServiceUtils .create (config , "pinot.query.runner.opchain" ,
127- "op_chain_worker_on_" + _port + "_port" );
128- _scheduler = new OpChainSchedulerService (getOpChainExecutorService ());
129- _mailboxService = new MailboxService (_hostname , _port , config );
130- _serverExecutor = new ServerQueryExecutorV1Impl ();
131- _serverExecutor .init (config .subset (PINOT_V1_SERVER_QUERY_CONFIG_PREFIX ), instanceDataManager , serverMetrics );
122+ _leafQueryExecutor = new ServerQueryExecutorV1Impl ();
123+ _leafQueryExecutor .init (config .subset (PINOT_V1_SERVER_QUERY_CONFIG_PREFIX ), instanceDataManager , serverMetrics );
132124 } catch (Exception e ) {
133125 throw new RuntimeException (e );
134126 }
127+
128+ LOGGER .info ("Initialized QueryRunner with hostname: {}, port: {}" , hostname , port );
135129 }
136130
137- public void start ()
138- throws TimeoutException {
139- _helixPropertyStore = _helixManager .getHelixPropertyStore ();
131+ public void start () {
140132 _mailboxService .start ();
141- _serverExecutor .start ();
133+ _leafQueryExecutor .start ();
142134 }
143135
144- public void shutDown ()
145- throws TimeoutException {
146- _serverExecutor .shutDown ();
136+ public void shutDown () {
137+ _leafQueryExecutor .shutDown ();
147138 _mailboxService .shutdown ();
148139 ExecutorServiceUtils .close (_opChainExecutor );
149140 }
@@ -156,17 +147,15 @@ public void shutDown()
156147 */
157148 public void processQuery (DistributedStagePlan distributedStagePlan , Map <String , String > requestMetadata ) {
158149 long requestId = Long .parseLong (requestMetadata .get (CommonConstants .Query .Request .MetadataKeys .REQUEST_ID ));
159- long timeoutMs = Long .parseLong (requestMetadata .get (QueryOptionKey .TIMEOUT_MS ));
160- boolean isTraceEnabled =
161- Boolean .parseBoolean (requestMetadata .getOrDefault (CommonConstants .Broker .Request .TRACE , "false" ));
150+ long timeoutMs = Long .parseLong (requestMetadata .get (CommonConstants .Broker .Request .QueryOptionKey .TIMEOUT_MS ));
162151 long deadlineMs = System .currentTimeMillis () + timeoutMs ;
163152
164153 setStageCustomProperties (distributedStagePlan .getStageMetadata ().getCustomProperties (), requestMetadata );
165154
166155 // run pre-stage execution for all pipeline breakers
167156 PipelineBreakerResult pipelineBreakerResult =
168- PipelineBreakerExecutor .executePipelineBreakers (_scheduler , _mailboxService , distributedStagePlan , deadlineMs ,
169- requestId , isTraceEnabled );
157+ PipelineBreakerExecutor .executePipelineBreakers (_scheduler , _mailboxService , distributedStagePlan ,
158+ requestMetadata , requestId , deadlineMs );
170159
171160 // Send error block to all the receivers if pipeline breaker fails
172161 if (pipelineBreakerResult != null && pipelineBreakerResult .getErrorBlock () != null ) {
@@ -193,13 +182,15 @@ public void processQuery(DistributedStagePlan distributedStagePlan, Map<String,
193182 }
194183
195184 // run OpChain
185+ OpChainExecutionContext executionContext =
186+ new OpChainExecutionContext (_mailboxService , requestId , distributedStagePlan .getStageId (),
187+ distributedStagePlan .getServer (), deadlineMs , requestMetadata , distributedStagePlan .getStageMetadata (),
188+ pipelineBreakerResult );
196189 OpChain opChain ;
197190 if (DistributedStagePlan .isLeafStage (distributedStagePlan )) {
198- opChain = compileLeafStage (requestId , distributedStagePlan , requestMetadata , pipelineBreakerResult , deadlineMs ,
199- isTraceEnabled );
191+ opChain = compileLeafStage (executionContext , distributedStagePlan );
200192 } else {
201- opChain = compileIntermediateStage (requestId , distributedStagePlan , requestMetadata , pipelineBreakerResult ,
202- deadlineMs , isTraceEnabled );
193+ opChain = PhysicalPlanVisitor .walkPlanNode (distributedStagePlan .getStageRoot (), executionContext );
203194 }
204195 _scheduler .register (opChain );
205196 }
@@ -248,51 +239,36 @@ public ExecutorService getOpChainExecutorService() {
248239 return _opChainExecutor ;
249240 }
250241
251- private OpChain compileIntermediateStage (long requestId , DistributedStagePlan distributedStagePlan ,
252- Map <String , String > requestMetadataMap , PipelineBreakerResult pipelineBreakerResult , long deadlineMs ,
253- boolean isTraceEnabled ) {
254- PlanNode stageRoot = distributedStagePlan .getStageRoot ();
255- OpChainExecutionContext opChainContext = new OpChainExecutionContext (_mailboxService , requestId ,
256- stageRoot .getPlanFragmentId (), distributedStagePlan .getServer (), deadlineMs ,
257- distributedStagePlan .getStageMetadata (), pipelineBreakerResult , isTraceEnabled );
258- return PhysicalPlanVisitor .walkPlanNode (stageRoot ,
259- new PhysicalPlanContext (opChainContext , pipelineBreakerResult ));
260- }
261-
262- private OpChain compileLeafStage (long requestId , DistributedStagePlan distributedStagePlan ,
263- Map <String , String > requestMetadataMap , PipelineBreakerResult pipelineBreakerResult , long deadlineMs ,
264- boolean isTraceEnabled ) {
265- OpChainExecutionContext opChainContext = new OpChainExecutionContext (_mailboxService , requestId ,
266- distributedStagePlan .getStageId (), distributedStagePlan .getServer (), deadlineMs ,
267- distributedStagePlan .getStageMetadata (), pipelineBreakerResult , isTraceEnabled );
268- PhysicalPlanContext planContext = new PhysicalPlanContext (opChainContext , pipelineBreakerResult );
269- List <ServerPlanRequestContext > serverPlanRequestContexts = ServerPlanRequestUtils .constructServerQueryRequests (
270- planContext , distributedStagePlan , requestMetadataMap , _helixPropertyStore );
242+ private OpChain compileLeafStage (OpChainExecutionContext executionContext ,
243+ DistributedStagePlan distributedStagePlan ) {
244+ List <ServerPlanRequestContext > serverPlanRequestContexts =
245+ ServerPlanRequestUtils .constructServerQueryRequests (executionContext , distributedStagePlan ,
246+ _helixManager .getHelixPropertyStore ());
271247 List <ServerQueryRequest > serverQueryRequests = new ArrayList <>(serverPlanRequestContexts .size ());
248+ long queryArrivalTimeMs = System .currentTimeMillis ();
272249 for (ServerPlanRequestContext requestContext : serverPlanRequestContexts ) {
273- serverQueryRequests .add (new ServerQueryRequest ( requestContext . getInstanceRequest (),
274- new ServerMetrics ( PinotMetricUtils . getPinotMetricsRegistry ()), System . currentTimeMillis () ));
250+ serverQueryRequests .add (
251+ new ServerQueryRequest ( requestContext . getInstanceRequest (), _serverMetrics , queryArrivalTimeMs ));
275252 }
276253 MailboxSendNode sendNode = (MailboxSendNode ) distributedStagePlan .getStageRoot ();
277- OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext (planContext );
278254 MultiStageOperator leafStageOperator =
279- new LeafStageTransferableBlockOperator (opChainExecutionContext , this ::processServerQueryRequest ,
280- serverQueryRequests , sendNode .getDataSchema ());
255+ new LeafStageTransferableBlockOperator (executionContext , this ::processServerQueryRequest , serverQueryRequests ,
256+ sendNode .getDataSchema ());
281257 MailboxSendOperator mailboxSendOperator =
282- new MailboxSendOperator (opChainExecutionContext , leafStageOperator , sendNode .getDistributionType (),
258+ new MailboxSendOperator (executionContext , leafStageOperator , sendNode .getDistributionType (),
283259 sendNode .getPartitionKeySelector (), sendNode .getCollationKeys (), sendNode .getCollationDirections (),
284260 sendNode .isSortOnSender (), sendNode .getReceiverStageId ());
285- return new OpChain (opChainExecutionContext , mailboxSendOperator , Collections . emptyList () );
261+ return new OpChain (executionContext , mailboxSendOperator );
286262 }
287263
288264 private InstanceResponseBlock processServerQueryRequest (ServerQueryRequest request ) {
289265 InstanceResponseBlock result ;
290266 try {
291- result = _serverExecutor .execute (request , getOpChainExecutorService ());
267+ result = _leafQueryExecutor .execute (request , getOpChainExecutorService ());
292268 } catch (Exception e ) {
293269 InstanceResponseBlock errorResponse = new InstanceResponseBlock ();
294- errorResponse .getExceptions (). put ( QueryException . QUERY_EXECUTION_ERROR_CODE ,
295- e .getMessage () + QueryException .getTruncatedStackTrace (e ));
270+ errorResponse .getExceptions ()
271+ . put ( QueryException . QUERY_EXECUTION_ERROR_CODE , e .getMessage () + QueryException .getTruncatedStackTrace (e ));
296272 result = errorResponse ;
297273 }
298274 return result ;
0 commit comments