Skip to content

Commit 542e6a8

Browse files
committed
Change kotlin coroutine instrumentation to handle cancelation and concurrency better
1 parent 9f425aa commit 542e6a8

10 files changed

Lines changed: 269 additions & 135 deletions

File tree

dd-java-agent/instrumentation/kotlin-coroutines/src/main/java/datadog/trace/instrumentation/kotlin/coroutines/ContinuationHandler.java

Lines changed: 0 additions & 57 deletions
This file was deleted.

dd-java-agent/instrumentation/kotlin-coroutines/src/main/java/datadog/trace/instrumentation/kotlin/coroutines/CoroutineContextAdvice.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
import net.bytebuddy.asm.Advice;
55

66
public class CoroutineContextAdvice {
7-
@Advice.OnMethodEnter
8-
public static void enter(
9-
@Advice.Argument(value = 1, readOnly = false) CoroutineContext coroutineContext) {
7+
8+
// This is applied last to ensure that we have a Job attached before our context is added,
9+
// so we can register the on completion callback to clean up our resources
10+
@Advice.OnMethodExit
11+
public static void exit(@Advice.Return(readOnly = false) CoroutineContext coroutineContext) {
1012
if (coroutineContext != null) {
11-
coroutineContext = coroutineContext.plus(new ScopeStateCoroutineContext());
13+
coroutineContext = coroutineContext.plus(new ScopeStateCoroutineContext(coroutineContext));
1214
}
1315
}
1416
}

dd-java-agent/instrumentation/kotlin-coroutines/src/main/java/datadog/trace/instrumentation/kotlin/coroutines/KotlinCoroutinesInstrumentation.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ public String[] helperClassNames() {
2020
return new String[] {
2121
packageName + ".ScopeStateCoroutineContext",
2222
packageName + ".ScopeStateCoroutineContext$ContextElementKey",
23-
packageName + ".ContinuationHandler",
2423
packageName + ".CoroutineContextHelper",
2524
};
2625
}

dd-java-agent/instrumentation/kotlin-coroutines/src/main/java/datadog/trace/instrumentation/kotlin/coroutines/ScopeStateCoroutineContext.java

Lines changed: 125 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,155 @@
11
package datadog.trace.instrumentation.kotlin.coroutines;
22

3+
import static datadog.trace.instrumentation.kotlin.coroutines.CoroutineContextHelper.getJob;
4+
35
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
46
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
57
import datadog.trace.bootstrap.instrumentation.api.ScopeState;
8+
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.atomic.AtomicReference;
10+
import kotlin.Unit;
611
import kotlin.coroutines.CoroutineContext;
12+
import kotlin.jvm.functions.Function1;
713
import kotlin.jvm.functions.Function2;
14+
import kotlinx.coroutines.Job;
815
import kotlinx.coroutines.ThreadContextElement;
916
import org.jetbrains.annotations.NotNull;
1017
import org.jetbrains.annotations.Nullable;
1118

12-
public class ScopeStateCoroutineContext implements ThreadContextElement<ScopeState> {
19+
/** Map a scope state to a coroutine context. */
20+
public class ScopeStateCoroutineContext
21+
implements ThreadContextElement<ScopeState>, Function1<Throwable, Unit> {
22+
23+
// Marker scope state to handle concurrent completion and restoration missing the completed state
24+
private static final ScopeState COMPLETED = new ScopeState.NoopScopeState();
25+
26+
// Maximum time to wait if the coroutine is rescheduled before it has been restored properly
27+
private static final long MAX_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(5);
1328

1429
private static final Key<ScopeStateCoroutineContext> KEY = new ContextElementKey();
15-
private final ScopeState coroutineScopeState;
16-
@Nullable private ContinuationHandler continuationHandler;
1730

18-
public ScopeStateCoroutineContext() {
19-
coroutineScopeState = AgentTracer.get().newScopeState();
31+
private final ScopeState initialScopeState;
32+
private final AtomicReference<ScopeState> availableScopeState = new AtomicReference<>();
33+
private volatile AgentScope.Continuation continuation;
34+
private volatile AgentScope continuationScope;
35+
private volatile boolean registered = false;
36+
37+
public ScopeStateCoroutineContext(CoroutineContext coroutineContext) {
2038
final AgentScope activeScope = AgentTracer.get().activeScope();
21-
if (activeScope != null) {
22-
activeScope.setAsyncPropagation(true);
23-
continuationHandler =
24-
new ContinuationHandler(coroutineScopeState, activeScope.captureConcurrent());
39+
if (activeScope != null && activeScope.isAsyncPropagating()) {
40+
continuation = activeScope.capture();
41+
maybeRegisterOnCompletionCallback(coroutineContext);
42+
} else {
43+
continuation = null;
44+
}
45+
initialScopeState = AgentTracer.get().newScopeState();
46+
availableScopeState.set(initialScopeState);
47+
}
48+
49+
// Called when a coroutine is about to start executing.
50+
@Override
51+
public ScopeState updateThreadContext(@NotNull final CoroutineContext coroutineContext) {
52+
ScopeState scopeState = initialScopeState;
53+
// There seems to be some weird scheduling bug (IMHO) where a CoroutineContext can be
54+
// rescheduled if you use a very small delay() so that updateThreadContext can be called on the
55+
// new thread while restoreThreadContext is still in progress on the old thread and the scope
56+
// state is not available for us to use.
57+
// We try to mitigate the issue here by delaying execution for up to MAX_WAIT_NANOS and then not
58+
// activating the scope state if it's still in use by the old thread.
59+
long delay = 0;
60+
long start = 0;
61+
while (!availableScopeState.compareAndSet(scopeState, null)) {
62+
// If we've waited too long, then don't try to use the scope state
63+
if (delay > MAX_WAIT_NANOS) {
64+
scopeState = null;
65+
break;
66+
}
67+
// If this is the first time around, then update the start time, and the reason for using
68+
// delay is that we can't check if start is 0, since that is a valid System.nanoTime value
69+
if (delay == 0) {
70+
start = System.nanoTime();
71+
}
72+
Thread.yield();
73+
// We can't have a delay that is 0 since we will move the start time if that happens
74+
delay = Long.max(System.nanoTime() - start, 1);
75+
}
76+
77+
if (scopeState != null) {
78+
scopeState.activate();
79+
if (continuation != null) {
80+
continuationScope = continuation.activate();
81+
continuation = null;
82+
// Sometimes there is no job available when the continuation is captured so register again
83+
maybeRegisterOnCompletionCallback(coroutineContext);
84+
}
2585
}
86+
return scopeState;
2687
}
2788

89+
// Called when a coroutine is about to stop executing.
2890
@Override
2991
public void restoreThreadContext(
3092
@NotNull final CoroutineContext coroutineContext, final ScopeState oldState) {
31-
oldState.activate();
93+
if (oldState != null) {
94+
// We only need to clean up if we have a continuationScope, since the continuation will
95+
// already have been activated before we get here
96+
if (continuationScope != null) {
97+
Job job = getJob(coroutineContext);
98+
if (job != null && !job.isActive()) {
99+
maybeCancelOrClose();
100+
}
101+
}
102+
oldState.restore();
103+
ScopeState maybeCompleted = availableScopeState.getAndSet(oldState);
104+
if (maybeCompleted == COMPLETED) {
105+
// The method for invoke on completion was called while we were processing, and we missed
106+
// that the job is no longer active, so try to clean up again
107+
if (availableScopeState.compareAndSet(oldState, null)) {
108+
activateAndMaybeCancelOrClose(oldState);
109+
availableScopeState.set(oldState);
110+
}
111+
}
112+
}
32113
}
33114

115+
// Called when a coroutine has been completed. Can be executed concurrently.
34116
@Override
35-
public ScopeState updateThreadContext(@NotNull final CoroutineContext coroutineContext) {
36-
final ScopeState oldScopeState = AgentTracer.get().newScopeState();
37-
oldScopeState.fetchFromActive();
38-
39-
coroutineScopeState.activate();
117+
public Unit invoke(Throwable throwable) {
118+
ScopeState scopeState = availableScopeState.getAndSet(COMPLETED);
119+
// If there is no race with execution, then activate the scope state and clean up
120+
if (scopeState != null && scopeState != COMPLETED) {
121+
activateAndMaybeCancelOrClose(scopeState);
122+
availableScopeState.set(scopeState);
123+
}
124+
return Unit.INSTANCE;
125+
}
40126

41-
if (continuationHandler != null && !continuationHandler.isActive()) {
42-
continuationHandler.activate();
43-
continuationHandler.register(coroutineContext);
127+
private void maybeRegisterOnCompletionCallback(CoroutineContext coroutineContext) {
128+
if (!registered) {
129+
// Make sure we clean up on completion
130+
Job job = getJob(coroutineContext);
131+
if (job != null) {
132+
job.invokeOnCompletion(this);
133+
registered = true;
134+
}
44135
}
136+
}
45137

46-
return oldScopeState;
138+
private void activateAndMaybeCancelOrClose(ScopeState scopeState) {
139+
scopeState.activate();
140+
maybeCancelOrClose();
141+
scopeState.restore();
142+
}
143+
144+
private void maybeCancelOrClose() {
145+
// We can have either the continuation or the continuationScope stored but not both
146+
if (continuation != null) {
147+
continuation.cancel();
148+
continuation = null;
149+
} else if (continuationScope != null && continuationScope == AgentTracer.activeScope()) {
150+
continuationScope.close();
151+
continuationScope = null;
152+
}
47153
}
48154

49155
@Nullable

dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -263,21 +263,34 @@ ScopeStack scopeStack() {
263263

264264
@Override
265265
public ScopeState newScopeState() {
266-
return new ContinuableScopeState();
266+
return new ContinuableScopeState(this, ScopeStack.emptyStack());
267267
}
268268

269-
private class ContinuableScopeState implements ScopeState {
269+
private static class ContinuableScopeState implements ScopeState {
270270

271-
private ScopeStack localScopeStack = tlsScopeStack.initialValue();
271+
private final ContinuableScopeManager scopeManager;
272+
private final ArrayDeque<ContinuableScope> scopeStack;
273+
private volatile ArrayDeque<ContinuableScope> previousStack;
274+
275+
public ContinuableScopeState(
276+
ContinuableScopeManager scopeManager, ArrayDeque<ContinuableScope> scopeStack) {
277+
this.scopeManager = scopeManager;
278+
this.scopeStack = scopeStack;
279+
this.previousStack = null;
280+
}
272281

273282
@Override
274283
public void activate() {
275-
tlsScopeStack.set(localScopeStack);
284+
ArrayDeque<ContinuableScope> previousStack = scopeManager.scopeStack().swap(scopeStack);
285+
this.previousStack = previousStack;
276286
}
277287

278288
@Override
279-
public void fetchFromActive() {
280-
localScopeStack = tlsScopeStack.get();
289+
public void restore() {
290+
if (previousStack != null) {
291+
scopeManager.scopeStack().swap(scopeStack, previousStack);
292+
previousStack = null;
293+
}
281294
}
282295
}
283296

@@ -507,10 +520,14 @@ protected ScopeStack initialValue() {
507520
*/
508521
static final class ScopeStack {
509522

523+
static ArrayDeque<ContinuableScope> emptyStack() {
524+
return new ArrayDeque<>();
525+
}
526+
510527
private final int nativeThreadId;
511528

512529
private final ProfilingContextIntegration profilingContextIntegration;
513-
private final ArrayDeque<ContinuableScope> stack = new ArrayDeque<>(); // previous scopes
530+
private ArrayDeque<ContinuableScope> stack = emptyStack(); // previous scopes
514531

515532
ContinuableScope top; // current scope
516533

@@ -609,6 +626,31 @@ void clear() {
609626
top = null;
610627
}
611628

629+
ArrayDeque<ContinuableScope> swap(ArrayDeque<ContinuableScope> newStack) {
630+
return swap(null, newStack);
631+
}
632+
633+
ArrayDeque<ContinuableScope> swap(
634+
ArrayDeque<ContinuableScope> expectedStack, ArrayDeque<ContinuableScope> newStack) {
635+
ArrayDeque<ContinuableScope> previousStack = stack;
636+
if (expectedStack != null && expectedStack != previousStack) {
637+
throw new IllegalArgumentException(
638+
"Unexpected scope stack " + previousStack + "found instead of " + expectedStack);
639+
}
640+
if (top != null) {
641+
previousStack.push(top);
642+
}
643+
stack = newStack;
644+
if (newStack.isEmpty()) {
645+
top = null;
646+
onBecomeEmpty();
647+
} else {
648+
top = newStack.pop();
649+
onTopChanged(top);
650+
}
651+
return previousStack;
652+
}
653+
612654
private void onTopChanged(ContinuableScope top) {
613655
long spanId = top.span.getSpanId();
614656
AgentSpan rootSpan = top.span.getLocalRootSpan();

0 commit comments

Comments
 (0)