Skip to content

Commit 365a0f7

Browse files
authored
add test for Publisher and Subscriber snippets (#1658)
1 parent ba10fac commit 365a0f7

3 files changed

Lines changed: 92 additions & 3 deletions

File tree

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PublisherSnippets.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.google.cloud.examples.pubsub;
17+
package com.google.cloud.examples.pubsub.snippets;
1818

1919
import com.google.api.gax.core.RpcFuture;
2020
import com.google.api.gax.core.RpcFutureCallback;
@@ -35,7 +35,7 @@ public PublisherSnippets(Publisher publisher) {
3535
*/
3636
// [TARGET publish(PubsubMessage)]
3737
// [VARIABLE "my_message"]
38-
public void publish(String message) {
38+
public RpcFuture<String> publish(String message) {
3939
// [START publish]
4040
ByteString data = ByteString.copyFromUtf8(message);
4141
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
@@ -50,6 +50,7 @@ public void onFailure(Throwable t) {
5050
}
5151
});
5252
// [END publish]
53+
return messageIdFuture;
5354
}
5455

5556
/**

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public SubscriberSnippets(
5050
* Example of receiving a specific number of messages.
5151
*/
5252
// [TARGET startAsync()]
53-
public void startAsync() throws Exception {
53+
public void startAndWait() throws Exception {
5454
// [START startAsync]
5555
Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
5656
subscriber.addListener(new Subscriber.SubscriberListener() {

google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static org.junit.Assert.assertNull;
2222
import static org.junit.Assert.assertTrue;
2323

24+
import com.google.api.gax.core.RpcFutureCallback;
25+
import com.google.api.gax.core.SettableRpcFuture;
2426
import com.google.cloud.Identity;
2527
import com.google.cloud.Page;
2628
import com.google.cloud.Policy;
@@ -33,14 +35,25 @@
3335
import com.google.cloud.pubsub.deprecated.SubscriptionInfo;
3436
import com.google.cloud.pubsub.deprecated.Topic;
3537
import com.google.cloud.pubsub.deprecated.TopicInfo;
38+
import com.google.cloud.pubsub.spi.v1.Publisher;
39+
import com.google.cloud.pubsub.spi.v1.PublisherClient;
40+
import com.google.cloud.pubsub.spi.v1.SubscriberClient;
3641
import com.google.common.collect.Iterators;
3742
import com.google.common.collect.Sets;
43+
import com.google.common.util.concurrent.MoreExecutors;
44+
import com.google.pubsub.v1.PubsubMessage;
45+
import com.google.pubsub.v1.PushConfig;
46+
import com.google.pubsub.v1.SubscriptionName;
47+
import com.google.pubsub.v1.TopicName;
3848
import java.util.HashSet;
3949
import java.util.Iterator;
4050
import java.util.List;
4151
import java.util.Set;
4252
import java.util.UUID;
53+
import java.util.concurrent.ArrayBlockingQueue;
54+
import java.util.concurrent.BlockingQueue;
4355
import java.util.concurrent.ExecutionException;
56+
import java.util.concurrent.TimeUnit;
4457
import org.junit.AfterClass;
4558
import org.junit.BeforeClass;
4659
import org.junit.Rule;
@@ -264,4 +277,79 @@ public void testTopicPolicyAsync() throws ExecutionException, InterruptedExcepti
264277
topic.delete();
265278
subscription.delete();
266279
}
280+
281+
@Test
282+
public void testPublisherSubscriber() throws Exception {
283+
TopicName topicName =
284+
TopicName.create(pubsub.getOptions().getProjectId(), formatForTest("test-topic"));
285+
SubscriptionName subscriptionName =
286+
SubscriptionName.create(
287+
pubsub.getOptions().getProjectId(), formatForTest("test-subscription"));
288+
try (PublisherClient publisherClient = PublisherClient.create();
289+
SubscriberClient subscriberClient = SubscriberClient.create()) {
290+
publisherClient.createTopic(topicName);
291+
subscriberClient.createSubscription(
292+
subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
293+
294+
testPublisherSubscriberHelper(topicName, subscriptionName);
295+
296+
subscriberClient.deleteSubscription(subscriptionName);
297+
publisherClient.deleteTopic(topicName);
298+
}
299+
}
300+
301+
private void testPublisherSubscriberHelper(
302+
TopicName topicName, SubscriptionName subscriptionName) throws Exception {
303+
String messageToPublish = "my-message";
304+
305+
Publisher publisher = null;
306+
try {
307+
publisher = Publisher.newBuilder(topicName).build();
308+
PublisherSnippets snippets = new PublisherSnippets(publisher);
309+
final SettableRpcFuture<Void> done = new SettableRpcFuture<>();
310+
snippets
311+
.publish(messageToPublish)
312+
.addCallback(
313+
new RpcFutureCallback<String>() {
314+
public void onSuccess(String messageId) {
315+
done.set(null);
316+
}
317+
318+
public void onFailure(Throwable t) {
319+
done.setException(t);
320+
}
321+
});
322+
done.get();
323+
} finally {
324+
if (publisher != null) {
325+
publisher.shutdown();
326+
}
327+
}
328+
329+
final BlockingQueue<PubsubMessage> queue = new ArrayBlockingQueue<>(1);
330+
final SettableRpcFuture<Void> done = new SettableRpcFuture<>();
331+
final SettableRpcFuture<PubsubMessage> received = new SettableRpcFuture<>();
332+
SubscriberSnippets snippets =
333+
new SubscriberSnippets(
334+
subscriptionName,
335+
new MessageReceiverSnippets(queue).messageReceiver(),
336+
done,
337+
MoreExecutors.directExecutor());
338+
new Thread(new Runnable() {
339+
@Override
340+
public void run() {
341+
try {
342+
received.set(queue.poll(10, TimeUnit.MINUTES));
343+
} catch (InterruptedException e) {
344+
received.set(null);
345+
}
346+
done.set(null); // signal the subscriber to clean up
347+
}
348+
}).start();
349+
snippets.startAndWait(); // blocks until done is set
350+
351+
PubsubMessage message = received.get();
352+
assertNotNull(message);
353+
assertEquals(message.getData().toStringUtf8(), messageToPublish);
354+
}
267355
}

0 commit comments

Comments
 (0)