Skip to content

Commit 5ee8bd3

Browse files
committed
add Subscriber snippet
The snippet is very long, but the shortest I can make it without loosing details.
1 parent cd5cd48 commit 5ee8bd3

2 files changed

Lines changed: 160 additions & 30 deletions

File tree

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2017 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/*
18+
* EDITING INSTRUCTIONS
19+
* This file is referenced in Subscriber's javadoc. Any change to this file should be reflected in
20+
* PubSub's javadoc.
21+
*/
22+
23+
package com.google.cloud.examples.pubsub.snippets;
24+
25+
import com.google.cloud.pubsub.spi.v1.AckReply;
26+
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
27+
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
28+
import com.google.cloud.pubsub.spi.v1.Subscriber;
29+
import com.google.pubsub.v1.PubsubMessage;
30+
import com.google.pubsub.v1.SubscriptionName;
31+
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
import java.util.concurrent.Executor;
34+
import java.util.concurrent.locks.Condition;
35+
import java.util.concurrent.locks.Lock;
36+
import java.util.concurrent.locks.ReentrantLock;
37+
38+
public class SubscriberSnippets {
39+
/**
40+
* Example of receiving a specific number of messages.
41+
*/
42+
// [TARGET startAsync()]
43+
// [VARIABLE "my_project_name"]
44+
// [VARIABLE "my_subscription_name"]
45+
// [VARIABLE 3]
46+
public void startAsync(String projectName, String subscriptionName, int receiveNum) throws Exception {
47+
// [START startAsync]
48+
SubscriptionName subscription = SubscriptionName.create(projectName, subscriptionName);
49+
final Lock lock = new ReentrantLock();
50+
final Condition doneCondition = lock.newCondition();
51+
final AtomicInteger pendingReceives = new AtomicInteger(receiveNum);
52+
final AtomicBoolean done = new AtomicBoolean();
53+
54+
MessageReceiver receiver = new MessageReceiver() {
55+
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
56+
System.out.println("got message: " + message);
57+
consumer.accept(AckReply.ACK, null);
58+
if (pendingReceives.decrementAndGet() != 0) {
59+
return;
60+
}
61+
lock.lock();
62+
try {
63+
done.set(true);
64+
doneCondition.signal();
65+
} finally {
66+
lock.unlock();
67+
}
68+
}
69+
};
70+
71+
Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
72+
subscriber.addListener(new Subscriber.SubscriberListener() {
73+
public void failed(Subscriber.State from, Throwable failure) {
74+
System.err.println(failure);
75+
lock.lock();
76+
try {
77+
done.set(true);
78+
doneCondition.signal();
79+
} finally {
80+
lock.unlock();
81+
}
82+
}
83+
}, new Executor() {
84+
public void execute(Runnable command) {
85+
command.run();
86+
}
87+
});
88+
subscriber.startAsync();
89+
lock.lock();
90+
try {
91+
while (!done.get()) {
92+
doneCondition.await();
93+
}
94+
} finally {
95+
lock.unlock();
96+
}
97+
subscriber.stopAsync().awaitTerminated();
98+
// [END startAsync]
99+
}
100+
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 60 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -75,36 +75,6 @@
7575
*
7676
* <p>If no credentials are provided, the {@link Subscriber} will use application default
7777
* credentials through {@link GoogleCredentials#getApplicationDefault}.
78-
*
79-
* <p>For example, a {@link Subscriber} can be constructed and used to receive messages as follows:
80-
*
81-
* <pre><code>
82-
* MessageReceiver receiver = new MessageReceiver() {
83-
* &#64;Override
84-
* public void receiveMessage(PubsubMessage message, SettableFuture&lt;AckReply&gt; response) {
85-
* // ... process message ...
86-
* return response.set(AckReply.ACK);
87-
* }
88-
* }
89-
*
90-
* Subscriber subscriber =
91-
* Subscriber.newBuilder(MY_SUBSCRIPTION, receiver)
92-
* .setMaxBundleAcks(100)
93-
* .build();
94-
*
95-
* subscriber.startAsync();
96-
*
97-
* // ... recommended, listen for fatal errors that break the subscriber streaming ...
98-
* subscriber.addListener(new Listener() {
99-
* &#64;Override
100-
* public void failed(State from, Throwable failure) {
101-
* System.out.println("Subscriber failed with error: " + failure);
102-
* }
103-
* }, Executors.newSingleThreadExecutor());
104-
*
105-
* // ... and when done with the subscriber ...
106-
* subscriber.stopAsync();
107-
* </code></pre>
10878
*/
10979
public class Subscriber {
11080
private static final int THREADS_PER_CHANNEL = 5;
@@ -207,6 +177,66 @@ public boolean isRunning() {
207177
return impl.isRunning();
208178
}
209179

180+
/**
181+
* Initiates service startup and returns immediately.
182+
*
183+
* <p>Example of receiving a specific number of messages.
184+
* <pre> {@code
185+
* String projectName = "my_project_name";
186+
* String subscriptionName = "my_subscription_name";
187+
* int receiveNum = 3;
188+
* SubscriptionName subscription = SubscriptionName.create(projectName, subscriptionName);
189+
* final Lock lock = new ReentrantLock();
190+
* final Condition doneCondition = lock.newCondition();
191+
* final AtomicInteger pendingReceives = new AtomicInteger(receiveNum);
192+
* final AtomicBoolean done = new AtomicBoolean();
193+
*
194+
* MessageReceiver receiver = new MessageReceiver() {
195+
* public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
196+
* System.out.println("got message: " + message);
197+
* consumer.accept(AckReply.ACK, null);
198+
* if (pendingReceives.decrementAndGet() != 0) {
199+
* return;
200+
* }
201+
* lock.lock();
202+
* try {
203+
* done.set(true);
204+
* doneCondition.signal();
205+
* } finally {
206+
* lock.unlock();
207+
* }
208+
* }
209+
* };
210+
*
211+
* Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
212+
* subscriber.addListener(new Subscriber.SubscriberListener() {
213+
* public void failed(Subscriber.State from, Throwable failure) {
214+
* System.err.println(failure);
215+
* lock.lock();
216+
* try {
217+
* done.set(true);
218+
* doneCondition.signal();
219+
* } finally {
220+
* lock.unlock();
221+
* }
222+
* }
223+
* }, new Executor() {
224+
* public void execute(Runnable command) {
225+
* command.run();
226+
* }
227+
* });
228+
* subscriber.startAsync();
229+
* lock.lock();
230+
* try {
231+
* while (!done.get()) {
232+
* doneCondition.await();
233+
* }
234+
* } finally {
235+
* lock.unlock();
236+
* }
237+
* subscriber.stopAsync().awaitTerminated();
238+
* }</pre>
239+
*/
210240
public Subscriber startAsync() {
211241
impl.startAsync();
212242
return this;

0 commit comments

Comments
 (0)