Skip to content

Commit 997caa8

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 20195 b: refs/heads/autosynth-redis c: 5e0acc0 h: refs/heads/master i: 20193: 61e0d14 20191: 3ba49e9
1 parent 5293c9a commit 997caa8

2 files changed

Lines changed: 25 additions & 35 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ refs/heads/autosynth-iot: f03bdd338a9056ca3b7ea6d9ca901649ba9aab78
136136
refs/heads/autosynth-kms: 2828edfe3d2c53dd6e71912eae8a53c87bf40c87
137137
refs/heads/autosynth-language: c3d990dd34d81e7e935041e7147fb9dd27f8a557
138138
refs/heads/autosynth-os-login: 092fdbed6d5317948f92b708e9f50dedd89fc666
139-
refs/heads/autosynth-redis: 4517817a110e3d5230f63ccff810c0644cfab78a
139+
refs/heads/autosynth-redis: 5e0acc0913b8608dd896a9e8da1241b8bc251a1d
140140
refs/heads/autosynth-scheduler: d97f8743ba965c7d5e492c8dc1f51d023104e260
141141
refs/heads/autosynth-spanner: 9bff86d057df31e04c76d72865e8e073ac5794fb
142142
refs/heads/autosynth-speech: 75d6c62a9d07d3a3642980502a25d07fbde0f232

branches/autosynth-redis/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -371,41 +371,31 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
371371
processOutstandingBatches();
372372
}
373373

374-
public void processOutstandingBatches() {
375-
while (true) {
376-
boolean batchDone = false;
377-
Runnable batchCallback = null;
378-
OutstandingMessage outstandingMessage;
379-
synchronized (outstandingMessageBatches) {
380-
OutstandingMessageBatch nextBatch = outstandingMessageBatches.peek();
381-
if (nextBatch == null) {
382-
return;
383-
}
384-
outstandingMessage = nextBatch.messages.peek();
385-
if (outstandingMessage == null) {
386-
return;
387-
}
388-
try {
389-
// This is a non-blocking flow controller.
390-
flowController.reserve(
391-
1, outstandingMessage.receivedMessage().getMessage().getSerializedSize());
392-
} catch (FlowController.MaxOutstandingElementCountReachedException
393-
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
394-
return;
395-
} catch (FlowControlException unexpectedException) {
396-
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
397-
}
398-
nextBatch.messages.poll(); // We got a hold to the message already.
399-
batchDone = nextBatch.messages.isEmpty();
400-
if (batchDone) {
401-
outstandingMessageBatches.poll();
402-
batchCallback = nextBatch.doneCallback;
374+
private void processOutstandingBatches() {
375+
synchronized (outstandingMessageBatches) {
376+
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
377+
nextBatch != null;
378+
nextBatch = outstandingMessageBatches.poll()) {
379+
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
380+
nextMessage != null;
381+
nextMessage = nextBatch.messages.poll()) {
382+
try {
383+
// This is a non-blocking flow controller.
384+
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
385+
} catch (FlowController.MaxOutstandingElementCountReachedException
386+
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
387+
// Unwind previous changes in the batches outstanding.
388+
nextBatch.messages.addFirst(nextMessage);
389+
outstandingMessageBatches.addFirst(nextBatch);
390+
return;
391+
} catch (FlowControlException unexpectedException) {
392+
throw new IllegalStateException(
393+
"Flow control unexpected exception", unexpectedException);
394+
}
395+
processOutstandingMessage(
396+
nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
403397
}
404-
}
405-
processOutstandingMessage(
406-
outstandingMessage.receivedMessage.getMessage(), outstandingMessage.ackHandler);
407-
if (batchDone) {
408-
batchCallback.run();
398+
nextBatch.doneCallback.run();
409399
}
410400
}
411401
}

0 commit comments

Comments
 (0)