Skip to content

Commit dd5da0a

Browse files
committed
Merge branch 'master' into mariovidal/untrusted_deserialization
2 parents 6a51a9d + 2e9ba7a commit dd5da0a

12 files changed

Lines changed: 81 additions & 154 deletions

File tree

.github/workflows/update-gradle-dependencies.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ jobs:
3333
run: |
3434
git config user.name "github-actions[bot]"
3535
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
36-
git add **/gradle.lockfile
36+
git add "**/gradle.lockfile"
3737
git commit -m "chore: Update Gradle dependencies"
3838
git push -u origin $BRANCH_NAME
3939
gh pr create --title "Update Gradle dependencies" \

dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/log/LogContextScopeListener.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,22 @@
11
package datadog.trace.agent.tooling.log;
22

33
import datadog.trace.api.CorrelationIdentifier;
4-
import datadog.trace.api.DDSpanId;
5-
import datadog.trace.api.DDTraceId;
6-
import datadog.trace.api.InstrumenterConfig;
7-
import datadog.trace.api.TraceConfig;
84
import datadog.trace.api.WithGlobalTracer;
9-
import datadog.trace.api.scopemanager.ExtendedScopeListener;
5+
import datadog.trace.api.scopemanager.ScopeListener;
6+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
107
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.TracerAPI;
118

129
/**
1310
* A scope listener that receives the MDC/ThreadContext put and receive methods and update the trace
1411
* and span reference anytime a new scope is activated or closed.
1512
*/
16-
public abstract class LogContextScopeListener
17-
implements ExtendedScopeListener, WithGlobalTracer.Callback {
13+
public abstract class LogContextScopeListener implements ScopeListener, WithGlobalTracer.Callback {
1814

1915
@Override
20-
public void afterScopeActivated() {}
21-
22-
@Override
23-
public void afterScopeActivated(
24-
DDTraceId traceId, long localRootSpanId, long spanId, TraceConfig traceConfig) {
25-
if (traceConfig != null && traceConfig.isLogsInjectionEnabled()) {
26-
if (InstrumenterConfig.get().isLogs128bTraceIdEnabled() && traceId.toHighOrderLong() != 0) {
27-
add(CorrelationIdentifier.getTraceIdKey(), traceId.toHexString());
28-
} else {
29-
add(CorrelationIdentifier.getTraceIdKey(), traceId.toString());
30-
}
31-
32-
add(CorrelationIdentifier.getSpanIdKey(), DDSpanId.toString(spanId));
16+
public void afterScopeActivated() {
17+
if (AgentTracer.traceConfig().isLogsInjectionEnabled()) {
18+
add(CorrelationIdentifier.getTraceIdKey(), CorrelationIdentifier.getTraceId());
19+
add(CorrelationIdentifier.getSpanIdKey(), CorrelationIdentifier.getSpanId());
3320
}
3421
}
3522

dd-java-agent/cws-tls/src/main/java/datadog/cws/tls/TlsScopeListener.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import datadog.trace.api.DDSpanId;
44
import datadog.trace.api.DDTraceId;
5-
import datadog.trace.api.TraceConfig;
65
import datadog.trace.api.scopemanager.ExtendedScopeListener;
76
import java.util.ArrayDeque;
87
import java.util.Deque;
@@ -50,12 +49,11 @@ void poll() {
5049

5150
@Override
5251
public void afterScopeActivated() {
53-
afterScopeActivated(DDTraceId.ZERO, DDSpanId.ZERO, DDSpanId.ZERO, null);
52+
afterScopeActivated(DDTraceId.ZERO, DDSpanId.ZERO);
5453
}
5554

5655
@Override
57-
public void afterScopeActivated(
58-
DDTraceId traceId, long localRootSpanId, long spanId, TraceConfig traceConfig) {
56+
public void afterScopeActivated(DDTraceId traceId, long spanId) {
5957
push(traceId, spanId);
6058
}
6159

dd-java-agent/cws-tls/src/test/groovy/datadog/cws/tls/TlsTest.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package datadog.cws.tls
22

3-
import datadog.trace.api.DDSpanId
3+
44
import datadog.trace.api.DDTraceId
55
import datadog.trace.bootstrap.instrumentation.api.AgentSpan
66
import datadog.trace.test.util.DDSpecification
@@ -20,8 +20,8 @@ class TlsTest extends DDSpecification {
2020
span.getSpanId() >> 22L
2121

2222
when:
23-
listener.afterScopeActivated(DDTraceId.from(11L), DDSpanId.ZERO, 12L, null)
24-
listener.afterScopeActivated(DDTraceId.from(21L), DDSpanId.ZERO, 22L, null)
23+
listener.afterScopeActivated(DDTraceId.from(11L), 12L)
24+
listener.afterScopeActivated(DDTraceId.from(21L), 22L)
2525
then:
2626
tls.getTraceId() == DDTraceId.from(21L)
2727
tls.getSpanId() == 22L

dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public static class DDAgentWriterBuilder {
3939
Monitoring monitoring = Monitoring.DISABLED;
4040
boolean traceAgentV05Enabled = Config.get().isTraceAgentV05Enabled();
4141
boolean metricsReportingEnabled = Config.get().isTracerMetricsEnabled();
42+
private int flushTimeout = 1;
43+
private TimeUnit flushTimeoutUnit = TimeUnit.SECONDS;
4244
boolean alwaysFlush = false;
4345

4446
private DDAgentApi agentApi;
@@ -116,6 +118,12 @@ public DDAgentWriterBuilder featureDiscovery(DDAgentFeaturesDiscovery featureDis
116118
return this;
117119
}
118120

121+
public DDAgentWriterBuilder flushTimeout(int flushTimeout, TimeUnit flushTimeoutUnit) {
122+
this.flushTimeout = flushTimeout;
123+
this.flushTimeoutUnit = flushTimeoutUnit;
124+
return this;
125+
}
126+
119127
public DDAgentWriterBuilder alwaysFlush(boolean alwaysFlush) {
120128
this.alwaysFlush = alwaysFlush;
121129
return this;
@@ -157,15 +165,23 @@ public DDAgentWriter build() {
157165
singleSpanSampler,
158166
null);
159167

160-
return new DDAgentWriter(traceProcessingWorker, dispatcher, healthMetrics, alwaysFlush);
168+
return new DDAgentWriter(
169+
traceProcessingWorker,
170+
dispatcher,
171+
healthMetrics,
172+
flushTimeout,
173+
flushTimeoutUnit,
174+
alwaysFlush);
161175
}
162176
}
163177

164178
DDAgentWriter(
165179
TraceProcessingWorker worker,
166180
PayloadDispatcher dispatcher,
167181
HealthMetrics healthMetrics,
182+
int flushTimeout,
183+
TimeUnit flushTimeoutUnit,
168184
boolean alwaysFlush) {
169-
super(worker, dispatcher, healthMetrics, alwaysFlush);
185+
super(worker, dispatcher, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush);
170186
}
171187
}

dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java

Lines changed: 31 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -74,26 +74,16 @@ public TraceProcessingWorker(
7474
spanSamplingWorker.getSpanSamplingQueue(),
7575
droppingPolicy);
7676

77-
boolean runAsDaemon = !Config.get().isCiVisibilityEnabled();
7877
this.serializingHandler =
79-
runAsDaemon
80-
? new DaemonTraceSerializingHandler(
81-
primaryQueue,
82-
secondaryQueue,
83-
healthMetrics,
84-
dispatcher,
85-
flushInterval,
86-
timeUnit,
87-
spanPostProcessor)
88-
: new NonDaemonTraceSerializingHandler(
89-
primaryQueue,
90-
secondaryQueue,
91-
healthMetrics,
92-
dispatcher,
93-
flushInterval,
94-
timeUnit,
95-
spanPostProcessor);
96-
this.serializerThread = newAgentThread(TRACE_PROCESSOR, serializingHandler, runAsDaemon);
78+
new TraceSerializingHandler(
79+
primaryQueue,
80+
secondaryQueue,
81+
healthMetrics,
82+
dispatcher,
83+
flushInterval,
84+
timeUnit,
85+
spanPostProcessor);
86+
this.serializerThread = newAgentThread(TRACE_PROCESSOR, serializingHandler);
9787
}
9888

9989
public void start() {
@@ -144,91 +134,7 @@ private static MpscBlockingConsumerArrayQueue<Object> createQueue(int capacity)
144134
return new MpscBlockingConsumerArrayQueue<>(capacity);
145135
}
146136

147-
private static class DaemonTraceSerializingHandler extends TraceSerializingHandler {
148-
public DaemonTraceSerializingHandler(
149-
MpscBlockingConsumerArrayQueue<Object> primaryQueue,
150-
MpscBlockingConsumerArrayQueue<Object> secondaryQueue,
151-
HealthMetrics healthMetrics,
152-
PayloadDispatcher payloadDispatcher,
153-
long flushInterval,
154-
TimeUnit timeUnit,
155-
SpanPostProcessor spanPostProcessor) {
156-
super(
157-
primaryQueue,
158-
secondaryQueue,
159-
healthMetrics,
160-
payloadDispatcher,
161-
flushInterval,
162-
timeUnit,
163-
spanPostProcessor);
164-
}
165-
166-
@Override
167-
public void run() {
168-
try {
169-
runDutyCycle();
170-
} catch (InterruptedException e) {
171-
Thread.currentThread().interrupt();
172-
}
173-
log.debug("Datadog trace processor exited. Publishing traces stopped");
174-
}
175-
176-
private void runDutyCycle() throws InterruptedException {
177-
Thread thread = Thread.currentThread();
178-
while (!thread.isInterrupted()) {
179-
consumeFromPrimaryQueue();
180-
consumeFromSecondaryQueue();
181-
flushIfNecessary();
182-
}
183-
}
184-
}
185-
186-
private static class NonDaemonTraceSerializingHandler extends TraceSerializingHandler {
187-
private static final double SHUTDOWN_TIMEOUT_MILLIS = 5_000;
188-
private Long shutdownSignalTimestamp;
189-
190-
public NonDaemonTraceSerializingHandler(
191-
MpscBlockingConsumerArrayQueue<Object> primaryQueue,
192-
MpscBlockingConsumerArrayQueue<Object> secondaryQueue,
193-
HealthMetrics healthMetrics,
194-
PayloadDispatcher payloadDispatcher,
195-
long flushInterval,
196-
TimeUnit timeUnit,
197-
SpanPostProcessor spanPostProcessor) {
198-
super(
199-
primaryQueue,
200-
secondaryQueue,
201-
healthMetrics,
202-
payloadDispatcher,
203-
flushInterval,
204-
timeUnit,
205-
spanPostProcessor);
206-
}
207-
208-
@Override
209-
public void run() {
210-
while (!shouldShutdown()) {
211-
try {
212-
consumeFromPrimaryQueue();
213-
consumeFromSecondaryQueue();
214-
flushIfNecessary();
215-
} catch (InterruptedException e) {
216-
if (shutdownSignalTimestamp == null) {
217-
shutdownSignalTimestamp = System.currentTimeMillis();
218-
}
219-
}
220-
}
221-
log.debug("Datadog trace processor exited. Unpublished traces left: " + !queuesAreEmpty());
222-
}
223-
224-
private boolean shouldShutdown() {
225-
return shutdownSignalTimestamp != null
226-
&& (shutdownSignalTimestamp + SHUTDOWN_TIMEOUT_MILLIS <= System.currentTimeMillis()
227-
|| queuesAreEmpty());
228-
}
229-
}
230-
231-
public abstract static class TraceSerializingHandler implements Runnable {
137+
public static class TraceSerializingHandler implements Runnable {
232138

233139
private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
234140
private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
@@ -261,6 +167,27 @@ public TraceSerializingHandler(
261167
this.spanPostProcessor = spanPostProcessor;
262168
}
263169

170+
@Override
171+
public void run() {
172+
try {
173+
runDutyCycle();
174+
} catch (InterruptedException e) {
175+
Thread.currentThread().interrupt();
176+
}
177+
log.debug(
178+
"Datadog trace processor exited. Publishing traces stopped. Unpublished traces left: "
179+
+ !queuesAreEmpty());
180+
}
181+
182+
private void runDutyCycle() throws InterruptedException {
183+
Thread thread = Thread.currentThread();
184+
while (!thread.isInterrupted()) {
185+
consumeFromPrimaryQueue();
186+
consumeFromSecondaryQueue();
187+
flushIfNecessary();
188+
}
189+
}
190+
264191
@SuppressWarnings("unchecked")
265192
public void onEvent(Object event) {
266193
// publish an incomplete batch if

dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import datadog.trace.common.writer.ddintake.DDIntakeTrackTypeResolver;
2525
import datadog.trace.core.monitor.HealthMetrics;
2626
import datadog.trace.util.Strings;
27+
import java.util.concurrent.TimeUnit;
2728
import okhttp3.HttpUrl;
2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
@@ -105,6 +106,10 @@ public static Writer createWriter(
105106
.singleSpanSampler(singleSpanSampler)
106107
.flushIntervalMilliseconds(flushIntervalMilliseconds);
107108

109+
if (config.isCiVisibilityEnabled()) {
110+
builder.flushTimeout(5, TimeUnit.SECONDS);
111+
}
112+
108113
if (config.isCiVisibilityCodeCoverageEnabled()) {
109114
final RemoteApi coverageApi =
110115
createDDIntakeRemoteApi(config, commObjects, featuresDiscovery, TrackType.CITESTCOV);
@@ -140,7 +145,7 @@ public static Writer createWriter(
140145
ddAgentApi.addResponseListener((RemoteResponseListener) sampler);
141146
}
142147

143-
remoteWriter =
148+
DDAgentWriter.DDAgentWriterBuilder builder =
144149
DDAgentWriter.builder()
145150
.agentApi(ddAgentApi)
146151
.featureDiscovery(featuresDiscovery)
@@ -149,8 +154,13 @@ public static Writer createWriter(
149154
.monitoring(commObjects.monitoring)
150155
.alwaysFlush(alwaysFlush)
151156
.spanSamplingRules(singleSpanSampler)
152-
.flushIntervalMilliseconds(flushIntervalMilliseconds)
153-
.build();
157+
.flushIntervalMilliseconds(flushIntervalMilliseconds);
158+
159+
if (config.isCiVisibilityEnabled()) {
160+
builder.flushTimeout(5, TimeUnit.SECONDS);
161+
}
162+
163+
remoteWriter = builder.build();
154164
}
155165

156166
return remoteWriter;

dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScope.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,7 @@ public final void afterActivated() {
178178

179179
for (final ExtendedScopeListener listener : scopeManager.extendedScopeListeners) {
180180
try {
181-
listener.afterScopeActivated(
182-
span.getTraceId(),
183-
span.getLocalRootSpan().getSpanId(),
184-
span.context().getSpanId(),
185-
span.traceConfig());
181+
listener.afterScopeActivated(span.getTraceId(), span.getSpanId());
186182
} catch (Throwable e) {
187183
ContinuableScopeManager.log.debug(
188184
"ExtendedScopeListener threw exception in afterActivated()", e);

dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -270,11 +270,7 @@ private void addExtendedScopeListener(final ExtendedScopeListener listener) {
270270
AgentSpan activeSpan = activeSpan();
271271
if (activeSpan != null && activeSpan != NoopAgentSpan.INSTANCE) {
272272
// Notify the listener about the currently active scope
273-
listener.afterScopeActivated(
274-
activeSpan.getTraceId(),
275-
activeSpan.getLocalRootSpan().getSpanId(),
276-
activeSpan.context().getSpanId(),
277-
activeSpan.traceConfig());
273+
listener.afterScopeActivated(activeSpan.getTraceId(), activeSpan.getSpanId());
278274
}
279275
}
280276

dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDAgentWriterTest.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class DDAgentWriterTest extends DDCoreSpecification {
3535
def dispatcher = new PayloadDispatcherImpl(new DDAgentMapperDiscovery(discovery), api, monitor, monitoring)
3636

3737
@Subject
38-
def writer = new DDAgentWriter(worker, dispatcher, monitor, false)
38+
def writer = new DDAgentWriter(worker, dispatcher, monitor, 1, TimeUnit.SECONDS, false)
3939

4040
// Only used to create spans
4141
def dummyTracer = tracerBuilder().writer(new ListWriter()).build()
@@ -176,7 +176,7 @@ class DDAgentWriterTest extends DDCoreSpecification {
176176
def worker = Mock(TraceProcessingWorker)
177177
def monitor = Stub(HealthMetrics)
178178
def dispatcher = Mock(PayloadDispatcherImpl)
179-
def writer = new DDAgentWriter(worker, dispatcher, monitor, false)
179+
def writer = new DDAgentWriter(worker, dispatcher, monitor, 1, TimeUnit.SECONDS, false)
180180
def p0 = newSpan()
181181
p0.setSamplingPriority(PrioritySampling.SAMPLER_DROP)
182182
def trace = [p0, newSpan()]

0 commit comments

Comments
 (0)