Skip to content

Commit 4633771

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 20789 b: refs/heads/autosynth-iamcredentials c: 5e0acc0 h: refs/heads/master i: 20787: 2a9467e
1 parent 654e98d commit 4633771

2 files changed

Lines changed: 25 additions & 35 deletions

File tree

  • branches/autosynth-iamcredentials/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ refs/heads/pubsub-ordering-keys: 858d4e986a0ba48e08f00d42f51cbdecb175f5d6
162162
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
163163
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032
164164
refs/tags/v0.77.0: 28a85a77883ccf5d48f297fd0ef3b3dca6ce01f0
165-
refs/heads/autosynth-iamcredentials: 4517817a110e3d5230f63ccff810c0644cfab78a
165+
refs/heads/autosynth-iamcredentials: 5e0acc0913b8608dd896a9e8da1241b8bc251a1d
166166
refs/heads/release-google-cloud-java-v0.78.0: fae5e980779cf0173a152636b278015b9f60ee55
167167
refs/heads/release-google-cloud-java-v0.81.0: 0352cd0dd11f4fd1fbd1ff16e7a96beaccc7b475
168168
refs/heads/release-google-cloud-java-v0.81.1-SNAPSHOT: 5a74ccb1f12506a3b67b65521881298fde20bd6f

branches/autosynth-iamcredentials/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -371,41 +371,31 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
371371
processOutstandingBatches();
372372
}
373373

374-
public void processOutstandingBatches() {
375-
while (true) {
376-
boolean batchDone = false;
377-
Runnable batchCallback = null;
378-
OutstandingMessage outstandingMessage;
379-
synchronized (outstandingMessageBatches) {
380-
OutstandingMessageBatch nextBatch = outstandingMessageBatches.peek();
381-
if (nextBatch == null) {
382-
return;
383-
}
384-
outstandingMessage = nextBatch.messages.peek();
385-
if (outstandingMessage == null) {
386-
return;
387-
}
388-
try {
389-
// This is a non-blocking flow controller.
390-
flowController.reserve(
391-
1, outstandingMessage.receivedMessage().getMessage().getSerializedSize());
392-
} catch (FlowController.MaxOutstandingElementCountReachedException
393-
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
394-
return;
395-
} catch (FlowControlException unexpectedException) {
396-
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
397-
}
398-
nextBatch.messages.poll(); // We got a hold to the message already.
399-
batchDone = nextBatch.messages.isEmpty();
400-
if (batchDone) {
401-
outstandingMessageBatches.poll();
402-
batchCallback = nextBatch.doneCallback;
374+
private void processOutstandingBatches() {
375+
synchronized (outstandingMessageBatches) {
376+
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
377+
nextBatch != null;
378+
nextBatch = outstandingMessageBatches.poll()) {
379+
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
380+
nextMessage != null;
381+
nextMessage = nextBatch.messages.poll()) {
382+
try {
383+
// This is a non-blocking flow controller.
384+
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
385+
} catch (FlowController.MaxOutstandingElementCountReachedException
386+
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
387+
// Unwind previous changes in the batches outstanding.
388+
nextBatch.messages.addFirst(nextMessage);
389+
outstandingMessageBatches.addFirst(nextBatch);
390+
return;
391+
} catch (FlowControlException unexpectedException) {
392+
throw new IllegalStateException(
393+
"Flow control unexpected exception", unexpectedException);
394+
}
395+
processOutstandingMessage(
396+
nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
403397
}
404-
}
405-
processOutstandingMessage(
406-
outstandingMessage.receivedMessage.getMessage(), outstandingMessage.ackHandler);
407-
if (batchDone) {
408-
batchCallback.run();
398+
nextBatch.doneCallback.run();
409399
}
410400
}
411401
}

0 commit comments

Comments
 (0)