|
1 | 1 | package datadog.trace.instrumentation.kotlin.coroutines; |
2 | 2 |
|
| 3 | +import static datadog.trace.instrumentation.kotlin.coroutines.CoroutineContextHelper.getJob; |
| 4 | + |
3 | 5 | import datadog.trace.bootstrap.instrumentation.api.AgentScope; |
4 | 6 | import datadog.trace.bootstrap.instrumentation.api.AgentTracer; |
5 | 7 | import datadog.trace.bootstrap.instrumentation.api.ScopeState; |
| 8 | +import java.util.concurrent.TimeUnit; |
| 9 | +import java.util.concurrent.atomic.AtomicReference; |
| 10 | +import kotlin.Unit; |
6 | 11 | import kotlin.coroutines.CoroutineContext; |
| 12 | +import kotlin.jvm.functions.Function1; |
7 | 13 | import kotlin.jvm.functions.Function2; |
| 14 | +import kotlinx.coroutines.Job; |
8 | 15 | import kotlinx.coroutines.ThreadContextElement; |
9 | 16 | import org.jetbrains.annotations.NotNull; |
10 | 17 | import org.jetbrains.annotations.Nullable; |
11 | 18 |
|
12 | | -public class ScopeStateCoroutineContext implements ThreadContextElement<ScopeState> { |
| 19 | +/** Map a scope state to a coroutine context. */ |
| 20 | +public class ScopeStateCoroutineContext |
| 21 | + implements ThreadContextElement<ScopeState>, Function1<Throwable, Unit> { |
| 22 | + |
| 23 | + // Marker scope state to handle concurrent completion and restoration missing the completed state |
| 24 | + private static final ScopeState COMPLETED = new ScopeState.NoopScopeState(); |
| 25 | + |
| 26 | + // Maximum time to wait if the coroutine is rescheduled before it has been restored properly |
| 27 | + private static final long MAX_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(5); |
13 | 28 |
|
14 | 29 | private static final Key<ScopeStateCoroutineContext> KEY = new ContextElementKey(); |
15 | | - private final ScopeState coroutineScopeState; |
16 | | - @Nullable private ContinuationHandler continuationHandler; |
17 | 30 |
|
18 | | - public ScopeStateCoroutineContext() { |
19 | | - coroutineScopeState = AgentTracer.get().newScopeState(); |
| 31 | + private final ScopeState initialScopeState; |
| 32 | + private final AtomicReference<ScopeState> availableScopeState = new AtomicReference<>(); |
| 33 | + private volatile AgentScope.Continuation continuation; |
| 34 | + private volatile AgentScope continuationScope; |
| 35 | + private volatile boolean registered = false; |
| 36 | + |
| 37 | + public ScopeStateCoroutineContext(CoroutineContext coroutineContext) { |
20 | 38 | final AgentScope activeScope = AgentTracer.get().activeScope(); |
21 | | - if (activeScope != null) { |
22 | | - activeScope.setAsyncPropagation(true); |
23 | | - continuationHandler = |
24 | | - new ContinuationHandler(coroutineScopeState, activeScope.captureConcurrent()); |
| 39 | + if (activeScope != null && activeScope.isAsyncPropagating()) { |
| 40 | + continuation = activeScope.capture(); |
| 41 | + maybeRegisterOnCompletionCallback(coroutineContext); |
| 42 | + } else { |
| 43 | + continuation = null; |
| 44 | + } |
| 45 | + initialScopeState = AgentTracer.get().newScopeState(); |
| 46 | + availableScopeState.set(initialScopeState); |
| 47 | + } |
| 48 | + |
| 49 | + // Called when a coroutine is about to start executing. |
| 50 | + @Override |
| 51 | + public ScopeState updateThreadContext(@NotNull final CoroutineContext coroutineContext) { |
| 52 | + ScopeState scopeState = initialScopeState; |
| 53 | + // There seems to be some weird scheduling bug (IMHO) where a CoroutineContext can be |
| 54 | + // rescheduled if you use a very small delay() so that updateThreadContext can be called on the |
| 55 | + // new thread while restoreThreadContext is still in progress on the old thread and the scope |
| 56 | + // state is not available for us to use. |
| 57 | + // We try to mitigate the issue here by delaying execution for up to MAX_WAIT_NANOS and then not |
| 58 | + // activating the scope state if it's still in use by the old thread. |
| 59 | + long delay = 0; |
| 60 | + long start = 0; |
| 61 | + while (!availableScopeState.compareAndSet(scopeState, null)) { |
| 62 | + // If we've waited too long, then don't try to use the scope state |
| 63 | + if (delay > MAX_WAIT_NANOS) { |
| 64 | + scopeState = null; |
| 65 | + break; |
| 66 | + } |
| 67 | + // If this is the first time around, then update the start time, and the reason for using |
| 68 | + // delay is that we can't check if start is 0, since that is a valid System.nanoTime value |
| 69 | + if (delay == 0) { |
| 70 | + start = System.nanoTime(); |
| 71 | + } |
| 72 | + Thread.yield(); |
| 73 | + // We can't have a delay that is 0 since we will move the start time if that happens |
| 74 | + delay = Long.max(System.nanoTime() - start, 1); |
| 75 | + } |
| 76 | + |
| 77 | + if (scopeState != null) { |
| 78 | + scopeState.activate(); |
| 79 | + if (continuation != null) { |
| 80 | + continuationScope = continuation.activate(); |
| 81 | + continuation = null; |
| 82 | + // Sometimes there is no job available when the continuation is captured so register again |
| 83 | + maybeRegisterOnCompletionCallback(coroutineContext); |
| 84 | + } |
25 | 85 | } |
| 86 | + return scopeState; |
26 | 87 | } |
27 | 88 |
|
| 89 | + // Called when a coroutine is about to stop executing. |
28 | 90 | @Override |
29 | 91 | public void restoreThreadContext( |
30 | 92 | @NotNull final CoroutineContext coroutineContext, final ScopeState oldState) { |
31 | | - oldState.activate(); |
| 93 | + if (oldState != null) { |
| 94 | + // We only need to clean up if we have a continuationScope, since the continuation will |
| 95 | + // already have been activated before we get here |
| 96 | + if (continuationScope != null) { |
| 97 | + Job job = getJob(coroutineContext); |
| 98 | + if (job != null && !job.isActive()) { |
| 99 | + maybeCancelOrClose(); |
| 100 | + } |
| 101 | + } |
| 102 | + oldState.restore(); |
| 103 | + ScopeState maybeCompleted = availableScopeState.getAndSet(oldState); |
| 104 | + if (maybeCompleted == COMPLETED) { |
| 105 | + // The method for invoke on completion was called while we were processing, and we missed |
| 106 | + // that the job is no longer active, so try to clean up again |
| 107 | + if (availableScopeState.compareAndSet(oldState, null)) { |
| 108 | + activateAndMaybeCancelOrClose(oldState); |
| 109 | + availableScopeState.set(oldState); |
| 110 | + } |
| 111 | + } |
| 112 | + } |
32 | 113 | } |
33 | 114 |
|
| 115 | + // Called when a coroutine has been completed. Can be executed concurrently. |
34 | 116 | @Override |
35 | | - public ScopeState updateThreadContext(@NotNull final CoroutineContext coroutineContext) { |
36 | | - final ScopeState oldScopeState = AgentTracer.get().newScopeState(); |
37 | | - oldScopeState.fetchFromActive(); |
38 | | - |
39 | | - coroutineScopeState.activate(); |
| 117 | + public Unit invoke(Throwable throwable) { |
| 118 | + ScopeState scopeState = availableScopeState.getAndSet(COMPLETED); |
| 119 | + // If there is no race with execution, then activate the scope state and clean up |
| 120 | + if (scopeState != null && scopeState != COMPLETED) { |
| 121 | + activateAndMaybeCancelOrClose(scopeState); |
| 122 | + availableScopeState.set(scopeState); |
| 123 | + } |
| 124 | + return Unit.INSTANCE; |
| 125 | + } |
40 | 126 |
|
41 | | - if (continuationHandler != null && !continuationHandler.isActive()) { |
42 | | - continuationHandler.activate(); |
43 | | - continuationHandler.register(coroutineContext); |
| 127 | + private void maybeRegisterOnCompletionCallback(CoroutineContext coroutineContext) { |
| 128 | + if (!registered) { |
| 129 | + // Make sure we clean up on completion |
| 130 | + Job job = getJob(coroutineContext); |
| 131 | + if (job != null) { |
| 132 | + job.invokeOnCompletion(this); |
| 133 | + registered = true; |
| 134 | + } |
44 | 135 | } |
| 136 | + } |
45 | 137 |
|
46 | | - return oldScopeState; |
| 138 | + private void activateAndMaybeCancelOrClose(ScopeState scopeState) { |
| 139 | + scopeState.activate(); |
| 140 | + maybeCancelOrClose(); |
| 141 | + scopeState.restore(); |
| 142 | + } |
| 143 | + |
| 144 | + private void maybeCancelOrClose() { |
| 145 | + // We can have either the continuation or the continuationScope stored but not both |
| 146 | + if (continuation != null) { |
| 147 | + continuation.cancel(); |
| 148 | + continuation = null; |
| 149 | + } else if (continuationScope != null && continuationScope == AgentTracer.activeScope()) { |
| 150 | + continuationScope.close(); |
| 151 | + continuationScope = null; |
| 152 | + } |
47 | 153 | } |
48 | 154 |
|
49 | 155 | @Nullable |
|
0 commit comments