Skip to content

Commit d1db052

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 25977 b: refs/heads/pubsub-ordering-keys c: 54aa55d h: refs/heads/master i: 25975: a2c0d7f
1 parent 3c11a0c commit d1db052

5 files changed

Lines changed: 4 additions & 14 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
155155
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
156156
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
157157
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
158-
refs/heads/pubsub-ordering-keys: 1e18a7d8f967c8b749eb99020dedf5b05506e764
158+
refs/heads/pubsub-ordering-keys: 54aa55dc867becf0b7dae15b5cd86cf76801ab47
159159
"refs/heads/update_mvn_badge": ae2d773814db0f71197ccf5a8612ee1d8056f8de
160160
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
161161
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.ConcurrentHashMap;
4141
import java.util.concurrent.ConcurrentMap;
4242
import java.util.concurrent.Executor;
43+
import java.util.concurrent.LinkedBlockingDeque;
4344
import java.util.concurrent.LinkedBlockingQueue;
4445
import java.util.concurrent.ScheduledExecutorService;
4546
import java.util.concurrent.ScheduledFuture;
@@ -90,7 +91,8 @@ class MessageDispatcher {
9091
private final Lock jobLock;
9192
private ScheduledFuture<?> backgroundJob;
9293

93-
private final Deque<OutstandingMessageBatch> outstandingMessageBatches;
94+
private final LinkedBlockingDeque<OutstandingMessageBatch> outstandingMessageBatches =
95+
new LinkedBlockingDeque<>();
9496

9597
// To keep track of number of seconds the receiver takes to process messages.
9698
private final Distribution ackLatencyDistribution;
@@ -200,7 +202,6 @@ void sendAckOperations(
200202
Duration maxAckExtensionPeriod,
201203
Distribution ackLatencyDistribution,
202204
FlowController flowController,
203-
Deque<OutstandingMessageBatch> outstandingMessageBatches,
204205
Executor executor,
205206
ScheduledExecutorService systemExecutor,
206207
ApiClock clock) {
@@ -211,7 +212,6 @@ void sendAckOperations(
211212
this.receiver = receiver;
212213
this.ackProcessor = ackProcessor;
213214
this.flowController = flowController;
214-
this.outstandingMessageBatches = outstandingMessageBatches;
215215
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
216216
this.ackLatencyDistribution = ackLatencyDistribution;
217217
jobLock = new ReentrantLock();

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import com.google.pubsub.v1.StreamingPullRequest;
4343
import com.google.pubsub.v1.StreamingPullResponse;
4444
import io.grpc.Status;
45-
import java.util.Deque;
4645
import java.util.List;
4746
import java.util.concurrent.ScheduledExecutorService;
4847
import java.util.concurrent.TimeUnit;
@@ -84,7 +83,6 @@ public StreamingSubscriberConnection(
8483
SubscriberStub stub,
8584
int channelAffinity,
8685
FlowController flowController,
87-
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
8886
ScheduledExecutorService executor,
8987
ScheduledExecutorService systemExecutor,
9088
ApiClock clock) {
@@ -100,7 +98,6 @@ public StreamingSubscriberConnection(
10098
maxAckExtensionPeriod,
10199
ackLatencyDistribution,
102100
flowController,
103-
outstandingMessageBatches,
104101
executor,
105102
systemExecutor,
106103
clock);

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@
4444
import com.google.pubsub.v1.ProjectSubscriptionName;
4545
import java.io.IOException;
4646
import java.util.ArrayList;
47-
import java.util.Deque;
48-
import java.util.LinkedList;
4947
import java.util.List;
5048
import java.util.concurrent.ScheduledExecutorService;
5149
import java.util.logging.Level;
@@ -115,8 +113,6 @@ public class Subscriber extends AbstractApiService {
115113

116114
private final MessageReceiver receiver;
117115
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
118-
private final Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches =
119-
new LinkedList<>();
120116
private final ApiClock clock;
121117
private final List<AutoCloseable> closeables = new ArrayList<>();
122118

@@ -329,7 +325,6 @@ private void startStreamingConnections() {
329325
subStub,
330326
i,
331327
flowController,
332-
outstandingMessageBatches,
333328
executor,
334329
alarmsExecutor,
335330
clock));

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.google.pubsub.v1.ReceivedMessage;
2929
import java.util.ArrayList;
3030
import java.util.Collections;
31-
import java.util.LinkedList;
3231
import java.util.List;
3332
import java.util.concurrent.LinkedBlockingQueue;
3433
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -117,7 +116,6 @@ public void sendAckOperations(
117116
Duration.ofMinutes(60),
118117
new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1),
119118
flowController,
120-
new LinkedList<MessageDispatcher.OutstandingMessageBatch>(),
121119
MoreExecutors.directExecutor(),
122120
systemExecutor,
123121
clock);

0 commit comments

Comments
 (0)