Skip to content

Commit b555c97

Browse files
committed
Decouple the processing of newly received messages from the dispatching to
user code - Separated the handling of batched received messages from the per-message dispatching of messages to the user code. To accomplish this, the batch is kept in memory, and the processing of each message draws from the in-memory batch until it is completely depleted. - Flow controller permits are now drawn on a per-message basis (previously the code tried to draw permits for the whole batch, potentially deadlocking the whole subscriber); this addresses the deadlock condition raised in #1868. - No longer using a blocking flow controller; instead, pulls/streamed messages are paused and resumed based on the flow controller feedback and when new permits become available. - A separate executor for alarms (2 threads have been shown to scale pretty well with many subscribers, given that our ack operations are pretty lightweight). - Setting the maximum number of messages to pull per request based on the number requested by the user in the flow controller (if any); this addresses #1868 on a best-effort basis. Fixes #1868, fixes #1865 and fixes #1855
1 parent 3d3a166 commit b555c97

4 files changed

Lines changed: 168 additions & 59 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 130 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616

1717
package com.google.cloud.pubsub.spi.v1;
1818

19-
import com.google.api.gax.core.FlowController;
2019
import com.google.api.gax.core.ApiClock;
20+
import com.google.api.gax.core.FlowController;
21+
import com.google.api.gax.core.FlowController.FlowControlException;
2122
import com.google.api.stats.Distribution;
23+
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage;
2224
import com.google.common.annotations.VisibleForTesting;
2325
import com.google.common.collect.Lists;
2426
import com.google.common.primitives.Ints;
@@ -29,11 +31,13 @@
2931
import com.google.pubsub.v1.ReceivedMessage;
3032
import java.util.ArrayList;
3133
import java.util.Collections;
34+
import java.util.Deque;
3235
import java.util.HashSet;
33-
import java.util.Iterator;
36+
import java.util.LinkedList;
3437
import java.util.List;
3538
import java.util.PriorityQueue;
3639
import java.util.Set;
40+
import java.util.concurrent.Executors;
3741
import java.util.concurrent.ScheduledExecutorService;
3842
import java.util.concurrent.ScheduledFuture;
3943
import java.util.concurrent.TimeUnit;
@@ -56,6 +60,9 @@ class MessageDispatcher {
5660
private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECONDS = 2;
5761
@VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100);
5862
private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m
63+
64+
private static final ScheduledExecutorService alarmsExecutor =
65+
Executors.newScheduledThreadPool(2);
5966

6067
private final ScheduledExecutorService executor;
6168
private final ApiClock clock;
@@ -184,6 +191,7 @@ public void onFailure(Throwable t) {
184191
setupPendingAcksAlarm();
185192
flowController.release(1, outstandingBytes);
186193
messagesWaiter.incrementPendingMessages(-1);
194+
processOutstandingBatches();
187195
}
188196

189197
@Override
@@ -194,25 +202,23 @@ public void onSuccess(AckReply reply) {
194202
synchronized (pendingAcks) {
195203
pendingAcks.add(ackId);
196204
}
197-
setupPendingAcksAlarm();
198-
flowController.release(1, outstandingBytes);
199205
// Record the latency rounded to the next closest integer.
200206
ackLatencyDistribution.record(
201207
Ints.saturatedCast(
202208
(long) Math.ceil((clock.millisTime() - receivedTime.getMillis()) / 1000D)));
203-
messagesWaiter.incrementPendingMessages(-1);
204-
return;
209+
break;
205210
case NACK:
206211
synchronized (pendingNacks) {
207212
pendingNacks.add(ackId);
208213
}
209-
setupPendingAcksAlarm();
210-
flowController.release(1, outstandingBytes);
211-
messagesWaiter.incrementPendingMessages(-1);
212-
return;
214+
break;
213215
default:
214216
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
215217
}
218+
setupPendingAcksAlarm();
219+
flowController.release(1, outstandingBytes);
220+
messagesWaiter.incrementPendingMessages(-1);
221+
processOutstandingBatches();
216222
}
217223
}
218224

@@ -269,28 +275,117 @@ public int getMessageDeadlineSeconds() {
269275
return messageDeadlineSeconds;
270276
}
271277

272-
public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> responseMessages) {
273-
int receivedMessagesCount = responseMessages.size();
274-
if (receivedMessagesCount == 0) {
278+
static class OutstandingMessagesBatch {
279+
static class OutstandingMessage {
280+
private final com.google.pubsub.v1.ReceivedMessage receivedMessage;
281+
private final AckHandler ackHandler;
282+
283+
public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
284+
this.receivedMessage = receivedMessage;
285+
this.ackHandler = ackHandler;
286+
}
287+
288+
public com.google.pubsub.v1.ReceivedMessage receivedMessage() {
289+
return receivedMessage;
290+
}
291+
292+
public AckHandler ackHandler() {
293+
return ackHandler;
294+
}
295+
}
296+
297+
private final Deque<OutstandingMessage> messages;
298+
private final Runnable doneCallback;
299+
300+
public OutstandingMessagesBatch(Runnable doneCallback) {
301+
this.messages = new LinkedList<>();
302+
this.doneCallback = doneCallback;
303+
}
304+
305+
public void addMessage(
306+
com.google.pubsub.v1.ReceivedMessage receivedMessage, AckHandler ackHandler) {
307+
this.messages.add(new OutstandingMessage(receivedMessage, ackHandler));
308+
}
309+
310+
public Deque<OutstandingMessage> messages() {
311+
return messages;
312+
}
313+
314+
public void done() {
315+
doneCallback.run();
316+
}
317+
}
318+
319+
Deque<OutstandingMessagesBatch> outstandingMessageBatches = new LinkedList<>();
320+
321+
public void processReceivedMessages(
322+
List<com.google.pubsub.v1.ReceivedMessage> messages, Runnable doneCallback) {
323+
if (messages.size() == 0) {
324+
doneCallback.run();
275325
return;
276326
}
277-
Instant now = new Instant(clock.millisTime());
278-
int totalByteCount = 0;
279-
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
280-
for (ReceivedMessage pubsubMessage : responseMessages) {
281-
int messageSize = pubsubMessage.getMessage().getSerializedSize();
282-
totalByteCount += messageSize;
283-
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize));
327+
messagesWaiter.incrementPendingMessages(messages.size());
328+
329+
OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch(doneCallback);
330+
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(messages.size());
331+
for (ReceivedMessage message : messages) {
332+
AckHandler ackHandler =
333+
new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
334+
ackHandlers.add(ackHandler);
335+
outstandingBatch.addMessage(message, ackHandler);
284336
}
285-
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
286-
logger.log(
287-
Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now});
288-
289-
messagesWaiter.incrementPendingMessages(responseMessages.size());
290-
Iterator<AckHandler> acksIterator = ackHandlers.iterator();
291-
for (ReceivedMessage userMessage : responseMessages) {
292-
final PubsubMessage message = userMessage.getMessage();
293-
final AckHandler ackHandler = acksIterator.next();
337+
338+
Instant expiration = new Instant(clock.millisTime()).plus(messageDeadlineSeconds * 1000);
339+
synchronized (outstandingAckHandlers) {
340+
outstandingAckHandlers.add(
341+
new ExtensionJob(
342+
new Instant(clock.millisTime()),
343+
expiration,
344+
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS,
345+
ackHandlers));
346+
}
347+
setupNextAckDeadlineExtensionAlarm(expiration);
348+
349+
synchronized (outstandingMessageBatches) {
350+
outstandingMessageBatches.add(outstandingBatch);
351+
}
352+
processOutstandingBatches();
353+
}
354+
355+
public void processOutstandingBatches() {
356+
while (true) {
357+
boolean batchDone = false;
358+
Runnable batchCallback = null;
359+
OutstandingMessage outstandingMessage;
360+
synchronized (outstandingMessageBatches) {
361+
OutstandingMessagesBatch nextBatch = outstandingMessageBatches.peek();
362+
if (nextBatch == null) {
363+
return;
364+
}
365+
outstandingMessage = nextBatch.messages.peek();
366+
if (outstandingMessage == null) {
367+
return;
368+
}
369+
try {
370+
// This is a non-blocking flow controller.
371+
flowController.reserve(
372+
1, outstandingMessage.receivedMessage().getMessage().getSerializedSize());
373+
} catch (FlowController.MaxOutstandingElementCountReachedException
374+
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
375+
return;
376+
} catch (FlowControlException unexpectedException) {
377+
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
378+
}
379+
nextBatch.messages.poll(); // We got a hold to the message already.
380+
batchDone = nextBatch.messages.isEmpty();
381+
if (batchDone) {
382+
outstandingMessageBatches.poll();
383+
batchCallback = nextBatch.doneCallback;
384+
}
385+
}
386+
387+
final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
388+
final AckHandler ackHandler = outstandingMessage.ackHandler();
294389
final SettableFuture<AckReply> response = SettableFuture.create();
295390
final AckReplyConsumer consumer =
296391
new AckReplyConsumer() {
@@ -311,22 +406,9 @@ public void run() {
311406
}
312407
}
313408
});
314-
}
315-
316-
synchronized (outstandingAckHandlers) {
317-
outstandingAckHandlers.add(
318-
new ExtensionJob(
319-
new Instant(clock.millisTime()),
320-
expiration,
321-
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS,
322-
ackHandlers));
323-
}
324-
setupNextAckDeadlineExtensionAlarm(expiration);
325-
326-
try {
327-
flowController.reserve(receivedMessagesCount, totalByteCount);
328-
} catch (FlowController.FlowControlException unexpectedException) {
329-
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
409+
if (batchDone) {
410+
batchCallback.run();
411+
}
330412
}
331413
}
332414

@@ -335,7 +417,7 @@ private void setupPendingAcksAlarm() {
335417
try {
336418
if (pendingAcksAlarm == null) {
337419
pendingAcksAlarm =
338-
executor.schedule(
420+
alarmsExecutor.schedule(
339421
new Runnable() {
340422
@Override
341423
public void run() {
@@ -400,7 +482,7 @@ public void run() {
400482
// drop it.
401483
continue;
402484
}
403-
485+
404486
// If a message has already been acked, remove it, nothing to do.
405487
for (int i = 0; i < job.ackHandlers.size(); ) {
406488
if (job.ackHandlers.get(i).acked.get()) {
@@ -464,7 +546,7 @@ private void setupNextAckDeadlineExtensionAlarm(Instant expiration) {
464546
nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime;
465547

466548
ackDeadlineExtensionAlarm =
467-
executor.schedule(
549+
alarmsExecutor.schedule(
468550
new AckDeadlineAlarm(),
469551
nextAckDeadlineExtensionAlarmTime.getMillis() - clock.millisTime(),
470552
TimeUnit.MILLISECONDS);

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.concurrent.TimeUnit;
4343
import java.util.logging.Level;
4444
import java.util.logging.Logger;
45+
import javax.annotation.Nullable;
4546
import org.joda.time.Duration;
4647

4748
/**
@@ -61,6 +62,7 @@ final class PollingSubscriberConnection extends AbstractApiService implements Ac
6162
private final ScheduledExecutorService executor;
6263
private final SubscriberFutureStub stub;
6364
private final MessageDispatcher messageDispatcher;
65+
private final int maxDesiredPulledMessages;
6466

6567
public PollingSubscriberConnection(
6668
String subscription,
@@ -71,6 +73,7 @@ public PollingSubscriberConnection(
7173
Distribution ackLatencyDistribution,
7274
Channel channel,
7375
FlowController flowController,
76+
@Nullable Integer maxDesiredPulledMessages,
7477
ScheduledExecutorService executor,
7578
ApiClock clock) {
7679
this.subscription = subscription;
@@ -89,6 +92,8 @@ public PollingSubscriberConnection(
8992
executor,
9093
clock);
9194
messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS);
95+
this.maxDesiredPulledMessages =
96+
maxDesiredPulledMessages != null ? maxDesiredPulledMessages : DEFAULT_MAX_MESSAGES;
9297
}
9398

9499
@Override
@@ -117,7 +122,8 @@ public void onSuccess(Subscription result) {
117122
public void onFailure(Throwable cause) {
118123
notifyFailed(cause);
119124
}
120-
});
125+
},
126+
executor);
121127
}
122128

123129
@Override
@@ -132,7 +138,7 @@ private void pullMessages(final Duration backoff) {
132138
.pull(
133139
PullRequest.newBuilder()
134140
.setSubscription(subscription)
135-
.setMaxMessages(DEFAULT_MAX_MESSAGES)
141+
.setMaxMessages(maxDesiredPulledMessages)
136142
.setReturnImmediately(true)
137143
.build());
138144

@@ -141,7 +147,6 @@ private void pullMessages(final Duration backoff) {
141147
new FutureCallback<PullResponse>() {
142148
@Override
143149
public void onSuccess(PullResponse pullResponse) {
144-
messageDispatcher.processReceivedMessages(pullResponse.getReceivedMessagesList());
145150
if (pullResponse.getReceivedMessagesCount() == 0) {
146151
// No messages in response, possibly caught up in backlog, we backoff to avoid
147152
// slamming the server.
@@ -160,7 +165,14 @@ public void run() {
160165
TimeUnit.MILLISECONDS);
161166
return;
162167
}
163-
pullMessages(INITIAL_BACKOFF);
168+
messageDispatcher.processReceivedMessages(
169+
pullResponse.getReceivedMessagesList(),
170+
new Runnable() {
171+
@Override
172+
public void run() {
173+
pullMessages(INITIAL_BACKOFF);
174+
}
175+
});
164176
}
165177

166178
@Override
@@ -190,7 +202,8 @@ public void run() {
190202
notifyFailed(cause);
191203
}
192204
}
193-
});
205+
},
206+
executor);
194207
}
195208

196209
private boolean isAlive() {

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,17 @@ public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> requestOb
125125

126126
@Override
127127
public void onNext(StreamingPullResponse response) {
128-
messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());
129-
// Only if not shutdown we will request one more batches of messages to be delivered.
130-
if (isAlive()) {
131-
requestObserver.request(1);
132-
}
128+
messageDispatcher.processReceivedMessages(
129+
response.getReceivedMessagesList(),
130+
new Runnable() {
131+
@Override
132+
public void run() {
133+
// Only if not shutdown we will request one more batches of messages to be delivered.
134+
if (isAlive()) {
135+
requestObserver.request(1);
136+
}
137+
}
138+
});
133139
}
134140

135141
@Override

0 commit comments

Comments
 (0)