Skip to content

Commit 3717f02

Browse files
committed
pr comment
1 parent 4f95f59 commit 3717f02

4 files changed

Lines changed: 101 additions & 50 deletions

File tree

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2017 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/*
18+
* EDITING INSTRUCTIONS
19+
* This file is referenced in Subscriber's javadoc. Any change to this file should be reflected in
20+
* PubSub's javadoc.
21+
*/
22+
23+
package com.google.cloud.examples.pubsub.snippets;
24+
25+
import com.google.cloud.pubsub.spi.v1.AckReply;
26+
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
27+
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
28+
import com.google.pubsub.v1.PubsubMessage;
29+
import java.util.concurrent.BlockingQueue;
30+
31+
public class MessageReceiverSnippets {
32+
private final BlockingQueue<PubsubMessage> blockingQueue;
33+
34+
public MessageReceiverSnippets(BlockingQueue<PubsubMessage> blockingQueue) {
35+
this.blockingQueue = blockingQueue;
36+
}
37+
38+
/**
39+
* This {@code MessageReceiver} passes all messages to a {@link BlockingQueue}.
40+
* This method can be called concurrently from multiple threads,
41+
* so it is important that the queue be thread-safe.
42+
*
43+
* This example is for illustration. Implementations may directly process messages
44+
* instead of sending them to queues.
45+
*/
46+
// [TARGET receiveMessage(PubsubMessage, AckReplyConsumer)]
47+
public MessageReceiver messageReceiver() {
48+
// [START receiveMessage]
49+
MessageReceiver receiver = new MessageReceiver() {
50+
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
51+
if (blockingQueue.offer(message)) {
52+
consumer.accept(AckReply.ACK, null);
53+
} else {
54+
consumer.accept(AckReply.NACK, null);
55+
}
56+
}
57+
};
58+
// [END receiveMessage]
59+
return receiver;
60+
}
61+
}

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java

Lines changed: 17 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,58 +22,46 @@
2222

2323
package com.google.cloud.examples.pubsub.snippets;
2424

25-
import com.google.api.gax.core.SettableRpcFuture;
26-
import com.google.cloud.pubsub.spi.v1.AckReply;
27-
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
25+
import com.google.api.gax.core.RpcFuture;
2826
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
2927
import com.google.cloud.pubsub.spi.v1.Subscriber;
30-
import com.google.pubsub.v1.PubsubMessage;
3128
import com.google.pubsub.v1.SubscriptionName;
32-
import java.util.concurrent.atomic.AtomicInteger;
3329
import java.util.concurrent.Executor;
34-
import java.util.concurrent.TimeUnit;
3530

3631
public class SubscriberSnippets {
3732

3833
private final SubscriptionName subscription;
34+
private final MessageReceiver receiver;
35+
private final RpcFuture<Void> done;
36+
private final Executor executor;
3937

40-
public SubscriberSnippets(SubscriptionName subscription) {
38+
public SubscriberSnippets(
39+
SubscriptionName subscription,
40+
MessageReceiver receiver,
41+
RpcFuture<Void> done,
42+
Executor executor) {
4143
this.subscription = subscription;
44+
this.receiver = receiver;
45+
this.done = done;
46+
this.executor = executor;
4247
}
4348

4449
/**
4550
* Example of receiving a specific number of messages.
4651
*/
4752
// [TARGET startAsync()]
48-
// [VARIABLE 3]
49-
public void startAsync(int receiveNum) throws Exception {
53+
public void startAsync() throws Exception {
5054
// [START startAsync]
51-
final AtomicInteger pendingReceives = new AtomicInteger(receiveNum);
52-
final SettableRpcFuture<Void> done = new SettableRpcFuture<>();
53-
54-
MessageReceiver receiver = new MessageReceiver() {
55-
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
56-
System.out.println("got message: " + message);
57-
consumer.accept(AckReply.ACK, null);
58-
if (pendingReceives.decrementAndGet() == 0) {
59-
done.set(null);
60-
}
61-
}
62-
};
63-
6455
Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
6556
subscriber.addListener(new Subscriber.SubscriberListener() {
6657
public void failed(Subscriber.State from, Throwable failure) {
67-
done.setException(failure);
68-
}
69-
}, new Executor() {
70-
public void execute(Runnable command) {
71-
command.run();
58+
// Handle error.
7259
}
73-
});
60+
}, executor);
7461
subscriber.startAsync();
7562

76-
done.get(10, TimeUnit.MINUTES);
63+
// Wait for a stop signal.
64+
done.get();
7765
subscriber.stopAsync().awaitTerminated();
7866
// [END startAsync]
7967
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,25 @@ public interface MessageReceiver {
2323
/**
2424
* Called when a message is received by the subscriber. The implementation must arrange for {@link
2525
* AckReplyConsumer#accept} to be called after processing the {@code message}.
26+
*
27+
* <p>This {@code MessageReceiver} passes all messages to a {@link BlockingQueue}.
28+
* This method can be called concurrently from multiple threads,
29+
* so it is important that the queue be thread-safe.
30+
*
31+
* This example is for illustration. Implementations may directly process messages
32+
* instead of sending them to queues.
33+
* <pre> {@code
34+
* MessageReceiver receiver = new MessageReceiver() {
35+
* public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
36+
* if (blockingQueue.offer(message)) {
37+
* consumer.accept(AckReply.ACK, null);
38+
* } else {
39+
* consumer.accept(AckReply.NACK, null);
40+
* }
41+
* }
42+
* };
43+
* }</pre>
44+
*
2645
*/
2746
void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer);
2847
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -182,33 +182,16 @@ public boolean isRunning() {
182182
*
183183
* <p>Example of receiving a specific number of messages.
184184
* <pre> {@code
185-
* int receiveNum = 3;
186-
* final AtomicInteger pendingReceives = new AtomicInteger(receiveNum);
187-
* final SettableRpcFuture<Void> done = new SettableRpcFuture<>();
188-
*
189-
* MessageReceiver receiver = new MessageReceiver() {
190-
* public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
191-
* System.out.println("got message: " + message);
192-
* consumer.accept(AckReply.ACK, null);
193-
* if (pendingReceives.decrementAndGet() == 0) {
194-
* done.set(null);
195-
* }
196-
* }
197-
* };
198-
*
199185
* Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
200186
* subscriber.addListener(new Subscriber.SubscriberListener() {
201187
* public void failed(Subscriber.State from, Throwable failure) {
202-
* done.setException(failure);
203-
* }
204-
* }, new Executor() {
205-
* public void execute(Runnable command) {
206-
* command.run();
188+
* // Handle error.
207189
* }
208-
* });
190+
* }, executor);
209191
* subscriber.startAsync();
210192
*
211-
* done.get(10, TimeUnit.MINUTES);
193+
* // Wait for a stop signal.
194+
* done.get();
212195
* subscriber.stopAsync().awaitTerminated();
213196
* }</pre>
214197
*

0 commit comments

Comments
 (0)