Skip to content

Commit 43e1298

Browse files
allow automatic tracing when a request is sampled by a registered tracer (#8629)
1 parent 7edad89 commit 43e1298

File tree

9 files changed

+78
-68
lines changed

9 files changed

+78
-68
lines changed

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java

Lines changed: 47 additions & 48 deletions
Large diffs are not rendered by default.

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import javax.annotation.concurrent.ThreadSafe;
2424
import org.apache.pinot.broker.api.RequesterIdentity;
2525
import org.apache.pinot.common.response.BrokerResponse;
26-
import org.apache.pinot.spi.trace.RequestStatistics;
26+
import org.apache.pinot.spi.trace.RequestContext;
2727

2828

2929
@ThreadSafe
@@ -34,6 +34,6 @@ public interface BrokerRequestHandler {
3434
void shutDown();
3535

3636
BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity,
37-
RequestStatistics requestStatistics)
37+
RequestContext requestContext)
3838
throws Exception;
3939
}

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import org.apache.pinot.core.transport.ServerRoutingInstance;
4242
import org.apache.pinot.spi.config.table.TableType;
4343
import org.apache.pinot.spi.env.PinotConfiguration;
44-
import org.apache.pinot.spi.trace.RequestStatistics;
44+
import org.apache.pinot.spi.trace.RequestContext;
4545

4646

4747
/**
@@ -81,18 +81,20 @@ public synchronized void shutDown() {
8181
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
8282
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance,
8383
List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance,
84-
List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics)
84+
List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext)
8585
throws Exception {
8686
// TODO: Support failure detection
8787
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
8888
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = new HashMap<>();
8989
if (offlineBrokerRequest != null) {
9090
assert offlineRoutingTable != null;
91-
sendRequest(TableType.OFFLINE, offlineBrokerRequest, offlineRoutingTable, responseMap);
91+
sendRequest(TableType.OFFLINE, offlineBrokerRequest, offlineRoutingTable, responseMap,
92+
requestContext.isSampledRequest());
9293
}
9394
if (realtimeBrokerRequest != null) {
9495
assert realtimeRoutingTable != null;
95-
sendRequest(TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap);
96+
sendRequest(TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap,
97+
requestContext.isSampledRequest());
9698
}
9799
return _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs,
98100
_brokerMetrics);
@@ -103,15 +105,16 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
103105
*/
104106
private void sendRequest(TableType tableType, BrokerRequest brokerRequest,
105107
Map<ServerInstance, List<String>> routingTable,
106-
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap) {
108+
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap, boolean trace) {
107109
for (Map.Entry<ServerInstance, List<String>> routingEntry : routingTable.entrySet()) {
108110
ServerInstance serverInstance = routingEntry.getKey();
109111
List<String> segments = routingEntry.getValue();
110112
String serverHost = serverInstance.getHostname();
111113
int port = serverInstance.getGrpcPort();
112114
// TODO: enable throttling on per host bases.
113115
Iterator<Server.ServerResponse> streamingResponse = _streamingQueryClient.submit(serverHost, port,
114-
new GrpcRequestBuilder().setSegments(segments).setBrokerRequest(brokerRequest).setEnableStreaming(true));
116+
new GrpcRequestBuilder().setSegments(segments).setBrokerRequest(brokerRequest).setEnableStreaming(true)
117+
.setEnableTrace(trace));
115118
responseMap.put(serverInstance.toServerRoutingInstance(tableType, ServerInstance.RoutingType.GRPC),
116119
streamingResponse);
117120
}

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@
4949
import org.apache.pinot.core.transport.ServerResponse;
5050
import org.apache.pinot.core.transport.ServerRoutingInstance;
5151
import org.apache.pinot.spi.env.PinotConfiguration;
52-
import org.apache.pinot.spi.trace.RequestStatistics;
52+
import org.apache.pinot.spi.trace.RequestContext;
53+
import org.apache.pinot.spi.utils.CommonConstants;
5354
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
5455
import org.slf4j.Logger;
5556
import org.slf4j.LoggerFactory;
@@ -95,9 +96,12 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
9596
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
9697
@Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest,
9798
@Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
98-
RequestStatistics requestStatistics)
99+
RequestContext requestContext)
99100
throws Exception {
100101
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
102+
if (requestContext.isSampledRequest()) {
103+
serverBrokerRequest.getPinotQuery().putToQueryOptions(CommonConstants.Broker.Request.TRACE, "true");
104+
}
101105

102106
String rawTableName = TableNameBuilder.extractRawTableName(originalBrokerRequest.getQuerySource().getTableName());
103107
long scatterGatherStartTimeNs = System.nanoTime();
@@ -134,7 +138,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
134138
_brokerReduceService.reduceOnDataTable(originalBrokerRequest, serverBrokerRequest, dataTableMap,
135139
reduceTimeOutMs, _brokerMetrics);
136140
final long reduceTimeNanos = System.nanoTime() - reduceStartTimeNs;
137-
requestStatistics.setReduceTimeNanos(reduceTimeNanos);
141+
requestContext.setReduceTimeNanos(reduceTimeNanos);
138142
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REDUCE, reduceTimeNanos);
139143

140144
brokerResponse.setNumServersQueried(numServersQueried);

pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import org.apache.pinot.common.response.broker.BrokerResponseNative;
3131
import org.apache.pinot.common.utils.DataSchema;
3232
import org.apache.pinot.spi.env.PinotConfiguration;
33-
import org.apache.pinot.spi.trace.RequestStatistics;
33+
import org.apache.pinot.spi.trace.RequestContext;
3434
import org.apache.pinot.spi.trace.Tracing;
3535
import org.apache.pinot.spi.utils.BytesUtils;
3636
import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -129,7 +129,7 @@ public void testBrokerRequestHandler()
129129
RANDOM.nextBytes(randBytes);
130130
String ranStr = BytesUtils.toHexString(randBytes);
131131
JsonNode request = new ObjectMapper().readTree(String.format("{\"sql\":\"SELECT %d, '%s'\"}", randNum, ranStr));
132-
RequestStatistics requestStats = Tracing.getTracer().createRequestScope();
132+
RequestContext requestStats = Tracing.getTracer().createRequestScope();
133133
BrokerResponseNative brokerResponse = requestHandler.handleRequest(request, null, requestStats);
134134
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnName(0), String.format("%d", randNum));
135135
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnDataType(0),
@@ -154,7 +154,7 @@ public void testBrokerRequestHandlerWithAsFunction()
154154
long currentTsMin = System.currentTimeMillis();
155155
JsonNode request = new ObjectMapper().readTree(
156156
"{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}");
157-
RequestStatistics requestStats = Tracing.getTracer().createRequestScope();
157+
RequestContext requestStats = Tracing.getTracer().createRequestScope();
158158
BrokerResponseNative brokerResponse = requestHandler.handleRequest(request, null, requestStats);
159159
long currentTsMax = System.currentTimeMillis();
160160
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnName(0), "currentTs");
@@ -225,7 +225,7 @@ public void testExplainPlanLiteralOnly()
225225
ObjectMapper objectMapper = new ObjectMapper();
226226
// Test 1: select constant
227227
JsonNode request = objectMapper.readTree("{\"sql\":\"EXPLAIN PLAN FOR SELECT 1.5, 'test'\"}");
228-
RequestStatistics requestStats = Tracing.getTracer().createRequestScope();
228+
RequestContext requestStats = Tracing.getTracer().createRequestScope();
229229
BrokerResponseNative brokerResponse = requestHandler.handleRequest(request, null, requestStats);
230230

231231
checkExplainResultSchema(brokerResponse.getResultTable().getDataSchema(),

pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestStatistics.java renamed to pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* This object can be used to publish the query processing statistics to a stream for
2727
* post-processing at a finer level than metrics.
2828
*/
29-
public class DefaultRequestStatistics implements RequestScope {
29+
public class DefaultRequestContext implements RequestScope {
3030

3131
private static final String DEFAULT_TABLE_NAME = "NotYetParsed";
3232

@@ -64,7 +64,7 @@ public class DefaultRequestStatistics implements RequestScope {
6464
private FanoutType _fanoutType;
6565
private int _numUnavailableSegments;
6666

67-
public DefaultRequestStatistics() {
67+
public DefaultRequestContext() {
6868
}
6969

7070
@Override

pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestStatistics.java renamed to pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package org.apache.pinot.spi.trace;
2020

21-
public interface RequestStatistics {
21+
public interface RequestContext {
2222
long getOfflineSystemActivitiesCpuTimeNs();
2323

2424
void setOfflineSystemActivitiesCpuTimeNs(long offlineSystemActivitiesCpuTimeNs);
@@ -51,6 +51,10 @@ public interface RequestStatistics {
5151

5252
long getRequestId();
5353

54+
default boolean isSampledRequest() {
55+
return false;
56+
}
57+
5458
long getRequestArrivalTimeMillis();
5559

5660
long getReduceTimeMillis();

pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestScope.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,5 @@
2222
* A scope wrapping an end to end synchronous pinot request.
2323
* Can be extended by a custom tracer to meter request latency.
2424
*/
25-
public interface RequestScope extends Scope, RequestStatistics {
25+
public interface RequestScope extends Scope, RequestContext {
2626
}

pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public interface Tracer {
4848
* @return the request record
4949
*/
5050
default RequestScope createRequestScope() {
51-
return new DefaultRequestStatistics();
51+
return new DefaultRequestContext();
5252
}
5353

5454
/**

0 commit comments

Comments
 (0)