Skip to content

Commit a905617

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 20791 b: refs/heads/autosynth-iamcredentials c: 7b9807d h: refs/heads/master i: 20789: 4633771 20787: 2a9467e 20783: 950ca4b
1 parent 2ba91a0 commit a905617

2 files changed

Lines changed: 21 additions & 27 deletions

File tree

  • branches/autosynth-iamcredentials/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ refs/heads/pubsub-ordering-keys: 858d4e986a0ba48e08f00d42f51cbdecb175f5d6
162162
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
163163
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032
164164
refs/tags/v0.77.0: 28a85a77883ccf5d48f297fd0ef3b3dca6ce01f0
165-
refs/heads/autosynth-iamcredentials: 7971d0b4e04147c77f1a118cbc5408f6eb90d500
165+
refs/heads/autosynth-iamcredentials: 7b9807d5d0a400c757b8905fee768be4c85eba25
166166
refs/heads/release-google-cloud-java-v0.78.0: fae5e980779cf0173a152636b278015b9f60ee55
167167
refs/heads/release-google-cloud-java-v0.81.0: 0352cd0dd11f4fd1fbd1ff16e7a96beaccc7b475
168168
refs/heads/release-google-cloud-java-v0.81.1-SNAPSHOT: 5a74ccb1f12506a3b67b65521881298fde20bd6f

branches/autosynth-iamcredentials/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -365,38 +365,32 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
365365
}
366366

367367
messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size());
368-
synchronized (outstandingMessageBatches) {
369-
outstandingMessageBatches.add(outstandingBatch);
370-
}
368+
outstandingMessageBatches.add(outstandingBatch);
371369
processOutstandingBatches();
372370
}
373371

374372
private void processOutstandingBatches() {
375-
synchronized (outstandingMessageBatches) {
376-
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
377-
nextBatch != null;
378-
nextBatch = outstandingMessageBatches.poll()) {
379-
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
380-
nextMessage != null;
381-
nextMessage = nextBatch.messages.poll()) {
382-
try {
383-
// This is a non-blocking flow controller.
384-
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
385-
} catch (FlowController.MaxOutstandingElementCountReachedException
386-
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
387-
// Unwind previous changes in the batches outstanding.
388-
nextBatch.messages.addFirst(nextMessage);
389-
outstandingMessageBatches.addFirst(nextBatch);
390-
return;
391-
} catch (FlowControlException unexpectedException) {
392-
throw new IllegalStateException(
393-
"Flow control unexpected exception", unexpectedException);
394-
}
395-
processOutstandingMessage(
396-
nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
373+
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
374+
nextBatch != null;
375+
nextBatch = outstandingMessageBatches.poll()) {
376+
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
377+
nextMessage != null;
378+
nextMessage = nextBatch.messages.poll()) {
379+
try {
380+
// This is a non-blocking flow controller.
381+
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
382+
} catch (FlowController.MaxOutstandingElementCountReachedException
383+
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
384+
// Unwind previous changes in the batches outstanding.
385+
nextBatch.messages.addFirst(nextMessage);
386+
outstandingMessageBatches.addFirst(nextBatch);
387+
return;
388+
} catch (FlowControlException unexpectedException) {
389+
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
397390
}
398-
nextBatch.doneCallback.run();
391+
processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
399392
}
393+
nextBatch.doneCallback.run();
400394
}
401395
}
402396

0 commit comments

Comments
 (0)