Skip to content

Commit ab296a3

Browse files
authored
---
yaml --- r: 8775 b: refs/heads/lesv-patch-1 c: 7864376 h: refs/heads/master i: 8773: 982581e 8771: d187463 8767: 0fa305a
1 parent adebf83 commit ab296a3

5 files changed

Lines changed: 21 additions & 8 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ refs/tags/v0.22.0: 18b298fe4bfe8ec2f20b0e0bf7ffdcce5cc3c5fe
6666
refs/heads/vam-google-patch-1: d0c8fee3a4074d0bf7360ce8c4f7f7223d0ee7b9
6767
refs/heads/vam-google-patch-CODEOWNERS: 2ac1616e25229e51d08a984708ef1918f91a35ee
6868
refs/heads/danoscarmike-patch-1: 7342a9916bce4ed00002c7202e2a16c5d46afaea
69-
refs/heads/lesv-patch-1: 20678f4a24a82023c4b674dffec24a2f4803e4f8
69+
refs/heads/lesv-patch-1: 78643762b252ffd850ece57f1e00bd2e6804250d
7070
refs/heads/ml-update-branch: 079dd6610017f5c51b9d1938c12d6d55b61513cf
7171
refs/heads/vkedia-patch-2: 7d8241388a9769a5c069334761b06c7012c878e7
7272
refs/heads/vkedia-patch-3: 4d128043acaa7db9160faf439d2ca6104e8a88cb

branches/lesv-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import com.google.api.gax.batching.FlowController;
2121
import com.google.api.gax.batching.FlowController.FlowControlException;
2222
import com.google.api.gax.core.Distribution;
23-
import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage;
23+
import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessageBatch.OutstandingMessage;
2424
import com.google.common.annotations.VisibleForTesting;
2525
import com.google.common.collect.Lists;
2626
import com.google.common.primitives.Ints;
@@ -81,7 +81,7 @@ class MessageDispatcher {
8181
private Instant nextAckDeadlineExtensionAlarmTime;
8282
private ScheduledFuture<?> pendingAcksAlarm;
8383

84-
private final Deque<OutstandingMessagesBatch> outstandingMessageBatches;
84+
private final Deque<OutstandingMessageBatch> outstandingMessageBatches;
8585

8686
// To keep track of number of seconds the receiver takes to process messages.
8787
private final Distribution ackLatencyDistribution;
@@ -238,6 +238,7 @@ void sendAckOperations(
238238
Duration maxAckExtensionPeriod,
239239
Distribution ackLatencyDistribution,
240240
FlowController flowController,
241+
Deque<OutstandingMessageBatch> outstandingMessageBatches,
241242
ScheduledExecutorService executor,
242243
ScheduledExecutorService systemExecutor,
243244
ApiClock clock) {
@@ -248,7 +249,7 @@ void sendAckOperations(
248249
this.receiver = receiver;
249250
this.ackProcessor = ackProcessor;
250251
this.flowController = flowController;
251-
outstandingMessageBatches = new LinkedList<>();
252+
this.outstandingMessageBatches = outstandingMessageBatches;
252253
outstandingAckHandlers = new PriorityQueue<>();
253254
pendingAcks = new HashSet<>();
254255
pendingNacks = new HashSet<>();
@@ -282,7 +283,7 @@ public int getMessageDeadlineSeconds() {
282283
return messageDeadlineSeconds;
283284
}
284285

285-
static class OutstandingMessagesBatch {
286+
static class OutstandingMessageBatch {
286287
private final Deque<OutstandingMessage> messages;
287288
private final Runnable doneCallback;
288289

@@ -304,7 +305,7 @@ public AckHandler ackHandler() {
304305
}
305306
}
306307

307-
public OutstandingMessagesBatch(Runnable doneCallback) {
308+
public OutstandingMessageBatch(Runnable doneCallback) {
308309
this.messages = new LinkedList<>();
309310
this.doneCallback = doneCallback;
310311
}
@@ -325,7 +326,7 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
325326
}
326327
messagesWaiter.incrementPendingMessages(messages.size());
327328

328-
OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch(doneCallback);
329+
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
329330
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(messages.size());
330331
for (ReceivedMessage message : messages) {
331332
AckHandler ackHandler =
@@ -358,7 +359,7 @@ public void processOutstandingBatches() {
358359
Runnable batchCallback = null;
359360
OutstandingMessage outstandingMessage;
360361
synchronized (outstandingMessageBatches) {
361-
OutstandingMessagesBatch nextBatch = outstandingMessageBatches.peek();
362+
OutstandingMessageBatch nextBatch = outstandingMessageBatches.peek();
362363
if (nextBatch == null) {
363364
return;
364365
}

branches/lesv-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PollingSubscriberConnection.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.pubsub.v1.PullResponse;
3434
import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub;
3535
import com.google.pubsub.v1.Subscription;
36+
import java.util.Deque;
3637
import java.util.List;
3738
import java.util.concurrent.ScheduledExecutorService;
3839
import java.util.concurrent.TimeUnit;
@@ -70,6 +71,7 @@ public PollingSubscriberConnection(
7071
SubscriberFutureStub stub,
7172
FlowController flowController,
7273
@Nullable Long maxDesiredPulledMessages,
74+
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
7375
ScheduledExecutorService executor,
7476
ScheduledExecutorService systemExecutor,
7577
ApiClock clock) {
@@ -84,6 +86,7 @@ public PollingSubscriberConnection(
8486
maxAckExtensionPeriod,
8587
ackLatencyDistribution,
8688
flowController,
89+
outstandingMessageBatches,
8790
executor,
8891
systemExecutor,
8992
clock);

branches/lesv-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.grpc.stub.ClientCallStreamObserver;
3535
import io.grpc.stub.ClientResponseObserver;
3636
import java.util.ArrayList;
37+
import java.util.Deque;
3738
import java.util.List;
3839
import java.util.concurrent.ScheduledExecutorService;
3940
import java.util.concurrent.TimeUnit;
@@ -71,6 +72,7 @@ public StreamingSubscriberConnection(
7172
Distribution ackLatencyDistribution,
7273
SubscriberStub asyncStub,
7374
FlowController flowController,
75+
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
7476
ScheduledExecutorService executor,
7577
@Nullable ScheduledExecutorService alarmsExecutor,
7678
ApiClock clock) {
@@ -85,6 +87,7 @@ public StreamingSubscriberConnection(
8587
maxAckExtensionPeriod,
8688
ackLatencyDistribution,
8789
flowController,
90+
outstandingMessageBatches,
8891
executor,
8992
alarmsExecutor,
9093
clock);

branches/lesv-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import io.grpc.auth.MoreCallCredentials;
4848
import java.io.IOException;
4949
import java.util.ArrayList;
50+
import java.util.Deque;
51+
import java.util.LinkedList;
5052
import java.util.List;
5153
import java.util.concurrent.ScheduledExecutorService;
5254
import java.util.concurrent.ScheduledFuture;
@@ -115,6 +117,8 @@ public class Subscriber extends AbstractApiService {
115117
private final MessageReceiver receiver;
116118
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
117119
private final List<PollingSubscriberConnection> pollingSubscriberConnections;
120+
private final Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches =
121+
new LinkedList<>();
118122
private final ApiClock clock;
119123
private final List<AutoCloseable> closeables = new ArrayList<>();
120124
private final boolean useStreaming;
@@ -328,6 +332,7 @@ private void startPollingConnections() throws IOException {
328332
stub,
329333
flowController,
330334
flowControlSettings.getMaxOutstandingElementCount(),
335+
outstandingMessageBatches,
331336
executor,
332337
alarmsExecutor,
333338
clock));
@@ -374,6 +379,7 @@ private void startStreamingConnections() throws IOException {
374379
ackLatencyDistribution,
375380
stub,
376381
flowController,
382+
outstandingMessageBatches,
377383
executor,
378384
alarmsExecutor,
379385
clock));

0 commit comments

Comments
 (0)