Skip to content

Commit 36b62dd

Browse files
committed
review and simplification
1 parent 2b1c379 commit 36b62dd

2 files changed

Lines changed: 36 additions & 38 deletions

File tree

dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -199,20 +199,20 @@ public ConflatingMetricsAggregator(
199199
this.reportingIntervalTimeUnit = timeUnit;
200200
}
201201

202-
private DDAgentFeaturesDiscovery getFeatures() {
202+
private DDAgentFeaturesDiscovery featuresDiscovery() {
203203
DDAgentFeaturesDiscovery ret = features;
204-
if (ret == null) {
204+
if (ret != null) {
205205
return ret;
206206
}
207+
// no need to synchronise here since it's already done in sharedCommunicationObject.
208+
// At worst, we'll assign multiple time the variable but it will be the same object
207209
ret = sharedCommunicationObjects.featuresDiscovery(Config.get());
208210
features = ret;
209211
return ret;
210212
}
211213

212214
@Override
213215
public void start() {
214-
AgentTaskScheduler.get()
215-
.execute(() -> features = sharedCommunicationObjects.featuresDiscovery(Config.get()));
216216
sink.register(this);
217217
thread.start();
218218
cancellation =
@@ -226,10 +226,6 @@ public void start() {
226226
log.debug("started metrics aggregator");
227227
}
228228

229-
private boolean isMetricsEnabled() {
230-
return getFeatures().supportsMetrics();
231-
}
232-
233229
@Override
234230
public boolean report() {
235231
boolean published;
@@ -246,8 +242,7 @@ public boolean report() {
246242

247243
@Override
248244
public Future<Boolean> forceReport() {
249-
// Ensure the feature is enabled
250-
if (features != null && !isMetricsEnabled()) {
245+
if (!featuresDiscovery().supportsMetrics()) {
251246
return CompletableFuture.completedFuture(false);
252247
}
253248
// Wait for the thread to start
@@ -283,8 +278,8 @@ public Future<Boolean> forceReport() {
283278
public boolean publish(List<? extends CoreSpan<?>> trace) {
284279
boolean forceKeep = false;
285280
int counted = 0;
286-
final DDAgentFeaturesDiscovery features = getFeatures();
287-
if (features != null && features.supportsMetrics()) {
281+
final DDAgentFeaturesDiscovery features = featuresDiscovery();
282+
if (features.supportsMetrics()) {
288283
for (CoreSpan<?> span : trace) {
289284
boolean isTopLevel = span.isTopLevel();
290285
if (shouldComputeMetric(span)) {
@@ -366,34 +361,31 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, DDAgentFeaturesDis
366361

367362
private List<UTF8BytesString> getPeerTags(
368363
CoreSpan<?> span, String spanKind, DDAgentFeaturesDiscovery features) {
369-
if (features != null) {
370-
if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
371-
List<UTF8BytesString> peerTags = new ArrayList<>();
372-
for (String peerTag : features.peerTags()) {
373-
Object value = span.getTag(peerTag);
374-
if (value != null) {
375-
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
376-
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
377-
peerTags.add(
378-
cacheAndCreator
379-
.getLeft()
380-
.computeIfAbsent(value.toString(), cacheAndCreator.getRight()));
381-
}
382-
}
383-
return peerTags;
384-
} else if (SPAN_KIND_INTERNAL.equals(spanKind)) {
385-
// in this case only the base service should be aggregated if present
386-
final Object baseService = span.getTag(BASE_SERVICE);
387-
if (baseService != null) {
364+
if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
365+
List<UTF8BytesString> peerTags = new ArrayList<>();
366+
for (String peerTag : features.peerTags()) {
367+
Object value = span.getTag(peerTag);
368+
if (value != null) {
388369
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
389-
cacheAndCreator =
390-
PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER);
391-
return Collections.singletonList(
370+
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
371+
peerTags.add(
392372
cacheAndCreator
393373
.getLeft()
394-
.computeIfAbsent(baseService.toString(), cacheAndCreator.getRight()));
374+
.computeIfAbsent(value.toString(), cacheAndCreator.getRight()));
395375
}
396376
}
377+
return peerTags;
378+
} else if (SPAN_KIND_INTERNAL.equals(spanKind)) {
379+
// in this case only the base service should be aggregated if present
380+
final Object baseService = span.getTag(BASE_SERVICE);
381+
if (baseService != null) {
382+
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
383+
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER);
384+
return Collections.singletonList(
385+
cacheAndCreator
386+
.getLeft()
387+
.computeIfAbsent(baseService.toString(), cacheAndCreator.getRight()));
388+
}
397389
}
398390
return Collections.emptyList();
399391
}
@@ -448,7 +440,7 @@ public void onEvent(EventType eventType, String message) {
448440
}
449441

450442
private void disable() {
451-
final DDAgentFeaturesDiscovery features = getFeatures();
443+
final DDAgentFeaturesDiscovery features = featuresDiscovery();
452444
features.discover();
453445
if (!features.supportsMetrics()) {
454446
log.debug("Disabling metric reporting because an agent downgrade was detected");

dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import datadog.trace.core.traceinterceptor.LatencyTraceInterceptor;
100100
import datadog.trace.lambda.LambdaHandler;
101101
import datadog.trace.relocate.api.RatelimitedLogger;
102+
import datadog.trace.util.AgentTaskScheduler;
102103
import java.io.IOException;
103104
import java.lang.ref.WeakReference;
104105
import java.math.BigInteger;
@@ -783,8 +784,13 @@ private CoreTracer(
783784

784785
metricsAggregator =
785786
createMetricsAggregator(config, sharedCommunicationObjects, this.healthMetrics);
786-
// the jitter is brought implicitly by the callback that can vary
787-
sharedCommunicationObjects.whenReady(metricsAggregator::start);
787+
// Schedule the metrics aggregator to begin reporting after a random delay of 1 to 10 seconds
788+
// (using milliseconds granularity.) This avoids a fleet of traced applications starting at the
789+
// same time from sending metrics in sync.
790+
sharedCommunicationObjects.whenReady(
791+
() ->
792+
AgentTaskScheduler.get()
793+
.scheduleWithJitter(MetricsAggregator::start, metricsAggregator, 1, SECONDS));
788794

789795
if (dataStreamsMonitoring == null) {
790796
this.dataStreamsMonitoring =

0 commit comments

Comments
 (0)