Skip to content

Commit 71aa4c7

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 19285 b: refs/heads/autosynth-bigquerystorage c: 5e0acc0 h: refs/heads/master i: 19283: be8c26d
1 parent b8cadc8 commit 71aa4c7

2 files changed

Lines changed: 25 additions & 35 deletions

File tree

  • branches/autosynth-bigquerystorage/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ refs/tags/v0.69.0: 78f67a29e8b9c46ba01de566a2eae0fd1c03edea
124124
refs/heads/autosynth-asset: cd8251de8c40e239ad24dcf9ed93ea2708a3eed5
125125
refs/heads/autosynth-automl: 2a8b018cf05811fd472e5d1053e67a12ceec0b64
126126
refs/heads/autosynth-bigquerydatatransfer: 564833a85642d4194adc025f021e10e723154246
127-
refs/heads/autosynth-bigquerystorage: 4517817a110e3d5230f63ccff810c0644cfab78a
127+
refs/heads/autosynth-bigquerystorage: 5e0acc0913b8608dd896a9e8da1241b8bc251a1d
128128
refs/heads/autosynth-bigtable: 2fbcb15847e0e89e79d6dc07420e28d7dfcea894
129129
refs/heads/autosynth-bigtable-admin: 6379a2bc712f2736c83de0e009b4d26da4fa82ca
130130
refs/heads/autosynth-containeranalysis: 039ca5b8db725c76c16a965ff26b2774322b8ef8

branches/autosynth-bigquerystorage/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -371,41 +371,31 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
371371
processOutstandingBatches();
372372
}
373373

374-
public void processOutstandingBatches() {
375-
while (true) {
376-
boolean batchDone = false;
377-
Runnable batchCallback = null;
378-
OutstandingMessage outstandingMessage;
379-
synchronized (outstandingMessageBatches) {
380-
OutstandingMessageBatch nextBatch = outstandingMessageBatches.peek();
381-
if (nextBatch == null) {
382-
return;
383-
}
384-
outstandingMessage = nextBatch.messages.peek();
385-
if (outstandingMessage == null) {
386-
return;
387-
}
388-
try {
389-
// This is a non-blocking flow controller.
390-
flowController.reserve(
391-
1, outstandingMessage.receivedMessage().getMessage().getSerializedSize());
392-
} catch (FlowController.MaxOutstandingElementCountReachedException
393-
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
394-
return;
395-
} catch (FlowControlException unexpectedException) {
396-
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
397-
}
398-
nextBatch.messages.poll(); // We got a hold to the message already.
399-
batchDone = nextBatch.messages.isEmpty();
400-
if (batchDone) {
401-
outstandingMessageBatches.poll();
402-
batchCallback = nextBatch.doneCallback;
374+
private void processOutstandingBatches() {
375+
synchronized (outstandingMessageBatches) {
376+
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
377+
nextBatch != null;
378+
nextBatch = outstandingMessageBatches.poll()) {
379+
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
380+
nextMessage != null;
381+
nextMessage = nextBatch.messages.poll()) {
382+
try {
383+
// This is a non-blocking flow controller.
384+
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
385+
} catch (FlowController.MaxOutstandingElementCountReachedException
386+
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
387+
// Unwind previous changes in the batches outstanding.
388+
nextBatch.messages.addFirst(nextMessage);
389+
outstandingMessageBatches.addFirst(nextBatch);
390+
return;
391+
} catch (FlowControlException unexpectedException) {
392+
throw new IllegalStateException(
393+
"Flow control unexpected exception", unexpectedException);
394+
}
395+
processOutstandingMessage(
396+
nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
403397
}
404-
}
405-
processOutstandingMessage(
406-
outstandingMessage.receivedMessage.getMessage(), outstandingMessage.ackHandler);
407-
if (batchDone) {
408-
batchCallback.run();
398+
nextBatch.doneCallback.run();
409399
}
410400
}
411401
}

0 commit comments

Comments
 (0)