Skip to content

Commit c65b573

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 20653 b: refs/heads/autosynth-websecurityscanner c: 7b9807d h: refs/heads/master i: 20651: e457704
1 parent 7cf5b37 commit c65b573

2 files changed

Lines changed: 21 additions & 27 deletions

File tree

  • branches/autosynth-websecurityscanner/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ refs/heads/autosynth-speech: 75d6c62a9d07d3a3642980502a25d07fbde0f232
143143
refs/heads/autosynth-tasks: 5d6a4f686ffef2080506d3b613fc7563a8bba1cb
144144
refs/heads/autosynth-texttospeech: 2dcc5dc22be0f456caa1b6a8a4bcdace2641239c
145145
refs/heads/autosynth-trace: 8804c46bfe147702ee9c95669f17f42d3790cf23
146-
refs/heads/autosynth-websecurityscanner: 7971d0b4e04147c77f1a118cbc5408f6eb90d500
146+
refs/heads/autosynth-websecurityscanner: 7b9807d5d0a400c757b8905fee768be4c85eba25
147147
refs/heads/bigquerystorage: 06db74d123d7f8a3ef48755c2fcabed09faf8e64
148148
refs/heads/elharo-patch-1: ce159ef828d3c545991ff78e7b6e0d912a9453e9
149149
refs/heads/mrschmidt-numericadd: 96509abadf6190b7886d57fdd9b090da55f5171c

branches/autosynth-websecurityscanner/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -365,38 +365,32 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
365365
}
366366

367367
messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size());
368-
synchronized (outstandingMessageBatches) {
369-
outstandingMessageBatches.add(outstandingBatch);
370-
}
368+
outstandingMessageBatches.add(outstandingBatch);
371369
processOutstandingBatches();
372370
}
373371

374372
private void processOutstandingBatches() {
375-
synchronized (outstandingMessageBatches) {
376-
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
377-
nextBatch != null;
378-
nextBatch = outstandingMessageBatches.poll()) {
379-
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
380-
nextMessage != null;
381-
nextMessage = nextBatch.messages.poll()) {
382-
try {
383-
// This is a non-blocking flow controller.
384-
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
385-
} catch (FlowController.MaxOutstandingElementCountReachedException
386-
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
387-
// Unwind previous changes in the batches outstanding.
388-
nextBatch.messages.addFirst(nextMessage);
389-
outstandingMessageBatches.addFirst(nextBatch);
390-
return;
391-
} catch (FlowControlException unexpectedException) {
392-
throw new IllegalStateException(
393-
"Flow control unexpected exception", unexpectedException);
394-
}
395-
processOutstandingMessage(
396-
nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
373+
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
374+
nextBatch != null;
375+
nextBatch = outstandingMessageBatches.poll()) {
376+
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
377+
nextMessage != null;
378+
nextMessage = nextBatch.messages.poll()) {
379+
try {
380+
// This is a non-blocking flow controller.
381+
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
382+
} catch (FlowController.MaxOutstandingElementCountReachedException
383+
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
384+
// Unwind previous changes in the batches outstanding.
385+
nextBatch.messages.addFirst(nextMessage);
386+
outstandingMessageBatches.addFirst(nextBatch);
387+
return;
388+
} catch (FlowControlException unexpectedException) {
389+
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
397390
}
398-
nextBatch.doneCallback.run();
391+
processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
399392
}
393+
nextBatch.doneCallback.run();
400394
}
401395
}
402396

0 commit comments

Comments
 (0)