Skip to content

Commit a2ca992

Browse files
davidtorrespongad
authored andcommitted
Changing the AckReplyConsumer interface (#1758)
Changing the AckReplyConsumer interface to comply to just the Java 8 Consumer interface. It really is not useful to be able to set an exception as ack reply, since the result is the same as nack, if we ever require another result then we can just add one more value to the AckReply enum. Also adding a fail-safe catch so if the receiver ever throws an exception we will interpret that as a nack and keep going. Example fixes to comply with changes to the AckReplyConsumer interface.
1 parent e9c05a5 commit a2ca992

6 files changed

Lines changed: 35 additions & 30 deletions

File tree

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public static void main(String... args) throws Exception {
4646
@Override
4747
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
4848
System.out.println("got message: " + message.getData().toStringUtf8());
49-
consumer.accept(AckReply.ACK, null);
49+
consumer.accept(AckReply.ACK);
5050
}
5151
};
5252
Subscriber subscriber = null;

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ public MessageReceiver messageReceiver() {
5151
MessageReceiver receiver = new MessageReceiver() {
5252
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
5353
if (blockingQueue.offer(message)) {
54-
consumer.accept(AckReply.ACK, null);
54+
consumer.accept(AckReply.ACK);
5555
} else {
56-
consumer.accept(AckReply.NACK, null);
56+
consumer.accept(AckReply.NACK);
5757
}
5858
}
5959
};

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
/**
2020
* Accepts a reply, sending it to the service.
2121
*
22-
* <p>Both the interface and its method is named after the Java 8's {@code BiConsumer} interface
22+
* <p>Both the interface and its method is named after the Java 8's {@code Consumer} interface
2323
* to make migration to Java 8 and adopting its patterns easier.
2424
*/
2525
public interface AckReplyConsumer {
26-
void accept(AckReply ackReply, Throwable t);
26+
void accept(AckReply ackReply);
2727
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -285,20 +285,20 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
285285
final AckReplyConsumer consumer =
286286
new AckReplyConsumer() {
287287
@Override
288-
public void accept(AckReply reply, Throwable t) {
289-
if (reply != null) {
290-
response.set(reply);
291-
} else {
292-
response.setException(t);
293-
}
288+
public void accept(AckReply reply) {
289+
response.set(reply);
294290
}
295291
};
296292
Futures.addCallback(response, ackHandler);
297293
executor.submit(
298294
new Runnable() {
299295
@Override
300296
public void run() {
301-
receiver.receiveMessage(message, consumer);
297+
try {
298+
receiver.receiveMessage(message, consumer);
299+
} catch (Exception e) {
300+
response.setException(e);
301+
}
302302
}
303303
});
304304
}

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeSubscriberServiceImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@
4949
class FakeSubscriberServiceImpl extends SubscriberImplBase {
5050
private final AtomicBoolean subscriptionInitialized = new AtomicBoolean(false);
5151
private String subscription = "";
52-
private final AtomicInteger messageAckDeadline = new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS);
52+
private final AtomicInteger messageAckDeadline =
53+
new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS);
5354
private final List<Stream> openedStreams = new ArrayList<>();
5455
private final List<Stream> closedStreams = new ArrayList<>();
5556
private final List<String> acks = new ArrayList<>();
@@ -235,7 +236,9 @@ public void getSubscription(
235236

236237
@Override
237238
public void pull(PullRequest request, StreamObserver<PullResponse> responseObserver) {
238-
receivedPullRequest.add(request);
239+
synchronized (receivedPullRequest) {
240+
receivedPullRequest.add(request);
241+
}
239242
try {
240243
responseObserver.onNext(pullResponses.take());
241244
responseObserver.onCompleted();

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,14 @@ static class TestReceiver implements MessageReceiver {
8484
new LinkedBlockingQueue<>();
8585
private AckReply ackReply = AckReply.ACK;
8686
private Optional<CountDownLatch> messageCountLatch = Optional.absent();
87-
private Optional<Throwable> error = Optional.absent();
87+
private Optional<RuntimeException> error = Optional.absent();
8888
private boolean explicitAckReplies;
8989

9090
void setReply(AckReply ackReply) {
9191
this.ackReply = ackReply;
9292
}
9393

94-
void setErrorReply(Throwable error) {
94+
void setErrorReply(RuntimeException error) {
9595
this.error = Optional.of(error);
9696
}
9797

@@ -111,18 +111,20 @@ void waitForExpectedMessages() throws InterruptedException {
111111

112112
@Override
113113
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
114-
if (explicitAckReplies) {
115-
try {
116-
outstandingMessageReplies.put(consumer);
117-
} catch (InterruptedException e) {
118-
throw new IllegalStateException(e);
114+
try {
115+
if (explicitAckReplies) {
116+
try {
117+
outstandingMessageReplies.put(consumer);
118+
} catch (InterruptedException e) {
119+
throw new IllegalStateException(e);
120+
}
121+
} else {
122+
replyTo(consumer);
123+
}
124+
} finally {
125+
if (messageCountLatch.isPresent()) {
126+
messageCountLatch.get().countDown();
119127
}
120-
} else {
121-
replyTo(consumer);
122-
}
123-
124-
if (messageCountLatch.isPresent()) {
125-
messageCountLatch.get().countDown();
126128
}
127129
}
128130

@@ -145,9 +147,9 @@ public void replyAllOutstandingMessage() {
145147

146148
private void replyTo(AckReplyConsumer reply) {
147149
if (error.isPresent()) {
148-
reply.accept(null, error.get());
150+
throw error.get();
149151
} else {
150-
reply.accept(ackReply, null);
152+
reply.accept(ackReply);
151153
}
152154
}
153155
}
@@ -207,7 +209,7 @@ public void testNackSingleMessage() throws Exception {
207209

208210
@Test
209211
public void testReceiverError_NacksMessage() throws Exception {
210-
testReceiver.setErrorReply(new Exception("Can't process message"));
212+
testReceiver.setErrorReply(new RuntimeException("Can't process message"));
211213

212214
Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver));
213215

0 commit comments

Comments
 (0)