Skip to content

Commit 04b53f2

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 18941 b: refs/heads/autosynth-vision c: 7b9807d h: refs/heads/master i: 18939: 93565b5
1 parent 1d0cfd8 commit 04b53f2

2 files changed

Lines changed: 21 additions & 27 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ refs/heads/autosynth-container: f6384095f50b31bfc8208542a49181e158d3681c
117117
refs/heads/autosynth-monitoring: 832528c131ac32b115fd7a369701d9e6e4ec62e5
118118
refs/heads/autosynth-pubsub: 7a327eaed5fb2a5d6c1691feb73c3e0194dd805e
119119
refs/heads/autosynth-video-intelligence: 53ca56e5d077bf67065b730f3fe621f40689fb4c
120-
refs/heads/autosynth-vision: 7971d0b4e04147c77f1a118cbc5408f6eb90d500
120+
refs/heads/autosynth-vision: 7b9807d5d0a400c757b8905fee768be4c85eba25
121121
refs/heads/spanner: b01127f885b4611bf1852abb0ce481eeb7fcc131
122122
refs/tags/v0.68.0: 9cc799fcf68c82ab431d425fefa58ef615ce8e5b
123123
refs/tags/v0.69.0: 78f67a29e8b9c46ba01de566a2eae0fd1c03edea

branches/autosynth-vision/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -365,38 +365,32 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
365365
}
366366

367367
messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size());
368-
synchronized (outstandingMessageBatches) {
369-
outstandingMessageBatches.add(outstandingBatch);
370-
}
368+
outstandingMessageBatches.add(outstandingBatch);
371369
processOutstandingBatches();
372370
}
373371

374372
private void processOutstandingBatches() {
375-
synchronized (outstandingMessageBatches) {
376-
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
377-
nextBatch != null;
378-
nextBatch = outstandingMessageBatches.poll()) {
379-
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
380-
nextMessage != null;
381-
nextMessage = nextBatch.messages.poll()) {
382-
try {
383-
// This is a non-blocking flow controller.
384-
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
385-
} catch (FlowController.MaxOutstandingElementCountReachedException
386-
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
387-
// Unwind previous changes in the batches outstanding.
388-
nextBatch.messages.addFirst(nextMessage);
389-
outstandingMessageBatches.addFirst(nextBatch);
390-
return;
391-
} catch (FlowControlException unexpectedException) {
392-
throw new IllegalStateException(
393-
"Flow control unexpected exception", unexpectedException);
394-
}
395-
processOutstandingMessage(
396-
nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
373+
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
374+
nextBatch != null;
375+
nextBatch = outstandingMessageBatches.poll()) {
376+
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
377+
nextMessage != null;
378+
nextMessage = nextBatch.messages.poll()) {
379+
try {
380+
// This is a non-blocking flow controller.
381+
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
382+
} catch (FlowController.MaxOutstandingElementCountReachedException
383+
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
384+
// Unwind previous changes in the batches outstanding.
385+
nextBatch.messages.addFirst(nextMessage);
386+
outstandingMessageBatches.addFirst(nextBatch);
387+
return;
388+
} catch (FlowControlException unexpectedException) {
389+
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
397390
}
398-
nextBatch.doneCallback.run();
391+
processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
399392
}
393+
nextBatch.doneCallback.run();
400394
}
401395
}
402396

0 commit comments

Comments
 (0)