Skip to content

Commit ac3ca09

Browse files
authored
Prevent AppSec context from being closed more than once on partial flush (#7059)
We had some hooks in `CoreTracer.write` meant to be run whenever a root span is finished. However, they were effectively called not just when the root span finished, but also whenever a partial flush on a child span was performed. This ended up calling the hooks multiple times, and earlier than expected.
1 parent bd6b34d commit ac3ca09

4 files changed

Lines changed: 39 additions & 14 deletions

File tree

dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,33 @@ PropagationTags.Factory getPropagationTagsFactory() {
240240
return propagationTagsFactory;
241241
}
242242

243+
/**
244+
* Called when a root span is finished before it is serialized. This might be called multiple
245+
* times per root span. If a child span is part of a partial flush, this method will be called for
246+
* its root even if the root is not finished.
247+
*/
243248
@Override
244249
public void onRootSpanFinished(AgentSpan root, EndpointTracker tracker) {
245250
profilingContextIntegration.onRootSpanFinished(root, tracker);
246251
}
247252

253+
/**
254+
* Called when a root span is finished before it is serialized. This is guaranteed to be called
255+
* exactly once per root span.
256+
*/
257+
void onRootSpanPublished(final AgentSpan root) {
258+
// Request context is propagated to contexts in child spans.
259+
// Assume here that if present it will be so starting in the top span.
260+
RequestContext requestContext = root.getRequestContext();
261+
if (requestContext != null) {
262+
try {
263+
requestContext.close();
264+
} catch (IOException e) {
265+
log.warn("Error closing request context data", e);
266+
}
267+
}
268+
}
269+
248270
@Override
249271
public EndpointTracker onRootSpanStarted(AgentSpan root) {
250272
return profilingContextIntegration.onRootSpanStarted(root);
@@ -971,17 +993,6 @@ void write(final List<DDSpan> trace) {
971993
}
972994
if (null != rootSpan) {
973995
onRootSpanFinished(rootSpan, rootSpan.getEndpointTracker());
974-
975-
// request context is propagated to contexts in child spans
976-
// Assume here that if present it will be so starting in the top span
977-
RequestContext requestContext = rootSpan.getRequestContext();
978-
if (requestContext != null) {
979-
try {
980-
requestContext.close();
981-
} catch (IOException e) {
982-
log.warn("Error closing request context data", e);
983-
}
984-
}
985996
}
986997
}
987998

dd-trace-core/src/main/java/datadog/trace/core/PendingTrace.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,11 @@ PublishState onPublish(final DDSpan span) {
253253
// write method.
254254
healthMetrics.onFinishSpan();
255255
COMPLETED_SPAN_COUNT.incrementAndGet(this);
256-
return decrementRefAndMaybeWrite(span == getRootSpan());
256+
final DDSpan rootSpan = getRootSpan();
257+
if (span == rootSpan) {
258+
tracer.onRootSpanPublished(rootSpan);
259+
}
260+
return decrementRefAndMaybeWrite(span == rootSpan);
257261
}
258262

259263
@Override

dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class PendingTraceBufferTest extends DDSpecification {
9191
1 * bufferSpy.enqueue(trace)
9292
_ * tracer.getPartialFlushMinSpans() >> 10
9393
1 * tracer.getTimeWithNanoTicks(_)
94+
1 * tracer.onRootSpanPublished(span)
9495
0 * _
9596

9697
when:
@@ -123,6 +124,7 @@ class PendingTraceBufferTest extends DDSpecification {
123124
_ * bufferSpy.longRunningSpansEnabled()
124125
_ * tracer.getPartialFlushMinSpans() >> 10
125126
1 * tracer.getTimeWithNanoTicks(_)
127+
1 * tracer.onRootSpanPublished(parent)
126128
0 * _
127129

128130
when:
@@ -180,6 +182,7 @@ class PendingTraceBufferTest extends DDSpecification {
180182
_ * tracer.getPartialFlushMinSpans() >> 10
181183
_ * traceConfig.getServiceMapping() >> [:]
182184
_ * tracer.getTimeWithNanoTicks(_)
185+
buffer.queue.capacity() * tracer.onRootSpanPublished(_)
183186
0 * _
184187

185188
when:
@@ -195,6 +198,7 @@ class PendingTraceBufferTest extends DDSpecification {
195198
_ * tracer.getPartialFlushMinSpans() >> 10
196199
_ * traceConfig.getServiceMapping() >> [:]
197200
2 * tracer.getTimeWithNanoTicks(_)
201+
1 * tracer.onRootSpanPublished(_)
198202
0 * _
199203
pendingTrace.isEnqueued == 0
200204
}
@@ -216,14 +220,14 @@ class PendingTraceBufferTest extends DDSpecification {
216220
_ * tracer.getPartialFlushMinSpans() >> 10
217221
_ * traceConfig.getServiceMapping() >> [:]
218222
_ * tracer.getTimeWithNanoTicks(_)
223+
buffer.queue.capacity() * tracer.onRootSpanPublished(_)
219224
0 * _
220225

221226
when:
222227
def pendingTrace = factory.create(DDTraceId.ONE)
223228
pendingTrace.longRunningTrackedState = LongRunningTracesTracker.TO_TRACK
224229
addContinuation(newSpanOf(pendingTrace)).finish()
225230

226-
then:
227231
then:
228232
1 * tracer.captureTraceConfig() >> traceConfig
229233
1 * bufferSpy.enqueue(_)
@@ -234,6 +238,7 @@ class PendingTraceBufferTest extends DDSpecification {
234238
_ * tracer.getPartialFlushMinSpans() >> 10
235239
_ * traceConfig.getServiceMapping() >> [:]
236240
_ * tracer.getTimeWithNanoTicks(_)
241+
1 * tracer.onRootSpanPublished(_)
237242
0 * _
238243

239244
pendingTrace.isEnqueued == 0
@@ -261,6 +266,7 @@ class PendingTraceBufferTest extends DDSpecification {
261266
1 * bufferSpy.enqueue(trace)
262267
_ * tracer.getPartialFlushMinSpans() >> 10
263268
1 * tracer.getTimeWithNanoTicks(_)
269+
1 * tracer.onRootSpanPublished(parent)
264270
0 * _
265271

266272
when:
@@ -314,6 +320,7 @@ class PendingTraceBufferTest extends DDSpecification {
314320
}
315321
_ * tracer.getPartialFlushMinSpans() >> 10
316322
1 * tracer.getTimeWithNanoTicks(_)
323+
1 * tracer.onRootSpanPublished(parent)
317324
0 * _
318325

319326
when:
@@ -409,6 +416,7 @@ class PendingTraceBufferTest extends DDSpecification {
409416
1 * tracer.getPartialFlushMinSpans() >> 10000
410417
1 * traceConfig.getServiceMapping() >> [:]
411418
2 * tracer.getTimeWithNanoTicks(_)
419+
1 * tracer.onRootSpanPublished(_)
412420
0 * _
413421

414422
when: "fail to fill the buffer"

internal-api/src/main/java/datadog/trace/api/EndpointCheckpointer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
public interface EndpointCheckpointer {
66
/**
7-
* Callback to be called when a root span is written (together with the trace)
7+
* Callback to be called when a root span is finished (together with the trace). With partial
8+
* flushes, this may be called multiple times when any of the root span's children are finished
9+
* even if the root span is not.
810
*
911
* @param rootSpan the local root span of the trace
1012
* @param tracker the endpoint tracker

0 commit comments

Comments
 (0)