Skip to content

Commit c68968b

Browse files
authored
fix race in TestReceiver (#1558)
* fix race in TestReceiver If TestReceiver is set so that messages must be explicitly acked, test code acks a message by calling replyNextOutstandingMessage. There is a race between calling the function and the message being delivered. Previously, if the function gets called before the message is delivered, the test fails since polling an empty deque returns a null pointer. This commit makes us wait until a message becomes available instead.
1 parent 2c858ee commit c68968b

1 file changed

Lines changed: 23 additions & 18 deletions

File tree

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,9 @@
4545
import java.util.ArrayList;
4646
import java.util.Arrays;
4747
import java.util.Collection;
48-
import java.util.Deque;
4948
import java.util.List;
50-
import java.util.concurrent.ConcurrentLinkedDeque;
5149
import java.util.concurrent.CountDownLatch;
50+
import java.util.concurrent.LinkedBlockingQueue;
5251
import org.joda.time.Duration;
5352
import org.junit.After;
5453
import org.junit.Before;
@@ -87,8 +86,8 @@ public static Collection<Object[]> data() {
8786
private TestReceiver testReceiver;
8887

8988
static class TestReceiver implements MessageReceiver {
90-
private final Deque<SettableFuture<AckReply>> outstandingMessageReplies =
91-
new ConcurrentLinkedDeque<>();
89+
private final LinkedBlockingQueue<SettableFuture<AckReply>> outstandingMessageReplies =
90+
new LinkedBlockingQueue<>();
9291
private AckReply ackReply = AckReply.ACK;
9392
private Optional<CountDownLatch> messageCountLatch = Optional.absent();
9493
private Optional<Throwable> error = Optional.absent();
@@ -124,34 +123,40 @@ public ListenableFuture<AckReply> receiveMessage(PubsubMessage message) {
124123
SettableFuture<AckReply> reply = SettableFuture.create();
125124

126125
if (explicitAckReplies) {
127-
outstandingMessageReplies.add(reply);
128-
} else {
129-
if (error.isPresent()) {
130-
reply.setException(error.get());
131-
} else {
132-
reply.set(ackReply);
126+
try {
127+
outstandingMessageReplies.put(reply);
128+
} catch (InterruptedException e) {
129+
throw new IllegalStateException(e);
133130
}
131+
} else {
132+
replyTo(reply);
134133
}
135134

136135
return reply;
137136
}
138137

139138
public void replyNextOutstandingMessage() {
140139
Preconditions.checkState(explicitAckReplies);
141-
142-
SettableFuture<AckReply> reply = outstandingMessageReplies.poll();
143-
if (error.isPresent()) {
144-
reply.setException(error.get());
145-
} else {
146-
reply.set(ackReply);
140+
try {
141+
replyTo(outstandingMessageReplies.take());
142+
} catch (InterruptedException e) {
143+
throw new IllegalStateException(e);
147144
}
148145
}
149146

150147
public void replyAllOutstandingMessage() {
151148
Preconditions.checkState(explicitAckReplies);
149+
SettableFuture<AckReply> reply;
150+
while ((reply = outstandingMessageReplies.poll()) != null) {
151+
replyTo(reply);
152+
}
153+
}
152154

153-
while (!outstandingMessageReplies.isEmpty()) {
154-
replyNextOutstandingMessage();
155+
private void replyTo(SettableFuture<AckReply> reply) {
156+
if (error.isPresent()) {
157+
reply.setException(error.get());
158+
} else {
159+
reply.set(ackReply);
155160
}
156161
}
157162
}

0 commit comments

Comments
 (0)