Skip to content

Commit 798004e

Browse files
davidtorrespongad
authored andcommitted
---
yaml --- r: 5487 b: refs/heads/master c: a2ca992 h: refs/heads/master i: 5485: 3dc14cc 5483: 828bd37 5479: aa98d8c 5471: 30e4766
1 parent 4d1baec commit 798004e

7 files changed

Lines changed: 36 additions & 31 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: e9c05a5a9aed5103076e2e2d6d24f59f48a90ef3
2+
refs/heads/master: a2ca9924be8fd0372b5a1c430e05b942fbacebda
33
refs/heads/travis: dae77e558b884bc1b165155482d76c8e40b0fca4
44
refs/heads/gh-pages: 4936f6d1c43be1ab76229d2743bae07f4b4124b3
55
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444

trunk/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public static void main(String... args) throws Exception {
4646
@Override
4747
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
4848
System.out.println("got message: " + message.getData().toStringUtf8());
49-
consumer.accept(AckReply.ACK, null);
49+
consumer.accept(AckReply.ACK);
5050
}
5151
};
5252
Subscriber subscriber = null;

trunk/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ public MessageReceiver messageReceiver() {
5151
MessageReceiver receiver = new MessageReceiver() {
5252
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
5353
if (blockingQueue.offer(message)) {
54-
consumer.accept(AckReply.ACK, null);
54+
consumer.accept(AckReply.ACK);
5555
} else {
56-
consumer.accept(AckReply.NACK, null);
56+
consumer.accept(AckReply.NACK);
5757
}
5858
}
5959
};

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
/**
2020
* Accepts a reply, sending it to the service.
2121
*
22-
* <p>Both the interface and its method is named after the Java 8's {@code BiConsumer} interface
22+
* <p>Both the interface and its method is named after the Java 8's {@code Consumer} interface
2323
* to make migration to Java 8 and adopting its patterns easier.
2424
*/
2525
public interface AckReplyConsumer {
26-
void accept(AckReply ackReply, Throwable t);
26+
void accept(AckReply ackReply);
2727
}

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -285,20 +285,20 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
285285
final AckReplyConsumer consumer =
286286
new AckReplyConsumer() {
287287
@Override
288-
public void accept(AckReply reply, Throwable t) {
289-
if (reply != null) {
290-
response.set(reply);
291-
} else {
292-
response.setException(t);
293-
}
288+
public void accept(AckReply reply) {
289+
response.set(reply);
294290
}
295291
};
296292
Futures.addCallback(response, ackHandler);
297293
executor.submit(
298294
new Runnable() {
299295
@Override
300296
public void run() {
301-
receiver.receiveMessage(message, consumer);
297+
try {
298+
receiver.receiveMessage(message, consumer);
299+
} catch (Exception e) {
300+
response.setException(e);
301+
}
302302
}
303303
});
304304
}

trunk/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeSubscriberServiceImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@
4949
class FakeSubscriberServiceImpl extends SubscriberImplBase {
5050
private final AtomicBoolean subscriptionInitialized = new AtomicBoolean(false);
5151
private String subscription = "";
52-
private final AtomicInteger messageAckDeadline = new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS);
52+
private final AtomicInteger messageAckDeadline =
53+
new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS);
5354
private final List<Stream> openedStreams = new ArrayList<>();
5455
private final List<Stream> closedStreams = new ArrayList<>();
5556
private final List<String> acks = new ArrayList<>();
@@ -235,7 +236,9 @@ public void getSubscription(
235236

236237
@Override
237238
public void pull(PullRequest request, StreamObserver<PullResponse> responseObserver) {
238-
receivedPullRequest.add(request);
239+
synchronized (receivedPullRequest) {
240+
receivedPullRequest.add(request);
241+
}
239242
try {
240243
responseObserver.onNext(pullResponses.take());
241244
responseObserver.onCompleted();

trunk/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,14 @@ static class TestReceiver implements MessageReceiver {
8484
new LinkedBlockingQueue<>();
8585
private AckReply ackReply = AckReply.ACK;
8686
private Optional<CountDownLatch> messageCountLatch = Optional.absent();
87-
private Optional<Throwable> error = Optional.absent();
87+
private Optional<RuntimeException> error = Optional.absent();
8888
private boolean explicitAckReplies;
8989

9090
void setReply(AckReply ackReply) {
9191
this.ackReply = ackReply;
9292
}
9393

94-
void setErrorReply(Throwable error) {
94+
void setErrorReply(RuntimeException error) {
9595
this.error = Optional.of(error);
9696
}
9797

@@ -111,18 +111,20 @@ void waitForExpectedMessages() throws InterruptedException {
111111

112112
@Override
113113
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
114-
if (explicitAckReplies) {
115-
try {
116-
outstandingMessageReplies.put(consumer);
117-
} catch (InterruptedException e) {
118-
throw new IllegalStateException(e);
114+
try {
115+
if (explicitAckReplies) {
116+
try {
117+
outstandingMessageReplies.put(consumer);
118+
} catch (InterruptedException e) {
119+
throw new IllegalStateException(e);
120+
}
121+
} else {
122+
replyTo(consumer);
123+
}
124+
} finally {
125+
if (messageCountLatch.isPresent()) {
126+
messageCountLatch.get().countDown();
119127
}
120-
} else {
121-
replyTo(consumer);
122-
}
123-
124-
if (messageCountLatch.isPresent()) {
125-
messageCountLatch.get().countDown();
126128
}
127129
}
128130

@@ -145,9 +147,9 @@ public void replyAllOutstandingMessage() {
145147

146148
private void replyTo(AckReplyConsumer reply) {
147149
if (error.isPresent()) {
148-
reply.accept(null, error.get());
150+
throw error.get();
149151
} else {
150-
reply.accept(ackReply, null);
152+
reply.accept(ackReply);
151153
}
152154
}
153155
}
@@ -207,7 +209,7 @@ public void testNackSingleMessage() throws Exception {
207209

208210
@Test
209211
public void testReceiverError_NacksMessage() throws Exception {
210-
testReceiver.setErrorReply(new Exception("Can't process message"));
212+
testReceiver.setErrorReply(new RuntimeException("Can't process message"));
211213

212214
Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver));
213215

0 commit comments

Comments
 (0)