Skip to content

Commit 72e1844

Browse files
Broker tracing (#8628)
* remove BrokerResponse dependency * move RequestStatistics into pinot-spi and use tracer as RequestStatistics factory
1 parent 4e14101 commit 72e1844

File tree

10 files changed

+406
-78
lines changed

10 files changed

+406
-78
lines changed

pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,15 @@
4242
import javax.ws.rs.core.MediaType;
4343
import javax.ws.rs.core.Response;
4444
import org.apache.pinot.broker.api.HttpRequesterIdentity;
45-
import org.apache.pinot.broker.api.RequestStatistics;
4645
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
4746
import org.apache.pinot.common.exception.QueryException;
4847
import org.apache.pinot.common.metrics.BrokerMeter;
4948
import org.apache.pinot.common.metrics.BrokerMetrics;
5049
import org.apache.pinot.common.response.BrokerResponse;
5150
import org.apache.pinot.common.response.broker.BrokerResponseNative;
5251
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
52+
import org.apache.pinot.spi.trace.RequestScope;
53+
import org.apache.pinot.spi.trace.Tracing;
5354
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
5455
import org.apache.pinot.spi.utils.JsonUtils;
5556
import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -105,9 +106,11 @@ public void processQueryGet(
105106
if (debugOptions != null) {
106107
requestJson.put(Request.DEBUG_OPTIONS, debugOptions);
107108
}
108-
BrokerResponse brokerResponse =
109-
_requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), new RequestStatistics());
110-
asyncResponse.resume(brokerResponse.toJsonString());
109+
try (RequestScope requestStatistics = Tracing.getTracer().createRequestScope()) {
110+
BrokerResponse brokerResponse =
111+
_requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), requestStatistics);
112+
asyncResponse.resume(brokerResponse.toJsonString());
113+
}
111114
} catch (Exception e) {
112115
LOGGER.error("Caught exception while processing GET request", e);
113116
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_GET_EXCEPTIONS, 1L);
@@ -135,9 +138,11 @@ public void processQueryPost(String query, @Suspended AsyncResponse asyncRespons
135138
@Context org.glassfish.grizzly.http.server.Request requestContext) {
136139
try {
137140
JsonNode requestJson = JsonUtils.stringToJsonNode(query);
138-
BrokerResponse brokerResponse =
139-
_requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), new RequestStatistics());
140-
asyncResponse.resume(brokerResponse);
141+
try (RequestScope requestStatistics = Tracing.getTracer().createRequestScope()) {
142+
BrokerResponse brokerResponse =
143+
_requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), requestStatistics);
144+
asyncResponse.resume(brokerResponse);
145+
}
141146
} catch (Exception e) {
142147
LOGGER.error("Caught exception while processing POST request", e);
143148
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
@@ -222,7 +227,9 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
222227
}
223228
switch (sqlType) {
224229
case DQL:
225-
return _requestHandler.handleRequest(sqlRequestJson, httpRequesterIdentity, new RequestStatistics());
230+
try (RequestScope requestStatistics = Tracing.getTracer().createRequestScope()) {
231+
return _requestHandler.handleRequest(sqlRequestJson, httpRequesterIdentity, requestStatistics);
232+
}
226233
case DML:
227234
Map<String, String> headers = new HashMap<>();
228235
httpRequesterIdentity.getHttpHeaders().entries()

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import javax.annotation.concurrent.ThreadSafe;
4242
import org.apache.commons.collections.CollectionUtils;
4343
import org.apache.commons.lang3.StringUtils;
44-
import org.apache.pinot.broker.api.RequestStatistics;
4544
import org.apache.pinot.broker.api.RequesterIdentity;
4645
import org.apache.pinot.broker.broker.AccessControlFactory;
4746
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
@@ -90,6 +89,7 @@
9089
import org.apache.pinot.spi.data.Schema;
9190
import org.apache.pinot.spi.env.PinotConfiguration;
9291
import org.apache.pinot.spi.exception.BadQueryRequestException;
92+
import org.apache.pinot.spi.trace.RequestStatistics;
9393
import org.apache.pinot.spi.utils.BytesUtils;
9494
import org.apache.pinot.spi.utils.CommonConstants;
9595
import org.apache.pinot.spi.utils.CommonConstants.Broker;
@@ -207,7 +207,7 @@ private BrokerResponseNative handleSQLRequest(long requestId, String query, Json
207207
@Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
208208
throws Exception {
209209
LOGGER.debug("SQL query for request {}: {}", requestId, query);
210-
requestStatistics.setPql(query);
210+
requestStatistics.setQuery(query);
211211

212212
// Compile the request
213213
long compilationStartTimeNs = System.nanoTime();
@@ -562,7 +562,7 @@ private BrokerResponseNative handleSQLRequest(long requestId, String query, Json
562562
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs);
563563
brokerResponse.setTimeUsedMs(totalTimeMs);
564564
requestStatistics.setQueryProcessingTime(totalTimeMs);
565-
requestStatistics.setStatistics(brokerResponse);
565+
augmentStatistics(requestStatistics, brokerResponse);
566566

567567
logBrokerResponse(requestId, query, requestStatistics, brokerRequest, numUnavailableSegments, serverStats,
568568
brokerResponse, totalTimeMs);
@@ -706,7 +706,7 @@ private BrokerResponseNative handlePQLRequest(long requestId, String query, Json
706706
@Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
707707
throws Exception {
708708
LOGGER.debug("PQL query for request {}: {}", requestId, query);
709-
requestStatistics.setPql(query);
709+
requestStatistics.setQuery(query);
710710

711711
// Compile the request
712712
long compilationStartTimeNs = System.nanoTime();
@@ -947,7 +947,7 @@ private BrokerResponseNative handlePQLRequest(long requestId, String query, Json
947947
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs);
948948
brokerResponse.setTimeUsedMs(totalTimeMs);
949949
requestStatistics.setQueryProcessingTime(totalTimeMs);
950-
requestStatistics.setStatistics(brokerResponse);
950+
augmentStatistics(requestStatistics, brokerResponse);
951951

952952
LOGGER.debug("Broker Response: {}", brokerResponse);
953953

@@ -1570,7 +1570,7 @@ private BrokerResponseNative processLiteralOnlyQuery(PinotQuery pinotQuery, long
15701570
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - compilationStartTimeNs);
15711571
brokerResponse.setTimeUsedMs(totalTimeMs);
15721572
requestStatistics.setQueryProcessingTime(totalTimeMs);
1573-
requestStatistics.setStatistics(brokerResponse);
1573+
augmentStatistics(requestStatistics, brokerResponse);
15741574
return brokerResponse;
15751575
}
15761576

@@ -2276,6 +2276,30 @@ protected abstract BrokerResponseNative processBrokerRequest(long requestId, Bro
22762276
List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics)
22772277
throws Exception;
22782278

2279+
private static void augmentStatistics(RequestStatistics statistics, BrokerResponse response) {
2280+
statistics.setTotalDocs(response.getTotalDocs());
2281+
statistics.setNumDocsScanned(response.getNumDocsScanned());
2282+
statistics.setNumEntriesScannedInFilter(response.getNumEntriesScannedInFilter());
2283+
statistics.setNumEntriesScannedPostFilter(response.getNumEntriesScannedPostFilter());
2284+
statistics.setNumSegmentsQueried(response.getNumSegmentsQueried());
2285+
statistics.setNumSegmentsProcessed(response.getNumSegmentsProcessed());
2286+
statistics.setNumSegmentsMatched(response.getNumSegmentsMatched());
2287+
statistics.setNumServersQueried(response.getNumServersQueried());
2288+
statistics.setNumSegmentsProcessed(response.getNumSegmentsProcessed());
2289+
statistics.setNumServersResponded(response.getNumServersResponded());
2290+
statistics.setNumGroupsLimitReached(response.isNumGroupsLimitReached());
2291+
statistics.setNumExceptions(response.getExceptionsSize());
2292+
statistics.setOfflineThreadCpuTimeNs(response.getOfflineThreadCpuTimeNs());
2293+
statistics.setRealtimeThreadCpuTimeNs(response.getRealtimeThreadCpuTimeNs());
2294+
statistics.setOfflineSystemActivitiesCpuTimeNs(response.getOfflineSystemActivitiesCpuTimeNs());
2295+
statistics.setRealtimeSystemActivitiesCpuTimeNs(response.getRealtimeSystemActivitiesCpuTimeNs());
2296+
statistics.setOfflineResponseSerializationCpuTimeNs(response.getOfflineResponseSerializationCpuTimeNs());
2297+
statistics.setRealtimeResponseSerializationCpuTimeNs(response.getRealtimeResponseSerializationCpuTimeNs());
2298+
statistics.setOfflineTotalCpuTimeNs(response.getOfflineTotalCpuTimeNs());
2299+
statistics.setRealtimeTotalCpuTimeNs(response.getRealtimeTotalCpuTimeNs());
2300+
statistics.setNumRowsResultSet(response.getNumRowsResultSet());
2301+
}
2302+
22792303
/**
22802304
* Helper class to pass the per server statistics.
22812305
*/

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import com.fasterxml.jackson.databind.JsonNode;
2222
import javax.annotation.Nullable;
2323
import javax.annotation.concurrent.ThreadSafe;
24-
import org.apache.pinot.broker.api.RequestStatistics;
2524
import org.apache.pinot.broker.api.RequesterIdentity;
2625
import org.apache.pinot.common.response.BrokerResponse;
26+
import org.apache.pinot.spi.trace.RequestStatistics;
2727

2828

2929
@ThreadSafe

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.concurrent.ConcurrentHashMap;
2626
import javax.annotation.Nullable;
2727
import javax.annotation.concurrent.ThreadSafe;
28-
import org.apache.pinot.broker.api.RequestStatistics;
2928
import org.apache.pinot.broker.broker.AccessControlFactory;
3029
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
3130
import org.apache.pinot.broker.routing.BrokerRoutingManager;
@@ -42,6 +41,7 @@
4241
import org.apache.pinot.core.transport.ServerRoutingInstance;
4342
import org.apache.pinot.spi.config.table.TableType;
4443
import org.apache.pinot.spi.env.PinotConfiguration;
44+
import org.apache.pinot.spi.trace.RequestStatistics;
4545

4646

4747
/**

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.concurrent.TimeUnit;
2626
import javax.annotation.Nullable;
2727
import javax.annotation.concurrent.ThreadSafe;
28-
import org.apache.pinot.broker.api.RequestStatistics;
2928
import org.apache.pinot.broker.broker.AccessControlFactory;
3029
import org.apache.pinot.broker.failuredetector.FailureDetector;
3130
import org.apache.pinot.broker.failuredetector.FailureDetectorFactory;
@@ -50,6 +49,7 @@
5049
import org.apache.pinot.core.transport.ServerResponse;
5150
import org.apache.pinot.core.transport.ServerRoutingInstance;
5251
import org.apache.pinot.spi.env.PinotConfiguration;
52+
import org.apache.pinot.spi.trace.RequestStatistics;
5353
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
5454
import org.slf4j.Logger;
5555
import org.slf4j.LoggerFactory;

pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@
2323
import java.util.Collections;
2424
import java.util.Random;
2525
import java.util.concurrent.TimeUnit;
26-
import org.apache.pinot.broker.api.RequestStatistics;
2726
import org.apache.pinot.broker.broker.AccessControlFactory;
2827
import org.apache.pinot.broker.broker.AllowAllAccessControlFactory;
2928
import org.apache.pinot.common.metrics.BrokerMetrics;
3029
import org.apache.pinot.common.metrics.PinotMetricUtils;
3130
import org.apache.pinot.common.response.broker.BrokerResponseNative;
3231
import org.apache.pinot.common.utils.DataSchema;
3332
import org.apache.pinot.spi.env.PinotConfiguration;
33+
import org.apache.pinot.spi.trace.RequestStatistics;
34+
import org.apache.pinot.spi.trace.Tracing;
3435
import org.apache.pinot.spi.utils.BytesUtils;
3536
import org.apache.pinot.sql.parsers.CalciteSqlParser;
3637
import org.testng.Assert;
@@ -128,7 +129,7 @@ public void testBrokerRequestHandler()
128129
RANDOM.nextBytes(randBytes);
129130
String ranStr = BytesUtils.toHexString(randBytes);
130131
JsonNode request = new ObjectMapper().readTree(String.format("{\"sql\":\"SELECT %d, '%s'\"}", randNum, ranStr));
131-
RequestStatistics requestStats = new RequestStatistics();
132+
RequestStatistics requestStats = Tracing.getTracer().createRequestScope();
132133
BrokerResponseNative brokerResponse = requestHandler.handleRequest(request, null, requestStats);
133134
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnName(0), String.format("%d", randNum));
134135
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnDataType(0),
@@ -153,7 +154,7 @@ public void testBrokerRequestHandlerWithAsFunction()
153154
long currentTsMin = System.currentTimeMillis();
154155
JsonNode request = new ObjectMapper().readTree(
155156
"{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}");
156-
RequestStatistics requestStats = new RequestStatistics();
157+
RequestStatistics requestStats = Tracing.getTracer().createRequestScope();
157158
BrokerResponseNative brokerResponse = requestHandler.handleRequest(request, null, requestStats);
158159
long currentTsMax = System.currentTimeMillis();
159160
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnName(0), "currentTs");
@@ -173,7 +174,7 @@ public void testBrokerRequestHandlerWithAsFunction()
173174
request = new ObjectMapper().readTree(
174175
"{\"sql\":\"SELECT ago('PT1H') as oneHourAgoTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as "
175176
+ "firstDayOf2020\"}");
176-
requestStats = new RequestStatistics();
177+
requestStats = Tracing.getTracer().createRequestScope();
177178
brokerResponse = requestHandler.handleRequest(request, null, requestStats);
178179
long oneHourAgoTsMax = System.currentTimeMillis() - ONE_HOUR_IN_MS;
179180
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnName(0), "oneHourAgoTs");
@@ -194,7 +195,7 @@ public void testBrokerRequestHandlerWithAsFunction()
194195
request = new ObjectMapper().readTree(
195196
"{\"sql\":\"SELECT encodeUrl('key1=value 1&key2=value@!$2&key3=value%3') AS encoded, "
196197
+ "decodeUrl('key1%3Dvalue+1%26key2%3Dvalue%40%21%242%26key3%3Dvalue%253') AS decoded\"}");
197-
requestStats = new RequestStatistics();
198+
requestStats = Tracing.getTracer().createRequestScope();
198199
brokerResponse = requestHandler.handleRequest(request, null, requestStats);
199200
System.out.println(brokerResponse.getResultTable());
200201
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnName(0), "encoded");
@@ -224,7 +225,7 @@ public void testExplainPlanLiteralOnly()
224225
ObjectMapper objectMapper = new ObjectMapper();
225226
// Test 1: select constant
226227
JsonNode request = objectMapper.readTree("{\"sql\":\"EXPLAIN PLAN FOR SELECT 1.5, 'test'\"}");
227-
RequestStatistics requestStats = new RequestStatistics();
228+
RequestStatistics requestStats = Tracing.getTracer().createRequestScope();
228229
BrokerResponseNative brokerResponse = requestHandler.handleRequest(request, null, requestStats);
229230

230231
checkExplainResultSchema(brokerResponse.getResultTable().getDataSchema(),
@@ -242,7 +243,7 @@ public void testExplainPlanLiteralOnly()
242243
request = objectMapper.readTree(
243244
"{\"sql\":\"EXPLAIN PLAN FOR SELECT 6+8 as addition, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as "
244245
+ "firstDayOf2020\"}");
245-
requestStats = new RequestStatistics();
246+
requestStats = Tracing.getTracer().createRequestScope();
246247
brokerResponse = requestHandler.handleRequest(request, null, requestStats);
247248

248249
checkExplainResultSchema(brokerResponse.getResultTable().getDataSchema(),

0 commit comments

Comments
 (0)