2424import com .google .api .gax .batching .FlowController ;
2525import com .google .api .gax .batching .FlowController .FlowControlException ;
2626import com .google .api .gax .core .Distribution ;
27- import com .google .cloud .pubsub .v1 .MessageDispatcher .OutstandingMessageBatch .OutstandingMessage ;
2827import com .google .common .primitives .Ints ;
2928import com .google .common .util .concurrent .MoreExecutors ;
3029import com .google .pubsub .v1 .PubsubMessage ;
3332import java .util .Arrays ;
3433import java .util .Collection ;
3534import java .util .Collections ;
36- import java .util .Deque ;
37- import java .util .LinkedList ;
3835import java .util .List ;
3936import java .util .Map ;
4037import java .util .concurrent .ConcurrentHashMap ;
4138import java .util .concurrent .ConcurrentMap ;
4239import java .util .concurrent .Executor ;
43- import java .util .concurrent .LinkedBlockingDeque ;
4440import java .util .concurrent .LinkedBlockingQueue ;
4541import java .util .concurrent .ScheduledExecutorService ;
4642import java .util .concurrent .ScheduledFuture ;
@@ -91,9 +87,6 @@ class MessageDispatcher {
9187 private final Lock jobLock ;
9288 private ScheduledFuture <?> backgroundJob ;
9389
94- private final LinkedBlockingDeque <OutstandingMessageBatch > outstandingMessageBatches =
95- new LinkedBlockingDeque <>();
96-
9790 // To keep track of number of seconds the receiver takes to process messages.
9891 private final Distribution ackLatencyDistribution ;
9992
@@ -155,7 +148,6 @@ private void forget() {
155148 }
156149 flowController .release (1 , outstandingBytes );
157150 messagesWaiter .incrementPendingMessages (-1 );
158- processOutstandingBatches ();
159151 }
160152
161153 @ Override
@@ -296,50 +288,19 @@ int getMessageDeadlineSeconds() {
296288 return messageDeadlineSeconds .get ();
297289 }
298290
299- static class OutstandingMessageBatch {
300- private final Deque <OutstandingMessage > messages ;
301- private final Runnable doneCallback ;
302-
303- static class OutstandingMessage {
304- private final ReceivedMessage receivedMessage ;
305- private final AckHandler ackHandler ;
306-
307- public OutstandingMessage (ReceivedMessage receivedMessage , AckHandler ackHandler ) {
308- this .receivedMessage = receivedMessage ;
309- this .ackHandler = ackHandler ;
310- }
311-
312- public ReceivedMessage receivedMessage () {
313- return receivedMessage ;
314- }
315-
316- public AckHandler ackHandler () {
317- return ackHandler ;
318- }
319- }
291+ static class OutstandingMessage {
292+ private final ReceivedMessage receivedMessage ;
293+ private final AckHandler ackHandler ;
320294
321- public OutstandingMessageBatch (Runnable doneCallback ) {
322- this .messages = new LinkedList <>();
323- this .doneCallback = doneCallback ;
324- }
325-
326- public void addMessage (ReceivedMessage receivedMessage , AckHandler ackHandler ) {
327- this .messages .add (new OutstandingMessage (receivedMessage , ackHandler ));
328- }
329-
330- public Deque <OutstandingMessage > messages () {
331- return messages ;
295+ public OutstandingMessage (ReceivedMessage receivedMessage , AckHandler ackHandler ) {
296+ this .receivedMessage = receivedMessage ;
297+ this .ackHandler = ackHandler ;
332298 }
333299 }
334300
335- public void processReceivedMessages (List <ReceivedMessage > messages , Runnable doneCallback ) {
336- if (messages .isEmpty ()) {
337- doneCallback .run ();
338- return ;
339- }
340-
301+ public void processReceivedMessages (List <ReceivedMessage > messages ) {
341302 Instant totalExpiration = now ().plus (maxAckExtensionPeriod );
342- OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch ( doneCallback );
303+ List < OutstandingMessage > outstandingBatch = new ArrayList <>( messages . size () );
343304 for (ReceivedMessage message : messages ) {
344305 AckHandler ackHandler =
345306 new AckHandler (
@@ -355,42 +316,25 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
355316 // totally expire so that pubsub service sends us the message again.
356317 continue ;
357318 }
358- outstandingBatch .addMessage ( message , ackHandler );
319+ outstandingBatch .add ( new OutstandingMessage ( message , ackHandler ) );
359320 pendingReceipts .add (message .getAckId ());
360321 }
361322
362- if (outstandingBatch .messages .isEmpty ()) {
363- doneCallback .run ();
364- return ;
365- }
366-
367- messagesWaiter .incrementPendingMessages (outstandingBatch .messages .size ());
368- outstandingMessageBatches .add (outstandingBatch );
369- processOutstandingBatches ();
323+ processBatch (outstandingBatch );
370324 }
371325
372- private void processOutstandingBatches () {
373- for (OutstandingMessageBatch nextBatch = outstandingMessageBatches .poll ();
374- nextBatch != null ;
375- nextBatch = outstandingMessageBatches .poll ()) {
376- for (OutstandingMessage nextMessage = nextBatch .messages .poll ();
377- nextMessage != null ;
378- nextMessage = nextBatch .messages .poll ()) {
379- try {
380- // This is a non-blocking flow controller.
381- flowController .reserve (1 , nextMessage .receivedMessage .getMessage ().getSerializedSize ());
382- } catch (FlowController .MaxOutstandingElementCountReachedException
383- | FlowController .MaxOutstandingRequestBytesReachedException flowControlException ) {
384- // Unwind previous changes in the batches outstanding.
385- nextBatch .messages .addFirst (nextMessage );
386- outstandingMessageBatches .addFirst (nextBatch );
387- return ;
388- } catch (FlowControlException unexpectedException ) {
389- throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
390- }
391- processOutstandingMessage (nextMessage .receivedMessage .getMessage (), nextMessage .ackHandler );
326+ private void processBatch (List <OutstandingMessage > batch ) {
327+ messagesWaiter .incrementPendingMessages (batch .size ());
328+ for (OutstandingMessage message : batch ) {
329+ // This is a blocking flow controller. We have already incremented MessageWaiter, so
330+ // shutdown will block on processing of all these messages anyway.
331+ try {
332+ flowController .reserve (1 , message .receivedMessage .getMessage ().getSerializedSize ());
333+ } catch (FlowControlException unexpectedException ) {
334+ // This should be a blocking flow controller and never throw an exception.
335+ throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
392336 }
393- nextBatch . doneCallback . run ( );
337+ processOutstandingMessage ( message . receivedMessage . getMessage (), message . ackHandler );
394338 }
395339 }
396340
0 commit comments