Skip to content

Commit 775ac8a

Browse files
committed
Adding NoopPinotMetricFactory and corresponding changes
1 parent 1c4512e commit 775ac8a

File tree

3 files changed

+317
-37
lines changed

3 files changed

+317
-37
lines changed

pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pinot.connector.presto;
2020

21+
import com.google.common.collect.ImmutableMap;
2122
import java.net.InetAddress;
2223
import java.net.UnknownHostException;
2324
import java.util.ArrayList;
@@ -35,15 +36,19 @@
3536
import org.apache.pinot.common.metrics.PinotMetricUtils;
3637
import org.apache.pinot.common.request.BrokerRequest;
3738
import org.apache.pinot.common.utils.DataTable;
39+
import org.apache.pinot.connector.presto.plugin.metrics.NoopPinotMetricFactory;
3840
import org.apache.pinot.core.transport.AsyncQueryResponse;
3941
import org.apache.pinot.core.transport.QueryRouter;
4042
import org.apache.pinot.core.transport.ServerInstance;
4143
import org.apache.pinot.core.transport.ServerResponse;
4244
import org.apache.pinot.core.transport.ServerRoutingInstance;
4345
import org.apache.pinot.spi.config.table.TableType;
46+
import org.apache.pinot.spi.env.PinotConfiguration;
4447
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
4548
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
4649

50+
import static org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_METRICS_FACTORY_CLASS_NAME;
51+
4752

4853
public class PinotScatterGatherQueryClient {
4954
private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler();
@@ -72,8 +77,7 @@ public boolean isRetriable() {
7277
}
7378
}
7479

75-
public static class PinotException
76-
extends RuntimeException {
80+
public static class PinotException extends RuntimeException {
7781
private final ErrorCode _errorCode;
7882

7983
public PinotException(ErrorCode errorCode, String message, Throwable t) {
@@ -188,6 +192,8 @@ public String getSslProvider() {
188192

189193
public PinotScatterGatherQueryClient(Config pinotConfig) {
190194
_prestoHostId = getDefaultPrestoId();
195+
PinotMetricUtils.init(new PinotConfiguration(
196+
ImmutableMap.of(CONFIG_OF_METRICS_FACTORY_CLASS_NAME, NoopPinotMetricFactory.class.getName())));
191197
_brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
192198
_brokerMetrics.initializeGlobalMeters();
193199
TlsConfig tlsConfig = getTlsConfig(pinotConfig);
@@ -240,13 +246,8 @@ private String getDefaultPrestoId() {
240246
return defaultBrokerId;
241247
}
242248

243-
public Map<ServerInstance, DataTable> queryPinotServerForDataTable(
244-
String pql,
245-
String serverHost,
246-
List<String> segments,
247-
long connectionTimeoutInMillis,
248-
boolean ignoreEmptyResponses,
249-
int pinotRetryCount) {
249+
public Map<ServerInstance, DataTable> queryPinotServerForDataTable(String pql, String serverHost,
250+
List<String> segments, long connectionTimeoutInMillis, boolean ignoreEmptyResponses, int pinotRetryCount) {
250251
BrokerRequest brokerRequest;
251252
try {
252253
brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(pql);
@@ -260,7 +261,7 @@ public Map<ServerInstance, DataTable> queryPinotServerForDataTable(
260261
new ArrayList<>(segments));
261262

262263
// Unfortunately the retries will all hit the same server because the routing decision has already been made by
263-
// the pinot broker
264+
// the pinot broker
264265
Map<ServerInstance, DataTable> serverResponseMap = doWithRetries(pinotRetryCount, (requestId) -> {
265266
String rawTableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName());
266267
if (!_concurrentQueriesCountMap.containsKey(serverHost)) {
@@ -276,17 +277,17 @@ public Map<ServerInstance, DataTable> queryPinotServerForDataTable(
276277
QueryRouter nextAvailableQueryRouter = getNextAvailableQueryRouter();
277278
if (TableNameBuilder.getTableTypeFromTableName(brokerRequest.getQuerySource().getTableName())
278279
== TableType.REALTIME) {
279-
asyncQueryResponse = nextAvailableQueryRouter.submitQuery(requestId, rawTableName, null, null, brokerRequest,
280-
routingTable, connectionTimeoutInMillis);
280+
asyncQueryResponse =
281+
nextAvailableQueryRouter.submitQuery(requestId, rawTableName, null, null, brokerRequest, routingTable,
282+
connectionTimeoutInMillis);
281283
} else {
282-
asyncQueryResponse = nextAvailableQueryRouter
283-
.submitQuery(requestId, rawTableName, brokerRequest, routingTable, null, null, connectionTimeoutInMillis);
284+
asyncQueryResponse =
285+
nextAvailableQueryRouter.submitQuery(requestId, rawTableName, brokerRequest, routingTable, null, null,
286+
connectionTimeoutInMillis);
284287
}
285-
Map<ServerInstance, DataTable> serverInstanceDataTableMap = gatherServerResponses(
286-
ignoreEmptyResponses,
287-
routingTable,
288-
asyncQueryResponse,
289-
brokerRequest.getQuerySource().getTableName());
288+
Map<ServerInstance, DataTable> serverInstanceDataTableMap =
289+
gatherServerResponses(ignoreEmptyResponses, routingTable, asyncQueryResponse,
290+
brokerRequest.getQuerySource().getTableName());
290291
_queryRouters.offer(nextAvailableQueryRouter);
291292
_concurrentQueriesCountMap.get(serverHost).decrementAndGet();
292293
return serverInstanceDataTableMap;
@@ -320,15 +321,15 @@ private Map<ServerInstance, DataTable> gatherServerResponses(boolean ignoreEmpty
320321
: entry.getValue().toString();
321322
routingTableForLogging.put(entry.getKey().toString(), valueToPrint);
322323
});
323-
throw new PinotException(ErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE, String
324-
.format("%d of %d servers responded with routing table servers: %s, query stats: %s",
324+
throw new PinotException(ErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE,
325+
String.format("%d of %d servers responded with routing table servers: %s, query stats: %s",
325326
queryResponses.size(), routingTable.size(), routingTableForLogging, asyncQueryResponse.getStats()));
326327
}
327328
}
328329
Map<ServerInstance, DataTable> serverResponseMap = new HashMap<>();
329-
queryResponses.entrySet().forEach(entry -> serverResponseMap.put(
330-
new ServerInstance(new InstanceConfig(
331-
String.format("Server_%s_%d", entry.getKey().getHostname(), entry.getKey().getPort()))),
330+
queryResponses.entrySet().forEach(entry -> serverResponseMap.put(new ServerInstance(
331+
new InstanceConfig(String.format("Server_%s_%d", entry.getKey().getHostname(),
332+
entry.getKey().getPort()))),
332333
entry.getValue().getDataTable()));
333334
return serverResponseMap;
334335
} catch (InterruptedException e) {

pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,6 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
/*
20-
* Licensed under the Apache License, Version 2.0 (the "License");
21-
* you may not use this file except in compliance with the License.
22-
* You may obtain a copy of the License at
23-
*
24-
* http://www.apache.org/licenses/LICENSE-2.0
25-
*
26-
* Unless required by applicable law or agreed to in writing, software
27-
* distributed under the License is distributed on an "AS IS" BASIS,
28-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29-
* See the License for the specific language governing permissions and
30-
* limitations under the License.
31-
*/
3219
package org.apache.pinot.connector.presto.grpc;
3320

3421
import java.util.HashMap;

0 commit comments

Comments
 (0)