Skip to content

Commit bd4a098

Browse files
committed
Added test for subscriptions - checking publisher and how many is got asked for
1 parent 033ba93 commit bd4a098

2 files changed

Lines changed: 13 additions & 2 deletions

File tree

src/test/groovy/graphql/execution/SubscriptionExecutionStrategyTest.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,7 @@ class SubscriptionExecutionStrategyTest extends Specification {
712712
713713
def "we can cancel the operation and the upstream publisher is told"() {
714714
List<Runnable> promises = []
715-
Publisher<Object> publisher = new RxJavaMessagePublisher(10)
715+
RxJavaMessagePublisher publisher = new RxJavaMessagePublisher(10)
716716
717717
DataFetcher newMessageDF = { env -> return publisher }
718718
DataFetcher senderDF = dfThatDoesNotComplete("sender", promises)
@@ -750,6 +750,7 @@ class SubscriptionExecutionStrategyTest extends Specification {
750750
messages.size() == 1
751751
def error = messages[0].errors[0]
752752
assert error.message == "Execution has been asked to be cancelled"
753+
publisher.counter == 2
753754
}
754755
755756
private static DataFetcher<?> dfThatDoesNotComplete(String propertyName, List<Runnable> promises) {

src/test/groovy/graphql/execution/pubsub/RxJavaMessagePublisher.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,27 @@
44
import org.reactivestreams.Publisher;
55
import org.reactivestreams.Subscriber;
66

7+
import java.util.concurrent.atomic.AtomicInteger;
8+
79
/**
810
* This example publisher will create count "messages" and then terminate. Its
911
* uses tRxJava Flowable as a backing publisher
1012
*/
1113
public class RxJavaMessagePublisher implements Publisher<Message> {
1214

1315
private final Flowable<Message> flowable;
16+
private final AtomicInteger counter = new AtomicInteger();
1417

1518
public RxJavaMessagePublisher(final int count) {
1619
flowable = Flowable.range(0, count)
17-
.map(at -> examineMessage(new Message("sender" + at, "text" + at), at));
20+
.map(at -> {
21+
counter.incrementAndGet();
22+
return examineMessage(new Message("sender" + at, "text" + at), at);
23+
});
24+
}
25+
26+
public int getCounter() {
27+
return counter.get();
1828
}
1929

2030
@Override

0 commit comments

Comments
 (0)