Skip to content

Commit f12c351

Browse files
authored
---
yaml --- r: 8537 b: refs/heads/snehashah-bugfix c: 7864376 h: refs/heads/master i: 8535: 6939f26
1 parent 549a61d commit f12c351

5 files changed

Lines changed: 21 additions & 8 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,4 @@ refs/heads/vkedia-patch-3: 4d128043acaa7db9160faf439d2ca6104e8a88cb
7373
refs/tags/v0.23.0: e5405e1f6d144441b889acd3b6405fdcc3cdfd72
7474
refs/tags/v0.23.1: 30bcf8076ef9d71cc5a858d026cb907bb0954bec
7575
refs/tags/v0.24.0: b3cf61898d9c63d028fe088c14486721318d5fd5
76-
refs/heads/snehashah-bugfix: 20678f4a24a82023c4b674dffec24a2f4803e4f8
76+
refs/heads/snehashah-bugfix: 78643762b252ffd850ece57f1e00bd2e6804250d

branches/snehashah-bugfix/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import com.google.api.gax.batching.FlowController;
2121
import com.google.api.gax.batching.FlowController.FlowControlException;
2222
import com.google.api.gax.core.Distribution;
23-
import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage;
23+
import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessageBatch.OutstandingMessage;
2424
import com.google.common.annotations.VisibleForTesting;
2525
import com.google.common.collect.Lists;
2626
import com.google.common.primitives.Ints;
@@ -81,7 +81,7 @@ class MessageDispatcher {
8181
private Instant nextAckDeadlineExtensionAlarmTime;
8282
private ScheduledFuture<?> pendingAcksAlarm;
8383

84-
private final Deque<OutstandingMessagesBatch> outstandingMessageBatches;
84+
private final Deque<OutstandingMessageBatch> outstandingMessageBatches;
8585

8686
// To keep track of number of seconds the receiver takes to process messages.
8787
private final Distribution ackLatencyDistribution;
@@ -238,6 +238,7 @@ void sendAckOperations(
238238
Duration maxAckExtensionPeriod,
239239
Distribution ackLatencyDistribution,
240240
FlowController flowController,
241+
Deque<OutstandingMessageBatch> outstandingMessageBatches,
241242
ScheduledExecutorService executor,
242243
ScheduledExecutorService systemExecutor,
243244
ApiClock clock) {
@@ -248,7 +249,7 @@ void sendAckOperations(
248249
this.receiver = receiver;
249250
this.ackProcessor = ackProcessor;
250251
this.flowController = flowController;
251-
outstandingMessageBatches = new LinkedList<>();
252+
this.outstandingMessageBatches = outstandingMessageBatches;
252253
outstandingAckHandlers = new PriorityQueue<>();
253254
pendingAcks = new HashSet<>();
254255
pendingNacks = new HashSet<>();
@@ -282,7 +283,7 @@ public int getMessageDeadlineSeconds() {
282283
return messageDeadlineSeconds;
283284
}
284285

285-
static class OutstandingMessagesBatch {
286+
static class OutstandingMessageBatch {
286287
private final Deque<OutstandingMessage> messages;
287288
private final Runnable doneCallback;
288289

@@ -304,7 +305,7 @@ public AckHandler ackHandler() {
304305
}
305306
}
306307

307-
public OutstandingMessagesBatch(Runnable doneCallback) {
308+
public OutstandingMessageBatch(Runnable doneCallback) {
308309
this.messages = new LinkedList<>();
309310
this.doneCallback = doneCallback;
310311
}
@@ -325,7 +326,7 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
325326
}
326327
messagesWaiter.incrementPendingMessages(messages.size());
327328

328-
OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch(doneCallback);
329+
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
329330
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(messages.size());
330331
for (ReceivedMessage message : messages) {
331332
AckHandler ackHandler =
@@ -358,7 +359,7 @@ public void processOutstandingBatches() {
358359
Runnable batchCallback = null;
359360
OutstandingMessage outstandingMessage;
360361
synchronized (outstandingMessageBatches) {
361-
OutstandingMessagesBatch nextBatch = outstandingMessageBatches.peek();
362+
OutstandingMessageBatch nextBatch = outstandingMessageBatches.peek();
362363
if (nextBatch == null) {
363364
return;
364365
}

branches/snehashah-bugfix/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PollingSubscriberConnection.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.pubsub.v1.PullResponse;
3434
import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub;
3535
import com.google.pubsub.v1.Subscription;
36+
import java.util.Deque;
3637
import java.util.List;
3738
import java.util.concurrent.ScheduledExecutorService;
3839
import java.util.concurrent.TimeUnit;
@@ -70,6 +71,7 @@ public PollingSubscriberConnection(
7071
SubscriberFutureStub stub,
7172
FlowController flowController,
7273
@Nullable Long maxDesiredPulledMessages,
74+
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
7375
ScheduledExecutorService executor,
7476
ScheduledExecutorService systemExecutor,
7577
ApiClock clock) {
@@ -84,6 +86,7 @@ public PollingSubscriberConnection(
8486
maxAckExtensionPeriod,
8587
ackLatencyDistribution,
8688
flowController,
89+
outstandingMessageBatches,
8790
executor,
8891
systemExecutor,
8992
clock);

branches/snehashah-bugfix/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.grpc.stub.ClientCallStreamObserver;
3535
import io.grpc.stub.ClientResponseObserver;
3636
import java.util.ArrayList;
37+
import java.util.Deque;
3738
import java.util.List;
3839
import java.util.concurrent.ScheduledExecutorService;
3940
import java.util.concurrent.TimeUnit;
@@ -71,6 +72,7 @@ public StreamingSubscriberConnection(
7172
Distribution ackLatencyDistribution,
7273
SubscriberStub asyncStub,
7374
FlowController flowController,
75+
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
7476
ScheduledExecutorService executor,
7577
@Nullable ScheduledExecutorService alarmsExecutor,
7678
ApiClock clock) {
@@ -85,6 +87,7 @@ public StreamingSubscriberConnection(
8587
maxAckExtensionPeriod,
8688
ackLatencyDistribution,
8789
flowController,
90+
outstandingMessageBatches,
8891
executor,
8992
alarmsExecutor,
9093
clock);

branches/snehashah-bugfix/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import io.grpc.auth.MoreCallCredentials;
4848
import java.io.IOException;
4949
import java.util.ArrayList;
50+
import java.util.Deque;
51+
import java.util.LinkedList;
5052
import java.util.List;
5153
import java.util.concurrent.ScheduledExecutorService;
5254
import java.util.concurrent.ScheduledFuture;
@@ -115,6 +117,8 @@ public class Subscriber extends AbstractApiService {
115117
private final MessageReceiver receiver;
116118
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
117119
private final List<PollingSubscriberConnection> pollingSubscriberConnections;
120+
private final Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches =
121+
new LinkedList<>();
118122
private final ApiClock clock;
119123
private final List<AutoCloseable> closeables = new ArrayList<>();
120124
private final boolean useStreaming;
@@ -328,6 +332,7 @@ private void startPollingConnections() throws IOException {
328332
stub,
329333
flowController,
330334
flowControlSettings.getMaxOutstandingElementCount(),
335+
outstandingMessageBatches,
331336
executor,
332337
alarmsExecutor,
333338
clock));
@@ -374,6 +379,7 @@ private void startStreamingConnections() throws IOException {
374379
ackLatencyDistribution,
375380
stub,
376381
flowController,
382+
outstandingMessageBatches,
377383
executor,
378384
alarmsExecutor,
379385
clock));

0 commit comments

Comments
 (0)