2424import java .util .List ;
2525import java .util .Map ;
2626import java .util .Set ;
27- import java .util .concurrent .ExecutorService ;
2827import java .util .concurrent .TimeUnit ;
2928import javax .annotation .Nullable ;
3029import javax .ws .rs .WebApplicationException ;
6059import org .apache .pinot .query .mailbox .MailboxService ;
6160import org .apache .pinot .query .planner .DispatchableSubPlan ;
6261import org .apache .pinot .query .routing .WorkerManager ;
63- import org .apache .pinot .query .runtime .executor .ExecutorServiceUtils ;
64- import org .apache .pinot .query .runtime .executor .OpChainSchedulerService ;
6562import org .apache .pinot .query .service .dispatch .QueryDispatcher ;
6663import org .apache .pinot .query .type .TypeFactory ;
6764import org .apache .pinot .query .type .TypeSystem ;
7673
7774public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
7875 private static final Logger LOGGER = LoggerFactory .getLogger (MultiStageBrokerRequestHandler .class );
79- private final String _reducerHostname ;
80- private final int _reducerPort ;
81-
82- private final MailboxService _mailboxService ;
83- private final OpChainSchedulerService _reducerScheduler ;
8476
8577 private final QueryEnvironment _queryEnvironment ;
78+ private final MailboxService _mailboxService ;
8679 private final QueryDispatcher _queryDispatcher ;
87- private final ExecutorService _opChainExecutor ;
8880
8981 public MultiStageBrokerRequestHandler (PinotConfiguration config , String brokerIdFromConfig ,
9082 BrokerRoutingManager routingManager , AccessControlFactory accessControlFactory ,
@@ -101,19 +93,14 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
10193 brokerId = StringUtils .split (brokerId , "_" ).length > 1 ? StringUtils .split (brokerId , "_" )[0 ] : brokerId ;
10294 reducerHostname = brokerId ;
10395 }
104- _reducerHostname = reducerHostname ;
10596 // This config has to be set to a valid port number.
106- _reducerPort = Integer .parseInt (config .getProperty (CommonConstants .MultiStageQueryRunner .KEY_OF_QUERY_RUNNER_PORT ));
97+ int reducerPort =
98+ Integer .parseInt (config .getProperty (CommonConstants .MultiStageQueryRunner .KEY_OF_QUERY_RUNNER_PORT ));
10799 _queryEnvironment = new QueryEnvironment (new TypeFactory (new TypeSystem ()),
108100 CalciteSchemaBuilder .asRootSchema (new PinotCatalog (tableCache )),
109- new WorkerManager (_reducerHostname , _reducerPort , routingManager ), _tableCache );
110- _queryDispatcher = new QueryDispatcher ();
111-
112- String opChainExecConfigPrefix = "pinot.query.runner.opchain" ;
113- String opChainExecNamePrefix = "query_broker_reducer_" + _reducerPort + "port" ;
114- _opChainExecutor = ExecutorServiceUtils .create (config , opChainExecConfigPrefix , opChainExecNamePrefix );
115- _reducerScheduler = new OpChainSchedulerService (_opChainExecutor );
116- _mailboxService = new MailboxService (_reducerHostname , _reducerPort , config );
101+ new WorkerManager (reducerHostname , reducerPort , routingManager ), _tableCache );
102+ _mailboxService = new MailboxService (reducerHostname , reducerPort , config );
103+ _queryDispatcher = new QueryDispatcher (_mailboxService );
117104
118105 // TODO: move this to a startUp() function.
119106 _mailboxService .start ();
@@ -151,12 +138,12 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
151138 break ;
152139 }
153140 } catch (Exception e ) {
154- LOGGER . info ( "Caught exception while compiling SQL request {}: {}, {}" , requestId , query ,
155- ExceptionUtils . consolidateExceptionMessages ( e ) );
141+ String consolidatedMessage = ExceptionUtils . consolidateExceptionMessages ( e );
142+ LOGGER . info ( "Caught exception compiling request {}: {}, {}" , requestId , query , consolidatedMessage );
156143 _brokerMetrics .addMeteredGlobalValue (BrokerMeter .REQUEST_COMPILATION_EXCEPTIONS , 1 );
157144 requestContext .setErrorCode (QueryException .SQL_PARSING_ERROR_CODE );
158- return new BrokerResponseNative (QueryException . getException ( QueryException . SQL_PARSING_ERROR ,
159- ExceptionUtils . consolidateExceptionMessages ( e ) ));
145+ return new BrokerResponseNative (
146+ QueryException . getException ( QueryException . SQL_PARSING_ERROR , consolidatedMessage ));
160147 }
161148
162149 DispatchableSubPlan dispatchableSubPlan = queryPlanResult .getQueryPlan ();
@@ -190,12 +177,14 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
190177
191178 long executionStartTimeNs = System .nanoTime ();
192179 try {
193- queryResults =
194- _queryDispatcher .submitAndReduce (requestContext , dispatchableSubPlan , _mailboxService , _reducerScheduler ,
195- queryTimeoutMs , sqlNodeAndOptions .getOptions (), stageIdStatsMap , traceEnabled );
180+ queryResults = _queryDispatcher .submitAndReduce (requestContext , dispatchableSubPlan , queryTimeoutMs ,
181+ sqlNodeAndOptions .getOptions (), stageIdStatsMap , traceEnabled );
196182 } catch (Throwable t ) {
197- LOGGER .error ("query execution failed" , t );
198- return new BrokerResponseNative (QueryException .getException (QueryException .QUERY_EXECUTION_ERROR , t ));
183+ String consolidatedMessage = ExceptionUtils .consolidateExceptionMessages (t );
184+ LOGGER .error ("Caught exception executing request {}: {}, {}" , requestId , query , consolidatedMessage );
185+ requestContext .setErrorCode (QueryException .QUERY_EXECUTION_ERROR_CODE );
186+ return new BrokerResponseNative (
187+ QueryException .getException (QueryException .QUERY_EXECUTION_ERROR , consolidatedMessage ));
199188 }
200189 long executionEndTimeNs = System .nanoTime ();
201190 updatePhaseTimingForTables (tableNames , BrokerQueryPhase .QUERY_EXECUTION , executionEndTimeNs - executionStartTimeNs );
@@ -301,8 +290,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
301290 BrokerRequest serverBrokerRequest , @ Nullable BrokerRequest offlineBrokerRequest ,
302291 @ Nullable Map <ServerInstance , List <String >> offlineRoutingTable , @ Nullable BrokerRequest realtimeBrokerRequest ,
303292 @ Nullable Map <ServerInstance , List <String >> realtimeRoutingTable , long timeoutMs , ServerStats serverStats ,
304- RequestContext requestContext )
305- throws Exception {
293+ RequestContext requestContext ) {
306294 throw new UnsupportedOperationException ();
307295 }
308296
@@ -315,6 +303,5 @@ public void start() {
315303 public void shutDown () {
316304 _queryDispatcher .shutdown ();
317305 _mailboxService .shutdown ();
318- ExecutorServiceUtils .close (_opChainExecutor );
319306 }
320307}
0 commit comments