Skip to content

Commit df11f4b

Browse files
authored
---
yaml --- r: 8493 b: refs/heads/master c: 04c183e h: refs/heads/master i: 8491: b02ebad
1 parent dc5a38d commit df11f4b

3 files changed

Lines changed: 57 additions & 319 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: 4a5156a87a7052ddc7b1ce95b62fc0f45f91b094
2+
refs/heads/master: 04c183ec9d98ff84bda558ea98576ad8e32d4505
33
refs/heads/travis: 47e4fee4fd5af9b2a8ce46f23c72ec95f9b195b2
44
refs/heads/gh-pages: 3e16a39145437096333db5811e5c0292719c1823
55
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444

trunk/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@
1616

1717
package com.google.cloud.pubsub.it;
1818

19-
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertTrue;
19+
import static com.google.common.truth.Truth.assertThat;
2120

22-
import com.google.api.core.SettableApiFuture;
21+
import com.google.auto.value.AutoValue;
2322
import com.google.cloud.ServiceOptions;
2423
import com.google.cloud.pubsub.v1.AckReplyConsumer;
2524
import com.google.cloud.pubsub.v1.MessageReceiver;
@@ -38,6 +37,9 @@
3837
import java.util.Collections;
3938
import java.util.List;
4039
import java.util.UUID;
40+
import java.util.concurrent.BlockingQueue;
41+
import java.util.concurrent.LinkedBlockingQueue;
42+
import java.util.concurrent.TimeUnit;
4143
import org.junit.AfterClass;
4244
import org.junit.BeforeClass;
4345
import org.junit.Rule;
@@ -53,6 +55,17 @@ public class ITPubSubTest {
5355

5456
@Rule public Timeout globalTimeout = Timeout.seconds(300);
5557

58+
@AutoValue
59+
abstract static class MessageAndConsumer {
60+
abstract PubsubMessage message();
61+
62+
abstract AckReplyConsumer consumer();
63+
64+
static MessageAndConsumer create(PubsubMessage message, AckReplyConsumer consumer) {
65+
return new AutoValue_ITPubSubTest_MessageAndConsumer(message, consumer);
66+
}
67+
}
68+
5669
@BeforeClass
5770
public static void setupClass() throws Exception {
5871
topicAdminClient = TopicAdminClient.create();
@@ -81,14 +94,14 @@ public void testTopicPolicy() {
8194
Policy newPolicy =
8295
topicAdminClient.setIamPolicy(
8396
topicName.toString(), policy.toBuilder().addBindings(binding).build());
84-
assertTrue(newPolicy.getBindingsList().contains(binding));
97+
assertThat(newPolicy.getBindingsList()).contains(binding);
8598

8699
String permissionName = "pubsub.topics.get";
87100
List<String> permissions =
88101
topicAdminClient
89102
.testIamPermissions(topicName.toString(), Collections.singletonList(permissionName))
90103
.getPermissionsList();
91-
assertTrue(permissions.contains(permissionName));
104+
assertThat(permissions).contains(permissionName);
92105

93106
topicAdminClient.deleteTopic(topicName);
94107
}
@@ -103,41 +116,68 @@ public void testPublishSubscribe() throws Exception {
103116
topicAdminClient.createTopic(topicName);
104117
subscriptionAdminClient.createSubscription(
105118
subscriptionName, topicName, PushConfig.newBuilder().build(), 10);
106-
PubsubMessage message =
107-
PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("my message")).build();
108119

109-
final SettableApiFuture<PubsubMessage> received = SettableApiFuture.create();
120+
final BlockingQueue<Object> receiveQueue = new LinkedBlockingQueue<>();
110121
Subscriber subscriber =
111122
Subscriber.newBuilder(
112123
subscriptionName,
113124
new MessageReceiver() {
114125
@Override
115126
public void receiveMessage(
116127
final PubsubMessage message, final AckReplyConsumer consumer) {
117-
if (received.set(message)) {
118-
consumer.ack();
119-
} else {
120-
consumer.nack();
121-
}
128+
receiveQueue.offer(MessageAndConsumer.create(message, consumer));
122129
}
123130
})
124131
.build();
125132
subscriber.addListener(
126133
new Subscriber.Listener() {
127134
public void failed(Subscriber.State from, Throwable failure) {
128-
received.setException(failure);
135+
receiveQueue.offer(failure);
129136
}
130137
},
131138
MoreExecutors.directExecutor());
132139
subscriber.startAsync();
133140

134141
Publisher publisher = Publisher.newBuilder(topicName).build();
135-
publisher.publish(message).get();
142+
publisher
143+
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg1")).build())
144+
.get();
145+
publisher
146+
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg2")).build())
147+
.get();
136148
publisher.shutdown();
137149

138-
assertEquals(received.get().getData(), message.getData());
150+
// Ack the first message.
151+
MessageAndConsumer toAck = pollQueue(receiveQueue);
152+
toAck.consumer().ack();
153+
154+
// Nack the other.
155+
MessageAndConsumer toNack = pollQueue(receiveQueue);
156+
assertThat(toNack.message().getData()).isNotEqualTo(toAck.message().getData());
157+
toNack.consumer().nack();
158+
159+
// We should get the nacked message back.
160+
MessageAndConsumer redelivered = pollQueue(receiveQueue);
161+
assertThat(redelivered.message().getData()).isEqualTo(toNack.message().getData());
162+
redelivered.consumer().ack();
163+
139164
subscriber.stopAsync().awaitTerminated();
140165
subscriptionAdminClient.deleteSubscription(subscriptionName);
141166
topicAdminClient.deleteTopic(topicName);
142167
}
168+
169+
private MessageAndConsumer pollQueue(BlockingQueue<Object> queue) throws InterruptedException {
170+
Object obj = queue.poll(1, TimeUnit.MINUTES);
171+
if (obj == null) {
172+
return null;
173+
}
174+
if (obj instanceof Throwable) {
175+
throw new IllegalStateException("unexpected error", (Throwable) obj);
176+
}
177+
if (obj instanceof MessageAndConsumer) {
178+
return (MessageAndConsumer) obj;
179+
}
180+
throw new IllegalStateException(
181+
"expected either MessageAndConsumer or Throwable, found: " + obj);
182+
}
143183
}

0 commit comments

Comments
 (0)