Skip to content

Commit 71c8032

Browse files
committed
introduced dedicated states for start and finish engine
fixes cases where the CF<ER> is finished before the observer is called for the last time
1 parent 389bf16 commit 71c8032

File tree

4 files changed

+47
-30
lines changed

4 files changed

+47
-30
lines changed

src/main/java/graphql/EngineRunningState.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
import static graphql.Assert.assertTrue;
1616
import static graphql.execution.EngineRunningObserver.RunningState.NOT_RUNNING;
17+
import static graphql.execution.EngineRunningObserver.RunningState.NOT_RUNNING_FINISH;
1718
import static graphql.execution.EngineRunningObserver.RunningState.RUNNING;
19+
import static graphql.execution.EngineRunningObserver.RunningState.RUNNING_START;
1820

1921
@Internal
2022
public class EngineRunningState {
@@ -26,6 +28,9 @@ public class EngineRunningState {
2628
@Nullable
2729
private volatile ExecutionId executionId;
2830

31+
// if true the last decrementRunning() call will be ignored
32+
private volatile boolean finished;
33+
2934
private final AtomicInteger isRunning = new AtomicInteger(0);
3035

3136
@VisibleForTesting
@@ -148,7 +153,7 @@ private void decrementRunning() {
148153
return;
149154
}
150155
assertTrue(isRunning.get() > 0);
151-
if (isRunning.decrementAndGet() == 0) {
156+
if (isRunning.decrementAndGet() == 0 && !finished) {
152157
changeOfState(NOT_RUNNING);
153158
}
154159
}
@@ -193,16 +198,20 @@ private void run(Runnable runnable) {
193198
/**
194199
* Only used once outside of this class: when the execution starts
195200
*/
196-
public <T> T call(Supplier<T> supplier) {
201+
public CompletableFuture<ExecutionResult> engineRun(Supplier<CompletableFuture<ExecutionResult>> engineRun) {
197202
if (engineRunningObserver == null) {
198-
return supplier.get();
199-
}
200-
incrementRunning();
201-
try {
202-
return supplier.get();
203-
} finally {
204-
decrementRunning();
203+
return engineRun.get();
205204
}
205+
isRunning.incrementAndGet();
206+
changeOfState(RUNNING_START);
207+
208+
CompletableFuture<ExecutionResult> erCF = engineRun.get();
209+
erCF = erCF.whenComplete((result, throwable) -> {
210+
finished = true;
211+
changeOfState(NOT_RUNNING_FINISH);
212+
});
213+
decrementRunning();
214+
return erCF;
206215
}
207216

208217

src/main/java/graphql/GraphQL.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ public CompletableFuture<ExecutionResult> executeAsync(UnaryOperator<ExecutionIn
413413
*/
414414
public CompletableFuture<ExecutionResult> executeAsync(ExecutionInput executionInput) {
415415
EngineRunningState engineRunningState = new EngineRunningState(executionInput);
416-
return engineRunningState.call(() -> {
416+
return engineRunningState.engineRun(() -> {
417417
ExecutionInput executionInputWithId = ensureInputHasId(executionInput);
418418
engineRunningState.updateExecutionId(executionInputWithId.getExecutionId());
419419

src/main/java/graphql/execution/EngineRunningObserver.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
public interface EngineRunningObserver {
1515

1616
enum RunningState {
17+
/**
18+
* Represents that the engine is running, for the first time
19+
*/
20+
RUNNING_START,
1721
/**
1822
* Represents that the engine code is actively running its own code
1923
*/
@@ -22,6 +26,10 @@ enum RunningState {
2226
* Represents that the engine code is asynchronously waiting for fetching to happen
2327
*/
2428
NOT_RUNNING,
29+
/**
30+
* Represents that the engine is finished
31+
*/
32+
NOT_RUNNING_FINISH
2533
}
2634

2735

src/test/groovy/graphql/EngineRunningTest.groovy

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import static graphql.ExecutionInput.newExecutionInput
2323
import static graphql.execution.EngineRunningObserver.ENGINE_RUNNING_OBSERVER_KEY
2424
import static graphql.execution.EngineRunningObserver.RunningState
2525
import static graphql.execution.EngineRunningObserver.RunningState.NOT_RUNNING
26+
import static graphql.execution.EngineRunningObserver.RunningState.NOT_RUNNING_FINISH
2627
import static graphql.execution.EngineRunningObserver.RunningState.RUNNING
28+
import static graphql.execution.EngineRunningObserver.RunningState.RUNNING_START
2729

2830
class EngineRunningTest extends Specification {
2931

@@ -70,13 +72,13 @@ class EngineRunningTest extends Specification {
7072
when:
7173
def er = graphQL.executeAsync(ei)
7274
then:
73-
states == [RUNNING, NOT_RUNNING]
75+
states == [RUNNING_START, NOT_RUNNING]
7476

7577
when:
7678
states.clear()
7779
cf.complete(new PreparsedDocumentEntry(document))
7880
then:
79-
states == [RUNNING, NOT_RUNNING]
81+
states == [RUNNING, NOT_RUNNING_FINISH]
8082
er.get().data == [hello: "world"]
8183

8284

@@ -114,13 +116,13 @@ class EngineRunningTest extends Specification {
114116
when:
115117
def er = graphQL.executeAsync(ei)
116118
then:
117-
states == [RUNNING, NOT_RUNNING]
119+
states == [RUNNING_START, NOT_RUNNING]
118120

119121
when:
120122
states.clear()
121123
cf.complete(new InstrumentationState() {})
122124
then:
123-
states == [RUNNING, NOT_RUNNING]
125+
states == [RUNNING, NOT_RUNNING_FINISH]
124126
er.get().data == [hello: "world"]
125127

126128

@@ -158,14 +160,14 @@ class EngineRunningTest extends Specification {
158160
when:
159161
def er = graphQL.executeAsync(ei)
160162
then:
161-
states == [RUNNING, NOT_RUNNING]
163+
states == [RUNNING_START, NOT_RUNNING]
162164

163165
when:
164166
states.clear()
165167
cf.complete(ExecutionResultImpl.newExecutionResult().data([hello: "world-modified"]).build())
166168
then:
167169
er.get().data == [hello: "world-modified"]
168-
states == [RUNNING, NOT_RUNNING]
170+
states == [RUNNING, NOT_RUNNING_FINISH]
169171

170172

171173
}
@@ -195,7 +197,7 @@ class EngineRunningTest extends Specification {
195197
def er = graphQL.execute(ei)
196198
then:
197199
er.data == [hello: "world"]
198-
states == [RUNNING, NOT_RUNNING]
200+
states == [RUNNING_START, NOT_RUNNING_FINISH]
199201
}
200202

201203
def "multiple async DF"() {
@@ -251,7 +253,7 @@ class EngineRunningTest extends Specification {
251253
when:
252254
def er = graphQL.executeAsync(ei)
253255
then:
254-
states == [RUNNING, NOT_RUNNING]
256+
states == [RUNNING_START, NOT_RUNNING]
255257

256258
when:
257259
states.clear();
@@ -270,7 +272,7 @@ class EngineRunningTest extends Specification {
270272
states.clear()
271273
cf2.complete("world2")
272274
then:
273-
states == [RUNNING, NOT_RUNNING]
275+
states == [RUNNING, NOT_RUNNING_FINISH]
274276
er.get().data == [hello: "world", hello2: "world2", foo: [name: "FooName"], someStaticValue: [staticValue: "staticValue"]]
275277
}
276278

@@ -299,14 +301,14 @@ class EngineRunningTest extends Specification {
299301
when:
300302
def er = graphQL.executeAsync(ei)
301303
then:
302-
states == [RUNNING, NOT_RUNNING]
304+
states == [RUNNING_START, NOT_RUNNING]
303305

304306
when:
305307
states.clear();
306308
cf.complete("world")
307309

308310
then:
309-
states == [RUNNING, NOT_RUNNING]
311+
states == [RUNNING, NOT_RUNNING_FINISH]
310312
er.get().data == [hello: "world"]
311313
}
312314

@@ -334,15 +336,15 @@ class EngineRunningTest extends Specification {
334336
when:
335337
def er = graphQL.executeAsync(ei)
336338
then:
337-
states == [RUNNING, NOT_RUNNING]
339+
states == [RUNNING_START, NOT_RUNNING]
338340

339341
when:
340342
states.clear();
341343
cf.complete("world")
342344

343345
then:
344346
er.get().data == [hello: "world"]
345-
states == [RUNNING, NOT_RUNNING]
347+
states == [RUNNING, NOT_RUNNING_FINISH]
346348
}
347349

348350

@@ -387,8 +389,7 @@ class EngineRunningTest extends Specification {
387389

388390
then:
389391
result.errors.collect { it.message } == ["recovered"]
390-
// we expect simply going from running to finshed
391-
states == [RUNNING, NOT_RUNNING]
392+
states == [RUNNING, NOT_RUNNING_FINISH]
392393
}
393394

394395
def "async datafetcher failing with async exception handler"() {
@@ -429,7 +430,7 @@ class EngineRunningTest extends Specification {
429430
def er = graphQL.executeAsync(ei)
430431

431432
then:
432-
states == [RUNNING, NOT_RUNNING]
433+
states == [RUNNING_START, NOT_RUNNING]
433434

434435
when:
435436
states.clear()
@@ -445,8 +446,7 @@ class EngineRunningTest extends Specification {
445446

446447
then:
447448
result.errors.collect { it.message } == ["recovered"]
448-
// we expect simply going from running to finshed
449-
new ArrayList<>(states) == [RUNNING, NOT_RUNNING]
449+
states == [RUNNING, NOT_RUNNING_FINISH]
450450
}
451451

452452

@@ -480,7 +480,7 @@ class EngineRunningTest extends Specification {
480480
when:
481481
def er = graphQL.executeAsync(ei)
482482
then:
483-
states == [RUNNING, NOT_RUNNING]
483+
states == [RUNNING_START, NOT_RUNNING]
484484

485485
when:
486486
states.clear();
@@ -494,7 +494,7 @@ class EngineRunningTest extends Specification {
494494
cf2.complete("world2")
495495

496496
then:
497-
states == [RUNNING, NOT_RUNNING]
497+
states == [RUNNING, NOT_RUNNING_FINISH]
498498
er.get().data == [hello: "world", hello2: "world2"]
499499
}
500500
}

0 commit comments

Comments
 (0)