Skip to content

Commit d70aaf7

Browse files
authored
add back PubSub samples (#1540)
* add back PubSub samples
1 parent 1f68932 commit d70aaf7

7 files changed

Lines changed: 147 additions & 10 deletions

File tree

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2017 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.examples.pubsub.snippets;
18+
19+
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
20+
import com.google.cloud.pubsub.spi.v1.Subscriber;
21+
import com.google.cloud.pubsub.spi.v1.SubscriberClient;
22+
import com.google.common.util.concurrent.Futures;
23+
import com.google.common.util.concurrent.ListenableFuture;
24+
import com.google.common.util.concurrent.MoreExecutors;
25+
import com.google.common.util.concurrent.Service;
26+
import com.google.pubsub.v1.PubsubMessage;
27+
import com.google.pubsub.v1.PushConfig;
28+
import com.google.pubsub.v1.SubscriptionName;
29+
import com.google.pubsub.v1.TopicName;
30+
31+
/**
32+
* A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub pull subscription and
33+
* asynchronously pull messages from it.
34+
*/
35+
public class CreateSubscriptionAndPullMessages {
36+
37+
public static void main(String... args) throws Exception {
38+
TopicName topic = TopicName.create("test-project", "test-topic");
39+
SubscriptionName subscription = SubscriptionName.create("test-project", "test-subscription");
40+
41+
try (SubscriberClient subscriberClient = SubscriberClient.create()) {
42+
subscriberClient.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0);
43+
}
44+
45+
MessageReceiver receiver =
46+
new MessageReceiver() {
47+
@Override
48+
public ListenableFuture<MessageReceiver.AckReply> receiveMessage(PubsubMessage message) {
49+
System.out.println("got message: " + message.getData().toStringUtf8());
50+
return Futures.immediateFuture(MessageReceiver.AckReply.ACK);
51+
}
52+
};
53+
Subscriber subscriber = null;
54+
try {
55+
subscriber = Subscriber.Builder.newBuilder(subscription, receiver).build();
56+
subscriber.addListener(
57+
new Service.Listener() {
58+
@Override
59+
public void failed(Service.State from, Throwable failure) {
60+
System.err.println(failure);
61+
}
62+
},
63+
MoreExecutors.directExecutor());
64+
subscriber.startAsync().awaitRunning();
65+
Thread.sleep(60000);
66+
} finally {
67+
if (subscriber != null) {
68+
subscriber.stopAsync();
69+
}
70+
}
71+
}
72+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2017 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.examples.pubsub.snippets;
18+
19+
import com.google.cloud.pubsub.spi.v1.Publisher;
20+
import com.google.cloud.pubsub.spi.v1.PublisherClient;
21+
import com.google.common.util.concurrent.Futures;
22+
import com.google.common.util.concurrent.ListenableFuture;
23+
import com.google.protobuf.ByteString;
24+
import com.google.pubsub.v1.PubsubMessage;
25+
import com.google.pubsub.v1.TopicName;
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
28+
import java.util.List;
29+
30+
/**
31+
* A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub topic and asynchronously
32+
* publish messages to it.
33+
*/
34+
public class CreateTopicAndPublishMessages {
35+
public static void main(String... args) throws Exception {
36+
TopicName topic = TopicName.create("test-project", "test-topic");
37+
try (PublisherClient publisherClient = PublisherClient.create()) {
38+
publisherClient.createTopic(topic);
39+
}
40+
41+
Publisher publisher = null;
42+
try {
43+
publisher = Publisher.Builder.newBuilder(topic).build();
44+
List<String> messages = Arrays.asList("first message", "second message");
45+
List<ListenableFuture<String>> messageIds = new ArrayList<>();
46+
for (String message : messages) {
47+
ByteString data = ByteString.copyFromUtf8(message);
48+
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
49+
ListenableFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
50+
messageIds.add(messageIdFuture);
51+
}
52+
for (String messageId : Futures.allAsList(messageIds).get()) {
53+
System.out.println("published with message ID: " + messageId);
54+
}
55+
} finally {
56+
if (publisher != null) {
57+
publisher.shutdown();
58+
}
59+
}
60+
}
61+
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import com.google.pubsub.v1.PubsubMessage;
2121

2222
/** Users of the {@link Subscriber} must implement this interface to receive messages. */
23-
interface MessageReceiver {
23+
public interface MessageReceiver {
2424
enum AckReply {
2525
/** To be used for acking a message. */
2626
ACK,

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.common.base.Preconditions;
2626
import com.google.common.util.concurrent.ListenableFuture;
2727
import com.google.pubsub.v1.PubsubMessage;
28+
import com.google.pubsub.v1.TopicName;
2829
import io.grpc.ManagedChannelBuilder;
2930
import java.io.IOException;
3031
import java.util.concurrent.ScheduledExecutorService;
@@ -191,8 +192,8 @@ public final class Builder {
191192
Optional<ScheduledExecutorService> executor = Optional.absent();
192193

193194
/** Constructs a new {@link Builder} using the given topic. */
194-
public static Builder newBuilder(String topic) {
195-
return new Builder(topic);
195+
public static Builder newBuilder(TopicName topic) {
196+
return new Builder(topic.toString());
196197
}
197198

198199
Builder(String topic) {

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.google.common.util.concurrent.AbstractService;
3030
import com.google.common.util.concurrent.Service;
3131
import com.google.common.util.concurrent.ThreadFactoryBuilder;
32+
import com.google.pubsub.v1.SubscriptionName;
3233
import io.grpc.ManagedChannelBuilder;
3334
import io.grpc.Status;
3435
import io.grpc.StatusRuntimeException;
@@ -401,8 +402,8 @@ public static final class Builder {
401402
* @param receiver an implementation of {@link MessageReceiver} used to process the received
402403
* messages
403404
*/
404-
public static Builder newBuilder(String subscription, MessageReceiver receiver) {
405-
return new Builder(subscription, receiver);
405+
public static Builder newBuilder(SubscriptionName subscription, MessageReceiver receiver) {
406+
return new Builder(subscription.toString(), receiver);
406407
}
407408

408409
Builder(String subscription, MessageReceiver receiver) {

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.google.pubsub.v1.PublishRequest;
3333
import com.google.pubsub.v1.PublishResponse;
3434
import com.google.pubsub.v1.PubsubMessage;
35+
import com.google.pubsub.v1.TopicName;
3536
import io.grpc.Status;
3637
import io.grpc.StatusException;
3738
import io.grpc.inprocess.InProcessChannelBuilder;
@@ -57,7 +58,7 @@
5758
@RunWith(JUnit4.class)
5859
public class PublisherImplTest {
5960

60-
private static final String TEST_TOPIC = "projects/test-project/topics/test-topic";
61+
private static final TopicName TEST_TOPIC = TopicName.create("test-project", "test-topic");
6162

6263
private static InProcessChannelBuilder testChannelBuilder;
6364

@@ -360,7 +361,7 @@ public void testPublisherGetters() throws Exception {
360361
.build());
361362
Publisher publisher = builder.build();
362363

363-
assertEquals(TEST_TOPIC, publisher.getTopic());
364+
assertEquals(TEST_TOPIC.toString(), publisher.getTopic());
364365
assertEquals(10, publisher.getMaxBundleBytes());
365366
assertEquals(new Duration(11), publisher.getMaxBundleDuration());
366367
assertEquals(12, publisher.getMaxBundleMessages());
@@ -372,7 +373,7 @@ public void testPublisherGetters() throws Exception {
372373
@Test
373374
public void testBuilderParametersAndDefaults() {
374375
Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC);
375-
assertEquals(TEST_TOPIC, builder.topic);
376+
assertEquals(TEST_TOPIC.toString(), builder.topic);
376377
assertEquals(Optional.absent(), builder.channelBuilder);
377378
assertEquals(Optional.absent(), builder.executor);
378379
assertFalse(builder.failOnFlowControlLimits);

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.pubsub.v1.PullResponse;
3434
import com.google.pubsub.v1.ReceivedMessage;
3535
import com.google.pubsub.v1.StreamingPullResponse;
36+
import com.google.pubsub.v1.SubscriptionName;
3637
import io.grpc.Status;
3738
import io.grpc.StatusException;
3839
import io.grpc.StatusRuntimeException;
@@ -63,8 +64,8 @@
6364
@RunWith(Parameterized.class)
6465
public class SubscriberImplTest {
6566

66-
private static final String TEST_SUBSCRIPTION =
67-
"projects/test-project/subscriptions/test-subscription";
67+
private static final SubscriptionName TEST_SUBSCRIPTION =
68+
SubscriptionName.create("test-project", "test-subscription");
6869

6970
private static final PubsubMessage TEST_MESSAGE =
7071
PubsubMessage.newBuilder().setMessageId("1").build();

0 commit comments

Comments
 (0)