Skip to content

Commit 5f26d4f

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 22281 b: refs/heads/autosynth-asset c: 54aa55d h: refs/heads/master i: 22279: 5a9a515
1 parent 5719758 commit 5f26d4f

5 files changed

Lines changed: 4 additions & 14 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ refs/heads/autosynth-vision: 3866c747a5e87b5dfd530d7134907a7ed1fb16de
120120
refs/heads/spanner: b01127f885b4611bf1852abb0ce481eeb7fcc131
121121
refs/tags/v0.68.0: 9cc799fcf68c82ab431d425fefa58ef615ce8e5b
122122
refs/tags/v0.69.0: 78f67a29e8b9c46ba01de566a2eae0fd1c03edea
123-
refs/heads/autosynth-asset: 1e18a7d8f967c8b749eb99020dedf5b05506e764
123+
refs/heads/autosynth-asset: 54aa55dc867becf0b7dae15b5cd86cf76801ab47
124124
refs/heads/autosynth-automl: 2a8b018cf05811fd472e5d1053e67a12ceec0b64
125125
refs/heads/autosynth-bigquerydatatransfer: 564833a85642d4194adc025f021e10e723154246
126126
refs/heads/autosynth-bigquerystorage: a75c34ed7a11741669121be69a7021a00f1133ce

branches/autosynth-asset/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.ConcurrentHashMap;
4141
import java.util.concurrent.ConcurrentMap;
4242
import java.util.concurrent.Executor;
43+
import java.util.concurrent.LinkedBlockingDeque;
4344
import java.util.concurrent.LinkedBlockingQueue;
4445
import java.util.concurrent.ScheduledExecutorService;
4546
import java.util.concurrent.ScheduledFuture;
@@ -90,7 +91,8 @@ class MessageDispatcher {
9091
private final Lock jobLock;
9192
private ScheduledFuture<?> backgroundJob;
9293

93-
private final Deque<OutstandingMessageBatch> outstandingMessageBatches;
94+
private final LinkedBlockingDeque<OutstandingMessageBatch> outstandingMessageBatches =
95+
new LinkedBlockingDeque<>();
9496

9597
// To keep track of number of seconds the receiver takes to process messages.
9698
private final Distribution ackLatencyDistribution;
@@ -200,7 +202,6 @@ void sendAckOperations(
200202
Duration maxAckExtensionPeriod,
201203
Distribution ackLatencyDistribution,
202204
FlowController flowController,
203-
Deque<OutstandingMessageBatch> outstandingMessageBatches,
204205
Executor executor,
205206
ScheduledExecutorService systemExecutor,
206207
ApiClock clock) {
@@ -211,7 +212,6 @@ void sendAckOperations(
211212
this.receiver = receiver;
212213
this.ackProcessor = ackProcessor;
213214
this.flowController = flowController;
214-
this.outstandingMessageBatches = outstandingMessageBatches;
215215
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
216216
this.ackLatencyDistribution = ackLatencyDistribution;
217217
jobLock = new ReentrantLock();

branches/autosynth-asset/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import com.google.pubsub.v1.StreamingPullRequest;
4343
import com.google.pubsub.v1.StreamingPullResponse;
4444
import io.grpc.Status;
45-
import java.util.Deque;
4645
import java.util.List;
4746
import java.util.concurrent.ScheduledExecutorService;
4847
import java.util.concurrent.TimeUnit;
@@ -84,7 +83,6 @@ public StreamingSubscriberConnection(
8483
SubscriberStub stub,
8584
int channelAffinity,
8685
FlowController flowController,
87-
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
8886
ScheduledExecutorService executor,
8987
ScheduledExecutorService systemExecutor,
9088
ApiClock clock) {
@@ -100,7 +98,6 @@ public StreamingSubscriberConnection(
10098
maxAckExtensionPeriod,
10199
ackLatencyDistribution,
102100
flowController,
103-
outstandingMessageBatches,
104101
executor,
105102
systemExecutor,
106103
clock);

branches/autosynth-asset/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@
4444
import com.google.pubsub.v1.ProjectSubscriptionName;
4545
import java.io.IOException;
4646
import java.util.ArrayList;
47-
import java.util.Deque;
48-
import java.util.LinkedList;
4947
import java.util.List;
5048
import java.util.concurrent.ScheduledExecutorService;
5149
import java.util.logging.Level;
@@ -115,8 +113,6 @@ public class Subscriber extends AbstractApiService {
115113

116114
private final MessageReceiver receiver;
117115
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
118-
private final Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches =
119-
new LinkedList<>();
120116
private final ApiClock clock;
121117
private final List<AutoCloseable> closeables = new ArrayList<>();
122118

@@ -329,7 +325,6 @@ private void startStreamingConnections() {
329325
subStub,
330326
i,
331327
flowController,
332-
outstandingMessageBatches,
333328
executor,
334329
alarmsExecutor,
335330
clock));

branches/autosynth-asset/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.google.pubsub.v1.ReceivedMessage;
2929
import java.util.ArrayList;
3030
import java.util.Collections;
31-
import java.util.LinkedList;
3231
import java.util.List;
3332
import java.util.concurrent.LinkedBlockingQueue;
3433
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -117,7 +116,6 @@ public void sendAckOperations(
117116
Duration.ofMinutes(60),
118117
new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1),
119118
flowController,
120-
new LinkedList<MessageDispatcher.OutstandingMessageBatch>(),
121119
MoreExecutors.directExecutor(),
122120
systemExecutor,
123121
clock);

0 commit comments

Comments
 (0)