Skip to content

Commit 3e4051d

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 19503 b: refs/heads/autosynth-dialogflow c: 5e0acc0 h: refs/heads/master i: 19501: 21c18cc 19499: caec36e 19495: da43354 19487: 947f9ed
1 parent 60c91c0 commit 3e4051d

2 files changed

Lines changed: 25 additions & 35 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ refs/heads/autosynth-bigtable: 45891a7178142c84a16d6a0b792f8ecb950a3159
129129
refs/heads/autosynth-bigtable-admin: 6379a2bc712f2736c83de0e009b4d26da4fa82ca
130130
refs/heads/autosynth-containeranalysis: 0f377286f60df2b6fd37c183bf170c68dc014cfb
131131
refs/heads/autosynth-datastore: 9acd400b484d6691a080c9152a331d88d24fefc1
132-
refs/heads/autosynth-dialogflow: 4517817a110e3d5230f63ccff810c0644cfab78a
132+
refs/heads/autosynth-dialogflow: 5e0acc0913b8608dd896a9e8da1241b8bc251a1d
133133
refs/heads/autosynth-errorreporting: 9891e73a56af7c097829ca7a521b0e862ba6af30
134134
refs/heads/autosynth-firestore: ddb29f8ee445a938fc40be8227dea87b3f508ab3
135135
refs/heads/autosynth-iot: f16fc4ba85bbb37eacecb64a2a7336f685813f6c

branches/autosynth-dialogflow/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -371,41 +371,31 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
371371
processOutstandingBatches();
372372
}
373373

374-
public void processOutstandingBatches() {
375-
while (true) {
376-
boolean batchDone = false;
377-
Runnable batchCallback = null;
378-
OutstandingMessage outstandingMessage;
379-
synchronized (outstandingMessageBatches) {
380-
OutstandingMessageBatch nextBatch = outstandingMessageBatches.peek();
381-
if (nextBatch == null) {
382-
return;
383-
}
384-
outstandingMessage = nextBatch.messages.peek();
385-
if (outstandingMessage == null) {
386-
return;
387-
}
388-
try {
389-
// This is a non-blocking flow controller.
390-
flowController.reserve(
391-
1, outstandingMessage.receivedMessage().getMessage().getSerializedSize());
392-
} catch (FlowController.MaxOutstandingElementCountReachedException
393-
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
394-
return;
395-
} catch (FlowControlException unexpectedException) {
396-
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
397-
}
398-
nextBatch.messages.poll(); // We got a hold to the message already.
399-
batchDone = nextBatch.messages.isEmpty();
400-
if (batchDone) {
401-
outstandingMessageBatches.poll();
402-
batchCallback = nextBatch.doneCallback;
374+
private void processOutstandingBatches() {
375+
synchronized (outstandingMessageBatches) {
376+
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
377+
nextBatch != null;
378+
nextBatch = outstandingMessageBatches.poll()) {
379+
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
380+
nextMessage != null;
381+
nextMessage = nextBatch.messages.poll()) {
382+
try {
383+
// This is a non-blocking flow controller.
384+
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
385+
} catch (FlowController.MaxOutstandingElementCountReachedException
386+
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
387+
// Unwind previous changes in the batches outstanding.
388+
nextBatch.messages.addFirst(nextMessage);
389+
outstandingMessageBatches.addFirst(nextBatch);
390+
return;
391+
} catch (FlowControlException unexpectedException) {
392+
throw new IllegalStateException(
393+
"Flow control unexpected exception", unexpectedException);
394+
}
395+
processOutstandingMessage(
396+
nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
403397
}
404-
}
405-
processOutstandingMessage(
406-
outstandingMessage.receivedMessage.getMessage(), outstandingMessage.ackHandler);
407-
if (batchDone) {
408-
batchCallback.run();
398+
nextBatch.doneCallback.run();
409399
}
410400
}
411401
}

0 commit comments

Comments
 (0)