Skip to content

Commit 2b8154d

Browse files
committed
fix race in TestReceiver
If TestReceiver is set so that messages must be explicitly acked, test code acks a message by calling replyNextOutstandingMessage. There is a race between calling the function and the message being delivered. Previously, if the function gets called before the message is delivered, the test fails since polling an empty deque returns a null pointer. This commit makes us wait until a message becomes available instead.
1 parent 1c80923 commit 2b8154d

1 file changed

Lines changed: 26 additions & 18 deletions

File tree

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,10 @@
4343
import java.util.ArrayList;
4444
import java.util.Arrays;
4545
import java.util.Collection;
46-
import java.util.Deque;
4746
import java.util.List;
48-
import java.util.concurrent.ConcurrentLinkedDeque;
4947
import java.util.concurrent.CountDownLatch;
5048
import java.util.concurrent.Executors;
49+
import java.util.concurrent.LinkedBlockingQueue;
5150
import org.joda.time.Duration;
5251
import org.junit.After;
5352
import org.junit.Before;
@@ -86,8 +85,8 @@ public static Collection<Object[]> data() {
8685
private TestReceiver testReceiver;
8786

8887
static class TestReceiver implements MessageReceiver {
89-
private final Deque<SettableFuture<AckReply>> outstandingMessageReplies =
90-
new ConcurrentLinkedDeque<>();
88+
private final LinkedBlockingQueue<SettableFuture<AckReply>> outstandingMessageReplies =
89+
new LinkedBlockingQueue<>();
9190
private AckReply ackReply = AckReply.ACK;
9291
private Optional<CountDownLatch> messageCountLatch = Optional.absent();
9392
private Optional<Throwable> error = Optional.absent();
@@ -123,34 +122,43 @@ public ListenableFuture<AckReply> receiveMessage(PubsubMessage message) {
123122
SettableFuture<AckReply> reply = SettableFuture.create();
124123

125124
if (explicitAckReplies) {
126-
outstandingMessageReplies.add(reply);
127-
} else {
128-
if (error.isPresent()) {
129-
reply.setException(error.get());
130-
} else {
131-
reply.set(ackReply);
125+
try {
126+
outstandingMessageReplies.put(reply);
127+
} catch (InterruptedException e) {
128+
throw new IllegalStateException(e);
132129
}
130+
} else {
131+
replyTo(reply);
133132
}
134133

135134
return reply;
136135
}
137136

138137
public void replyNextOutstandingMessage() {
139138
Preconditions.checkState(explicitAckReplies);
140-
141-
SettableFuture<AckReply> reply = outstandingMessageReplies.poll();
142-
if (error.isPresent()) {
143-
reply.setException(error.get());
144-
} else {
145-
reply.set(ackReply);
139+
try {
140+
replyTo(outstandingMessageReplies.take());
141+
} catch (InterruptedException e) {
142+
throw new IllegalStateException(e);
146143
}
147144
}
148145

149146
public void replyAllOutstandingMessage() {
150147
Preconditions.checkState(explicitAckReplies);
148+
for (; ; ) {
149+
SettableFuture<AckReply> reply = outstandingMessageReplies.poll();
150+
if (reply == null) {
151+
return;
152+
}
153+
replyTo(reply);
154+
}
155+
}
151156

152-
while (!outstandingMessageReplies.isEmpty()) {
153-
replyNextOutstandingMessage();
157+
private void replyTo(SettableFuture<AckReply> reply) {
158+
if (error.isPresent()) {
159+
reply.setException(error.get());
160+
} else {
161+
reply.set(ackReply);
154162
}
155163
}
156164
}

0 commit comments

Comments
 (0)