Skip to content

Commit dadb6ff

Browse files
authored
pubsub: send message receipts (#2580)
This commit also simplifies acks/nacks logic. * synchronize TestReceiver methods It is accessed from executor threads and must to synced to guarantee property updates are observed. * deflake test If the user code throws an exception, we catch this exception and nack the message. This logic cannot be tested correctly. The test uses a MessageReceiver that can notify us that messages have been processed, so that we can "advanceTime". The receiver can only notify us that it's throwing an exception BEFORE the exception is actually thrown out of the method. (Even with finally-clause, the statements in the clause runs before the giving control back to the caller.) Consequently, we're notifying too soon: the message has not been processed yet as the exception might not have been caught.
1 parent a53f7a8 commit dadb6ff

5 files changed

Lines changed: 234 additions & 241 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 61 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,10 @@
3232
import java.util.ArrayList;
3333
import java.util.Collections;
3434
import java.util.Deque;
35-
import java.util.HashSet;
3635
import java.util.LinkedList;
3736
import java.util.List;
3837
import java.util.PriorityQueue;
39-
import java.util.Set;
38+
import java.util.concurrent.LinkedBlockingQueue;
4039
import java.util.concurrent.ScheduledExecutorService;
4140
import java.util.concurrent.ScheduledFuture;
4241
import java.util.concurrent.TimeUnit;
@@ -73,8 +72,9 @@ class MessageDispatcher {
7372
private final MessageWaiter messagesWaiter;
7473

7574
private final PriorityQueue<ExtensionJob> outstandingAckHandlers;
76-
private final Set<String> pendingAcks;
77-
private final Set<String> pendingNacks;
75+
private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
76+
private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
77+
private final LinkedBlockingQueue<String> pendingReceipts = new LinkedBlockingQueue<>();
7878

7979
private final Lock alarmsLock;
8080
// The deadline should be set by the subscriber connection before use,
@@ -185,47 +185,41 @@ private class AckHandler implements FutureCallback<AckReply> {
185185
receivedTimeMillis = clock.millisTime();
186186
}
187187

188+
private void onBoth(LinkedBlockingQueue<String> destination) {
189+
acked.getAndSet(true);
190+
destination.add(ackId);
191+
flowController.release(1, outstandingBytes);
192+
messagesWaiter.incrementPendingMessages(-1);
193+
processOutstandingBatches();
194+
}
195+
188196
@Override
189197
public void onFailure(Throwable t) {
190198
logger.log(
191199
Level.WARNING,
192200
"MessageReceiver failed to processes ack ID: " + ackId + ", the message will be nacked.",
193201
t);
194-
acked.getAndSet(true);
195-
synchronized (pendingNacks) {
196-
pendingNacks.add(ackId);
197-
}
198-
setupPendingAcksAlarm();
199-
flowController.release(1, outstandingBytes);
200-
messagesWaiter.incrementPendingMessages(-1);
201-
processOutstandingBatches();
202+
onBoth(pendingNacks);
202203
}
203204

204205
@Override
205206
public void onSuccess(AckReply reply) {
206-
acked.getAndSet(true);
207+
LinkedBlockingQueue<String> destination;
207208
switch (reply) {
208209
case ACK:
209-
synchronized (pendingAcks) {
210-
pendingAcks.add(ackId);
211-
}
210+
destination = pendingAcks;
212211
// Record the latency rounded to the next closest integer.
213212
ackLatencyDistribution.record(
214213
Ints.saturatedCast(
215214
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
216215
break;
217216
case NACK:
218-
synchronized (pendingNacks) {
219-
pendingNacks.add(ackId);
220-
}
217+
destination = pendingNacks;
221218
break;
222219
default:
223220
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
224221
}
225-
setupPendingAcksAlarm();
226-
flowController.release(1, outstandingBytes);
227-
messagesWaiter.incrementPendingMessages(-1);
228-
processOutstandingBatches();
222+
onBoth(destination);
229223
}
230224
}
231225

@@ -254,8 +248,6 @@ void sendAckOperations(
254248
this.flowController = flowController;
255249
this.outstandingMessageBatches = outstandingMessageBatches;
256250
outstandingAckHandlers = new PriorityQueue<>();
257-
pendingAcks = new HashSet<>();
258-
pendingNacks = new HashSet<>();
259251
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
260252
this.ackLatencyDistribution = ackLatencyDistribution;
261253
alarmsLock = new ReentrantLock();
@@ -264,6 +256,25 @@ void sendAckOperations(
264256
this.clock = clock;
265257
}
266258

259+
public void start() {
260+
pendingAcksAlarm =
261+
systemExecutor.scheduleWithFixedDelay(
262+
new Runnable() {
263+
@Override
264+
public void run() {
265+
try {
266+
processOutstandingAckOperations();
267+
} catch (Throwable t) {
268+
// Catch everything so that one run failing doesn't prevent subsequent runs.
269+
logger.log(Level.WARNING, "failed to send acks/nacks", t);
270+
}
271+
}
272+
},
273+
PENDING_ACKS_SEND_DELAY.toMillis(),
274+
PENDING_ACKS_SEND_DELAY.toMillis(),
275+
TimeUnit.MILLISECONDS);
276+
}
277+
267278
public void stop() {
268279
messagesWaiter.waitNoMessages();
269280
alarmsLock.lock();
@@ -272,6 +283,10 @@ public void stop() {
272283
ackDeadlineExtensionAlarm.cancel(true);
273284
ackDeadlineExtensionAlarm = null;
274285
}
286+
if (pendingAcksAlarm != null) {
287+
pendingAcksAlarm.cancel(false);
288+
pendingAcksAlarm = null;
289+
}
275290
} finally {
276291
alarmsLock.unlock();
277292
}
@@ -328,6 +343,9 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
328343
return;
329344
}
330345
messagesWaiter.incrementPendingMessages(messages.size());
346+
for (ReceivedMessage message : messages) {
347+
pendingReceipts.add(message.getAckId());
348+
}
331349

332350
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
333351
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(messages.size());
@@ -421,32 +439,6 @@ public void run() {
421439
}
422440
}
423441

424-
private void setupPendingAcksAlarm() {
425-
alarmsLock.lock();
426-
try {
427-
if (pendingAcksAlarm == null) {
428-
pendingAcksAlarm =
429-
systemExecutor.schedule(
430-
new Runnable() {
431-
@Override
432-
public void run() {
433-
alarmsLock.lock();
434-
try {
435-
pendingAcksAlarm = null;
436-
} finally {
437-
alarmsLock.unlock();
438-
}
439-
processOutstandingAckOperations();
440-
}
441-
},
442-
PENDING_ACKS_SEND_DELAY.toMillis(),
443-
TimeUnit.MILLISECONDS);
444-
}
445-
} finally {
446-
alarmsLock.unlock();
447-
}
448-
}
449-
450442
private class AckDeadlineAlarm implements Runnable {
451443
@Override
452444
public void run() {
@@ -574,31 +566,26 @@ private void processOutstandingAckOperations(
574566
List<PendingModifyAckDeadline> ackDeadlineExtensions) {
575567
List<PendingModifyAckDeadline> modifyAckDeadlinesToSend =
576568
Lists.newArrayList(ackDeadlineExtensions);
577-
List<String> acksToSend = new ArrayList<>(pendingAcks.size());
578-
synchronized (pendingAcks) {
579-
if (!pendingAcks.isEmpty()) {
580-
try {
581-
acksToSend = new ArrayList<>(pendingAcks);
582-
logger.log(Level.FINER, "Sending {0} acks", acksToSend.size());
583-
} finally {
584-
pendingAcks.clear();
585-
}
586-
}
587-
}
569+
570+
List<String> acksToSend = new ArrayList<>();
571+
pendingAcks.drainTo(acksToSend);
572+
logger.log(Level.FINER, "Sending {0} acks", acksToSend.size());
573+
588574
PendingModifyAckDeadline nacksToSend = new PendingModifyAckDeadline(0);
589-
synchronized (pendingNacks) {
590-
if (!pendingNacks.isEmpty()) {
591-
try {
592-
for (String ackId : pendingNacks) {
593-
nacksToSend.addAckId(ackId);
594-
}
595-
logger.log(Level.FINER, "Sending {0} nacks", pendingNacks.size());
596-
} finally {
597-
pendingNacks.clear();
598-
}
599-
modifyAckDeadlinesToSend.add(nacksToSend);
600-
}
575+
pendingNacks.drainTo(nacksToSend.ackIds);
576+
logger.log(Level.FINER, "Sending {0} nacks", nacksToSend.ackIds.size());
577+
if (!nacksToSend.ackIds.isEmpty()) {
578+
modifyAckDeadlinesToSend.add(nacksToSend);
601579
}
580+
581+
PendingModifyAckDeadline receiptsToSend =
582+
new PendingModifyAckDeadline(getMessageDeadlineSeconds());
583+
pendingReceipts.drainTo(receiptsToSend.ackIds);
584+
logger.log(Level.FINER, "Sending {0} receipts", receiptsToSend.ackIds.size());
585+
if (!receiptsToSend.ackIds.isEmpty()) {
586+
modifyAckDeadlinesToSend.add(receiptsToSend);
587+
}
588+
602589
ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
603590
}
604591
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PollingSubscriberConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ public PollingSubscriberConnection(
100100
@Override
101101
protected void doStart() {
102102
logger.config("Starting subscriber.");
103+
messageDispatcher.start();
103104
pullMessages(INITIAL_BACKOFF);
104105
notifyStarted();
105106
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public StreamingSubscriberConnection(
103103
@Override
104104
protected void doStart() {
105105
logger.config("Starting subscriber.");
106+
messageDispatcher.start();
106107
initialize();
107108
notifyStarted();
108109
}

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -295,46 +295,52 @@ public String waitForRegistedSubscription() throws InterruptedException {
295295

296296
public List<String> waitAndConsumeReceivedAcks(int expectedCount) throws InterruptedException {
297297
synchronized (acks) {
298-
while (acks.size() < expectedCount) {
299-
acks.wait();
300-
}
298+
waitAtLeast(acks, expectedCount);
301299
List<String> receivedAcksCopy = ImmutableList.copyOf(acks.subList(0, expectedCount));
302-
acks.removeAll(receivedAcksCopy);
300+
acks.subList(0, expectedCount).clear();
303301
return receivedAcksCopy;
304302
}
305303
}
306304

307305
public List<ModifyAckDeadline> waitAndConsumeModifyAckDeadlines(int expectedCount)
308306
throws InterruptedException {
309307
synchronized (modAckDeadlines) {
310-
while (modAckDeadlines.size() < expectedCount) {
311-
modAckDeadlines.wait();
312-
}
308+
waitAtLeast(modAckDeadlines, expectedCount);
313309
List<ModifyAckDeadline> modAckDeadlinesCopy =
314310
ImmutableList.copyOf(modAckDeadlines.subList(0, expectedCount));
315-
modAckDeadlines.removeAll(modAckDeadlinesCopy);
311+
modAckDeadlines.subList(0, expectedCount).clear();
316312
return modAckDeadlinesCopy;
317313
}
318314
}
319315

320316
public int waitForClosedStreams(int expectedCount) throws InterruptedException {
321317
synchronized (closedStreams) {
322-
while (closedStreams.size() < expectedCount) {
323-
closedStreams.wait();
324-
}
318+
waitAtLeast(closedStreams, expectedCount);
325319
return closedStreams.size();
326320
}
327321
}
328322

329323
public int waitForOpenedStreams(int expectedCount) throws InterruptedException {
330324
synchronized (openedStreams) {
331-
while (openedStreams.size() < expectedCount) {
332-
openedStreams.wait();
333-
}
325+
waitAtLeast(openedStreams, expectedCount);
334326
return openedStreams.size();
335327
}
336328
}
337329

330+
// wait until the collection has at least target number of elements.
331+
// caller MUST hold the monitor for the collection.
332+
private static void waitAtLeast(Collection<?> collection, int target)
333+
throws InterruptedException {
334+
long untilMillis = System.currentTimeMillis() + 20_000;
335+
while (collection.size() < target) {
336+
long now = System.currentTimeMillis();
337+
if (now >= untilMillis) {
338+
throw new IllegalStateException("timed out, last state: " + collection);
339+
}
340+
collection.wait(untilMillis - now);
341+
}
342+
}
343+
338344
public void waitForStreamAckDeadline(int expectedValue) throws InterruptedException {
339345
synchronized (messageAckDeadline) {
340346
while (messageAckDeadline.get() != expectedValue) {

0 commit comments

Comments
 (0)