Skip to content

Commit fd594d0

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 23243 b: refs/heads/autosynth-datastore c: 5e0acc0 h: refs/heads/master i: 23241: 2e6ccfc 23239: a69aacb
1 parent 6a3cd36 commit fd594d0

2 files changed

Lines changed: 25 additions & 35 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ refs/heads/autosynth-bigquerystorage: d2c53da3b012e38c662e4df0738042435f19365f
127127
refs/heads/autosynth-bigtable: 9e5429f45cf9face9fed585d0233534993e36b58
128128
refs/heads/autosynth-bigtable-admin: 6379a2bc712f2736c83de0e009b4d26da4fa82ca
129129
refs/heads/autosynth-containeranalysis: af5b804492292b43372c9fe00386696136ccae89
130-
refs/heads/autosynth-datastore: 4517817a110e3d5230f63ccff810c0644cfab78a
130+
refs/heads/autosynth-datastore: 5e0acc0913b8608dd896a9e8da1241b8bc251a1d
131131
refs/heads/autosynth-dialogflow: 7dbc2c1ea714328ccfa4f33645045f017ff080e7
132132
refs/heads/autosynth-errorreporting: 1101a04e8be074802c35332d5fcf8297f61cae32
133133
refs/heads/autosynth-firestore: d1a44f9acc302750e37b008ecb9c1aa535cc94df

branches/autosynth-datastore/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -371,41 +371,31 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
371371
processOutstandingBatches();
372372
}
373373

374-
public void processOutstandingBatches() {
375-
while (true) {
376-
boolean batchDone = false;
377-
Runnable batchCallback = null;
378-
OutstandingMessage outstandingMessage;
379-
synchronized (outstandingMessageBatches) {
380-
OutstandingMessageBatch nextBatch = outstandingMessageBatches.peek();
381-
if (nextBatch == null) {
382-
return;
383-
}
384-
outstandingMessage = nextBatch.messages.peek();
385-
if (outstandingMessage == null) {
386-
return;
387-
}
388-
try {
389-
// This is a non-blocking flow controller.
390-
flowController.reserve(
391-
1, outstandingMessage.receivedMessage().getMessage().getSerializedSize());
392-
} catch (FlowController.MaxOutstandingElementCountReachedException
393-
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
394-
return;
395-
} catch (FlowControlException unexpectedException) {
396-
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
397-
}
398-
nextBatch.messages.poll(); // We got a hold to the message already.
399-
batchDone = nextBatch.messages.isEmpty();
400-
if (batchDone) {
401-
outstandingMessageBatches.poll();
402-
batchCallback = nextBatch.doneCallback;
374+
private void processOutstandingBatches() {
375+
synchronized (outstandingMessageBatches) {
376+
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
377+
nextBatch != null;
378+
nextBatch = outstandingMessageBatches.poll()) {
379+
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
380+
nextMessage != null;
381+
nextMessage = nextBatch.messages.poll()) {
382+
try {
383+
// This is a non-blocking flow controller.
384+
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
385+
} catch (FlowController.MaxOutstandingElementCountReachedException
386+
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
387+
// Unwind previous changes in the batches outstanding.
388+
nextBatch.messages.addFirst(nextMessage);
389+
outstandingMessageBatches.addFirst(nextBatch);
390+
return;
391+
} catch (FlowControlException unexpectedException) {
392+
throw new IllegalStateException(
393+
"Flow control unexpected exception", unexpectedException);
394+
}
395+
processOutstandingMessage(
396+
nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
403397
}
404-
}
405-
processOutstandingMessage(
406-
outstandingMessage.receivedMessage.getMessage(), outstandingMessage.ackHandler);
407-
if (batchDone) {
408-
batchCallback.run();
398+
nextBatch.doneCallback.run();
409399
}
410400
}
411401
}

0 commit comments

Comments
 (0)