Skip to content

Commit 339282d

Browse files
authored
Add Pub/Sub emulator, synchronous pull snippets (#2445)
* emulator snippet * removing use of deprecated method * codacy fix * fixing codacy bot issues * adding missing license
1 parent d3cbdc5 commit 339282d

3 files changed

Lines changed: 210 additions & 43 deletions

File tree

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java

Lines changed: 85 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,34 @@
2828
import com.google.api.gax.core.ExecutorProvider;
2929
import com.google.api.gax.core.FixedCredentialsProvider;
3030
import com.google.api.gax.core.InstantiatingExecutorProvider;
31-
import com.google.api.gax.grpc.ChannelProvider;
3231
import com.google.auth.oauth2.ServiceAccountCredentials;
3332
import com.google.cloud.pubsub.v1.AckReplyConsumer;
3433
import com.google.cloud.pubsub.v1.MessageReceiver;
3534
import com.google.cloud.pubsub.v1.Subscriber;
36-
import com.google.cloud.pubsub.v1.TopicAdminSettings;
35+
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
36+
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
37+
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
3738
import com.google.common.util.concurrent.MoreExecutors;
39+
import com.google.pubsub.v1.AcknowledgeRequest;
3840
import com.google.pubsub.v1.PubsubMessage;
41+
import com.google.pubsub.v1.PullRequest;
42+
import com.google.pubsub.v1.PullResponse;
43+
import com.google.pubsub.v1.ReceivedMessage;
3944
import com.google.pubsub.v1.SubscriptionName;
4045
import java.io.FileInputStream;
46+
import java.util.ArrayList;
47+
import java.util.List;
4148
import java.util.concurrent.Executor;
4249

4350
/** This class contains snippets for the {@link Subscriber} interface. */
4451
public class SubscriberSnippets {
4552

4653
private final SubscriptionName subscriptionName;
54+
4755
private final MessageReceiver receiver;
56+
4857
private final ApiFuture<Void> done;
58+
4959
private final Executor executor;
5060

5161
public SubscriberSnippets(
@@ -62,11 +72,13 @@ public SubscriberSnippets(
6272
// [TARGET startAsync()]
6373
public void startAndWait() throws Exception {
6474
Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver).build();
65-
subscriber.addListener(new Subscriber.Listener() {
66-
public void failed(Subscriber.State from, Throwable failure) {
67-
// Handle error.
68-
}
69-
}, executor);
75+
subscriber.addListener(
76+
new Subscriber.Listener() {
77+
public void failed(Subscriber.State from, Throwable failure) {
78+
// Handle error.
79+
}
80+
},
81+
executor);
7082
subscriber.startAsync();
7183

7284
// Wait for a stop signal.
@@ -81,7 +93,8 @@ private void createSubscriber() throws Exception {
8193

8294
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
8395
// Instantiate an asynchronous message receiver
84-
MessageReceiver receiver = new MessageReceiver() {
96+
MessageReceiver receiver =
97+
new MessageReceiver() {
8598
@Override
8699
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
87100
// handle incoming message, then ack/nack the received message
@@ -108,53 +121,94 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
108121

109122
private Subscriber createSubscriberWithErrorListener(Subscriber subscriber) throws Exception {
110123
// [START pubsub_subscriber_error_listener]
111-
subscriber.addListener(new Subscriber.Listener() {
112-
public void failed(Subscriber.State from, Throwable failure) {
113-
// Handle error.
114-
}
115-
}, MoreExecutors.directExecutor());
124+
subscriber.addListener(
125+
new Subscriber.Listener() {
126+
public void failed(Subscriber.State from, Throwable failure) {
127+
// Handle error.
128+
}
129+
},
130+
MoreExecutors.directExecutor());
116131
// [END pubsub_subscriber_error_listener]
117132
return subscriber;
118133
}
119134

120135
private Subscriber createSingleThreadedSubscriber() throws Exception {
121136
// [START pubsub_subscriber_single_threaded]
122137
// provide a separate executor service for polling
123-
ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
124-
.setExecutorThreadCount(1).build();
138+
ExecutorProvider executorProvider =
139+
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
125140

126-
Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver)
127-
.setExecutorProvider(executorProvider)
128-
.build();
141+
Subscriber subscriber =
142+
Subscriber.defaultBuilder(subscriptionName, receiver)
143+
.setExecutorProvider(executorProvider)
144+
.build();
129145
// [END pubsub_subscriber_single_threaded]
130146
return subscriber;
131147
}
132148

133149
private Subscriber createSubscriberWithCustomFlowSettings() throws Exception {
134150
// [START pubsub_subscriber_flow_settings]
135-
int maxMessageCount = 10;
151+
long maxMessageCount = 10L;
136152
// Configure max number of messages to be pulled
137-
FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder()
138-
.setMaxOutstandingElementCount(maxMessageCount)
139-
.build();
140-
Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver)
141-
.setFlowControlSettings(flowControlSettings)
142-
.build();
153+
FlowControlSettings flowControlSettings =
154+
FlowControlSettings.newBuilder().setMaxOutstandingElementCount(maxMessageCount).build();
155+
Subscriber subscriber =
156+
Subscriber.defaultBuilder(subscriptionName, receiver)
157+
.setFlowControlSettings(flowControlSettings)
158+
.build();
143159
// [END pubsub_subscriber_flow_settings]
144160
return subscriber;
145161
}
146162

147163
private Subscriber createSubscriberWithCustomCredentials() throws Exception {
148164
// [START pubsub_subscriber_custom_credentials]
149165
CredentialsProvider credentialsProvider =
150-
FixedCredentialsProvider
151-
.create(ServiceAccountCredentials.fromStream(
152-
new FileInputStream("credentials.json")));
166+
FixedCredentialsProvider.create(
167+
ServiceAccountCredentials.fromStream(new FileInputStream("credentials.json")));
153168

154-
Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver)
155-
.setCredentialsProvider(credentialsProvider)
156-
.build();
169+
Subscriber subscriber =
170+
Subscriber.defaultBuilder(subscriptionName, receiver)
171+
.setCredentialsProvider(credentialsProvider)
172+
.build();
157173
// [START pubsub_subscriber_custom_credentials]
158174
return subscriber;
159175
}
176+
177+
static List<ReceivedMessage> createSubscriberWithSyncPull(
178+
String projectId, String subscriptionId, int numOfMessages) throws Exception {
179+
// [START subscriber_sync_pull]
180+
SubscriptionAdminSettings subscriptionAdminSettings =
181+
SubscriptionAdminSettings.newBuilder().build();
182+
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriptionAdminSettings)) {
183+
// String projectId = "my-project-id";
184+
// String subscriptionId = "my-subscription-id";
185+
// int numOfMessages = 10; // max number of messages to be pulled
186+
String subscriptionName = SubscriptionName.create(projectId, subscriptionId).toString();
187+
PullRequest pullRequest =
188+
PullRequest.newBuilder()
189+
.setMaxMessages(numOfMessages)
190+
.setReturnImmediately(false) // return immediately if messages are not available
191+
.setSubscription(subscriptionName)
192+
.build();
193+
194+
// use pullCallable().futureCall to asynchronously perform this operation
195+
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
196+
List<String> ackIds = new ArrayList<>();
197+
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
198+
// handle received message
199+
// ...
200+
ackIds.add(message.getAckId());
201+
}
202+
// acknowledge received messages
203+
AcknowledgeRequest acknowledgeRequest =
204+
AcknowledgeRequest.newBuilder()
205+
.setSubscription(subscriptionName)
206+
.addAllAckIds(ackIds)
207+
.build();
208+
// use acknowledgeCallable().futureCall to asynchronously perform this operation
209+
subscriber.acknowledgeCallable().call(acknowledgeRequest);
210+
return pullResponse.getReceivedMessagesList();
211+
}
212+
// [END subscriber_sync_pull]
213+
}
160214
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2017 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.examples.pubsub.snippets;
18+
19+
import com.google.api.gax.core.CredentialsProvider;
20+
import com.google.api.gax.core.NoCredentialsProvider;
21+
import com.google.api.gax.grpc.ChannelProvider;
22+
import com.google.api.gax.grpc.FixedChannelProvider;
23+
import com.google.api.gax.grpc.GrpcTransportProvider;
24+
import com.google.cloud.pubsub.v1.Publisher;
25+
import com.google.cloud.pubsub.v1.TopicAdminClient;
26+
import com.google.cloud.pubsub.v1.TopicAdminSettings;
27+
import com.google.pubsub.v1.TopicName;
28+
import io.grpc.ManagedChannel;
29+
import io.grpc.ManagedChannelBuilder;
30+
import java.io.IOException;
31+
32+
/**
33+
* Snippet that demonstrates creating Pub/Sub clients using the Google Cloud Pub/Sub emulator.
34+
*
35+
* <p>Note: clients cannot start/stop the emulator.
36+
*/
37+
public class UsePubSubEmulatorSnippet {
38+
39+
public static void main(String... args) throws IOException {
40+
// [START use_pubsub_emulator]
41+
String hostport = System.getenv("PUBSUB_EMULATOR_HOST");
42+
ManagedChannel channel = ManagedChannelBuilder.forTarget(hostport).usePlaintext(true).build();
43+
try {
44+
ChannelProvider channelProvider = FixedChannelProvider.create(channel);
45+
CredentialsProvider credentialsProvider = new NoCredentialsProvider();
46+
47+
// Set the channel and credentials provider when creating a `TopicAdminClient`.
48+
// Similarly for SubscriptionAdminClient
49+
TopicAdminClient topicClient =
50+
TopicAdminClient.create(
51+
TopicAdminSettings.newBuilder()
52+
.setTransportProvider(
53+
GrpcTransportProvider.newBuilder()
54+
.setChannelProvider(channelProvider)
55+
.build())
56+
.setCredentialsProvider(credentialsProvider)
57+
.build());
58+
59+
TopicName topicName = TopicName.create("my-project-id", "my-topic-id");
60+
// Set the channel and credentials provider when creating a `Publisher`.
61+
// Similarly for Subscriber
62+
Publisher publisher =
63+
Publisher.defaultBuilder(topicName)
64+
.setChannelProvider(channelProvider)
65+
.setCredentialsProvider(credentialsProvider)
66+
.build();
67+
} finally {
68+
channel.shutdown();
69+
}
70+
// [END use_pubsub_emulator]
71+
}
72+
}

google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertNotNull;
2121

22+
import com.google.api.core.ApiFuture;
2223
import com.google.api.core.ApiFutureCallback;
2324
import com.google.api.core.ApiFutures;
2425
import com.google.api.core.SettableApiFuture;
@@ -29,12 +30,17 @@
2930
import com.google.common.util.concurrent.MoreExecutors;
3031
import com.google.pubsub.v1.PubsubMessage;
3132
import com.google.pubsub.v1.PushConfig;
33+
import com.google.pubsub.v1.ReceivedMessage;
3234
import com.google.pubsub.v1.SubscriptionName;
3335
import com.google.pubsub.v1.TopicName;
36+
import java.util.ArrayList;
37+
import java.util.List;
3438
import java.util.UUID;
3539
import java.util.concurrent.ArrayBlockingQueue;
3640
import java.util.concurrent.BlockingQueue;
3741
import java.util.concurrent.TimeUnit;
42+
import org.junit.After;
43+
import org.junit.Before;
3844
import org.junit.Rule;
3945
import org.junit.Test;
4046
import org.junit.rules.Timeout;
@@ -45,32 +51,30 @@ public class ITPubSubSnippets {
4551

4652
@Rule public Timeout globalTimeout = Timeout.seconds(300);
4753

54+
private TopicName topicName;
55+
private SubscriptionName subscriptionName;
56+
4857
private static String formatForTest(String resourceName) {
4958
return resourceName + "-" + NAME_SUFFIX;
5059
}
5160

52-
@Test
53-
public void testPublisherSubscriber() throws Exception {
54-
TopicName topicName =
55-
TopicName.create(ServiceOptions.getDefaultProjectId(), formatForTest("test-topic"));
56-
SubscriptionName subscriptionName =
61+
@Before
62+
public void setUp() throws Exception {
63+
topicName = TopicName.create(ServiceOptions.getDefaultProjectId(), formatForTest("test-topic"));
64+
subscriptionName =
5765
SubscriptionName.create(
5866
ServiceOptions.getDefaultProjectId(), formatForTest("test-subscription"));
67+
5968
try (TopicAdminClient publisherClient = TopicAdminClient.create();
6069
SubscriptionAdminClient subscriberClient = SubscriptionAdminClient.create()) {
6170
publisherClient.createTopic(topicName);
6271
subscriberClient.createSubscription(
6372
subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
64-
65-
testPublisherSubscriberHelper(topicName, subscriptionName);
66-
67-
subscriberClient.deleteSubscription(subscriptionName);
68-
publisherClient.deleteTopic(topicName);
6973
}
7074
}
7175

72-
private void testPublisherSubscriberHelper(TopicName topicName, SubscriptionName subscriptionName)
73-
throws Exception {
76+
@Test
77+
public void testPublisherAsyncSubscriber() throws Exception {
7478
String messageToPublish = "my-message";
7579

7680
Publisher publisher = null;
@@ -124,4 +128,41 @@ public void run() {
124128
assertNotNull(message);
125129
assertEquals(message.getData().toStringUtf8(), messageToPublish);
126130
}
131+
132+
@Test
133+
public void testPublisherSyncSubscriber() throws Exception {
134+
String messageToPublish = "my-message";
135+
Publisher publisher = null;
136+
try {
137+
publisher = Publisher.defaultBuilder(topicName).build();
138+
PublisherSnippets snippets = new PublisherSnippets(publisher);
139+
List<ApiFuture<String>> apiFutures = new ArrayList<>();
140+
for (int i = 0; i < 5; i++) {
141+
apiFutures.add(snippets.publish(messageToPublish + "-" + i));
142+
}
143+
ApiFutures.allAsList(apiFutures).get();
144+
} finally {
145+
if (publisher != null) {
146+
publisher.shutdown();
147+
}
148+
}
149+
150+
List<ReceivedMessage> messages =
151+
SubscriberSnippets.createSubscriberWithSyncPull(
152+
subscriptionName.getProject(), subscriptionName.getSubscription(), 5);
153+
assertEquals(messages.size(), 5);
154+
for (int i = 0; i < 5; i++) {
155+
String messageData = messages.get(i).getMessage().getData().toStringUtf8();
156+
assertEquals(messageData, messageToPublish + "-" + i);
157+
}
158+
}
159+
160+
@After
161+
public void tearDown() throws Exception {
162+
try (TopicAdminClient publisherClient = TopicAdminClient.create();
163+
SubscriptionAdminClient subscriberClient = SubscriptionAdminClient.create()) {
164+
subscriberClient.deleteSubscription(subscriptionName);
165+
publisherClient.deleteTopic(topicName);
166+
}
167+
}
127168
}

0 commit comments

Comments
 (0)