Skip to content

Commit aee3ca5

Browse files
Implement telemetry for CI Visibility intakes (#6668)
1 parent 2072afb commit aee3ca5

12 files changed

Lines changed: 182 additions & 11 deletions

File tree

dd-trace-api/src/main/java/datadog/trace/api/interceptor/AbstractTraceInterceptor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public enum Priority {
2323
GIT_METADATA(3),
2424

2525
// trace data collection
26+
CI_VISIBILITY_TELEMETRY(Integer.MAX_VALUE - 1),
2627
SERVICE_NAME_COLLECTING(Integer.MAX_VALUE);
2728

2829
private final int idx;

dd-trace-core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ apply from: "$rootDir/gradle/version.gradle"
1010
minimumBranchCoverage = 0.5
1111
minimumInstructionCoverage = 0.6
1212
excludedClassesCoverage += [
13+
'datadog.trace.civisibility.interceptor.CiVisibilityTelemetryInterceptor',
1314
'datadog.trace.civisibility.writer.ddintake.CiTestCovMapperV2.PayloadV2',
1415
'datadog.trace.common.writer.ddintake.DDIntakeMapperDiscovery',
1516
'datadog.trace.common.writer.ListWriter',
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package datadog.trace.civisibility.interceptor;
2+
3+
import datadog.trace.api.civisibility.InstrumentationBridge;
4+
import datadog.trace.api.civisibility.telemetry.CiVisibilityCountMetric;
5+
import datadog.trace.api.interceptor.AbstractTraceInterceptor;
6+
import datadog.trace.api.interceptor.MutableSpan;
7+
import java.util.Collection;
8+
9+
public class CiVisibilityTelemetryInterceptor extends AbstractTraceInterceptor {
10+
11+
public CiVisibilityTelemetryInterceptor() {
12+
super(Priority.CI_VISIBILITY_TELEMETRY);
13+
}
14+
15+
@Override
16+
public Collection<? extends MutableSpan> onTraceComplete(
17+
Collection<? extends MutableSpan> trace) {
18+
InstrumentationBridge.getMetricCollector()
19+
.add(CiVisibilityCountMetric.EVENTS_ENQUEUED_FOR_SERIALIZATION, trace.size());
20+
return trace;
21+
}
22+
}

dd-trace-core/src/main/java/datadog/trace/civisibility/writer/ddintake/CiTestCovMapperV2.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,13 @@
77
import datadog.communication.serialization.GrowableBuffer;
88
import datadog.communication.serialization.Writable;
99
import datadog.communication.serialization.msgpack.MsgPackWriter;
10+
import datadog.trace.api.civisibility.InstrumentationBridge;
1011
import datadog.trace.api.civisibility.coverage.TestReport;
1112
import datadog.trace.api.civisibility.coverage.TestReportFileEntry;
1213
import datadog.trace.api.civisibility.coverage.TestReportHolder;
14+
import datadog.trace.api.civisibility.telemetry.CiVisibilityDistributionMetric;
15+
import datadog.trace.api.civisibility.telemetry.CiVisibilityMetricCollector;
16+
import datadog.trace.api.civisibility.telemetry.tag.Endpoint;
1317
import datadog.trace.api.gateway.RequestContextSlot;
1418
import datadog.trace.api.intake.TrackType;
1519
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
@@ -46,6 +50,7 @@ public class CiTestCovMapperV2 implements RemoteMapper {
4650
private final MsgPackWriter headerWriter;
4751
private final boolean compressionEnabled;
4852
private int eventCount = 0;
53+
private int serializationTimeMillis = 0;
4954

5055
public CiTestCovMapperV2(boolean compressionEnabled) {
5156
this(5 << 20, compressionEnabled);
@@ -60,6 +65,8 @@ private CiTestCovMapperV2(int size, boolean compressionEnabled) {
6065

6166
@Override
6267
public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
68+
long serializationStartTimestamp = System.currentTimeMillis();
69+
6370
List<TestReport> testReports =
6471
trace.stream()
6572
// only consider test spans, since children spans
@@ -119,6 +126,7 @@ public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
119126
}
120127

121128
eventCount += testReports.size();
129+
serializationTimeMillis += (int) (System.currentTimeMillis() - serializationStartTimestamp);
122130
}
123131

124132
private static boolean isTestSpan(CoreSpan<?> span) {
@@ -150,6 +158,17 @@ private void writeHeader() {
150158
@Override
151159
public Payload newPayload() {
152160
writeHeader();
161+
162+
CiVisibilityMetricCollector metricCollector = InstrumentationBridge.getMetricCollector();
163+
metricCollector.add(
164+
CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_EVENTS_COUNT,
165+
eventCount,
166+
Endpoint.CODE_COVERAGE);
167+
metricCollector.add(
168+
CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_EVENTS_SERIALIZATION_MS,
169+
serializationTimeMillis,
170+
Endpoint.CODE_COVERAGE);
171+
153172
return new PayloadV2(compressionEnabled).withHeader(headerBuffer.slice());
154173
}
155174

@@ -161,6 +180,7 @@ public int messageBufferSize() {
161180
@Override
162181
public void reset() {
163182
eventCount = 0;
183+
serializationTimeMillis = 0;
164184
}
165185

166186
@Override

dd-trace-core/src/main/java/datadog/trace/civisibility/writer/ddintake/CiTestCycleMapperV1.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
import datadog.communication.serialization.Writable;
88
import datadog.communication.serialization.msgpack.MsgPackWriter;
99
import datadog.trace.api.WellKnownTags;
10+
import datadog.trace.api.civisibility.InstrumentationBridge;
11+
import datadog.trace.api.civisibility.telemetry.CiVisibilityDistributionMetric;
12+
import datadog.trace.api.civisibility.telemetry.CiVisibilityMetricCollector;
13+
import datadog.trace.api.civisibility.telemetry.tag.Endpoint;
1014
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
1115
import datadog.trace.bootstrap.instrumentation.api.Tags;
1216
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
@@ -50,6 +54,7 @@ public class CiTestCycleMapperV1 implements RemoteMapper {
5054
private final MsgPackWriter headerWriter;
5155
private final boolean compressionEnabled;
5256
private int eventCount = 0;
57+
private int serializationTimeMillis = 0;
5358

5459
public CiTestCycleMapperV1(WellKnownTags wellKnownTags, boolean compressionEnabled) {
5560
this(wellKnownTags, DEFAULT_TOP_LEVEL_TAGS, 5 << 20, compressionEnabled);
@@ -70,6 +75,8 @@ private CiTestCycleMapperV1(
7075

7176
@Override
7277
public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
78+
long serializationStartTimestamp = System.currentTimeMillis();
79+
7380
for (final CoreSpan<?> span : trace) {
7481
int topLevelTagsCount = 0;
7582
for (String topLevelTag : topLevelTags) {
@@ -191,6 +198,7 @@ public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
191198
span.processTagsAndBaggage(metaWriter.withWritable(writable));
192199
}
193200
eventCount += trace.size();
201+
serializationTimeMillis += (int) (System.currentTimeMillis() - serializationStartTimestamp);
194202
}
195203

196204
private static boolean equals(CharSequence a, CharSequence b) {
@@ -228,6 +236,17 @@ private void writeHeader() {
228236
@Override
229237
public Payload newPayload() {
230238
writeHeader();
239+
240+
CiVisibilityMetricCollector metricCollector = InstrumentationBridge.getMetricCollector();
241+
metricCollector.add(
242+
CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_EVENTS_COUNT,
243+
eventCount,
244+
Endpoint.TEST_CYCLE);
245+
metricCollector.add(
246+
CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_EVENTS_SERIALIZATION_MS,
247+
serializationTimeMillis,
248+
Endpoint.CODE_COVERAGE);
249+
231250
return new PayloadV1(compressionEnabled).withHeader(headerBuffer.slice());
232251
}
233252

@@ -239,6 +258,7 @@ public int messageBufferSize() {
239258
@Override
240259
public void reset() {
241260
eventCount = 0;
261+
serializationTimeMillis = 0;
242262
}
243263

244264
@Override

dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
package datadog.trace.common.writer.ddintake;
22

3-
import static datadog.trace.api.intake.TrackType.NOOP;
43
import static datadog.trace.common.writer.DDIntakeWriter.DEFAULT_INTAKE_TIMEOUT;
54
import static datadog.trace.common.writer.DDIntakeWriter.DEFAULT_INTAKE_VERSION;
65

76
import datadog.communication.http.HttpRetryPolicy;
87
import datadog.communication.http.OkHttpUtils;
8+
import datadog.trace.api.civisibility.InstrumentationBridge;
9+
import datadog.trace.api.civisibility.telemetry.CiVisibilityCountMetric;
910
import datadog.trace.api.intake.TrackType;
1011
import datadog.trace.common.writer.Payload;
1112
import datadog.trace.common.writer.RemoteApi;
13+
import edu.umd.cs.findbugs.annotations.NonNull;
1214
import java.io.IOException;
1315
import java.net.ConnectException;
1416
import java.util.Locale;
@@ -34,15 +36,15 @@ public static DDEvpProxyApiBuilder builder() {
3436

3537
public static class DDEvpProxyApiBuilder {
3638
private String apiVersion = DEFAULT_INTAKE_VERSION;
37-
private TrackType trackType = TrackType.NOOP;
39+
@NonNull private TrackType trackType = TrackType.NOOP;
3840
private long timeoutMillis = TimeUnit.SECONDS.toMillis(DEFAULT_INTAKE_TIMEOUT);
3941

4042
HttpUrl agentUrl = null;
4143
OkHttpClient httpClient = null;
4244
String evpProxyEndpoint;
4345
boolean compressionEnabled;
4446

45-
public DDEvpProxyApiBuilder trackType(final TrackType trackType) {
47+
public DDEvpProxyApiBuilder trackType(@NonNull final TrackType trackType) {
4648
this.trackType = trackType;
4749
return this;
4850
}
@@ -78,8 +80,7 @@ public DDEvpProxyApiBuilder compressionEnabled(boolean compressionEnabled) {
7880
}
7981

8082
public DDEvpProxyApi build() {
81-
final String trackName =
82-
(trackType != null ? trackType.name() : NOOP.name()).toLowerCase(Locale.ROOT);
83+
final String trackName = trackType.name().toLowerCase(Locale.ROOT);
8384
final String subdomain = String.format("%s-intake", trackName);
8485

8586
final HttpUrl evpProxyUrl = agentUrl.resolve(evpProxyEndpoint);
@@ -95,22 +96,27 @@ public DDEvpProxyApi build() {
9596

9697
log.debug("proxiedApiUrl: {}", proxiedApiUrl);
9798
return new DDEvpProxyApi(
98-
client, proxiedApiUrl, subdomain, retryPolicyFactory, compressionEnabled);
99+
trackType, client, proxiedApiUrl, subdomain, retryPolicyFactory, compressionEnabled);
99100
}
100101
}
101102

103+
private final TelemetryListener telemetryListener;
104+
private final TrackType trackType;
102105
private final OkHttpClient httpClient;
103106
private final HttpUrl proxiedApiUrl;
104107
private final String subdomain;
105108
private final HttpRetryPolicy.Factory retryPolicyFactory;
106109

107110
private DDEvpProxyApi(
111+
TrackType trackType,
108112
OkHttpClient httpClient,
109113
HttpUrl proxiedApiUrl,
110114
String subdomain,
111115
HttpRetryPolicy.Factory retryPolicyFactory,
112116
boolean compressionEnabled) {
113117
super(compressionEnabled);
118+
this.telemetryListener = new TelemetryListener(trackType.endpoint);
119+
this.trackType = trackType;
114120
this.httpClient = httpClient;
115121
this.proxiedApiUrl = proxiedApiUrl;
116122
this.subdomain = subdomain;
@@ -122,7 +128,10 @@ public Response sendSerializedTraces(Payload payload) {
122128
final int sizeInBytes = payload.sizeInBytes();
123129

124130
Request.Builder builder =
125-
new Request.Builder().url(proxiedApiUrl).addHeader(DD_EVP_SUBDOMAIN_HEADER, subdomain);
131+
new Request.Builder()
132+
.url(proxiedApiUrl)
133+
.addHeader(DD_EVP_SUBDOMAIN_HEADER, subdomain)
134+
.tag(OkHttpUtils.CustomListener.class, telemetryListener);
126135

127136
if (isCompressionEnabled()) {
128137
builder.addHeader(CONTENT_ENCODING_HEADER, GZIP_CONTENT_TYPE);
@@ -139,15 +148,21 @@ public Response sendSerializedTraces(Payload payload) {
139148
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
140149
return Response.success(response.code());
141150
} else {
151+
InstrumentationBridge.getMetricCollector()
152+
.add(CiVisibilityCountMetric.ENDPOINT_PAYLOAD_DROPPED, 1, trackType.endpoint);
142153
countAndLogFailedSend(payload.traceCount(), sizeInBytes, response, null);
143154
return Response.failed(response.code());
144155
}
145156

146157
} catch (ConnectException e) {
158+
InstrumentationBridge.getMetricCollector()
159+
.add(CiVisibilityCountMetric.ENDPOINT_PAYLOAD_DROPPED, 1, trackType.endpoint);
147160
countAndLogFailedSend(payload.traceCount(), sizeInBytes, null, null);
148161
return Response.failed(e);
149162

150163
} catch (IOException e) {
164+
InstrumentationBridge.getMetricCollector()
165+
.add(CiVisibilityCountMetric.ENDPOINT_PAYLOAD_DROPPED, 1, trackType.endpoint);
151166
countAndLogFailedSend(payload.traceCount(), sizeInBytes, null, e);
152167
return Response.failed(e);
153168
}

dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import datadog.communication.http.HttpRetryPolicy;
88
import datadog.communication.http.OkHttpUtils;
99
import datadog.trace.api.Config;
10+
import datadog.trace.api.civisibility.InstrumentationBridge;
11+
import datadog.trace.api.civisibility.telemetry.CiVisibilityCountMetric;
1012
import datadog.trace.api.intake.TrackType;
1113
import datadog.trace.common.writer.Payload;
1214
import datadog.trace.common.writer.RemoteApi;
@@ -91,21 +93,26 @@ public DDIntakeApi build() {
9193
final OkHttpClient client =
9294
(httpClient != null) ? httpClient : OkHttpUtils.buildHttpClient(intakeUrl, timeoutMillis);
9395

94-
return new DDIntakeApi(client, intakeUrl, apiKey, retryPolicyFactory);
96+
return new DDIntakeApi(trackType, client, intakeUrl, apiKey, retryPolicyFactory);
9597
}
9698
}
9799

100+
private final TelemetryListener telemetryListener;
101+
private final TrackType trackType;
98102
private final OkHttpClient httpClient;
99103
private final HttpUrl intakeUrl;
100104
private final String apiKey;
101105
private final HttpRetryPolicy.Factory retryPolicyFactory;
102106

103107
private DDIntakeApi(
108+
TrackType trackType,
104109
OkHttpClient httpClient,
105110
HttpUrl intakeUrl,
106111
String apiKey,
107112
HttpRetryPolicy.Factory retryPolicyFactory) {
108113
super(true);
114+
this.telemetryListener = new TelemetryListener(trackType.endpoint);
115+
this.trackType = trackType;
109116
this.httpClient = httpClient;
110117
this.intakeUrl = intakeUrl;
111118
this.apiKey = apiKey;
@@ -122,6 +129,7 @@ public Response sendSerializedTraces(Payload payload) {
122129
.addHeader(DD_API_KEY_HEADER, apiKey)
123130
.addHeader(CONTENT_ENCODING_HEADER, GZIP_CONTENT_TYPE)
124131
.post(payload.toRequest())
132+
.tag(OkHttpUtils.CustomListener.class, telemetryListener)
125133
.build();
126134
totalTraces += payload.traceCount();
127135
receivedTraces += payload.traceCount();
@@ -133,15 +141,21 @@ public Response sendSerializedTraces(Payload payload) {
133141
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
134142
return Response.success(response.code());
135143
} else {
144+
InstrumentationBridge.getMetricCollector()
145+
.add(CiVisibilityCountMetric.ENDPOINT_PAYLOAD_DROPPED, 1, trackType.endpoint);
136146
countAndLogFailedSend(payload.traceCount(), sizeInBytes, response, null);
137147
return Response.failed(response.code());
138148
}
139149

140150
} catch (ConnectException e) {
151+
InstrumentationBridge.getMetricCollector()
152+
.add(CiVisibilityCountMetric.ENDPOINT_PAYLOAD_DROPPED, 1, trackType.endpoint);
141153
countAndLogFailedSend(payload.traceCount(), sizeInBytes, null, null);
142154
return Response.failed(e);
143155

144156
} catch (IOException e) {
157+
InstrumentationBridge.getMetricCollector()
158+
.add(CiVisibilityCountMetric.ENDPOINT_PAYLOAD_DROPPED, 1, trackType.endpoint);
145159
countAndLogFailedSend(payload.traceCount(), sizeInBytes, null, e);
146160
return Response.failed(e);
147161
}

0 commit comments

Comments
 (0)