Skip to content

Commit 0c36b64

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 20479 b: refs/heads/autosynth-tasks c: 54aa55d h: refs/heads/master i: 20477: f9db4ec 20475: 38e1f0a 20471: facb64b 20463: 6f62a01 20447: 9890b79 20415: 9a18a2e 20351: c83f2eb 20223: f8f71cb 19967: b8a8707 19455: 6beeb1c 18431: dbde31c 16383: 52dfba4
1 parent 4335003 commit 0c36b64

5 files changed

Lines changed: 4 additions & 14 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ refs/heads/autosynth-redis: 9e1fe503973c7b4a9ba26afb1fcddc2a57ba795a
140140
refs/heads/autosynth-scheduler: 93995812e12a2fc009f4fd2e31f169b3d3cbbac8
141141
refs/heads/autosynth-spanner: a249d9564d8c27ab4439abc6562d7f3b9b7f655d
142142
refs/heads/autosynth-speech: 75d6c62a9d07d3a3642980502a25d07fbde0f232
143-
refs/heads/autosynth-tasks: 1e18a7d8f967c8b749eb99020dedf5b05506e764
143+
refs/heads/autosynth-tasks: 54aa55dc867becf0b7dae15b5cd86cf76801ab47
144144
refs/heads/autosynth-texttospeech: 2dcc5dc22be0f456caa1b6a8a4bcdace2641239c
145145
refs/heads/autosynth-trace: db35fc1080cc51034e9c431c141cbceb53fb19c1
146146
refs/heads/autosynth-websecurityscanner: 55f58d7ce832aed82c4bb5e496af91b6c79fbdc3

branches/autosynth-tasks/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.ConcurrentHashMap;
4141
import java.util.concurrent.ConcurrentMap;
4242
import java.util.concurrent.Executor;
43+
import java.util.concurrent.LinkedBlockingDeque;
4344
import java.util.concurrent.LinkedBlockingQueue;
4445
import java.util.concurrent.ScheduledExecutorService;
4546
import java.util.concurrent.ScheduledFuture;
@@ -90,7 +91,8 @@ class MessageDispatcher {
9091
private final Lock jobLock;
9192
private ScheduledFuture<?> backgroundJob;
9293

93-
private final Deque<OutstandingMessageBatch> outstandingMessageBatches;
94+
private final LinkedBlockingDeque<OutstandingMessageBatch> outstandingMessageBatches =
95+
new LinkedBlockingDeque<>();
9496

9597
// To keep track of number of seconds the receiver takes to process messages.
9698
private final Distribution ackLatencyDistribution;
@@ -200,7 +202,6 @@ void sendAckOperations(
200202
Duration maxAckExtensionPeriod,
201203
Distribution ackLatencyDistribution,
202204
FlowController flowController,
203-
Deque<OutstandingMessageBatch> outstandingMessageBatches,
204205
Executor executor,
205206
ScheduledExecutorService systemExecutor,
206207
ApiClock clock) {
@@ -211,7 +212,6 @@ void sendAckOperations(
211212
this.receiver = receiver;
212213
this.ackProcessor = ackProcessor;
213214
this.flowController = flowController;
214-
this.outstandingMessageBatches = outstandingMessageBatches;
215215
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
216216
this.ackLatencyDistribution = ackLatencyDistribution;
217217
jobLock = new ReentrantLock();

branches/autosynth-tasks/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import com.google.pubsub.v1.StreamingPullRequest;
4343
import com.google.pubsub.v1.StreamingPullResponse;
4444
import io.grpc.Status;
45-
import java.util.Deque;
4645
import java.util.List;
4746
import java.util.concurrent.ScheduledExecutorService;
4847
import java.util.concurrent.TimeUnit;
@@ -84,7 +83,6 @@ public StreamingSubscriberConnection(
8483
SubscriberStub stub,
8584
int channelAffinity,
8685
FlowController flowController,
87-
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
8886
ScheduledExecutorService executor,
8987
ScheduledExecutorService systemExecutor,
9088
ApiClock clock) {
@@ -100,7 +98,6 @@ public StreamingSubscriberConnection(
10098
maxAckExtensionPeriod,
10199
ackLatencyDistribution,
102100
flowController,
103-
outstandingMessageBatches,
104101
executor,
105102
systemExecutor,
106103
clock);

branches/autosynth-tasks/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@
4444
import com.google.pubsub.v1.ProjectSubscriptionName;
4545
import java.io.IOException;
4646
import java.util.ArrayList;
47-
import java.util.Deque;
48-
import java.util.LinkedList;
4947
import java.util.List;
5048
import java.util.concurrent.ScheduledExecutorService;
5149
import java.util.logging.Level;
@@ -115,8 +113,6 @@ public class Subscriber extends AbstractApiService {
115113

116114
private final MessageReceiver receiver;
117115
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
118-
private final Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches =
119-
new LinkedList<>();
120116
private final ApiClock clock;
121117
private final List<AutoCloseable> closeables = new ArrayList<>();
122118

@@ -329,7 +325,6 @@ private void startStreamingConnections() {
329325
subStub,
330326
i,
331327
flowController,
332-
outstandingMessageBatches,
333328
executor,
334329
alarmsExecutor,
335330
clock));

branches/autosynth-tasks/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.google.pubsub.v1.ReceivedMessage;
2929
import java.util.ArrayList;
3030
import java.util.Collections;
31-
import java.util.LinkedList;
3231
import java.util.List;
3332
import java.util.concurrent.LinkedBlockingQueue;
3433
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -117,7 +116,6 @@ public void sendAckOperations(
117116
Duration.ofMinutes(60),
118117
new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1),
119118
flowController,
120-
new LinkedList<MessageDispatcher.OutstandingMessageBatch>(),
121119
MoreExecutors.directExecutor(),
122120
systemExecutor,
123121
clock);

0 commit comments

Comments
 (0)