Skip to content

Commit 4f95f59

Browse files
committed
update snippet
1 parent 4bd5d48 commit 4f95f59

2 files changed

Lines changed: 9 additions & 35 deletions

File tree

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ public SubscriberSnippets(SubscriptionName subscription) {
4545
* Example of receiving a specific number of messages.
4646
*/
4747
// [TARGET startAsync()]
48-
// [VARIABLE "my_project_name"]
49-
// [VARIABLE "my_subscription_name"]
5048
// [VARIABLE 3]
5149
public void startAsync(int receiveNum) throws Exception {
5250
// [START startAsync]

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -182,60 +182,36 @@ public boolean isRunning() {
182182
*
183183
* <p>Example of receiving a specific number of messages.
184184
* <pre> {@code
185-
* String projectName = "my_project_name";
186-
* String subscriptionName = "my_subscription_name";
187185
* int receiveNum = 3;
188-
* SubscriptionName subscription = SubscriptionName.create(projectName, subscriptionName);
189-
* final Lock lock = new ReentrantLock();
190-
* final Condition doneCondition = lock.newCondition();
191186
* final AtomicInteger pendingReceives = new AtomicInteger(receiveNum);
192-
* final AtomicBoolean done = new AtomicBoolean();
193-
*
187+
* final SettableRpcFuture<Void> done = new SettableRpcFuture<>();
188+
*
194189
* MessageReceiver receiver = new MessageReceiver() {
195190
* public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
196191
* System.out.println("got message: " + message);
197192
* consumer.accept(AckReply.ACK, null);
198-
* if (pendingReceives.decrementAndGet() != 0) {
199-
* return;
200-
* }
201-
* lock.lock();
202-
* try {
203-
* done.set(true);
204-
* doneCondition.signal();
205-
* } finally {
206-
* lock.unlock();
193+
* if (pendingReceives.decrementAndGet() == 0) {
194+
* done.set(null);
207195
* }
208196
* }
209197
* };
210-
*
198+
*
211199
* Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
212200
* subscriber.addListener(new Subscriber.SubscriberListener() {
213201
* public void failed(Subscriber.State from, Throwable failure) {
214-
* System.err.println(failure);
215-
* lock.lock();
216-
* try {
217-
* done.set(true);
218-
* doneCondition.signal();
219-
* } finally {
220-
* lock.unlock();
221-
* }
202+
* done.setException(failure);
222203
* }
223204
* }, new Executor() {
224205
* public void execute(Runnable command) {
225206
* command.run();
226207
* }
227208
* });
228209
* subscriber.startAsync();
229-
* lock.lock();
230-
* try {
231-
* while (!done.get()) {
232-
* doneCondition.await();
233-
* }
234-
* } finally {
235-
* lock.unlock();
236-
* }
210+
*
211+
* done.get(10, TimeUnit.MINUTES);
237212
* subscriber.stopAsync().awaitTerminated();
238213
* }</pre>
214+
*
239215
*/
240216
public Subscriber startAsync() {
241217
impl.startAsync();

0 commit comments

Comments
 (0)