2020import com .google .api .gax .batching .FlowController ;
2121import com .google .api .gax .batching .FlowController .FlowControlException ;
2222import com .google .api .gax .core .Distribution ;
23- import com .google .cloud .pubsub .v1 .MessageDispatcher .OutstandingMessagesBatch .OutstandingMessage ;
23+ import com .google .cloud .pubsub .v1 .MessageDispatcher .OutstandingMessageBatch .OutstandingMessage ;
2424import com .google .common .annotations .VisibleForTesting ;
2525import com .google .common .collect .Lists ;
2626import com .google .common .primitives .Ints ;
@@ -81,7 +81,7 @@ class MessageDispatcher {
8181 private Instant nextAckDeadlineExtensionAlarmTime ;
8282 private ScheduledFuture <?> pendingAcksAlarm ;
8383
84- private final Deque <OutstandingMessagesBatch > outstandingMessageBatches ;
84+ private final Deque <OutstandingMessageBatch > outstandingMessageBatches ;
8585
8686 // To keep track of number of seconds the receiver takes to process messages.
8787 private final Distribution ackLatencyDistribution ;
@@ -238,6 +238,7 @@ void sendAckOperations(
238238 Duration maxAckExtensionPeriod ,
239239 Distribution ackLatencyDistribution ,
240240 FlowController flowController ,
241+ Deque <OutstandingMessageBatch > outstandingMessageBatches ,
241242 ScheduledExecutorService executor ,
242243 ScheduledExecutorService systemExecutor ,
243244 ApiClock clock ) {
@@ -248,7 +249,7 @@ void sendAckOperations(
248249 this .receiver = receiver ;
249250 this .ackProcessor = ackProcessor ;
250251 this .flowController = flowController ;
251- outstandingMessageBatches = new LinkedList <>() ;
252+ this . outstandingMessageBatches = outstandingMessageBatches ;
252253 outstandingAckHandlers = new PriorityQueue <>();
253254 pendingAcks = new HashSet <>();
254255 pendingNacks = new HashSet <>();
@@ -282,7 +283,7 @@ public int getMessageDeadlineSeconds() {
282283 return messageDeadlineSeconds ;
283284 }
284285
285- static class OutstandingMessagesBatch {
286+ static class OutstandingMessageBatch {
286287 private final Deque <OutstandingMessage > messages ;
287288 private final Runnable doneCallback ;
288289
@@ -304,7 +305,7 @@ public AckHandler ackHandler() {
304305 }
305306 }
306307
307- public OutstandingMessagesBatch (Runnable doneCallback ) {
308+ public OutstandingMessageBatch (Runnable doneCallback ) {
308309 this .messages = new LinkedList <>();
309310 this .doneCallback = doneCallback ;
310311 }
@@ -325,7 +326,7 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
325326 }
326327 messagesWaiter .incrementPendingMessages (messages .size ());
327328
328- OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch (doneCallback );
329+ OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch (doneCallback );
329330 final ArrayList <AckHandler > ackHandlers = new ArrayList <>(messages .size ());
330331 for (ReceivedMessage message : messages ) {
331332 AckHandler ackHandler =
@@ -358,7 +359,7 @@ public void processOutstandingBatches() {
358359 Runnable batchCallback = null ;
359360 OutstandingMessage outstandingMessage ;
360361 synchronized (outstandingMessageBatches ) {
361- OutstandingMessagesBatch nextBatch = outstandingMessageBatches .peek ();
362+ OutstandingMessageBatch nextBatch = outstandingMessageBatches .peek ();
362363 if (nextBatch == null ) {
363364 return ;
364365 }
0 commit comments