Skip to content

Commit 2377cab

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 19787 b: refs/heads/autosynth-kms c: 54aa55d h: refs/heads/master i: 19785: 5b1e9e8 19783: 3c05802
1 parent 514c9c2 commit 2377cab

5 files changed

Lines changed: 4 additions & 14 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ refs/heads/autosynth-dialogflow: 7dbc2c1ea714328ccfa4f33645045f017ff080e7
133133
refs/heads/autosynth-errorreporting: 1101a04e8be074802c35332d5fcf8297f61cae32
134134
refs/heads/autosynth-firestore: d1a44f9acc302750e37b008ecb9c1aa535cc94df
135135
refs/heads/autosynth-iot: f03bdd338a9056ca3b7ea6d9ca901649ba9aab78
136-
refs/heads/autosynth-kms: 1e18a7d8f967c8b749eb99020dedf5b05506e764
136+
refs/heads/autosynth-kms: 54aa55dc867becf0b7dae15b5cd86cf76801ab47
137137
refs/heads/autosynth-language: 6262e2eb76944f01972c887b3e684aaf65ec999a
138138
refs/heads/autosynth-os-login: a88a337797996a205873040a63abe1d3116f5789
139139
refs/heads/autosynth-redis: 0cdb2e47359d51b73763bcea8af3de62aa99119b

branches/autosynth-kms/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.ConcurrentHashMap;
4141
import java.util.concurrent.ConcurrentMap;
4242
import java.util.concurrent.Executor;
43+
import java.util.concurrent.LinkedBlockingDeque;
4344
import java.util.concurrent.LinkedBlockingQueue;
4445
import java.util.concurrent.ScheduledExecutorService;
4546
import java.util.concurrent.ScheduledFuture;
@@ -90,7 +91,8 @@ class MessageDispatcher {
9091
private final Lock jobLock;
9192
private ScheduledFuture<?> backgroundJob;
9293

93-
private final Deque<OutstandingMessageBatch> outstandingMessageBatches;
94+
private final LinkedBlockingDeque<OutstandingMessageBatch> outstandingMessageBatches =
95+
new LinkedBlockingDeque<>();
9496

9597
// To keep track of number of seconds the receiver takes to process messages.
9698
private final Distribution ackLatencyDistribution;
@@ -200,7 +202,6 @@ void sendAckOperations(
200202
Duration maxAckExtensionPeriod,
201203
Distribution ackLatencyDistribution,
202204
FlowController flowController,
203-
Deque<OutstandingMessageBatch> outstandingMessageBatches,
204205
Executor executor,
205206
ScheduledExecutorService systemExecutor,
206207
ApiClock clock) {
@@ -211,7 +212,6 @@ void sendAckOperations(
211212
this.receiver = receiver;
212213
this.ackProcessor = ackProcessor;
213214
this.flowController = flowController;
214-
this.outstandingMessageBatches = outstandingMessageBatches;
215215
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
216216
this.ackLatencyDistribution = ackLatencyDistribution;
217217
jobLock = new ReentrantLock();

branches/autosynth-kms/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import com.google.pubsub.v1.StreamingPullRequest;
4343
import com.google.pubsub.v1.StreamingPullResponse;
4444
import io.grpc.Status;
45-
import java.util.Deque;
4645
import java.util.List;
4746
import java.util.concurrent.ScheduledExecutorService;
4847
import java.util.concurrent.TimeUnit;
@@ -84,7 +83,6 @@ public StreamingSubscriberConnection(
8483
SubscriberStub stub,
8584
int channelAffinity,
8685
FlowController flowController,
87-
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
8886
ScheduledExecutorService executor,
8987
ScheduledExecutorService systemExecutor,
9088
ApiClock clock) {
@@ -100,7 +98,6 @@ public StreamingSubscriberConnection(
10098
maxAckExtensionPeriod,
10199
ackLatencyDistribution,
102100
flowController,
103-
outstandingMessageBatches,
104101
executor,
105102
systemExecutor,
106103
clock);

branches/autosynth-kms/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@
4444
import com.google.pubsub.v1.ProjectSubscriptionName;
4545
import java.io.IOException;
4646
import java.util.ArrayList;
47-
import java.util.Deque;
48-
import java.util.LinkedList;
4947
import java.util.List;
5048
import java.util.concurrent.ScheduledExecutorService;
5149
import java.util.logging.Level;
@@ -115,8 +113,6 @@ public class Subscriber extends AbstractApiService {
115113

116114
private final MessageReceiver receiver;
117115
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
118-
private final Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches =
119-
new LinkedList<>();
120116
private final ApiClock clock;
121117
private final List<AutoCloseable> closeables = new ArrayList<>();
122118

@@ -329,7 +325,6 @@ private void startStreamingConnections() {
329325
subStub,
330326
i,
331327
flowController,
332-
outstandingMessageBatches,
333328
executor,
334329
alarmsExecutor,
335330
clock));

branches/autosynth-kms/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.google.pubsub.v1.ReceivedMessage;
2929
import java.util.ArrayList;
3030
import java.util.Collections;
31-
import java.util.LinkedList;
3231
import java.util.List;
3332
import java.util.concurrent.LinkedBlockingQueue;
3433
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -117,7 +116,6 @@ public void sendAckOperations(
117116
Duration.ofMinutes(60),
118117
new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1),
119118
flowController,
120-
new LinkedList<MessageDispatcher.OutstandingMessageBatch>(),
121119
MoreExecutors.directExecutor(),
122120
systemExecutor,
123121
clock);

0 commit comments

Comments
 (0)