Skip to content

Commit cf18158

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 19287 b: refs/heads/autosynth-bigquerystorage c: 7b9807d h: refs/heads/master i: 19285: 71aa4c7 19283: be8c26d 19279: 89dc801
1 parent a82fa66 commit cf18158

2 files changed

Lines changed: 21 additions & 27 deletions

File tree

  • branches/autosynth-bigquerystorage/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ refs/tags/v0.69.0: 78f67a29e8b9c46ba01de566a2eae0fd1c03edea
124124
refs/heads/autosynth-asset: cd8251de8c40e239ad24dcf9ed93ea2708a3eed5
125125
refs/heads/autosynth-automl: 2a8b018cf05811fd472e5d1053e67a12ceec0b64
126126
refs/heads/autosynth-bigquerydatatransfer: 564833a85642d4194adc025f021e10e723154246
127-
refs/heads/autosynth-bigquerystorage: 7971d0b4e04147c77f1a118cbc5408f6eb90d500
127+
refs/heads/autosynth-bigquerystorage: 7b9807d5d0a400c757b8905fee768be4c85eba25
128128
refs/heads/autosynth-bigtable: 2fbcb15847e0e89e79d6dc07420e28d7dfcea894
129129
refs/heads/autosynth-bigtable-admin: 6379a2bc712f2736c83de0e009b4d26da4fa82ca
130130
refs/heads/autosynth-containeranalysis: 039ca5b8db725c76c16a965ff26b2774322b8ef8

branches/autosynth-bigquerystorage/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -365,38 +365,32 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
365365
}
366366

367367
messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size());
368-
synchronized (outstandingMessageBatches) {
369-
outstandingMessageBatches.add(outstandingBatch);
370-
}
368+
outstandingMessageBatches.add(outstandingBatch);
371369
processOutstandingBatches();
372370
}
373371

374372
private void processOutstandingBatches() {
375-
synchronized (outstandingMessageBatches) {
376-
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
377-
nextBatch != null;
378-
nextBatch = outstandingMessageBatches.poll()) {
379-
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
380-
nextMessage != null;
381-
nextMessage = nextBatch.messages.poll()) {
382-
try {
383-
// This is a non-blocking flow controller.
384-
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
385-
} catch (FlowController.MaxOutstandingElementCountReachedException
386-
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
387-
// Unwind previous changes in the batches outstanding.
388-
nextBatch.messages.addFirst(nextMessage);
389-
outstandingMessageBatches.addFirst(nextBatch);
390-
return;
391-
} catch (FlowControlException unexpectedException) {
392-
throw new IllegalStateException(
393-
"Flow control unexpected exception", unexpectedException);
394-
}
395-
processOutstandingMessage(
396-
nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
373+
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
374+
nextBatch != null;
375+
nextBatch = outstandingMessageBatches.poll()) {
376+
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
377+
nextMessage != null;
378+
nextMessage = nextBatch.messages.poll()) {
379+
try {
380+
// This is a non-blocking flow controller.
381+
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
382+
} catch (FlowController.MaxOutstandingElementCountReachedException
383+
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
384+
// Unwind previous changes in the batches outstanding.
385+
nextBatch.messages.addFirst(nextMessage);
386+
outstandingMessageBatches.addFirst(nextBatch);
387+
return;
388+
} catch (FlowControlException unexpectedException) {
389+
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
397390
}
398-
nextBatch.doneCallback.run();
391+
processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
399392
}
393+
nextBatch.doneCallback.run();
400394
}
401395
}
402396

0 commit comments

Comments
 (0)