1818 */
1919package org .apache .pinot .connector .presto ;
2020
21+ import com .google .common .collect .ImmutableMap ;
2122import java .net .InetAddress ;
2223import java .net .UnknownHostException ;
2324import java .util .ArrayList ;
3536import org .apache .pinot .common .metrics .PinotMetricUtils ;
3637import org .apache .pinot .common .request .BrokerRequest ;
3738import org .apache .pinot .common .utils .DataTable ;
39+ import org .apache .pinot .connector .presto .plugin .metrics .NoopPinotMetricFactory ;
3840import org .apache .pinot .core .transport .AsyncQueryResponse ;
3941import org .apache .pinot .core .transport .QueryRouter ;
4042import org .apache .pinot .core .transport .ServerInstance ;
4143import org .apache .pinot .core .transport .ServerResponse ;
4244import org .apache .pinot .core .transport .ServerRoutingInstance ;
4345import org .apache .pinot .spi .config .table .TableType ;
46+ import org .apache .pinot .spi .env .PinotConfiguration ;
4447import org .apache .pinot .spi .utils .builder .TableNameBuilder ;
4548import org .apache .pinot .sql .parsers .CalciteSqlCompiler ;
4649
50+ import static org .apache .pinot .spi .utils .CommonConstants .CONFIG_OF_METRICS_FACTORY_CLASS_NAME ;
51+
4752
4853public class PinotScatterGatherQueryClient {
4954 private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler ();
@@ -72,8 +77,7 @@ public boolean isRetriable() {
7277 }
7378 }
7479
75- public static class PinotException
76- extends RuntimeException {
80+ public static class PinotException extends RuntimeException {
7781 private final ErrorCode _errorCode ;
7882
7983 public PinotException (ErrorCode errorCode , String message , Throwable t ) {
@@ -188,6 +192,8 @@ public String getSslProvider() {
188192
189193 public PinotScatterGatherQueryClient (Config pinotConfig ) {
190194 _prestoHostId = getDefaultPrestoId ();
195+ PinotMetricUtils .init (new PinotConfiguration (
196+ ImmutableMap .of (CONFIG_OF_METRICS_FACTORY_CLASS_NAME , NoopPinotMetricFactory .class .getName ())));
191197 _brokerMetrics = new BrokerMetrics (PinotMetricUtils .getPinotMetricsRegistry ());
192198 _brokerMetrics .initializeGlobalMeters ();
193199 TlsConfig tlsConfig = getTlsConfig (pinotConfig );
@@ -240,13 +246,8 @@ private String getDefaultPrestoId() {
240246 return defaultBrokerId ;
241247 }
242248
243- public Map <ServerInstance , DataTable > queryPinotServerForDataTable (
244- String pql ,
245- String serverHost ,
246- List <String > segments ,
247- long connectionTimeoutInMillis ,
248- boolean ignoreEmptyResponses ,
249- int pinotRetryCount ) {
249+ public Map <ServerInstance , DataTable > queryPinotServerForDataTable (String pql , String serverHost ,
250+ List <String > segments , long connectionTimeoutInMillis , boolean ignoreEmptyResponses , int pinotRetryCount ) {
250251 BrokerRequest brokerRequest ;
251252 try {
252253 brokerRequest = REQUEST_COMPILER .compileToBrokerRequest (pql );
@@ -260,7 +261,7 @@ public Map<ServerInstance, DataTable> queryPinotServerForDataTable(
260261 new ArrayList <>(segments ));
261262
262263 // Unfortunately the retries will all hit the same server because the routing decision has already been made by
263- // the pinot broker
264+ // the pinot broker
264265 Map <ServerInstance , DataTable > serverResponseMap = doWithRetries (pinotRetryCount , (requestId ) -> {
265266 String rawTableName = TableNameBuilder .extractRawTableName (brokerRequest .getQuerySource ().getTableName ());
266267 if (!_concurrentQueriesCountMap .containsKey (serverHost )) {
@@ -276,17 +277,17 @@ public Map<ServerInstance, DataTable> queryPinotServerForDataTable(
276277 QueryRouter nextAvailableQueryRouter = getNextAvailableQueryRouter ();
277278 if (TableNameBuilder .getTableTypeFromTableName (brokerRequest .getQuerySource ().getTableName ())
278279 == TableType .REALTIME ) {
279- asyncQueryResponse = nextAvailableQueryRouter .submitQuery (requestId , rawTableName , null , null , brokerRequest ,
280- routingTable , connectionTimeoutInMillis );
280+ asyncQueryResponse =
281+ nextAvailableQueryRouter .submitQuery (requestId , rawTableName , null , null , brokerRequest , routingTable ,
282+ connectionTimeoutInMillis );
281283 } else {
282- asyncQueryResponse = nextAvailableQueryRouter
283- .submitQuery (requestId , rawTableName , brokerRequest , routingTable , null , null , connectionTimeoutInMillis );
284+ asyncQueryResponse =
285+ nextAvailableQueryRouter .submitQuery (requestId , rawTableName , brokerRequest , routingTable , null , null ,
286+ connectionTimeoutInMillis );
284287 }
285- Map <ServerInstance , DataTable > serverInstanceDataTableMap = gatherServerResponses (
286- ignoreEmptyResponses ,
287- routingTable ,
288- asyncQueryResponse ,
289- brokerRequest .getQuerySource ().getTableName ());
288+ Map <ServerInstance , DataTable > serverInstanceDataTableMap =
289+ gatherServerResponses (ignoreEmptyResponses , routingTable , asyncQueryResponse ,
290+ brokerRequest .getQuerySource ().getTableName ());
290291 _queryRouters .offer (nextAvailableQueryRouter );
291292 _concurrentQueriesCountMap .get (serverHost ).decrementAndGet ();
292293 return serverInstanceDataTableMap ;
@@ -320,15 +321,15 @@ private Map<ServerInstance, DataTable> gatherServerResponses(boolean ignoreEmpty
320321 : entry .getValue ().toString ();
321322 routingTableForLogging .put (entry .getKey ().toString (), valueToPrint );
322323 });
323- throw new PinotException (ErrorCode .PINOT_INSUFFICIENT_SERVER_RESPONSE , String
324- .format ("%d of %d servers responded with routing table servers: %s, query stats: %s" ,
324+ throw new PinotException (ErrorCode .PINOT_INSUFFICIENT_SERVER_RESPONSE ,
325+ String .format ("%d of %d servers responded with routing table servers: %s, query stats: %s" ,
325326 queryResponses .size (), routingTable .size (), routingTableForLogging , asyncQueryResponse .getStats ()));
326327 }
327328 }
328329 Map <ServerInstance , DataTable > serverResponseMap = new HashMap <>();
329- queryResponses .entrySet ().forEach (entry -> serverResponseMap .put (
330- new ServerInstance ( new InstanceConfig (
331- String . format ( "Server_%s_%d" , entry . getKey (). getHostname (), entry .getKey ().getPort ()))),
330+ queryResponses .entrySet ().forEach (entry -> serverResponseMap .put (new ServerInstance (
331+ new InstanceConfig (String . format ( "Server_%s_%d" , entry . getKey (). getHostname (),
332+ entry .getKey ().getPort ()))),
332333 entry .getValue ().getDataTable ()));
333334 return serverResponseMap ;
334335 } catch (InterruptedException e ) {
0 commit comments