Skip to content

Commit 5b4c31b

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 34651 b: refs/heads/autosynth-texttospeech c: 262b5af h: refs/heads/master i: 34649: 32f3101 34647: cf95df5
1 parent d202d93 commit 5b4c31b

5 files changed

Lines changed: 46 additions & 108 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ refs/heads/autosynth-scheduler: a3de6480746d1cd586ca8b9d75c55a636f371539
140140
refs/heads/autosynth-spanner: d963fe4368e79cf6abae5d511785e8ced8ac57f4
141141
refs/heads/autosynth-speech: c563dcd420cce0a37c39b1b9c24be1b9ba604dc7
142142
refs/heads/autosynth-tasks: 25d1eafe8cb66b00e3dad765dac74a5b45b83e63
143-
refs/heads/autosynth-texttospeech: b697a8b47a7eb0eb08d958221445fadc07a0aa14
143+
refs/heads/autosynth-texttospeech: 262b5af368038754e192e0b32e53ba7319464a06
144144
refs/heads/autosynth-trace: c94eef6e4d9c6fd24888216e28ca7271959c1cf0
145145
refs/heads/autosynth-websecurityscanner: fa561b356aabcd92d415ae8dc88fd8d87dbc5b23
146146
refs/heads/bigquerystorage: 06db74d123d7f8a3ef48755c2fcabed09faf8e64

branches/autosynth-texttospeech/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 21 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.google.api.gax.batching.FlowController;
2525
import com.google.api.gax.batching.FlowController.FlowControlException;
2626
import com.google.api.gax.core.Distribution;
27-
import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessageBatch.OutstandingMessage;
2827
import com.google.common.primitives.Ints;
2928
import com.google.common.util.concurrent.MoreExecutors;
3029
import com.google.pubsub.v1.PubsubMessage;
@@ -33,14 +32,11 @@
3332
import java.util.Arrays;
3433
import java.util.Collection;
3534
import java.util.Collections;
36-
import java.util.Deque;
37-
import java.util.LinkedList;
3835
import java.util.List;
3936
import java.util.Map;
4037
import java.util.concurrent.ConcurrentHashMap;
4138
import java.util.concurrent.ConcurrentMap;
4239
import java.util.concurrent.Executor;
43-
import java.util.concurrent.LinkedBlockingDeque;
4440
import java.util.concurrent.LinkedBlockingQueue;
4541
import java.util.concurrent.ScheduledExecutorService;
4642
import java.util.concurrent.ScheduledFuture;
@@ -91,9 +87,6 @@ class MessageDispatcher {
9187
private final Lock jobLock;
9288
private ScheduledFuture<?> backgroundJob;
9389

94-
private final LinkedBlockingDeque<OutstandingMessageBatch> outstandingMessageBatches =
95-
new LinkedBlockingDeque<>();
96-
9790
// To keep track of number of seconds the receiver takes to process messages.
9891
private final Distribution ackLatencyDistribution;
9992

@@ -155,7 +148,6 @@ private void forget() {
155148
}
156149
flowController.release(1, outstandingBytes);
157150
messagesWaiter.incrementPendingMessages(-1);
158-
processOutstandingBatches();
159151
}
160152

161153
@Override
@@ -296,50 +288,19 @@ int getMessageDeadlineSeconds() {
296288
return messageDeadlineSeconds.get();
297289
}
298290

299-
static class OutstandingMessageBatch {
300-
private final Deque<OutstandingMessage> messages;
301-
private final Runnable doneCallback;
302-
303-
static class OutstandingMessage {
304-
private final ReceivedMessage receivedMessage;
305-
private final AckHandler ackHandler;
306-
307-
public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
308-
this.receivedMessage = receivedMessage;
309-
this.ackHandler = ackHandler;
310-
}
311-
312-
public ReceivedMessage receivedMessage() {
313-
return receivedMessage;
314-
}
315-
316-
public AckHandler ackHandler() {
317-
return ackHandler;
318-
}
319-
}
291+
static class OutstandingMessage {
292+
private final ReceivedMessage receivedMessage;
293+
private final AckHandler ackHandler;
320294

321-
public OutstandingMessageBatch(Runnable doneCallback) {
322-
this.messages = new LinkedList<>();
323-
this.doneCallback = doneCallback;
324-
}
325-
326-
public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
327-
this.messages.add(new OutstandingMessage(receivedMessage, ackHandler));
328-
}
329-
330-
public Deque<OutstandingMessage> messages() {
331-
return messages;
295+
public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
296+
this.receivedMessage = receivedMessage;
297+
this.ackHandler = ackHandler;
332298
}
333299
}
334300

335-
public void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) {
336-
if (messages.isEmpty()) {
337-
doneCallback.run();
338-
return;
339-
}
340-
301+
public void processReceivedMessages(List<ReceivedMessage> messages) {
341302
Instant totalExpiration = now().plus(maxAckExtensionPeriod);
342-
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
303+
List<OutstandingMessage> outstandingBatch = new ArrayList<>(messages.size());
343304
for (ReceivedMessage message : messages) {
344305
AckHandler ackHandler =
345306
new AckHandler(
@@ -355,42 +316,25 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
355316
// totally expire so that pubsub service sends us the message again.
356317
continue;
357318
}
358-
outstandingBatch.addMessage(message, ackHandler);
319+
outstandingBatch.add(new OutstandingMessage(message, ackHandler));
359320
pendingReceipts.add(message.getAckId());
360321
}
361322

362-
if (outstandingBatch.messages.isEmpty()) {
363-
doneCallback.run();
364-
return;
365-
}
366-
367-
messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size());
368-
outstandingMessageBatches.add(outstandingBatch);
369-
processOutstandingBatches();
323+
processBatch(outstandingBatch);
370324
}
371325

372-
private void processOutstandingBatches() {
373-
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
374-
nextBatch != null;
375-
nextBatch = outstandingMessageBatches.poll()) {
376-
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
377-
nextMessage != null;
378-
nextMessage = nextBatch.messages.poll()) {
379-
try {
380-
// This is a non-blocking flow controller.
381-
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
382-
} catch (FlowController.MaxOutstandingElementCountReachedException
383-
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
384-
// Unwind previous changes in the batches outstanding.
385-
nextBatch.messages.addFirst(nextMessage);
386-
outstandingMessageBatches.addFirst(nextBatch);
387-
return;
388-
} catch (FlowControlException unexpectedException) {
389-
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
390-
}
391-
processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
326+
private void processBatch(List<OutstandingMessage> batch) {
327+
messagesWaiter.incrementPendingMessages(batch.size());
328+
for (OutstandingMessage message : batch) {
329+
// This is a blocking flow controller. We have already incremented MessageWaiter, so
330+
// shutdown will block on processing of all these messages anyway.
331+
try {
332+
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
333+
} catch (FlowControlException unexpectedException) {
334+
// This should be a blocking flow controller and never throw an exception.
335+
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
392336
}
393-
nextBatch.doneCallback.run();
337+
processOutstandingMessage(message.receivedMessage.getMessage(), message.ackHandler);
394338
}
395339
}
396340

branches/autosynth-texttospeech/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -151,26 +151,20 @@ public void onStart(StreamController controller) {
151151
@Override
152152
public void onResponse(StreamingPullResponse response) {
153153
channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
154-
messageDispatcher.processReceivedMessages(
155-
response.getReceivedMessagesList(),
156-
new Runnable() {
157-
@Override
158-
public void run() {
159-
// Only request more if we're not shutdown.
160-
// If errorFuture is done, the stream has either failed or hung up,
161-
// and we don't need to request.
162-
if (isAlive() && !errorFuture.isDone()) {
163-
lock.lock();
164-
try {
165-
thisController.request(1);
166-
} catch (Exception e) {
167-
logger.log(Level.WARNING, "cannot request more messages", e);
168-
} finally {
169-
lock.unlock();
170-
}
171-
}
172-
}
173-
});
154+
messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());
155+
// Only request more if we're not shutdown.
156+
// If errorFuture is done, the stream has either failed or hung up,
157+
// and we don't need to request.
158+
if (isAlive() && !errorFuture.isDone()) {
159+
lock.lock();
160+
try {
161+
thisController.request(1);
162+
} catch (Exception e) {
163+
logger.log(Level.WARNING, "cannot request more messages", e);
164+
} finally {
165+
lock.unlock();
166+
}
167+
}
174168
}
175169

176170
@Override

branches/autosynth-texttospeech/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ private Subscriber(Builder builder) {
130130
builder
131131
.flowControlSettings
132132
.toBuilder()
133-
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
133+
.setLimitExceededBehavior(LimitExceededBehavior.Block)
134134
.build());
135135

136136
this.numPullers = builder.parallelPullCount;

branches/autosynth-texttospeech/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public void sendAckOperations(
105105
new FlowController(
106106
FlowControlSettings.newBuilder()
107107
.setMaxOutstandingElementCount(1L)
108-
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
108+
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
109109
.build());
110110

111111
dispatcher =
@@ -124,31 +124,31 @@ public void sendAckOperations(
124124

125125
@Test
126126
public void testReceipt() throws Exception {
127-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
127+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
128128
dispatcher.processOutstandingAckOperations();
129129
assertThat(sentModAcks)
130130
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
131131
}
132132

133133
@Test
134134
public void testAck() throws Exception {
135-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
135+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
136136
consumers.take().ack();
137137
dispatcher.processOutstandingAckOperations();
138138
assertThat(sentAcks).contains(TEST_MESSAGE.getAckId());
139139
}
140140

141141
@Test
142142
public void testNack() throws Exception {
143-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
143+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
144144
consumers.take().nack();
145145
dispatcher.processOutstandingAckOperations();
146146
assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0));
147147
}
148148

149149
@Test
150150
public void testExtension() throws Exception {
151-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
151+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
152152
dispatcher.extendDeadlines();
153153
assertThat(sentModAcks)
154154
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
@@ -161,7 +161,7 @@ public void testExtension() throws Exception {
161161

162162
@Test
163163
public void testExtension_Close() throws Exception {
164-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
164+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
165165
dispatcher.extendDeadlines();
166166
assertThat(sentModAcks)
167167
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
@@ -176,7 +176,7 @@ public void testExtension_Close() throws Exception {
176176

177177
@Test
178178
public void testExtension_GiveUp() throws Exception {
179-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
179+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
180180
dispatcher.extendDeadlines();
181181
assertThat(sentModAcks)
182182
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
@@ -188,7 +188,7 @@ public void testExtension_GiveUp() throws Exception {
188188
dispatcher.extendDeadlines();
189189
assertThat(sentModAcks).isEmpty();
190190

191-
// We should be able to reserve another item in the flow controller and not block shutdown
191+
// We should be able to reserve another item in the flow controller and not block.
192192
flowController.reserve(1, 0);
193193
dispatcher.stop();
194194
}
@@ -197,7 +197,7 @@ public void testExtension_GiveUp() throws Exception {
197197
public void testDeadlineAdjustment() throws Exception {
198198
assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(10);
199199

200-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
200+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
201201
clock.advance(42, TimeUnit.SECONDS);
202202
consumers.take().ack();
203203

0 commit comments

Comments
 (0)