1616
1717package com .google .cloud .pubsub .spi .v1 ;
1818
19- import com .google .api .gax .core .FlowController ;
2019import com .google .api .gax .core .ApiClock ;
20+ import com .google .api .gax .core .FlowController ;
21+ import com .google .api .gax .core .FlowController .FlowControlException ;
2122import com .google .api .stats .Distribution ;
23+ import com .google .cloud .pubsub .spi .v1 .MessageDispatcher .OutstandingMessagesBatch .OutstandingMessage ;
2224import com .google .common .annotations .VisibleForTesting ;
2325import com .google .common .collect .Lists ;
2426import com .google .common .primitives .Ints ;
2931import com .google .pubsub .v1 .ReceivedMessage ;
3032import java .util .ArrayList ;
3133import java .util .Collections ;
34+ import java .util .Deque ;
3235import java .util .HashSet ;
33- import java .util .Iterator ;
36+ import java .util .LinkedList ;
3437import java .util .List ;
3538import java .util .PriorityQueue ;
3639import java .util .Set ;
40+ import java .util .concurrent .Executors ;
3741import java .util .concurrent .ScheduledExecutorService ;
3842import java .util .concurrent .ScheduledFuture ;
3943import java .util .concurrent .TimeUnit ;
@@ -57,6 +61,9 @@ class MessageDispatcher {
5761 @ VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration .millis (100 );
5862 private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60 ; // 10m
5963
64+ private static final ScheduledExecutorService alarmsExecutor =
65+ Executors .newScheduledThreadPool (2 );
66+
6067 private final ScheduledExecutorService executor ;
6168 private final ApiClock clock ;
6269
@@ -78,6 +85,8 @@ class MessageDispatcher {
7885 private Instant nextAckDeadlineExtensionAlarmTime ;
7986 private ScheduledFuture <?> pendingAcksAlarm ;
8087
88+ private Deque <OutstandingMessagesBatch > outstandingMessageBatches ;
89+
8190 // To keep track of number of seconds the receiver takes to process messages.
8291 private final Distribution ackLatencyDistribution ;
8392
@@ -184,6 +193,7 @@ public void onFailure(Throwable t) {
184193 setupPendingAcksAlarm ();
185194 flowController .release (1 , outstandingBytes );
186195 messagesWaiter .incrementPendingMessages (-1 );
196+ processOutstandingBatches ();
187197 }
188198
189199 @ Override
@@ -194,25 +204,23 @@ public void onSuccess(AckReply reply) {
194204 synchronized (pendingAcks ) {
195205 pendingAcks .add (ackId );
196206 }
197- setupPendingAcksAlarm ();
198- flowController .release (1 , outstandingBytes );
199207 // Record the latency rounded to the next closest integer.
200208 ackLatencyDistribution .record (
201209 Ints .saturatedCast (
202210 (long ) Math .ceil ((clock .millisTime () - receivedTime .getMillis ()) / 1000D )));
203- messagesWaiter .incrementPendingMessages (-1 );
204- return ;
211+ break ;
205212 case NACK :
206213 synchronized (pendingNacks ) {
207214 pendingNacks .add (ackId );
208215 }
209- setupPendingAcksAlarm ();
210- flowController .release (1 , outstandingBytes );
211- messagesWaiter .incrementPendingMessages (-1 );
212- return ;
216+ break ;
213217 default :
214218 throw new IllegalArgumentException (String .format ("AckReply: %s not supported" , reply ));
215219 }
220+ setupPendingAcksAlarm ();
221+ flowController .release (1 , outstandingBytes );
222+ messagesWaiter .incrementPendingMessages (-1 );
223+ processOutstandingBatches ();
216224 }
217225 }
218226
@@ -236,6 +244,7 @@ void sendAckOperations(
236244 this .receiver = receiver ;
237245 this .ackProcessor = ackProcessor ;
238246 this .flowController = flowController ;
247+ outstandingMessageBatches = new LinkedList <>();
239248 outstandingAckHandlers = new PriorityQueue <>();
240249 pendingAcks = new HashSet <>();
241250 pendingNacks = new HashSet <>();
@@ -269,28 +278,113 @@ public int getMessageDeadlineSeconds() {
269278 return messageDeadlineSeconds ;
270279 }
271280
272- public void processReceivedMessages (List <com .google .pubsub .v1 .ReceivedMessage > responseMessages ) {
273- int receivedMessagesCount = responseMessages .size ();
274- if (receivedMessagesCount == 0 ) {
281+ static class OutstandingMessagesBatch {
282+ private final Deque <OutstandingMessage > messages ;
283+ private final Runnable doneCallback ;
284+
285+ static class OutstandingMessage {
286+ private final ReceivedMessage receivedMessage ;
287+ private final AckHandler ackHandler ;
288+
289+ public OutstandingMessage (ReceivedMessage receivedMessage , AckHandler ackHandler ) {
290+ this .receivedMessage = receivedMessage ;
291+ this .ackHandler = ackHandler ;
292+ }
293+
294+ public ReceivedMessage receivedMessage () {
295+ return receivedMessage ;
296+ }
297+
298+ public AckHandler ackHandler () {
299+ return ackHandler ;
300+ }
301+ }
302+
303+ public OutstandingMessagesBatch (Runnable doneCallback ) {
304+ this .messages = new LinkedList <>();
305+ this .doneCallback = doneCallback ;
306+ }
307+
308+ public void addMessage (ReceivedMessage receivedMessage , AckHandler ackHandler ) {
309+ this .messages .add (new OutstandingMessage (receivedMessage , ackHandler ));
310+ }
311+
312+ public Deque <OutstandingMessage > messages () {
313+ return messages ;
314+ }
315+
316+ public void done () {
317+ doneCallback .run ();
318+ }
319+ }
320+
321+ public void processReceivedMessages (List <ReceivedMessage > messages , Runnable doneCallback ) {
322+ if (messages .isEmpty ()) {
323+ doneCallback .run ();
275324 return ;
276325 }
277- Instant now = new Instant (clock .millisTime ());
278- int totalByteCount = 0 ;
279- final ArrayList <AckHandler > ackHandlers = new ArrayList <>(responseMessages .size ());
280- for (ReceivedMessage pubsubMessage : responseMessages ) {
281- int messageSize = pubsubMessage .getMessage ().getSerializedSize ();
282- totalByteCount += messageSize ;
283- ackHandlers .add (new AckHandler (pubsubMessage .getAckId (), messageSize ));
326+ messagesWaiter .incrementPendingMessages (messages .size ());
327+
328+ OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch (doneCallback );
329+ final ArrayList <AckHandler > ackHandlers = new ArrayList <>(messages .size ());
330+ for (ReceivedMessage message : messages ) {
331+ AckHandler ackHandler =
332+ new AckHandler (message .getAckId (), message .getMessage ().getSerializedSize ());
333+ ackHandlers .add (ackHandler );
334+ outstandingBatch .addMessage (message , ackHandler );
335+ }
336+
337+ Instant expiration = new Instant (clock .millisTime ()).plus (messageDeadlineSeconds * 1000 );
338+ synchronized (outstandingAckHandlers ) {
339+ outstandingAckHandlers .add (
340+ new ExtensionJob (
341+ new Instant (clock .millisTime ()),
342+ expiration ,
343+ INITIAL_ACK_DEADLINE_EXTENSION_SECONDS ,
344+ ackHandlers ));
345+ }
346+ setupNextAckDeadlineExtensionAlarm (expiration );
347+
348+ synchronized (outstandingMessageBatches ) {
349+ outstandingMessageBatches .add (outstandingBatch );
284350 }
285- Instant expiration = now .plus (messageDeadlineSeconds * 1000 );
286- logger .log (
287- Level .FINER , "Received {0} messages at {1}" , new Object [] {responseMessages .size (), now });
288-
289- messagesWaiter .incrementPendingMessages (responseMessages .size ());
290- Iterator <AckHandler > acksIterator = ackHandlers .iterator ();
291- for (ReceivedMessage userMessage : responseMessages ) {
292- final PubsubMessage message = userMessage .getMessage ();
293- final AckHandler ackHandler = acksIterator .next ();
351+ processOutstandingBatches ();
352+ }
353+
354+ public void processOutstandingBatches () {
355+ while (true ) {
356+ boolean batchDone = false ;
357+ Runnable batchCallback = null ;
358+ OutstandingMessage outstandingMessage ;
359+ synchronized (outstandingMessageBatches ) {
360+ OutstandingMessagesBatch nextBatch = outstandingMessageBatches .peek ();
361+ if (nextBatch == null ) {
362+ return ;
363+ }
364+ outstandingMessage = nextBatch .messages .peek ();
365+ if (outstandingMessage == null ) {
366+ return ;
367+ }
368+ try {
369+ // This is a non-blocking flow controller.
370+ flowController .reserve (
371+ 1 , outstandingMessage .receivedMessage ().getMessage ().getSerializedSize ());
372+ } catch (FlowController .MaxOutstandingElementCountReachedException
373+ | FlowController .MaxOutstandingRequestBytesReachedException flowControlException ) {
374+ return ;
375+ } catch (FlowControlException unexpectedException ) {
376+ throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
377+ }
378+ nextBatch .messages .poll (); // We got a hold to the message already.
379+ batchDone = nextBatch .messages .isEmpty ();
380+ if (batchDone ) {
381+ outstandingMessageBatches .poll ();
382+ batchCallback = nextBatch .doneCallback ;
383+ }
384+ }
385+
386+ final PubsubMessage message = outstandingMessage .receivedMessage ().getMessage ();
387+ final AckHandler ackHandler = outstandingMessage .ackHandler ();
294388 final SettableFuture <AckReply > response = SettableFuture .create ();
295389 final AckReplyConsumer consumer =
296390 new AckReplyConsumer () {
@@ -311,22 +405,9 @@ public void run() {
311405 }
312406 }
313407 });
314- }
315-
316- synchronized (outstandingAckHandlers ) {
317- outstandingAckHandlers .add (
318- new ExtensionJob (
319- new Instant (clock .millisTime ()),
320- expiration ,
321- INITIAL_ACK_DEADLINE_EXTENSION_SECONDS ,
322- ackHandlers ));
323- }
324- setupNextAckDeadlineExtensionAlarm (expiration );
325-
326- try {
327- flowController .reserve (receivedMessagesCount , totalByteCount );
328- } catch (FlowController .FlowControlException unexpectedException ) {
329- throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
408+ if (batchDone ) {
409+ batchCallback .run ();
410+ }
330411 }
331412 }
332413
@@ -335,7 +416,7 @@ private void setupPendingAcksAlarm() {
335416 try {
336417 if (pendingAcksAlarm == null ) {
337418 pendingAcksAlarm =
338- executor .schedule (
419+ alarmsExecutor .schedule (
339420 new Runnable () {
340421 @ Override
341422 public void run () {
@@ -400,7 +481,7 @@ public void run() {
400481 // drop it.
401482 continue ;
402483 }
403-
484+
404485 // If a message has already been acked, remove it, nothing to do.
405486 for (int i = 0 ; i < job .ackHandlers .size (); ) {
406487 if (job .ackHandlers .get (i ).acked .get ()) {
@@ -464,7 +545,7 @@ private void setupNextAckDeadlineExtensionAlarm(Instant expiration) {
464545 nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime ;
465546
466547 ackDeadlineExtensionAlarm =
467- executor .schedule (
548+ alarmsExecutor .schedule (
468549 new AckDeadlineAlarm (),
469550 nextAckDeadlineExtensionAlarmTime .getMillis () - clock .millisTime (),
470551 TimeUnit .MILLISECONDS );
0 commit comments