1616
1717package com .google .cloud .pubsub .spi .v1 ;
1818
19- import com .google .api .gax .core .FlowController ;
2019import com .google .api .gax .core .ApiClock ;
20+ import com .google .api .gax .core .FlowController ;
21+ import com .google .api .gax .core .FlowController .FlowControlException ;
2122import com .google .api .stats .Distribution ;
23+ import com .google .cloud .pubsub .spi .v1 .MessageDispatcher .OutstandingMessagesBatch .OutstandingMessage ;
2224import com .google .common .annotations .VisibleForTesting ;
2325import com .google .common .collect .Lists ;
2426import com .google .common .primitives .Ints ;
2931import com .google .pubsub .v1 .ReceivedMessage ;
3032import java .util .ArrayList ;
3133import java .util .Collections ;
34+ import java .util .Deque ;
3235import java .util .HashSet ;
33- import java .util .Iterator ;
36+ import java .util .LinkedList ;
3437import java .util .List ;
3538import java .util .PriorityQueue ;
3639import java .util .Set ;
40+ import java .util .concurrent .Executors ;
3741import java .util .concurrent .ScheduledExecutorService ;
3842import java .util .concurrent .ScheduledFuture ;
3943import java .util .concurrent .TimeUnit ;
@@ -56,6 +60,9 @@ class MessageDispatcher {
5660 private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECONDS = 2 ;
5761 @ VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration .millis (100 );
5862 private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60 ; // 10m
63+
64+ private static final ScheduledExecutorService alarmsExecutor =
65+ Executors .newScheduledThreadPool (2 );
5966
6067 private final ScheduledExecutorService executor ;
6168 private final ApiClock clock ;
@@ -184,6 +191,7 @@ public void onFailure(Throwable t) {
184191 setupPendingAcksAlarm ();
185192 flowController .release (1 , outstandingBytes );
186193 messagesWaiter .incrementPendingMessages (-1 );
194+ processOutstandingBatches ();
187195 }
188196
189197 @ Override
@@ -194,25 +202,23 @@ public void onSuccess(AckReply reply) {
194202 synchronized (pendingAcks ) {
195203 pendingAcks .add (ackId );
196204 }
197- setupPendingAcksAlarm ();
198- flowController .release (1 , outstandingBytes );
199205 // Record the latency rounded to the next closest integer.
200206 ackLatencyDistribution .record (
201207 Ints .saturatedCast (
202208 (long ) Math .ceil ((clock .millisTime () - receivedTime .getMillis ()) / 1000D )));
203- messagesWaiter .incrementPendingMessages (-1 );
204- return ;
209+ break ;
205210 case NACK :
206211 synchronized (pendingNacks ) {
207212 pendingNacks .add (ackId );
208213 }
209- setupPendingAcksAlarm ();
210- flowController .release (1 , outstandingBytes );
211- messagesWaiter .incrementPendingMessages (-1 );
212- return ;
214+ break ;
213215 default :
214216 throw new IllegalArgumentException (String .format ("AckReply: %s not supported" , reply ));
215217 }
218+ setupPendingAcksAlarm ();
219+ flowController .release (1 , outstandingBytes );
220+ messagesWaiter .incrementPendingMessages (-1 );
221+ processOutstandingBatches ();
216222 }
217223 }
218224
@@ -269,28 +275,117 @@ public int getMessageDeadlineSeconds() {
269275 return messageDeadlineSeconds ;
270276 }
271277
272- public void processReceivedMessages (List <com .google .pubsub .v1 .ReceivedMessage > responseMessages ) {
273- int receivedMessagesCount = responseMessages .size ();
274- if (receivedMessagesCount == 0 ) {
278+ static class OutstandingMessagesBatch {
279+ static class OutstandingMessage {
280+ private final com .google .pubsub .v1 .ReceivedMessage receivedMessage ;
281+ private final AckHandler ackHandler ;
282+
283+ public OutstandingMessage (ReceivedMessage receivedMessage , AckHandler ackHandler ) {
284+ this .receivedMessage = receivedMessage ;
285+ this .ackHandler = ackHandler ;
286+ }
287+
288+ public com .google .pubsub .v1 .ReceivedMessage receivedMessage () {
289+ return receivedMessage ;
290+ }
291+
292+ public AckHandler ackHandler () {
293+ return ackHandler ;
294+ }
295+ }
296+
297+ private final Deque <OutstandingMessage > messages ;
298+ private final Runnable doneCallback ;
299+
300+ public OutstandingMessagesBatch (Runnable doneCallback ) {
301+ this .messages = new LinkedList <>();
302+ this .doneCallback = doneCallback ;
303+ }
304+
305+ public void addMessage (
306+ com .google .pubsub .v1 .ReceivedMessage receivedMessage , AckHandler ackHandler ) {
307+ this .messages .add (new OutstandingMessage (receivedMessage , ackHandler ));
308+ }
309+
310+ public Deque <OutstandingMessage > messages () {
311+ return messages ;
312+ }
313+
314+ public void done () {
315+ doneCallback .run ();
316+ }
317+ }
318+
319+ Deque <OutstandingMessagesBatch > outstandingMessageBatches = new LinkedList <>();
320+
321+ public void processReceivedMessages (
322+ List <com .google .pubsub .v1 .ReceivedMessage > messages , Runnable doneCallback ) {
323+ if (messages .size () == 0 ) {
324+ doneCallback .run ();
275325 return ;
276326 }
277- Instant now = new Instant (clock .millisTime ());
278- int totalByteCount = 0 ;
279- final ArrayList <AckHandler > ackHandlers = new ArrayList <>(responseMessages .size ());
280- for (ReceivedMessage pubsubMessage : responseMessages ) {
281- int messageSize = pubsubMessage .getMessage ().getSerializedSize ();
282- totalByteCount += messageSize ;
283- ackHandlers .add (new AckHandler (pubsubMessage .getAckId (), messageSize ));
327+ messagesWaiter .incrementPendingMessages (messages .size ());
328+
329+ OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch (doneCallback );
330+ final ArrayList <AckHandler > ackHandlers = new ArrayList <>(messages .size ());
331+ for (ReceivedMessage message : messages ) {
332+ AckHandler ackHandler =
333+ new AckHandler (message .getAckId (), message .getMessage ().getSerializedSize ());
334+ ackHandlers .add (ackHandler );
335+ outstandingBatch .addMessage (message , ackHandler );
284336 }
285- Instant expiration = now .plus (messageDeadlineSeconds * 1000 );
286- logger .log (
287- Level .FINER , "Received {0} messages at {1}" , new Object [] {responseMessages .size (), now });
288-
289- messagesWaiter .incrementPendingMessages (responseMessages .size ());
290- Iterator <AckHandler > acksIterator = ackHandlers .iterator ();
291- for (ReceivedMessage userMessage : responseMessages ) {
292- final PubsubMessage message = userMessage .getMessage ();
293- final AckHandler ackHandler = acksIterator .next ();
337+
338+ Instant expiration = new Instant (clock .millisTime ()).plus (messageDeadlineSeconds * 1000 );
339+ synchronized (outstandingAckHandlers ) {
340+ outstandingAckHandlers .add (
341+ new ExtensionJob (
342+ new Instant (clock .millisTime ()),
343+ expiration ,
344+ INITIAL_ACK_DEADLINE_EXTENSION_SECONDS ,
345+ ackHandlers ));
346+ }
347+ setupNextAckDeadlineExtensionAlarm (expiration );
348+
349+ synchronized (outstandingMessageBatches ) {
350+ outstandingMessageBatches .add (outstandingBatch );
351+ }
352+ processOutstandingBatches ();
353+ }
354+
355+ public void processOutstandingBatches () {
356+ while (true ) {
357+ boolean batchDone = false ;
358+ Runnable batchCallback = null ;
359+ OutstandingMessage outstandingMessage ;
360+ synchronized (outstandingMessageBatches ) {
361+ OutstandingMessagesBatch nextBatch = outstandingMessageBatches .peek ();
362+ if (nextBatch == null ) {
363+ return ;
364+ }
365+ outstandingMessage = nextBatch .messages .peek ();
366+ if (outstandingMessage == null ) {
367+ return ;
368+ }
369+ try {
370+ // This is a non-blocking flow controller.
371+ flowController .reserve (
372+ 1 , outstandingMessage .receivedMessage ().getMessage ().getSerializedSize ());
373+ } catch (FlowController .MaxOutstandingElementCountReachedException
374+ | FlowController .MaxOutstandingRequestBytesReachedException flowControlException ) {
375+ return ;
376+ } catch (FlowControlException unexpectedException ) {
377+ throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
378+ }
379+ nextBatch .messages .poll (); // We got a hold to the message already.
380+ batchDone = nextBatch .messages .isEmpty ();
381+ if (batchDone ) {
382+ outstandingMessageBatches .poll ();
383+ batchCallback = nextBatch .doneCallback ;
384+ }
385+ }
386+
387+ final PubsubMessage message = outstandingMessage .receivedMessage ().getMessage ();
388+ final AckHandler ackHandler = outstandingMessage .ackHandler ();
294389 final SettableFuture <AckReply > response = SettableFuture .create ();
295390 final AckReplyConsumer consumer =
296391 new AckReplyConsumer () {
@@ -311,22 +406,9 @@ public void run() {
311406 }
312407 }
313408 });
314- }
315-
316- synchronized (outstandingAckHandlers ) {
317- outstandingAckHandlers .add (
318- new ExtensionJob (
319- new Instant (clock .millisTime ()),
320- expiration ,
321- INITIAL_ACK_DEADLINE_EXTENSION_SECONDS ,
322- ackHandlers ));
323- }
324- setupNextAckDeadlineExtensionAlarm (expiration );
325-
326- try {
327- flowController .reserve (receivedMessagesCount , totalByteCount );
328- } catch (FlowController .FlowControlException unexpectedException ) {
329- throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
409+ if (batchDone ) {
410+ batchCallback .run ();
411+ }
330412 }
331413 }
332414
@@ -335,7 +417,7 @@ private void setupPendingAcksAlarm() {
335417 try {
336418 if (pendingAcksAlarm == null ) {
337419 pendingAcksAlarm =
338- executor .schedule (
420+ alarmsExecutor .schedule (
339421 new Runnable () {
340422 @ Override
341423 public void run () {
@@ -400,7 +482,7 @@ public void run() {
400482 // drop it.
401483 continue ;
402484 }
403-
485+
404486 // If a message has already been acked, remove it, nothing to do.
405487 for (int i = 0 ; i < job .ackHandlers .size (); ) {
406488 if (job .ackHandlers .get (i ).acked .get ()) {
@@ -464,7 +546,7 @@ private void setupNextAckDeadlineExtensionAlarm(Instant expiration) {
464546 nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime ;
465547
466548 ackDeadlineExtensionAlarm =
467- executor .schedule (
549+ alarmsExecutor .schedule (
468550 new AckDeadlineAlarm (),
469551 nextAckDeadlineExtensionAlarmTime .getMillis () - clock .millisTime (),
470552 TimeUnit .MILLISECONDS );
0 commit comments