Skip to content

Commit 4bd5d48

Browse files
committed
pr comment
1 parent 9695106 commit 4bd5d48

2 files changed

Lines changed: 17 additions & 35 deletions

File tree

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,78 +22,60 @@
2222

2323
package com.google.cloud.examples.pubsub.snippets;
2424

25+
import com.google.api.gax.core.SettableRpcFuture;
2526
import com.google.cloud.pubsub.spi.v1.AckReply;
2627
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
2728
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
2829
import com.google.cloud.pubsub.spi.v1.Subscriber;
2930
import com.google.pubsub.v1.PubsubMessage;
3031
import com.google.pubsub.v1.SubscriptionName;
31-
import java.util.concurrent.atomic.AtomicBoolean;
3232
import java.util.concurrent.atomic.AtomicInteger;
3333
import java.util.concurrent.Executor;
34-
import java.util.concurrent.locks.Condition;
35-
import java.util.concurrent.locks.Lock;
36-
import java.util.concurrent.locks.ReentrantLock;
34+
import java.util.concurrent.TimeUnit;
3735

3836
public class SubscriberSnippets {
37+
38+
private final SubscriptionName subscription;
39+
40+
public SubscriberSnippets(SubscriptionName subscription) {
41+
this.subscription = subscription;
42+
}
43+
3944
/**
4045
* Example of receiving a specific number of messages.
4146
*/
4247
// [TARGET startAsync()]
4348
// [VARIABLE "my_project_name"]
4449
// [VARIABLE "my_subscription_name"]
4550
// [VARIABLE 3]
46-
public void startAsync(String projectName, String subscriptionName, int receiveNum) throws Exception {
51+
public void startAsync(int receiveNum) throws Exception {
4752
// [START startAsync]
48-
SubscriptionName subscription = SubscriptionName.create(projectName, subscriptionName);
49-
final Lock lock = new ReentrantLock();
50-
final Condition doneCondition = lock.newCondition();
5153
final AtomicInteger pendingReceives = new AtomicInteger(receiveNum);
52-
final AtomicBoolean done = new AtomicBoolean();
54+
final SettableRpcFuture<Void> done = new SettableRpcFuture<>();
5355

5456
MessageReceiver receiver = new MessageReceiver() {
5557
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
5658
System.out.println("got message: " + message);
5759
consumer.accept(AckReply.ACK, null);
58-
if (pendingReceives.decrementAndGet() != 0) {
59-
return;
60-
}
61-
lock.lock();
62-
try {
63-
done.set(true);
64-
doneCondition.signal();
65-
} finally {
66-
lock.unlock();
60+
if (pendingReceives.decrementAndGet() == 0) {
61+
done.set(null);
6762
}
6863
}
6964
};
7065

7166
Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
7267
subscriber.addListener(new Subscriber.SubscriberListener() {
7368
public void failed(Subscriber.State from, Throwable failure) {
74-
System.err.println(failure);
75-
lock.lock();
76-
try {
77-
done.set(true);
78-
doneCondition.signal();
79-
} finally {
80-
lock.unlock();
81-
}
69+
done.setException(failure);
8270
}
8371
}, new Executor() {
8472
public void execute(Runnable command) {
8573
command.run();
8674
}
8775
});
8876
subscriber.startAsync();
89-
lock.lock();
90-
try {
91-
while (!done.get()) {
92-
doneCondition.await();
93-
}
94-
} finally {
95-
lock.unlock();
96-
}
77+
78+
done.get(10, TimeUnit.MINUTES);
9779
subscriber.stopAsync().awaitTerminated();
9880
// [END startAsync]
9981
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
<github.global.server>github</github.global.server>
9393
<google.auth.version>0.6.0</google.auth.version>
9494
<grpc.version>1.0.3</grpc.version>
95-
<gax.version>0.1.0</gax.version>
95+
<gax.version>0.1.1</gax.version>
9696
<generatedProto.version>0.1.5</generatedProto.version>
9797
<core.version>0.9.3-alpha-SNAPSHOT</core.version>
9898
<beta.version>0.9.3-beta-SNAPSHOT</beta.version>

0 commit comments

Comments
 (0)