Skip to content

Commit 5a03541

Browse files
authored
Merge pull request #3876 from graphql-java/engine-running-tracking
Track Engine sync code execution
2 parents f817bce + 339f657 commit 5a03541

11 files changed

Lines changed: 588 additions & 252 deletions

src/main/java/graphql/execution/AbstractAsyncExecutionStrategy.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import graphql.ExecutionResultImpl;
66
import graphql.PublicSpi;
77

8-
import java.util.LinkedHashMap;
98
import java.util.List;
109
import java.util.Map;
1110
import java.util.concurrent.CompletableFuture;
@@ -23,7 +22,7 @@ public AbstractAsyncExecutionStrategy(DataFetcherExceptionHandler dataFetcherExc
2322
}
2423

2524
protected BiConsumer<List<Object>, Throwable> handleResults(ExecutionContext executionContext, List<String> fieldNames, CompletableFuture<ExecutionResult> overallResult) {
26-
return (List<Object> results, Throwable exception) -> {
25+
return (List<Object> results, Throwable exception) -> executionContext.run(() -> {
2726
if (exception != null) {
2827
handleNonNullException(executionContext, overallResult, exception);
2928
return;
@@ -35,6 +34,6 @@ protected BiConsumer<List<Object>, Throwable> handleResults(ExecutionContext exe
3534
resolvedValuesByField.put(fieldName, result);
3635
}
3736
overallResult.complete(new ExecutionResultImpl(resolvedValuesByField, executionContext.getErrors()));
38-
};
37+
});
3938
}
4039
}

src/main/java/graphql/execution/AsyncExecutionStrategy.java

Lines changed: 44 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -38,54 +38,58 @@ public AsyncExecutionStrategy(DataFetcherExceptionHandler exceptionHandler) {
3838
@Override
3939
@SuppressWarnings("FutureReturnValueIgnored")
4040
public CompletableFuture<ExecutionResult> execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
41-
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();
42-
dataLoaderDispatcherStrategy.executionStrategy(executionContext, parameters);
43-
Instrumentation instrumentation = executionContext.getInstrumentation();
44-
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);
41+
return executionContext.call(() -> {
42+
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();
43+
dataLoaderDispatcherStrategy.executionStrategy(executionContext, parameters);
44+
Instrumentation instrumentation = executionContext.getInstrumentation();
45+
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);
4546

46-
ExecutionStrategyInstrumentationContext executionStrategyCtx = ExecutionStrategyInstrumentationContext.nonNullCtx(instrumentation.beginExecutionStrategy(instrumentationParameters, executionContext.getInstrumentationState()));
47+
ExecutionStrategyInstrumentationContext executionStrategyCtx = ExecutionStrategyInstrumentationContext.nonNullCtx(instrumentation.beginExecutionStrategy(instrumentationParameters, executionContext.getInstrumentationState()));
4748

48-
MergedSelectionSet fields = parameters.getFields();
49-
List<String> fieldNames = fields.getKeys();
49+
MergedSelectionSet fields = parameters.getFields();
50+
List<String> fieldNames = fields.getKeys();
5051

51-
Optional<ExecutionResult> isNotSensible = Introspection.isIntrospectionSensible(fields, executionContext);
52-
if (isNotSensible.isPresent()) {
53-
return CompletableFuture.completedFuture(isNotSensible.get());
54-
}
52+
Optional<ExecutionResult> isNotSensible = Introspection.isIntrospectionSensible(fields, executionContext);
53+
if (isNotSensible.isPresent()) {
54+
return CompletableFuture.completedFuture(isNotSensible.get());
55+
}
5556

56-
DeferredExecutionSupport deferredExecutionSupport = createDeferredExecutionSupport(executionContext, parameters);
57-
Async.CombinedBuilder<FieldValueInfo> futures = getAsyncFieldValueInfo(executionContext, parameters, deferredExecutionSupport);
57+
DeferredExecutionSupport deferredExecutionSupport = createDeferredExecutionSupport(executionContext, parameters);
58+
Async.CombinedBuilder<FieldValueInfo> futures = getAsyncFieldValueInfo(executionContext, parameters, deferredExecutionSupport);
5859

59-
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
60-
executionStrategyCtx.onDispatched();
60+
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
61+
executionStrategyCtx.onDispatched();
6162

62-
futures.await().whenComplete((completeValueInfos, throwable) -> {
63-
List<String> fieldsExecutedOnInitialResult = deferredExecutionSupport.getNonDeferredFieldNames(fieldNames);
63+
futures.await().whenComplete((completeValueInfos, throwable) -> {
64+
executionContext.run(() -> {
65+
List<String> fieldsExecutedOnInitialResult = deferredExecutionSupport.getNonDeferredFieldNames(fieldNames);
6466

65-
BiConsumer<List<Object>, Throwable> handleResultsConsumer = handleResults(executionContext, fieldsExecutedOnInitialResult, overallResult);
66-
if (throwable != null) {
67-
handleResultsConsumer.accept(null, throwable.getCause());
68-
return;
69-
}
67+
BiConsumer<List<Object>, Throwable> handleResultsConsumer = handleResults(executionContext, fieldsExecutedOnInitialResult, overallResult);
68+
if (throwable != null) {
69+
handleResultsConsumer.accept(null, throwable.getCause());
70+
return;
71+
}
7072

71-
Async.CombinedBuilder<Object> fieldValuesFutures = Async.ofExpectedSize(completeValueInfos.size());
72-
for (FieldValueInfo completeValueInfo : completeValueInfos) {
73-
fieldValuesFutures.addObject(completeValueInfo.getFieldValueObject());
74-
}
75-
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos);
76-
executionStrategyCtx.onFieldValuesInfo(completeValueInfos);
77-
fieldValuesFutures.await().whenComplete(handleResultsConsumer);
78-
}).exceptionally((ex) -> {
79-
// if there are any issues with combining/handling the field results,
80-
// complete the future at all costs and bubble up any thrown exception so
81-
// the execution does not hang.
82-
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex);
83-
executionStrategyCtx.onFieldValuesException();
84-
overallResult.completeExceptionally(ex);
85-
return null;
86-
});
73+
Async.CombinedBuilder<Object> fieldValuesFutures = Async.ofExpectedSize(completeValueInfos.size());
74+
for (FieldValueInfo completeValueInfo : completeValueInfos) {
75+
fieldValuesFutures.addObject(completeValueInfo.getFieldValueObject());
76+
}
77+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos);
78+
executionStrategyCtx.onFieldValuesInfo(completeValueInfos);
79+
fieldValuesFutures.await().whenComplete(handleResultsConsumer);
80+
});
81+
}).exceptionally((ex) -> executionContext.call(() -> {
82+
// if there are any issues with combining/handling the field results,
83+
// complete the future at all costs and bubble up any thrown exception so
84+
// the execution does not hang.
85+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex);
86+
executionStrategyCtx.onFieldValuesException();
87+
overallResult.completeExceptionally(ex);
88+
return null;
89+
}));
8790

88-
overallResult.whenComplete(executionStrategyCtx::onCompleted);
89-
return overallResult;
91+
overallResult.whenComplete(executionStrategyCtx::onCompleted);
92+
return overallResult;
93+
});
9094
}
9195
}

src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,38 +32,41 @@ public AsyncSerialExecutionStrategy(DataFetcherExceptionHandler exceptionHandler
3232
@Override
3333
@SuppressWarnings({"TypeParameterUnusedInFormals", "FutureReturnValueIgnored"})
3434
public CompletableFuture<ExecutionResult> execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
35-
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();
35+
return executionContext.call(() -> {
36+
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();
3637

37-
Instrumentation instrumentation = executionContext.getInstrumentation();
38-
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);
39-
InstrumentationContext<ExecutionResult> executionStrategyCtx = nonNullCtx(instrumentation.beginExecutionStrategy(instrumentationParameters,
40-
executionContext.getInstrumentationState())
41-
);
42-
MergedSelectionSet fields = parameters.getFields();
43-
ImmutableList<String> fieldNames = ImmutableList.copyOf(fields.keySet());
38+
Instrumentation instrumentation = executionContext.getInstrumentation();
39+
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);
40+
InstrumentationContext<ExecutionResult> executionStrategyCtx = nonNullCtx(instrumentation.beginExecutionStrategy(instrumentationParameters,
41+
executionContext.getInstrumentationState())
42+
);
43+
MergedSelectionSet fields = parameters.getFields();
44+
ImmutableList<String> fieldNames = ImmutableList.copyOf(fields.keySet());
4445

45-
// this is highly unlikely since Mutations cant do introspection BUT in theory someone could make the query strategy this code
46-
// so belts and braces
47-
Optional<ExecutionResult> isNotSensible = Introspection.isIntrospectionSensible(fields, executionContext);
48-
if (isNotSensible.isPresent()) {
49-
return CompletableFuture.completedFuture(isNotSensible.get());
50-
}
46+
// this is highly unlikely since Mutations cant do introspection BUT in theory someone could make the query strategy this code
47+
// so belts and braces
48+
Optional<ExecutionResult> isNotSensible = Introspection.isIntrospectionSensible(fields, executionContext);
49+
if (isNotSensible.isPresent()) {
50+
return CompletableFuture.completedFuture(isNotSensible.get());
51+
}
5152

52-
CompletableFuture<List<Object>> resultsFuture = Async.eachSequentially(fieldNames, (fieldName, prevResults) -> {
53-
MergedField currentField = fields.getSubField(fieldName);
54-
ResultPath fieldPath = parameters.getPath().segment(mkNameForPath(currentField));
55-
ExecutionStrategyParameters newParameters = parameters
56-
.transform(builder -> builder.field(currentField).path(fieldPath));
53+
CompletableFuture<List<Object>> resultsFuture = Async.eachSequentially(fieldNames, (fieldName, prevResults) -> executionContext.call(() -> {
54+
MergedField currentField = fields.getSubField(fieldName);
55+
ResultPath fieldPath = parameters.getPath().segment(mkNameForPath(currentField));
56+
ExecutionStrategyParameters newParameters = parameters
57+
.transform(builder -> builder.field(currentField).path(fieldPath));
5758

58-
return resolveSerialField(executionContext, dataLoaderDispatcherStrategy, newParameters);
59-
});
59+
Object resolveSerialField = resolveSerialField(executionContext, dataLoaderDispatcherStrategy, newParameters);
60+
return resolveSerialField;
61+
}));
6062

61-
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
62-
executionStrategyCtx.onDispatched();
63+
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
64+
executionStrategyCtx.onDispatched();
6365

64-
resultsFuture.whenComplete(handleResults(executionContext, fieldNames, overallResult));
65-
overallResult.whenComplete(executionStrategyCtx::onCompleted);
66-
return overallResult;
66+
resultsFuture.whenComplete(handleResults(executionContext, fieldNames, overallResult));
67+
overallResult.whenComplete(executionStrategyCtx::onCompleted);
68+
return overallResult;
69+
});
6770
}
6871

6972
private Object resolveSerialField(ExecutionContext executionContext,
@@ -74,10 +77,11 @@ private Object resolveSerialField(ExecutionContext executionContext,
7477
Object fieldWithInfo = resolveFieldWithInfo(executionContext, newParameters);
7578
if (fieldWithInfo instanceof CompletableFuture) {
7679
//noinspection unchecked
77-
return ((CompletableFuture<FieldValueInfo>) fieldWithInfo).thenCompose(fvi -> {
80+
return ((CompletableFuture<FieldValueInfo>) fieldWithInfo).thenCompose(fvi -> executionContext.call(() -> {
7881
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi));
79-
return fvi.getFieldValueFuture();
80-
});
82+
CompletableFuture<Object> fieldValueFuture = fvi.getFieldValueFuture();
83+
return fieldValueFuture;
84+
}));
8185
} else {
8286
FieldValueInfo fvi = (FieldValueInfo) fieldWithInfo;
8387
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi));
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package graphql.execution;
2+
3+
import graphql.ExperimentalApi;
4+
import graphql.GraphQLContext;
5+
import org.jspecify.annotations.NullMarked;
6+
7+
@ExperimentalApi
8+
@NullMarked
9+
public interface EngineRunningObserver {
10+
11+
12+
String ENGINE_RUNNING_OBSERVER_KEY = "__ENGINE_RUNNING_OBSERVER";
13+
14+
void runningStateChanged(ExecutionId executionId, GraphQLContext graphQLContext, boolean runningState);
15+
}

src/main/java/graphql/execution/Execution.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import graphql.schema.GraphQLObjectType;
3030
import graphql.schema.GraphQLSchema;
3131
import graphql.schema.impl.SchemaUtil;
32-
import org.jspecify.annotations.NonNull;
3332
import graphql.util.FpKit;
33+
import org.jspecify.annotations.NonNull;
3434
import org.reactivestreams.Publisher;
3535

3636
import java.util.Collections;
@@ -89,6 +89,9 @@ public CompletableFuture<ExecutionResult> execute(Document document, GraphQLSche
8989

9090
boolean propagateErrorsOnNonNullContractFailure = propagateErrorsOnNonNullContractFailure(getOperationResult.operationDefinition.getDirectives());
9191

92+
// can be null
93+
EngineRunningObserver engineRunningObserver = executionInput.getGraphQLContext().get(EngineRunningObserver.ENGINE_RUNNING_OBSERVER_KEY);
94+
9295
ExecutionContext executionContext = newExecutionContextBuilder()
9396
.instrumentation(instrumentation)
9497
.instrumentationState(instrumentationState)
@@ -111,6 +114,7 @@ public CompletableFuture<ExecutionResult> execute(Document document, GraphQLSche
111114
.valueUnboxer(valueUnboxer)
112115
.executionInput(executionInput)
113116
.propagapropagateErrorsOnNonNullContractFailureeErrors(propagateErrorsOnNonNullContractFailure)
117+
.engineRunningObserver(engineRunningObserver)
114118
.build();
115119

116120
executionContext.getGraphQLContext().put(ResultNodesInfo.RESULT_NODES_INFO, executionContext.getResultNodesInfo());

src/main/java/graphql/execution/ExecutionContext.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
import graphql.util.FpKit;
2323
import graphql.util.LockKit;
2424
import org.dataloader.DataLoaderRegistry;
25+
import org.jspecify.annotations.Nullable;
2526

2627
import java.util.HashSet;
2728
import java.util.List;
2829
import java.util.Locale;
2930
import java.util.Map;
3031
import java.util.Set;
32+
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.atomic.AtomicInteger;
3134
import java.util.concurrent.atomic.AtomicReference;
3235
import java.util.function.Consumer;
3336
import java.util.function.Supplier;
@@ -63,10 +66,13 @@ public class ExecutionContext {
6366
private final Supplier<ExecutableNormalizedOperation> queryTree;
6467
private final boolean propagateErrorsOnNonNullContractFailure;
6568

69+
private final AtomicInteger isRunning = new AtomicInteger(0);
70+
6671
// this is modified after creation so it needs to be volatile to ensure visibility across Threads
6772
private volatile DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = DataLoaderDispatchStrategy.NO_OP;
6873

6974
private final ResultNodesInfo resultNodesInfo = new ResultNodesInfo();
75+
private final EngineRunningObserver engineRunningObserver;
7076

7177
ExecutionContext(ExecutionContextBuilder builder) {
7278
this.graphQLSchema = builder.graphQLSchema;
@@ -93,6 +99,7 @@ public class ExecutionContext {
9399
this.dataLoaderDispatcherStrategy = builder.dataLoaderDispatcherStrategy;
94100
this.queryTree = FpKit.interThreadMemoize(() -> ExecutableNormalizedOperationFactory.createExecutableNormalizedOperation(graphQLSchema, operationDefinition, fragmentsByName, coercedVariables));
95101
this.propagateErrorsOnNonNullContractFailure = builder.propagateErrorsOnNonNullContractFailure;
102+
this.engineRunningObserver = builder.engineRunningObserver;
96103
}
97104

98105

@@ -141,7 +148,9 @@ public Supplier<NormalizedVariables> getNormalizedVariables() {
141148

142149
/**
143150
* @param <T> for two
151+
*
144152
* @return the legacy context
153+
*
145154
* @deprecated use {@link #getGraphQLContext()} instead
146155
*/
147156
@Deprecated(since = "2021-07-05")
@@ -184,6 +193,7 @@ public ValueUnboxer getValueUnboxer() {
184193
* @return true if the current operation should propagate errors in non-null positions
185194
* Propagating errors is the default. Error aware clients may opt in returning null in non-null positions
186195
* by using the `@experimental_disableErrorPropagation` directive.
196+
*
187197
* @see graphql.Directives#setExperimentalDisableErrorPropagationEnabled(boolean) to change the JVM wide default
188198
*/
189199
@ExperimentalApi
@@ -338,6 +348,7 @@ public DataLoaderDispatchStrategy getDataLoaderDispatcherStrategy() {
338348
* the current values and allows you to transform it how you want.
339349
*
340350
* @param builderConsumer the consumer code that will be given a builder to transform
351+
*
341352
* @return a new ExecutionContext object based on calling build on that builder
342353
*/
343354
public ExecutionContext transform(Consumer<ExecutionContextBuilder> builderConsumer) {
@@ -349,4 +360,59 @@ public ExecutionContext transform(Consumer<ExecutionContextBuilder> builderConsu
349360
public ResultNodesInfo getResultNodesInfo() {
350361
return resultNodesInfo;
351362
}
363+
364+
@Nullable
365+
EngineRunningObserver getEngineRunningObserver() {
366+
return engineRunningObserver;
367+
}
368+
369+
@Internal
370+
public boolean isRunning() {
371+
return isRunning.get() > 0;
372+
}
373+
374+
public void incrementRunning() {
375+
if (isRunning.incrementAndGet() == 1 && engineRunningObserver != null) {
376+
engineRunningObserver.runningStateChanged(executionId, graphQLContext, true);
377+
}
378+
}
379+
380+
public void decrementRunning() {
381+
if (isRunning.decrementAndGet() == 0 && engineRunningObserver != null) {
382+
engineRunningObserver.runningStateChanged(executionId, graphQLContext, false);
383+
}
384+
}
385+
386+
public void incrementRunning(CompletableFuture<?> cf) {
387+
cf.whenComplete((result, throwable) -> {
388+
incrementRunning();
389+
});
390+
}
391+
392+
public void decrementRunning(CompletableFuture<?> cf) {
393+
cf.whenComplete((result, throwable) -> {
394+
decrementRunning();
395+
});
396+
397+
}
398+
399+
public <T> T call(Supplier<T> callable) {
400+
incrementRunning();
401+
try {
402+
return callable.get();
403+
} finally {
404+
decrementRunning();
405+
}
406+
}
407+
408+
public void run(Runnable runnable) {
409+
incrementRunning();
410+
try {
411+
runnable.run();
412+
} finally {
413+
decrementRunning();
414+
}
415+
}
416+
417+
352418
}

0 commit comments

Comments
 (0)