|
22 | 22 |
|
23 | 23 | package com.google.cloud.examples.pubsub.snippets; |
24 | 24 |
|
| 25 | +import com.google.api.gax.core.SettableRpcFuture; |
25 | 26 | import com.google.cloud.pubsub.spi.v1.AckReply; |
26 | 27 | import com.google.cloud.pubsub.spi.v1.AckReplyConsumer; |
27 | 28 | import com.google.cloud.pubsub.spi.v1.MessageReceiver; |
28 | 29 | import com.google.cloud.pubsub.spi.v1.Subscriber; |
29 | 30 | import com.google.pubsub.v1.PubsubMessage; |
30 | 31 | import com.google.pubsub.v1.SubscriptionName; |
31 | | -import java.util.concurrent.atomic.AtomicBoolean; |
32 | 32 | import java.util.concurrent.atomic.AtomicInteger; |
33 | 33 | import java.util.concurrent.Executor; |
34 | | -import java.util.concurrent.locks.Condition; |
35 | | -import java.util.concurrent.locks.Lock; |
36 | | -import java.util.concurrent.locks.ReentrantLock; |
| 34 | +import java.util.concurrent.TimeUnit; |
37 | 35 |
|
38 | 36 | public class SubscriberSnippets { |
| 37 | + |
| 38 | + private final SubscriptionName subscription; |
| 39 | + |
| 40 | + public SubscriberSnippets(SubscriptionName subscription) { |
| 41 | + this.subscription = subscription; |
| 42 | + } |
| 43 | + |
39 | 44 | /** |
40 | 45 | * Example of receiving a specific number of messages. |
41 | 46 | */ |
42 | 47 | // [TARGET startAsync()] |
43 | 48 | // [VARIABLE "my_project_name"] |
44 | 49 | // [VARIABLE "my_subscription_name"] |
45 | 50 | // [VARIABLE 3] |
46 | | - public void startAsync(String projectName, String subscriptionName, int receiveNum) throws Exception { |
| 51 | + public void startAsync(int receiveNum) throws Exception { |
47 | 52 | // [START startAsync] |
48 | | - SubscriptionName subscription = SubscriptionName.create(projectName, subscriptionName); |
49 | | - final Lock lock = new ReentrantLock(); |
50 | | - final Condition doneCondition = lock.newCondition(); |
51 | 53 | final AtomicInteger pendingReceives = new AtomicInteger(receiveNum); |
52 | | - final AtomicBoolean done = new AtomicBoolean(); |
| 54 | + final SettableRpcFuture<Void> done = new SettableRpcFuture<>(); |
53 | 55 |
|
54 | 56 | MessageReceiver receiver = new MessageReceiver() { |
55 | 57 | public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { |
56 | 58 | System.out.println("got message: " + message); |
57 | 59 | consumer.accept(AckReply.ACK, null); |
58 | | - if (pendingReceives.decrementAndGet() != 0) { |
59 | | - return; |
60 | | - } |
61 | | - lock.lock(); |
62 | | - try { |
63 | | - done.set(true); |
64 | | - doneCondition.signal(); |
65 | | - } finally { |
66 | | - lock.unlock(); |
| 60 | + if (pendingReceives.decrementAndGet() == 0) { |
| 61 | + done.set(null); |
67 | 62 | } |
68 | 63 | } |
69 | 64 | }; |
70 | 65 |
|
71 | 66 | Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build(); |
72 | 67 | subscriber.addListener(new Subscriber.SubscriberListener() { |
73 | 68 | public void failed(Subscriber.State from, Throwable failure) { |
74 | | - System.err.println(failure); |
75 | | - lock.lock(); |
76 | | - try { |
77 | | - done.set(true); |
78 | | - doneCondition.signal(); |
79 | | - } finally { |
80 | | - lock.unlock(); |
81 | | - } |
| 69 | + done.setException(failure); |
82 | 70 | } |
83 | 71 | }, new Executor() { |
84 | 72 | public void execute(Runnable command) { |
85 | 73 | command.run(); |
86 | 74 | } |
87 | 75 | }); |
88 | 76 | subscriber.startAsync(); |
89 | | - lock.lock(); |
90 | | - try { |
91 | | - while (!done.get()) { |
92 | | - doneCondition.await(); |
93 | | - } |
94 | | - } finally { |
95 | | - lock.unlock(); |
96 | | - } |
| 77 | + |
| 78 | + done.get(10, TimeUnit.MINUTES); |
97 | 79 | subscriber.stopAsync().awaitTerminated(); |
98 | 80 | // [END startAsync] |
99 | 81 | } |
|
0 commit comments