Skip to content

Commit 7c04b6e

Browse files
authored
Merge pull request #1 from davidtorres/deadlock-issues
Deadlock issues
2 parents 32ee28f + 7d93670 commit 7c04b6e

4 files changed

Lines changed: 167 additions & 59 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 129 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616

1717
package com.google.cloud.pubsub.spi.v1;
1818

19-
import com.google.api.gax.core.FlowController;
2019
import com.google.api.gax.core.ApiClock;
20+
import com.google.api.gax.core.FlowController;
21+
import com.google.api.gax.core.FlowController.FlowControlException;
2122
import com.google.api.stats.Distribution;
23+
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage;
2224
import com.google.common.annotations.VisibleForTesting;
2325
import com.google.common.collect.Lists;
2426
import com.google.common.primitives.Ints;
@@ -29,11 +31,13 @@
2931
import com.google.pubsub.v1.ReceivedMessage;
3032
import java.util.ArrayList;
3133
import java.util.Collections;
34+
import java.util.Deque;
3235
import java.util.HashSet;
33-
import java.util.Iterator;
36+
import java.util.LinkedList;
3437
import java.util.List;
3538
import java.util.PriorityQueue;
3639
import java.util.Set;
40+
import java.util.concurrent.Executors;
3741
import java.util.concurrent.ScheduledExecutorService;
3842
import java.util.concurrent.ScheduledFuture;
3943
import java.util.concurrent.TimeUnit;
@@ -57,6 +61,9 @@ class MessageDispatcher {
5761
@VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100);
5862
private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m
5963

64+
private static final ScheduledExecutorService alarmsExecutor =
65+
Executors.newScheduledThreadPool(2);
66+
6067
private final ScheduledExecutorService executor;
6168
private final ApiClock clock;
6269

@@ -78,6 +85,8 @@ class MessageDispatcher {
7885
private Instant nextAckDeadlineExtensionAlarmTime;
7986
private ScheduledFuture<?> pendingAcksAlarm;
8087

88+
private Deque<OutstandingMessagesBatch> outstandingMessageBatches;
89+
8190
// To keep track of number of seconds the receiver takes to process messages.
8291
private final Distribution ackLatencyDistribution;
8392

@@ -184,6 +193,7 @@ public void onFailure(Throwable t) {
184193
setupPendingAcksAlarm();
185194
flowController.release(1, outstandingBytes);
186195
messagesWaiter.incrementPendingMessages(-1);
196+
processOutstandingBatches();
187197
}
188198

189199
@Override
@@ -194,25 +204,23 @@ public void onSuccess(AckReply reply) {
194204
synchronized (pendingAcks) {
195205
pendingAcks.add(ackId);
196206
}
197-
setupPendingAcksAlarm();
198-
flowController.release(1, outstandingBytes);
199207
// Record the latency rounded to the next closest integer.
200208
ackLatencyDistribution.record(
201209
Ints.saturatedCast(
202210
(long) Math.ceil((clock.millisTime() - receivedTime.getMillis()) / 1000D)));
203-
messagesWaiter.incrementPendingMessages(-1);
204-
return;
211+
break;
205212
case NACK:
206213
synchronized (pendingNacks) {
207214
pendingNacks.add(ackId);
208215
}
209-
setupPendingAcksAlarm();
210-
flowController.release(1, outstandingBytes);
211-
messagesWaiter.incrementPendingMessages(-1);
212-
return;
216+
break;
213217
default:
214218
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
215219
}
220+
setupPendingAcksAlarm();
221+
flowController.release(1, outstandingBytes);
222+
messagesWaiter.incrementPendingMessages(-1);
223+
processOutstandingBatches();
216224
}
217225
}
218226

@@ -236,6 +244,7 @@ void sendAckOperations(
236244
this.receiver = receiver;
237245
this.ackProcessor = ackProcessor;
238246
this.flowController = flowController;
247+
outstandingMessageBatches = new LinkedList<>();
239248
outstandingAckHandlers = new PriorityQueue<>();
240249
pendingAcks = new HashSet<>();
241250
pendingNacks = new HashSet<>();
@@ -269,28 +278,113 @@ public int getMessageDeadlineSeconds() {
269278
return messageDeadlineSeconds;
270279
}
271280

272-
public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> responseMessages) {
273-
int receivedMessagesCount = responseMessages.size();
274-
if (receivedMessagesCount == 0) {
281+
static class OutstandingMessagesBatch {
282+
private final Deque<OutstandingMessage> messages;
283+
private final Runnable doneCallback;
284+
285+
static class OutstandingMessage {
286+
private final ReceivedMessage receivedMessage;
287+
private final AckHandler ackHandler;
288+
289+
public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
290+
this.receivedMessage = receivedMessage;
291+
this.ackHandler = ackHandler;
292+
}
293+
294+
public ReceivedMessage receivedMessage() {
295+
return receivedMessage;
296+
}
297+
298+
public AckHandler ackHandler() {
299+
return ackHandler;
300+
}
301+
}
302+
303+
public OutstandingMessagesBatch(Runnable doneCallback) {
304+
this.messages = new LinkedList<>();
305+
this.doneCallback = doneCallback;
306+
}
307+
308+
public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
309+
this.messages.add(new OutstandingMessage(receivedMessage, ackHandler));
310+
}
311+
312+
public Deque<OutstandingMessage> messages() {
313+
return messages;
314+
}
315+
316+
public void done() {
317+
doneCallback.run();
318+
}
319+
}
320+
321+
public void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) {
322+
if (messages.isEmpty()) {
323+
doneCallback.run();
275324
return;
276325
}
277-
Instant now = new Instant(clock.millisTime());
278-
int totalByteCount = 0;
279-
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
280-
for (ReceivedMessage pubsubMessage : responseMessages) {
281-
int messageSize = pubsubMessage.getMessage().getSerializedSize();
282-
totalByteCount += messageSize;
283-
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize));
326+
messagesWaiter.incrementPendingMessages(messages.size());
327+
328+
OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch(doneCallback);
329+
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(messages.size());
330+
for (ReceivedMessage message : messages) {
331+
AckHandler ackHandler =
332+
new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
333+
ackHandlers.add(ackHandler);
334+
outstandingBatch.addMessage(message, ackHandler);
335+
}
336+
337+
Instant expiration = new Instant(clock.millisTime()).plus(messageDeadlineSeconds * 1000);
338+
synchronized (outstandingAckHandlers) {
339+
outstandingAckHandlers.add(
340+
new ExtensionJob(
341+
new Instant(clock.millisTime()),
342+
expiration,
343+
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS,
344+
ackHandlers));
345+
}
346+
setupNextAckDeadlineExtensionAlarm(expiration);
347+
348+
synchronized (outstandingMessageBatches) {
349+
outstandingMessageBatches.add(outstandingBatch);
284350
}
285-
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
286-
logger.log(
287-
Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now});
288-
289-
messagesWaiter.incrementPendingMessages(responseMessages.size());
290-
Iterator<AckHandler> acksIterator = ackHandlers.iterator();
291-
for (ReceivedMessage userMessage : responseMessages) {
292-
final PubsubMessage message = userMessage.getMessage();
293-
final AckHandler ackHandler = acksIterator.next();
351+
processOutstandingBatches();
352+
}
353+
354+
public void processOutstandingBatches() {
355+
while (true) {
356+
boolean batchDone = false;
357+
Runnable batchCallback = null;
358+
OutstandingMessage outstandingMessage;
359+
synchronized (outstandingMessageBatches) {
360+
OutstandingMessagesBatch nextBatch = outstandingMessageBatches.peek();
361+
if (nextBatch == null) {
362+
return;
363+
}
364+
outstandingMessage = nextBatch.messages.peek();
365+
if (outstandingMessage == null) {
366+
return;
367+
}
368+
try {
369+
// This is a non-blocking flow controller.
370+
flowController.reserve(
371+
1, outstandingMessage.receivedMessage().getMessage().getSerializedSize());
372+
} catch (FlowController.MaxOutstandingElementCountReachedException
373+
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
374+
return;
375+
} catch (FlowControlException unexpectedException) {
376+
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
377+
}
378+
nextBatch.messages.poll(); // We got a hold to the message already.
379+
batchDone = nextBatch.messages.isEmpty();
380+
if (batchDone) {
381+
outstandingMessageBatches.poll();
382+
batchCallback = nextBatch.doneCallback;
383+
}
384+
}
385+
386+
final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
387+
final AckHandler ackHandler = outstandingMessage.ackHandler();
294388
final SettableFuture<AckReply> response = SettableFuture.create();
295389
final AckReplyConsumer consumer =
296390
new AckReplyConsumer() {
@@ -311,22 +405,9 @@ public void run() {
311405
}
312406
}
313407
});
314-
}
315-
316-
synchronized (outstandingAckHandlers) {
317-
outstandingAckHandlers.add(
318-
new ExtensionJob(
319-
new Instant(clock.millisTime()),
320-
expiration,
321-
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS,
322-
ackHandlers));
323-
}
324-
setupNextAckDeadlineExtensionAlarm(expiration);
325-
326-
try {
327-
flowController.reserve(receivedMessagesCount, totalByteCount);
328-
} catch (FlowController.FlowControlException unexpectedException) {
329-
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
408+
if (batchDone) {
409+
batchCallback.run();
410+
}
330411
}
331412
}
332413

@@ -335,7 +416,7 @@ private void setupPendingAcksAlarm() {
335416
try {
336417
if (pendingAcksAlarm == null) {
337418
pendingAcksAlarm =
338-
executor.schedule(
419+
alarmsExecutor.schedule(
339420
new Runnable() {
340421
@Override
341422
public void run() {
@@ -400,7 +481,7 @@ public void run() {
400481
// drop it.
401482
continue;
402483
}
403-
484+
404485
// If a message has already been acked, remove it, nothing to do.
405486
for (int i = 0; i < job.ackHandlers.size(); ) {
406487
if (job.ackHandlers.get(i).acked.get()) {
@@ -464,7 +545,7 @@ private void setupNextAckDeadlineExtensionAlarm(Instant expiration) {
464545
nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime;
465546

466547
ackDeadlineExtensionAlarm =
467-
executor.schedule(
548+
alarmsExecutor.schedule(
468549
new AckDeadlineAlarm(),
469550
nextAckDeadlineExtensionAlarmTime.getMillis() - clock.millisTime(),
470551
TimeUnit.MILLISECONDS);

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.concurrent.TimeUnit;
4343
import java.util.logging.Level;
4444
import java.util.logging.Logger;
45+
import javax.annotation.Nullable;
4546
import org.joda.time.Duration;
4647

4748
/**
@@ -61,6 +62,7 @@ final class PollingSubscriberConnection extends AbstractApiService implements Ac
6162
private final ScheduledExecutorService executor;
6263
private final SubscriberFutureStub stub;
6364
private final MessageDispatcher messageDispatcher;
65+
private final int maxDesiredPulledMessages;
6466

6567
public PollingSubscriberConnection(
6668
String subscription,
@@ -71,6 +73,7 @@ public PollingSubscriberConnection(
7173
Distribution ackLatencyDistribution,
7274
Channel channel,
7375
FlowController flowController,
76+
@Nullable Integer maxDesiredPulledMessages,
7477
ScheduledExecutorService executor,
7578
ApiClock clock) {
7679
this.subscription = subscription;
@@ -89,6 +92,8 @@ public PollingSubscriberConnection(
8992
executor,
9093
clock);
9194
messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS);
95+
this.maxDesiredPulledMessages =
96+
maxDesiredPulledMessages != null ? maxDesiredPulledMessages : DEFAULT_MAX_MESSAGES;
9297
}
9398

9499
@Override
@@ -117,7 +122,8 @@ public void onSuccess(Subscription result) {
117122
public void onFailure(Throwable cause) {
118123
notifyFailed(cause);
119124
}
120-
});
125+
},
126+
executor);
121127
}
122128

123129
@Override
@@ -132,7 +138,7 @@ private void pullMessages(final Duration backoff) {
132138
.pull(
133139
PullRequest.newBuilder()
134140
.setSubscription(subscription)
135-
.setMaxMessages(DEFAULT_MAX_MESSAGES)
141+
.setMaxMessages(maxDesiredPulledMessages)
136142
.setReturnImmediately(true)
137143
.build());
138144

@@ -141,7 +147,6 @@ private void pullMessages(final Duration backoff) {
141147
new FutureCallback<PullResponse>() {
142148
@Override
143149
public void onSuccess(PullResponse pullResponse) {
144-
messageDispatcher.processReceivedMessages(pullResponse.getReceivedMessagesList());
145150
if (pullResponse.getReceivedMessagesCount() == 0) {
146151
// No messages in response, possibly caught up in backlog, we backoff to avoid
147152
// slamming the server.
@@ -160,7 +165,14 @@ public void run() {
160165
TimeUnit.MILLISECONDS);
161166
return;
162167
}
163-
pullMessages(INITIAL_BACKOFF);
168+
messageDispatcher.processReceivedMessages(
169+
pullResponse.getReceivedMessagesList(),
170+
new Runnable() {
171+
@Override
172+
public void run() {
173+
pullMessages(INITIAL_BACKOFF);
174+
}
175+
});
164176
}
165177

166178
@Override
@@ -190,7 +202,8 @@ public void run() {
190202
notifyFailed(cause);
191203
}
192204
}
193-
});
205+
},
206+
executor);
194207
}
195208

196209
private boolean isAlive() {

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,17 @@ public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> requestOb
125125

126126
@Override
127127
public void onNext(StreamingPullResponse response) {
128-
messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());
129-
// Only if not shutdown we will request one more batches of messages to be delivered.
130-
if (isAlive()) {
131-
requestObserver.request(1);
132-
}
128+
messageDispatcher.processReceivedMessages(
129+
response.getReceivedMessagesList(),
130+
new Runnable() {
131+
@Override
132+
public void run() {
133+
// Only if not shutdown we will request one more batches of messages to be delivered.
134+
if (isAlive()) {
135+
requestObserver.request(1);
136+
}
137+
}
138+
});
133139
}
134140

135141
@Override

0 commit comments

Comments
 (0)